2121import io .redlink .more .data .model .DataPoint ;
2222import io .redlink .more .data .model .RoutingInfo ;
2323import io .redlink .more .data .util .ElasticUtils ;
24- import java .util .ArrayList ;
2524import org .apache .commons .lang3 .Range ;
2625import org .apache .commons .lang3 .StringUtils ;
2726import org .slf4j .Logger ;
@@ -51,52 +50,23 @@ private String getElasticIndexName(RoutingInfo routingInfo) {
5150 public List <String > storeDataPoints (final List <DataPoint > dataBulk , final RoutingInfo routingInfo ) throws IOException {
5251 final String indexName = getElasticIndexName (routingInfo );
5352 final String uidPrefix = generateUidPrefix (routingInfo );
54- Boolean is_exploded = false ;
55- List <String > exploded_returnId = new ArrayList <>() ;
53+
5654 try {
5755 final BulkRequest .Builder br = new BulkRequest .Builder ()
5856 .index (indexName );
5957
6058 for (DataPoint dataPoint : dataBulk ) {
6159 final var uid = uidPrefix + dataPoint .datapointId ();
62- //TODO create transformer so its cleaner mapping solution bulk request operations fails
63- if (dataPoint .data ().keySet ().stream ().anyMatch (key -> key .toLowerCase ().contains ("polar360" ))){
64- is_exploded = true ;
65- final List <ElasticDataPoint > elasticItems = ElasticDataPoint .explode_toElastic (dataPoint , routingInfo );
66- if (elasticItems .isEmpty ()) {
67- LOG .warn ("polar360 data point {} produced no exploded items, skipping" , dataPoint .datapointId ());
68- continue ;
69- }
70- exploded_returnId .add (dataPoint .datapointId ());
71- int counter = 0 ;
72- for (ElasticDataPoint e : elasticItems ) {
73- final String explodedId = uid + "-" + counter ++;
74- br .operations (op -> op
75- .index (idx -> idx
76- .index (indexName )
77- .id (explodedId ) // <-- Maybe should be uid + "-" + something??
78- .document (e )
79- )
80- ); // <--- MISSING SEMICOLON FIXED
81- }
82- }
83- else {
84-
85-
8660 final ElasticDataPoint elasticDoc = ElasticDataPoint .toElastic (dataPoint , routingInfo );
8761 br .operations (op -> op
8862 .index (idx -> idx
8963 .index (indexName )
9064 .id (uid )
9165 .document (elasticDoc )
9266 )
93- ); }
67+ );
9468 }
9569
96- if (exploded_returnId .isEmpty () && is_exploded ) {
97- LOG .warn ("All polar360 data points produced no exploded items, nothing to store" );
98- return List .of ();
99- }
10070 LOG .debug ("Sending {} data-points to {}" , dataBulk .size (), indexName );
10171 final BulkResponse result = client .bulk (br .build ());
10272
@@ -109,17 +79,13 @@ public List<String> storeDataPoints(final List<DataPoint> dataBulk, final Routin
10979 }
11080 }
11181 }
112- if (is_exploded ) {
113- return exploded_returnId ;
114- }
115- else {
82+
11683 return result .items ().stream ()
11784 .filter (i -> i .error () == null )
11885 .map (BulkResponseItem ::id )
11986 .filter (StringUtils ::isNotBlank )
12087 .map (i -> i .substring (uidPrefix .length ()))
12188 .toList ();
122- }
12389 } catch (IOException | ElasticsearchException e ) {
12490 LOG .warn ("Error when sending data bulk to elastic index. Error message: {}" , e .toString ());
12591 throw e ;
0 commit comments