Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/main/java/com/uid2/operator/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class Config extends com.uid2.shared.Const.Config {
public static final String GcpSecretVersionNameProp = "gcp_secret_version_name";
public static final String OptOutStatusApiEnabled = "optout_status_api_enabled";
public static final String OptOutStatusMaxRequestSize = "optout_status_max_request_size";
public static final String OptOutOldCountThresholdSecondsProp = "optout_old_count_threshold_seconds";
public static final String MaxInvalidPaths = "logging_limit_max_invalid_paths_per_interval";
public static final String MaxVersionBucketsPerSite = "logging_limit_max_version_buckets_per_site";

Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/uid2/operator/store/CloudSyncOptOutStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,18 @@ public static class OptOutStoreSnapshot {
.description("gauge for max entries can be cached in bloomfilter")
.register(Metrics.globalRegistry);

private static final Gauge GAUGE_OPTOUT_OLD_COUNT = Gauge
.builder("uid2_operator_optout_old_count", () -> OptOutStoreSnapshot.oldCount.get())
.description("gauge for number of unique optout entries older than threshold (configurable)")
.register(Metrics.globalRegistry);

// stores the timestamp of last updated delta or partition file
private static final AtomicReference<Instant> lastUpdatedTimestamp = new AtomicReference<>(Instant.EPOCH);
private static final AtomicLong bloomFilterSize = new AtomicLong(0);
private static final AtomicLong bloomFilterMax = new AtomicLong(0);
private static final AtomicLong totalEntries = new AtomicLong(0);
private static final AtomicInteger adIdCount = new AtomicInteger(0);
private static final AtomicLong oldCount = new AtomicLong(0);
private static final BiFunction<Long, Long, Long> OPT_OUT_TIMESTAMP_MERGE_STRATEGY = Long::min;

private final DownloadCloudStorage fsLocal;
Expand Down Expand Up @@ -418,6 +424,8 @@ public static class OptOutStoreSnapshot {

private final Clock clock;

private final long optOutOldCountThresholdSeconds;

public OptOutStoreSnapshot(DownloadCloudStorage fsLocal, JsonObject jsonConfig, Clock clock) {
this.clock = clock;
this.fsLocal = fsLocal;
Expand All @@ -436,6 +444,7 @@ public OptOutStoreSnapshot(DownloadCloudStorage fsLocal, JsonObject jsonConfig,

this.adIdToOptOutTimestamp = Collections.emptyMap();
this.optoutStatusApiEnabled = jsonConfig.getBoolean(Const.Config.OptOutStatusApiEnabled, true);
this.optOutOldCountThresholdSeconds = jsonConfig.getLong(Const.Config.OptOutOldCountThresholdSecondsProp, 3600L); // Default 1 hour

// initially 1 partition
this.partitions = new OptOutPartition[1];
Expand All @@ -453,6 +462,7 @@ public OptOutStoreSnapshot(OptOutStoreSnapshot last, BloomFilter bf, OptOutHeap
this.fsLocal = last.fsLocal;
this.fileUtils = last.fileUtils;
this.iteration = last.iteration + 1;
this.optOutOldCountThresholdSeconds = last.optOutOldCountThresholdSeconds;

this.bloomFilter = bf;
this.heap = heap;
Expand Down Expand Up @@ -481,6 +491,7 @@ public OptOutStoreSnapshot(OptOutStoreSnapshot last, BloomFilter bf, OptOutHeap
// update total entries
totalEntries.set(size());
adIdCount.set(this.adIdToOptOutTimestamp.size());
oldCount.set(countOldRecords(this.optOutOldCountThresholdSeconds));
}

public long size() {
Expand All @@ -490,6 +501,22 @@ public long size() {
.sum();
}

public long countOldRecords(long thresholdSeconds) {
long currentTimeSeconds = clock.instant().getEpochSecond();
long cutoffTime = currentTimeSeconds - thresholdSeconds;

Set<String> uniqueHashes = new HashSet<>();
for (OptOutPartition partition : this.partitions) {
if (partition == null) continue;
partition.forEach(entry -> {
if (entry.timestamp < cutoffTime) {
uniqueHashes.add(entry.idHashToB64());
}
});
}
return uniqueHashes.size();
}

// method provided for OptOutService to assess health
public boolean isHealthy(Instant now) {
// index is healthy if it is updated within 3 * logRotationInterval
Expand Down
Loading