Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.ExpressionIllegalSyntaxException;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollEndpoint;
Expand Down Expand Up @@ -133,6 +134,8 @@ public abstract class GenericFileEndpoint<T> extends ScheduledPollEndpoint imple
@UriParam
protected long readLockTimeout = 10000;
@UriParam
protected LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;
@UriParam
protected long readLockMinLength = 1;
@UriParam
protected GenericFileExclusiveReadLockStrategy<T> exclusiveReadLockStrategy;
Expand Down Expand Up @@ -596,6 +599,14 @@ public void setReadLockTimeout(long readLockTimeout) {
this.readLockTimeout = readLockTimeout;
}

public LoggingLevel getReadLockLoggingLevel() {
return readLockLoggingLevel;
}

public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
this.readLockLoggingLevel = readLockLoggingLevel;
}

public long getReadLockMinLength() {
return readLockMinLength;
}
Expand Down Expand Up @@ -820,6 +831,7 @@ protected Map<String, Object> getParamsAsMap() {
params.put("readLockTimeout", readLockTimeout);
}
params.put("readLockMinLength", readLockMinLength);
params.put("readLockLoggingLevel", readLockLoggingLevel);

return params;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.camel.component.file;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;

/**
* Strategy for acquiring exclusive read locks for files to be consumed. After
Expand Down Expand Up @@ -89,4 +90,15 @@ public interface GenericFileExclusiveReadLockStrategy<T> {
*/
void setCheckInterval(long checkInterval);

/**
* Sets logging level used when a read lock could not be acquired.
* <p/>
* Logging level used when a read lock could not be acquired.
* <p/>
* The default logging level is WARN
* @param readLockLoggingLevel LoggingLevel
*/
void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel);


}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import java.io.File;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,7 +36,9 @@ public class FileChangedExclusiveReadLockStrategy extends MarkerFileExclusiveRea
private long timeout;
private long checkInterval = 1000;
private long minLength = 1;
private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;

@Override
public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
// must call super
if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
Expand All @@ -55,7 +59,8 @@ public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
if (timeout > 0) {
long delta = watch.taken();
if (delta > timeout) {
LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
CamelLogger.log(LOG, readLockLoggingLevel,
"Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
// we could not get the lock within the timeout period, so return false
return false;
}
Expand Down Expand Up @@ -101,6 +106,7 @@ public long getTimeout() {
return timeout;
}

@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
}
Expand All @@ -109,10 +115,16 @@ public long getCheckInterval() {
return checkInterval;
}

@Override
public void setCheckInterval(long checkInterval) {
this.checkInterval = checkInterval;
}

@Override
public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
this.readLockLoggingLevel = readLockLoggingLevel;
}

public long getMinLength() {
return minLength;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import java.nio.channels.FileLock;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
Expand All @@ -43,11 +45,14 @@ public class FileLockExclusiveReadLockStrategy extends MarkerFileExclusiveReadLo
private long checkInterval = 1000;
private FileLock lock;
private String lockFileName;
private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;

@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
// noop
}

@Override
public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations, GenericFile<File> file, Exchange exchange) throws Exception {
// must call super
if (!super.acquireExclusiveReadLock(operations, file, exchange)) {
Expand All @@ -70,7 +75,8 @@ public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
if (timeout > 0) {
long delta = watch.taken();
if (delta > timeout) {
LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target);
CamelLogger.log(LOG, readLockLoggingLevel,
"Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + target);
// we could not get the lock within the timeout period, so return false
return false;
}
Expand Down Expand Up @@ -112,6 +118,7 @@ public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
return true;
}

@Override
public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {

Expand Down Expand Up @@ -144,12 +151,19 @@ public long getTimeout() {
return timeout;
}

@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
}

@Override
public void setCheckInterval(long checkInterval) {
this.checkInterval = checkInterval;
}

@Override
public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
this.readLockLoggingLevel = readLockLoggingLevel;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.camel.CamelContext;
import org.apache.camel.Expression;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
import org.apache.camel.component.file.GenericFileProcessStrategy;
import org.apache.camel.spi.Language;
Expand Down Expand Up @@ -109,48 +110,37 @@ private static GenericFileExclusiveReadLockStrategy<File> getExclusiveReadLockSt
if (ObjectHelper.isNotEmpty(readLock)) {
if ("none".equals(readLock) || "false".equals(readLock)) {
return null;
} else if ("markerFile".equals(readLock)) {
return new MarkerFileExclusiveReadLockStrategy();
} else if ("fileLock".equals(readLock)) {
GenericFileExclusiveReadLockStrategy<File> readLockStrategy = new FileLockExclusiveReadLockStrategy();
Long timeout = (Long) params.get("readLockTimeout");
if (timeout != null) {
readLockStrategy.setTimeout(timeout);
}
Long checkInterval = (Long) params.get("readLockCheckInterval");
if (checkInterval != null) {
readLockStrategy.setCheckInterval(checkInterval);
}
return readLockStrategy;
strategy = new FileLockExclusiveReadLockStrategy();
} else if ("rename".equals(readLock)) {
GenericFileExclusiveReadLockStrategy<File> readLockStrategy = new FileRenameExclusiveReadLockStrategy();
Long timeout = (Long) params.get("readLockTimeout");
if (timeout != null) {
readLockStrategy.setTimeout(timeout);
}
Long checkInterval = (Long) params.get("readLockCheckInterval");
if (checkInterval != null) {
readLockStrategy.setCheckInterval(checkInterval);
}
return readLockStrategy;
strategy = new FileRenameExclusiveReadLockStrategy();
} else if ("changed".equals(readLock)) {
FileChangedExclusiveReadLockStrategy readLockStrategy = new FileChangedExclusiveReadLockStrategy();
Long minLength = (Long) params.get("readLockMinLength");
if (minLength != null) {
readLockStrategy.setMinLength(minLength);
}
strategy = readLockStrategy;
}

if (strategy != null) {
Long timeout = (Long) params.get("readLockTimeout");
if (timeout != null) {
readLockStrategy.setTimeout(timeout);
strategy.setTimeout(timeout);
}
Long checkInterval = (Long) params.get("readLockCheckInterval");
if (checkInterval != null) {
readLockStrategy.setCheckInterval(checkInterval);
strategy.setCheckInterval(checkInterval);
}
Long minLength = (Long) params.get("readLockMinLength");
if (minLength != null) {
readLockStrategy.setMinLength(minLength);
LoggingLevel readLockLoggingLevel = (LoggingLevel) params.get("readLockLoggingLevel");
if (readLockLoggingLevel != null) {
strategy.setReadLockLoggingLevel(readLockLoggingLevel);
}
return readLockStrategy;
} else if ("markerFile".equals(readLock)) {
return new MarkerFileExclusiveReadLockStrategy();
}
}

return null;
return strategy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
package org.apache.camel.component.file.strategy;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
import org.apache.camel.component.file.GenericFileExclusiveReadLockStrategy;
import org.apache.camel.component.file.GenericFileOperations;
import org.apache.camel.util.CamelLogger;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,11 +36,14 @@ public class GenericFileRenameExclusiveReadLockStrategy<T> implements GenericFil
private static final transient Logger LOG = LoggerFactory.getLogger(GenericFileRenameExclusiveReadLockStrategy.class);
private long timeout;
private long checkInterval;
private LoggingLevel readLockLoggingLevel = LoggingLevel.WARN;

@Override
public void prepareOnStartup(GenericFileOperations<T> operations, GenericFileEndpoint<T> endpoint) throws Exception {
// noop
}

@Override
public boolean acquireExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
Exchange exchange) throws Exception {
LOG.trace("Waiting for exclusive read lock to file: {}", file);
Expand All @@ -58,7 +63,8 @@ public boolean acquireExclusiveReadLock(GenericFileOperations<T> operations, Gen
if (timeout > 0) {
long delta = watch.taken();
if (delta > timeout) {
LOG.warn("Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
CamelLogger.log(LOG, readLockLoggingLevel,
"Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);
// we could not get the lock within the timeout period, so return false
return false;
}
Expand All @@ -81,6 +87,7 @@ public boolean acquireExclusiveReadLock(GenericFileOperations<T> operations, Gen
return true;
}

@Override
public void releaseExclusiveReadLock(GenericFileOperations<T> operations, GenericFile<T> file,
Exchange exchange) throws Exception {
// noop
Expand All @@ -101,11 +108,18 @@ public long getTimeout() {
return timeout;
}

@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
}

@Override
public void setCheckInterval(long checkInterval) {
this.checkInterval = checkInterval;
}

@Override
public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
this.readLockLoggingLevel = readLockLoggingLevel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.io.File;

import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.component.file.FileComponent;
import org.apache.camel.component.file.GenericFile;
import org.apache.camel.component.file.GenericFileEndpoint;
Expand All @@ -36,6 +37,7 @@
public class MarkerFileExclusiveReadLockStrategy implements GenericFileExclusiveReadLockStrategy<File> {
private static final transient Logger LOG = LoggerFactory.getLogger(MarkerFileExclusiveReadLockStrategy.class);

@Override
public void prepareOnStartup(GenericFileOperations<File> operations, GenericFileEndpoint<File> endpoint) {
String dir = endpoint.getConfiguration().getDirectory();
File file = new File(dir);
Expand All @@ -51,6 +53,7 @@ public void prepareOnStartup(GenericFileOperations<File> operations, GenericFile
}
}

@Override
public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
String lockFileName = getLockFileName(file);
Expand All @@ -64,6 +67,7 @@ public boolean acquireExclusiveReadLock(GenericFileOperations<File> operations,
return acquired;
}

@Override
public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
GenericFile<File> file, Exchange exchange) throws Exception {
String lockFileName = exchange.getProperty(Exchange.FILE_LOCK_FILE_NAME, getLockFileName(file), String.class);
Expand All @@ -78,14 +82,21 @@ public void releaseExclusiveReadLock(GenericFileOperations<File> operations,
}
}

@Override
public void setTimeout(long timeout) {
// noop
}

@Override
public void setCheckInterval(long checkInterval) {
// noop
}

@Override
public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
// noop
}

private static void deleteLockFiles(File dir, boolean recursive) {
File[] files = dir.listFiles();
if (files == null || files.length == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.LoggingLevel;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.JndiRegistry;

Expand Down Expand Up @@ -110,6 +111,11 @@ public void setCheckInterval(long checkInterval) {
// noop
}

@Override
public void setReadLockLoggingLevel(LoggingLevel readLockLoggingLevel) {
// noop
}

public int getCounter() {
return counter;
}
Expand Down
Loading