001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.model;
018    
019    import java.util.Collection;
020    
021    import javax.xml.bind.annotation.XmlAccessType;
022    import javax.xml.bind.annotation.XmlAccessorType;
023    import javax.xml.bind.annotation.XmlAttribute;
024    import javax.xml.bind.annotation.XmlElement;
025    import javax.xml.bind.annotation.XmlRootElement;
026    import javax.xml.bind.annotation.XmlTransient;
027    
028    import org.apache.camel.Endpoint;
029    import org.apache.camel.Exchange;
030    import org.apache.camel.Expression;
031    import org.apache.camel.Predicate;
032    import org.apache.camel.Processor;
033    import org.apache.camel.Route;
034    import org.apache.camel.builder.ExpressionClause;
035    import org.apache.camel.model.language.ExpressionType;
036    import org.apache.camel.processor.Aggregator;
037    import org.apache.camel.processor.aggregate.AggregationCollection;
038    import org.apache.camel.processor.aggregate.AggregationStrategy;
039    import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
040    import org.apache.camel.spi.RouteContext;
041    
042    /**
043     * Represents an XML <aggregator/> element
044     *
045     * @version $Revision: 707063 $
046     */
047    @XmlRootElement(name = "aggregator")
048    @XmlAccessorType(XmlAccessType.FIELD)
049    public class AggregatorType extends ExpressionNode {
050        @XmlTransient
051        private AggregationStrategy aggregationStrategy;
052        @XmlTransient
053        private AggregationCollection aggregationCollection;
054        @XmlAttribute(required = false)
055        private Integer batchSize;
056        @XmlAttribute(required = false)
057        private Integer outBatchSize;
058        @XmlAttribute(required = false)
059        private Long batchTimeout;
060        @XmlAttribute(required = false)
061        private String strategyRef;
062        @XmlElement(name = "completedPredicate", required = false)
063        private ExpressionSubElementType completedPredicate;
064    
065        public AggregatorType() {
066        }
067    
068        public AggregatorType(Expression correlationExpression) {
069            super(correlationExpression);
070        }
071    
072        public AggregatorType(ExpressionType correlationExpression) {
073            super(correlationExpression);
074        }
075    
076        public AggregatorType(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
077            super(correlationExpression);
078            this.aggregationStrategy = aggregationStrategy;
079        }
080    
081        @Override
082        public String toString() {
083            return "Aggregator[" + getExpression() + " -> " + getOutputs() + "]";
084        }
085    
086        @Override
087        public String getShortName() {
088            return "aggregator";
089        }
090    
091        @SuppressWarnings("unchecked")
092        @Override
093        public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
094            final Aggregator aggregator = createAggregator(routeContext);
095            doAddRoute(routeContext, routes, aggregator);
096        }
097        
098        private void doAddRoute(RouteContext routeContext, Collection<Route> routes, final Aggregator aggregator)
099            throws Exception {
100            Route route = new Route<Exchange>(aggregator.getEndpoint(), aggregator) {
101                @Override
102                public String toString() {
103                    return "AggregatorRoute[" + getEndpoint() + " -> " + aggregator.getProcessor() + "]";
104                }
105            };
106    
107            routes.add(route);
108        }
109     
110        @Override
111        public Processor createProcessor(RouteContext routeContext) throws Exception {
112            final Aggregator aggregator = createAggregator(routeContext);
113            
114            doAddRoute(routeContext, routeContext.getCamelContext().getRoutes(), aggregator);
115            routeContext.setIsRouteAdded(true);
116            return aggregator;
117        }
118    
119        protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
120            Endpoint from = routeContext.getEndpoint();
121            final Processor processor = routeContext.createProcessor(this);
122    
123            final Aggregator aggregator;
124            if (aggregationCollection != null) {
125                // create the aggregator using the collection
126                // pre configure the collection if its expression and strategy is not set, then
127                // use the ones that is pre configured with this type
128                if (aggregationCollection.getCorrelationExpression() == null) {
129                    aggregationCollection.setCorrelationExpression(getExpression());
130                }
131                if (aggregationCollection.getAggregationStrategy() == null) {
132                    AggregationStrategy strategy = createAggregationStrategy(routeContext);
133                    aggregationCollection.setAggregationStrategy(strategy);
134                }
135                aggregator = new Aggregator(from, processor, aggregationCollection);
136            } else {
137                // create the aggregator using a default collection
138                AggregationStrategy strategy = createAggregationStrategy(routeContext);
139    
140                Expression aggregateExpression = getExpression().createExpression(routeContext);
141    
142                Predicate predicate = null;
143                if (getCompletedPredicate() != null) {
144                    predicate = getCompletedPredicate().createPredicate(routeContext);
145                }
146                if (predicate != null) {
147                    aggregator = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
148                } else {
149                    aggregator = new Aggregator(from, processor, aggregateExpression, strategy);
150                }
151            }
152            
153            if (batchSize != null) {
154                aggregator.setBatchSize(batchSize);
155            }
156            
157            if (batchTimeout != null) {
158                aggregator.setBatchTimeout(batchTimeout);
159            }
160    
161            if (outBatchSize != null) {
162                aggregator.setOutBatchSize(outBatchSize);
163            }
164            
165            return aggregator;
166        }
167    
168        private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
169            AggregationStrategy strategy = getAggregationStrategy();
170            if (strategy == null && strategyRef != null) {
171                strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
172            }
173            if (strategy == null) {
174                // fallback to use latest
175                strategy = new UseLatestAggregationStrategy();
176            }
177            return strategy;
178        }
179    
180        public AggregationCollection getAggregationCollection() {
181            return aggregationCollection;
182        }
183    
184        public void setAggregationCollection(AggregationCollection aggregationCollection) {
185            this.aggregationCollection = aggregationCollection;
186        }
187    
188        public AggregationStrategy getAggregationStrategy() {
189            return aggregationStrategy;
190        }
191    
192        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
193            this.aggregationStrategy = aggregationStrategy;
194        }
195    
196        public Integer getBatchSize() {
197            return batchSize;
198        }
199    
200        public void setBatchSize(Integer batchSize) {
201            this.batchSize = batchSize;
202        }
203    
204        public Integer getOutBatchSize() {
205            return outBatchSize;
206        }
207    
208        public void setOutBatchSize(Integer outBatchSize) {
209            this.outBatchSize = outBatchSize;
210        }
211    
212        public Long getBatchTimeout() {
213            return batchTimeout;
214        }
215    
216        public void setBatchTimeout(Long batchTimeout) {
217            this.batchTimeout = batchTimeout;
218        }
219    
220        public String getStrategyRef() {
221            return strategyRef;
222        }
223    
224        public void setStrategyRef(String strategyRef) {
225            this.strategyRef = strategyRef;
226        }
227    
228        public void setCompletedPredicate(ExpressionSubElementType completedPredicate) {
229            this.completedPredicate = completedPredicate;
230        }
231    
232        public ExpressionSubElementType getCompletedPredicate() {
233            return completedPredicate;
234        }
235    
236        // Fluent API
237        //-------------------------------------------------------------------------
238        public AggregatorType batchSize(int batchSize) {
239            setBatchSize(batchSize);
240            return this;
241        }
242    
243        public AggregatorType outBatchSize(int batchSize) {
244            setOutBatchSize(batchSize);
245            return this;
246        }
247    
248        public AggregatorType batchTimeout(long batchTimeout) {
249            setBatchTimeout(batchTimeout);
250            return this;
251        }
252    
253        public AggregatorType aggregationCollection(AggregationCollection aggregationCollection) {
254            setAggregationCollection(aggregationCollection);
255            return this;
256        }
257    
258        public AggregatorType aggregationStrategy(AggregationStrategy aggregationStrategy) {
259            setAggregationStrategy(aggregationStrategy);
260            return this;
261        }
262    
263        public AggregatorType strategyRef(String strategyRef) {
264            setStrategyRef(strategyRef);
265            return this;
266        }
267    
268        /**
269         * Sets the predicate used to determine if the aggregation is completed
270         *
271         * @return the clause used to create the predicate
272         */
273        public ExpressionClause<AggregatorType> completedPredicate() {
274            checkNoCompletedPredicate();
275            ExpressionClause<AggregatorType> clause = new ExpressionClause<AggregatorType>(this);
276            setCompletedPredicate(new ExpressionSubElementType((Expression)clause));
277            return clause;
278        }
279    
280        /**
281         * Sets the predicate used to determine if the aggregation is completed
282         */
283        public AggregatorType completedPredicate(Predicate predicate) {
284            checkNoCompletedPredicate();
285            setCompletedPredicate(new ExpressionSubElementType(predicate));
286            return this;
287        }
288    
289        protected void checkNoCompletedPredicate() {
290            if (getCompletedPredicate() != null) {
291                throw new IllegalArgumentException("There is already a completedPredicate defined for this aggregator: " + this);
292            }
293        }
294    }