diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 2c52531c4e..479c3743ef 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -62,6 +62,24 @@ EsLookupResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
```
+## EsSearchBolt (org.apache.storm.elasticsearch.bolt.EsSearchBolt)
+
+EsSearchBolt performs a search request to Elasticsearch.
+In order to do that, three dependencies need to be satisfied. Apart from usual EsConfig, two other dependencies must be provided:
+ ElasticsearchSearchRequest is used to convert the incoming Tuple to the SearchRequest that will be executed against Elasticsearch.
+ EsSearchResultOutput is used to declare the output fields and convert the SearchResponse to values that are emited by the bolt.
+
+Incoming tuple is passed to provided SearchRequest creator and the result of that execution is passed to Elasticsearch client.
+The bolt then uses the provider output adapter (EsSearchResultOutput) to convert the SearchResponse to Values to emit.
+The output fields are also specified by the user of the bolt via the output adapter (EsSearchResultOutput).
+
+```java
+EsConfig esConfig = createEsConfig();
+ElasticsearchSearchRequest searchRequestAdapter = createElasticsearchSearchRequest();
+EsSearchResultOutput output = createOutput();
+EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
+```
+
## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
Provided components (Bolt, State) takes in EsConfig as a constructor arg.
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchSearchRequest.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchSearchRequest.java
new file mode 100644
index 0000000000..e422f3e9ea
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/ElasticsearchSearchRequest.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch;
+
+import org.apache.storm.tuple.ITuple;
+import org.elasticsearch.action.search.SearchRequest;
+
+import java.io.Serializable;
+
+/**
+ * The adapter to convert the incoming tuple to Elasticsearch SearchRequest.
+ */
+public interface ElasticsearchSearchRequest extends Serializable {
+
+ /**
+ * @return SearchRequest to perform against Elasticsearch.
+ */
+ SearchRequest extractFrom(ITuple tuple);
+}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsSearchResultOutput.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsSearchResultOutput.java
new file mode 100644
index 0000000000..8cb42a7bf0
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/EsSearchResultOutput.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.search.SearchResponse;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The adapter to convert the search results fetched from Elasticsearch to values.
+ */
+public interface EsSearchResultOutput extends Serializable {
+
+ /**
+ * @return collection of values to emit.
+ */
+ Collection toValues(SearchResponse response);
+
+ /**
+ * @return output fields to declare.
+ */
+ Fields fields();
+}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java
new file mode 100644
index 0000000000..fa9bbc4485
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsSearchBolt.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
+import org.apache.storm.elasticsearch.EsSearchResultOutput;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+
+import java.util.Collection;
+
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
+public class EsSearchBolt extends AbstractEsBolt{
+
+ private final ElasticsearchSearchRequest searchRequest;
+ private final EsSearchResultOutput output;
+
+ /**
+ * @throws NullPointerException if any of the parameters is null
+ */
+ public EsSearchBolt(EsConfig esConfig, ElasticsearchSearchRequest searchRequest,
+ EsSearchResultOutput output) {
+ super(esConfig);
+ checkNotNull(searchRequest);
+ checkNotNull(output);
+ this.searchRequest = searchRequest;
+ this.output = output;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ SearchRequest request = searchRequest.extractFrom(tuple);
+ SearchResponse response = client.search(request).actionGet();
+ Collection values = output.toValues(response);
+ tryEmitAndAck(values, tuple);
+ } catch (Exception e) {
+ collector.reportError(e);
+ collector.fail(tuple);
+ }
+ }
+
+ private void tryEmitAndAck(Collection values, Tuple tuple) {
+ for (Values value : values) {
+ collector.emit(tuple, value);
+ }
+ collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(output.fields());
+ }
+
+}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java
new file mode 100644
index 0000000000..71e0b20890
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsSearch.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseQueryFunction;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.apache.storm.tuple.Values;
+
+import java.util.Collection;
+import java.util.List;
+
+public class EsSearch extends BaseQueryFunction> {
+
+ @Override
+ public List> batchRetrieve(EsState state, List tridentTuples) {
+ return state.batchRetrieve(tridentTuples);
+ }
+
+ @Override
+ public void execute(TridentTuple tuple, Collection valuesList, TridentCollector collector) {
+ for (Values values : valuesList) {
+ collector.emit(values);
+ }
+ }
+}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index 2241f4ba02..34afb8f088 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -17,21 +17,31 @@
*/
package org.apache.storm.elasticsearch.trident;
+import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
+import org.apache.storm.elasticsearch.EsSearchResultOutput;
import org.apache.storm.topology.FailedException;
import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
+import org.apache.storm.tuple.Values;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
+import org.elasticsearch.common.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import static org.elasticsearch.common.base.Preconditions.checkNotNull;
+
/**
* Trident State for storing tuple to ES document.
* @since 0.11
@@ -41,6 +51,8 @@ class EsState implements State {
private static Client client;
private EsConfig esConfig;
private EsTupleMapper tupleMapper;
+ private ElasticsearchSearchRequest searchRequest;
+ private EsSearchResultOutput output;
/**
* EsState constructor
@@ -52,6 +64,19 @@ public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) {
this.tupleMapper = tupleMapper;
}
+ /**
+ * EsState constructor
+ * @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+ * @param searchRequest Tuple to Es SearchRequest mapper {@link ElasticsearchSearchRequest}
+ * @param output Es search response to Value mapper {@link EsSearchResultOutput}
+ */
+ public EsState(EsConfig esConfig, ElasticsearchSearchRequest searchRequest,
+ EsSearchResultOutput output) {
+ this.esConfig = esConfig;
+ this.searchRequest = searchRequest;
+ this.output = output;
+ }
+
/**
* @param txid
*
@@ -95,6 +120,7 @@ public void prepare() {
* Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
*/
public void updateState(List tuples) {
+ checkNotNull(tupleMapper);
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (TridentTuple tuple : tuples) {
String source = tupleMapper.getSource(tuple);
@@ -110,4 +136,28 @@ public void updateState(List tuples) {
throw new FailedException();
}
}
+
+ /**
+ * Search current state from ElasticSearch.
+ * @param tuples list of tuples for searching ES.
+ * @return list of collection retrieve by es query.
+ */
+ public List> batchRetrieve(List tuples) {
+ checkNotNull(searchRequest);
+ checkNotNull(output);
+ List> batchRetrieveResult = new ArrayList<>();
+ try {
+ for (TridentTuple tuple : tuples){
+ SearchRequest request = searchRequest.extractFrom(tuple);
+ SearchResponse response = client.search(request).actionGet();
+ Collection values = output.toValues(response);
+ batchRetrieveResult.add(values);
+
+ }
+ }catch (Exception e){
+ LOG.warn("Batch retrieve operation is failed.");
+ throw new FailedException(e);
+ }
+ return batchRetrieveResult;
+ }
}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index 5ae174fbb0..57e23b9b63 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -17,6 +17,8 @@
*/
package org.apache.storm.elasticsearch.trident;
+import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
+import org.apache.storm.elasticsearch.EsSearchResultOutput;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
@@ -34,7 +36,9 @@
*/
public class EsStateFactory implements StateFactory {
private final EsConfig esConfig;
- private final EsTupleMapper tupleMapper;
+ private EsTupleMapper tupleMapper;
+ private ElasticsearchSearchRequest searchRequest;
+ private EsSearchResultOutput output;
/**
* EsStateFactory constructor
@@ -46,10 +50,29 @@ public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper) {
this.tupleMapper = checkNotNull(tupleMapper);
}
+ /**
+ *
+ * @param esConfig
+ * @param searchRequest
+ * @param output
+ */
+ public EsStateFactory(EsConfig esConfig, ElasticsearchSearchRequest searchRequest,
+ EsSearchResultOutput output) {
+ this.esConfig = checkNotNull(esConfig);
+ this.searchRequest = checkNotNull(searchRequest);
+ this.output = checkNotNull(output);
+ }
+
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- EsState esState = new EsState(esConfig, tupleMapper);
- esState.prepare();
- return esState;
+ if (tupleMapper != null) {
+ EsState esState = new EsState(esConfig, tupleMapper);
+ esState.prepare();
+ return esState;
+ } else {
+ EsState esState = new EsState(esConfig, searchRequest, output);
+ esState.prepare();
+ return esState;
+ }
}
}
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltIntegrationTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltIntegrationTest.java
new file mode 100644
index 0000000000..54e813f547
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltIntegrationTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
+import org.apache.storm.elasticsearch.EsSearchResultOutput;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.testing.IntegrationTest;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+
+@Category(IntegrationTest.class)
+@RunWith(MockitoJUnitRunner.class)
+public class EsSearchBoltIntegrationTest extends AbstractEsBoltIntegrationTest{
+
+ private final String documentId = UUID.randomUUID().toString();
+ private final String indexName = "index";
+ private final String typeName = "type";
+ private final String source = "{\"user\":\"user1\"}";
+
+ private ElasticsearchSearchRequest searchRequest = new TestESSearchRequest();
+ private EsSearchResultOutput output = new TestESSearchResultOutput();
+
+ @Captor
+ private ArgumentCaptor anchor;
+
+ @Captor
+ private ArgumentCaptor emmitedValues;
+
+ @Mock
+ private Tuple tuple;
+
+ @Override
+ protected EsSearchBolt createBolt(EsConfig esConfig) {
+ return new EsSearchBolt(esConfig, searchRequest, output);
+ }
+
+ @Before
+ public void populateIndexWithTestData() throws Exception {
+ node.client().prepareIndex(indexName, typeName, documentId).setSource(source).execute().actionGet();
+ }
+
+ @After
+ public void clearIndex() throws Exception {
+ node.client().delete(new DeleteRequest(indexName, typeName, documentId)).actionGet();
+ }
+
+ @Test
+ public void anchorsTheTuple() throws Exception {
+ bolt.execute(tuple);
+
+ verify(outputCollector).emit(anchor.capture(), emmitedValues.capture());
+ assertThat(anchor.getValue(), is(tuple));
+ }
+
+ @Test
+ public void emitsExpectedValues() throws Exception {
+ Values expectedValues = expectedValues();
+
+ bolt.execute(tuple);
+
+ verify(outputCollector).emit(anchor.capture(), emmitedValues.capture());
+ assertThat(emmitedValues.getValue(), is(expectedValues));
+ }
+
+ @Test
+ public void acksTuple() throws Exception {
+ bolt.execute(tuple);
+
+ verify(outputCollector).ack(anchor.capture());
+ assertThat(anchor.getValue(), is(tuple));
+ }
+
+ @Override
+ protected Class getBoltClass() {
+ return EsSearchBolt.class;
+ }
+
+ private Values expectedValues() {
+ return new Values(source);
+ }
+
+ private class TestESSearchRequest implements ElasticsearchSearchRequest {
+
+ @Override
+ public SearchRequest extractFrom(ITuple tuple) {
+ return node.client().prepareSearch(indexName)
+ .setTypes(typeName)
+ .request();
+ }
+ }
+
+ private class TestESSearchResultOutput implements EsSearchResultOutput {
+
+ @Override
+ public Collection toValues(SearchResponse response) {
+ return Collections.singleton(expectedValues());
+ }
+
+ @Override
+ public Fields fields() {
+ return new Fields("data");
+ }
+ }
+}
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltTest.java
new file mode 100644
index 0000000000..630bee8f4d
--- /dev/null
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsSearchBoltTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.bolt;
+
+import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
+import org.apache.storm.elasticsearch.EsSearchResultOutput;
+import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Collections;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class EsSearchBoltTest extends AbstractEsBoltTest {
+
+ @Mock
+ private EsConfig esConfig;
+
+ @Mock
+ private Tuple tuple;
+
+ @Mock
+ private Client client;
+
+ @Mock
+ private ElasticsearchSearchRequest searchRequest;
+
+ @Mock
+ private SearchRequest request;
+
+ @Mock
+ private EsSearchResultOutput output;
+
+ private Client originalClient;
+
+
+ @Override
+ protected EsSearchBolt createBolt(EsConfig esConfig) {
+ originalClient = EsLookupBolt.getClient();
+ EsLookupBolt.replaceClient(this.client);
+ return new EsSearchBolt(esConfig, searchRequest, output);
+ }
+
+ @Before
+ public void configureBoltDependencies() throws Exception {
+ when(searchRequest.extractFrom(tuple)).thenReturn(request);
+ when(output.toValues(any(SearchResponse.class))).thenReturn(Collections.singleton(new Values("")));
+ }
+
+ @After
+ public void replaceClientWithOriginal() throws Exception {
+ EsLookupBolt.replaceClient(originalClient);
+ }
+
+
+ @Test
+ public void failsTupleWhenClientThrows() throws Exception {
+ when(client.search(request)).thenThrow(ElasticsearchException.class);
+ bolt.execute(tuple);
+
+ verify(outputCollector).fail(tuple);
+ }
+
+ @Test
+ public void reportsExceptionWhenClientThrows() throws Exception {
+ ElasticsearchException elasticsearchException = new ElasticsearchException("dummy");
+ when(client.search(request)).thenThrow(elasticsearchException);
+ bolt.execute(tuple);
+
+ verify(outputCollector).reportError(elasticsearchException);
+ }
+
+ @Override
+ protected Class getBoltClass() {
+ return EsSearchBolt.class;
+ }
+}