From 588271ffbefb4dc847cfb3380da040c0f4a7b165 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Tue, 17 Dec 2013 23:53:41 +0400 Subject: [PATCH 1/9] add cvs iterator --- .../camel/dataformat/csv/CsvIterator.java | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java new file mode 100644 index 0000000000000..8fbea9ca0438a --- /dev/null +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java @@ -0,0 +1,61 @@ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.util.IOHelper; +import org.apache.commons.csv.CSVParser; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + */ +public class CsvIterator implements Iterator>, Closeable { + + private final CSVParser parser; + private final InputStreamReader in; + private String[] line; + + public CsvIterator(CSVParser parser, InputStreamReader in) + throws IOException + { + this.parser = parser; + this.in = in; + line = parser.getLine(); + } + + @Override + public boolean hasNext() { + return line != null; + } + + @Override + public List next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + List result = Arrays.asList(line); + try { + line = parser.getLine(); + } catch (IOException e) { + throw new IllegalStateException(e); + } + if (line == null) { + IOHelper.close(in); + } + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + in.close(); + } +} From be7299e4313c157e2e1d4e13a67773794337ba09 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Tue, 17 Dec 2013 23:54:10 +0400 Subject: [PATCH 2/9] add lazyLoad support --- .../camel/dataformat/csv/CsvDataFormat.java | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java index 9ad7b53d3eb8a..4ce6e7e91cfd8 100644 --- a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java @@ -16,13 +16,15 @@ */ package org.apache.camel.dataformat.csv; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -54,6 +56,10 @@ public class CsvDataFormat implements DataFormat { private boolean autogenColumns = true; private String delimiter; private boolean skipFirstLine; + /** + * Lazy row loading with iterator for big files. + */ + private boolean lazyLoad; public void marshal(Exchange exchange, Object object, OutputStream outputStream) throws Exception { if (delimiter != null) { @@ -96,32 +102,42 @@ public Object unmarshal(Exchange exchange, InputStream inputStream) throws Excep strategy.setDelimiter(config.getDelimiter()); InputStreamReader in = new InputStreamReader(inputStream, IOHelper.getCharsetName(exchange)); - + CsvIterator csvIterator; try { - CSVParser parser = new CSVParser(in, strategy); - List> list = new ArrayList>(); - boolean isFirstLine = true; - while (true) { - String[] strings = parser.getLine(); - if (isFirstLine) { - isFirstLine = false; - if (skipFirstLine) { - // skip considering the first line if we're asked to do so - continue; - } - } - if (strings == null) { - break; - } - List line = Arrays.asList(strings); - list.add(line); + CSVParser parser = createParser(in); + if (parser == null) { + IOHelper.close(in); + return Collections.emptyIterator(); } - return list; - } finally { + csvIterator = new CsvIterator(parser, in); + } catch (IOException e) { IOHelper.close(in); + throw e; + } + if (lazyLoad) { + return csvIterator; } + return loadAllAsList(csvIterator); } - + + private CSVParser createParser(InputStreamReader in) throws IOException { + CSVParser parser = new CSVParser(in, strategy); + if (skipFirstLine) { + if (null == parser.getLine()) { + return null; + } + } + return parser; + } + + private List> loadAllAsList(CsvIterator iter) throws IOException { + List> list = new ArrayList>(); + while (iter.hasNext()) { + list.add(iter.next()); + } + return list; + } + public String getDelimiter() { return delimiter; } @@ -170,6 +186,14 @@ public void setSkipFirstLine(boolean skipFirstLine) { this.skipFirstLine = skipFirstLine; } + public boolean isLazyLoad() { + return lazyLoad; + } + + public void setLazyLoad(boolean lazyLoad) { + this.lazyLoad = lazyLoad; + } + private synchronized void updateFieldsInConfig(Set set, Exchange exchange) { for (Object value : set) { if (value != null) { From 09b2d0afbbda2527a74d08fbceb2ecacc4bb12a1 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Tue, 17 Dec 2013 23:54:26 +0400 Subject: [PATCH 3/9] test lazyLoad --- .../csv/CsvUnmarshalStreamTest.java | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java new file mode 100644 index 0000000000000..cde74bb72334e --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Spring based integration test for the CsvDataFormat + * @version + */ +public class CsvUnmarshalStreamTest extends CamelTestSupport { + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @SuppressWarnings("unchecked") + @Test + public void testCsvUnMarshal() throws Exception { + result.expectedMessageCount(1); + + template.sendBody("direct:start", + "123|Camel in Action|\"1\n124\"|ActiveMQ in Action|2\n" + + "333|Shark in Action|\"1\n124\"|Cassandra in Action|3\n" + + "777|Penguin in Action|\"1\n124\"|Astyanax in Action|4\n"); + + assertMockEndpointsSatisfied(); + + Iterator> body = result.getReceivedExchanges().get(0) + .getIn().getBody(Iterator.class); + assertTrue(body.hasNext()); + List row = body.next(); + assertEquals(5, row.size()); + assertEquals("123", row.get(0)); + assertTrue(body.hasNext()); + row = body.next(); + assertEquals(5, row.size()); + assertTrue(body.hasNext()); + row = body.next(); + assertEquals(5, row.size()); + assertFalse(body.hasNext()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + CsvDataFormat csv = new CsvDataFormat(); + csv.setLazyLoad(true); + csv.setDelimiter("|"); + + from("direct:start").unmarshal(csv) + .to("mock:result"); + } + }; + } +} \ No newline at end of file From 05e9eef990f1688f61b2b2d911accf3c1a128579 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Wed, 18 Dec 2013 22:01:51 +0400 Subject: [PATCH 4/9] test csv iterator with splitter --- .../csv/CsvUnmarshalStreamTest.java | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java index cde74bb72334e..21627b2d5e125 100644 --- a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java @@ -32,34 +32,32 @@ */ public class CsvUnmarshalStreamTest extends CamelTestSupport { + public static final int EXPECTED_COUNT = 3; + @EndpointInject(uri = "mock:result") private MockEndpoint result; @SuppressWarnings("unchecked") @Test public void testCsvUnMarshal() throws Exception { - result.expectedMessageCount(1); + result.expectedMessageCount(EXPECTED_COUNT); + + String message = ""; + for (int i = 0; i < EXPECTED_COUNT; ++i) { + message += i + "|\"" + i + "\n" + i + "\"\n"; + } - template.sendBody("direct:start", - "123|Camel in Action|\"1\n124\"|ActiveMQ in Action|2\n" - + "333|Shark in Action|\"1\n124\"|Cassandra in Action|3\n" - + "777|Penguin in Action|\"1\n124\"|Astyanax in Action|4\n"); + template.sendBody("direct:start", message); assertMockEndpointsSatisfied(); - Iterator> body = result.getReceivedExchanges().get(0) - .getIn().getBody(Iterator.class); - assertTrue(body.hasNext()); - List row = body.next(); - assertEquals(5, row.size()); - assertEquals("123", row.get(0)); - assertTrue(body.hasNext()); - row = body.next(); - assertEquals(5, row.size()); - assertTrue(body.hasNext()); - row = body.next(); - assertEquals(5, row.size()); - assertFalse(body.hasNext()); + for (int i = 0; i < EXPECTED_COUNT; ++i) { + List body = result.getReceivedExchanges().get(i) + .getIn().getBody(List.class); + assertEquals(2, body.size()); + assertEquals(String.valueOf(i), body.get(0)); + assertEquals(String.format("%d\n%d", i, i), body.get(1)); + } } @Override @@ -71,8 +69,10 @@ public void configure() throws Exception { csv.setLazyLoad(true); csv.setDelimiter("|"); - from("direct:start").unmarshal(csv) - .to("mock:result"); + from("direct:start") + .unmarshal(csv) + .split(body()) + .to("mock:result"); } }; } From 680a8b9eb10d215b26ca5d81c8154f43b05678e3 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Wed, 18 Dec 2013 22:29:25 +0400 Subject: [PATCH 5/9] add jmockit --- components/camel-csv/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/components/camel-csv/pom.xml b/components/camel-csv/pom.xml index c55bfde5ca248..a265c089034b5 100644 --- a/components/camel-csv/pom.xml +++ b/components/camel-csv/pom.xml @@ -56,6 +56,12 @@ slf4j-log4j12 test + + com.googlecode.jmockit + jmockit + 1.5 + test + junit junit From 335faa180a918a7a1fadaf34b7c27004dc29f0e0 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Wed, 18 Dec 2013 22:29:47 +0400 Subject: [PATCH 6/9] CsvIteratorTest --- .../camel/dataformat/csv/CsvIteratorTest.java | 100 ++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java new file mode 100644 index 0000000000000..db60e9a6a7b1e --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java @@ -0,0 +1,100 @@ +package org.apache.camel.dataformat.csv; + +import mockit.Expectations; +import mockit.Injectable; +import org.apache.commons.csv.CSVParser; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.NoSuchElementException; + +/** + */ +public class CsvIteratorTest { + + public static final String HDD_CRASH = "HDD crash"; + + @Test + public void closeIfError( + final @Injectable InputStreamReader reader, + final @Injectable CSVParser parser) + throws IOException + { + new Expectations() { + { + parser.getLine(); + result = new String[] { "1" }; + + parser.getLine(); + result = new String[] { "2" }; + + parser.getLine(); + result = new IOException(HDD_CRASH); + + reader.close(); + } + }; + + CsvIterator iterator = new CsvIterator(parser, reader); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("1"), iterator.next()); + Assert.assertTrue(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (IllegalStateException e) { + Assert.assertEquals(HDD_CRASH, e.getCause().getMessage()); + } + + Assert.assertFalse(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (NoSuchElementException e) { + // okay + } + } + + @Test + public void normalCycle(final @Injectable InputStreamReader reader, + final @Injectable CSVParser parser) + throws IOException + { + new Expectations() { + { + parser.getLine(); + result = new String[] { "1" }; + + parser.getLine(); + result = new String[] { "2" }; + + parser.getLine(); + result = null; + + reader.close(); + } + }; + + CsvIterator iterator = new CsvIterator(parser, reader); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("1"), iterator.next()); + + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("2"), iterator.next()); + + Assert.assertFalse(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (NoSuchElementException e) { + // okay + } + + } +} From d46c500aeeeb451a1c4d73a0dc1d0ee5d2e389a2 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Wed, 18 Dec 2013 22:33:09 +0400 Subject: [PATCH 7/9] fix close stream exception and mark iterator end --- .../main/java/org/apache/camel/dataformat/csv/CsvIterator.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java index 8fbea9ca0438a..68df9c4d11512 100644 --- a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java @@ -41,6 +41,8 @@ public List next() { try { line = parser.getLine(); } catch (IOException e) { + line = null; + IOHelper.close(in); throw new IllegalStateException(e); } if (line == null) { From e8f4ad0aa1cdc85c700ef0a0480960967c5d24a3 Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Wed, 18 Dec 2013 23:03:59 +0400 Subject: [PATCH 8/9] add lazyLoad to csv data format definition --- .../camel/model/dataformat/CsvDataFormat.java | 16 +++++- .../csv/CsvUnmarshalStreamSpringTest.java | 56 +++++++++++++++++++ .../CsvUnmarshalStreamSpringTest-context.xml | 32 +++++++++++ 3 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java create mode 100644 components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml diff --git a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java index 3d5d53405ca97..93a7d6f3e44cc 100644 --- a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java +++ b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java @@ -46,6 +46,8 @@ public class CsvDataFormat extends DataFormatDefinition { private String strategyRef; @XmlAttribute private Boolean skipFirstLine; + @XmlAttribute + private Boolean lazyLoad; public CsvDataFormat() { super("csv"); @@ -96,6 +98,14 @@ public void setSkipFirstLine(Boolean skipFirstLine) { this.skipFirstLine = skipFirstLine; } + public Boolean getLazyLoad() { + return lazyLoad; + } + + public void setLazyLoad(Boolean lazyLoad) { + this.lazyLoad = lazyLoad; + } + @Override protected DataFormat createDataFormat(RouteContext routeContext) { DataFormat csvFormat = super.createDataFormat(routeContext); @@ -131,5 +141,9 @@ protected void configureDataFormat(DataFormat dataFormat, CamelContext camelCont if (skipFirstLine != null) { setProperty(camelContext, dataFormat, "skipFirstLine", skipFirstLine); } + + if (lazyLoad != null) { + setProperty(camelContext, dataFormat, "lazyLoad", lazyLoad); + } } -} \ No newline at end of file +} diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java new file mode 100644 index 0000000000000..0727801570dbb --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.EndpointInject; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import java.util.Arrays; +import java.util.Iterator; + +public class CsvUnmarshalStreamSpringTest extends CamelSpringTestSupport { + + public static final String MESSAGE = "message"; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @SuppressWarnings("unchecked") + @Test + public void testCsvUnMarshal() throws Exception { + result.expectedMessageCount(1); + + template.sendBody("direct:start", MESSAGE + "\n"); + + assertMockEndpointsSatisfied(); + + Iterator body = result.getReceivedExchanges().get(0) + .getIn().getBody(Iterator.class); + assertEquals(CsvIterator.class, body.getClass()); + assertEquals(Arrays.asList(MESSAGE), body.next()); + } + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext( + "org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml"); + } +} diff --git a/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml new file mode 100644 index 0000000000000..f510201a76e0a --- /dev/null +++ b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + + From e630e780e558c608e83927516dc2f400129d1fed Mon Sep 17 00:00:00 2001 From: "Daneel S. Yaitskov" Date: Wed, 18 Dec 2013 23:10:09 +0400 Subject: [PATCH 9/9] add csvLazyLoad factory to DSL --- .../java/org/apache/camel/builder/DataFormatClause.java | 8 ++++++++ .../org/apache/camel/model/dataformat/CsvDataFormat.java | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java index c2ce089bfef9b..b28d0625d28d2 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java +++ b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java @@ -180,6 +180,14 @@ public T csv() { return dataFormat(new CsvDataFormat()); } + /** + * Uses the CSV data format for a huge file. + * Sequential access through an iterator. + */ + public T csvLazyLoad() { + return dataFormat(new CsvDataFormat(true)); + } + /** * Uses the custom data format */ diff --git a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java index 93a7d6f3e44cc..cfd4f6bada435 100644 --- a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java +++ b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java @@ -58,6 +58,11 @@ public CsvDataFormat(String delimiter) { setDelimiter(delimiter); } + public CsvDataFormat(boolean lazyLoad) { + this(); + setLazyLoad(lazyLoad); + } + public Boolean isAutogenColumns() { return autogenColumns; }