/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.processor;

import java.util.concurrent.RejectedExecutionException;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.model.ExceptionType;
import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * Implements a <a
 * href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter
 * Channel</a> after attempting to redeliver the message using the
 * {@link RedeliveryPolicy}
 * <p/>
 * The exchange is sent to the output processor. While the exchange carries an
 * exception and the effective redelivery policy allows it, the exchange is sent
 * to the output processor again. Once the policy refuses further redelivery the
 * exchange is handed to the failure processor (the dead letter processor, or the
 * error handler of a matching exception policy).
 *
 * @version $Revision: 706857 $
 */
public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
    /** Header on the IN message holding the number of redelivery attempts. */
    public static final String REDELIVERY_COUNTER = "org.apache.camel.RedeliveryCounter";
    /** Header on the IN message flagging whether the message has been redelivered. */
    public static final String REDELIVERED = "org.apache.camel.Redelivered";
    /** Exchange property holding the exception that caused the latest failed delivery. */
    public static final String EXCEPTION_CAUSE_PROPERTY = "CamelCauseException";

    private static final Log LOG = LogFactory.getLog(DeadLetterChannel.class);
    /** Exchange property set (to the caught exception) once the failure processor takes over. */
    private static final String FAILURE_HANDLED_PROPERTY = DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";

    private Processor output;
    private Processor deadLetter;
    private AsyncProcessor outputAsync;
    private RedeliveryPolicy redeliveryPolicy;
    private Logger logger;

    /**
     * Mutable state carried through all delivery attempts of a single exchange.
     * <p/>
     * This is an inner (non static) class on purpose: the defaults below are
     * read from the enclosing channel at the time the instance is created.
     */
    private class RedeliveryData {
        int redeliveryCounter;
        long redeliveryDelay;
        // flipped to false as soon as the output processor completes asynchronously
        boolean sync = true;
        Predicate handledPredicate;

        // default behavior which can be overloaded on a per exception basis
        RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
        Processor failureProcessor = deadLetter;
    }

    /**
     * Creates a dead letter channel using a new default {@link RedeliveryPolicy},
     * the default logger and the default exception policy strategy.
     *
     * @param output     the processor the exchange is delivered to
     * @param deadLetter the processor receiving exchanges that could not be delivered
     */
    public DeadLetterChannel(Processor output, Processor deadLetter) {
        this(output, deadLetter, new RedeliveryPolicy(), DeadLetterChannel.createDefaultLogger(),
            ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
    }

    /**
     * Creates a dead letter channel.
     *
     * @param output                  the processor the exchange is delivered to
     * @param deadLetter              the processor receiving exchanges that could not be delivered
     * @param redeliveryPolicy        the default policy deciding whether and when to redeliver
     * @param logger                  the logger used to report failed deliveries
     * @param exceptionPolicyStrategy the strategy passed to {@link #setExceptionPolicy}
     */
    public DeadLetterChannel(Processor output, Processor deadLetter, RedeliveryPolicy redeliveryPolicy,
                             Logger logger, ExceptionPolicyStrategy exceptionPolicyStrategy) {
        this.deadLetter = deadLetter;
        this.output = output;
        this.outputAsync = AsyncProcessorTypeConverter.convert(output);

        this.redeliveryPolicy = redeliveryPolicy;
        this.logger = logger;
        setExceptionPolicy(exceptionPolicyStrategy);
    }

    /**
     * Creates the default logger, which logs at {@link LoggingLevel#ERROR} using
     * the log of this class.
     * <p/>
     * The type parameter is not used; it is kept so the method signature stays
     * unchanged for existing callers.
     */
    public static <E extends Exchange> Logger createDefaultLogger() {
        return new Logger(LOG, LoggingLevel.ERROR);
    }

    @Override
    public String toString() {
        return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
    }

    /**
     * Processes the exchange, starting with fresh redelivery state.
     *
     * @param exchange the exchange to deliver
     * @param callback notified when processing is done
     * @return <tt>true</tt> if processing completed synchronously, <tt>false</tt> otherwise
     */
    public boolean process(Exchange exchange, final AsyncCallback callback) {
        return process(exchange, callback, new RedeliveryData());
    }

    /**
     * Delivers the exchange to the output processor, redelivering it according to
     * the effective redelivery policy, and finally hands it over to the failure
     * processor when redelivery is exhausted.
     * <p/>
     * This method is re-entered from the asynchronous callback of the output
     * processor with the same <tt>data</tt>, so the redelivery state survives
     * across asynchronous attempts.
     *
     * @param exchange the exchange to deliver
     * @param callback notified when processing is done
     * @param data     the redelivery state for this exchange
     * @return <tt>true</tt> if processing completed synchronously, <tt>false</tt> otherwise
     */
    public boolean process(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {

        while (true) {
            // we can't keep retrying if the route is being shutdown.
            if (!isRunAllowed()) {
                if (exchange.getException() == null) {
                    exchange.setException(new RejectedExecutionException());
                }
                callback.done(data.sync);
                return data.sync;
            }

            // if the exchange is transacted then let the underlying system handle the redelivery etc.
            // this DeadLetterChannel is only for non transacted exchanges
            if (exchange.isTransacted() && exchange.getException() != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("This is a transacted exchange, bypassing this DeadLetterChannel: " + this + " for exchange: " + exchange);
                }
                // the callback must be notified on every exit path; without this an
                // asynchronous caller would never learn that processing has ended
                callback.done(data.sync);
                return data.sync;
            }

            // did previous processing cause an exception?
            if (exchange.getException() != null) {
                Throwable e = exchange.getException();
                // set the original caused exception
                exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);

                logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + " caught: " + e, e);
                data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);

                // find the error handler to use (if any)
                ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
                if (exceptionPolicy != null) {
                    data.currentRedeliveryPolicy = exceptionPolicy.createRedeliveryPolicy(data.currentRedeliveryPolicy);
                    data.handledPredicate = exceptionPolicy.getHandledPolicy();
                    Processor processor = exceptionPolicy.getErrorHandler();
                    if (processor != null) {
                        data.failureProcessor = processor;
                    }
                }
            }

            // should we redeliver or not?
            if (!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
                // we did not succeed with the redelivery so now we let the failure processor handle it
                setFailureHandled(exchange);
                // must decrement the redelivery counter as we didn't process the redelivery but is
                // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
                decrementRedeliveryCounter(exchange);

                AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
                boolean sync = afp.process(exchange, new AsyncCallback() {
                    public void done(boolean sync) {
                        restoreExceptionOnExchange(exchange, data.handledPredicate);
                        // report synchronous completion only if neither the output nor the
                        // failure processor went asynchronous, so the flag agrees with the
                        // value returned to the caller
                        callback.done(data.sync && sync);
                    }
                });

                // the exception is restored on the exchange by the callback above, so it
                // must not be restored a second time here
                logger.log("Failed delivery for exchangeId: " + exchange.getExchangeId() + ". Handled by the failure processor: " + data.failureProcessor);
                return sync;
            }

            // should we redeliver
            if (data.redeliveryCounter > 0) {
                // okay we will give it another go so clear the exception so we can try again
                if (exchange.getException() != null) {
                    exchange.setException(null);
                }

                // wait until we should redeliver
                data.redeliveryDelay = data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
            }

            // process the exchange
            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
                public void done(boolean sync) {
                    // Only handle the async case...
                    if (sync) {
                        return;
                    }
                    data.sync = false;
                    // redeliver only if the exchange has failed
                    // and it has not been handled by the error processor
                    if (exchange.getException() != null && !isFailureHandled(exchange)) {
                        process(exchange, callback, data);
                    } else {
                        callback.done(sync);
                    }
                }
            });
            if (!sync) {
                // It is going to be processed async..
                return false;
            }
            if (exchange.getException() == null || isFailureHandled(exchange)) {
                // If everything went well.. then we exit here..
                callback.done(true);
                return true;
            }
            // error occurred so loop back around.....
        }

    }

    /**
     * Returns whether the failure handled property is set on the exchange,
     * that is whether {@link #setFailureHandled(Exchange)} stored a non-null
     * exception on it.
     */
    public static boolean isFailureHandled(Exchange exchange) {
        return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null;
    }

    /**
     * Moves the current exception of the exchange into the failure handled
     * property and clears the exception on the exchange.
     */
    public static void setFailureHandled(Exchange exchange) {
        exchange.setProperty(FAILURE_HANDLED_PROPERTY, exchange.getException());
        exchange.setException(null);
    }

    /**
     * Decides, after the failure processor has run, whether the exchange is still
     * regarded as failed.
     * <p/>
     * If there is no predicate or it does not match, the exception stored by
     * {@link #setFailureHandled(Exchange)} is put back on the exchange. Otherwise
     * the exchange is marked with {@link Exchange#EXCEPTION_HANDLED_PROPERTY}.
     *
     * @param exchange         the exchange processed by the failure processor
     * @param handledPredicate predicate deciding whether the failure is handled, may be <tt>null</tt>
     */
    protected static void restoreExceptionOnExchange(Exchange exchange, Predicate handledPredicate) {
        if (handledPredicate == null || !handledPredicate.matches(exchange)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("This exchange is not handled so its marked as failed: " + exchange);
            }
            // exception not handled, put exception back in the exchange
            exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, Throwable.class));
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug("This exchange is handled so its marked as not failed: " + exchange);
            }
            exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, Boolean.TRUE);
        }
    }

    /**
     * Processes the exchange by delegating to
     * {@link AsyncProcessorHelper#process(AsyncProcessor, Exchange)}.
     */
    public void process(Exchange exchange) throws Exception {
        AsyncProcessorHelper.process(this, exchange);
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * Returns the output processor
     */
    public Processor getOutput() {
        return output;
    }

    /**
     * Returns the dead letter that message exchanges will be sent to if the
     * redelivery attempts fail
     */
    public Processor getDeadLetter() {
        return deadLetter;
    }

    /**
     * Returns the default redelivery policy
     */
    public RedeliveryPolicy getRedeliveryPolicy() {
        return redeliveryPolicy;
    }

    /**
     * Sets the redelivery policy
     */
    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
        this.redeliveryPolicy = redeliveryPolicy;
    }

    /**
     * Returns the logger used to report failed deliveries
     */
    public Logger getLogger() {
        return logger;
    }

    /**
     * Sets the logger strategy; which {@link Log} to use and which
     * {@link LoggingLevel} to use
     */
    public void setLogger(Logger logger) {
        this.logger = logger;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Increments the redelivery counter and adds the redelivered flag if the
     * message has been redelivered
     *
     * @param exchange the exchange whose IN message headers are updated
     * @param e        the caught exception (not used by this implementation)
     * @return the new counter value; <tt>1</tt> if no counter header was present
     */
    protected int incrementRedeliveryCounter(Exchange exchange, Throwable e) {
        Message in = exchange.getIn();
        Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
        int next = 1;
        if (counter != null) {
            next = counter + 1;
        }
        in.setHeader(REDELIVERY_COUNTER, next);
        in.setHeader(REDELIVERED, Boolean.TRUE);
        return next;
    }

    /**
     * Prepares the redelivery counter and boolean flag for the failure handle processor
     * <p/>
     * The counter header is decremented by one, or set to <tt>0</tt> if absent, and
     * the redelivered flag is set to whether the resulting counter is positive.
     */
    private void decrementRedeliveryCounter(Exchange exchange) {
        Message in = exchange.getIn();
        Integer counter = in.getHeader(REDELIVERY_COUNTER, Integer.class);
        if (counter != null) {
            int prev = counter - 1;
            in.setHeader(REDELIVERY_COUNTER, prev);
            // set boolean flag according to counter
            in.setHeader(REDELIVERED, prev > 0 ? Boolean.TRUE : Boolean.FALSE);
        } else {
            // not redelivered
            in.setHeader(REDELIVERY_COUNTER, 0);
            in.setHeader(REDELIVERED, Boolean.FALSE);
        }
    }

    /**
     * Passes the output and dead letter processors, in that order, to
     * {@link ServiceHelper#startServices}.
     */
    @Override
    protected void doStart() throws Exception {
        ServiceHelper.startServices(output, deadLetter);
    }

    /**
     * Passes the dead letter and output processors, in that order, to
     * {@link ServiceHelper#stopServices}.
     */
    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopServices(deadLetter, output);
    }
}