1
2
3
4
5
6
7
8 package it.imolinfo.jbi4ejb.processor;
9
10 import it.imolinfo.jbi4ejb.Logger;
11 import it.imolinfo.jbi4ejb.LoggerFactory;
12 import it.imolinfo.jbi4ejb.exception.Jbi4EjbException;
13 import it.imolinfo.jbi4ejb.jbi.component.runtime.RuntimeHelper;
14 import it.imolinfo.jbi4ejb.jbi.endpoint.Jbi4EjbProviderEndpoint;
15 import it.imolinfo.jbi4ejb.jbi.xfire.EjbTransport;
16 import it.imolinfo.jbi4ejb.processor.transform.SourceTransformer;
17 import it.imolinfo.jbi4ejb.processor.transform.StringSource;
18
19 import java.io.ByteArrayInputStream;
20 import java.io.ByteArrayOutputStream;
21 import java.io.InputStream;
22 import java.io.StringWriter;
23 import java.util.Iterator;
24 import java.util.List;
25
26 import javax.activation.DataHandler;
27 import javax.jbi.messaging.ExchangeStatus;
28 import javax.jbi.messaging.Fault;
29 import javax.jbi.messaging.InOptionalOut;
30 import javax.jbi.messaging.InOut;
31 import javax.jbi.messaging.MessageExchange;
32 import javax.jbi.messaging.NormalizedMessage;
33 import javax.xml.stream.XMLOutputFactory;
34 import javax.xml.stream.XMLStreamException;
35 import javax.xml.stream.XMLStreamWriter;
36 import javax.xml.transform.Source;
37 import javax.xml.transform.stream.StreamSource;
38
39 import org.codehaus.xfire.MessageContext;
40 import org.codehaus.xfire.XFire;
41 import org.codehaus.xfire.attachments.Attachment;
42 import org.codehaus.xfire.attachments.Attachments;
43 import org.codehaus.xfire.attachments.JavaMailAttachments;
44 import org.codehaus.xfire.attachments.SimpleAttachment;
45 import org.codehaus.xfire.exchange.InMessage;
46 import org.codehaus.xfire.fault.XFireFault;
47 import org.codehaus.xfire.service.OperationInfo;
48 import org.codehaus.xfire.service.Service;
49 import org.codehaus.xfire.transport.Channel;
50 import org.codehaus.xfire.transport.Transport;
51 import org.codehaus.xfire.util.jdom.StaxSerializer;
52
53
54
55
56
57
58
59
60 public class ProviderExchangeProcessor implements ExchangeProcessor {
61
62
63 private static final Logger LOG
64 = LoggerFactory.getLogger(ProviderExchangeProcessor.class);
65
66
67 private Jbi4EjbProviderEndpoint endpoint;
68
69
70 private SourceTransformer transformer;
71
72
73 private Jbi4EjbDenormalizer messageDenormalizer;
74
75
76 private Jbi4EjbNormalizer messageNormalizer;
77
78
79
80
81
82
83
84
85 public ProviderExchangeProcessor(Jbi4EjbProviderEndpoint endpoint) throws Jbi4EjbException {
86 this.endpoint = endpoint;
87 transformer = new SourceTransformer();
88 messageDenormalizer = new Jbi4EjbDenormalizer();
89 messageNormalizer = new Jbi4EjbNormalizer();
90 }
91
92
93
94
95
96
97
98 public void process(MessageExchange exchange) {
99
100 LOG.debug("Processing message exchange: " + exchange);
101
102
103 if (exchange.getStatus() == ExchangeStatus.DONE) {
104 LOG.debug("ExchangeStatus.DONE");
105 return;
106 } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
107 LOG.debug("ExchangeStatus.ERROR");
108 return;
109 }
110
111 try {
112 XFire xfire = endpoint.getSuManager().getLifeCycle().getXfire();
113 Service service = endpoint.getXfireService();
114
115
116
117 Transport t = xfire.getTransportManager().getTransport(EjbTransport.EJB_BINDING);
118
119 ByteArrayOutputStream xfireOut = new ByteArrayOutputStream();
120 Channel xfireChannel = t.createChannel();
121 MessageContext xfireCtx = new MessageContext();
122 xfireCtx.setXFire(xfire);
123 xfireCtx.setService(service);
124 xfireCtx.setProperty(Channel.BACKCHANNEL_URI, xfireOut);
125 xfireCtx.setExchange(new org.codehaus.xfire.exchange.MessageExchange(xfireCtx));
126 InMessage xfireMsg = new InMessage();
127
128 xfireCtx.getExchange().setInMessage(xfireMsg);
129 if (exchange.getOperation() != null) {
130
131 OperationInfo op = service.getServiceInfo().getOperation(
132 exchange.getOperation().getLocalPart());
133 if (op != null) {
134 xfireCtx.getExchange().setOperation(op);
135 } else {
136 LOG.debug("OperationInfo is null.");
137 }
138 }
139 xfireCtx.setCurrentMessage(xfireMsg);
140
141
142 NormalizedMessage in = exchange.getMessage("in");
143 if (LOG.isDebugEnabled()) {
144 String inMessage = "In message, before unwrapping: " + transformer.contentToString(in);
145 LOG.debug(inMessage);
146 }
147 Jbi4EjbMessage inMsg = messageDenormalizer.denormalize(in, endpoint, exchange.getOperation());
148 if (LOG.isDebugEnabled()) {
149 String inMessage = "In message, after unwrapping: " + transformer.toString(inMsg.getMessageSource());
150 LOG.debug(inMessage);
151 }
152
153
154 xfireMsg.setXMLStreamReader(transformer.toXMLStreamReader(inMsg.getMessageSource()));
155
156 if (in.getAttachmentNames() != null
157 && in.getAttachmentNames().size() > 0) {
158
159 JavaMailAttachments attachments = new JavaMailAttachments();
160 for (Iterator it = in.getAttachmentNames().iterator(); it.hasNext();) {
161 String name = (String) it.next();
162 DataHandler dh = in.getAttachment(name);
163 attachments.addPart(new SimpleAttachment(name, dh));
164 }
165 xfireMsg.setAttachments(attachments);
166 }
167
168 xfireChannel.receive(xfireCtx, xfireMsg);
169 xfireChannel.close();
170
171
172 if (isInAndOut(exchange)) {
173 if (xfireCtx.getExchange().hasFaultMessage()
174 && xfireCtx.getExchange().getFaultMessage().getBody() != null) {
175
176
177 LOG.debug("Fault message");
178 Fault fault = exchange.createFault();
179
180
181 XFireFault xFault = (XFireFault) xfireCtx.getExchange().getFaultMessage().getBody();
182
183
184 if (xFault.hasDetails()) {
185
186 Source faultSource = createJBIFaultSourceFromSOAPFault(xfireCtx);
187
188
189 String faultName = ((org.jdom.Element)xFault.getDetail().getContent().get(0)).getName();
190
191
192 messageNormalizer.normalizeFault(faultSource, fault, endpoint, exchange.getOperation(), faultName, inMsg.isWrapped());
193
194 exchange.setFault(fault);
195 } else {
196
197
198
199
200
201
202 if (xFault.getCause() instanceof Exception) {
203 exchange.setError((Exception) xFault.getCause());
204 } else {
205 exchange.setError(new Exception(xFault.getCause()));
206 }
207 }
208
209 } else {
210 NormalizedMessage outMsg = exchange.createMessage();
211 Attachments attachments = xfireCtx.getCurrentMessage().getAttachments();
212 if (attachments != null) {
213 for (Iterator it = attachments.getParts(); it.hasNext();) {
214 Attachment att = (Attachment) it.next();
215 outMsg.addAttachment(att.getId(), att.getDataHandler());
216 }
217 }
218
219
220 InputStream inputStream = new ByteArrayInputStream(xfireOut.toByteArray());
221 Source outSource = transformer.toDOMSourceFromStream(new StreamSource(inputStream));
222
223 if (LOG.isDebugEnabled()) {
224 String inMessage = "Out message, before wrapping: " + transformer.toString(outSource);
225 LOG.debug(inMessage);
226 }
227
228 messageNormalizer.normalize(outSource, outMsg, endpoint, exchange.getOperation(), inMsg.isWrapped());
229
230 if (LOG.isDebugEnabled()) {
231 String inMessage = "Out message, after wrapping: " + transformer.contentToString(outMsg);
232 LOG.debug(inMessage);
233 }
234
235 exchange.setMessage(outMsg, "out");
236 }
237 } else {
238 LOG.warn("EJB000606_Error_in_exchange_type", new Object[]{exchange.getClass().getName()});
239 }
240
241 LOG.debug("before - Channel.send");
242 RuntimeHelper.getDeliveryChannel().send(exchange);
243 LOG.debug("after - Channel.send");
244
245 } catch (Exception ex) {
246 LOG.error("EJB000607_Error_in_message_exchange", new Object[]{ex.getMessage()});
247
248 exchange.setError(ex);
249 }
250 }
251
252
253
254
255
256
257
258
259 protected boolean isInAndOut(MessageExchange exchange) {
260 return exchange instanceof InOut || exchange instanceof InOptionalOut;
261 }
262
263
264
265
266
267
268
269
270
271
272
273 private Source createJBIFaultSourceFromSOAPFault(MessageContext ctx) throws XMLStreamException {
274
275 LOG.debug("Creating JBI fault from xfire fault");
276 StringWriter strWriter = new StringWriter();
277 XMLOutputFactory witerFactory = XMLOutputFactory.newInstance();
278 XMLStreamWriter writer = witerFactory.createXMLStreamWriter(strWriter);
279 XFireFault fault = (XFireFault) ctx.getExchange().getFaultMessage().getBody();
280 LOG.debug("Xfire fault: " + fault);
281 if (fault.hasDetails())
282 {
283
284 org.jdom.Element detail = fault.getDetail();
285 StaxSerializer serializer = new StaxSerializer();
286 List details = detail.getContent();
287 for (int i = 0; i < details.size(); i++)
288 {
289 serializer.writeElement((org.jdom.Element) details.get(i), writer);
290 }
291 }
292 writer.flush();
293
294 StringSource stringSource = new StringSource(strWriter.toString());
295
296 LOG.debug("Fault message produced: " + stringSource);
297
298 return stringSource;
299 }
300
301
302
303 }
304
305