diff --git a/docs/api-reference/data-management-api.md b/docs/api-reference/data-management-api.md index 4adeaa8b2083..754bf62f725a 100644 --- a/docs/api-reference/data-management-api.md +++ b/docs/api-reference/data-management-api.md @@ -206,7 +206,8 @@ Marks the state of a group of segments as unused, using an array of segment IDs Pass the array of segment IDs or interval as a JSON object in the request body. For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time. -Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected. +Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained +within the specified interval that match the optional list of versions; partially overlapping segments are not affected. #### URL @@ -214,12 +215,13 @@ Druid only updates the segments completely contained within the specified interv #### Request body -The group of segments is sent as a JSON request payload that accepts one of the following properties: +The group of segments is sent as a JSON request payload that accepts the following properties: -|Property|Description|Example| -|----------|-------------|---------| -|`interval`|ISO 8601 segments interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| -|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`| +|Property|Description|Required|Example| +|----------|-------------|---------|---------| +|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| +|`segmentIds`|List of segment IDs.|Yes, if `interval` is not specified.|`["segmentId1", "segmentId2"]`| +|`versions`|List of segment versions. 
Must be provided with `interval`.|No.|`["2024-03-14T16:00:04.086Z", "2024-03-12T16:00:04.086Z"]`| #### Responses @@ -306,7 +308,8 @@ Marks the state of a group of segments as used, using an array of segment IDs or Pass the array of segment IDs or interval as a JSON object in the request body. For the interval, specify the start and end times as ISO 8601 strings to identify segments inclusive of the start time and exclusive of the end time. -Druid only updates the segments completely contained within the specified interval; partially overlapping segments are not affected. +Optionally, specify an array of segment versions with interval. Druid updates only the segments completely contained +within the specified interval that match the optional list of versions; partially overlapping segments are not affected. #### URL @@ -314,12 +317,13 @@ Druid only updates the segments completely contained within the specified interv #### Request body -The group of segments is sent as a JSON request payload that accepts one of the following properties: +The group of segments is sent as a JSON request payload that accepts the following properties: -|Property|Description|Example| -|----------|-------------|---------| -|`interval`| ISO 8601 segments interval.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| -|`segmentIds`|Array of segment IDs.|`["segmentId1", "segmentId2"]`| +|Property|Description|Required|Example| +|----------|-------------|---------|---------| +|`interval`|ISO 8601 segments interval.|Yes, if `segmentIds` is not specified.|`"2015-09-12T03:00:00.000Z/2015-09-12T05:00:00.000Z"`| +|`segmentIds`|List of segment IDs.|Yes, if `interval` is not specified.|`["segmentId1", "segmentId2"]`| +|`versions`|List of segment versions. 
Must be provided with `interval`.|No.|`["2024-03-14T16:00:04.086Z", "2024-03-12T16:00:04.086Z"]`| #### Responses diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java index ddf57afbc185..93cb75280fac 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java @@ -63,9 +63,8 @@ public TypeReference getReturnTypeReference() @Override public Integer perform(Task task, TaskActionToolbox toolbox) { - int numMarked = toolbox.getIndexerMetadataStorageCoordinator() - .markSegmentsAsUnusedWithinInterval(dataSource, interval); - return numMarked; + return toolbox.getIndexerMetadataStorageCoordinator() + .markSegmentsAsUnusedWithinInterval(dataSource, interval); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 1e36cc825a01..382673bed4b8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -387,7 +387,8 @@ public void testKillBatchSizeOneAndLimit4() throws Exception segments.size(), getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - Intervals.of("2018-01-01/2020-01-01") + Intervals.of("2018-01-01/2020-01-01"), + null ) ); @@ -434,7 +435,8 @@ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment1.getInterval() + segment1.getInterval(), + null ) ); @@ -442,7 
+444,8 @@ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment4.getInterval() + segment4.getInterval(), + null ) ); @@ -450,7 +453,8 @@ public void testKillMultipleUnusedSegmentsWithNullMaxUsedStatusLastUpdatedTime() 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment3.getInterval() + segment3.getInterval(), + null ) ); @@ -508,7 +512,8 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment1.getInterval() + segment1.getInterval(), + null ) ); @@ -516,7 +521,8 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment4.getInterval() + segment4.getInterval(), + null ) ); @@ -529,7 +535,8 @@ public void testKillMultipleUnusedSegmentsWithDifferentMaxUsedStatusLastUpdatedT 1, getSegmentsMetadataManager().markAsUnusedSegmentsInInterval( DATA_SOURCE, - segment3.getInterval() + segment3.getInterval(), + null ) ); diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java index 540eba990f22..f0b9f06425d9 100644 --- a/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SegmentsMetadataManager.java @@ -53,7 +53,13 @@ public interface SegmentsMetadataManager */ int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource); - int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval); + /** + * Marks non-overshadowed unused segments for the given interval and optional list of versions + * as used. 
If versions are not specified, all versions of non-overshadowed unused segments in the interval + * will be marked as used. + * @return Number of segments updated + */ + int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions); /** * Marks the given segment IDs as "used" only if there are not already overshadowed @@ -81,7 +87,13 @@ public interface SegmentsMetadataManager */ int markAsUnusedAllSegmentsInDataSource(String dataSource); - int markAsUnusedSegmentsInInterval(String dataSource, Interval interval); + /** + * Marks segments as unused that are fully contained in the given interval for an optional list of versions. + * If versions are not specified, all versions of segments in the interval will be marked as unused. + * Segments that are already marked as unused are not updated. + * @return The number of segments updated + */ + int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions); int markSegmentsAsUnused(Set segmentIds); diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java index 4a268a2257da..66a60e072c02 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java @@ -654,21 +654,25 @@ public boolean markSegmentAsUsed(final String segmentId) @Override public int markAsUsedAllNonOvershadowedSegmentsInDataSource(final String dataSource) { - return doMarkAsUsedNonOvershadowedSegments(dataSource, null); + return doMarkAsUsedNonOvershadowedSegments(dataSource, null, null); } @Override - public int markAsUsedNonOvershadowedSegmentsInInterval(final String dataSource, final Interval interval) + public int markAsUsedNonOvershadowedSegmentsInInterval( + final String dataSource, + final Interval interval, + @Nullable final List versions + 
) { Preconditions.checkNotNull(interval); - return doMarkAsUsedNonOvershadowedSegments(dataSource, interval); + return doMarkAsUsedNonOvershadowedSegments(dataSource, interval, versions); } - /** - * Implementation for both {@link #markAsUsedAllNonOvershadowedSegmentsInDataSource} (if the given interval is null) - * and {@link #markAsUsedNonOvershadowedSegmentsInInterval}. - */ - private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable Interval interval) + private int doMarkAsUsedNonOvershadowedSegments( + final String dataSourceName, + final @Nullable Interval interval, + final @Nullable List versions + ) { final List unusedSegments = new ArrayList<>(); final SegmentTimeline timeline = new SegmentTimeline(); @@ -682,12 +686,12 @@ private int doMarkAsUsedNonOvershadowedSegments(String dataSourceName, @Nullable interval == null ? Intervals.ONLY_ETERNITY : Collections.singletonList(interval); try (final CloseableIterator iterator = - queryTool.retrieveUsedSegments(dataSourceName, intervals)) { + queryTool.retrieveUsedSegments(dataSourceName, intervals, versions)) { timeline.addSegments(iterator); } try (final CloseableIterator iterator = - queryTool.retrieveUnusedSegments(dataSourceName, intervals, null, null, null, null, null)) { + queryTool.retrieveUnusedSegments(dataSourceName, intervals, versions, null, null, null, null)) { while (iterator.hasNext()) { final DataSegment dataSegment = iterator.next(); timeline.addSegments(Iterators.singletonIterator(dataSegment)); @@ -796,7 +800,7 @@ private CloseableIterator retrieveUsedSegmentsOverlappingIntervals( private int markSegmentsAsUsed(final List segmentIds) { if (segmentIds.isEmpty()) { - log.info("No segments found to update!"); + log.info("No segments found to mark as used."); return 0; } @@ -856,13 +860,18 @@ public int markSegmentsAsUnused(Set segmentIds) } @Override - public int markAsUnusedSegmentsInInterval(String dataSourceName, Interval interval) + public int 
markAsUnusedSegmentsInInterval( + final String dataSource, + final Interval interval, + @Nullable final List versions + ) { + Preconditions.checkNotNull(interval); try { return connector.getDBI().withHandle( handle -> SqlSegmentsMetadataQuery.forHandle(handle, connector, dbTables.get(), jsonMapper) - .markSegmentsUnused(dataSourceName, interval) + .markSegmentsUnused(dataSource, interval, versions) ); } catch (Exception e) { diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java index 29465cd665ba..5308f9dd7fea 100644 --- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java +++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java @@ -42,6 +42,8 @@ import org.skife.jdbi.v2.PreparedBatch; import org.skife.jdbi.v2.Query; import org.skife.jdbi.v2.ResultIterator; +import org.skife.jdbi.v2.SQLStatement; +import org.skife.jdbi.v2.Update; import javax.annotation.Nullable; import java.util.ArrayList; @@ -119,11 +121,24 @@ public CloseableIterator retrieveUsedSegments( final String dataSource, final Collection intervals ) + { + return retrieveUsedSegments(dataSource, intervals, null); + } + + /** + * Similar to {@link #retrieveUsedSegments}, but with an additional {@code versions} argument. When {@code versions} + * is specified, all used segments in the specified {@code intervals} and {@code versions} are retrieved. 
+ */ + public CloseableIterator retrieveUsedSegments( + final String dataSource, + final Collection intervals, + final List versions + ) { return retrieveSegments( dataSource, intervals, - null, + versions, IntervalMode.OVERLAPS, true, null, @@ -134,7 +149,7 @@ public CloseableIterator retrieveUsedSegments( } /** - * Retrieves segments for a given datasource that are marked unused and that are *fully contained by* any interval + * Retrieves segments for a given datasource that are marked unused and that are fully contained by any interval * in a particular collection of intervals. If the collection of intervals is empty, this method will retrieve all * unused segments. *

@@ -184,7 +199,7 @@ public CloseableIterator retrieveUnusedSegments( /** * Similar to {@link #retrieveUnusedSegments}, but also retrieves associated metadata for the segments for a given - * datasource that are marked unused and that are *fully contained by* any interval in a particular collection of + * datasource that are marked unused and that are fully contained by any interval in a particular collection of * intervals. If the collection of intervals is empty, this method will retrieve all unused segments. * * This call does not return any information about realtime segments. @@ -312,45 +327,83 @@ public int markSegments(final Collection segmentIds, final boolean us } /** - * Marks all used segments that are *fully contained by* a particular interval as unused. + * Marks all used segments that are fully contained by a particular interval as unused. * - * @return the number of segments actually modified. + * @return Number of segments updated. */ public int markSegmentsUnused(final String dataSource, final Interval interval) + { + return markSegmentsUnused(dataSource, interval, null); + } + + /** + * Marks all used segments that are fully contained by a particular interval filtered by an optional list of versions + * as unused. + * + * @return Number of segments updated. 
+ */ + public int markSegmentsUnused(final String dataSource, final Interval interval, @Nullable final List versions) { if (Intervals.isEternity(interval)) { - return handle - .createStatement( - StringUtils.format( - "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " - + "WHERE dataSource = :dataSource AND used = true", - dbTables.getSegmentsTable() - ) + final StringBuilder sb = new StringBuilder(); + sb.append( + StringUtils.format( + "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " + + "WHERE dataSource = :dataSource AND used = true", + dbTables.getSegmentsTable() ) + ); + + final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); + + if (hasVersions) { + sb.append(getConditionForVersions(versions)); + } + + final Update stmt = handle + .createStatement(sb.toString()) .bind("dataSource", dataSource) .bind("used", false) - .bind("used_status_last_updated", DateTimes.nowUtc().toString()) - .execute(); + .bind("used_status_last_updated", DateTimes.nowUtc().toString()); + + if (hasVersions) { + bindVersionsToQuery(stmt, versions); + } + + return stmt.execute(); } else if (Intervals.canCompareEndpointsAsStrings(interval) && interval.getStart().getYear() == interval.getEnd().getYear()) { // Safe to write a WHERE clause with this interval. Note that it is unsafe if the years are different, because // that means extra characters can sneak in. (Consider a query interval like "2000-01-01/2001-01-01" and a // segment interval like "20001/20002".) 
- return handle - .createStatement( - StringUtils.format( - "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " - + "WHERE dataSource = :dataSource AND used = true AND %s", - dbTables.getSegmentsTable(), - IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end") - ) + final StringBuilder sb = new StringBuilder(); + sb.append( + StringUtils.format( + "UPDATE %s SET used=:used, used_status_last_updated = :used_status_last_updated " + + "WHERE dataSource = :dataSource AND used = true AND %s", + dbTables.getSegmentsTable(), + IntervalMode.CONTAINS.makeSqlCondition(connector.getQuoteString(), ":start", ":end") ) + ); + + final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); + + if (hasVersions) { + sb.append(getConditionForVersions(versions)); + } + + final Update stmt = handle + .createStatement(sb.toString()) .bind("dataSource", dataSource) .bind("used", false) .bind("start", interval.getStart().toString()) .bind("end", interval.getEnd().toString()) - .bind("used_status_last_updated", DateTimes.nowUtc().toString()) - .execute(); + .bind("used_status_last_updated", DateTimes.nowUtc().toString()); + + if (hasVersions) { + bindVersionsToQuery(stmt, versions); + } + return stmt.execute(); } else { // Retrieve, then drop, since we can't write a WHERE clause directly. 
final List segments = ImmutableList.copyOf( @@ -358,7 +411,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval) retrieveSegments( dataSource, Collections.singletonList(interval), - null, + versions, IntervalMode.CONTAINS, true, null, @@ -680,11 +733,10 @@ private Query> buildSegmentsTableQuery( appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector); } - if (!CollectionUtils.isNullOrEmpty(versions)) { - final String versionsStr = versions.stream() - .map(version -> "'" + version + "'") - .collect(Collectors.joining(",")); - sb.append(StringUtils.format(" AND version IN (%s)", versionsStr)); + final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions); + + if (hasVersions) { + sb.append(getConditionForVersions(versions)); } // Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null. @@ -734,6 +786,10 @@ private Query> buildSegmentsTableQuery( bindQueryIntervals(sql, intervals); } + if (hasVersions) { + bindVersionsToQuery(sql, versions); + } + return sql; } @@ -834,6 +890,36 @@ private static int computeNumChangedSegments(List segmentIds, int[] segm return numChangedSegments; } + private static String getConditionForVersions(final List versions) + { + if (CollectionUtils.isNullOrEmpty(versions)) { + return ""; + } + + final StringBuilder sb = new StringBuilder(); + + sb.append(" AND version IN ("); + for (int i = 0; i < versions.size(); i++) { + sb.append(StringUtils.format(":version%d", i)); + if (i != versions.size() - 1) { + sb.append(","); + } + } + sb.append(")"); + return sb.toString(); + } + + private static void bindVersionsToQuery(final SQLStatement query, final List versions) + { + if (CollectionUtils.isNullOrEmpty(versions)) { + return; + } + + for (int i = 0; i < versions.size(); i++) { + query.bind(StringUtils.format("version%d", i), versions.get(i)); + } + } + enum IntervalMode { CONTAINS { diff --git 
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java index a633640f0e22..2f4334b36ac6 100644 --- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java +++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java @@ -206,22 +206,21 @@ public Response markAsUsedAllNonOvershadowedSegments(@PathParam("dataSourceName" @Produces(MediaType.APPLICATION_JSON) @ResourceFilters(DatasourceResourceFilter.class) public Response markAsUsedNonOvershadowedSegments( - @PathParam("dataSourceName") String dataSourceName, - MarkDataSourceSegmentsPayload payload + @PathParam("dataSourceName") final String dataSourceName, + final SegmentsToUpdateFilter payload ) { if (payload == null || !payload.isValid()) { - log.warn("Invalid request payload: [%s]", payload); return Response .status(Response.Status.BAD_REQUEST) - .entity("Invalid request payload, either interval or segmentIds array must be specified") + .entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE) .build(); } else { SegmentUpdateOperation operation = () -> { - final Interval interval = payload.getInterval(); + final List versions = payload.getVersions(); if (interval != null) { - return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval); + return segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(dataSourceName, interval, versions); } else { final Set segmentIds = payload.getSegmentIds(); if (segmentIds == null || segmentIds.isEmpty()) { @@ -254,22 +253,22 @@ public Response markAsUsedNonOvershadowedSegments( @Consumes(MediaType.APPLICATION_JSON) public Response markSegmentsAsUnused( @PathParam("dataSourceName") final String dataSourceName, - final MarkDataSourceSegmentsPayload payload, + final SegmentsToUpdateFilter payload, @Context final HttpServletRequest req ) { if (payload == null || !payload.isValid()) { - 
log.warn("Invalid request payload: [%s]", payload); return Response .status(Response.Status.BAD_REQUEST) - .entity("Invalid request payload, either interval or segmentIds array must be specified") + .entity(SegmentsToUpdateFilter.INVALID_PAYLOAD_ERROR_MESSAGE) .build(); } else { SegmentUpdateOperation operation = () -> { final Interval interval = payload.getInterval(); + final List versions = payload.getVersions(); final int numUpdatedSegments; if (interval != null) { - numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval); + numUpdatedSegments = segmentsMetadataManager.markAsUnusedSegmentsInInterval(dataSourceName, interval, versions); } else { final Set segmentIds = payload.getSegmentIds() @@ -302,7 +301,7 @@ public Response markSegmentsAsUnused( private static Response logAndCreateDataSourceNotFoundResponse(String dataSourceName) { - log.warn("datasource not found [%s]", dataSourceName); + log.warn("datasource[%s] not found", dataSourceName); return Response.noContent().build(); } @@ -319,7 +318,7 @@ private static Response performSegmentUpdate(String dataSourceName, SegmentUpdat .build(); } catch (Exception e) { - log.error(e, "Error occurred while updating segments for data source[%s]", dataSourceName); + log.error(e, "Error occurred while updating segments for datasource[%s]", dataSourceName); return Response .serverError() .entity(ImmutableMap.of("error", "Exception occurred.", "message", Throwables.getRootCause(e).toString())) @@ -567,9 +566,9 @@ private SegmentsLoadStatistics computeSegmentLoadStatistics(Iterable servedSegments return false; } + /** + * Either {@code interval} or {@code segmentIds} array must be specified, but not both. + * {@code versions} may be optionally specified only when {@code interval} is provided. 
+ */ @VisibleForTesting - protected static class MarkDataSourceSegmentsPayload + static class SegmentsToUpdateFilter { private final Interval interval; private final Set segmentIds; + private final List versions; + + private static final String INVALID_PAYLOAD_ERROR_MESSAGE = "Invalid request payload. Specify either 'interval' or 'segmentIds', but not both." + + " Optionally, include 'versions' only when 'interval' is provided."; @JsonCreator - public MarkDataSourceSegmentsPayload( - @JsonProperty("interval") Interval interval, - @JsonProperty("segmentIds") Set segmentIds + public SegmentsToUpdateFilter( + @JsonProperty("interval") @Nullable Interval interval, + @JsonProperty("segmentIds") @Nullable Set segmentIds, + @JsonProperty("versions") @Nullable List versions ) { this.interval = interval; this.segmentIds = segmentIds; + this.versions = versions; } + @Nullable @JsonProperty public Interval getInterval() { return interval; } + @Nullable @JsonProperty public Set getSegmentIds() { return segmentIds; } - public boolean isValid() + @Nullable + @JsonProperty + public List getVersions() + { + return versions; + } + + private boolean isValid() { final boolean hasSegmentIds = !CollectionUtils.isNullOrEmpty(segmentIds); if (interval == null) { - return hasSegmentIds; + return hasSegmentIds && CollectionUtils.isNullOrEmpty(versions); } else { return !hasSegmentIds; } diff --git a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java index b177b40c5876..a7128ce27143 100644 --- a/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SqlSegmentsMetadataManagerTest.java @@ -41,6 +41,7 @@ import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.partition.NoneShardSpec; import org.hamcrest.MatcherAssert; +import org.joda.time.DateTime; import 
org.joda.time.Duration; import org.joda.time.Interval; import org.joda.time.Period; @@ -589,6 +590,156 @@ public void testMarkAsUsedNonOvershadowedSegments() throws Exception ); } + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInEternityIntervalWithVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 2, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.ETERNITY, + ImmutableList.of("2017-10-15T20:19:12.565Z", "2017-10-16T20:19:12.565Z") + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsInFiniteIntervalWithVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + 
Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 2, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.of("2017-10-15/2017-10-18"), + ImmutableList.of("2017-10-15T20:19:12.565Z", "2017-10-16T20:19:12.565Z") + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonExistentVersions() throws Exception + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-17T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-16T20:19:12.565Z" + ); + + // Overshadowed by koalaSegment2 + final DataSegment koalaSegment3 = 
createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + "2017-10-15T20:19:12.565Z" + ); + + publishUnusedSegments(koalaSegment1, koalaSegment2, koalaSegment3); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + DS.KOALA, + Intervals.ETERNITY, + ImmutableList.of("foo", "bar") + ) + ); + sqlSegmentsMetadataManager.poll(); + + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUsedNonOvershadowedSegmentsInvalidDataSource() throws Exception { @@ -683,7 +834,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInInterval() throws IOException ); // 2 out of 3 segments match the interval - Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( @@ -731,7 +882,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsInIntervalWithOverlappingInterv ); // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused - Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( @@ -787,7 +938,7 @@ public void testMarkAsUnusedSegmentsInInterval() throws IOException final Interval theInterval = Intervals.of("2017-10-15T00:00:00.000/2017-10-18T00:00:00.000"); // 
2 out of 3 segments match the interval - Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(2, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( @@ -796,6 +947,104 @@ public void testMarkAsUnusedSegmentsInInterval() throws IOException ); } + @Test + public void testMarkAsUnusedSegmentsInIntervalAndVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); + sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 2, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of(v1, v2) + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + + @Test + public void testMarkAsUnusedSegmentsInIntervalAndNonExistentVersions() throws IOException + { + publishWikiSegments(); + sqlSegmentsMetadataManager.startPollingDatabasePeriodically(); 
+ sqlSegmentsMetadataManager.poll(); + Assert.assertTrue(sqlSegmentsMetadataManager.isPollingDatabasePeriodically()); + + final DateTime now = DateTimes.nowUtc(); + final String v1 = now.toString(); + final String v2 = now.plus(Duration.standardDays(1)).toString(); + + final DataSegment koalaSegment1 = createSegment( + DS.KOALA, + "2017-10-15T00:00:00.000/2017-10-16T00:00:00.000", + v1 + ); + final DataSegment koalaSegment2 = createSegment( + DS.KOALA, + "2017-10-17T00:00:00.000/2017-10-18T00:00:00.000", + v2 + ); + final DataSegment koalaSegment3 = createSegment( + DS.KOALA, + "2017-10-19T00:00:00.000/2017-10-20T00:00:00.000", + v2 + ); + + publisher.publishSegment(koalaSegment1); + publisher.publishSegment(koalaSegment2); + publisher.publishSegment(koalaSegment3); + final Interval theInterval = Intervals.of("2017-10-15/2017-10-18"); + + Assert.assertEquals( + 0, + sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval( + DS.KOALA, + theInterval, + ImmutableList.of("foo", "bar", "baz") + ) + ); + + sqlSegmentsMetadataManager.poll(); + Assert.assertEquals( + ImmutableSet.of(wikiSegment1, wikiSegment2, koalaSegment1, koalaSegment2, koalaSegment3), + ImmutableSet.copyOf(sqlSegmentsMetadataManager.iterateAllUsedSegments()) + ); + } + @Test public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws IOException { @@ -822,7 +1071,7 @@ public void testMarkAsUnusedSegmentsInIntervalWithOverlappingInterval() throws I final Interval theInterval = Intervals.of("2017-10-16T00:00:00.000/2017-10-20T00:00:00.000"); // 1 out of 3 segments match the interval, other 2 overlap, only the segment fully contained will be marked unused - Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval)); + Assert.assertEquals(1, sqlSegmentsMetadataManager.markAsUnusedSegmentsInInterval(DS.KOALA, theInterval, null)); sqlSegmentsMetadataManager.poll(); Assert.assertEquals( diff --git 
a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java index 1fa2ef2302c9..fc2330cf1f0e 100644 --- a/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java +++ b/server/src/test/java/org/apache/druid/metadata/TestDerbyConnector.java @@ -145,7 +145,7 @@ public SegmentsTable segments() } /** - * A wrapper class for queries on the segments table. + * A wrapper class for updating the segments table. */ public static class SegmentsTable { diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java index adf12ae70543..d255d0abc7d4 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestSegmentsMetadataManager.java @@ -87,7 +87,7 @@ public int markAsUsedAllNonOvershadowedSegmentsInDataSource(String dataSource) } @Override - public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval) + public int markAsUsedNonOvershadowedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions) { return 0; } @@ -116,7 +116,7 @@ public int markAsUnusedAllSegmentsInDataSource(String dataSource) } @Override - public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval) + public int markAsUnusedSegmentsInInterval(String dataSource, Interval interval, @Nullable List versions) { return 0; } diff --git a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java index bd673f078b29..f6bccbeb7da4 100644 --- a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java +++ 
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java @@ -19,6 +19,8 @@ package org.apache.druid.server.http; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -36,6 +38,7 @@ import org.apache.druid.client.ImmutableSegmentLoadInfo; import org.apache.druid.client.SegmentLoadInfo; import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.metadata.MetadataRuleManager; import org.apache.druid.metadata.SegmentsMetadataManager; @@ -734,15 +737,56 @@ public void testMarkSegmentAsUsedNoChange() public void testMarkAsUsedNonOvershadowedSegmentsInterval() { Interval interval = Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); + EasyMock.expect(numUpdatedSegments).andReturn(3).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + DataSourcesResource dataSourcesResource = createResource(); + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null) + ); + Assert.assertEquals(200, response.getStatus()); + EasyMock.verify(segmentsMetadataManager, inventoryView, server); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsIntervalWithVersions() + { + Interval interval = Intervals.of("2010-01-22/P1D"); + + int numUpdatedSegments = 
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.eq(ImmutableList.of("v0")) + ); EasyMock.expect(numUpdatedSegments).andReturn(3).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null) + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, ImmutableList.of("v0")) + ); + Assert.assertEquals(200, response.getStatus()); + EasyMock.verify(segmentsMetadataManager, inventoryView, server); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsIntervalWithNonExistentVersion() + { + Interval interval = Intervals.of("2010-01-22/P1D"); + + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.eq(ImmutableList.of("foo")) + ); + EasyMock.expect(numUpdatedSegments).andReturn(0).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + DataSourcesResource dataSourcesResource = createResource(); + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, ImmutableList.of("foo")) ); Assert.assertEquals(200, response.getStatus()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -752,8 +796,9 @@ public void testMarkAsUsedNonOvershadowedSegmentsInterval() public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated() { Interval interval = Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = 
segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); EasyMock.expect(numUpdatedSegments).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -761,7 +806,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated() Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null) + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null) ); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -771,8 +816,9 @@ public void testMarkAsUsedNonOvershadowedSegmentsIntervalNoneUpdated() public void testMarkAsUsedNonOvershadowedSegmentsSet() { Set segmentIds = ImmutableSet.of(dataSegmentList.get(1).getId().toString()); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegments(EasyMock.eq("datasource1"), EasyMock.eq(segmentIds)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegments( + EasyMock.eq("datasource1"), EasyMock.eq(segmentIds) + ); EasyMock.expect(numUpdatedSegments).andReturn(3).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -780,7 +826,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsSet() Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, segmentIds) + new DataSourcesResource.SegmentsToUpdateFilter(null, segmentIds, null) ); Assert.assertEquals(200, response.getStatus()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -790,8 +836,9 @@ public void testMarkAsUsedNonOvershadowedSegmentsSet() public void testMarkAsUsedNonOvershadowedSegmentsIntervalException() { Interval interval = 
Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); EasyMock.expect(numUpdatedSegments).andThrow(new RuntimeException("Error!")).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -799,7 +846,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsIntervalException() Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(interval, null) + new DataSourcesResource.SegmentsToUpdateFilter(interval, null, null) ); Assert.assertEquals(500, response.getStatus()); EasyMock.verify(segmentsMetadataManager, inventoryView, server); @@ -809,8 +856,9 @@ public void testMarkAsUsedNonOvershadowedSegmentsIntervalException() public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() { Interval interval = Intervals.of("2010-01-22/P1D"); - int numUpdatedSegments = - segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval(EasyMock.eq("datasource1"), EasyMock.eq(interval)); + int numUpdatedSegments = segmentsMetadataManager.markAsUsedNonOvershadowedSegmentsInInterval( + EasyMock.eq("datasource1"), EasyMock.eq(interval), EasyMock.isNull() + ); EasyMock.expect(numUpdatedSegments).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); @@ -818,7 +866,7 @@ public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), null) + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), null, null) ); Assert.assertEquals(200, 
response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -826,13 +874,13 @@ public void testMarkAsUsedNonOvershadowedSegmentsNoDataSource() } @Test - public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIds() + public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndSegmentIdsAndVersions() { DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null) + new DataSourcesResource.SegmentsToUpdateFilter(null, null, null) ); Assert.assertEquals(400, response.getStatus()); } @@ -844,20 +892,22 @@ public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndEmptySegm Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), ImmutableSet.of()) + new DataSourcesResource.SegmentsToUpdateFilter( + Intervals.of("2010-01-22/P1D"), ImmutableSet.of(), null + ) ); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); } @Test - public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndNullSegmentIds() + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullInterval() { DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), null) + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), null, null) ); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); @@ -870,32 +920,99 @@ public void 
testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndSegmentId Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-22/P1D"), ImmutableSet.of("segment1")) + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-22/P1D"), ImmutableSet.of("segment1"), null) + ); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndSegmentIdsAndVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter( + Intervals.of("2020/2030"), ImmutableSet.of("seg1"), ImmutableList.of("v1", "v2") + ) + ); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithEmptySegmentIds() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(null, ImmutableSet.of(), null) + ); + Assert.assertEquals(400, response.getStatus()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithEmptyVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(null, null, ImmutableList.of()) ); Assert.assertEquals(400, response.getStatus()); } @Test - public void testMarkAsUsedNonOvershadowedSegmentsWithNullIntervalAndEmptySegmentIds() + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullVersions() { DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( "datasource1", - 
new DataSourcesResource.MarkDataSourceSegmentsPayload(null, ImmutableSet.of()) + new DataSourcesResource.SegmentsToUpdateFilter(null, null, ImmutableList.of("v1", "v2")) ); Assert.assertEquals(400, response.getStatus()); } @Test - public void testMarkAsUsedNonOvershadowedSegmentsNoPayload() + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullSegmentIdsAndVersions() { DataSourcesResource dataSourcesResource = createResource(); - Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments("datasource1", null); + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(null, ImmutableSet.of("segment1"), ImmutableList.of("v1", "v2")) + ); Assert.assertEquals(400, response.getStatus()); } + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY, null, ImmutableList.of("v1", "v2")) + ); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); + } + + @Test + public void testMarkAsUsedNonOvershadowedSegmentsWithNonNullIntervalAndEmptyVersions() + { + DataSourcesResource dataSourcesResource = createResource(); + + Response response = dataSourcesResource.markAsUsedNonOvershadowedSegments( + "datasource1", + new DataSourcesResource.SegmentsToUpdateFilter(Intervals.ETERNITY, null, ImmutableList.of()) + ); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); + } + @Test public void testSegmentLoadChecksForVersion() { @@ -1039,12 +1156,13 @@ public void testMarkSegmentsAsUnused() 
EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(1).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload( + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter( null, segmentIds.stream() .map(SegmentId::toString) - .collect(Collectors.toSet()) + .collect(Collectors.toSet()), + null ); DataSourcesResource dataSourcesResource = createResource(); @@ -1068,12 +1186,13 @@ public void testMarkSegmentsAsUnusedNoChanges() EasyMock.expect(segmentsMetadataManager.markSegmentsAsUnused(segmentIds)).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload( + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter( null, segmentIds.stream() .map(SegmentId::toString) - .collect(Collectors.toSet()) + .collect(Collectors.toSet()), + null ); DataSourcesResource dataSourcesResource = createResource(); @@ -1099,16 +1218,16 @@ public void testMarkSegmentsAsUnusedException() .once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload( + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter( null, segmentIds.stream() .map(SegmentId::toString) - .collect(Collectors.toSet()) + .collect(Collectors.toSet()), + null ); - DataSourcesResource dataSourcesResource = - createResource(); + DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); Assert.assertEquals(500, 
response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1120,11 +1239,11 @@ public void testMarkAsUnusedSegmentsInInterval() { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(1).once(); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)).andReturn(1).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); DataSourcesResource dataSourcesResource = createResource(); prepareRequestForAudit(); @@ -1140,11 +1259,11 @@ public void testMarkAsUnusedSegmentsInIntervalNoChanges() { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(0).once(); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)).andReturn(0).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); DataSourcesResource dataSourcesResource = createResource(); prepareRequestForAudit(); @@ -1159,16 +1278,15 @@ public void testMarkAsUnusedSegmentsInIntervalException() { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)) + 
EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)) .andThrow(new RuntimeException("Exception occurred")) .once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); - DataSourcesResource dataSourcesResource = - createResource(); + DataSourcesResource dataSourcesResource = createResource(); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); Assert.assertEquals(500, response.getStatus()); Assert.assertNotNull(response.getEntity()); @@ -1179,11 +1297,50 @@ public void testMarkAsUnusedSegmentsInIntervalException() public void testMarkAsUnusedSegmentsInIntervalNoDataSource() { final Interval theInterval = Intervals.of("2010-01-01/P1D"); - EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval)).andReturn(0).once(); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, null)) + .andReturn(0).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, null); + DataSourcesResource dataSourcesResource = createResource(); + prepareRequestForAudit(); + + Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 0), response.getEntity()); + EasyMock.verify(segmentsMetadataManager); + } + + @Test + public void testMarkAsUnusedSegmentsInIntervalWithVersions() + { + final Interval theInterval = 
Intervals.of("2010-01-01/P1D"); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, ImmutableList.of("v1"))) + .andReturn(2).once(); EasyMock.replay(segmentsMetadataManager, inventoryView, server); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(theInterval, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, ImmutableList.of("v1")); + DataSourcesResource dataSourcesResource = createResource(); + prepareRequestForAudit(); + + Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); + Assert.assertEquals(200, response.getStatus()); + Assert.assertEquals(ImmutableMap.of("numChangedSegments", 2), response.getEntity()); + EasyMock.verify(segmentsMetadataManager); + } + + @Test + public void testMarkAsUnusedSegmentsInIntervalWithNonExistentVersion() + { + final Interval theInterval = Intervals.of("2010-01-01/P1D"); + EasyMock.expect(segmentsMetadataManager.markAsUnusedSegmentsInInterval("datasource1", theInterval, ImmutableList.of("foo"))) + .andReturn(0).once(); + EasyMock.replay(segmentsMetadataManager, inventoryView, server); + + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(theInterval, null, ImmutableList.of("foo")); DataSourcesResource dataSourcesResource = createResource(); prepareRequestForAudit(); @@ -1193,6 +1350,21 @@ public void testMarkAsUnusedSegmentsInIntervalNoDataSource() EasyMock.verify(segmentsMetadataManager); } + @Test + public void testSegmentsToUpdateFilterSerde() throws JsonProcessingException + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final String payload = "{\"interval\":\"2023-01-01T00:00:00.000Z/2024-01-01T00:00:00.000Z\",\"segmentIds\":null,\"versions\":[\"v1\"]}"; + + final DataSourcesResource.SegmentsToUpdateFilter 
obj = + mapper.readValue(payload, DataSourcesResource.SegmentsToUpdateFilter.class); + Assert.assertEquals(Intervals.of("2023/2024"), obj.getInterval()); + Assert.assertEquals(ImmutableList.of("v1"), obj.getVersions()); + Assert.assertNull(obj.getSegmentIds()); + + Assert.assertEquals(payload, mapper.writeValueAsString(obj)); + } + @Test public void testMarkSegmentsAsUnusedNullPayload() { @@ -1202,18 +1374,19 @@ public void testMarkSegmentsAsUnusedNullPayload() Assert.assertEquals(400, response.getStatus()); Assert.assertNotNull(response.getEntity()); Assert.assertEquals( - "Invalid request payload, either interval or segmentIds array must be specified", + "Invalid request payload. Specify either 'interval' or 'segmentIds', but not both." + + " Optionally, include 'versions' only when 'interval' is provided.", response.getEntity() ); } @Test - public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIds() + public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIdsAndVersions() { DataSourcesResource dataSourcesResource = createResource(); - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(null, null); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new DataSourcesResource.SegmentsToUpdateFilter(null, null, null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); Assert.assertEquals(400, response.getStatus()); @@ -1224,10 +1397,9 @@ public void testMarkSegmentsAsUnusedWithNullIntervalAndSegmentIds() public void testMarkSegmentsAsUnusedWithNonNullIntervalAndEmptySegmentIds() { DataSourcesResource dataSourcesResource = createResource(); - - final DataSourcesResource.MarkDataSourceSegmentsPayload payload = - new DataSourcesResource.MarkDataSourceSegmentsPayload(Intervals.of("2010-01-01/P1D"), ImmutableSet.of()); prepareRequestForAudit(); + final DataSourcesResource.SegmentsToUpdateFilter payload = + new 
DataSourcesResource.SegmentsToUpdateFilter(Intervals.of("2010-01-01/P1D"), ImmutableSet.of(), null); Response response = dataSourcesResource.markSegmentsAsUnused("datasource1", payload, request); Assert.assertEquals(200, response.getStatus());