/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.impl;

import java.util.HashMap;
import java.util.Map;

import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.util.ObjectHelper;
import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;

/**
 * A client helper object (named like Spring's TransactionTemplate &amp; JmsTemplate
 * et al) for working with Camel and sending {@link org.apache.camel.Message} instances in an
 * {@link org.apache.camel.Exchange} to an {@link org.apache.camel.Endpoint}.
 * <p/>
 * Endpoints resolved from a URI are kept in an internal cache (unless disabled through
 * {@link #setUseEndpointCache(boolean)}); every access to that cache is guarded by
 * synchronizing on the cache map itself.
 *
 * @version $Revision: 695516 $
 */
public class DefaultProducerTemplate<E extends Exchange> extends ServiceSupport implements ProducerTemplate<E> {
    private final CamelContext context;
    private final ProducerCache<E> producerCache = new ProducerCache<E>();
    private boolean useEndpointCache = true;
    // guarded by synchronized (endpointCache)
    private final Map<String, Endpoint<E>> endpointCache = new HashMap<String, Endpoint<E>>();
    private Endpoint<E> defaultEndpoint;

    /**
     * Creates a template without a default endpoint.
     *
     * @param context the context used to resolve endpoint URIs
     */
    public DefaultProducerTemplate(CamelContext context) {
        this.context = context;
    }

    /**
     * Creates a template with a default endpoint.
     *
     * @param context the context used to resolve endpoint URIs
     * @param defaultEndpoint the endpoint used by the methods that take no endpoint argument
     */
    public DefaultProducerTemplate(CamelContext context, Endpoint defaultEndpoint) {
        this(context);
        this.defaultEndpoint = defaultEndpoint;
    }

    /**
     * Sends the exchange to the endpoint resolved from the given URI.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param exchange the exchange to send
     * @return the exchange that was sent
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public E send(String endpointUri, E exchange) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return send(endpoint, exchange);
    }

    /**
     * Sends an exchange, populated by the given processor, to the endpoint resolved from the given URI.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param processor the processor handed to the producer cache to populate the exchange
     * @return the exchange returned by the producer cache
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public E send(String endpointUri, Processor processor) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return send(endpoint, processor);
    }

    /**
     * Sends an exchange, populated by the given processor, to the endpoint resolved from the given URI,
     * handing the callback on to the producer cache.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param processor the processor handed to the producer cache to populate the exchange
     * @param callback the callback handed to the producer cache
     * @return the exchange returned by the producer cache
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public E send(String endpointUri, Processor processor, AsyncCallback callback) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return send(endpoint, processor, callback);
    }

    /**
     * Sends an exchange with the given pattern, populated by the given processor, to the endpoint
     * resolved from the given URI.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param pattern the exchange pattern handed to the producer cache
     * @param processor the processor handed to the producer cache to populate the exchange
     * @return the exchange returned by the producer cache
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public E send(String endpointUri, ExchangePattern pattern, Processor processor) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return send(endpoint, pattern, processor);
    }

    /**
     * Sends the exchange to the given endpoint through the producer cache.
     *
     * @param endpoint the endpoint to send to
     * @param exchange the exchange to send
     * @return the same exchange instance that was passed in
     */
    public E send(Endpoint<E> endpoint, E exchange) {
        producerCache.send(endpoint, exchange);
        return exchange;
    }

    /**
     * Sends an exchange, populated by the given processor, to the given endpoint through the producer cache.
     *
     * @param endpoint the endpoint to send to
     * @param processor the processor handed to the producer cache to populate the exchange
     * @return the exchange returned by the producer cache
     */
    public E send(Endpoint<E> endpoint, Processor processor) {
        return producerCache.send(endpoint, processor);
    }

    /**
     * Sends an exchange, populated by the given processor, to the given endpoint through the producer cache,
     * handing the callback on to the producer cache.
     *
     * @param endpoint the endpoint to send to
     * @param processor the processor handed to the producer cache to populate the exchange
     * @param callback the callback handed to the producer cache
     * @return the exchange returned by the producer cache
     */
    public E send(Endpoint<E> endpoint, Processor processor, AsyncCallback callback) {
        return producerCache.send(endpoint, processor, callback);
    }

    /**
     * Sends an exchange with the given pattern, populated by the given processor, to the given endpoint
     * through the producer cache.
     *
     * @param endpoint the endpoint to send to
     * @param pattern the exchange pattern handed to the producer cache
     * @param processor the processor handed to the producer cache to populate the exchange
     * @return the exchange returned by the producer cache
     */
    public E send(Endpoint<E> endpoint, ExchangePattern pattern, Processor processor) {
        return producerCache.send(endpoint, pattern, processor);
    }

    /**
     * Sends the body to the given endpoint using the given pattern and extracts the result body
     * honoring that pattern (see {@link #extractResultBody(Exchange, ExchangePattern)}).
     *
     * @param endpoint the endpoint to send to
     * @param pattern the exchange pattern to use
     * @param body the body set on the IN message
     * @return the extracted result body, can be <tt>null</tt>
     */
    public Object sendBody(Endpoint<E> endpoint, ExchangePattern pattern, Object body) {
        E result = send(endpoint, pattern, createSetBodyProcessor(body));
        return extractResultBody(result, pattern);
    }

    /**
     * Sends the body to the given endpoint and extracts the result body
     * (see {@link #extractResultBody(Exchange)}).
     *
     * @param endpoint the endpoint to send to
     * @param body the body set on the IN message
     * @return the extracted result body, can be <tt>null</tt>
     */
    public Object sendBody(Endpoint<E> endpoint, Object body) {
        E result = send(endpoint, createSetBodyProcessor(body));
        return extractResultBody(result);
    }

    /**
     * Sends the body to the endpoint resolved from the given URI.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param body the body set on the IN message
     * @return the extracted result body, can be <tt>null</tt>
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public Object sendBody(String endpointUri, Object body) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return sendBody(endpoint, body);
    }

    /**
     * Sends the body to the endpoint resolved from the given URI using the given pattern.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param pattern the exchange pattern to use
     * @param body the body set on the IN message
     * @return the extracted result body, can be <tt>null</tt>
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public Object sendBody(String endpointUri, ExchangePattern pattern, Object body) {
        Endpoint endpoint = resolveMandatoryEndpoint(endpointUri);
        return sendBody(endpoint, pattern, body);
    }

    /**
     * Sends the body with a single header to the endpoint resolved from the given URI.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param body the body set on the IN message
     * @param header the name of the header set on the IN message
     * @param headerValue the value of the header
     * @return the extracted result body, can be <tt>null</tt>
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public Object sendBodyAndHeader(String endpointUri, final Object body, final String header,
                                    final Object headerValue) {
        return sendBodyAndHeader(resolveMandatoryEndpoint(endpointUri), body, header, headerValue);
    }

    /**
     * Sends the body with a single header to the given endpoint.
     *
     * @param endpoint the endpoint to send to
     * @param body the body set on the IN message
     * @param header the name of the header set on the IN message
     * @param headerValue the value of the header
     * @return the extracted result body, can be <tt>null</tt>
     */
    public Object sendBodyAndHeader(Endpoint endpoint, final Object body, final String header,
                                    final Object headerValue) {
        E result = send(endpoint, createBodyAndHeaderProcessor(body, header, headerValue));
        return extractResultBody(result);
    }

    /**
     * Sends the body with a single header to the given endpoint using the given pattern.
     *
     * @param endpoint the endpoint to send to
     * @param pattern the exchange pattern to use
     * @param body the body set on the IN message
     * @param header the name of the header set on the IN message
     * @param headerValue the value of the header
     * @return the extracted result body, can be <tt>null</tt>
     */
    public Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, final Object body, final String header,
                                    final Object headerValue) {
        E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
        return extractResultBody(result, pattern);
    }

    /**
     * Sends the body with a single header to the endpoint resolved from the given URI using the given pattern.
     *
     * @param endpoint the URI of the endpoint to send to
     * @param pattern the exchange pattern to use
     * @param body the body set on the IN message
     * @param header the name of the header set on the IN message
     * @param headerValue the value of the header
     * @return the extracted result body, can be <tt>null</tt>
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, final Object body, final String header,
                                    final Object headerValue) {
        E result = send(endpoint, pattern, createBodyAndHeaderProcessor(body, header, headerValue));
        return extractResultBody(result, pattern);
    }

    /**
     * Sends the body with the given headers to the endpoint resolved from the given URI.
     *
     * @param endpointUri the URI of the endpoint to send to
     * @param body the body set on the IN message
     * @param headers the headers copied onto the IN message
     * @return the extracted result body, can be <tt>null</tt>
     * @throws NoSuchEndpointException if the URI cannot be resolved to an endpoint
     */
    public Object sendBodyAndHeaders(String endpointUri, final Object body, final Map<String, Object> headers) {
        return sendBodyAndHeaders(resolveMandatoryEndpoint(endpointUri), body, headers);
    }

    /**
     * Sends the body with the given headers to the given endpoint.
     *
     * @param endpoint the endpoint to send to
     * @param body the body set on the IN message
     * @param headers the headers copied onto the IN message
     * @return the extracted result body, can be <tt>null</tt>
     */
    public Object sendBodyAndHeaders(Endpoint endpoint, final Object body, final Map<String, Object> headers) {
        E result = send(endpoint, new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                for (Map.Entry<String, Object> header : headers.entrySet()) {
                    in.setHeader(header.getKey(), header.getValue());
                }
                in.setBody(body);
            }
        });
        return extractResultBody(result);
    }

    // Methods using an InOut ExchangePattern
    // -----------------------------------------------------------------------

    /**
     * Same as {@link #send(Endpoint, ExchangePattern, Processor)} with the {@link ExchangePattern#InOut} pattern.
     */
    public E request(Endpoint<E> endpoint, Processor processor) {
        return send(endpoint, ExchangePattern.InOut, processor);
    }

    /**
     * Same as {@link #sendBody(Endpoint, ExchangePattern, Object)} with the {@link ExchangePattern#InOut} pattern.
     */
    public Object requestBody(Endpoint<E> endpoint, Object body) {
        return sendBody(endpoint, ExchangePattern.InOut, body);
    }

    /**
     * Same as {@link #sendBodyAndHeader(Endpoint, ExchangePattern, Object, String, Object)} with the
     * {@link ExchangePattern#InOut} pattern.
     */
    public Object requestBodyAndHeader(Endpoint<E> endpoint, Object body, String header, Object headerValue) {
        return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
    }

    /**
     * Same as {@link #send(String, ExchangePattern, Processor)} with the {@link ExchangePattern#InOut} pattern.
     */
    public E request(String endpoint, Processor processor) {
        return send(endpoint, ExchangePattern.InOut, processor);
    }

    /**
     * Same as {@link #sendBody(String, ExchangePattern, Object)} with the {@link ExchangePattern#InOut} pattern.
     */
    public Object requestBody(String endpoint, Object body) {
        return sendBody(endpoint, ExchangePattern.InOut, body);
    }

    /**
     * Same as {@link #sendBodyAndHeader(String, ExchangePattern, Object, String, Object)} with the
     * {@link ExchangePattern#InOut} pattern.
     */
    public Object requestBodyAndHeader(String endpoint, Object body, String header, Object headerValue) {
        return sendBodyAndHeader(endpoint, ExchangePattern.InOut, body, header, headerValue);
    }

    // Methods using the default endpoint
    // -----------------------------------------------------------------------

    /**
     * Sends the body to the default endpoint, which must have been set.
     */
    public Object sendBody(Object body) {
        return sendBody(getMandatoryDefaultEndpoint(), body);
    }

    /**
     * Sends the exchange to the default endpoint, which must have been set.
     */
    public E send(E exchange) {
        return send(getMandatoryDefaultEndpoint(), exchange);
    }

    /**
     * Sends an exchange, populated by the given processor, to the default endpoint, which must have been set.
     */
    public E send(Processor processor) {
        return send(getMandatoryDefaultEndpoint(), processor);
    }

    /**
     * Sends the body with a single header to the default endpoint, which must have been set.
     */
    public Object sendBodyAndHeader(Object body, String header, Object headerValue) {
        return sendBodyAndHeader(getMandatoryDefaultEndpoint(), body, header, headerValue);
    }

    /**
     * Sends the body with the given headers to the default endpoint, which must have been set.
     */
    public Object sendBodyAndHeaders(Object body, Map<String, Object> headers) {
        return sendBodyAndHeaders(getMandatoryDefaultEndpoint(), body, headers);
    }

    // Properties
    // -----------------------------------------------------------------------

    /**
     * Returns the producer the producer cache holds for the given endpoint.
     */
    public Producer<E> getProducer(Endpoint<E> endpoint) {
        return producerCache.getProducer(endpoint);
    }

    public CamelContext getContext() {
        return context;
    }

    public Endpoint<E> getDefaultEndpoint() {
        return defaultEndpoint;
    }

    public void setDefaultEndpoint(Endpoint<E> defaultEndpoint) {
        this.defaultEndpoint = defaultEndpoint;
    }

    /**
     * Sets the default endpoint to use if none is specified
     */
    public void setDefaultEndpointUri(String endpointUri) {
        setDefaultEndpoint(getContext().getEndpoint(endpointUri));
    }

    public boolean isUseEndpointCache() {
        return useEndpointCache;
    }

    public void setUseEndpointCache(boolean useEndpointCache) {
        this.useEndpointCache = useEndpointCache;
    }

    /**
     * Looks up an endpoint that this template has already cached under the given URI.
     * <p/>
     * Only the internal endpoint cache is consulted; the context is not asked to resolve the URI.
     *
     * @param endpointUri the URI the endpoint was cached under
     * @param expectedClass the type the cached endpoint must be an instance of
     * @return the cached endpoint, or <tt>null</tt> if nothing is cached for the URI
     *         or the cached endpoint is not an instance of <tt>expectedClass</tt>
     */
    public <T extends Endpoint<?>> T getResolvedEndpoint(String endpointUri, Class<T> expectedClass) {
        Endpoint<?> e;
        synchronized (endpointCache) {
            e = endpointCache.get(endpointUri);
        }
        // isInstance is false for null, so this also covers the "not cached" case
        if (expectedClass.isInstance(e)) {
            return expectedClass.cast(e);
        }
        return null;
    }

    // Implementation methods
    // -----------------------------------------------------------------------

    /**
     * Creates a processor that sets the given header and then the given body on the IN message.
     */
    protected Processor createBodyAndHeaderProcessor(final Object body, final String header, final Object headerValue) {
        return new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setHeader(header, headerValue);
                in.setBody(body);
            }
        };
    }

    /**
     * Creates a processor that sets the given body on the IN message.
     */
    protected Processor createSetBodyProcessor(final Object body) {
        return new Processor() {
            public void process(Exchange exchange) {
                Message in = exchange.getIn();
                in.setBody(body);
            }
        };
    }

    /**
     * Resolves the endpoint for the given URI.
     * <p/>
     * When the endpoint cache is enabled the cache is consulted first and, on a miss, populated with
     * the endpoint returned by the context; otherwise the context is asked directly.
     *
     * @param endpointUri the URI of the endpoint
     * @return the endpoint, never <tt>null</tt>
     * @throws NoSuchEndpointException if no endpoint could be found for the URI
     */
    protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
        Endpoint endpoint = null;

        if (isUseEndpointCache()) {
            synchronized (endpointCache) {
                endpoint = endpointCache.get(endpointUri);
                if (endpoint == null) {
                    endpoint = context.getEndpoint(endpointUri);
                    if (endpoint != null) {
                        endpointCache.put(endpointUri, endpoint);
                    }
                }
            }
        } else {
            endpoint = context.getEndpoint(endpointUri);
        }
        if (endpoint == null) {
            throw new NoSuchEndpointException(endpointUri);
        }
        return endpoint;
    }

    /**
     * Returns the default endpoint after validating it with {@link ObjectHelper#notNull(Object, String)}
     * (which is expected to reject a missing default endpoint).
     */
    protected Endpoint<E> getMandatoryDefaultEndpoint() {
        Endpoint<E> answer = getDefaultEndpoint();
        ObjectHelper.notNull(answer, "defaultEndpoint");
        return answer;
    }

    /**
     * Starts the producer cache.
     */
    protected void doStart() throws Exception {
        producerCache.start();
    }

    /**
     * Stops the producer cache and empties the endpoint cache.
     */
    protected void doStop() throws Exception {
        producerCache.stop();
        // clear under the same monitor that guards every other access to the (non thread-safe) HashMap
        synchronized (endpointCache) {
            endpointCache.clear();
        }
    }

    /**
     * Extracts the body from the given result.
     *
     * @param result the result
     * @return the result, can be <tt>null</tt>.
     */
    protected Object extractResultBody(E result) {
        return extractResultBody(result, null);
    }

    /**
     * Extracts the body from the given result.
     * <p/>
     * If the exchange pattern is provided it will try to honor it and retrieve the body
     * from either IN or OUT according to the pattern.
     * <p/>
     * An exception set on the result is rethrown (wrapped by <tt>wrapRuntimeCamelException</tt>),
     * and a fault message with a non-null body takes precedence over the IN and OUT messages.
     *
     * @param result the result
     * @param pattern exchange pattern if given, can be <tt>null</tt>
     * @return the result, can be <tt>null</tt>.
     */
    protected Object extractResultBody(E result, ExchangePattern pattern) {
        Object answer = null;
        if (result != null) {
            // rethrow if there was an exception
            if (result.getException() != null) {
                throw wrapRuntimeCamelException(result.getException());
            }

            // result could have a fault message
            if (hasFaultMessage(result)) {
                return result.getFault().getBody();
            }

            // okay no fault then return the response according to the pattern
            // try to honor pattern if provided
            boolean notOut = pattern != null && !pattern.isOutCapable();
            boolean hasOut = result.getOut(false) != null;
            if (hasOut && !notOut) {
                answer = result.getOut().getBody();
            } else {
                answer = result.getIn().getBody();
            }
        }
        return answer;
    }

    /**
     * Tests whether the result carries a fault message whose body is not <tt>null</tt>.
     *
     * @param result the result, must not be <tt>null</tt>
     * @return <tt>true</tt> if a fault message with a non-null body is present
     */
    protected boolean hasFaultMessage(E result) {
        Message faultMessage = result.getFault(false);
        if (faultMessage != null) {
            Object faultBody = faultMessage.getBody();
            if (faultBody != null) {
                return true;
            }
        }
        return false;
    }

}