1010import org .apache .logging .log4j .Logger ;
1111import org .opensearch .OpenSearchException ;
1212import org .opensearch .action .DocWriteRequest ;
13- import org .opensearch .action .admin .indices .alias .Alias ;
1413import org .opensearch .action .admin .indices .create .CreateIndexRequest ;
1514import org .opensearch .action .admin .indices .create .CreateIndexResponse ;
16- import org .opensearch .action .admin .indices .rollover .RolloverRequest ;
17- import org .opensearch .action .admin .indices .rollover .RolloverResponse ;
1815import org .opensearch .action .bulk .BulkRequest ;
1916import org .opensearch .action .bulk .BulkResponse ;
2017import org .opensearch .action .index .IndexRequest ;
2118import org .opensearch .action .support .GroupedActionListener ;
2219import org .opensearch .action .support .WriteRequest ;
2320import org .opensearch .client .Client ;
24- import org .opensearch .cluster .ClusterState ;
2521import org .opensearch .cluster .service .ClusterService ;
2622import org .opensearch .common .settings .Settings ;
2723import org .opensearch .common .util .io .Streams ;
2824import org .opensearch .common .xcontent .XContentFactory ;
2925import org .opensearch .core .action .ActionListener ;
3026import org .opensearch .core .xcontent .ToXContent ;
3127import org .opensearch .securityanalytics .commons .model .IOC ;
28+ import org .opensearch .securityanalytics .commons .model .IOCType ;
3229import org .opensearch .securityanalytics .commons .model .UpdateAction ;
3330import org .opensearch .securityanalytics .commons .store .FeedStore ;
3431import org .opensearch .securityanalytics .model .STIX2IOC ;
3532import org .opensearch .securityanalytics .settings .SecurityAnalyticsSettings ;
3633import org .opensearch .securityanalytics .threatIntel .common .StashedThreadContext ;
3734import org .opensearch .securityanalytics .threatIntel .model .DefaultIocStoreConfig ;
3835import org .opensearch .securityanalytics .threatIntel .model .SATIFSourceConfig ;
39- import org .opensearch .securityanalytics .util .IndexUtils ;
4036
4137import java .io .ByteArrayOutputStream ;
4238import java .io .IOException ;
@@ -56,7 +52,7 @@ public class STIX2IOCFeedStore implements FeedStore {
5652 public static final String IOC_ALL_INDEX_PATTERN = IOC_INDEX_NAME_BASE + "-*" ;
5753 public static final String IOC_FEED_ID_PLACEHOLDER = "FEED_ID" ;
5854 public static final String IOC_INDEX_NAME_TEMPLATE = IOC_INDEX_NAME_BASE + "-" + IOC_FEED_ID_PLACEHOLDER ;
59- public static final String IOC_WRITE_INDEX_ALIAS = IOC_INDEX_NAME_TEMPLATE ;
55+ public static final String IOC_ALL_INDEX_PATTERN_BY_ID = IOC_INDEX_NAME_TEMPLATE + "-*" ;
6056 public static final String IOC_TIME_PLACEHOLDER = "TIME" ;
6157 public static final String IOC_INDEX_PATTERN = IOC_INDEX_NAME_TEMPLATE + "-" + IOC_TIME_PLACEHOLDER ;
6258
@@ -117,80 +113,36 @@ public void storeIOCs(Map<IOC, UpdateAction> actionToIOCs) {
117113 }
118114
119115 public void indexIocs (List <STIX2IOC > iocs ) throws IOException {
120- String iocAlias = getIocIndexAlias (saTifSourceConfig .getId ());
121- String iocPattern = getIocIndexRolloverPattern (saTifSourceConfig .getId ());
116+ String newActiveIndex = getNewActiveIndex (saTifSourceConfig .getId ());
117+ String iocIndexPattern = getAllIocIndexPatternById (saTifSourceConfig .getId ());
122118
123- if (iocIndexExists (iocAlias ) == false ) {
124- initFeedIndex (iocAlias , iocPattern , ActionListener .wrap (
125- r -> {
126- saTifSourceConfig .getIocTypes ().forEach (type -> {
127- String writeIndex = IndexUtils .getWriteIndex (iocAlias , clusterService .state ());
128- String lowerCaseType = type .toLowerCase (Locale .ROOT );
129- ((DefaultIocStoreConfig ) saTifSourceConfig .getIocStoreConfig ()).getIocMapStore ().putIfAbsent (lowerCaseType , new ArrayList <>());
130- ((DefaultIocStoreConfig ) saTifSourceConfig .getIocStoreConfig ()).getIocMapStore ().get (lowerCaseType ).add (iocAlias );
131- ((DefaultIocStoreConfig ) saTifSourceConfig .getIocStoreConfig ()).getIocMapStore ().get (lowerCaseType ).add (writeIndex );
132- });
133- bulkIndexIocs (iocs , iocAlias );
134- }, e -> {
135- log .error ("Failed to initialize the IOC index and save the IOCs" , e );
136- baseListener .onFailure (e );
137- }
138- ));
139- } else {
140- rolloverIndex (iocAlias , iocPattern , ActionListener .wrap (
141- r -> {
142- saTifSourceConfig .getIocTypes ().forEach (type -> {
143- String writeIndex = IndexUtils .getWriteIndex (iocAlias , clusterService .state ());
144- String lowerCaseType = type .toLowerCase (Locale .ROOT );
145- ((DefaultIocStoreConfig ) saTifSourceConfig .getIocStoreConfig ()).getIocMapStore ().get (lowerCaseType ).add (writeIndex );
146- });
147- bulkIndexIocs (iocs , iocAlias );
148- }, e -> {
149- log .error ("Failed to rollover the IOC index and save the IOCs" , e );
150- baseListener .onFailure (e );
151- }
152- ));
153- }
154- }
155-
156- private void rolloverIndex (
157- String alias ,
158- String pattern ,
159- ActionListener <RolloverResponse > listener
160- ) {
161- if (clusterService .state ().metadata ().hasAlias (alias ) == false ) {
162- listener .onFailure (new OpenSearchException ("Alias not initialized" ));
163- return ;
164- }
165-
166- RolloverRequest request = new RolloverRequest (alias , pattern );
167- request .getCreateIndexRequest ()
168- .mapping (iocIndexMapping ())
169- .settings (Settings .builder ().put ("index.hidden" , true ).build ());
170- client .admin ().indices ().rolloverIndex (
171- request ,
172- ActionListener .wrap (
173- rolloverResponse -> {
174- if (false == rolloverResponse .isRolledOver ()) {
175- log .info (alias + "not rolled over. Rollover condition status: " + rolloverResponse .getConditionStatus ());
176- listener .onFailure (new OpenSearchException (alias + "not rolled over. Rollover condition status: " + rolloverResponse .getConditionStatus ()));
177- } else {
178- listener .onResponse (rolloverResponse );
179- }
180- }, e -> {
181- log .error ("rollover failed for alias [" + alias + "]." );
182- listener .onFailure (e );
119+ initFeedIndex (newActiveIndex , ActionListener .wrap (
120+ r -> {
121+ saTifSourceConfig .getIocTypes ().forEach (type -> {
122+ IOCType iocType = IOCType .fromString (type );
123+ if (saTifSourceConfig .getIocStoreConfig () instanceof DefaultIocStoreConfig ) {
124+ List <DefaultIocStoreConfig .IocToIndexDetails > listOfIocToIndexDetails =
125+ ((DefaultIocStoreConfig ) saTifSourceConfig .getIocStoreConfig ()).getIocToIndexDetails ();
126+ listOfIocToIndexDetails .removeIf (iocToIndexDetails -> iocToIndexDetails .getIocType () == iocType );
127+ DefaultIocStoreConfig .IocToIndexDetails iocToIndexDetails =
128+ new DefaultIocStoreConfig .IocToIndexDetails (iocType , iocIndexPattern , newActiveIndex );
129+ listOfIocToIndexDetails .add (iocToIndexDetails );
183130 }
184- )
185- );
131+ });
132+ bulkIndexIocs (iocs , newActiveIndex );
133+ }, e -> {
134+ log .error ("Failed to initialize the IOC index and save the IOCs" , e );
135+ baseListener .onFailure (e );
136+ }
137+ ));
186138 }
187139
188- private void bulkIndexIocs (List <STIX2IOC > iocs , String iocAlias ) throws IOException {
140+ private void bulkIndexIocs (List <STIX2IOC > iocs , String activeIndex ) throws IOException {
189141 List <BulkRequest > bulkRequestList = new ArrayList <>();
190142 BulkRequest bulkRequest = new BulkRequest ();
191143
192144 for (STIX2IOC ioc : iocs ) {
193- IndexRequest indexRequest = new IndexRequest (iocAlias )
145+ IndexRequest indexRequest = new IndexRequest (activeIndex )
194146 .id (StringUtils .isBlank (ioc .getId ())? UUID .randomUUID ().toString () : ioc .getId ())
195147 .opType (DocWriteRequest .OpType .INDEX )
196148 .source (ioc .toXContent (XContentFactory .jsonBuilder (), ToXContent .EMPTY_PARAMS ));
@@ -235,27 +187,20 @@ private void bulkIndexIocs(List<STIX2IOC> iocs, String iocAlias) throws IOExcept
235187 }
236188 }
237189
238- public boolean iocIndexExists (String alias ) {
239- ClusterState clusterState = clusterService .state ();
240- return clusterState .metadata ().hasAlias (alias );
190+ public static String getAllIocIndexPatternById (String sourceConfigId ) {
191+ return IOC_ALL_INDEX_PATTERN_BY_ID .replace (IOC_FEED_ID_PLACEHOLDER , sourceConfigId .toLowerCase (Locale .ROOT ));
241192 }
242193
243- public static String getIocIndexAlias (String feedSourceConfigId ) {
244- return IOC_WRITE_INDEX_ALIAS .replace (IOC_FEED_ID_PLACEHOLDER , feedSourceConfigId .toLowerCase (Locale .ROOT ));
245- }
246-
247- public static String getIocIndexRolloverPattern (String feedSourceConfigId ) {
194+ public static String getNewActiveIndex (String sourceConfigId ) {
248195 return IOC_INDEX_PATTERN
249- .replace (IOC_FEED_ID_PLACEHOLDER , feedSourceConfigId .toLowerCase (Locale .ROOT ))
196+ .replace (IOC_FEED_ID_PLACEHOLDER , sourceConfigId .toLowerCase (Locale .ROOT ))
250197 .replace (IOC_TIME_PLACEHOLDER , Long .toString (Instant .now ().toEpochMilli ()));
251198 }
252199
253-
254- public void initFeedIndex (String feedAliasName , String feedIndexName , ActionListener <CreateIndexResponse > listener ) {
200+ public void initFeedIndex (String feedIndexName , ActionListener <CreateIndexResponse > listener ) {
255201 var indexRequest = new CreateIndexRequest (feedIndexName )
256202 .mapping (iocIndexMapping ())
257203 .settings (Settings .builder ().put ("index.hidden" , true ).build ());
258- indexRequest .alias (new Alias (feedAliasName )); // set the alias
259204 client .admin ().indices ().create (indexRequest , ActionListener .wrap (
260205 r -> {
261206 log .info ("Created system index {}" , feedIndexName );
0 commit comments