Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions external/storm-elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ EsLookupResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
```

## EsSearchBolt (org.apache.storm.elasticsearch.bolt.EsSearchBolt)

EsSearchBolt performs a search request to Elasticsearch.
In order to do that, three dependencies need to be satisfied. Apart from usual EsConfig, two other dependencies must be provided:
ElasticsearchSearchRequest is used to convert the incoming Tuple to the SearchRequest that will be executed against Elasticsearch.
EsSearchResultOutput is used to declare the output fields and convert the SearchResponse to values that are emited by the bolt.

Incoming tuple is passed to provided SearchRequest creator and the result of that execution is passed to Elasticsearch client.
The bolt then uses the provider output adapter (EsSearchResultOutput) to convert the SearchResponse to Values to emit.
The output fields are also specified by the user of the bolt via the output adapter (EsSearchResultOutput).

```java
EsConfig esConfig = createEsConfig();
ElasticsearchSearchRequest searchRequestAdapter = createElasticsearchSearchRequest();
EsSearchResultOutput output = createOutput();
EsLookupBolt lookupBolt = new EsLookupBolt(esConfig, getRequestAdapter, output);
```

## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)

Provided components (Bolt, State) takes in EsConfig as a constructor arg.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.elasticsearch;

import org.apache.storm.tuple.ITuple;
import org.elasticsearch.action.search.SearchRequest;

import java.io.Serializable;

/**
* The adapter to convert the incoming tuple to Elasticsearch SearchRequest.
*/
public interface ElasticsearchSearchRequest extends Serializable {

/**
* @return SearchRequest to perform against Elasticsearch.
*/
SearchRequest extractFrom(ITuple tuple);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.elasticsearch;

import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.elasticsearch.action.search.SearchResponse;

import java.io.Serializable;
import java.util.Collection;

/**
* The adapter to convert the search results fetched from Elasticsearch to values.
*/
public interface EsSearchResultOutput extends Serializable {

/**
* @return collection of values to emit.
*/
Collection<Values> toValues(SearchResponse response);

/**
* @return output fields to declare.
*/
Fields fields();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.elasticsearch.bolt;

import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
import org.apache.storm.elasticsearch.EsSearchResultOutput;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;

import java.util.Collection;

import static org.elasticsearch.common.base.Preconditions.checkNotNull;

public class EsSearchBolt extends AbstractEsBolt{

private final ElasticsearchSearchRequest searchRequest;
private final EsSearchResultOutput output;

/**
* @throws NullPointerException if any of the parameters is null
*/
public EsSearchBolt(EsConfig esConfig, ElasticsearchSearchRequest searchRequest,
EsSearchResultOutput output) {
super(esConfig);
checkNotNull(searchRequest);
checkNotNull(output);
this.searchRequest = searchRequest;
this.output = output;
}

@Override
public void execute(Tuple tuple) {
try {
SearchRequest request = searchRequest.extractFrom(tuple);
SearchResponse response = client.search(request).actionGet();
Collection<Values> values = output.toValues(response);
tryEmitAndAck(values, tuple);
} catch (Exception e) {
collector.reportError(e);
collector.fail(tuple);
}
}

private void tryEmitAndAck(Collection<Values> values, Tuple tuple) {
for (Values value : values) {
collector.emit(tuple, value);
}
collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(output.fields());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.elasticsearch.trident;

import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.state.BaseQueryFunction;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

import java.util.Collection;
import java.util.List;

public class EsSearch extends BaseQueryFunction<EsState, Collection<Values>> {

@Override
public List<Collection<Values>> batchRetrieve(EsState state, List<TridentTuple> tridentTuples) {
return state.batchRetrieve(tridentTuples);
}

@Override
public void execute(TridentTuple tuple, Collection<Values> valuesList, TridentCollector collector) {
for (Values values : valuesList) {
collector.emit(values);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,31 @@
*/
package org.apache.storm.elasticsearch.trident;

import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
import org.apache.storm.elasticsearch.EsSearchResultOutput;
import org.apache.storm.topology.FailedException;

import org.apache.storm.elasticsearch.common.StormElasticSearchClient;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.tuple.Values;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.trident.state.State;
import org.apache.storm.trident.tuple.TridentTuple;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import static org.elasticsearch.common.base.Preconditions.checkNotNull;

/**
* Trident State for storing tuple to ES document.
* @since 0.11
Expand All @@ -41,6 +51,8 @@ class EsState implements State {
private static Client client;
private EsConfig esConfig;
private EsTupleMapper tupleMapper;
private ElasticsearchSearchRequest searchRequest;
private EsSearchResultOutput output;

/**
* EsState constructor
Expand All @@ -52,6 +64,19 @@ public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) {
this.tupleMapper = tupleMapper;
}

/**
* EsState constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
* @param searchRequest Tuple to Es SearchRequest mapper {@link ElasticsearchSearchRequest}
* @param output Es search response to Value mapper {@link EsSearchResultOutput}
*/
public EsState(EsConfig esConfig, ElasticsearchSearchRequest searchRequest,
EsSearchResultOutput output) {
this.esConfig = esConfig;
this.searchRequest = searchRequest;
this.output = output;
}

/**
* @param txid
*
Expand Down Expand Up @@ -95,6 +120,7 @@ public void prepare() {
* Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
*/
public void updateState(List<TridentTuple> tuples) {
checkNotNull(tupleMapper);
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (TridentTuple tuple : tuples) {
String source = tupleMapper.getSource(tuple);
Expand All @@ -110,4 +136,28 @@ public void updateState(List<TridentTuple> tuples) {
throw new FailedException();
}
}

/**
* Search current state from ElasticSearch.
* @param tuples list of tuples for searching ES.
* @return list of collection retrieve by es query.
*/
public List<Collection<Values>> batchRetrieve(List<TridentTuple> tuples) {
checkNotNull(searchRequest);
checkNotNull(output);
List<Collection<Values>> batchRetrieveResult = new ArrayList<>();
try {
for (TridentTuple tuple : tuples){
SearchRequest request = searchRequest.extractFrom(tuple);
SearchResponse response = client.search(request).actionGet();
Collection<Values> values = output.toValues(response);
batchRetrieveResult.add(values);

}
}catch (Exception e){
LOG.warn("Batch retrieve operation is failed.");
throw new FailedException(e);
}
return batchRetrieveResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.storm.elasticsearch.trident;

import org.apache.storm.elasticsearch.ElasticsearchSearchRequest;
import org.apache.storm.elasticsearch.EsSearchResultOutput;
import org.apache.storm.task.IMetricsContext;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
Expand All @@ -34,7 +36,9 @@
*/
public class EsStateFactory implements StateFactory {
private final EsConfig esConfig;
private final EsTupleMapper tupleMapper;
private EsTupleMapper tupleMapper;
private ElasticsearchSearchRequest searchRequest;
private EsSearchResultOutput output;

/**
* EsStateFactory constructor
Expand All @@ -46,10 +50,29 @@ public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper) {
this.tupleMapper = checkNotNull(tupleMapper);
}

/**
*
* @param esConfig
* @param searchRequest
* @param output
*/
public EsStateFactory(EsConfig esConfig, ElasticsearchSearchRequest searchRequest,
EsSearchResultOutput output) {
this.esConfig = checkNotNull(esConfig);
this.searchRequest = checkNotNull(searchRequest);
this.output = checkNotNull(output);
}

@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
EsState esState = new EsState(esConfig, tupleMapper);
esState.prepare();
return esState;
if (tupleMapper != null) {
EsState esState = new EsState(esConfig, tupleMapper);
esState.prepare();
return esState;
} else {
EsState esState = new EsState(esConfig, searchRequest, output);
esState.prepare();
return esState;
}
}
}
Loading