Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,93 +111,95 @@ protected void processExchange(Exchange exchange, String target) throws Exceptio
log.trace("Processing file: {} for exchange: {}", target, exchange);

try {
preWriteCheck();

// should we write to a temporary name and then afterwards rename to real target
boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
String tempTarget = null;
// remember if target exists to avoid checking twice
Boolean targetExists = null;
if (writeAsTempAndRename) {
// compute temporary name with the temp prefix
tempTarget = createTempFileName(exchange, target);

log.trace("Writing using tempNameFile: {}", tempTarget);

// cater for file exists option on the real target as
// the file operations code will work on the temp file

// if an existing file already exists what should we do?
targetExists = operations.existsFile(target);
if (targetExists) {
if (endpoint.getFileExist() == GenericFileExist.Ignore) {
// ignore but indicate that the file was written
log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
return;
} else if (endpoint.getFileExist() == GenericFileExist.Fail) {
throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
} else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) {
// we override the target so we do this by deleting it so the temp file can be renamed later
// with success as the existing target file have been deleted
log.trace("Eagerly deleting existing file: {}", target);
if (!operations.deleteFile(target)) {
throw new GenericFileOperationFailedException("Cannot delete file: " + target);
preWriteCheck(exchange);

if (isUploadFile()) {
// should we write to a temporary name and then afterwards rename to real target
boolean writeAsTempAndRename = ObjectHelper.isNotEmpty(endpoint.getTempFileName());
String tempTarget = null;
// remember if target exists to avoid checking twice
Boolean targetExists = null;
if (writeAsTempAndRename) {
// compute temporary name with the temp prefix
tempTarget = createTempFileName(exchange, target);

log.trace("Writing using tempNameFile: {}", tempTarget);

// cater for file exists option on the real target as
// the file operations code will work on the temp file

// if an existing file already exists what should we do?
targetExists = operations.existsFile(target);
if (targetExists) {
if (endpoint.getFileExist() == GenericFileExist.Ignore) {
// ignore but indicate that the file was written
log.trace("An existing file already exists: {}. Ignore and do not override it.", target);
return;
} else if (endpoint.getFileExist() == GenericFileExist.Fail) {
throw new GenericFileOperationFailedException("File already exist: " + target + ". Cannot write new file.");
} else if (endpoint.isEagerDeleteTargetFile() && endpoint.getFileExist() == GenericFileExist.Override) {
// we override the target so we do this by deleting it so the temp file can be renamed later
// with success as the existing target file have been deleted
log.trace("Eagerly deleting existing file: {}", target);
if (!operations.deleteFile(target)) {
throw new GenericFileOperationFailedException("Cannot delete file: " + target);
}
}
}
}

// delete any pre existing temp file
if (operations.existsFile(tempTarget)) {
log.trace("Deleting existing temp file: {}", tempTarget);
if (!operations.deleteFile(tempTarget)) {
throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
// delete any pre existing temp file
if (operations.existsFile(tempTarget)) {
log.trace("Deleting existing temp file: {}", tempTarget);
if (!operations.deleteFile(tempTarget)) {
throw new GenericFileOperationFailedException("Cannot delete file: " + tempTarget);
}
}
}
}

// write/upload the file
writeFile(exchange, tempTarget != null ? tempTarget : target);

// if we did write to a temporary name then rename it to the real
// name after we have written the file
if (tempTarget != null) {

// if we should not eager delete the target file then do it now just before renaming
if (!endpoint.isEagerDeleteTargetFile() && targetExists
&& endpoint.getFileExist() == GenericFileExist.Override) {
// we override the target so we do this by deleting it so the temp file can be renamed later
// with success as the existing target file have been deleted
log.trace("Deleting existing file: {}", target);
if (!operations.deleteFile(target)) {
throw new GenericFileOperationFailedException("Cannot delete file: " + target);
// write/upload the file
writeFile(exchange, tempTarget != null ? tempTarget : target);

// if we did write to a temporary name then rename it to the real
// name after we have written the file
if (tempTarget != null) {

// if we should not eager delete the target file then do it now just before renaming
if (!endpoint.isEagerDeleteTargetFile() && targetExists
&& endpoint.getFileExist() == GenericFileExist.Override) {
// we override the target so we do this by deleting it so the temp file can be renamed later
// with success as the existing target file have been deleted
log.trace("Deleting existing file: {}", target);
if (!operations.deleteFile(target)) {
throw new GenericFileOperationFailedException("Cannot delete file: " + target);
}
}
}

// now we are ready to rename the temp file to the target file
log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
boolean renamed = operations.renameFile(tempTarget, target);
if (!renamed) {
throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
// now we are ready to rename the temp file to the target file
log.trace("Renaming file: [{}] to: [{}]", tempTarget, target);
boolean renamed = operations.renameFile(tempTarget, target);
if (!renamed) {
throw new GenericFileOperationFailedException("Cannot rename file from: " + tempTarget + " to: " + target);
}
}
}

// any done file to write?
if (endpoint.getDoneFileName() != null) {
String doneFileName = endpoint.createDoneFileName(target);
ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);
// any done file to write?
if (endpoint.getDoneFileName() != null) {
String doneFileName = endpoint.createDoneFileName(target);
ObjectHelper.notEmpty(doneFileName, "doneFileName", endpoint);

// create empty exchange with empty body to write as the done file
Exchange empty = new DefaultExchange(exchange);
empty.getIn().setBody("");
// create empty exchange with empty body to write as the done file
Exchange empty = new DefaultExchange(exchange);
empty.getIn().setBody("");

log.trace("Writing done file: [{}]", doneFileName);
// delete any existing done file
if (operations.existsFile(doneFileName)) {
if (!operations.deleteFile(doneFileName)) {
throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
log.trace("Writing done file: [{}]", doneFileName);
// delete any existing done file
if (operations.existsFile(doneFileName)) {
if (!operations.deleteFile(doneFileName)) {
throw new GenericFileOperationFailedException("Cannot delete existing done file: " + doneFileName);
}
}
writeFile(empty, doneFileName);
}
writeFile(empty, doneFileName);
}

// let's store the name we really used in the header, so end-users
Expand All @@ -210,6 +212,15 @@ protected void processExchange(Exchange exchange, String target) throws Exceptio
postWriteCheck();
}

/**
* Override if required. Files are actually sent / returns true by default
*
* @return <tt>true</tt> to send files, <tt>false</tt> to skip sending files.
*/
protected boolean isUploadFile() {
return true;
}

/**
* If we fail writing out a file, we will call this method. This hook is
* provided to disconnect from servers or clean up files we created (if needed).
Expand All @@ -221,7 +232,7 @@ public void handleFailedWrite(Exchange exchange, Exception exception) throws Exc
/**
* Perform any actions that need to occur before we write such as connecting to an FTP server etc.
*/
public void preWriteCheck() throws Exception {
public void preWriteCheck(Exchange exchange) throws Exception {
// nothing needed to check
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,13 @@ public boolean connect(RemoteFileConfiguration configuration) throws GenericFile
throw new GenericFileOperationFailedException(client.getReplyCode(), client.getReplyString(), e.getMessage(), e);
}

// site commands
if (endpoint.getConfiguration().getSiteCommand() != null) {
return true;
}

public void sendSiteCommands(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException {
if (configuration.getSiteCommand() != null) {
final boolean captureOutput = exchange != null && configuration.isSiteCommandCapture();
final List<String> siteOutput = new ArrayList<String>();
// commands can be separated using new line
Iterator<?> it = ObjectHelper.createIterator(endpoint.getConfiguration().getSiteCommand(), "\n");
while (it.hasNext()) {
Expand All @@ -177,14 +182,21 @@ public boolean connect(RemoteFileConfiguration configuration) throws GenericFile
log.trace("Site command to send: {}", command);
if (command != null) {
boolean result = sendSiteCommand(command);

if (captureOutput) {
String reply = getFtpClient().getReplyString();
siteOutput.add( reply );
}

if (!result) {
throw new GenericFileOperationFailedException("Site command: " + command + " returned false");
}
}
}
if ( captureOutput ) {
exchange.getIn().setBody(siteOutput);
}
}

return true;
}

public boolean isConnected() throws GenericFileOperationFailedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public enum PathSeparator { UNIX, Windows, Auto };
private int soTimeout;
private boolean throwExceptionOnConnectFailed;
private String siteCommand;
private boolean siteCommandCapture = false;
private boolean stepwise = true;
private PathSeparator separator = PathSeparator.Auto;
private boolean streamDownload;
Expand All @@ -57,7 +58,7 @@ public RemoteFileConfiguration() {
public RemoteFileConfiguration(URI uri) {
configure(uri);
}

@Override
public boolean needToNormalize() {
return false;
Expand Down Expand Up @@ -230,6 +231,25 @@ public void setSiteCommand(String siteCommand) {
this.siteCommand = siteCommand;
}

public boolean isSiteCommandCapture() {
return siteCommandCapture;
}

/**
* Capture the output of the site commands. The output of the site commands will be returned
* in the exchange's body as a List<String> with the output of each executed site command
* as a list element in order of execution. The order of execution is defined by the order
* of the site commands in the siteCommand option (each separated by a newline character).
* If false, the body remains untouched.
* <p/>
* The default is false.
*
* @param siteCommandCapture if true, capture site command output.
*/
public void setSiteCommandCapture(boolean siteCommandCapture) {
this.siteCommandCapture = siteCommandCapture;
}

public boolean isStepwise() {
return stepwise;
}
Expand Down Expand Up @@ -261,7 +281,7 @@ public PathSeparator getSeparator() {
public void setSeparator(PathSeparator separator) {
this.separator = separator;
}

public boolean isStreamDownload() {
return streamDownload;
}
Expand All @@ -271,7 +291,7 @@ public boolean isStreamDownload() {
* the remote files are streamed to the route as they are read. When set to false, the remote files
* are loaded into memory before being sent into the route.
*
* @param streamDownload
* @param streamDownload
*/
public void setStreamDownload(boolean streamDownload) {
this.streamDownload = streamDownload;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ protected boolean prePollCheck() throws Exception {
} else {
connectIfNecessary();
}
getOperations().sendSiteCommands( getEndpoint().getConfiguration(), null );
} catch (Exception e) {
loggedIn = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public abstract class RemoteFileEndpoint<T> extends GenericFileEndpoint<T> {
private boolean disconnect;
private boolean fastExistsCheck;
private boolean download = true;
private boolean upload = true;

public RemoteFileEndpoint() {
// no args constructor for spring bean endpoint configuration
Expand Down Expand Up @@ -208,4 +209,12 @@ public boolean isDownload() {
public void setDownload(boolean download) {
this.download = download;
}

public boolean isUpload() {
return this.upload;
}

public void setUpload(boolean upload) {
this.upload = upload;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.camel.component.file.remote;

import org.apache.camel.Exchange;
import org.apache.camel.component.file.GenericFileOperationFailedException;
import org.apache.camel.component.file.GenericFileOperations;

Expand Down Expand Up @@ -65,4 +66,17 @@ public interface RemoteFileOperations<T> extends GenericFileOperations<T> {
*/
boolean sendSiteCommand(String command) throws GenericFileOperationFailedException;

/**
* Sends the site commands to the remote server which are specified in the
* configuration option siteCommand. If option siteCommandCapture is true,
* the output of each site command will be captured and returned as a List<String>
* in the body.
* <p/>
* Works with the producer only.
*
* @param configuration the configuration
* @param exchange the exchange; if null, output will not be captured
* @throws GenericFileOperationFailedException can be thrown
*/
void sendSiteCommands(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void disconnect() throws GenericFileOperationFailedException {
}

@Override
public void preWriteCheck() throws Exception {
public void preWriteCheck(Exchange exchange) throws Exception {
// before writing send a noop to see if the connection is alive and works
boolean noop = false;
if (loggedIn) {
Expand All @@ -114,6 +114,7 @@ public void preWriteCheck() throws Exception {
} else {
connectIfNecessary();
}
getOperations().sendSiteCommands(getEndpoint().getConfiguration(), exchange);
} catch (Exception e) {
loggedIn = false;

Expand All @@ -136,6 +137,11 @@ public void postWriteCheck() {
}
}

@Override
protected boolean isUploadFile() {
return getEndpoint().isUpload();
}

@Override
protected void doStart() throws Exception {
log.debug("Starting");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -867,4 +867,9 @@ public boolean sendSiteCommand(String command) throws GenericFileOperationFailed
// is not implemented
return true;
}

@Override
public void sendSiteCommands(RemoteFileConfiguration configuration, Exchange exchange) throws GenericFileOperationFailedException {
// is not implemented
}
}
Loading