001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one or more
003     * contributor license agreements.  See the NOTICE file distributed with
004     * this work for additional information regarding copyright ownership.
005     * The ASF licenses this file to You under the Apache License, Version 2.0
006     * (the "License"); you may not use this file except in compliance with
007     * the License.  You may obtain a copy of the License at
008     *
009     *      http://www.apache.org/licenses/LICENSE-2.0
010     *
011     * Unless required by applicable law or agreed to in writing, software
012     * distributed under the License is distributed on an "AS IS" BASIS,
013     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014     * See the License for the specific language governing permissions and
015     * limitations under the License.
016     */
017    package org.apache.camel.component.file;
018    
019    import java.io.File;
020    import java.util.concurrent.ConcurrentHashMap;
021    
022    import org.apache.camel.AsyncCallback;
023    import org.apache.camel.Processor;
024    import org.apache.camel.impl.ScheduledPollConsumer;
025    import org.apache.camel.processor.DeadLetterChannel;
026    import org.apache.commons.logging.Log;
027    import org.apache.commons.logging.LogFactory;
028    
029    /**
030     * For consuming files.
031     *
032     * @version $Revision: 662314 $
033     */
034    public class FileConsumer extends ScheduledPollConsumer<FileExchange> {
035        private static final transient Log LOG = LogFactory.getLog(FileConsumer.class);
036    
037        private FileEndpoint endpoint;
038        private ConcurrentHashMap<File, File> filesBeingProcessed = new ConcurrentHashMap<File, File>();
039        private ConcurrentHashMap<File, Long> fileSizes = new ConcurrentHashMap<File, Long>();
040        private ConcurrentHashMap<File, Long> noopMap = new ConcurrentHashMap<File, Long>();
041    
042        private boolean generateEmptyExchangeWhenIdle;
043        private boolean recursive = true;
044        private String regexPattern = "";
045    
046        private long lastPollTime;
047        private int unchangedDelay;
048        private boolean unchangedSize;
049    
050    
051        public FileConsumer(final FileEndpoint endpoint, Processor processor) {
052            super(endpoint, processor);
053            this.endpoint = endpoint;
054        }
055    
056        protected synchronized void poll() throws Exception {
057            int rc = pollFileOrDirectory(endpoint.getFile(), isRecursive());
058            if (rc == 0 && generateEmptyExchangeWhenIdle) {
059                final FileExchange exchange = endpoint.createExchange((File)null);
060                getAsyncProcessor().process(exchange, new AsyncCallback() {
061                    public void done(boolean sync) {
062                    }
063                });
064            }
065            lastPollTime = System.currentTimeMillis();
066        }
067    
068        /**
069         * Pools the given file or directory for files to process.
070         *
071         * @param fileOrDirectory  file or directory
072         * @param processDir  recursive
073         * @return the number of files processed or being processed async.
074         */
075        protected int pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
076            if (!fileOrDirectory.isDirectory()) {
077                return pollFile(fileOrDirectory); // process the file
078            } else if (processDir) {
079                int rc = 0;
080                if (isValidFile(fileOrDirectory)) {
081                    LOG.debug("Polling directory " + fileOrDirectory);
082                    File[] files = fileOrDirectory.listFiles();
083                    for (File file : files) {
084                        rc += pollFileOrDirectory(file, isRecursive()); // self-recursion
085                    }
086                }
087                return rc;
088            } else {
089                LOG.debug("Skipping directory " + fileOrDirectory);
090                return 0;
091            }
092        }
093    
094        /**
095         * Polls the given file
096         *
097         * @param file  the file
098         * @return returns 1 if the file was processed, 0 otherwise.
099         */
100        protected int pollFile(final File file) {
101    
102            if (!file.exists()) {
103                return 0;
104            }
105            if (!isValidFile(file)) {
106                return 0;
107            }
108            // we only care about file modified times if we are not deleting/moving files
109            if (!endpoint.isNoop()) {
110                if (filesBeingProcessed.contains(file)) {
111                    return 1;
112                }
113                filesBeingProcessed.put(file, file);
114            }
115    
116            final FileProcessStrategy processStrategy = endpoint.getFileStrategy();
117            final FileExchange exchange = endpoint.createExchange(file);
118    
119            endpoint.configureMessage(file, exchange.getIn());
120            try {
121                if (LOG.isDebugEnabled()) {
122                    LOG.debug("About to process file: " + file + " using exchange: " + exchange);
123                }
124                if (processStrategy.begin(endpoint, exchange, file)) {
125    
126                    // Use the async processor interface so that processing of
127                    // the exchange can happen asynchronously
128                    getAsyncProcessor().process(exchange, new AsyncCallback() {
129                        public void done(boolean sync) {
130                            boolean failed = exchange.isFailed();
131                            boolean handled = DeadLetterChannel.isFailureHandled(exchange);
132    
133                            if (LOG.isDebugEnabled()) {
134                                LOG.debug("Done processing file: " + file + ". Status is: " + (failed ? "failed: " + failed + ", handled by failure processor: " + handled : "OK"));
135                            }
136    
137                            if (!failed || handled) {
138                                // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
139                                processStrategyCommit(processStrategy, exchange, file, handled);
140                            } else if (failed && !handled) {
141                                // there was an exception but it was not handled by the DeadLetterChannel
142                                handleException(exchange.getException());
143                            }
144    
145                            filesBeingProcessed.remove(file);
146                        }
147                    });
148    
149                } else {
150                    if (LOG.isDebugEnabled()) {
151                        LOG.debug(endpoint + " cannot process file: " + file);
152                    }
153                }
154            } catch (Throwable e) {
155                handleException(e);
156            }
157    
158            return 1;
159        }
160    
161        /**
162         * Strategy when the file was processed and a commit should be executed.
163         *
164         * @param processStrategy   the strategy to perform the commit
165         * @param exchange          the exchange
166         * @param file              the file processed
167         * @param failureHandled    is <tt>false</tt> if the exchange was processed succesfully, <tt>true</tt> if
168         * an exception occured during processing but it was handled by the failure processor (usually the
169         * DeadLetterChannel).
170         */
171        protected void processStrategyCommit(FileProcessStrategy processStrategy, FileExchange exchange,
172                                             File file, boolean failureHandled) {
173            try {
174                if (LOG.isDebugEnabled()) {
175                    LOG.debug("Committing file strategy: " + processStrategy + " for file: " + file + (failureHandled ? " that was handled by the failure processor." : ""));
176                }
177                processStrategy.commit(endpoint, exchange, file);
178            } catch (Exception e) {
179                LOG.warn("Error committing file strategy: " + processStrategy, e);
180                handleException(e);
181            }
182        }
183    
184        protected boolean isValidFile(File file) {
185            boolean result = false;
186            if (file != null && file.exists()) {
187                // TODO: maybe use a configurable strategy instead of the hardcoded one based on last file change
188                if (isMatched(file) && isChanged(file)) {
189                    result = true;
190                }
191            }
192            return result;
193        }
194    
195        protected boolean isChanged(File file) {
196            if (file == null) {
197                // Sanity check
198                return false;
199            } else if (file.isDirectory()) {
200                // Allow recursive polling to descend into this directory
201                return true;
202            } else {
203                boolean lastModifiedCheck = false;
204                long modifiedDuration = 0;
205                if (getUnchangedDelay() > 0) {
206                    modifiedDuration = System.currentTimeMillis() - file.lastModified();
207                    lastModifiedCheck = modifiedDuration >= getUnchangedDelay();
208                }
209    
210                long fileModified = file.lastModified();
211                Long previousModified = noopMap.get(file);
212                noopMap.put(file, fileModified);
213                if (previousModified == null || fileModified > previousModified) {
214                    lastModifiedCheck = true;
215                }
216    
217                boolean sizeCheck = false;
218                long sizeDifference = 0;
219                if (isUnchangedSize()) {
220                    Long value = fileSizes.get(file);
221                    if (value == null) {
222                        sizeCheck = true;
223                    } else {
224                        sizeCheck = file.length() != value;
225                    }
226                }
227    
228                boolean answer = lastModifiedCheck || sizeCheck;
229    
230                if (LOG.isDebugEnabled()) {
231                    LOG.debug("file:" + file + " isChanged:" + answer + " " + "sizeCheck:" + sizeCheck + "("
232                              + sizeDifference + ") " + "lastModifiedCheck:" + lastModifiedCheck + "("
233                              + modifiedDuration + ")");
234                }
235    
236                if (isUnchangedSize()) {
237                    if (answer) {
238                        fileSizes.put(file, file.length());
239                    } else {
240                        fileSizes.remove(file);
241                    }
242                }
243    
244                return answer;
245            }
246        }
247    
248        protected boolean isMatched(File file) {
249            String name = file.getName();
250            if (regexPattern != null && regexPattern.length() > 0) {
251                if (!name.matches(getRegexPattern())) {
252                    return false;
253                }
254            }
255            String[] prefixes = endpoint.getExcludedNamePrefixes();
256            if (prefixes != null) {
257                for (String prefix : prefixes) {
258                    if (name.startsWith(prefix)) {
259                        return false;
260                    }
261                }
262            }
263            String[] postfixes = endpoint.getExcludedNamePostfixes();
264            if (postfixes != null) {
265                for (String postfix : postfixes) {
266                    if (name.endsWith(postfix)) {
267                        return false;
268                    }
269                }
270            }
271            return true;
272        }
273    
274        public boolean isRecursive() {
275            return this.recursive;
276        }
277    
278        public void setRecursive(boolean recursive) {
279            this.recursive = recursive;
280        }
281    
282        public String getRegexPattern() {
283            return this.regexPattern;
284        }
285    
286        public void setRegexPattern(String regexPattern) {
287            this.regexPattern = regexPattern;
288        }
289    
290        public boolean isGenerateEmptyExchangeWhenIdle() {
291            return generateEmptyExchangeWhenIdle;
292        }
293    
294        public void setGenerateEmptyExchangeWhenIdle(boolean generateEmptyExchangeWhenIdle) {
295            this.generateEmptyExchangeWhenIdle = generateEmptyExchangeWhenIdle;
296        }
297    
298        public int getUnchangedDelay() {
299            return unchangedDelay;
300        }
301    
302        public void setUnchangedDelay(int unchangedDelay) {
303            this.unchangedDelay = unchangedDelay;
304        }
305    
306        public boolean isUnchangedSize() {
307            return unchangedSize;
308        }
309    
310        public void setUnchangedSize(boolean unchangedSize) {
311            this.unchangedSize = unchangedSize;
312        }
313    
314    }