99
1010import app .coronawarn .server .common .persistence .domain .DiagnosisKey ;
1111import app .coronawarn .server .common .persistence .domain .FederationBatchInfo ;
12+ import app .coronawarn .server .common .persistence .domain .FederationBatchSourceSystem ;
1213import app .coronawarn .server .common .persistence .domain .FederationBatchStatus ;
1314import app .coronawarn .server .common .persistence .service .DiagnosisKeyService ;
1415import app .coronawarn .server .common .persistence .service .FederationBatchInfoService ;
2425import java .util .HashSet ;
2526import java .util .LinkedList ;
2627import java .util .List ;
28+ import java .util .Map ;
29+ import java .util .Map .Entry ;
2730import java .util .Optional ;
2831import java .util .Set ;
2932import java .util .concurrent .atomic .AtomicBoolean ;
3033import java .util .concurrent .atomic .AtomicReference ;
34+ import java .util .stream .Collectors ;
35+ import java .util .stream .Stream ;
3136import org .slf4j .Logger ;
3237import org .slf4j .LoggerFactory ;
3338import org .springframework .stereotype .Component ;
3944public class FederationBatchProcessor {
4045
4146 private static final Logger logger = LoggerFactory .getLogger (FederationBatchProcessor .class );
47+ private static final String CH = "CH" ;
4248 private final FederationBatchInfoService batchInfoService ;
4349 private final DiagnosisKeyService diagnosisKeyService ;
4450 private final FederationGatewayDownloadService federationGatewayDownloadService ;
@@ -101,14 +107,14 @@ public void prepareDownload() throws FatalFederationGatewayException {
101107 */
102108 protected void saveFirstBatchInfoForDate (LocalDate date ) throws FatalFederationGatewayException {
103109 try {
104- logger .info ("Triggering download of first batch for date {}. " , date );
110+ logger .info ("Triggering download of first batch for date {}" , date );
105111 BatchDownloadResponse response = federationGatewayDownloadService .downloadBatch (date );
106112 batchInfoService .save (new FederationBatchInfo (response .getBatchTag (), date , this .config .getSourceSystem ()));
107113 } catch (FatalFederationGatewayException e ) {
108114 throw e ;
109115 } catch (Exception e ) {
110116 logger .error (
111- "Triggering download of first batch for date {} failed. " , date , e );
117+ "Triggering download of first batch for date {} failed" , date , e );
112118 }
113119 }
114120
@@ -118,7 +124,7 @@ protected void saveFirstBatchInfoForDate(LocalDate date) throws FatalFederationG
118124 */
119125 public void processErrorFederationBatches () {
120126 List <FederationBatchInfo > federationBatchInfoWithError = batchInfoService .findByStatus (ERROR );
121- logger .info ("{} error federation batches for reprocessing found. " , federationBatchInfoWithError .size ());
127+ logger .info ("{} error federation batches for reprocessing found" , federationBatchInfoWithError .size ());
122128 federationBatchInfoWithError .forEach (this ::retryProcessingBatch );
123129 }
124130
@@ -129,7 +135,8 @@ private void retryProcessingBatch(FederationBatchInfo federationBatchInfo) {
129135 batchInfoService .save (new FederationBatchInfo (nextBatchTag , federationBatchInfo .getDate (), this .config
130136 .getSourceSystem ())));
131137 } catch (Exception e ) {
132- logger .error ("Failed to save next federation batch info for processing. Will not try again." , e );
138+ logger .error ("Failed to save next " + federationBatchInfo .getSourceSystem ()
139+ + " batch info for processing. Will not try again" , e );
133140 batchInfoService .updateStatus (federationBatchInfo , ERROR_WONT_RETRY );
134141 }
135142 }
@@ -142,7 +149,7 @@ private void retryProcessingBatch(FederationBatchInfo federationBatchInfo) {
142149 */
143150 public void processUnprocessedFederationBatches () throws FatalFederationGatewayException {
144151 Deque <FederationBatchInfo > unprocessedBatches = new LinkedList <>(batchInfoService .findByStatus (UNPROCESSED ));
145- logger .info ("{} unprocessed federation batches found. " , unprocessedBatches .size ());
152+ logger .info ("{} unprocessed {} batches found" , unprocessedBatches .size (), config . getSourceSystem ());
146153
147154 while (!unprocessedBatches .isEmpty ()) {
148155 FederationBatchInfo currentBatchInfo = unprocessedBatches .remove ();
@@ -155,6 +162,7 @@ public void processUnprocessedFederationBatches() throws FatalFederationGatewayE
155162 }
156163 });
157164 }
165+ logger .info ("Processed {} total {} batches" , seenBatches .size (), config .getSourceSystem ());
158166 }
159167
160168 private boolean isEfgsEnforceDateBasedDownloadAndNotSeen (String batchTag ) {
@@ -165,49 +173,75 @@ private Optional<String> processBatchAndReturnNextBatchId(
165173 FederationBatchInfo batchInfo , FederationBatchStatus errorStatus ) throws FatalFederationGatewayException {
166174 LocalDate date = batchInfo .getDate ();
167175 String batchTag = batchInfo .getBatchTag ();
168- logger .info ("Processing batch for date {} and batchTag {}." , date , batchTag );
176+ logger .info ("Processing {} batch for date {} and batchTag {}" , batchInfo . getSourceSystem () , date , batchTag );
169177 AtomicReference <Optional <String >> nextBatchTag = new AtomicReference <>(Optional .empty ());
170178 try {
171179 BatchDownloadResponse response = federationGatewayDownloadService .downloadBatch (batchTag , date );
172180 AtomicBoolean batchContainsInvalidKeys = new AtomicBoolean (false );
173181 nextBatchTag .set (response .getNextBatchTag ());
174182 response .getDiagnosisKeyBatch ().ifPresentOrElse (batch -> {
175- logger .info ("Downloaded {} keys for date {} and batchTag {}." , batch .getKeysCount (), date , batchTag );
183+ logger .info ("Downloaded {} {} keys for date {} and batchTag {}" , batchInfo .getSourceSystem (),
184+ batch .getKeysCount (), date , batchTag );
185+ Map <String , Integer > countedKeysByOriginCountry = batch
186+ .getKeysList ().stream ().collect (Collectors .groupingBy (
187+ app .coronawarn .server .common .protocols .external .exposurenotification .DiagnosisKey ::getOrigin ))
188+ .entrySet ().stream ()
189+ .collect (Collectors .toMap (Entry ::getKey ,
190+ e -> e .getValue ().size ()));
191+ if (config .getSourceSystem () == FederationBatchSourceSystem .EFGS ) {
192+ countedKeysByOriginCountry .forEach ((key , value ) -> logger .info ("Downloaded {} {} keys with origin country {}" ,
193+ batchInfo .getSourceSystem (), key , value ));
194+ }
195+ if (isChgs ()) {
196+ countedKeysByOriginCountry .entrySet ().stream ().filter (k -> !CH .equalsIgnoreCase (k .getKey ()))
197+ .forEach (k -> logger
198+ .warn ("There are keys {} with origin country {} which is different to CH and therefore they will be "
199+ + "dropped" , k .getValue (), k .getKey ()));
200+ }
201+
176202 if (config .isBatchAuditEnabled ()) {
177203 federationGatewayDownloadService .auditBatch (batchTag , date );
178204 }
179205 List <DiagnosisKey > validDiagnosisKeys = extractValidDiagnosisKeysFromBatch (batch );
180206 int numOfInvalidKeys = batch .getKeysCount () - validDiagnosisKeys .size ();
181207 if (numOfInvalidKeys > 0 ) {
182208 batchContainsInvalidKeys .set (true );
183- logger .info ("{} keys failed validation and were skipped." , numOfInvalidKeys );
209+ logger .info ("{} {} keys failed validation and were skipped" , batchInfo . getSourceSystem () , numOfInvalidKeys );
184210 }
185211 int insertedKeys = diagnosisKeyService .saveDiagnosisKeys (validDiagnosisKeys );
186- logger .info ("Successfully inserted {} keys for date {} and batchTag {}." , insertedKeys , date , batchTag );
187- }, () -> logger .info ("Batch for date {} and batchTag {} did not contain any keys." , date , batchTag ));
212+ logger .info ("Successfully inserted {} {} keys for date {} and batchTag {}" , batchInfo .getSourceSystem (),
213+ insertedKeys , date );
214+ }, () -> logger .info ("{} batch for date {} and batchTag {} did not contain any keys" ,
215+ batchInfo .getSourceSystem (), date , batchTag ));
188216 batchInfoService .updateStatus (batchInfo , batchContainsInvalidKeys .get () ? PROCESSED_WITH_ERROR : PROCESSED );
189217 return nextBatchTag .get ();
190218 } catch (FatalFederationGatewayException e ) {
191219 throw e ;
192220 } catch (Exception e ) {
193- logger .error (
194- "Federation batch processing for date " + date + " and batchTag " + batchTag + " failed. Status set to "
195- + errorStatus .name () + "." , e );
221+ logger .error (batchInfo .getSourceSystem () + " batch processing for date " + date + " and batchTag " + batchTag
222+ + " failed. Status set to " + errorStatus .name (), e );
196223 batchInfoService .updateStatus (batchInfo , errorStatus );
197224 return nextBatchTag .get ();
198225 }
199226 }
200227
201228 private List <DiagnosisKey > extractValidDiagnosisKeysFromBatch (DiagnosisKeyBatch diagnosisKeyBatch ) {
202- return diagnosisKeyBatch .getKeysList ()
203- .stream ()
204- .filter (validFederationKeyFilter ::isValid )
229+ Stream <app .coronawarn .server .common .protocols .external .exposurenotification .DiagnosisKey > partialKeys
230+ = diagnosisKeyBatch .getKeysList ().stream ().filter (validFederationKeyFilter ::isValid );
231+ if (isChgs ()) {
232+ partialKeys = partialKeys .filter (key -> CH .equalsIgnoreCase (key .getOrigin ()));
233+ }
234+ return partialKeys
205235 .map (this ::convertFederationDiagnosisKeyToDiagnosisKey )
206236 .filter (Optional ::isPresent )
207237 .map (Optional ::get )
208238 .collect (toList ());
209239 }
210240
241+ private boolean isChgs () {
242+ return config .getSourceSystem () == FederationBatchSourceSystem .CHGS ;
243+ }
244+
211245 private Optional <DiagnosisKey > convertFederationDiagnosisKeyToDiagnosisKey (
212246 app .coronawarn .server .common .protocols .external .exposurenotification .DiagnosisKey diagnosisKey ) {
213247 try {
0 commit comments