View Javadoc

1   /*******************************************************************************
2    *  Copyright (c) 2005, 2006, 2007 Imola Informatica.
3    *  All rights reserved. This program and the accompanying materials
4    *  are made available under the terms of the LGPL License v2.1
5    *  which accompanies this distribution, and is available at
6    *  http://www.gnu.org/licenses/lgpl.html
7    *******************************************************************************/
8   package it.imolinfo.jbi4ejb.processor;
9   
10  import it.imolinfo.jbi4ejb.Logger;
11  import it.imolinfo.jbi4ejb.LoggerFactory;
12  import it.imolinfo.jbi4ejb.exception.Jbi4EjbException;
13  import it.imolinfo.jbi4ejb.jbi.component.runtime.RuntimeHelper;
14  import it.imolinfo.jbi4ejb.jbi.endpoint.Jbi4EjbProviderEndpoint;
15  import it.imolinfo.jbi4ejb.jbi.xfire.EjbTransport;
16  import it.imolinfo.jbi4ejb.processor.transform.SourceTransformer;
17  import it.imolinfo.jbi4ejb.processor.transform.StringSource;
18  
19  import java.io.ByteArrayInputStream;
20  import java.io.ByteArrayOutputStream;
21  import java.io.InputStream;
22  import java.io.StringWriter;
23  import java.util.Iterator;
24  import java.util.List;
25  
26  import javax.activation.DataHandler;
27  import javax.jbi.messaging.ExchangeStatus;
28  import javax.jbi.messaging.Fault;
29  import javax.jbi.messaging.InOptionalOut;
30  import javax.jbi.messaging.InOut;
31  import javax.jbi.messaging.MessageExchange;
32  import javax.jbi.messaging.NormalizedMessage;
33  import javax.xml.stream.XMLOutputFactory;
34  import javax.xml.stream.XMLStreamException;
35  import javax.xml.stream.XMLStreamWriter;
36  import javax.xml.transform.Source;
37  import javax.xml.transform.stream.StreamSource;
38  
39  import org.codehaus.xfire.MessageContext;
40  import org.codehaus.xfire.XFire;
41  import org.codehaus.xfire.attachments.Attachment;
42  import org.codehaus.xfire.attachments.Attachments;
43  import org.codehaus.xfire.attachments.JavaMailAttachments;
44  import org.codehaus.xfire.attachments.SimpleAttachment;
45  import org.codehaus.xfire.exchange.InMessage;
46  import org.codehaus.xfire.fault.XFireFault;
47  import org.codehaus.xfire.service.OperationInfo;
48  import org.codehaus.xfire.service.Service;
49  import org.codehaus.xfire.transport.Channel;
50  import org.codehaus.xfire.transport.Transport;
51  import org.codehaus.xfire.util.jdom.StaxSerializer;
52  
53  
54  /**
55   * The Class ProviderExchangeProcessor.
56   * 
57   * @author <a href="mailto:mpiraccini@imolinfo.it">Marco Piraccini</a>
58   * 
59   */
60  public class ProviderExchangeProcessor implements ExchangeProcessor {
61  
62      /** The Logger. */
63      private static final Logger LOG
64      = LoggerFactory.getLogger(ProviderExchangeProcessor.class);   
65      
66      /** The endpoint. */ 
67      private Jbi4EjbProviderEndpoint endpoint;
68      
69      /** The SourceTransformer. */
70      private SourceTransformer transformer;
71      
72      /** message denormalizer. */
73      private Jbi4EjbDenormalizer messageDenormalizer;
74      
75      /** message normalizer. */
76      private Jbi4EjbNormalizer messageNormalizer;
77  
78      /**
79       * Instantiates a new provider exchange processor.
80       * 
81       * @param endpoint the endpoint managed
82       * 
83       * @throws Jbi4EjbException if some problem occurs in creation
84       */
85      public ProviderExchangeProcessor(Jbi4EjbProviderEndpoint endpoint) throws Jbi4EjbException  {
86          this.endpoint = endpoint;      
87          transformer = new SourceTransformer();
88          messageDenormalizer = new Jbi4EjbDenormalizer();
89          messageNormalizer = new Jbi4EjbNormalizer();               
90      }
91  
92      /**
93       * Process the message exchange.
94       * 
95       * @param exchange
96       *            the message exchange
97       */
98      public void process(MessageExchange exchange) {
99          
100         LOG.debug("Processing message exchange: " + exchange);
101 
102         // TODO implement      
103         if (exchange.getStatus() == ExchangeStatus.DONE) {
104             LOG.debug("ExchangeStatus.DONE");
105             return;
106         } else if (exchange.getStatus() == ExchangeStatus.ERROR) {
107             LOG.debug("ExchangeStatus.ERROR");
108             return;
109         }
110         
111         try {
112             XFire xfire = endpoint.getSuManager().getLifeCycle().getXfire();            
113             Service service = endpoint.getXfireService();
114 
115             // Configures xfire
116             // Transport t = xfire.getTransportManager().getTransport(EjbTransport.EJB_BINDING);            
117             Transport t = xfire.getTransportManager().getTransport(EjbTransport.EJB_BINDING);
118             
119             ByteArrayOutputStream xfireOut = new ByteArrayOutputStream();
120             Channel xfireChannel = t.createChannel();
121             MessageContext xfireCtx = new MessageContext();
122             xfireCtx.setXFire(xfire);
123             xfireCtx.setService(service);
124             xfireCtx.setProperty(Channel.BACKCHANNEL_URI, xfireOut);           
125             xfireCtx.setExchange(new org.codehaus.xfire.exchange.MessageExchange(xfireCtx));
126             InMessage xfireMsg = new InMessage();
127 
128             xfireCtx.getExchange().setInMessage(xfireMsg);
129             if (exchange.getOperation() != null) {
130 
131                 OperationInfo op = service.getServiceInfo().getOperation(
132                         exchange.getOperation().getLocalPart());
133                 if (op != null) {
134                     xfireCtx.getExchange().setOperation(op);
135                 } else {
136                     LOG.debug("OperationInfo is null.");
137                 }
138             }
139             xfireCtx.setCurrentMessage(xfireMsg);
140             
141             // Gets the JBI message
142             NormalizedMessage in = exchange.getMessage("in");           
143             if (LOG.isDebugEnabled()) {
144                 String inMessage = "In message, before unwrapping: " + transformer.contentToString(in);
145                 LOG.debug(inMessage);
146             }
147             Jbi4EjbMessage inMsg = messageDenormalizer.denormalize(in, endpoint, exchange.getOperation());     
148             if (LOG.isDebugEnabled()) {
149                 String inMessage = "In message, after unwrapping: " + transformer.toString(inMsg.getMessageSource());
150                 LOG.debug(inMessage);
151             }
152             
153             // Sets the Source as XMLStreamReader to the xfireMessage            
154             xfireMsg.setXMLStreamReader(transformer.toXMLStreamReader(inMsg.getMessageSource()));            
155 
156             if (in.getAttachmentNames() != null
157                     && in.getAttachmentNames().size() > 0) {
158 
159                 JavaMailAttachments attachments = new JavaMailAttachments();
160                 for (Iterator it = in.getAttachmentNames().iterator(); it.hasNext();) {
161                     String name = (String) it.next();
162                     DataHandler dh = in.getAttachment(name);
163                     attachments.addPart(new SimpleAttachment(name, dh));
164                 }
165                 xfireMsg.setAttachments(attachments);
166             }
167 
168             xfireChannel.receive(xfireCtx, xfireMsg);
169             xfireChannel.close();
170 
171             // Set response or DONE status
172             if (isInAndOut(exchange)) {
173                 if (xfireCtx.getExchange().hasFaultMessage()
174                         && xfireCtx.getExchange().getFaultMessage().getBody() != null) {
175 
176                     
177                     LOG.debug("Fault message");
178                     Fault fault = exchange.createFault();
179 
180                     // The XFireFault is a SOAP Fault wrapper
181                     XFireFault xFault = (XFireFault) xfireCtx.getExchange().getFaultMessage().getBody();                   
182                     
183                     // If the XFireFault has details, the Fault is declared.
184                     if (xFault.hasDetails()) {
185                         // Serialize correctly the fault to a SOAP message
186                         Source faultSource = createJBIFaultSourceFromSOAPFault(xfireCtx);
187                         
188                         // Gets the fault message name from the xFire fault detail
189                         String faultName = ((org.jdom.Element)xFault.getDetail().getContent().get(0)).getName();
190                         
191                         // Normalize the fault message
192                         messageNormalizer.normalizeFault(faultSource, fault, endpoint, exchange.getOperation(),  faultName, inMsg.isWrapped());                                               
193 
194                         exchange.setFault(fault);
195                     } else {
196                         // Sets the cause. Notice that (from the specs):
197                         // Used to specify the source of a failure status.
198                         // Invoking this method automatically adjusts the status of the
199                         // ME to ExchangeStatus.ERROR.
200                         // So, the cause can be setted only if no Fault is setted.
201 
202                         if (xFault.getCause() instanceof Exception) {
203                             exchange.setError((Exception) xFault.getCause());
204                         } else {
205                             exchange.setError(new Exception(xFault.getCause()));
206                         }
207                     }
208 
209                 } else {
210                     NormalizedMessage outMsg = exchange.createMessage();
211                     Attachments attachments = xfireCtx.getCurrentMessage().getAttachments();
212                     if (attachments != null) {
213                         for (Iterator it = attachments.getParts(); it.hasNext();) {
214                             Attachment att = (Attachment) it.next();
215                             outMsg.addAttachment(att.getId(), att.getDataHandler());
216                         }
217                     }                    
218                     
219                     // Gets the output source                    
220                     InputStream inputStream = new ByteArrayInputStream(xfireOut.toByteArray());
221                     Source outSource = transformer.toDOMSourceFromStream(new StreamSource(inputStream));
222                     
223                     if (LOG.isDebugEnabled()) {
224                         String inMessage = "Out message, before wrapping: " + transformer.toString(outSource);
225                         LOG.debug(inMessage);
226                     }                    
227                     // Normalize the source and sets to the output message.
228                     messageNormalizer.normalize(outSource, outMsg, endpoint, exchange.getOperation(), inMsg.isWrapped());
229                     
230                     if (LOG.isDebugEnabled()) {
231                         String inMessage = "Out message, after wrapping: " + transformer.contentToString(outMsg);
232                         LOG.debug(inMessage);
233                     }
234                     
235                     exchange.setMessage(outMsg, "out");
236                 }
237             } else {
238             	LOG.warn("EJB000606_Error_in_exchange_type", new Object[]{exchange.getClass().getName()});
239             }
240 
241             LOG.debug("before - Channel.send");
242             RuntimeHelper.getDeliveryChannel().send(exchange);
243             LOG.debug("after - Channel.send");
244 
245         } catch (Exception ex)  { 
246             LOG.error("EJB000607_Error_in_message_exchange", new Object[]{ex.getMessage()});
247             // No exception is thrown...
248             exchange.setError(ex);
249         }
250     }
251    
252     /**
253      * Checks if the message is InOut.
254      * 
255      * @param exchange the message exchange
256      * 
257      * @return true, if is in and out
258      */
259     protected boolean isInAndOut(MessageExchange exchange) {
260         return exchange instanceof InOut || exchange instanceof InOptionalOut;
261     }
262 
263     /**
264      * Create a JBI Fault <code>Source</code> using the XFireFault details
265      * (that are JDOM elements!!!).
266      * 
267      * @param ctx the message context
268      * 
269      * @return the JBI fault <code>Source</code>
270      * 
271      * @throws XMLStreamException if some problem occurs in reading the XMLStream
272      */
273     private Source createJBIFaultSourceFromSOAPFault(MessageContext ctx) throws XMLStreamException {
274 
275         LOG.debug("Creating JBI fault from xfire fault");
276         StringWriter strWriter = new StringWriter();
277         XMLOutputFactory witerFactory = XMLOutputFactory.newInstance();
278         XMLStreamWriter writer = witerFactory.createXMLStreamWriter(strWriter);
279         XFireFault fault = (XFireFault) ctx.getExchange().getFaultMessage().getBody();
280         LOG.debug("Xfire fault: " + fault);
281         if (fault.hasDetails())
282         {
283             // It's a JDom Element :-(
284             org.jdom.Element detail = fault.getDetail();
285             StaxSerializer serializer = new StaxSerializer();
286             List details = detail.getContent();
287             for (int i = 0; i < details.size(); i++)
288             {
289                 serializer.writeElement((org.jdom.Element) details.get(i), writer);
290             }
291         }
292         writer.flush();
293         
294         StringSource stringSource = new StringSource(strWriter.toString());
295 
296         LOG.debug("Fault message produced: " + stringSource);
297 
298         return stringSource;
299     }
300     
301     
302 
303 }
304 
305