1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 package it.imolinfo.jbi4ejb.jbi.component.runtime;
31
32 import it.imolinfo.jbi4ejb.Logger;
33 import it.imolinfo.jbi4ejb.LoggerFactory;
34 import it.imolinfo.jbi4ejb.jbi.Messages;
35
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.TimeUnit;
39 import javax.jbi.messaging.DeliveryChannel;
40 import javax.jbi.messaging.MessageExchange;
41 import javax.jbi.messaging.MessagingException;
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 public class MessageExchangeReceiver {
59
60 private static final Logger LOG = LoggerFactory.getLogger(MessageExchangeReceiver.class);
61 private static final Messages MESSAGES = Messages.getMessages(MessageExchangeReceiver.class);
62
63 private final static long DC_ACCEPT_TIME_OUT = 3000;
64
65
66 private final static long RECEIVER_WAIT_TIME = 2000;
67
68
69 private final static long RECEIVER_SHUTDOWN_WAIT_TIME = 10;
70
71
72 private final static long HANDLERS_SHUTDOWN_WAIT_TIME = 30;
73
74 private final static int HANDLER_THREAD_POOL_SIZE = 5;
75
76 private Boolean mCanAccept = false;
77
78 private Boolean mContinue = true;
79
80 private ExecutorService mReceiverThreadMgr;
81
82
83 private ExecutorService mHandlerThreadPool;
84
85
86 public MessageExchangeReceiver() throws Exception {
87 }
88
89 public final void initReceiver() throws Exception {
90
91 this.mHandlerThreadPool = Executors.newFixedThreadPool(HANDLER_THREAD_POOL_SIZE);
92 this.mReceiverThreadMgr = Executors.newSingleThreadExecutor();
93
94 this.mReceiverThreadMgr.execute(new Runnable() {
95 public void run() {
96 Thread t = Thread.currentThread();
97 while ( mContinue ) {
98 if (mCanAccept) {
99 receiveAndProcessMessageExchange();
100 } else {
101 try {
102 t.sleep(RECEIVER_WAIT_TIME);
103 } catch (InterruptedException interruptException) {
104
105
106 RuntimeHelper.logDebug("Interrupted the MessageReceiverThread in Sleep");
107 }
108 }
109 }
110 }
111 });
112 }
113
114 public final void shutdownReceiver() throws Exception {
115
116 synchronized ( mContinue ) {
117 mContinue = false;
118 }
119 boolean terminated = false;
120 try {
121 this.mReceiverThreadMgr.shutdown();
122 terminated = this.mReceiverThreadMgr.awaitTermination(
123 RECEIVER_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
124 } catch (InterruptedException ex) {
125 RuntimeHelper.logDebug(ex);
126 } finally {
127 if ( !terminated ) {
128 RuntimeHelper.logDebug("Message Receiver not shutdown. Forcing shutdown");
129 this.mReceiverThreadMgr.shutdownNow();
130 }
131 }
132 shutdownHandlers();
133 }
134
135 private final void shutdownHandlers() throws Exception {
136
137 boolean terminated = false;
138 try {
139 this.mHandlerThreadPool.shutdown();
140 terminated = this.mHandlerThreadPool.awaitTermination(
141 HANDLERS_SHUTDOWN_WAIT_TIME, TimeUnit.SECONDS);
142 } catch (InterruptedException ex) {
143 RuntimeHelper.logDebug(ex);
144 } finally {
145 if ( !terminated ) {
146 RuntimeHelper.logDebug("Handler threads not shutdown. Forcing shutdown");
147 this.mHandlerThreadPool.shutdownNow();
148 }
149 }
150 }
151
152 public final void startProcessing() throws Exception {
153
154 synchronized ( this.mCanAccept ) {
155 this.mCanAccept = true;
156 }
157 }
158
159 public final void stopProcessing() throws Exception {
160
161 synchronized ( this.mCanAccept ) {
162 this.mCanAccept = false;
163 }
164 }
165
166 private void receiveAndProcessMessageExchange() {
167
168 try {
169
170 DeliveryChannel channel = RuntimeHelper.getDeliveryChannel();
171 MessageExchange msgExchange = null;
172
173 if ( channel == null ) {
174 RuntimeHelper.logDebug("DeliveryChannel Not Opened for receiving messages");
175 return;
176 }
177
178 msgExchange = channel.accept(DC_ACCEPT_TIME_OUT);
179
180 if ( msgExchange == null ) {
181
182 return;
183 }
184
185 MessageExchangeHandler handler = findMessageExchangeHandler(msgExchange);
186
187 if ( handler == null ) {
188
189 LOG.error("EJB000224_Message_ExchangeHandler_NULL", new Object[] {msgExchange});
190
191 return;
192 }
193
194 handler.setMessageExchange(msgExchange);
195
196
197 this.mHandlerThreadPool.execute(handler);
198
199 } catch (MessagingException ex) {
200
201 LOG.info("EJB000225_Warning_in_preceive_and_process_message_exchange", new Object[] {ex});
202 ex.printStackTrace();
203 }
204 }
205
206 private MessageExchangeHandler findMessageExchangeHandler(MessageExchange msgExchange) {
207 MessageExchangeHandler handler = null;
208 handler = RuntimeContext.getInstance().newMessageExchangeHandler(msgExchange);
209 return handler;
210 }
211
212 }