diff --git a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java index c2ce089bfef9b..b28d0625d28d2 100644 --- a/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java +++ b/camel-core/src/main/java/org/apache/camel/builder/DataFormatClause.java @@ -180,6 +180,14 @@ public T csv() { return dataFormat(new CsvDataFormat()); } + /** + * Uses the CSV data format for a huge file. + * Sequential access through an iterator. + */ + public T csvLazyLoad() { + return dataFormat(new CsvDataFormat(true)); + } + /** * Uses the custom data format */ diff --git a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java index 3d5d53405ca97..cfd4f6bada435 100644 --- a/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java +++ b/camel-core/src/main/java/org/apache/camel/model/dataformat/CsvDataFormat.java @@ -46,6 +46,8 @@ public class CsvDataFormat extends DataFormatDefinition { private String strategyRef; @XmlAttribute private Boolean skipFirstLine; + @XmlAttribute + private Boolean lazyLoad; public CsvDataFormat() { super("csv"); @@ -56,6 +58,11 @@ public CsvDataFormat(String delimiter) { setDelimiter(delimiter); } + public CsvDataFormat(boolean lazyLoad) { + this(); + setLazyLoad(lazyLoad); + } + public Boolean isAutogenColumns() { return autogenColumns; } @@ -96,6 +103,14 @@ public void setSkipFirstLine(Boolean skipFirstLine) { this.skipFirstLine = skipFirstLine; } + public Boolean getLazyLoad() { + return lazyLoad; + } + + public void setLazyLoad(Boolean lazyLoad) { + this.lazyLoad = lazyLoad; + } + @Override protected DataFormat createDataFormat(RouteContext routeContext) { DataFormat csvFormat = super.createDataFormat(routeContext); @@ -131,5 +146,9 @@ protected void configureDataFormat(DataFormat dataFormat, CamelContext camelCont if (skipFirstLine != null) { setProperty(camelContext, dataFormat, "skipFirstLine", skipFirstLine); } + + if (lazyLoad != null) { + setProperty(camelContext, dataFormat, "lazyLoad", lazyLoad); + } } -} \ No newline at end of file +} diff --git a/components/camel-csv/pom.xml b/components/camel-csv/pom.xml index c55bfde5ca248..a265c089034b5 100644 --- a/components/camel-csv/pom.xml +++ b/components/camel-csv/pom.xml @@ -56,6 +56,12 @@ slf4j-log4j12 test + + com.googlecode.jmockit + jmockit + 1.5 + test + junit junit diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java index 9ad7b53d3eb8a..4ce6e7e91cfd8 100644 --- a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvDataFormat.java @@ -16,13 +16,15 @@ */ package org.apache.camel.dataformat.csv; +import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.util.ArrayList; -import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -54,6 +56,10 @@ public class CsvDataFormat implements DataFormat { private boolean autogenColumns = true; private String delimiter; private boolean skipFirstLine; + /** + * Lazy row loading with iterator for big files. + */ + private boolean lazyLoad; public void marshal(Exchange exchange, Object object, OutputStream outputStream) throws Exception { if (delimiter != null) { @@ -96,32 +102,42 @@ public Object unmarshal(Exchange exchange, InputStream inputStream) throws Excep strategy.setDelimiter(config.getDelimiter()); InputStreamReader in = new InputStreamReader(inputStream, IOHelper.getCharsetName(exchange)); - + CsvIterator csvIterator; try { - CSVParser parser = new CSVParser(in, strategy); - List> list = new ArrayList>(); - boolean isFirstLine = true; - while (true) { - String[] strings = parser.getLine(); - if (isFirstLine) { - isFirstLine = false; - if (skipFirstLine) { - // skip considering the first line if we're asked to do so - continue; - } - } - if (strings == null) { - break; - } - List line = Arrays.asList(strings); - list.add(line); + CSVParser parser = createParser(in); + if (parser == null) { + IOHelper.close(in); + return Collections.emptyIterator(); } - return list; - } finally { + csvIterator = new CsvIterator(parser, in); + } catch (IOException e) { IOHelper.close(in); + throw e; + } + if (lazyLoad) { + return csvIterator; } + return loadAllAsList(csvIterator); } - + + private CSVParser createParser(InputStreamReader in) throws IOException { + CSVParser parser = new CSVParser(in, strategy); + if (skipFirstLine) { + if (null == parser.getLine()) { + return null; + } + } + return parser; + } + + private List> loadAllAsList(CsvIterator iter) throws IOException { + List> list = new ArrayList>(); + while (iter.hasNext()) { + list.add(iter.next()); + } + return list; + } + public String getDelimiter() { return delimiter; } @@ -170,6 +186,14 @@ public void setSkipFirstLine(boolean skipFirstLine) { this.skipFirstLine = skipFirstLine; } + public boolean isLazyLoad() { + return lazyLoad; + } + + public void setLazyLoad(boolean lazyLoad) { + this.lazyLoad = lazyLoad; + } + private synchronized void updateFieldsInConfig(Set set, Exchange exchange) { for (Object value : set) { if (value != null) { diff --git a/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java new file mode 100644 index 0000000000000..68df9c4d11512 --- /dev/null +++ b/components/camel-csv/src/main/java/org/apache/camel/dataformat/csv/CsvIterator.java @@ -0,0 +1,63 @@ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.util.IOHelper; +import org.apache.commons.csv.CSVParser; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + */ +public class CsvIterator implements Iterator>, Closeable { + + private final CSVParser parser; + private final InputStreamReader in; + private String[] line; + + public CsvIterator(CSVParser parser, InputStreamReader in) + throws IOException + { + this.parser = parser; + this.in = in; + line = parser.getLine(); + } + + @Override + public boolean hasNext() { + return line != null; + } + + @Override + public List next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + List result = Arrays.asList(line); + try { + line = parser.getLine(); + } catch (IOException e) { + line = null; + IOHelper.close(in); + throw new IllegalStateException(e); + } + if (line == null) { + IOHelper.close(in); + } + return result; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + + @Override + public void close() throws IOException { + in.close(); + } +} diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java new file mode 100644 index 0000000000000..db60e9a6a7b1e --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvIteratorTest.java @@ -0,0 +1,100 @@ +package org.apache.camel.dataformat.csv; + +import mockit.Expectations; +import mockit.Injectable; +import org.apache.commons.csv.CSVParser; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.Arrays; +import java.util.NoSuchElementException; + +/** + */ +public class CsvIteratorTest { + + public static final String HDD_CRASH = "HDD crash"; + + @Test + public void closeIfError( + final @Injectable InputStreamReader reader, + final @Injectable CSVParser parser) + throws IOException + { + new Expectations() { + { + parser.getLine(); + result = new String[] { "1" }; + + parser.getLine(); + result = new String[] { "2" }; + + parser.getLine(); + result = new IOException(HDD_CRASH); + + reader.close(); + } + }; + + CsvIterator iterator = new CsvIterator(parser, reader); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("1"), iterator.next()); + Assert.assertTrue(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (IllegalStateException e) { + Assert.assertEquals(HDD_CRASH, e.getCause().getMessage()); + } + + Assert.assertFalse(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (NoSuchElementException e) { + // okay + } + } + + @Test + public void normalCycle(final @Injectable InputStreamReader reader, + final @Injectable CSVParser parser) + throws IOException + { + new Expectations() { + { + parser.getLine(); + result = new String[] { "1" }; + + parser.getLine(); + result = new String[] { "2" }; + + parser.getLine(); + result = null; + + reader.close(); + } + }; + + CsvIterator iterator = new CsvIterator(parser, reader); + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("1"), iterator.next()); + + Assert.assertTrue(iterator.hasNext()); + Assert.assertEquals(Arrays.asList("2"), iterator.next()); + + Assert.assertFalse(iterator.hasNext()); + + try { + iterator.next(); + Assert.fail("exception expected"); + } catch (NoSuchElementException e) { + // okay + } + + } +} diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java new file mode 100644 index 0000000000000..0727801570dbb --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.EndpointInject; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.spring.CamelSpringTestSupport; +import org.junit.Test; +import org.springframework.context.support.AbstractApplicationContext; +import org.springframework.context.support.ClassPathXmlApplicationContext; + +import java.util.Arrays; +import java.util.Iterator; + +public class CsvUnmarshalStreamSpringTest extends CamelSpringTestSupport { + + public static final String MESSAGE = "message"; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @SuppressWarnings("unchecked") + @Test + public void testCsvUnMarshal() throws Exception { + result.expectedMessageCount(1); + + template.sendBody("direct:start", MESSAGE + "\n"); + + assertMockEndpointsSatisfied(); + + Iterator body = result.getReceivedExchanges().get(0) + .getIn().getBody(Iterator.class); + assertEquals(CsvIterator.class, body.getClass()); + assertEquals(Arrays.asList(MESSAGE), body.next()); + } + + @Override + protected AbstractApplicationContext createApplicationContext() { + return new ClassPathXmlApplicationContext( + "org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml"); + } +} diff --git a/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java new file mode 100644 index 0000000000000..21627b2d5e125 --- /dev/null +++ b/components/camel-csv/src/test/java/org/apache/camel/dataformat/csv/CsvUnmarshalStreamTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.dataformat.csv; + +import org.apache.camel.EndpointInject; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Spring based integration test for the CsvDataFormat + * @version + */ +public class CsvUnmarshalStreamTest extends CamelTestSupport { + + public static final int EXPECTED_COUNT = 3; + + @EndpointInject(uri = "mock:result") + private MockEndpoint result; + + @SuppressWarnings("unchecked") + @Test + public void testCsvUnMarshal() throws Exception { + result.expectedMessageCount(EXPECTED_COUNT); + + String message = ""; + for (int i = 0; i < EXPECTED_COUNT; ++i) { + message += i + "|\"" + i + "\n" + i + "\"\n"; + } + + template.sendBody("direct:start", message); + + assertMockEndpointsSatisfied(); + + for (int i = 0; i < EXPECTED_COUNT; ++i) { + List body = result.getReceivedExchanges().get(i) + .getIn().getBody(List.class); + assertEquals(2, body.size()); + assertEquals(String.valueOf(i), body.get(0)); + assertEquals(String.format("%d\n%d", i, i), body.get(1)); + } + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + CsvDataFormat csv = new CsvDataFormat(); + csv.setLazyLoad(true); + csv.setDelimiter("|"); + + from("direct:start") + .unmarshal(csv) + .split(body()) + .to("mock:result"); + } + }; + } +} \ No newline at end of file diff --git a/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml new file mode 100644 index 0000000000000..f510201a76e0a --- /dev/null +++ b/components/camel-csv/src/test/resources/org/apache/camel/dataformat/csv/CsvUnmarshalStreamSpringTest-context.xml @@ -0,0 +1,32 @@ + + + + + + + + + + + + + +