001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.processor; 018 019 020 021 import java.util.ArrayList; 022 import java.util.Collection; 023 import java.util.List; 024 import java.util.concurrent.ArrayBlockingQueue; 025 import java.util.concurrent.CountDownLatch; 026 import java.util.concurrent.RejectedExecutionException; 027 import java.util.concurrent.RejectedExecutionHandler; 028 import java.util.concurrent.ThreadPoolExecutor; 029 import java.util.concurrent.TimeUnit; 030 import java.util.concurrent.atomic.AtomicBoolean; 031 032 import org.apache.camel.AsyncCallback; 033 import org.apache.camel.Endpoint; 034 import org.apache.camel.Exchange; 035 import org.apache.camel.Processor; 036 import org.apache.camel.impl.ServiceSupport; 037 import org.apache.camel.processor.aggregate.AggregationStrategy; 038 import org.apache.camel.util.ExchangeHelper; 039 import org.apache.camel.util.ServiceHelper; 040 import static org.apache.camel.util.ObjectHelper.notNull; 041 042 /** 043 * Implements the Multicast pattern to send a message exchange to a number of 044 * endpoints, each endpoint receiving a copy of the message exchange. 045 * 046 * @see Pipeline 047 * @version $Revision: 662940 $ 048 */ 049 public class MulticastProcessor extends ServiceSupport implements Processor { 050 static class ProcessorExchangePair { 051 private final Processor processor; 052 private final Exchange exchange; 053 054 public ProcessorExchangePair(Processor processor, Exchange exchange) { 055 this.processor = processor; 056 this.exchange = exchange; 057 } 058 059 public Processor getProcessor() { 060 return processor; 061 } 062 063 public Exchange getExchange() { 064 return exchange; 065 } 066 067 068 } 069 070 private Collection<Processor> processors; 071 private AggregationStrategy aggregationStrategy; 072 private boolean isParallelProcessing; 073 private ThreadPoolExecutor executor; 074 private final AtomicBoolean shutdown = new AtomicBoolean(true); 075 076 public MulticastProcessor(Collection<Processor> processors) { 077 this(processors, null); 078 } 079 080 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) { 081 this(processors, aggregationStrategy, false, null); 082 } 083 084 public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ThreadPoolExecutor executor) { 085 notNull(processors, "processors"); 086 this.processors = processors; 087 this.aggregationStrategy = aggregationStrategy; 088 this.isParallelProcessing = parallelProcessing; 089 if (isParallelProcessing) { 090 if (executor != null) { 091 this.executor = executor; 092 } else { // setup default Executor 093 this.executor = new ThreadPoolExecutor(processors.size(), processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size())); 094 } 095 096 } 097 098 } 099 100 /** 101 * A helper method to convert a list of endpoints into a list of processors 102 */ 103 public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> endpoints) 104 throws Exception { 105 Collection<Processor> answer = new ArrayList<Processor>(); 106 for (Endpoint endpoint : endpoints) { 107 answer.add(endpoint.createProducer()); 108 } 109 return answer; 110 } 111 112 @Override 113 public String toString() { 114 return "Multicast" + getProcessors(); 115 } 116 117 class ProcessCall implements Runnable { 118 private final Exchange exchange; 119 private final AsyncCallback callback; 120 private final Processor processor; 121 122 public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) { 123 this.exchange = exchange; 124 this.callback = callback; 125 this.processor = processor; 126 } 127 128 public void run() { 129 if (shutdown.get()) { 130 exchange.setException(new RejectedExecutionException()); 131 callback.done(false); 132 } else { 133 try { 134 processor.process(exchange); 135 } catch (Exception ex) { 136 exchange.setException(ex); 137 } 138 callback.done(false); 139 } 140 } 141 } 142 143 public void process(Exchange exchange) throws Exception { 144 Exchange result = null; 145 146 List<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange); 147 148 // Parallel Processing the producer 149 if (isParallelProcessing) { 150 Exchange[] exchanges = new Exchange[pairs.size()]; 151 final CountDownLatch completedExchanges = new CountDownLatch(pairs.size()); 152 int i = 0; 153 for (ProcessorExchangePair pair : pairs) { 154 Processor producer = pair.getProcessor(); 155 exchanges[i] = pair.getExchange(); 156 updateNewExchange(exchanges[i], i, pairs); 157 ProcessCall call = new ProcessCall(exchanges[i], producer, new AsyncCallback() { 158 public void done(boolean doneSynchronously) { 159 completedExchanges.countDown(); 160 } 161 162 }); 163 executor.execute(call); 164 i++; 165 } 166 completedExchanges.await(); 167 if (aggregationStrategy != null) { 168 for (Exchange resultExchange : exchanges) { 169 if (result == null) { 170 result = resultExchange; 171 } else { 172 result = aggregationStrategy.aggregate(result, resultExchange); 173 } 174 } 175 } 176 177 } else { 178 // we call the producer one by one sequentially 179 int i = 0; 180 for (ProcessorExchangePair pair : pairs) { 181 Processor producer = pair.getProcessor(); 182 Exchange subExchange = pair.getExchange(); 183 updateNewExchange(subExchange, i, pairs); 184 185 producer.process(subExchange); 186 if (aggregationStrategy != null) { 187 if (result == null) { 188 result = subExchange; 189 } else { 190 result = aggregationStrategy.aggregate(result, subExchange); 191 } 192 } 193 i++; 194 } 195 } 196 if (result != null) { 197 ExchangeHelper.copyResults(exchange, result); 198 } 199 } 200 201 protected void updateNewExchange(Exchange exchange, int i, List<ProcessorExchangePair> allPairs) { 202 // No updates needed 203 } 204 205 protected List<ProcessorExchangePair> createProcessorExchangePairs( 206 Exchange exchange) { 207 List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); 208 Processor[] processorsArray = processors.toArray(new Processor[processors.size()]); 209 for (int i = 0; i < processorsArray.length; i++) { 210 result.add(new ProcessorExchangePair(processorsArray[i], exchange.copy())); 211 } 212 return result; 213 } 214 215 protected void doStop() throws Exception { 216 shutdown.set(true); 217 if (executor != null) { 218 executor.shutdown(); 219 executor.awaitTermination(0, TimeUnit.SECONDS); 220 } 221 ServiceHelper.stopServices(processors); 222 } 223 224 protected void doStart() throws Exception { 225 shutdown.set(false); 226 if (executor != null) { 227 executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { 228 public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { 229 ProcessCall call = (ProcessCall)runnable; 230 call.exchange.setException(new RejectedExecutionException()); 231 call.callback.done(false); 232 } 233 }); 234 } 235 ServiceHelper.startServices(processors); 236 } 237 238 /** 239 * Returns the producers to multicast to 240 */ 241 public Collection<Processor> getProcessors() { 242 return processors; 243 } 244 245 public AggregationStrategy getAggregationStrategy() { 246 return aggregationStrategy; 247 } 248 }