From 9bd8e04715b3a4e63bf988e53a6f8188c6a2e40b Mon Sep 17 00:00:00 2001 From: Pontus Ullgren Date: Sun, 29 Dec 2013 22:38:18 +0100 Subject: [PATCH] CAMEL-7101 - Add aggregation strategy to aggregate multiple messages into a zip file --- .../zipfile/ZipAggregationStrategy.java | 223 ++++++++++++++++++ .../zipfile/ZipAggregationStrategyTest.java | 78 ++++++ .../camel/aggregate/zipfile/data/chiau.txt | 1 + .../camel/aggregate/zipfile/data/hi.txt | 1 + .../camel/aggregate/zipfile/data/hola.txt | 1 + 5 files changed, 304 insertions(+) create mode 100644 components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java create mode 100644 components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java create mode 100644 components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt create mode 100644 components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt create mode 100644 components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt diff --git a/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java new file mode 100644 index 0000000000000..08ae688489f0b --- /dev/null +++ b/components/camel-zipfile/src/main/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategy.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.zipfile; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.component.file.FileConsumer; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.component.file.GenericFileMessage; +import org.apache.camel.component.file.GenericFileOperationFailedException; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.processor.aggregate.AggregationStrategy; +import org.apache.camel.spi.Synchronization; +import org.apache.camel.util.FileUtil; + +/** + * This aggregation strategy will aggregate all incoming messages into a ZIP file. + *

If the incoming exchanges contain {@link GenericFileMessage} file name will + * be taken from the body otherwise the body content will be treated as a byte + * array and the ZIP entry will be named using the message id.

+ *

Note: Please note that this aggregation strategy requires eager + * completion check to work properly.

+ * + * @author Pontus Ullgren + */ +public class ZipAggregationStrategy implements AggregationStrategy { + + private String filePrefix; + private String fileSuffix = ".zip"; + + /** + * Gets the prefix used when creating the ZIP file name. + * @return the prefix + */ + public String getFilePrefix() { + return filePrefix; + } + + /** + * Sets the prefix that will be used when creating the ZIP filename. + * @param filePrefix prefix to use on ZIP file. + */ + public void setFilePrefix(String filePrefix) { + this.filePrefix = filePrefix; + } + + /** + * Gets the suffix used when creating the ZIP file name. + * @return the suffix + */ + public String getFileSuffix() { + return fileSuffix; + } + /** + * Sets the suffix that will be used when creating the ZIP filename. + * @param fileSuffix suffix to use on ZIP file. + */ + public void setFileSuffix(String fileSuffix) { + this.fileSuffix = fileSuffix; + } + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + File zipFile; + Exchange answer = oldExchange; + + // Guard against empty new exchanges + if (newExchange == null) { + return oldExchange; + } + + // First time for this aggregation + if (oldExchange == null) { + try { + zipFile = FileUtil.createTempFile(this.filePrefix, this.fileSuffix); + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + DefaultEndpoint endpoint = (DefaultEndpoint) newExchange.getFromEndpoint(); + answer = endpoint.createExchange(); + answer.addOnCompletion(new DeleteZipFileOnCompletion(zipFile)); + } else { + zipFile = oldExchange.getIn().getBody(File.class); + } + + // Handle GenericFileMessages + if (GenericFileMessage.class.isAssignableFrom(newExchange.getIn().getClass())) { + try { + File appendFile = newExchange.getIn().getBody(File.class); + if (appendFile != null) { + addFilesToZip(zipFile, new File[]{appendFile}); + GenericFile genericFile = + FileConsumer.asGenericFile( + zipFile.getParent(), + zipFile, + Charset.defaultCharset().toString()); + genericFile.bindToExchange(answer); + } else { + throw new GenericFileOperationFailedException("Could not get body as file."); + } + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + } else { + // Handle all other messages + byte[] buffer = newExchange.getIn().getBody(byte[].class); + try { + addEntryToZip(zipFile, newExchange.getIn().getMessageId(), buffer, buffer.length); + GenericFile genericFile = FileConsumer.asGenericFile( + zipFile.getParent(), zipFile, Charset.defaultCharset().toString()); + genericFile.bindToExchange(answer); + } catch (IOException e) { + throw new GenericFileOperationFailedException(e.getMessage(), e); + } + } + + return answer; + } + + private static void addFilesToZip(File source, File[] files) throws IOException { + File tmpZip = File.createTempFile(source.getName(), null); + tmpZip.delete(); + if (!source.renameTo(tmpZip)) { + throw new IOException("Could not make temp file (" + source.getName() + ")"); + } + byte[] buffer = new byte[1024]; + ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); + + for (int i = 0; i < files.length; i++) { + InputStream in = new FileInputStream(files[i]); + out.putNextEntry(new ZipEntry(files[i].getName())); + for (int read = in.read(buffer); read > -1; read = in.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); + in.close(); + } + + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + out.putNextEntry(ze); + for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); + } + + out.close(); + tmpZip.delete(); + } + + private static void addEntryToZip(File source, String entryName, byte[] buffer, int length) throws IOException { + + File tmpZip = File.createTempFile(source.getName(), null); + tmpZip.delete(); + if (!source.renameTo(tmpZip)) { + throw new IOException("Could not make temp file (" + source.getName() + ")"); + } + ZipInputStream zin = new ZipInputStream(new FileInputStream(tmpZip)); + ZipOutputStream out = new ZipOutputStream(new FileOutputStream(source)); + + out.putNextEntry(new ZipEntry(entryName)); + out.write(buffer, 0, length); + out.closeEntry(); + + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + out.putNextEntry(ze); + for (int read = zin.read(buffer); read > -1; read = zin.read(buffer)) { + out.write(buffer, 0, read); + } + out.closeEntry(); + } + + out.close(); + tmpZip.delete(); + } + + /** + * This callback class is used to clean up the temporary ZIP file once the exchange has completed. + * + */ + private class DeleteZipFileOnCompletion implements Synchronization { + + private File fileToDelete; + + public DeleteZipFileOnCompletion(File fileToDelete) { + this.fileToDelete = fileToDelete; + } + + @Override + public void onFailure(Exchange exchange) { + // Keep the file if somthing gone a miss. + } + + @Override + public void onComplete(Exchange exchange) { + FileUtil.deleteFile(this.fileToDelete); + } + } +} diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java new file mode 100644 index 0000000000000..cf9178a2b40ba --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategyTest.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.zipfile; + + +import java.io.File; +import java.io.FileInputStream; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; +import java.util.zip.ZipOutputStream; + +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.file.GenericFileMessage; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class ZipAggregationStrategyTest extends CamelTestSupport { + + private static final int EXPECTED_NO_FILES = 3; + + @Test + public void testSplitter() throws Exception { + MockEndpoint aggregateToZipEntry = getMockEndpoint("mock:aggregateToZipEntry"); + aggregateToZipEntry.expectedMessageCount(1); + assertMockEndpointsSatisfied(); + + Exchange out = aggregateToZipEntry.getExchanges().get(0); + assertTrue("Result message does not contain GenericFileMessage", GenericFileMessage.class.isAssignableFrom(out.getIn().getClass())); + File resultFile = out.getIn().getBody(File.class); + assertNotNull(resultFile); + assertTrue("Zip file should exist", resultFile.isFile()); + assertTrue("Result file name does not end with .zip", resultFile.getName().endsWith(".zip")); + + ZipInputStream zin = new ZipInputStream(new FileInputStream(resultFile)); + int fileCount = 0; + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + fileCount++; + } + assertTrue("Zip file should contains " + ZipAggregationStrategyTest.EXPECTED_NO_FILES + " files", + fileCount == ZipAggregationStrategyTest.EXPECTED_NO_FILES); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // Unzip file and Split it according to FileEntry + from("file:src/test/resources/org/apache/camel/aggregate/zipfile/data?consumer.delay=1000&noop=true") + .aggregate(new ZipAggregationStrategy()) + .constant(true) + .completionFromBatchConsumer() + .eagerCheckCompletion() + .to("mock:aggregateToZipEntry") + .log("Done processing big file: ${header.CamelFileName}"); + } + }; + + } +} diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt new file mode 100644 index 0000000000000..7842486f7b40d --- /dev/null +++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/chiau.txt @@ -0,0 +1 @@ +chau \ No newline at end of file diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt new file mode 100644 index 0000000000000..32f95c0d1244a --- /dev/null +++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hi.txt @@ -0,0 +1 @@ +hi \ No newline at end of file diff --git a/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt new file mode 100644 index 0000000000000..b8b4a4e2a5db3 --- /dev/null +++ b/components/camel-zipfile/src/test/resources/org/apache/camel/aggregate/zipfile/data/hola.txt @@ -0,0 +1 @@ +hola \ No newline at end of file