From df2d33ff5d39e347876e79b4f66814dde6cf2641 Mon Sep 17 00:00:00 2001 From: subhankar Date: Mon, 11 Apr 2016 11:48:57 +0530 Subject: [PATCH 1/2] [STORM-979][storm-elasticsearch] BaseQueryFunction to query to ES for trident --- external/storm-elasticsearch/README.md | 18 +++ .../ElasticsearchSearchRequest.java | 34 +++++ .../elasticsearch/EsSearchResultOutput.java | 41 +++++ .../elasticsearch/bolt/EsSearchBolt.java | 76 ++++++++++ .../storm/elasticsearch/trident/EsSearch.java | 24 +++ .../storm/elasticsearch/trident/EsState.java | 50 ++++++ .../elasticsearch/trident/EsStateFactory.java | 31 +++- .../bolt/EsSearchBoltIntegrationTest.java | 143 ++++++++++++++++++ .../elasticsearch/bolt/EsSearchBoltTest.java | 106 +++++++++++++ 9 files changed, 519 insertions(+), 4 deletions(-) create mode 100644 external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchSearchRequest.java create mode 100644 external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsSearchResultOutput.java create mode 100644 external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java create mode 100644 external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java create mode 100644 external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltIntegrationTest.java create mode 100644 external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltTest.java diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md index 2c52531c4e..479c3743ef 100644 --- a/external/storm-elasticsearch/README.md +++ b/external/storm-elasticsearch/README.md @@ -62,6 +62,24 @@ EsLookupResultOutput output = createOutput(); EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output); ``` +## EsSearchBolt (org.apache.storm.elasticsearch.bolt.EsSearchBolt) + +EsSearchBolt performs a search request to Elasticsearch. +In order to do that, three dependencies need to be satisfied. Apart from usual EsConfig, two other dependencies must be provided: + ElasticsearchSearchRequest is used to convert the incoming Tuple to the SearchRequest that will be executed against Elasticsearch. + EsSearchResultOutput is used to declare the output fields and convert the SearchResponse to values that are emited by the bolt. + +Incoming tuple is passed to provided SearchRequest creator and the result of that execution is passed to Elasticsearch client. +The bolt then uses the provider output adapter (EsSearchResultOutput) to convert the SearchResponse to Values to emit. +The output fields are also specified by the user of the bolt via the output adapter (EsSearchResultOutput). + +```java +EsConfig esConfig = createEsConfig(); +ElasticsearchSearchRequest searchRequestAdapter = createElasticsearchSearchRequest(); +EsSearchResultOutput output = createOutput(); +EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output); +``` + ## EsConfig (org.apache.storm.elasticsearch.common.EsConfig) Provided components (Bolt, State) takes in EsConfig as a constructor arg. diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchSearchRequest.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchSearchRequest.java new file mode 100644 index 0000000000..e422f3e9ea --- /dev/null +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchSearchRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.elasticsearch; + +import org.apache.storm.tuple.ITuple; +import org.elasticsearch.action.search.SearchRequest; + +import java.io.Serializable; + +/** + * The adapter to convert the incoming tuple to Elasticsearch SearchRequest. + */ +public interface ElasticsearchSearchRequest extends Serializable { + + /** + * @return SearchRequest to perform against Elasticsearch. + */ + SearchRequest extractFrom(ITuple tuple); +} diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsSearchResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsSearchResultOutput.java new file mode 100644 index 0000000000..8cb42a7bf0 --- /dev/null +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsSearchResultOutput.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.elasticsearch; + +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.elasticsearch.action.search.SearchResponse; + +import java.io.Serializable; +import java.util.Collection; + +/** + * The adapter to convert the search results fetched from Elasticsearch to values. + */ +public interface EsSearchResultOutput extends Serializable { + + /** + * @return collection of values to emit. + */ + Collection toValues(SearchResponse response); + + /** + * @return output fields to declare. + */ + Fields fields(); +} diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java new file mode 100644 index 0000000000..bb8727aa49 --- /dev/null +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.elasticsearch.bolt; + +import org.apache.storm.elasticsearch.ElasticsearchSearchRequest; +import org.apache.storm.elasticsearch.EsSearchResultOutput; +import org.apache.storm.elasticsearch.common.EsConfig; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; + +import java.util.Collection; + +import static org.elasticsearch.common.base.Preconditions.checkNotNull; + +public class EsSearchBolt extends AbstractEsBolt{ + + + private final ElasticsearchSearchRequest searchRequest; + private final EsSearchResultOutput output; + + /** + * @throws NullPointerException if any of the parameters is null + */ + public EsSearchBolt(EsConfig esConfig, ElasticsearchSearchRequest searchRequest, + EsSearchResultOutput output) { + super(esConfig); + checkNotNull(searchRequest); + checkNotNull(output); + this.searchRequest = searchRequest; + this.output = output; + } + + @Override + public void execute(Tuple tuple) { + try { + SearchRequest request = searchRequest.extractFrom(tuple); + SearchResponse response = client.search(request).actionGet(); + Collection values = output.toValues(response); + tryEmitAndAck(values, tuple); + } catch (Exception e) { + collector.reportError(e); + collector.fail(tuple); + } + } + + private void tryEmitAndAck(Collection values, Tuple tuple) { + for (Values value : values) { + collector.emit(tuple, value); + } + collector.ack(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + outputFieldsDeclarer.declare(output.fields()); + } + +} diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java new file mode 100644 index 0000000000..2e4754ffb3 --- /dev/null +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java @@ -0,0 +1,24 @@ +package org.apache.storm.elasticsearch.trident; + +import org.apache.storm.trident.operation.TridentCollector; +import org.apache.storm.trident.state.BaseQueryFunction; +import org.apache.storm.trident.tuple.TridentTuple; +import org.apache.storm.tuple.Values; + +import java.util.Collection; +import java.util.List; + +public class EsSearch extends BaseQueryFunction> { + + @Override + public List> batchRetrieve(EsState state, List tridentTuples) { + return state.batchRetrieve(tridentTuples); + } + + @Override + public void execute(TridentTuple tuple, Collection valuesList, TridentCollector collector) { + for (Values values : valuesList) { + collector.emit(values); + } + } +} diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java index 2241f4ba02..34afb8f088 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java @@ -17,21 +17,31 @@ */ package org.apache.storm.elasticsearch.trident; +import org.apache.storm.elasticsearch.ElasticsearchSearchRequest; +import org.apache.storm.elasticsearch.EsSearchResultOutput; import org.apache.storm.topology.FailedException; import org.apache.storm.elasticsearch.common.StormElasticSearchClient; import org.apache.storm.elasticsearch.common.EsConfig; import org.apache.storm.elasticsearch.common.EsTupleMapper; +import org.apache.storm.tuple.Values; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; +import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import static org.elasticsearch.common.base.Preconditions.checkNotNull; + /** * Trident State for storing tuple to ES document. * @since 0.11 @@ -41,6 +51,8 @@ class EsState implements State { private static Client client; private EsConfig esConfig; private EsTupleMapper tupleMapper; + private ElasticsearchSearchRequest searchRequest; + private EsSearchResultOutput output; /** * EsState constructor @@ -52,6 +64,19 @@ public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) { this.tupleMapper = tupleMapper; } + /** + * EsState constructor + * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig} + * @param searchRequest Tuple to Es SearchRequest mapper {@link ElasticsearchSearchRequest} + * @param output Es search response to Value mapper {@link EsSearchResultOutput} + */ + public EsState(EsConfig esConfig, ElasticsearchSearchRequest searchRequest, + EsSearchResultOutput output) { + this.esConfig = esConfig; + this.searchRequest = searchRequest; + this.output = output; + } + /** * @param txid * @@ -95,6 +120,7 @@ public void prepare() { * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document. */ public void updateState(List tuples) { + checkNotNull(tupleMapper); BulkRequestBuilder bulkRequest = client.prepareBulk(); for (TridentTuple tuple : tuples) { String source = tupleMapper.getSource(tuple); @@ -110,4 +136,28 @@ public void updateState(List tuples) { throw new FailedException(); } } + + /** + * Search current state from ElasticSearch. + * @param tuples list of tuples for searching ES. + * @return list of collection retrieve by es query. + */ + public List> batchRetrieve(List tuples) { + checkNotNull(searchRequest); + checkNotNull(output); + List> batchRetrieveResult = new ArrayList<>(); + try { + for (TridentTuple tuple : tuples){ + SearchRequest request = searchRequest.extractFrom(tuple); + SearchResponse response = client.search(request).actionGet(); + Collection values = output.toValues(response); + batchRetrieveResult.add(values); + + } + }catch (Exception e){ + LOG.warn("Batch retrieve operation is failed."); + throw new FailedException(e); + } + return batchRetrieveResult; + } } diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java index 5ae174fbb0..57e23b9b63 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java @@ -17,6 +17,8 @@ */ package org.apache.storm.elasticsearch.trident; +import org.apache.storm.elasticsearch.ElasticsearchSearchRequest; +import org.apache.storm.elasticsearch.EsSearchResultOutput; import org.apache.storm.task.IMetricsContext; import org.apache.storm.elasticsearch.common.EsConfig; import org.apache.storm.elasticsearch.common.EsTupleMapper; @@ -34,7 +36,9 @@ */ public class EsStateFactory implements StateFactory { private final EsConfig esConfig; - private final EsTupleMapper tupleMapper; + private EsTupleMapper tupleMapper; + private ElasticsearchSearchRequest searchRequest; + private EsSearchResultOutput output; /** * EsStateFactory constructor @@ -46,10 +50,29 @@ public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper) { this.tupleMapper = checkNotNull(tupleMapper); } + /** + * + * @param esConfig + * @param searchRequest + * @param output + */ + public EsStateFactory(EsConfig esConfig, ElasticsearchSearchRequest searchRequest, + EsSearchResultOutput output) { + this.esConfig = checkNotNull(esConfig); + this.searchRequest = checkNotNull(searchRequest); + this.output = checkNotNull(output); + } + @Override public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { - EsState esState = new EsState(esConfig, tupleMapper); - esState.prepare(); - return esState; + if (tupleMapper != null) { + EsState esState = new EsState(esConfig, tupleMapper); + esState.prepare(); + return esState; + } else { + EsState esState = new EsState(esConfig, searchRequest, output); + esState.prepare(); + return esState; + } } } diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltIntegrationTest.java new file mode 100644 index 0000000000..54e813f547 --- /dev/null +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltIntegrationTest.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.elasticsearch.bolt; + +import org.apache.storm.elasticsearch.ElasticsearchSearchRequest; +import org.apache.storm.elasticsearch.EsSearchResultOutput; +import org.apache.storm.elasticsearch.common.EsConfig; +import org.apache.storm.testing.IntegrationTest; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.ITuple; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.verify; + +@Category(IntegrationTest.class) +@RunWith(MockitoJUnitRunner.class) +public class EsSearchBoltIntegrationTest extends AbstractEsBoltIntegrationTest{ + + private final String documentId = UUID.randomUUID().toString(); + private final String indexName = "index"; + private final String typeName = "type"; + private final String source = "{\"user\":\"user1\"}"; + + private ElasticsearchSearchRequest searchRequest = new TestESSearchRequest(); + private EsSearchResultOutput output = new TestESSearchResultOutput(); + + @Captor + private ArgumentCaptor anchor; + + @Captor + private ArgumentCaptor emmitedValues; + + @Mock + private Tuple tuple; + + @Override + protected EsSearchBolt createBolt(EsConfig esConfig) { + return new EsSearchBolt(esConfig, searchRequest, output); + } + + @Before + public void populateIndexWithTestData() throws Exception { + node.client().prepareIndex(indexName, typeName, documentId).setSource(source).execute().actionGet(); + } + + @After + public void clearIndex() throws Exception { + node.client().delete(new DeleteRequest(indexName, typeName, documentId)).actionGet(); + } + + @Test + public void anchorsTheTuple() throws Exception { + bolt.execute(tuple); + + verify(outputCollector).emit(anchor.capture(), emmitedValues.capture()); + assertThat(anchor.getValue(), is(tuple)); + } + + @Test + public void emitsExpectedValues() throws Exception { + Values expectedValues = expectedValues(); + + bolt.execute(tuple); + + verify(outputCollector).emit(anchor.capture(), emmitedValues.capture()); + assertThat(emmitedValues.getValue(), is(expectedValues)); + } + + @Test + public void acksTuple() throws Exception { + bolt.execute(tuple); + + verify(outputCollector).ack(anchor.capture()); + assertThat(anchor.getValue(), is(tuple)); + } + + @Override + protected Class getBoltClass() { + return EsSearchBolt.class; + } + + private Values expectedValues() { + return new Values(source); + } + + private class TestESSearchRequest implements ElasticsearchSearchRequest { + + @Override + public SearchRequest extractFrom(ITuple tuple) { + return node.client().prepareSearch(indexName) + .setTypes(typeName) + .request(); + } + } + + private class TestESSearchResultOutput implements EsSearchResultOutput { + + @Override + public Collection toValues(SearchResponse response) { + return Collections.singleton(expectedValues()); + } + + @Override + public Fields fields() { + return new Fields("data"); + } + } +} diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltTest.java new file mode 100644 index 0000000000..630bee8f4d --- /dev/null +++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltTest.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.elasticsearch.bolt; + +import org.apache.storm.elasticsearch.ElasticsearchSearchRequest; +import org.apache.storm.elasticsearch.EsSearchResultOutput; +import org.apache.storm.elasticsearch.common.EsConfig; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.client.Client; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.util.Collections; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class EsSearchBoltTest extends AbstractEsBoltTest { + + @Mock + private EsConfig esConfig; + + @Mock + private Tuple tuple; + + @Mock + private Client client; + + @Mock + private ElasticsearchSearchRequest searchRequest; + + @Mock + private SearchRequest request; + + @Mock + private EsSearchResultOutput output; + + private Client originalClient; + + + @Override + protected EsSearchBolt createBolt(EsConfig esConfig) { + originalClient = EsLookupBolt.getClient(); + EsLookupBolt.replaceClient(this.client); + return new EsSearchBolt(esConfig, searchRequest, output); + } + + @Before + public void configureBoltDependencies() throws Exception { + when(searchRequest.extractFrom(tuple)).thenReturn(request); + when(output.toValues(any(SearchResponse.class))).thenReturn(Collections.singleton(new Values(""))); + } + + @After + public void replaceClientWithOriginal() throws Exception { + EsLookupBolt.replaceClient(originalClient); + } + + + @Test + public void failsTupleWhenClientThrows() throws Exception { + when(client.search(request)).thenThrow(ElasticsearchException.class); + bolt.execute(tuple); + + verify(outputCollector).fail(tuple); + } + + @Test + public void reportsExceptionWhenClientThrows() throws Exception { + ElasticsearchException elasticsearchException = new ElasticsearchException("dummy"); + when(client.search(request)).thenThrow(elasticsearchException); + bolt.execute(tuple); + + verify(outputCollector).reportError(elasticsearchException); + } + + @Override + protected Class getBoltClass() { + return EsSearchBolt.class; + } +} From b4c2b5b580018a1db3fe727d2c41441b65bb31f1 Mon Sep 17 00:00:00 2001 From: subhankar Date: Mon, 11 Apr 2016 19:11:21 +0530 Subject: [PATCH 2/2] [STORM-979][storm-elasticsearch] Added Missing License Header --- .../storm/elasticsearch/bolt/EsSearchBolt.java | 1 - .../storm/elasticsearch/trident/EsSearch.java | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java index bb8727aa49..fa9bbc4485 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java @@ -32,7 +32,6 @@ public class EsSearchBolt extends AbstractEsBolt{ - private final ElasticsearchSearchRequest searchRequest; private final EsSearchResultOutput output; diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java index 2e4754ffb3..71e0b20890 100644 --- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java +++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.storm.elasticsearch.trident; import org.apache.storm.trident.operation.TridentCollector;