001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.ArrayList;
020    import java.util.Collection;
021    import java.util.List;
022    
023    import javax.xml.bind.annotation.XmlElement;
024    import javax.xml.bind.annotation.XmlElementRef;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.Exchange;
029    import org.apache.camel.Expression;
030    import org.apache.camel.Processor;
031    import org.apache.camel.Route;
032    import org.apache.camel.model.config.BatchResequencerConfig;
033    import org.apache.camel.model.config.StreamResequencerConfig;
034    import org.apache.camel.model.language.ExpressionType;
035    import org.apache.camel.processor.Resequencer;
036    import org.apache.camel.processor.StreamResequencer;
037    import org.apache.camel.processor.resequencer.ExpressionResultComparator;
038    import org.apache.camel.spi.RouteContext;
039    
040    /**
041     * Represents an XML <resequencer/> element
042     *
043     * @version $Revision: 705880 $
044     */
045    @XmlRootElement(name = "resequencer")
046    public class ResequencerType extends ProcessorType<ProcessorType> {
047        @XmlElementRef
048        private List<ExpressionType> expressions = new ArrayList<ExpressionType>();
049        @XmlElementRef
050        private List<ProcessorType<?>> outputs = new ArrayList<ProcessorType<?>>();
051        // Binding annotation at setter
052        private BatchResequencerConfig batchConfig;
053        // Binding annotation at setter
054        private StreamResequencerConfig streamConfig;
055        @XmlTransient
056        private List<Expression> expressionList;
057    
058        public ResequencerType() {
059            this(null);
060        }
061    
062        public ResequencerType(List<Expression> expressions) {
063            this.expressionList = expressions;
064            this.batch();
065        }
066    
067        @Override
068        public String getShortName() {
069            return "resequencer";
070        }
071    
072        /**
073         * Configures the stream-based resequencing algorithm using the default
074         * configuration.
075         *
076         * @return <code>this</code> instance.
077         */
078        public ResequencerType stream() {
079            return stream(StreamResequencerConfig.getDefault());
080        }
081    
082        /**
083         * Configures the batch-based resequencing algorithm using the default
084         * configuration.
085         *
086         * @return <code>this</code> instance.
087         */
088        public ResequencerType batch() {
089            return batch(BatchResequencerConfig.getDefault());
090        }
091    
092        /**
093         * Configures the stream-based resequencing algorithm using the given
094         * {@link StreamResequencerConfig}.
095         *
096         * @return <code>this</code> instance.
097         */
098        public ResequencerType stream(StreamResequencerConfig config) {
099            this.streamConfig = config;
100            this.batchConfig = null;
101            return this;
102        }
103    
104        /**
105         * Configures the batch-based resequencing algorithm using the given
106         * {@link BatchResequencerConfig}.
107         *
108         * @return <code>this</code> instance.
109         */
110        public ResequencerType batch(BatchResequencerConfig config) {
111            this.batchConfig = config;
112            this.streamConfig = null;
113            return this;
114        }
115    
116        public ResequencerType expression(ExpressionType expression) {
117            expressions.add(expression);
118            return this;
119        }
120    
121        @Override
122        public String toString() {
123            return "Resequencer[" + getExpressions() + " -> " + getOutputs() + "]";
124        }
125    
126        @Override
127        public String getLabel() {
128            return ExpressionType.getLabel(getExpressions());
129        }
130    
131        public List<ExpressionType> getExpressions() {
132            return expressions;
133        }
134    
135        public List<ProcessorType<?>> getOutputs() {
136            return outputs;
137        }
138    
139        public void setOutputs(List<ProcessorType<?>> outputs) {
140            this.outputs = outputs;
141        }
142    
143        public BatchResequencerConfig getBatchConfig() {
144            return batchConfig;
145        }
146    
147        public BatchResequencerConfig getBatchConfig(BatchResequencerConfig defaultConfig) {
148            return batchConfig;
149        }
150    
151        public StreamResequencerConfig getStreamConfig() {
152            return streamConfig;
153        }
154    
155        @XmlElement(name = "batch-config", required = false)
156        public void setBatchConfig(BatchResequencerConfig batchConfig) {
157            // TODO: find out how to have these two within an <xsd:choice>
158            batch(batchConfig);
159        }
160    
161        @XmlElement(name = "stream-config", required = false)
162        public void setStreamConfig(StreamResequencerConfig streamConfig) {
163            // TODO: find out how to have these two within an <xsd:choice>
164            stream(streamConfig);
165        }
166        
167        public ResequencerType timeout(long timeout) {
168            if (batchConfig != null) {
169                batchConfig.setBatchTimeout(timeout);
170            } else {
171                streamConfig.setTimeout(timeout);
172            }
173            return this;
174        }
175        
176        public ResequencerType size(int batchSize) {
177            if (batchConfig == null) {
178                throw new IllegalStateException("size() only supported for batch resequencer");
179            }
180            batchConfig.setBatchSize(batchSize);
181            return this;
182        }
183    
184        public ResequencerType capacity(int capacity) {
185            if (streamConfig == null) {
186                throw new IllegalStateException("capacity() only supported for stream resequencer");
187            }
188            streamConfig.setCapacity(capacity);
189            return this;
190            
191        }
192        
193        public ResequencerType comparator(ExpressionResultComparator<Exchange> comparator) {
194            if (streamConfig == null) {
195                throw new IllegalStateException("comparator() only supported for stream resequencer");
196            }
197            streamConfig.setComparator(comparator);
198            return this;
199            
200        }
201        
202        @Override
203        public Processor createProcessor(RouteContext routeContext) throws Exception {
204            if (batchConfig != null) {
205                return createBatchResequencer(routeContext, batchConfig);
206            } else {
207                // streamConfig should be non-null if batchConfig is null
208                return createStreamResequencer(routeContext, streamConfig);
209            }
210        }
211    
212        /**
213         * Creates a batch {@link Resequencer} instance applying the given
214         * <code>config</code>.
215         * 
216         * @param routeContext
217         *            route context.
218         * @param config
219         *            batch resequencer configuration.
220         * @return the configured batch resequencer.
221         * @throws Exception 
222         */
223        protected Resequencer createBatchResequencer(RouteContext routeContext,
224                BatchResequencerConfig config) throws Exception {
225            Processor processor = routeContext.createProcessor(this);
226            Resequencer resequencer = new Resequencer(routeContext.getEndpoint(),
227                    processor, resolveExpressionList(routeContext));
228            resequencer.setBatchSize(config.getBatchSize());
229            resequencer.setBatchTimeout(config.getBatchTimeout());
230            return resequencer;
231        }
232    
233        /**
234         * Creates a {@link StreamResequencer} instance applying the given
235         * <code>config</code>.
236         * 
237         * @param routeContext
238         *            route context.
239         * @param config
240         *            stream resequencer configuration.
241         * @return the configured stream resequencer.
242         * @throws Exception
243         */
244        protected StreamResequencer createStreamResequencer(RouteContext routeContext, 
245                StreamResequencerConfig config) throws Exception {
246            config.getComparator().setExpressions(resolveExpressionList(routeContext));
247            Processor processor = routeContext.createProcessor(this);
248            StreamResequencer resequencer = new StreamResequencer(routeContext.getEndpoint(),
249                    processor, config.getComparator());
250            resequencer.setTimeout(config.getTimeout());
251            resequencer.setCapacity(config.getCapacity());
252            return resequencer;
253            
254        }
255        
256        private Route<? extends Exchange> createBatchResequencerRoute(RouteContext routeContext) throws Exception {
257            final Resequencer resequencer = createBatchResequencer(routeContext, batchConfig);
258            return new Route(routeContext.getEndpoint(), resequencer) {
259                @Override
260                public String toString() {
261                    return "BatchResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
262                }
263            };
264        }
265        
266        private Route<? extends Exchange> createStreamResequencerRoute(RouteContext routeContext) throws Exception {
267            final StreamResequencer resequencer = createStreamResequencer(routeContext, streamConfig);
268            return new Route(routeContext.getEndpoint(), resequencer) {
269                @Override
270                public String toString() {
271                    return "StreamResequencerRoute[" + getEndpoint() + " -> " + resequencer.getProcessor() + "]";
272                }
273            };
274        }
275        
276        private List<Expression> resolveExpressionList(RouteContext routeContext) {
277            if (expressionList == null) {
278                expressionList = new ArrayList<Expression>();
279                for (ExpressionType expression : expressions) {
280                    expressionList.add(expression.createExpression(routeContext));
281                }
282            }
283            if (expressionList.isEmpty()) {
284                throw new IllegalArgumentException("No expressions configured for: " + this);
285            }
286            return expressionList;
287        }
288    }