001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.io.File;
020    import java.io.IOException;
021    import java.io.RandomAccessFile;
022    import java.nio.channels.FileChannel;
023    import java.nio.channels.FileLock;
024    import java.util.concurrent.ConcurrentHashMap;
025    
026    import org.apache.camel.AsyncCallback;
027    import org.apache.camel.Processor;
028    import org.apache.camel.impl.ScheduledPollConsumer;
029    import org.apache.camel.processor.DeadLetterChannel;
030    import org.apache.camel.util.ObjectHelper;
031    import org.apache.commons.logging.Log;
032    import org.apache.commons.logging.LogFactory;
033    
034    /**
035     * For consuming files.
036     *
037     * @version $Revision: 697185 $
038     */
039    public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
040        private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
041    
042        private FileEndpoint endpoint;
043        private ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
044        private ConcurrentHashMap<File, Long> fileSizes = new ConcurrentHashMap<File, Long>();
045        private ConcurrentHashMap<File, Long> noopMap = new ConcurrentHashMap<File, Long>();
046    
047        // the options below is @deprecated and will be removed in Camel 2.0
048        private long lastPollTime;
049        private int unchangedDelay;
050        private boolean unchangedSize;
051        private boolean generateEmptyExchangeWhenIdle;
052        private boolean alwaysConsume;
053    
054        private boolean recursive;
055        private String regexPattern = "";
056        private boolean exclusiveReadLock = true;
057    
058        public FileConsumer(final FileEndpoint endpoint, Processor processor) {
059            super(endpoint, processor);
060            this.endpoint = endpoint;
061        }
062    
063        protected synchronized void poll() throws Exception {
064            // should be true the first time as its the top directory
065            int rc = pollFileOrDirectory(endpoint.getFile(), true);
066    
067            // if no files consumes and using generateEmptyExchangeWhenIdle option then process an empty exchange 
068            if (rc == 0 && generateEmptyExchangeWhenIdle) {
069                final FileExchange exchange = endpoint.createExchange((File)null);
070                getAsyncProcessor().process(exchange, new AsyncCallback() {
071                    public void done(boolean sync) {
072                    }
073                });
074            }
075    
076            lastPollTime = System.currentTimeMillis();
077        }
078    
079        /**
080         * Pools the given file or directory for files to process.
081         *
082         * @param fileOrDirectory  file or directory
083         * @param processDir  recursive
084         * @return the number of files processed or being processed async.
085         */
086        protected int pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
087            if (!fileOrDirectory.isDirectory()) {
088                // process the file
089                return pollFile(fileOrDirectory);
090            } else if (processDir) {
091                // directory that can be recursive
092                int rc = 0;
093                if (isValidFile(fileOrDirectory)) {
094                    if (LOG.isTraceEnabled()) {
095                        LOG.trace("Polling directory " + fileOrDirectory);
096                    }
097                    File[] files = fileOrDirectory.listFiles();
098                    for (File file : files) {
099                        rc += pollFileOrDirectory(file, isRecursive()); // self-recursion
100                    }
101                }
102                return rc;
103            } else {
104                if (LOG.isTraceEnabled()) {
105                    LOG.trace("Skipping directory " + fileOrDirectory);
106                }
107                return 0;
108            }
109        }
110    
111        /**
112         * Polls the given file
113         *
114         * @param file  the file
115         * @return returns 1 if the file was processed, 0 otherwise.
116         */
117        protected int pollFile(final File file) {
118            if (LOG.isTraceEnabled()) {
119                LOG.trace("Polling file: " + file);
120            }
121    
122            if (!file.exists()) {
123                return 0;
124            }
125            if (!isValidFile(file)) {
126                return 0;
127            }
128            // we only care about file modified times if we are not deleting/moving files
129            if (!endpoint.isNoop()) {
130                if (filesBeingProcessed.contains(file)) {
131                    return 1;
132                }
133                filesBeingProcessed.put(file, file);
134            }
135    
136            final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
137            final FileExchange exchange = endpoint.createExchange(file);
138    
139            endpoint.configureMessage(file, exchange.getIn());
140            try {
141                // is we use excluse read then acquire the exclusive read (waiting until we got it)
142                if (exclusiveReadLock) {
143                    acquireExclusiveReadLock(file);
144                }
145    
146                if (LOG.isDebugEnabled()) {
147                    LOG.debug("About to process file: " + file + " using exchange: " + exchange);
148                }
149                if (processStrategy.begin(endpoint, exchange, file)) {
150    
151                    // Use the async processor interface so that processing of
152                    // the exchange can happen asynchronously
153                    getAsyncProcessor().process(exchange, new AsyncCallback() {
154                        public void done(boolean sync) {
155                            boolean failed = exchange.isFailed();
156                            boolean handled = DeadLetterChannel.isFailureHandled(exchange);
157    
158                            if (LOG.isDebugEnabled()) {
159                                LOG.debug("Done processing file: " + file + ". Status is: " + (failed ? "failed: " + failed + ", handled by failure processor: " + handled : "processed OK"));
160                            }
161    
162                            boolean committed = false;
163                            try {
164                                if (!failed || handled) {
165                                    // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
166                                    processStrategyCommit(processStrategy, exchange, file, handled);
167                                    committed = true;
168                                } else {
169                                    // there was an exception but it was not handled by the DeadLetterChannel
170                                    handleException(exchange.getException());
171                                }
172                            } finally {
173                                if (!committed) {
174                                    processStrategyRollback(processStrategy, exchange, file);
175                                }
176                                filesBeingProcessed.remove(file);
177                            }
178                        }
179                    });
180    
181                } else {
182                    if (LOG.isDebugEnabled()) {
183                        LOG.debug(endpoint + " can not process file: " + file);
184                    }
185                }
186            } catch (Throwable e) {
187                handleException(e);
188            }
189    
190            return 1;
191        }
192    
193        /**
194         * Acquires exclusive read lock to the given file. Will wait until the lock is granted.
195         * After granting the read lock it is realeased, we just want to make sure that when we start
196         * consuming the file its not currently in progress of being written by third party.
197         */
198        protected void acquireExclusiveReadLock(File file) throws IOException {
199            if (LOG.isTraceEnabled()) {
200                LOG.trace("Waiting for exclusive read lock to file: " + file);
201            }
202    
203            // try to acquire rw lock on the file before we can consume it
204            FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
205            try {
206                FileLock lock = channel.lock();
207                if (LOG.isTraceEnabled()) {
208                    LOG.trace("Acquired exclusive read lock: " + lock + " to file: " + file);
209                }
210                // just release it now we dont want to hold it during the rest of the processing
211                lock.release();
212            } finally {
213                // must close channel
214                ObjectHelper.close(channel, "FileConsumer during acquiring of exclusive read lock", LOG);
215            }
216        }
217    
218        /**
219         * Strategy when the file was processed and a commit should be executed.
220         *
221         * @param processStrategy   the strategy to perform the commit
222         * @param exchange          the exchange
223         * @param file              the file processed
224         * @param failureHandled    is <tt>false</tt> if the exchange was processed succesfully, <tt>true</tt> if
225         * an exception occured during processing but it was handled by the failure processor (usually the
226         * DeadLetterChannel).
227         */
228        protected void processStrategyCommit(FileProcessStrategy processStrategy, FileExchange exchange,
229                                             File file, boolean failureHandled) {
230            try {
231                if (LOG.isDebugEnabled()) {
232                    LOG.debug("Committing file strategy: " + processStrategy + " for file: " + file + (failureHandled ? " that was handled by the failure processor." : ""));
233                }
234                processStrategy.commit(endpoint, exchange, file);
235            } catch (Exception e) {
236                LOG.warn("Error committing file strategy: " + processStrategy, e);
237                handleException(e);
238            }
239        }
240    
241        /**
242         * Strategy when the file was not processed and a rollback should be executed.
243         *
244         * @param processStrategy   the strategy to perform the commit
245         * @param exchange          the exchange
246         * @param file              the file processed
247         */
248        protected void processStrategyRollback(FileProcessStrategy processStrategy, FileExchange exchange, File file) {
249            if (LOG.isDebugEnabled()) {
250                LOG.debug("Rolling back file strategy: " + processStrategy + " for file: " + file);
251            }
252            processStrategy.rollback(endpoint, exchange, file);
253        }
254    
255        protected boolean isValidFile(File file) {
256            boolean result = false;
257            if (file != null && file.exists()) {
258                // TODO: maybe use a configurable strategy instead of the hardcoded one based on last file change
259                if (isMatched(file) && (alwaysConsume || isChanged(file))) {
260                    result = true;
261                }
262            }
263            return result;
264        }
265    
266        protected boolean isChanged(File file) {
267            if (file == null) {
268                // Sanity check
269                return false;
270            } else if (file.isDirectory()) {
271                // Allow recursive polling to descend into this directory
272                return true;
273            } else {
274                // @deprecated will be removed on Camel 2.0
275                // the code below is kinda hard to maintain. We should strive to remove
276                // this stuff in Camel 2.0 to keep this component simple and no surprises for end-users
277                // this stuff is not persistent so restarting Camel will reset the state
278                boolean lastModifiedCheck = false;
279                long modifiedDuration = 0;
280                if (getUnchangedDelay() > 0) {
281                    modifiedDuration = System.currentTimeMillis() - file.lastModified();
282                    lastModifiedCheck = modifiedDuration >= getUnchangedDelay();
283                }
284    
285                long fileModified = file.lastModified();
286                Long previousModified = noopMap.get(file);
287                noopMap.put(file, fileModified);
288                if (previousModified == null || fileModified > previousModified) {
289                    lastModifiedCheck = true;
290                }
291    
292                boolean sizeCheck = false;
293                long sizeDifference = 0;
294                if (isUnchangedSize()) {
295                    Long value = fileSizes.get(file);
296                    if (value == null) {
297                        sizeCheck = true;
298                    } else {
299                        sizeCheck = file.length() != value;
300                    }
301                }
302    
303                boolean answer = lastModifiedCheck || sizeCheck;
304    
305                if (LOG.isDebugEnabled()) {
306                    LOG.debug("file:" + file + " isChanged:" + answer + " " + "sizeCheck:" + sizeCheck + "("
307                              + sizeDifference + ") " + "lastModifiedCheck:" + lastModifiedCheck + "("
308                              + modifiedDuration + ")");
309                }
310    
311                if (isUnchangedSize()) {
312                    if (answer) {
313                        fileSizes.put(file, file.length());
314                    } else {
315                        fileSizes.remove(file);
316                    }
317                }
318    
319                return answer;
320            }
321        }
322    
323        protected boolean isMatched(File file) {
324            String name = file.getName();
325    
326            // folders/names starting with dot is always skipped (eg. ".", ".camel", ".camelLock")
327            if (name.startsWith(".")) {
328                return false;
329            }
330            // lock files should be skipped
331            if (name.endsWith(FileEndpoint.DEFAULT_LOCK_FILE_POSTFIX)) {
332                return false;
333            }
334    
335            // directories so far is always regarded as matched (matching on the name is only for files)
336            if (file.isDirectory()) {
337                return true;
338            }
339    
340            if (regexPattern != null && regexPattern.length() > 0) {
341                if (!name.matches(regexPattern)) {
342                    return false;
343                }
344            }
345    
346            if (endpoint.getExcludedNamePrefix() != null) {
347                if (name.startsWith(endpoint.getExcludedNamePrefix())) {
348                    return false;
349                }
350            }
351            String[] prefixes = endpoint.getExcludedNamePrefixes();
352            if (prefixes != null) {
353                for (String prefix : prefixes) {
354                    if (name.startsWith(prefix)) {
355                        return false;
356                    }
357                }
358            }
359            if (endpoint.getExcludedNamePostfix() != null) {
360                if (name.endsWith(endpoint.getExcludedNamePostfix())) {
361                    return false;
362                }
363            }
364            String[] postfixes = endpoint.getExcludedNamePostfixes();
365            if (postfixes != null) {
366                for (String postfix : postfixes) {
367                    if (name.endsWith(postfix)) {
368                        return false;
369                    }
370                }
371            }
372    
373            return true;
374        }
375    
376        public boolean isRecursive() {
377            return this.recursive;
378        }
379    
380        public void setRecursive(boolean recursive) {
381            this.recursive = recursive;
382        }
383    
384        public String getRegexPattern() {
385            return this.regexPattern;
386        }
387    
388        public void setRegexPattern(String regexPattern) {
389            this.regexPattern = regexPattern;
390        }
391    
392        public boolean isGenerateEmptyExchangeWhenIdle() {
393            return generateEmptyExchangeWhenIdle;
394        }
395    
396        /**
397         * @deprecated will be removed in Camel 2.0
398         */
399        public void setGenerateEmptyExchangeWhenIdle(boolean generateEmptyExchangeWhenIdle) {
400            this.generateEmptyExchangeWhenIdle = generateEmptyExchangeWhenIdle;
401        }
402    
403        public int getUnchangedDelay() {
404            return unchangedDelay;
405        }
406    
407        /**
408         * @deprecated will be removed in Camel 2.0
409         */
410        public void setUnchangedDelay(int unchangedDelay) {
411            this.unchangedDelay = unchangedDelay;
412        }
413    
414        public boolean isUnchangedSize() {
415            return unchangedSize;
416        }
417    
418        /**
419         * @deprecated will be removed in Camel 2.0
420         */
421        public void setUnchangedSize(boolean unchangedSize) {
422            this.unchangedSize = unchangedSize;
423        }
424    
425        public boolean isExclusiveReadLock() {
426            return exclusiveReadLock;
427        }
428    
429        public void setExclusiveReadLock(boolean exclusiveReadLock) {
430            this.exclusiveReadLock = exclusiveReadLock;
431        }
432    
433        public boolean isAlwaysConsume() {
434            return alwaysConsume;
435        }
436    
437        /**
438         * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
439         */
440        public void setAlwaysConsume(boolean alwaysConsume) {
441            this.alwaysConsume = alwaysConsume;
442        }
443    
444        public boolean isTimestamp() {
445            return !alwaysConsume;
446        }
447    
448        /**
449         * @deprecated will be removed in Camel 2.0 (not needed when we get rid of last polltimestamp)
450         */
451        public void setTimestamp(boolean timestamp) {
452            this.alwaysConsume = !timestamp;
453        }
454    }