001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017 package org.apache.camel.model; 018 019 import java.util.ArrayList; 020 import java.util.Arrays; 021 import java.util.Collection; 022 import java.util.Collections; 023 import java.util.HashSet; 024 import java.util.LinkedList; 025 import java.util.List; 026 import java.util.Set; 027 import java.util.concurrent.ThreadPoolExecutor; 028 029 import javax.xml.bind.annotation.XmlAccessType; 030 import javax.xml.bind.annotation.XmlAccessorType; 031 import javax.xml.bind.annotation.XmlAttribute; 032 import javax.xml.bind.annotation.XmlTransient; 033 034 import org.apache.camel.CamelContext; 035 import org.apache.camel.CamelException; 036 import org.apache.camel.Endpoint; 037 import org.apache.camel.Exchange; 038 import org.apache.camel.Expression; 039 import org.apache.camel.Predicate; 040 import org.apache.camel.Processor; 041 import org.apache.camel.Route; 042 import org.apache.camel.builder.DataFormatClause; 043 import org.apache.camel.builder.DeadLetterChannelBuilder; 044 import org.apache.camel.builder.ErrorHandlerBuilder; 045 import org.apache.camel.builder.ErrorHandlerBuilderRef; 046 import org.apache.camel.builder.ExpressionClause; 047 import org.apache.camel.builder.NoErrorHandlerBuilder; 048 import org.apache.camel.builder.ProcessorBuilder; 049 import org.apache.camel.impl.DefaultCamelContext; 050 import org.apache.camel.model.dataformat.DataFormatType; 051 import org.apache.camel.model.language.ConstantExpression; 052 import org.apache.camel.model.language.ExpressionType; 053 import org.apache.camel.model.language.LanguageExpression; 054 import org.apache.camel.processor.ConvertBodyProcessor; 055 import org.apache.camel.processor.DelegateProcessor; 056 import org.apache.camel.processor.Pipeline; 057 import org.apache.camel.processor.aggregate.AggregationCollection; 058 import org.apache.camel.processor.aggregate.AggregationStrategy; 059 import org.apache.camel.processor.idempotent.MessageIdRepository; 060 import org.apache.camel.spi.DataFormat; 061 import org.apache.camel.spi.ErrorHandlerWrappingStrategy; 062 import org.apache.camel.spi.InterceptStrategy; 063 import org.apache.camel.spi.Policy; 064 import org.apache.camel.spi.RouteContext; 065 import org.apache.commons.logging.Log; 066 import org.apache.commons.logging.LogFactory; 067 068 /** 069 * Base class for processor types that most XML types extend. 070 * 071 * @version $Revision: 708107 $ 072 */ 073 @XmlAccessorType(XmlAccessType.PROPERTY) 074 public abstract class ProcessorType<Type extends ProcessorType> extends OptionalIdentifiedType<Type> implements Block { 075 public static final String DEFAULT_TRACE_CATEGORY = "org.apache.camel.TRACE"; 076 private static final transient Log LOG = LogFactory.getLog(ProcessorType.class); 077 private ErrorHandlerBuilder errorHandlerBuilder; 078 private Boolean inheritErrorHandlerFlag; 079 private NodeFactory nodeFactory; 080 private LinkedList<Block> blocks = new LinkedList<Block>(); 081 private ProcessorType<? extends ProcessorType> parent; 082 private List<InterceptorType> interceptors = new ArrayList<InterceptorType>(); 083 private String errorHandlerRef; 084 085 // else to use an optional attribute in JAXB2 086 public abstract List<ProcessorType<?>> getOutputs(); 087 088 089 public Processor createProcessor(RouteContext routeContext) throws Exception { 090 throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName()); 091 } 092 093 public Processor createOutputsProcessor(RouteContext routeContext) throws Exception { 094 Collection<ProcessorType<?>> outputs = getOutputs(); 095 return createOutputsProcessor(routeContext, outputs); 096 } 097 098 public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception { 099 Processor processor = makeProcessor(routeContext); 100 if (!routeContext.isRouteAdded()) { 101 routeContext.addEventDrivenProcessor(processor); 102 } 103 } 104 105 /** 106 * Wraps the child processor in whatever necessary interceptors and error 107 * handlers 108 */ 109 public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception { 110 processor = wrapProcessorInInterceptors(routeContext, processor); 111 return wrapInErrorHandler(routeContext, processor); 112 } 113 114 // Fluent API 115 // ------------------------------------------------------------------------- 116 117 /** 118 * Sends the exchange to the given endpoint URI 119 */ 120 public Type to(String uri) { 121 addOutput(new ToType(uri)); 122 return (Type) this; 123 } 124 125 /** 126 * Sends the exchange to the given endpoint 127 */ 128 public Type to(Endpoint endpoint) { 129 addOutput(new ToType(endpoint)); 130 return (Type) this; 131 } 132 133 /** 134 * Sends the exchange to a list of endpoints using the 135 * {@link org.apache.camel.processor.MulticastProcessor} pattern 136 */ 137 public Type to(String... uris) { 138 for (String uri : uris) { 139 addOutput(new ToType(uri)); 140 } 141 return (Type) this; 142 } 143 144 /** 145 * Sends the exchange to a list of endpoints using the 146 * {@link org.apache.camel.processor.MulticastProcessor} pattern 147 */ 148 public Type to(Endpoint... endpoints) { 149 for (Endpoint endpoint : endpoints) { 150 addOutput(new ToType(endpoint)); 151 } 152 return (Type) this; 153 } 154 155 /** 156 * Sends the exchange to a list of endpoint using the 157 * {@link org.apache.camel.processor.MulticastProcessor} pattern 158 */ 159 public Type to(Collection<Endpoint> endpoints) { 160 for (Endpoint endpoint : endpoints) { 161 addOutput(new ToType(endpoint)); 162 } 163 return (Type) this; 164 } 165 166 /** 167 * Multicasts messages to all its child outputs; so that each processor and 168 * destination gets a copy of the original message to avoid the processors 169 * interfering with each other. 170 */ 171 public MulticastType multicast() { 172 MulticastType answer = new MulticastType(); 173 addOutput(answer); 174 return answer; 175 } 176 177 /** 178 * Multicasts messages to all its child outputs; so that each processor and 179 * destination gets a copy of the original message to avoid the processors 180 * interfering with each other. 181 * @param aggregationStrategy the strategy used to aggregate responses for 182 * every part 183 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 184 * @return the multicast type 185 */ 186 public MulticastType multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 187 MulticastType answer = new MulticastType(); 188 addOutput(answer); 189 answer.setAggregationStrategy(aggregationStrategy); 190 answer.setParallelProcessing(parallelProcessing); 191 return answer; 192 } 193 194 /** 195 * Multicasts messages to all its child outputs; so that each processor and 196 * destination gets a copy of the original message to avoid the processors 197 * interfering with each other. 198 * @param aggregationStrategy the strategy used to aggregate responses for 199 * every part 200 * @return the multicast type 201 */ 202 public MulticastType multicast(AggregationStrategy aggregationStrategy) { 203 MulticastType answer = new MulticastType(); 204 addOutput(answer); 205 answer.setAggregationStrategy(aggregationStrategy); 206 return answer; 207 } 208 209 /** 210 * Creates a {@link Pipeline} of the list of endpoints so that the message 211 * will get processed by each endpoint in turn and for request/response the 212 * output of one endpoint will be the input of the next endpoint 213 */ 214 public Type pipeline(String... uris) { 215 // TODO pipeline v mulicast 216 return to(uris); 217 } 218 219 /** 220 * Creates a {@link Pipeline} of the list of endpoints so that the message 221 * will get processed by each endpoint in turn and for request/response the 222 * output of one endpoint will be the input of the next endpoint 223 */ 224 public Type pipeline(Endpoint... endpoints) { 225 // TODO pipeline v mulicast 226 return to(endpoints); 227 } 228 229 /** 230 * Creates a {@link Pipeline} of the list of endpoints so that the message 231 * will get processed by each endpoint in turn and for request/response the 232 * output of one endpoint will be the input of the next endpoint 233 */ 234 public Type pipeline(Collection<Endpoint> endpoints) { 235 // TODO pipeline v mulicast 236 return to(endpoints); 237 } 238 239 /** 240 * Ends the current block 241 */ 242 public ProcessorType<? extends ProcessorType> end() { 243 if (blocks.isEmpty()) { 244 if (parent == null) { 245 throw new IllegalArgumentException("Root node with no active block"); 246 } 247 return parent; 248 } 249 popBlock(); 250 return this; 251 } 252 253 /** 254 * Causes subsequent processors to be called asynchronously 255 * 256 * @param coreSize the number of threads that will be used to process 257 * messages in subsequent processors. 258 * @return a ThreadType builder that can be used to further configure the 259 * the thread pool. 260 */ 261 public ThreadType thread(int coreSize) { 262 ThreadType answer = new ThreadType(coreSize); 263 addOutput(answer); 264 return answer; 265 } 266 267 /** 268 * Causes subsequent processors to be called asynchronously 269 * 270 * @param executor the executor that will be used to process 271 * messages in subsequent processors. 272 * @return a ThreadType builder that can be used to further configure the 273 * the thread pool. 274 */ 275 public ProcessorType<Type> thread(ThreadPoolExecutor executor) { 276 ThreadType answer = new ThreadType(executor); 277 addOutput(answer); 278 return this; 279 } 280 281 /** 282 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer} 283 * to avoid duplicate messages 284 */ 285 public IdempotentConsumerType idempotentConsumer(Expression messageIdExpression, 286 MessageIdRepository messageIdRepository) { 287 IdempotentConsumerType answer = new IdempotentConsumerType(messageIdExpression, messageIdRepository); 288 addOutput(answer); 289 return answer; 290 } 291 292 /** 293 * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer} 294 * to avoid duplicate messages 295 * 296 * @return the builder used to create the expression 297 */ 298 public ExpressionClause<IdempotentConsumerType> idempotentConsumer(MessageIdRepository messageIdRepository) { 299 IdempotentConsumerType answer = new IdempotentConsumerType(); 300 answer.setMessageIdRepository(messageIdRepository); 301 addOutput(answer); 302 return ExpressionClause.createAndSetExpression(answer); 303 } 304 305 /** 306 * Creates a predicate expression which only if it is true then the 307 * exchange is forwarded to the destination 308 * 309 * @return the clause used to create the filter expression 310 */ 311 public ExpressionClause<FilterType> filter() { 312 FilterType filter = new FilterType(); 313 addOutput(filter); 314 return ExpressionClause.createAndSetExpression(filter); 315 } 316 317 /** 318 * Creates a predicate which is applied and only if it is true then the 319 * exchange is forwarded to the destination 320 * 321 * @return the builder for a predicate 322 */ 323 public FilterType filter(Predicate predicate) { 324 FilterType filter = new FilterType(predicate); 325 addOutput(filter); 326 return filter; 327 } 328 329 public FilterType filter(ExpressionType expression) { 330 FilterType filter = getNodeFactory().createFilter(); 331 filter.setExpression(expression); 332 addOutput(filter); 333 return filter; 334 } 335 336 public FilterType filter(String language, String expression) { 337 return filter(new LanguageExpression(language, expression)); 338 } 339 340 public LoadBalanceType loadBalance() { 341 LoadBalanceType answer = new LoadBalanceType(); 342 addOutput(answer); 343 return answer; 344 } 345 346 347 /** 348 * Creates a choice of one or more predicates with an otherwise clause 349 * 350 * @return the builder for a choice expression 351 */ 352 public ChoiceType choice() { 353 ChoiceType answer = new ChoiceType(); 354 addOutput(answer); 355 return answer; 356 } 357 358 /** 359 * Creates a try/catch block 360 * 361 * @return the builder for a tryBlock expression 362 */ 363 public TryType tryBlock() { 364 TryType answer = new TryType(); 365 addOutput(answer); 366 return answer; 367 } 368 369 /** 370 * Creates a dynamic <a 371 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient 372 * List</a> pattern. 373 * 374 * @param recipients is the builder of the expression used in the 375 * {@link org.apache.camel.processor.RecipientList} 376 * to decide the destinations 377 */ 378 public Type recipientList(Expression recipients) { 379 RecipientListType answer = new RecipientListType(recipients); 380 addOutput(answer); 381 return (Type) this; 382 } 383 384 /** 385 * Creates a dynamic <a 386 * href="http://activemq.apache.org/camel/recipient-list.html">Recipient 387 * List</a> pattern. 388 * 389 * @return the expression clause for the expression used in the 390 * {@link org.apache.camel.processor.RecipientList} 391 * to decide the destinations 392 */ 393 public ExpressionClause<ProcessorType<Type>> recipientList() { 394 RecipientListType answer = new RecipientListType(); 395 addOutput(answer); 396 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 397 answer.setExpression(clause); 398 return clause; 399 } 400 401 /** 402 * Creates a <a 403 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 404 * Slip</a> pattern. 405 * 406 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 407 * class will look in for the list of URIs to route the message to. 408 * @param uriDelimiter is the delimiter that will be used to split up 409 * the list of URIs in the routing slip. 410 */ 411 public Type routingSlip(String header, String uriDelimiter) { 412 RoutingSlipType answer = new RoutingSlipType(header, uriDelimiter); 413 addOutput(answer); 414 return (Type) this; 415 } 416 417 /** 418 * Creates a <a 419 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 420 * Slip</a> pattern. 421 * 422 * @param header is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip} 423 * class will look in for the list of URIs to route the message to. The list of URIs 424 * will be split based on the default delimiter 425 * {@link RoutingSlipType#DEFAULT_DELIMITER}. 426 */ 427 public Type routingSlip(String header) { 428 RoutingSlipType answer = new RoutingSlipType(header); 429 addOutput(answer); 430 return (Type) this; 431 } 432 433 /** 434 * Creates a <a 435 * href="http://activemq.apache.org/camel/routing-slip.html">Routing 436 * Slip</a> pattern with the default header {@link RoutingSlipType#ROUTING_SLIP_HEADER}. 437 * The list of URIs in the header will be split based on the default delimiter 438 * {@link RoutingSlipType#DEFAULT_DELIMITER}. 439 */ 440 public Type routingSlip() { 441 RoutingSlipType answer = new RoutingSlipType(); 442 addOutput(answer); 443 return (Type) this; 444 } 445 446 /** 447 * Creates the <a 448 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 449 * pattern where an expression is evaluated to iterate through each of the 450 * parts of a message and then each part is then send to some endpoint. 451 * This splitter responds with the latest message returned from destination 452 * endpoint. 453 * 454 * @param receipients the expression on which to split 455 * @return the builder 456 */ 457 public SplitterType splitter(Expression receipients) { 458 SplitterType answer = new SplitterType(receipients); 459 addOutput(answer); 460 return answer; 461 } 462 463 /** 464 * Creates the <a 465 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 466 * pattern where an expression is evaluated to iterate through each of the 467 * parts of a message and then each part is then send to some endpoint. 468 * This splitter responds with the latest message returned from destination 469 * endpoint. 470 * 471 * @return the expression clause for the expression on which to split 472 */ 473 public ExpressionClause<SplitterType> splitter() { 474 SplitterType answer = new SplitterType(); 475 addOutput(answer); 476 return ExpressionClause.createAndSetExpression(answer); 477 } 478 479 /** 480 * Creates the <a 481 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 482 * pattern where an expression is evaluated to iterate through each of the 483 * parts of a message and then each part is then send to some endpoint. 484 * Answer from the splitter is produced using given {@link AggregationStrategy} 485 * @param partsExpression the expression on which to split 486 * @param aggregationStrategy the strategy used to aggregate responses for 487 * every part 488 * @return the builder 489 */ 490 public SplitterType splitter(Expression partsExpression, AggregationStrategy aggregationStrategy) { 491 SplitterType answer = new SplitterType(partsExpression); 492 addOutput(answer); 493 answer.setAggregationStrategy(aggregationStrategy); 494 return answer; 495 } 496 497 /** 498 * Creates the <a 499 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 500 * pattern where an expression is evaluated to iterate through each of the 501 * parts of a message and then each part is then send to some endpoint. 502 * Answer from the splitter is produced using given {@link AggregationStrategy} 503 * @param aggregationStrategy the strategy used to aggregate responses for 504 * every part 505 * @return the expression clause for the expression on which to split 506 */ 507 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy) { 508 SplitterType answer = new SplitterType(); 509 addOutput(answer); 510 answer.setAggregationStrategy(aggregationStrategy); 511 return ExpressionClause.createAndSetExpression(answer); 512 } 513 514 /** 515 * Creates the <a 516 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 517 * pattern where an expression is evaluated to iterate through each of the 518 * parts of a message and then each part is then send to some endpoint. 519 * This splitter responds with the latest message returned from destination 520 * endpoint. 521 * 522 * @param receipients the expression on which to split 523 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 524 * @return the builder 525 */ 526 public SplitterType splitter(Expression receipients, boolean parallelProcessing) { 527 SplitterType answer = new SplitterType(receipients); 528 addOutput(answer); 529 answer.setParallelProcessing(parallelProcessing); 530 return answer; 531 } 532 533 /** 534 * Creates the <a 535 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 536 * pattern where an expression is evaluated to iterate through each of the 537 * parts of a message and then each part is then send to some endpoint. 538 * This splitter responds with the latest message returned from destination 539 * endpoint. 540 * 541 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 542 * @return the expression clause for the expression on which to split 543 */ 544 public ExpressionClause<SplitterType> splitter(boolean parallelProcessing) { 545 SplitterType answer = new SplitterType(); 546 addOutput(answer); 547 answer.setParallelProcessing(parallelProcessing); 548 return ExpressionClause.createAndSetExpression(answer); 549 } 550 551 /** 552 * Creates the <a 553 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 554 * pattern where an expression is evaluated to iterate through each of the 555 * parts of a message and then each part is then send to some endpoint. 556 * Answer from the splitter is produced using given {@link AggregationStrategy} 557 * @param partsExpression the expression on which to split 558 * @param aggregationStrategy the strategy used to aggregate responses for 559 * every part 560 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 561 * @return the builder 562 */ 563 public SplitterType splitter(Expression partsExpression, 564 AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 565 SplitterType answer = new SplitterType(partsExpression); 566 addOutput(answer); 567 answer.setAggregationStrategy(aggregationStrategy); 568 answer.setParallelProcessing(parallelProcessing); 569 return answer; 570 } 571 572 /** 573 * Creates the <a 574 * href="http://activemq.apache.org/camel/splitter.html">Splitter</a> 575 * pattern where an expression is evaluated to iterate through each of the 576 * parts of a message and then each part is then send to some endpoint. 577 * Answer from the splitter is produced using given {@link AggregationStrategy} 578 * @param aggregationStrategy the strategy used to aggregate responses for 579 * every part 580 * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer 581 * @return the expression clause for the expression on which to split 582 */ 583 public ExpressionClause<SplitterType> splitter(AggregationStrategy aggregationStrategy, boolean parallelProcessing) { 584 SplitterType answer = new SplitterType(); 585 addOutput(answer); 586 answer.setAggregationStrategy(aggregationStrategy); 587 answer.setParallelProcessing(parallelProcessing); 588 return ExpressionClause.createAndSetExpression(answer); 589 } 590 591 592 /** 593 * Creates the <a 594 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 595 * pattern where a list of expressions are evaluated to be able to compare 596 * the message exchanges to reorder them. e.g. you may wish to sort by some 597 * headers 598 * 599 * @return the expression clause for the expressions on which to compare messages in order 600 */ 601 public ExpressionClause<ResequencerType> resequencer() { 602 ResequencerType answer = new ResequencerType(); 603 addOutput(answer); 604 ExpressionClause<ResequencerType> clause = new ExpressionClause<ResequencerType>(answer); 605 answer.expression(clause); 606 return clause; 607 } 608 609 /** 610 * Creates the <a 611 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 612 * pattern where an expression is evaluated to be able to compare the 613 * message exchanges to reorder them. e.g. you may wish to sort by some 614 * header 615 * 616 * @param expression the expression on which to compare messages in order 617 * @return the builder 618 */ 619 public ResequencerType resequencer(Expression<Exchange> expression) { 620 return resequencer(Collections.<Expression>singletonList(expression)); 621 } 622 623 /** 624 * Creates the <a 625 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 626 * pattern where a list of expressions are evaluated to be able to compare 627 * the message exchanges to reorder them. e.g. you may wish to sort by some 628 * headers 629 * 630 * @param expressions the expressions on which to compare messages in order 631 * @return the builder 632 */ 633 public ResequencerType resequencer(List<Expression> expressions) { 634 ResequencerType answer = new ResequencerType(expressions); 635 addOutput(answer); 636 return answer; 637 } 638 639 /** 640 * Creates the <a 641 * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a> 642 * pattern where a list of expressions are evaluated to be able to compare 643 * the message exchanges to reorder them. e.g. you may wish to sort by some 644 * headers 645 * 646 * @param expressions the expressions on which to compare messages in order 647 * @return the builder 648 */ 649 public ResequencerType resequencer(Expression... expressions) { 650 List<Expression> list = new ArrayList<Expression>(); 651 list.addAll(Arrays.asList(expressions)); 652 return resequencer(list); 653 } 654 655 /** 656 * Creates an <a 657 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 658 * pattern where a batch of messages are processed (up to a maximum amount 659 * or until some timeout is reached) and messages for the same correlation 660 * key are combined together using some kind of {@link AggregationStrategy} 661 * (by default the latest message is used) to compress many message exchanges 662 * into a smaller number of exchanges. 663 * <p/> 664 * A good example of this is stock market data; you may be receiving 30,000 665 * messages/second and you may want to throttle it right down so that multiple 666 * messages for the same stock are combined (or just the latest message is used 667 * and older prices are discarded). Another idea is to combine line item messages 668 * together into a single invoice message. 669 */ 670 public ExpressionClause<AggregatorType> aggregator() { 671 if (!getOutputs().isEmpty()) { 672 throw new IllegalArgumentException("Aggregator must be the only output added to the route: " + this); 673 } 674 AggregatorType answer = new AggregatorType(); 675 addOutput(answer); 676 return ExpressionClause.createAndSetExpression(answer); 677 } 678 679 /** 680 * Creates an <a 681 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 682 * pattern where a batch of messages are processed (up to a maximum amount 683 * or until some timeout is reached) and messages for the same correlation 684 * key are combined together using some kind of {@link AggregationStrategy} 685 * (by default the latest message is used) to compress many message exchanges 686 * into a smaller number of exchanges. 687 * <p/> 688 * A good example of this is stock market data; you may be receiving 30,000 689 * messages/second and you may want to throttle it right down so that multiple 690 * messages for the same stock are combined (or just the latest message is used 691 * and older prices are discarded). Another idea is to combine line item messages 692 * together into a single invoice message. 693 * 694 * @param aggregationStrategy the strategy used for the aggregation 695 */ 696 public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) { 697 if (!getOutputs().isEmpty()) { 698 throw new IllegalArgumentException("Aggregator must be the only output added to the route: " + this); 699 } 700 AggregatorType answer = new AggregatorType(); 701 answer.setAggregationStrategy(aggregationStrategy); 702 addOutput(answer); 703 return ExpressionClause.createAndSetExpression(answer); 704 } 705 706 /** 707 * Creates an <a 708 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 709 * pattern using a custom aggregation collection implementation. The aggregation collection must 710 * be configued with the strategy and correlation expression that this aggregator should use. 711 * This avoids duplicating this configuration on both the collection and the aggregator itself. 712 * 713 * @param aggregationCollection the collection used to perform the aggregation 714 */ 715 public AggregatorType aggregator(AggregationCollection aggregationCollection) { 716 if (!getOutputs().isEmpty()) { 717 throw new IllegalArgumentException("Aggregator must be the only output added to the route: " + this); 718 } 719 AggregatorType answer = new AggregatorType(); 720 answer.setAggregationCollection(aggregationCollection); 721 addOutput(answer); 722 return answer; 723 } 724 725 /** 726 * Creates an <a 727 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 728 * pattern where a batch of messages are processed (up to a maximum amount 729 * or until some timeout is reached) and messages for the same correlation 730 * key are combined together using some kind of {@link AggregationStrategy} 731 * (by default the latest message is used) to compress many message exchanges 732 * into a smaller number of exchanges. 733 * <p/> 734 * A good example of this is stock market data; you may be receiving 30,000 735 * messages/second and you may want to throttle it right down so that multiple 736 * messages for the same stock are combined (or just the latest message is used 737 * and older prices are discarded). Another idea is to combine line item messages 738 * together into a single invoice message. 739 * 740 * @param correlationExpression the expression used to calculate the 741 * correlation key. For a JMS message this could be the 742 * expression <code>header("JMSDestination")</code> or 743 * <code>header("JMSCorrelationID")</code> 744 */ 745 public AggregatorType aggregator(Expression correlationExpression) { 746 if (!getOutputs().isEmpty()) { 747 throw new IllegalArgumentException("Aggregator must be the only output added to the route: " + this); 748 } 749 AggregatorType answer = new AggregatorType(correlationExpression); 750 addOutput(answer); 751 return answer; 752 } 753 754 /** 755 * Creates an <a 756 * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> 757 * pattern where a batch of messages are processed (up to a maximum amount 758 * or until some timeout is reached) and messages for the same correlation 759 * key are combined together using some kind of {@link AggregationStrategy} 760 * (by default the latest message is used) to compress many message exchanges 761 * into a smaller number of exchanges. 762 * <p/> 763 * A good example of this is stock market data; you may be receiving 30,000 764 * messages/second and you may want to throttle it right down so that multiple 765 * messages for the same stock are combined (or just the latest message is used 766 * and older prices are discarded). Another idea is to combine line item messages 767 * together into a single invoice message. 768 * 769 * @param correlationExpression the expression used to calculate the 770 * correlation key. For a JMS message this could be the 771 * expression <code>header("JMSDestination")</code> or 772 * <code>header("JMSCorrelationID")</code> 773 */ 774 public AggregatorType aggregator(Expression correlationExpression, AggregationStrategy aggregationStrategy) { 775 if (!getOutputs().isEmpty()) { 776 throw new IllegalArgumentException("Aggregator must be the only output added to the route: " + this); 777 } 778 AggregatorType answer = new AggregatorType(correlationExpression, aggregationStrategy); 779 addOutput(answer); 780 return answer; 781 } 782 783 /** 784 * Creates the <a 785 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 786 * where an expression is used to calculate the time which the message will 787 * be dispatched on 788 * 789 * @param processAtExpression an expression to calculate the time at which 790 * the messages should be processed 791 * @return the builder 792 */ 793 public DelayerType delayer(Expression<Exchange> processAtExpression) { 794 return delayer(processAtExpression, 0L); 795 } 796 797 /** 798 * Creates the <a 799 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 800 * where an expression is used to calculate the time which the message will 801 * be dispatched on 802 * 803 * @param processAtExpression an expression to calculate the time at which 804 * the messages should be processed 805 * @param delay the delay in milliseconds which is added to the 806 * processAtExpression to determine the time the message 807 * should be processed 808 * @return the builder 809 */ 810 public DelayerType delayer(Expression<Exchange> processAtExpression, long delay) { 811 DelayerType answer = new DelayerType(processAtExpression, delay); 812 addOutput(answer); 813 return answer; 814 } 815 816 /** 817 * Creates the <a 818 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 819 * where an expression is used to calculate the time which the message will 820 * be dispatched on 821 * @return the expression clause to create the expression 822 */ 823 public ExpressionClause<DelayerType> delayer() { 824 DelayerType answer = new DelayerType(); 825 addOutput(answer); 826 return ExpressionClause.createAndSetExpression(answer); 827 } 828 829 /** 830 * Creates the <a 831 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 832 * where a fixed amount of milliseconds are used to delay processing of a 833 * message exchange 834 * 835 * @param delay the default delay in milliseconds 836 * @return the builder 837 */ 838 public DelayerType delayer(long delay) { 839 return delayer(null, delay); 840 } 841 842 /** 843 * Creates the <a 844 * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern 845 * where an expression is used to calculate the time which the message will 846 * be dispatched on 847 * 848 * @return the builder 849 */ 850 public ThrottlerType throttler(long maximumRequestCount) { 851 ThrottlerType answer = new ThrottlerType(maximumRequestCount); 852 addOutput(answer); 853 return answer; 854 } 855 856 /** 857 * Creates a expression which must evaluate to an integer that determines 858 * how many times the exchange should be sent down the rest of the route. 859 * 860 * @return the clause used to create the loop expression 861 */ 862 public ExpressionClause<LoopType> loop() { 863 LoopType loop = new LoopType(); 864 addOutput(loop); 865 return ExpressionClause.createAndSetExpression(loop); 866 } 867 868 public LoopType loop(Expression<?> expression) { 869 LoopType loop = getNodeFactory().createLoop(); 870 loop.setExpression(expression); 871 addOutput(loop); 872 return loop; 873 } 874 875 public LoopType loop(int count) { 876 LoopType loop = getNodeFactory().createLoop(); 877 loop.setExpression(new ConstantExpression(Integer.toString(count))); 878 addOutput(loop); 879 return loop; 880 } 881 882 public Type throwFault(Throwable fault) { 883 ThrowFaultType answer = new ThrowFaultType(); 884 answer.setFault(fault); 885 addOutput(answer); 886 return (Type) this; 887 } 888 889 public Type throwFault(String message) { 890 return throwFault(new CamelException(message)); 891 } 892 893 /** 894 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement) 895 */ 896 public Type interceptor(String ref) { 897 InterceptorRef interceptor = new InterceptorRef(ref); 898 intercept(interceptor); 899 return (Type) this; 900 } 901 902 /** 903 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement) 904 */ 905 public Type intercept(DelegateProcessor interceptor) { 906 intercept(new InterceptorRef(interceptor)); 907 //lastInterceptor = interceptor; 908 return (Type) this; 909 } 910 911 /** 912 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement) 913 */ 914 public InterceptType intercept() { 915 InterceptType answer = new InterceptType(); 916 addOutput(answer); 917 return answer; 918 } 919 920 /** 921 * Intercepts outputs added to this node in the future (i.e. intercepts outputs added after this statement) 922 */ 923 public void intercept(InterceptorType interceptor) { 924 addOutput(interceptor); 925 pushBlock(interceptor); 926 } 927 928 /** 929 * Adds an interceptor around the whole of this nodes processing 930 * 931 * @param interceptor 932 */ 933 public void addInterceptor(InterceptorType interceptor) { 934 interceptors.add(interceptor); 935 } 936 937 /** 938 * Adds an interceptor around the whole of this nodes processing 939 * 940 * @param interceptor 941 */ 942 public void addInterceptor(DelegateProcessor interceptor) { 943 addInterceptor(new InterceptorRef(interceptor)); 944 } 945 946 public void pushBlock(Block block) { 947 blocks.add(block); 948 } 949 950 public Block popBlock() { 951 return blocks.isEmpty() ? null : blocks.removeLast(); 952 } 953 954 public Type proceed() { 955 ProceedType proceed = null; 956 ProcessorType currentProcessor = this; 957 958 if (currentProcessor instanceof InterceptType) { 959 proceed = ((InterceptType) currentProcessor).getProceed(); 960 LOG.info("proceed() is the implied and hence not needed for an intercept()"); 961 } 962 if (proceed == null) { 963 for (ProcessorType node = parent; node != null; node = node.getParent()) { 964 if (node instanceof InterceptType) { 965 InterceptType intercept = (InterceptType)node; 966 proceed = intercept.getProceed(); 967 break; 968 } 969 } 970 971 if (proceed == null) { 972 throw new IllegalArgumentException("Cannot use proceed() without being within an intercept() block"); 973 } 974 975 } 976 977 addOutput(proceed); 978 return (Type) this; 979 } 980 981 public Type stop() { 982 ProcessorType currentProcessor = this; 983 984 if (currentProcessor instanceof InterceptType) { 985 ((InterceptType) currentProcessor).stopIntercept(); 986 } else { 987 ProcessorType node; 988 for (node = parent; node != null; node = node.getParent()) { 989 if (node instanceof InterceptType) { 990 ((InterceptType) node).stopIntercept(); 991 break; 992 } 993 } 994 if (node == null) { 995 throw new IllegalArgumentException("Cannot use stop() without being within an intercept() block"); 996 } 997 } 998 999 return (Type) this; 1000 } 1001 1002 /** 1003 * Catches an exception type. 1004 * 1005 * @deprecated Please use {@link #onException(Class)} instead. Will be removed in Camel 2.0. 1006 */ 1007 public ExceptionType exception(Class exceptionType) { 1008 return onException(exceptionType); 1009 } 1010 1011 /** 1012 * Catches an exception type. 1013 */ 1014 public ExceptionType onException(Class exceptionType) { 1015 ExceptionType answer = new ExceptionType(exceptionType); 1016 addOutput(answer); 1017 return answer; 1018 } 1019 1020 /** 1021 * Apply an interceptor route if the predicate is true 1022 */ 1023 public ChoiceType intercept(Predicate predicate) { 1024 InterceptType answer = new InterceptType(); 1025 addOutput(answer); 1026 return answer.when(predicate); 1027 } 1028 1029 public Type interceptors(String... refs) { 1030 for (String ref : refs) { 1031 interceptor(ref); 1032 } 1033 return (Type) this; 1034 } 1035 1036 /** 1037 * Trace logs the exchange before it goes to the next processing step using 1038 * the {@link #DEFAULT_TRACE_CATEGORY} logging category. 1039 * 1040 * @deprecated Please use <a href="http://activemq.apache.org/camel/tracer.html>Tracer Support</a> 1041 * instead. Will be removed in Camel 2.0. 1042 */ 1043 public Type trace() { 1044 return trace(DEFAULT_TRACE_CATEGORY); 1045 } 1046 1047 /** 1048 * Trace logs the exchange before it goes to the next processing step using 1049 * the specified logging category. 1050 * 1051 * @param category the logging category trace messages will sent to. 1052 * 1053 * @deprecated Please use <a href="http://activemq.apache.org/camel/tracer.html>Tracer Support</a> 1054 * instead. Will be removed in Camel 2.0. 1055 */ 1056 public Type trace(String category) { 1057 final Log log = LogFactory.getLog(category); 1058 return intercept(new DelegateProcessor() { 1059 @Override 1060 public void process(Exchange exchange) throws Exception { 1061 log.trace(exchange); 1062 processNext(exchange); 1063 } 1064 }); 1065 } 1066 1067 public PolicyRef policies() { 1068 PolicyRef answer = new PolicyRef(); 1069 addOutput(answer); 1070 return answer; 1071 } 1072 1073 public PolicyRef policy(Policy policy) { 1074 PolicyRef answer = new PolicyRef(policy); 1075 addOutput(answer); 1076 return answer; 1077 } 1078 1079 /** 1080 * Forces handling of faults as exceptions 1081 * 1082 * @return the current builder with the fault handler configured 1083 */ 1084 public Type handleFault() { 1085 intercept(new HandleFaultType()); 1086 return (Type) this; 1087 } 1088 1089 /** 1090 * Installs the given error handler builder 1091 * 1092 * @param errorHandlerBuilder the error handler to be used by default for 1093 * all child routes 1094 * @return the current builder with the error handler configured 1095 */ 1096 public Type errorHandler(ErrorHandlerBuilder errorHandlerBuilder) { 1097 setErrorHandlerBuilder(errorHandlerBuilder); 1098 return (Type) this; 1099 } 1100 1101 /** 1102 * Configures whether or not the error handler is inherited by every 1103 * processing node (or just the top most one) 1104 * 1105 * @param condition the flag as to whether error handlers should be 1106 * inherited or not 1107 * @return the current builder 1108 */ 1109 public Type inheritErrorHandler(boolean condition) { 1110 setInheritErrorHandlerFlag(condition); 1111 return (Type) this; 1112 } 1113 1114 // Transformers 1115 // ------------------------------------------------------------------------- 1116 1117 /** 1118 * Adds the custom processor to this destination which could be a final 1119 * destination, or could be a transformation in a pipeline 1120 */ 1121 public Type process(Processor processor) { 1122 ProcessorRef answer = new ProcessorRef(processor); 1123 addOutput(answer); 1124 return (Type) this; 1125 } 1126 1127 /** 1128 * Adds the custom processor reference to this destination which could be a final 1129 * destination, or could be a transformation in a pipeline 1130 */ 1131 public Type processRef(String ref) { 1132 ProcessorRef answer = new ProcessorRef(); 1133 answer.setRef(ref); 1134 addOutput(answer); 1135 return (Type) this; 1136 } 1137 1138 /** 1139 * Adds a bean which is invoked which could be a final destination, or could 1140 * be a transformation in a pipeline 1141 */ 1142 public Type bean(Object bean) { 1143 BeanRef answer = new BeanRef(); 1144 answer.setBean(bean); 1145 addOutput(answer); 1146 return (Type) this; 1147 } 1148 1149 /** 1150 * Adds a bean and method which is invoked which could be a final 1151 * destination, or could be a transformation in a pipeline 1152 */ 1153 public Type bean(Object bean, String method) { 1154 BeanRef answer = new BeanRef(); 1155 answer.setBean(bean); 1156 answer.setMethod(method); 1157 addOutput(answer); 1158 return (Type) this; 1159 } 1160 1161 /** 1162 * Adds a bean by type which is invoked which could be a final destination, or could 1163 * be a transformation in a pipeline 1164 */ 1165 public Type bean(Class beanType) { 1166 BeanRef answer = new BeanRef(); 1167 answer.setBeanType(beanType); 1168 addOutput(answer); 1169 return (Type) this; 1170 } 1171 1172 /** 1173 * Adds a bean type and method which is invoked which could be a final 1174 * destination, or could be a transformation in a pipeline 1175 */ 1176 public Type bean(Class beanType, String method) { 1177 BeanRef answer = new BeanRef(); 1178 answer.setBeanType(beanType); 1179 answer.setMethod(method); 1180 addOutput(answer); 1181 return (Type) this; 1182 } 1183 1184 /** 1185 * Adds a bean which is invoked which could be a final destination, or could 1186 * be a transformation in a pipeline 1187 */ 1188 public Type beanRef(String ref) { 1189 BeanRef answer = new BeanRef(ref); 1190 addOutput(answer); 1191 return (Type) this; 1192 } 1193 1194 /** 1195 * Adds a bean and method which is invoked which could be a final 1196 * destination, or could be a transformation in a pipeline 1197 */ 1198 public Type beanRef(String ref, String method) { 1199 BeanRef answer = new BeanRef(ref, method); 1200 addOutput(answer); 1201 return (Type) this; 1202 } 1203 1204 /** 1205 * Adds a processor which sets the body on the IN message 1206 */ 1207 public ExpressionClause<ProcessorType<Type>> setBody() { 1208 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1209 SetBodyType answer = new SetBodyType(clause); 1210 addOutput(answer); 1211 return clause; 1212 } 1213 1214 /** 1215 * Adds a processor which sets the body on the IN message 1216 */ 1217 public Type setBody(Expression expression) { 1218 SetBodyType answer = new SetBodyType(expression); 1219 addOutput(answer); 1220 return (Type) this; 1221 } 1222 1223 /** 1224 * Adds a processor which sets the body on the OUT message 1225 * 1226 * @deprecated Please use {@link #transform(Expression)} instead. Will be removed in Camel 2.0. 1227 */ 1228 @Deprecated 1229 public Type setOutBody(Expression expression) { 1230 return transform(expression); 1231 } 1232 1233 /** 1234 * Adds a processor which sets the body on the OUT message 1235 * 1236 * @deprecated Please use {@link #transform()} instead. Will be removed in Camel 2.0. 1237 */ 1238 @Deprecated 1239 public ExpressionClause<ProcessorType<Type>> setOutBody() { 1240 return transform(); 1241 } 1242 1243 /** 1244 * Adds a processor which sets the body on the OUT message 1245 */ 1246 public Type transform(Expression expression) { 1247 TransformType answer = new TransformType(expression); 1248 addOutput(answer); 1249 return (Type) this; 1250 } 1251 1252 /** 1253 * Adds a processor which sets the body on the OUT message 1254 */ 1255 public ExpressionClause<ProcessorType<Type>> transform() { 1256 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1257 TransformType answer = new TransformType(clause); 1258 addOutput(answer); 1259 return clause; 1260 } 1261 1262 /** 1263 * Adds a processor which sets the body on the FAULT message 1264 */ 1265 public Type setFaultBody(Expression expression) { 1266 return process(ProcessorBuilder.setFaultBody(expression)); 1267 } 1268 1269 /** 1270 * Adds a processor which sets the header on the IN message 1271 */ 1272 public ExpressionClause<ProcessorType<Type>> setHeader(String name) { 1273 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1274 SetHeaderType answer = new SetHeaderType(name, clause); 1275 addOutput(answer); 1276 return clause; 1277 } 1278 1279 /** 1280 * Adds a processor which sets the header on the IN message 1281 */ 1282 public Type setHeader(String name, Expression expression) { 1283 SetHeaderType answer = new SetHeaderType(name, expression); 1284 addOutput(answer); 1285 return (Type) this; 1286 } 1287 1288 /** 1289 * Adds a processor which sets the header on the IN message to the given value 1290 * @deprecated Please use {@link #setHeader(String, Expression)} instead. Will be removed in Camel 2.0. 1291 */ 1292 public Type setHeader(String name, String value) { 1293 SetHeaderType answer = new SetHeaderType(name, value); 1294 addOutput(answer); 1295 return (Type) this; 1296 } 1297 1298 /** 1299 * Adds a processor which sets the header on the OUT message 1300 */ 1301 public ExpressionClause<ProcessorType<Type>> setOutHeader(String name) { 1302 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1303 SetOutHeaderType answer = new SetOutHeaderType(name, clause); 1304 addOutput(answer); 1305 return clause; 1306 } 1307 1308 /** 1309 * Adds a processor which sets the header on the OUT message 1310 */ 1311 public Type setOutHeader(String name, Expression expression) { 1312 SetOutHeaderType answer = new SetOutHeaderType(name, expression); 1313 addOutput(answer); 1314 return (Type) this; 1315 } 1316 1317 /** 1318 * Adds a processor which sets the header on the FAULT message 1319 */ 1320 public Type setFaultHeader(String name, Expression expression) { 1321 return process(ProcessorBuilder.setFaultHeader(name, expression)); 1322 } 1323 1324 /** 1325 * Adds a processor which sets the exchange property 1326 */ 1327 public Type setProperty(String name, Expression expression) { 1328 SetPropertyType answer = new SetPropertyType(name, expression); 1329 addOutput(answer); 1330 return (Type) this; 1331 } 1332 1333 1334 /** 1335 * Adds a processor which sets the exchange property 1336 */ 1337 public ExpressionClause<ProcessorType<Type>> setProperty(String name) { 1338 ExpressionClause<ProcessorType<Type>> clause = new ExpressionClause<ProcessorType<Type>>((Type) this); 1339 SetPropertyType answer = new SetPropertyType(name, clause); 1340 addOutput(answer); 1341 return clause; 1342 } 1343 1344 /** 1345 * Adds a processor which removes the header on the IN message 1346 */ 1347 public Type removeHeader(String name) { 1348 RemoveHeaderType answer = new RemoveHeaderType(name); 1349 addOutput(answer); 1350 return (Type) this; 1351 } 1352 1353 /** 1354 * Adds a processor which removes the header on the FAULT message 1355 */ 1356 public Type removeFaultHeader(String name) { 1357 return process(ProcessorBuilder.removeFaultHeader(name)); 1358 } 1359 1360 /** 1361 * Adds a processor which removes the exchange property 1362 */ 1363 public Type removeProperty(String name) { 1364 RemovePropertyType answer = new RemovePropertyType(name); 1365 addOutput(answer); 1366 return (Type) this; 1367 } 1368 1369 /** 1370 * Converts the IN message body to the specified type 1371 */ 1372 public Type convertBodyTo(Class type) { 1373 addOutput(new ConvertBodyType(type)); 1374 return (Type) this; 1375 } 1376 1377 /** 1378 * Converts the IN message body to the specified class type 1379 */ 1380 public Type convertBodyTo(String typeString) { 1381 addOutput(new ConvertBodyType(typeString)); 1382 return (Type) this; 1383 } 1384 1385 /** 1386 * Converts the OUT message body to the specified type 1387 * 1388 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0. 1389 */ 1390 @Deprecated 1391 public Type convertOutBodyTo(Class type) { 1392 return process(new ConvertBodyProcessor(type)); 1393 } 1394 1395 /** 1396 * Converts the FAULT message body to the specified type 1397 * 1398 * @deprecated Please use {@link #convertBodyTo(Class)} instead. Will be removed in Camel 2.0. 1399 */ 1400 @Deprecated 1401 public Type convertFaultBodyTo(Class type) { 1402 return process(new ConvertBodyProcessor(type)); 1403 } 1404 1405 // DataFormat support 1406 // ------------------------------------------------------------------------- 1407 1408 /** 1409 * Unmarshals the in body using a {@link DataFormat} expression to define 1410 * the format of the input message and the output will be set on the out message body. 1411 * 1412 * @return the expression to create the {@link DataFormat} 1413 */ 1414 public DataFormatClause<ProcessorType<Type>> unmarshal() { 1415 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Unmarshal); 1416 } 1417 1418 /** 1419 * Unmarshals the in body using the specified {@link DataFormat} 1420 * and sets the output on the out message body. 1421 * 1422 * @return this object 1423 */ 1424 public Type unmarshal(DataFormatType dataFormatType) { 1425 addOutput(new UnmarshalType(dataFormatType)); 1426 return (Type) this; 1427 } 1428 1429 /** 1430 * Unmarshals the in body using the specified {@link DataFormat} 1431 * and sets the output on the out message body. 1432 * 1433 * @return this object 1434 */ 1435 public Type unmarshal(DataFormat dataFormat) { 1436 return unmarshal(new DataFormatType(dataFormat)); 1437 } 1438 1439 /** 1440 * Unmarshals the in body using the specified {@link DataFormat} 1441 * reference in the {@link org.apache.camel.spi.Registry} and sets 1442 * the output on the out message body. 1443 * 1444 * @return this object 1445 */ 1446 public Type unmarshal(String dataTypeRef) { 1447 addOutput(new UnmarshalType(dataTypeRef)); 1448 return (Type) this; 1449 } 1450 1451 /** 1452 * Marshals the in body using a {@link DataFormat} expression to define 1453 * the format of the output which will be added to the out body. 1454 * 1455 * @return the expression to create the {@link DataFormat} 1456 */ 1457 public DataFormatClause<ProcessorType<Type>> marshal() { 1458 return new DataFormatClause<ProcessorType<Type>>(this, DataFormatClause.Operation.Marshal); 1459 } 1460 1461 /** 1462 * Marshals the in body using the specified {@link DataFormat} 1463 * and sets the output on the out message body. 1464 * 1465 * @return this object 1466 */ 1467 public Type marshal(DataFormatType dataFormatType) { 1468 addOutput(new MarshalType(dataFormatType)); 1469 return (Type) this; 1470 } 1471 1472 /** 1473 * Marshals the in body using the specified {@link DataFormat} 1474 * and sets the output on the out message body. 1475 * 1476 * @return this object 1477 */ 1478 public Type marshal(DataFormat dataFormat) { 1479 return marshal(new DataFormatType(dataFormat)); 1480 } 1481 1482 /** 1483 * Marshals the in body the specified {@link DataFormat} 1484 * reference in the {@link org.apache.camel.spi.Registry} and sets 1485 * the output on the out message body. 1486 * 1487 * @return this object 1488 */ 1489 public Type marshal(String dataTypeRef) { 1490 addOutput(new MarshalType(dataTypeRef)); 1491 return (Type) this; 1492 } 1493 1494 // Properties 1495 // ------------------------------------------------------------------------- 1496 @XmlTransient 1497 public ProcessorType<? extends ProcessorType> getParent() { 1498 return parent; 1499 } 1500 1501 public void setParent(ProcessorType<? extends ProcessorType> parent) { 1502 this.parent = parent; 1503 } 1504 1505 @XmlTransient 1506 public ErrorHandlerBuilder getErrorHandlerBuilder() { 1507 if (errorHandlerBuilder == null) { 1508 errorHandlerBuilder = createErrorHandlerBuilder(); 1509 } 1510 return errorHandlerBuilder; 1511 } 1512 1513 /** 1514 * Sets the error handler to use with processors created by this builder 1515 */ 1516 public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) { 1517 this.errorHandlerBuilder = errorHandlerBuilder; 1518 } 1519 1520 /** 1521 * Sets the error handler if one is not already set 1522 */ 1523 protected void setErrorHandlerBuilderIfNull(ErrorHandlerBuilder errorHandlerBuilder) { 1524 if (this.errorHandlerBuilder == null) { 1525 setErrorHandlerBuilder(errorHandlerBuilder); 1526 } 1527 } 1528 1529 public String getErrorHandlerRef() { 1530 return errorHandlerRef; 1531 } 1532 1533 /** 1534 * Sets the bean ref name of the error handler builder to use on this route 1535 */ 1536 @XmlAttribute(required = false) 1537 public void setErrorHandlerRef(String errorHandlerRef) { 1538 this.errorHandlerRef = errorHandlerRef; 1539 setErrorHandlerBuilder(new ErrorHandlerBuilderRef(errorHandlerRef)); 1540 } 1541 1542 @XmlTransient 1543 public boolean isInheritErrorHandler() { 1544 return isInheritErrorHandler(getInheritErrorHandlerFlag()); 1545 } 1546 1547 /** 1548 * Lets default the inherit value to be true if there is none specified 1549 */ 1550 public static boolean isInheritErrorHandler(Boolean value) { 1551 return value == null || value.booleanValue(); 1552 } 1553 1554 @XmlAttribute(name = "inheritErrorHandler", required = false) 1555 public Boolean getInheritErrorHandlerFlag() { 1556 return inheritErrorHandlerFlag; 1557 } 1558 1559 public void setInheritErrorHandlerFlag(Boolean inheritErrorHandlerFlag) { 1560 this.inheritErrorHandlerFlag = inheritErrorHandlerFlag; 1561 } 1562 1563 @XmlTransient 1564 public NodeFactory getNodeFactory() { 1565 if (nodeFactory == null) { 1566 nodeFactory = new NodeFactory(); 1567 } 1568 return nodeFactory; 1569 } 1570 1571 public void setNodeFactory(NodeFactory nodeFactory) { 1572 this.nodeFactory = nodeFactory; 1573 } 1574 1575 /** 1576 * Returns a label to describe this node such as the expression if some kind of expression node 1577 */ 1578 public String getLabel() { 1579 return ""; 1580 } 1581 1582 // Implementation methods 1583 // ------------------------------------------------------------------------- 1584 1585 /** 1586 * Creates the processor and wraps it in any necessary interceptors and 1587 * error handlers 1588 */ 1589 protected Processor makeProcessor(RouteContext routeContext) throws Exception { 1590 Processor processor = createProcessor(routeContext); 1591 return wrapProcessor(routeContext, processor); 1592 } 1593 1594 /** 1595 * A strategy method which allows derived classes to wrap the child 1596 * processor in some kind of interceptor 1597 * 1598 * @param routeContext 1599 * @param target the processor which can be wrapped 1600 * @return the original processor or a new wrapped interceptor 1601 */ 1602 protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception { 1603 // The target is required. 1604 if (target == null) { 1605 throw new IllegalArgumentException("target not provided on node: " + this); 1606 } 1607 1608 List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>(); 1609 CamelContext camelContext = routeContext.getCamelContext(); 1610 if (camelContext instanceof DefaultCamelContext) { 1611 DefaultCamelContext defaultCamelContext = (DefaultCamelContext) camelContext; 1612 strategies.addAll(defaultCamelContext.getInterceptStrategies()); 1613 } 1614 strategies.addAll(routeContext.getInterceptStrategies()); 1615 for (InterceptStrategy strategy : strategies) { 1616 if (strategy != null) { 1617 target = strategy.wrapProcessorInInterceptors(this, target); 1618 } 1619 } 1620 1621 List<InterceptorType> list = routeContext.getRoute().getInterceptors(); 1622 if (interceptors != null) { 1623 list.addAll(interceptors); 1624 } 1625 // lets reverse the list so we apply the inner interceptors first 1626 Collections.reverse(list); 1627 Set<Processor> interceptors = new HashSet<Processor>(); 1628 interceptors.add(target); 1629 for (InterceptorType interceptorType : list) { 1630 DelegateProcessor interceptor = interceptorType.createInterceptor(routeContext); 1631 if (!interceptors.contains(interceptor)) { 1632 interceptors.add(interceptor); 1633 if (interceptor.getProcessor() != null) { 1634 LOG.warn("Interceptor " + interceptor + " currently wraps target " 1635 + interceptor.getProcessor() 1636 + " is attempting to change target " + target 1637 + " new wrapping has been denied."); 1638 } else { 1639 interceptor.setProcessor(target); 1640 target = interceptor; 1641 } 1642 } 1643 } 1644 return target; 1645 } 1646 1647 /** 1648 * A strategy method to allow newly created processors to be wrapped in an 1649 * error handler. 1650 */ 1651 protected Processor wrapInErrorHandler(RouteContext routeContext, Processor target) throws Exception { 1652 // The target is required. 1653 if (target == null) { 1654 throw new IllegalArgumentException("target not provided on node: " + this); 1655 } 1656 1657 ErrorHandlerWrappingStrategy strategy = routeContext.getErrorHandlerWrappingStrategy(); 1658 1659 if (strategy != null) { 1660 return strategy.wrapProcessorInErrorHandler(routeContext, this, target); 1661 } 1662 1663 return getErrorHandlerBuilder().createErrorHandler(routeContext, target); 1664 } 1665 1666 protected ErrorHandlerBuilder createErrorHandlerBuilder() { 1667 if (errorHandlerRef != null) { 1668 return new ErrorHandlerBuilderRef(errorHandlerRef); 1669 } 1670 if (isInheritErrorHandler()) { 1671 return new DeadLetterChannelBuilder(); 1672 } else { 1673 return new NoErrorHandlerBuilder(); 1674 } 1675 } 1676 1677 protected void configureChild(ProcessorType output) { 1678 output.setNodeFactory(getNodeFactory()); 1679 } 1680 1681 public void addOutput(ProcessorType processorType) { 1682 processorType.setParent(this); 1683 configureChild(processorType); 1684 if (blocks.isEmpty()) { 1685 getOutputs().add(processorType); 1686 } else { 1687 Block block = blocks.getLast(); 1688 block.addOutput(processorType); 1689 } 1690 } 1691 1692 /** 1693 * Creates a new instance of some kind of composite processor which defaults 1694 * to using a {@link Pipeline} but derived classes could change the 1695 * behaviour 1696 */ 1697 protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) { 1698 // return new MulticastProcessor(list); 1699 return new Pipeline(list); 1700 } 1701 1702 protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorType<?>> outputs) 1703 throws Exception { 1704 List<Processor> list = new ArrayList<Processor>(); 1705 for (ProcessorType output : outputs) { 1706 Processor processor = output.createProcessor(routeContext); 1707 // if the ProceedType create processor is null we keep on going 1708 if (output instanceof ProceedType && processor == null) { 1709 continue; 1710 } 1711 processor = output.wrapProcessorInInterceptors(routeContext, processor); 1712 1713 ProcessorType currentProcessor = this; 1714 if (!(currentProcessor instanceof ExceptionType || currentProcessor instanceof TryType)) { 1715 processor = output.wrapInErrorHandler(routeContext, processor); 1716 } 1717 1718 list.add(processor); 1719 } 1720 Processor processor = null; 1721 if (!list.isEmpty()) { 1722 if (list.size() == 1) { 1723 processor = list.get(0); 1724 } else { 1725 processor = createCompositeProcessor(routeContext, list); 1726 } 1727 } 1728 return processor; 1729 } 1730 1731 public void clearOutput() { 1732 getOutputs().clear(); 1733 blocks.clear(); 1734 } 1735 }