001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.processor.resequencer;
018    
019    import java.util.Timer;
020    
021    /**
022     * Resequences elements based on a given {@link SequenceElementComparator}.
023     * This resequencer is designed for resequencing element streams. Stream-based
024     * resequencing has the advantage that the number of elements to be resequenced
025     * need not be known in advance. Resequenced elements are delivered via a
026     * {@link SequenceSender}.
027     * <p>
028     * The resequencer's behaviour for a given comparator is controlled by the
029     * <code>timeout</code> property. This is the timeout (in milliseconds) for a
030     * given element managed by this resequencer. An out-of-sequence element can
031     * only be marked as <i>ready-for-delivery</i> if it either times out or if it
032     * has an immediate predecessor (in that case it is in-sequence). If an
033     * immediate predecessor of a waiting element arrives the timeout task for the
034     * waiting element will be cancelled (which marks it as <i>ready-for-delivery</i>).
035     * <p>
036     * If the maximum out-of-sequence time difference between elements within a
037     * stream is known, the <code>timeout</code> value should be set to this
038     * value. In this case it is guaranteed that all elements of a stream will be
039     * delivered in sequence via the {@link SequenceSender}. The lower the
040     * <code>timeout</code> value is compared to the out-of-sequence time
041     * difference between elements within a stream the higher the probability is for
042     * out-of-sequence elements delivered by this resequencer. Delivery of elements
043     * must be explicitly triggered by applications using the {@link #deliver()} or
044     * {@link #deliverNext()} methods. Only elements that are <i>ready-for-delivery</i>
045     * are delivered by these methods. The longer an application waits to trigger a
046     * delivery the more elements may become <i>ready-for-delivery</i>.
047     * <p>
048     * The resequencer remembers the last-delivered element. If an element arrives
049     * which is the immediate successor of the last-delivered element it is
050     * <i>ready-for-delivery</i> immediately. After delivery the last-delivered
051     * element is adjusted accordingly. If the last-delivered element is
052     * <code>null</code> i.e. the resequencer was newly created the first arriving
053     * element needs <code>timeout</code> milliseconds in any case for becoming
054     * <i>ready-for-delivery</i>.
055     * <p>
056     * <strong>Note:</strong> Instances of this class are not thread-safe.
057     * Resequencing should be done by calling {@link #insert(Object)} and
058     * {@link #deliver()} or {@link #deliverNext()} from a single thread.
059     *
060     * @author Martin Krasser
061     *
062     * @version $Revision: 701059 $
063     */
064    public class ResequencerEngine<E> {
065    
066        /**
067         * The element that most recently hash been delivered or <code>null</code>
068         * if no element has been delivered yet.
069         */
070        private Element<E> lastDelivered;
071    
072        /**
073         * Minimum amount of time to wait for out-of-sequence elements.
074         */
075        private long timeout;
076    
077        /**
078         * A sequence of elements for sorting purposes.
079         */
080        private Sequence<Element<E>> sequence;
081    
082        /**
083         * A timer for scheduling timeout notifications.
084         */
085        private Timer timer;
086    
087        /**
088         * A strategy for sending sequence elements.
089         */
090        private SequenceSender<E> sequenceSender;
091    
092        /**
093         * Creates a new resequencer instance with a default timeout of 2000
094         * milliseconds.
095         *
096         * @param comparator a sequence element comparator.
097         */
098        public ResequencerEngine(SequenceElementComparator<E> comparator) {
099            this.sequence = createSequence(comparator);
100            this.timeout = 2000L;
101            this.lastDelivered = null;
102        }
103    
104        public void start() {
105            timer = new Timer("Stream Resequencer Timer");
106        }
107    
108        /**
109         * Stops this resequencer (i.e. this resequencer's {@link Timer} instance).
110         */
111        public void stop() {
112            timer.cancel();
113        }
114    
115        /**
116         * Returns the number of elements currently maintained by this resequencer.
117         *
118         * @return the number of elements currently maintained by this resequencer.
119         */
120        public int size() {
121            return sequence.size();
122        }
123    
124        /**
125         * Returns this resequencer's timeout value.
126         *
127         * @return the timeout in milliseconds.
128         */
129        public long getTimeout() {
130            return timeout;
131        }
132    
133        /**
134         * Sets this sequencer's timeout value.
135         *
136         * @param timeout the timeout in milliseconds.
137         */
138        public void setTimeout(long timeout) {
139            this.timeout = timeout;
140        }
141    
142        /**
143         * Returns the sequence sender.
144         *
145         * @return the sequence sender.
146         */
147        public SequenceSender<E> getSequenceSender() {
148            return sequenceSender;
149        }
150    
151        /**
152         * Sets the sequence sender.
153         *
154         * @param sequenceSender a sequence element sender.
155         */
156        public void setSequenceSender(SequenceSender<E> sequenceSender) {
157            this.sequenceSender = sequenceSender;
158        }
159    
160        /**
161         * Returns the last delivered element.
162         *
163         * @return the last delivered element or <code>null</code> if no delivery
164         *         has been made yet.
165         */
166        E getLastDelivered() {
167            if (lastDelivered == null) {
168                return null;
169            }
170            return lastDelivered.getObject();
171        }
172    
173        /**
174         * Sets the last delivered element. This is for testing purposes only.
175         *
176         * @param o an element.
177         */
178        void setLastDelivered(E o) {
179            lastDelivered = new Element<E>(o);
180        }
181    
182        /**
183         * Inserts the given element into this resequencer. If the element is not
184         * ready for immediate delivery and has no immediate presecessor then it is
185         * scheduled for timing out. After being timed out it is ready for delivery.
186         *
187         * @param o an element.
188         */
189        public void insert(E o) {
190            // wrap object into internal element
191            Element<E> element = new Element<E>(o);
192            // add element to sequence in proper order
193            sequence.add(element);
194    
195            Element<E> successor = sequence.successor(element);
196    
197            // check if there is an immediate successor and cancel
198            // timer task (no need to wait any more for timeout)
199            if (successor != null) {
200                successor.cancel();
201            }
202    
203            // start delivery if current element is successor of last delivered element
204            if (successorOfLastDelivered(element)) {
205                // nothing to schedule
206            } else if (sequence.predecessor(element) != null) {
207                // nothing to schedule
208            } else {
209                element.schedule(defineTimeout());
210            }
211        }
212    
213        /**
214         * Delivers all elements which are currently ready to deliver.
215         *
216         * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
217         *
218         * @see ResequencerEngine#deliverNext() 
219         */
220        public void deliver() throws Exception {
221            while (deliverNext()) {
222                // do nothing here
223            }
224        }
225    
226        /**
227         * Attempts to deliver a single element from the head of the resequencer
228         * queue (sequence). Only elements which have not been scheduled for timing
229         * out or which already timed out can be delivered. Elements are deliveref via
230         * {@link SequenceSender#sendElement(Object)}.
231         *
232         * @return <code>true</code> if the element has been delivered
233         *         <code>false</code> otherwise.
234         *
235         * @throws Exception thrown by {@link SequenceSender#sendElement(Object)}.
236         *
237         */
238        public boolean deliverNext() throws Exception {
239            if (sequence.size() == 0) {
240                return false;
241            }
242            // inspect element with lowest sequence value
243            Element<E> element = sequence.first();
244    
245            // if element is scheduled do not deliver and return
246            if (element.scheduled()) {
247                return false;
248            }
249    
250            // remove deliverable element from sequence
251            sequence.remove(element);
252    
253            // set the delivered element to last delivered element
254            lastDelivered = element;
255    
256            // deliver the sequence element
257            sequenceSender.sendElement(element.getObject());
258    
259            // element has been delivered
260            return true;
261        }
262    
263        /**
264         * Returns <code>true</code> if the given element is the immediate
265         * successor of the last delivered element.
266         *
267         * @param element an element.
268         * @return <code>true</code> if the given element is the immediate
269         *         successor of the last delivered element.
270         */
271        private boolean successorOfLastDelivered(Element<E> element) {
272            if (lastDelivered == null) {
273                return false;
274            }
275            if (sequence.comparator().successor(element, lastDelivered)) {
276                return true;
277            }
278            return false;
279        }
280    
281        /**
282         * Creates a timeout task based on the timeout setting of this resequencer.
283         *
284         * @return a new timeout task.
285         */
286        private Timeout defineTimeout() {
287            return new Timeout(timer, timeout);
288        }
289    
290        private static <E> Sequence<Element<E>> createSequence(SequenceElementComparator<E> comparator) {
291            return new Sequence<Element<E>>(new ElementComparator<E>(comparator));
292        }
293    
294    }