1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  
20  
21  
22  
23  
24  
25  
26  
27  
28  
29  
30  package it.imolinfo.jbi4ejb.jbi.component.runtime;
31  
32  import it.imolinfo.jbi4ejb.Logger;
33  import it.imolinfo.jbi4ejb.LoggerFactory;
34  import it.imolinfo.jbi4ejb.jbi.Messages;
35  
36  import java.util.concurrent.ExecutorService;
37  import java.util.concurrent.Executors;
38  import java.util.concurrent.TimeUnit;
39  import javax.jbi.messaging.DeliveryChannel;
40  import javax.jbi.messaging.MessageExchange;
41  import javax.jbi.messaging.MessagingException;
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  public class MessageExchangeReceiver {
59  	
60  	private static final Logger LOG = LoggerFactory.getLogger(MessageExchangeReceiver.class);    
61      private static final Messages MESSAGES = Messages.getMessages(MessageExchangeReceiver.class);   
62      
63      private final static long DC_ACCEPT_TIME_OUT = 3000; 
64      
65      
66      private final static long RECEIVER_WAIT_TIME = 2000; 
67      
68      
69      private final static long RECEIVER_SHUTDOWN_WAIT_TIME = 10; 
70      
71      
72      private final static long HANDLERS_SHUTDOWN_WAIT_TIME = 30; 
73      
74      private final static int HANDLER_THREAD_POOL_SIZE = 5;
75      
76      private Boolean mCanAccept = false;
77      
78      private Boolean mContinue = true;
79      
80      private ExecutorService mReceiverThreadMgr;
81      
82      
83      private ExecutorService mHandlerThreadPool;
84      
85      
86      public MessageExchangeReceiver() throws Exception {
87      }
88      
89      public final void initReceiver() throws Exception {
90          
91          this.mHandlerThreadPool = Executors.newFixedThreadPool(HANDLER_THREAD_POOL_SIZE);
92          this.mReceiverThreadMgr = Executors.newSingleThreadExecutor();
93          
94          this.mReceiverThreadMgr.execute(new Runnable() {
95              public void run() {
96                  Thread t = Thread.currentThread();
97                  while ( mContinue ) {
98                      if (mCanAccept) {
99                          receiveAndProcessMessageExchange();
100                     } else {
101                         try {
102                             t.sleep(RECEIVER_WAIT_TIME);
103                         } catch (InterruptedException interruptException) {
104                             
105                             
106                             RuntimeHelper.logDebug("Interrupted the MessageReceiverThread in Sleep");
107                         }
108                     }
109                 }
110             }
111         });
112     }
113     
114     public final void shutdownReceiver() throws Exception {
115         
116         synchronized ( mContinue ) {
117             mContinue = false;
118         }
119         boolean terminated = false;
120         try {
121             this.mReceiverThreadMgr.shutdown();
122             terminated = this.mReceiverThreadMgr.awaitTermination(
123                 RECEIVER_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
124         } catch (InterruptedException ex) {
125             RuntimeHelper.logDebug(ex);
126         } finally {
127             if ( !terminated ) {
128                 RuntimeHelper.logDebug("Message Receiver not shutdown. Forcing shutdown");
129                 this.mReceiverThreadMgr.shutdownNow();
130             }
131         }
132         shutdownHandlers();
133     }
134     
135     private final void shutdownHandlers() throws Exception {
136         
137         boolean terminated = false;
138         try {
139             this.mHandlerThreadPool.shutdown();
140             terminated = this.mHandlerThreadPool.awaitTermination(
141                 HANDLERS_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
142         } catch (InterruptedException ex) {
143             RuntimeHelper.logDebug(ex);
144         } finally {
145             if ( !terminated ) {
146                 RuntimeHelper.logDebug("Handler threads not shutdown. Forcing shutdown");
147                 this.mHandlerThreadPool.shutdownNow();
148             }
149         }
150     }
151     
152     public final void startProcessing() throws Exception {
153         
154         synchronized ( this.mCanAccept ) {
155             this.mCanAccept = true;
156         }
157     }
158     
159     public final void stopProcessing() throws Exception {
160         
161         synchronized ( this.mCanAccept ) {
162             this.mCanAccept = false;
163         }
164     }
165     
166     private void receiveAndProcessMessageExchange() {
167         
168         try {
169             
170             DeliveryChannel channel = RuntimeHelper.getDeliveryChannel();
171             MessageExchange msgExchange = null;
172             
173             if ( channel == null ) {
174                 RuntimeHelper.logDebug("DeliveryChannel Not Opened for receiving messages");
175                 return;
176             }
177             
178             msgExchange = channel.accept(DC_ACCEPT_TIME_OUT);
179             
180             if ( msgExchange == null ) {
181                 
182                 return;
183             }
184             
185             MessageExchangeHandler handler = findMessageExchangeHandler(msgExchange);
186             
187             if ( handler == null ) {
188 
189             	LOG.error("EJB000224_Message_ExchangeHandler_NULL", new Object[] {msgExchange});
190                 
191                 return;
192             }
193             
194             handler.setMessageExchange(msgExchange);
195             
196             
197             this.mHandlerThreadPool.execute(handler);
198             
199         } catch (MessagingException ex) {
200 
201         	LOG.info("EJB000225_Warning_in_preceive_and_process_message_exchange", new Object[] {ex});
202             ex.printStackTrace();
203         }
204     }
205     
206     private MessageExchangeHandler findMessageExchangeHandler(MessageExchange msgExchange) {
207         MessageExchangeHandler handler = null;
208         handler = RuntimeContext.getInstance().newMessageExchangeHandler(msgExchange);
209         return handler;
210     }
211     
212 }