View Javadoc

1   /*
2    * The contents of this file are subject to the terms
3    * of the Common Development and Distribution License
4    * (the "License").  You may not use this file except
5    * in compliance with the License.
6    *
7    * You can obtain a copy of the license at
8    * https://open-esb.dev.java.net/public/CDDLv1.0.html.
9    * See the License for the specific language governing
10   * permissions and limitations under the License.
11   *
12   * When distributing Covered Code, include this CDDL
13   * HEADER in each file and include the License file at
14   * https://open-esb.dev.java.net/public/CDDLv1.0.html.
15   * If applicable add the following below this CDDL HEADER,
16   * with the fields enclosed by brackets "[]" replaced with
17   * your own identifying information: Portions Copyright
18   * [year] [name of copyright owner]
19   */
20  
21  /*
22   * Copyright 2004-2006 Sun Microsystems, Inc. All Rights Reserved.
23   */
24  
25  /*
26   * MessageExchangeReceiver.java
27   *
28   */
29  
30  package it.imolinfo.jbi4ejb.jbi.component.runtime;
31  
32  import it.imolinfo.jbi4ejb.Logger;
33  import it.imolinfo.jbi4ejb.LoggerFactory;
34  import it.imolinfo.jbi4ejb.jbi.Messages;
35  
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Executors;
38  import java.util.concurrent.TimeUnit;
39  import javax.jbi.messaging.DeliveryChannel;
40  import javax.jbi.messaging.MessageExchange;
41  import javax.jbi.messaging.MessagingException;
42  
43  /**
44   * This main purpose of this class is to manage receiving the MessageExchange
45   * object from the DeliveryChannel and process them by delegating the processing
46   * of the MessageExchange object to MessageExchangeHandlers configured for the
47   * component. It also provides the controller methods to start and stop
48   * processing the MessageExchange objects which will be used when the component
49   * lifecycle is controlled.
50   *
51   * This class creates a single thread to receive the MessageExchange objects
52   * from the delivery channel and then finds the appropriate message exchange
53   * handler to process the message exchange. Each message exchange handler will
54   * be executed in a separate thread from a pool of handler threads.
55   *
56   * @author Sun Microsystems, Inc.
57   */
58  public class MessageExchangeReceiver {
59  	/** The logger. */
60  	private static final Logger LOG = LoggerFactory.getLogger(MessageExchangeReceiver.class);    
61      private static final Messages MESSAGES = Messages.getMessages(MessageExchangeReceiver.class);   
62      /** delivery channel accept time out */
63      private final static long DC_ACCEPT_TIME_OUT = 3000; // milliseconds
64      
65      /** receiver thread wait time before polling for messages after woke up **/
66      private final static long RECEIVER_WAIT_TIME = 2000; // milliseconds
67      
68      /** receiver thread wait time before force shutdown */
69      private final static long RECEIVER_SHUTDOWN_WAIT_TIME = 10; // seconds
70      
71      /** handler threads wait time before forced shutdown  */
72      private final static long HANDLERS_SHUTDOWN_WAIT_TIME = 30; // seconds
73      /** handler thread pool size */
74      private final static int HANDLER_THREAD_POOL_SIZE = 5;
75      /** receiver thread accept message exchange condition */
76      private Boolean mCanAccept = false;
77      /** receiver thread termination condition */
78      private Boolean mContinue = true;
79      /** receiver thread executor service */
80      private ExecutorService mReceiverThreadMgr;
81      
82      /** handler thread executor service */
83      private ExecutorService mHandlerThreadPool;
84      
85      /** no default constructor for extended classes */
86      public MessageExchangeReceiver() throws Exception {
87      }
88      
89      public final void initReceiver() throws Exception {
90          
91          this.mHandlerThreadPool = Executors.newFixedThreadPool(HANDLER_THREAD_POOL_SIZE);
92          this.mReceiverThreadMgr = Executors.newSingleThreadExecutor();
93          
94          this.mReceiverThreadMgr.execute(new Runnable() {
95              public void run() {
96                  Thread t = Thread.currentThread();
97                  while ( mContinue ) {
98                      if (mCanAccept) {
99                          receiveAndProcessMessageExchange();
100                     } else {
101                         try {
102                             t.sleep(RECEIVER_WAIT_TIME);
103                         } catch (InterruptedException interruptException) {
104                             // someone must have interrupted this thread
105                             // do nothing
106                             RuntimeHelper.logDebug("Interrupted the MessageReceiverThread in Sleep");
107                         }
108                     }
109                 }
110             }
111         });
112     }
113     
114     public final void shutdownReceiver() throws Exception {
115         
116         synchronized ( mContinue ) {
117             mContinue = false;
118         }
119         boolean terminated = false;
120         try {
121             this.mReceiverThreadMgr.shutdown();
122             terminated = this.mReceiverThreadMgr.awaitTermination(
123                 RECEIVER_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
124         } catch (InterruptedException ex) {
125             RuntimeHelper.logDebug(ex);
126         } finally {
127             if ( !terminated ) {
128                 RuntimeHelper.logDebug("Message Receiver not shutdown. Forcing shutdown");
129                 this.mReceiverThreadMgr.shutdownNow();
130             }
131         }
132         shutdownHandlers();
133     }
134     
135     private final void shutdownHandlers() throws Exception {
136         
137         boolean terminated = false;
138         try {
139             this.mHandlerThreadPool.shutdown();
140             terminated = this.mHandlerThreadPool.awaitTermination(
141                 HANDLERS_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
142         } catch (InterruptedException ex) {
143             RuntimeHelper.logDebug(ex);
144         } finally {
145             if ( !terminated ) {
146                 RuntimeHelper.logDebug("Handler threads not shutdown. Forcing shutdown");
147                 this.mHandlerThreadPool.shutdownNow();
148             }
149         }
150     }
151     
152     public final void startProcessing() throws Exception {
153         
154         synchronized ( this.mCanAccept ) {
155             this.mCanAccept = true;
156         }
157     }
158     
159     public final void stopProcessing() throws Exception {
160         
161         synchronized ( this.mCanAccept ) {
162             this.mCanAccept = false;
163         }
164     }
165     
166     private void receiveAndProcessMessageExchange() {
167         
168         try {
169             
170             DeliveryChannel channel = RuntimeHelper.getDeliveryChannel();
171             MessageExchange msgExchange = null;
172             
173             if ( channel == null ) {
174                 RuntimeHelper.logDebug("DeliveryChannel Not Opened for receiving messages");
175                 return;
176             }
177             
178             msgExchange = channel.accept(DC_ACCEPT_TIME_OUT);
179             
180             if ( msgExchange == null ) {
181                 // RuntimeHelper.logDebug("DeliveryChannel returned null message exchange from accept");
182                 return;
183             }
184             
185             MessageExchangeHandler handler = findMessageExchangeHandler(msgExchange);
186             
187             if ( handler == null ) {
188 
189             	LOG.error("EJB000224_Message_ExchangeHandler_NULL", new Object[] {msgExchange});
190                 
191                 return;
192             }
193             // process message exchange. This could be done in a separate thread.
194             handler.setMessageExchange(msgExchange);
195             //  handler.processMessageExchange();
196             // try using the pool
197             this.mHandlerThreadPool.execute(handler);
198             
199         } catch (MessagingException ex) {
200 
201         	LOG.info("EJB000225_Warning_in_preceive_and_process_message_exchange", new Object[] {ex});
202             ex.printStackTrace();
203         }
204     }
205     
206     private MessageExchangeHandler findMessageExchangeHandler(MessageExchange msgExchange) {
207         MessageExchangeHandler handler = null;
208         handler = RuntimeContext.getInstance().newMessageExchangeHandler(msgExchange);
209         return handler;
210     }
211     
212 }