001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    import java.util.Collection;
020    import java.util.Iterator;
021    
022    import org.apache.camel.Endpoint;
023    import org.apache.camel.Exchange;
024    import org.apache.camel.PollingConsumer;
025    import org.apache.camel.Processor;
026    import org.apache.camel.impl.LoggingExceptionHandler;
027    import org.apache.camel.impl.ServiceSupport;
028    import org.apache.camel.spi.ExceptionHandler;
029    import org.apache.camel.util.ServiceHelper;
030    import org.apache.commons.logging.Log;
031    import org.apache.commons.logging.LogFactory;
032    
033    /**
034     * A base class for any kind of {@link Processor} which implements some kind of
035     * batch processing.
036     * 
037     * @version $Revision: 703141 $
038     */
039    public class BatchProcessor extends ServiceSupport implements Runnable, Processor {
040        public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
041        public static final int DEFAULT_BATCH_SIZE = 100;
042    
043        private static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
044        private Endpoint endpoint;
045        private Processor processor;
046        private Collection<Exchange> collection;
047        private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
048        private int batchSize = DEFAULT_BATCH_SIZE;
049        private int outBatchSize;
050        private PollingConsumer consumer;
051        private ExceptionHandler exceptionHandler;
052    
053        public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange> collection) {
054            this.endpoint = endpoint;
055            this.processor = processor;
056            this.collection = collection;
057        }
058    
059        @Override
060        public String toString() {
061            return "BatchProcessor[to: " + processor + "]";
062        }
063    
064        public void run() {
065            LOG.debug("Starting thread for " + this);
066            while (isRunAllowed()) {
067                try {
068                    processBatch();
069                } catch (Exception e) {
070                    getExceptionHandler().handleException(e);
071                }
072            }
073            collection.clear();
074        }
075    
076        // Properties
077        // -------------------------------------------------------------------------
078        public ExceptionHandler getExceptionHandler() {
079            if (exceptionHandler == null) {
080                exceptionHandler = new LoggingExceptionHandler(getClass());
081            }
082            return exceptionHandler;
083        }
084    
085        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
086            this.exceptionHandler = exceptionHandler;
087        }
088    
089        public int getBatchSize() {
090            return batchSize;
091        }
092    
093        /**
094         * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor
095         * will process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
096         *
097         * @param batchSize the size
098         */
099        public void setBatchSize(int batchSize) {
100            this.batchSize = batchSize;
101        }
102    
103        public int getOutBatchSize() {
104            return outBatchSize;
105        }
106    
107        /**
108         * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then
109         * the completion is triggered. Can for instance be used to ensure that this batch is completed when
110         * a certain number of exchanges has been collected. By default this feature is <b>not</b> enabled.
111         *
112         * @param outBatchSize the size
113         */
114        public void setOutBatchSize(int outBatchSize) {
115            this.outBatchSize = outBatchSize;
116        }
117    
118        public long getBatchTimeout() {
119            return batchTimeout;
120        }
121    
122        public void setBatchTimeout(long batchTimeout) {
123            this.batchTimeout = batchTimeout;
124        }
125    
126        public Endpoint getEndpoint() {
127            return endpoint;
128        }
129    
130        public Processor getProcessor() {
131            return processor;
132        }
133    
134        /**
135         * A transactional method to process a batch of messages up to a timeout
136         * period or number of messages reached.
137         */
138        protected synchronized void processBatch() throws Exception {
139            long start = System.currentTimeMillis();
140            long end = start + batchTimeout;
141            for (int i = 0; !isBatchCompleted(i); i++) {
142                long timeout = end - System.currentTimeMillis();
143                if (timeout < 0L) {                
144                    if (LOG.isTraceEnabled()) {
145                        LOG.trace("batch timeout expired at batch index: " + i);
146                    }
147                    break;
148                }
149                Exchange exchange = consumer.receive(timeout);
150                if (exchange == null) {
151                    if (LOG.isTraceEnabled()) {
152                        LOG.trace("receive with timeout: " + timeout + " expired at batch index: " + i);
153                    }
154                    break;
155                }
156                collection.add(exchange);
157            }
158    
159            // we should NOT log the collection directly as it will invoke a toString() on collection
160            // and it will call collection.iterator() where end-users might do stuff that would break
161            // calling the iterator a 2nd time as below
162    
163            // lets send the batch
164            Iterator<Exchange> iter = collection.iterator();
165            while (iter.hasNext()) {
166                Exchange exchange = iter.next();
167                iter.remove();
168                processExchange(exchange);
169            }
170        }
171    
172        /**
173         * A strategy method to decide if the batch is completed the resulting exchanges should be sent
174         */
175        protected boolean isBatchCompleted(int index) {
176            // out batch size is optional and we should only check it if its enabled (= >0)
177            if (outBatchSize > 0 && collection.size() >= outBatchSize) {
178                return true;
179            }
180            // fallback to regular batch size check
181            return index >= batchSize;
182        }
183    
184        /**
185         * Strategy Method to process an exchange in the batch. This method allows
186         * derived classes to perform custom processing before or after an
187         * individual exchange is processed
188         */
189        protected void processExchange(Exchange exchange) throws Exception {
190            processor.process(exchange);
191        }
192    
193        protected void doStart() throws Exception {
194            consumer = endpoint.createPollingConsumer();
195    
196            ServiceHelper.startServices(processor, consumer);
197    
198            Thread thread = new Thread(this, this + " Polling Thread");
199            thread.start();
200        }
201    
202        protected void doStop() throws Exception {
203            ServiceHelper.stopServices(consumer, processor);
204            collection.clear();
205        }
206    
207        protected Collection<Exchange> getCollection() {
208            return collection;
209        }
210    
211        public void process(Exchange exchange) throws Exception {
212            // empty since exchanges come from endpoint's polling consumer
213        }
214    }