77
88import com .amazonaws .AmazonServiceException ;
99import com .amazonaws .SdkClientException ;
10+ import org .apache .commons .csv .CSVParser ;
11+ import org .apache .commons .csv .CSVRecord ;
1012import org .apache .logging .log4j .LogManager ;
1113import org .apache .logging .log4j .Logger ;
14+ import org .opensearch .action .bulk .BulkRequest ;
1215import org .opensearch .client .Client ;
1316import org .opensearch .cluster .service .ClusterService ;
1417import org .opensearch .core .action .ActionListener ;
2932import org .opensearch .securityanalytics .commons .connector .model .S3ConnectorConfig ;
3033import org .opensearch .securityanalytics .commons .model .FeedConfiguration ;
3134import org .opensearch .securityanalytics .commons .model .IOCSchema ;
35+ import org .opensearch .securityanalytics .commons .model .IOCType ;
3236import org .opensearch .securityanalytics .commons .model .STIX2 ;
3337import org .opensearch .securityanalytics .commons .model .UpdateType ;
3438import org .opensearch .securityanalytics .model .STIX2IOC ;
3539import org .opensearch .securityanalytics .model .STIX2IOCDto ;
3640import org .opensearch .securityanalytics .settings .SecurityAnalyticsSettings ;
3741import org .opensearch .securityanalytics .threatIntel .model .S3Source ;
3842import org .opensearch .securityanalytics .threatIntel .model .SATIFSourceConfig ;
43+ import org .opensearch .securityanalytics .threatIntel .model .UrlDownloadSource ;
3944import org .opensearch .securityanalytics .threatIntel .service .TIFJobParameterService ;
45+ import org .opensearch .securityanalytics .threatIntel .util .ThreatIntelFeedParser ;
4046import org .opensearch .securityanalytics .util .SecurityAnalyticsException ;
4147import software .amazon .awssdk .core .exception .SdkException ;
4248import software .amazon .awssdk .services .s3 .model .HeadObjectResponse ;
4955import java .io .InputStream ;
5056import java .io .InputStreamReader ;
5157import java .nio .charset .StandardCharsets ;
58+ import java .time .Instant ;
5259import java .util .ArrayList ;
60+ import java .util .Collections ;
61+ import java .util .Iterator ;
5362import java .util .List ;
63+ import java .util .UUID ;
5464import java .util .stream .Collectors ;
5565
66+ import static org .opensearch .securityanalytics .threatIntel .service .ThreatIntelFeedDataService .isValidIp ;
67+
5668/**
5769 * IOC Service implements operations that interact with retrieving IOCs from data sources,
5870 * parsing them into threat intel data models (i.e., [IOC]), and ingesting them to system indexes.
@@ -84,14 +96,14 @@ public STIX2IOCFetchService(Client client, ClusterService clusterService) {
8496
8597 /**
8698 * Method takes in and calls method to rollover and bulk index a list of STIX2IOCs
99+ *
87100 * @param saTifSourceConfig
88101 * @param stix2IOCList
89102 * @param listener
90103 */
91104 public void onlyIndexIocs (SATIFSourceConfig saTifSourceConfig ,
92105 List <STIX2IOC > stix2IOCList ,
93- ActionListener <STIX2IOCFetchResponse > listener )
94- {
106+ ActionListener <STIX2IOCFetchResponse > listener ) {
95107 STIX2IOCFeedStore feedStore = new STIX2IOCFeedStore (client , clusterService , saTifSourceConfig , listener );
96108 try {
97109 feedStore .indexIocs (stix2IOCList );
@@ -100,6 +112,7 @@ public void onlyIndexIocs(SATIFSourceConfig saTifSourceConfig,
100112 listener .onFailure (e );
101113 }
102114 }
115+
103116 public void downloadAndIndexIOCs (SATIFSourceConfig saTifSourceConfig , ActionListener <STIX2IOCFetchResponse > listener ) {
104117 S3ConnectorConfig s3ConnectorConfig = constructS3ConnectorConfig (saTifSourceConfig );
105118 Connector <STIX2 > s3Connector = constructS3Connector (s3ConnectorConfig );
@@ -144,7 +157,7 @@ private void testS3ClientConnection(S3ConnectorConfig s3ConnectorConfig, ActionL
144157 } catch (StsException stsException ) {
145158 log .warn ("S3Client connection test failed with StsException: " , stsException );
146159 listener .onResponse (new TestS3ConnectionResponse (RestStatus .fromCode (stsException .statusCode ()), stsException .awsErrorDetails ().errorMessage ()));
147- } catch (SdkException sdkException ) {
160+ } catch (SdkException sdkException ) {
148161 // SdkException is a RunTimeException that doesn't have a status code.
149162 // Logging the full exception, and providing generic response as output.
150163 log .warn ("S3Client connection test failed with SdkException: " , sdkException );
@@ -227,6 +240,77 @@ private String getEndpoint() {
227240 return "" ;
228241 }
229242
243+ public void downloadFromUrlAndIndexIOCs (SATIFSourceConfig saTifSourceConfig , ActionListener <STIX2IOCFetchResponse > listener ) {
244+ UrlDownloadSource source = (UrlDownloadSource ) saTifSourceConfig .getSource ();
245+ switch (source .getFeedFormat ()) { // todo add check to stop user from creating url type config from rest api. only internal allowed
246+ case "csv" :
247+ try (CSVParser reader = ThreatIntelFeedParser .getThreatIntelFeedReaderCSV (source .getUrl ())) {
248+ CSVParser noHeaderReader = ThreatIntelFeedParser .getThreatIntelFeedReaderCSV (source .getUrl ());
249+ boolean notFound = true ;
250+
251+ while (notFound ) {
252+ CSVRecord hasHeaderRecord = reader .iterator ().next ();
253+
254+ //if we want to skip this line and keep iterating
255+ if ((hasHeaderRecord .values ().length == 1 && "" .equals (hasHeaderRecord .values ()[0 ])) || hasHeaderRecord .get (0 ).charAt (0 ) == '#' || hasHeaderRecord .get (0 ).charAt (0 ) == ' ' ) {
256+ noHeaderReader .iterator ().next ();
257+ } else { // we found the first line that contains information
258+ notFound = false ;
259+ }
260+ }
261+ if (source .hasCsvHeader ()) {
262+ parseAndSaveThreatIntelFeedDataCSV (reader .iterator (), saTifSourceConfig , listener );
263+ } else {
264+ parseAndSaveThreatIntelFeedDataCSV (noHeaderReader .iterator (), saTifSourceConfig , listener );
265+ }
266+ } catch (Exception e ) {
267+ log .error ("Failed to download the IoCs in CSV format for source " + saTifSourceConfig .getId ());
268+ listener .onFailure (e );
269+ return ;
270+ }
271+ break ;
272+ default :
273+ log .error ("unsupported feed format for url download:" + source .getFeedFormat ());
274+ listener .onFailure (new UnsupportedOperationException ("unsupported feed format for url download:" + source .getFeedFormat ()));
275+ }
276+ }
277+
278+ private void parseAndSaveThreatIntelFeedDataCSV (Iterator <CSVRecord > iterator , SATIFSourceConfig saTifSourceConfig , ActionListener <STIX2IOCFetchResponse > listener ) throws IOException {
279+ List <BulkRequest > bulkRequestList = new ArrayList <>();
280+
281+ UrlDownloadSource source = (UrlDownloadSource ) saTifSourceConfig .getSource ();
282+ List <STIX2IOC > iocs = new ArrayList <>();
283+ while (iterator .hasNext ()) {
284+ CSVRecord record = iterator .next ();
285+ String iocType = saTifSourceConfig .getIocTypes ().stream ().findFirst ().orElse (null );
286+ Integer colNum = source .getCsvIocValueColumnNo ();
287+ String iocValue = record .values ()[colNum ].split (" " )[0 ];
288+ if (iocType .equalsIgnoreCase (IOCType .ipv6_addr .toString ()) && !isValidIp (iocValue )) {
289+ log .info ("Invalid IP address, skipping this ioc record: {}" , iocValue );
290+ continue ;
291+ }
292+ Instant now = Instant .now ();
293+ STIX2IOC stix2IOC = new STIX2IOC (
294+ UUID .randomUUID ().toString (),
295+ UUID .randomUUID ().toString (),
296+ iocType == null ? IOCType .ipv4_addr : IOCType .valueOf (iocType ),
297+ iocValue ,
298+ "high" ,
299+ now ,
300+ now ,
301+ "" ,
302+ Collections .emptyList (),
303+ "" ,
304+ saTifSourceConfig .getId (),
305+ saTifSourceConfig .getName (),
306+ STIX2IOC .NO_VERSION
307+ );
308+ iocs .add (stix2IOC );
309+ }
310+ STIX2IOCFeedStore feedStore = new STIX2IOCFeedStore (client , clusterService , saTifSourceConfig , listener );
311+ feedStore .indexIocs (iocs );
312+ }
313+
230314 public static class STIX2IOCFetchResponse extends ActionResponse implements ToXContentObject {
231315 public static String IOCS_FIELD = "iocs" ;
232316 public static String TOTAL_FIELD = "total" ;
0 commit comments