001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor;
018    
019    
020    
021    import java.util.ArrayList;
022    import java.util.Collection;
023    import java.util.List;
024    import java.util.concurrent.ArrayBlockingQueue;
025    import java.util.concurrent.CountDownLatch;
026    import java.util.concurrent.RejectedExecutionException;
027    import java.util.concurrent.RejectedExecutionHandler;
028    import java.util.concurrent.ThreadPoolExecutor;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.atomic.AtomicBoolean;
031    
032    import org.apache.camel.AsyncCallback;
033    import org.apache.camel.Endpoint;
034    import org.apache.camel.Exchange;
035    import org.apache.camel.Processor;
036    import org.apache.camel.impl.ServiceSupport;
037    import org.apache.camel.processor.aggregate.AggregationStrategy;
038    import org.apache.camel.util.ExchangeHelper;
039    import org.apache.camel.util.ServiceHelper;
040    import static org.apache.camel.util.ObjectHelper.notNull;
041    
042    /**
043     * Implements the Multicast pattern to send a message exchange to a number of
044     * endpoints, each endpoint receiving a copy of the message exchange.
045     *
046     * @see Pipeline
047     * @version $Revision: 662940 $
048     */
049    public class MulticastProcessor extends ServiceSupport implements Processor {
050        static class ProcessorExchangePair {
051            private final Processor processor;
052            private final Exchange exchange;
053    
054            public ProcessorExchangePair(Processor processor, Exchange exchange) {
055                this.processor = processor;
056                this.exchange = exchange;
057            }
058    
059            public Processor getProcessor() {
060                return processor;
061            }
062    
063            public Exchange getExchange() {
064                return exchange;
065            }
066    
067    
068        }
069    
070        private Collection<Processor> processors;
071        private AggregationStrategy aggregationStrategy;
072        private boolean isParallelProcessing;
073        private ThreadPoolExecutor executor;
074        private final AtomicBoolean shutdown = new AtomicBoolean(true);
075    
076        public MulticastProcessor(Collection<Processor> processors) {
077            this(processors, null);
078        }
079    
080        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
081            this(processors, aggregationStrategy, false, null);
082        }
083    
084        public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) {
085            notNull(processors, "processors");
086            this.processors = processors;
087            this.aggregationStrategy = aggregationStrategy;
088            this.isParallelProcessing = parallelProcessing;
089            if (isParallelProcessing) {
090                if (executor != null) {
091                    this.executor = executor;
092                } else { // setup default Executor
093                    this.executor = new ThreadPoolExecutor(processors.size(), processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
094                }
095    
096            }
097    
098        }
099    
100        /**
101         * A helper method to convert a list of endpoints into a list of processors
102         */
103        public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> endpoints)
104            throws Exception {
105            Collection<Processor> answer = new ArrayList<Processor>();
106            for (Endpoint endpoint : endpoints) {
107                answer.add(endpoint.createProducer());
108            }
109            return answer;
110        }
111    
112        @Override
113        public String toString() {
114            return "Multicast" + getProcessors();
115        }
116    
117        class ProcessCall implements Runnable {
118            private final Exchange exchange;
119            private final AsyncCallback callback;
120            private final Processor processor;
121    
122            public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) {
123                this.exchange = exchange;
124                this.callback = callback;
125                this.processor = processor;
126            }
127    
128            public void run() {
129                if (shutdown.get()) {
130                    exchange.setException(new RejectedExecutionException());
131                    callback.done(false);
132                } else {
133                    try {
134                        processor.process(exchange);
135                    } catch (Exception ex) {
136                        exchange.setException(ex);
137                    }
138                    callback.done(false);
139                }
140            }
141        }
142    
143        public void process(Exchange exchange) throws Exception {
144            Exchange result = null;
145    
146            List<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
147    
148            // Parallel Processing the producer
149            if (isParallelProcessing) {
150                Exchange[] exchanges = new Exchange[pairs.size()];
151                final CountDownLatch completedExchanges = new CountDownLatch(pairs.size());
152                int i = 0;
153                for (ProcessorExchangePair pair : pairs) {
154                    Processor producer = pair.getProcessor();
155                    exchanges[i] = pair.getExchange();
156                    updateNewExchange(exchanges[i], i, pairs);
157                    ProcessCall call = new ProcessCall(exchanges[i], producer, new AsyncCallback() {
158                        public void done(boolean doneSynchronously) {
159                            completedExchanges.countDown();
160                        }
161    
162                    });
163                    executor.execute(call);
164                    i++;
165                }
166                completedExchanges.await();
167                if (aggregationStrategy != null) {
168                    for (Exchange resultExchange : exchanges) {
169                        if (result == null) {
170                            result = resultExchange;
171                        } else {
172                            result = aggregationStrategy.aggregate(result, resultExchange);
173                        }
174                    }
175                }
176    
177            } else {
178                // we call the producer one by one sequentially
179                int i = 0;
180                for (ProcessorExchangePair pair : pairs) {
181                    Processor producer = pair.getProcessor();
182                    Exchange subExchange = pair.getExchange();
183                    updateNewExchange(subExchange, i, pairs);
184    
185                    producer.process(subExchange);
186                    if (aggregationStrategy != null) {
187                        if (result == null) {
188                            result = subExchange;
189                        } else {
190                            result = aggregationStrategy.aggregate(result, subExchange);
191                        }
192                    }
193                    i++;
194                }
195            }
196            if (result != null) {
197                ExchangeHelper.copyResults(exchange, result);
198            }
199        }
200    
201        protected void updateNewExchange(Exchange exchange, int i, List<ProcessorExchangePair> allPairs) {
202            // No updates needed
203        }
204    
205        protected List<ProcessorExchangePair> createProcessorExchangePairs(
206            Exchange exchange) {
207            List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
208            Processor[] processorsArray = processors.toArray(new Processor[processors.size()]);
209            for (int i = 0; i < processorsArray.length; i++) {
210                result.add(new ProcessorExchangePair(processorsArray[i], exchange.copy()));
211            }
212            return result;
213        }
214    
215        protected void doStop() throws Exception {
216            shutdown.set(true);
217            if (executor != null) {
218                executor.shutdown();
219                executor.awaitTermination(0, TimeUnit.SECONDS);
220            }
221            ServiceHelper.stopServices(processors);
222        }
223    
224        protected void doStart() throws Exception {
225            shutdown.set(false);
226            if (executor != null) {
227                executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
228                    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
229                        ProcessCall call = (ProcessCall)runnable;
230                        call.exchange.setException(new RejectedExecutionException());
231                        call.callback.done(false);
232                    }
233                });
234            }
235            ServiceHelper.startServices(processors);
236        }
237    
238        /**
239         * Returns the producers to multicast to
240         */
241        public Collection<Processor> getProcessors() {
242            return processors;
243        }
244    
245        public AggregationStrategy getAggregationStrategy() {
246            return aggregationStrategy;
247        }
248    }