Append support for hash/range-partitioned segments #9241

@jihoonson

Motivation

Druid currently doesn't support appending hash/range-partitioned segments, even though this is needed quite often in production.

Proposed changes

I propose to support appending segments only when the new segments are partitioned in the same way as the existing ones. For example, you can append new hash-partitioned segments to an existing hash-partitioned datasource if they are partitioned using exactly the same hash function. However, you cannot append range-partitioned segments to a hash-partitioned datasource. You can always append linearly partitioned segments to a linearly partitioned datasource.

In this proposal, I would like to use the term "partition ID" to represent the last number in the segment ID. For example, given a segment myDatasource_2020-01-01T00:00:00.000Z_2020-01-02T00:00:00.000Z_2020-01-02T02:23:54.235Z_2, its partition ID is 2. The partition ID is regarded as 0 if it's missing in the segment ID.

Changes in ShardSpec

Currently, the partition ID of a segment is tightly coupled with its bucket in hash/range partitioning. For example, in hash partitioning, the partition ID is (the hash value of the partition dimensions % partition size). In range partitioning, the partition ID is a linearly increasing number assigned to the range-partitioned buckets.

However, to support append, we need a new way to tell whether two given segments fall in the same bucket. To do this, each segment needs to keep additional information about its bucket, as below:

public interface ShardSpec
{
...
  /**
   * Returns the bucket ID of this partition (segment). A bucket represents the secondary partition
   * in the hash or the range partitioning. This always returns 0 for the linear partitioning.
   */
  short getBucketId();
...
}

Note that we need to track all segments in the same bucket for easier segment pruning in the brokers. To keep things simple, the partition ID must be aligned with the bucket ID, which means partition ID = n * numBuckets + bucket ID for some non-negative integer n. As a result, the partition IDs might not be contiguous after appending new segments. This is also useful for compatibility because the bucket ID can be easily restored even when it is missing after a downgrade.
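The alignment rule can be illustrated with a minimal sketch. BucketAlignment and its methods are hypothetical names used only for this example; they are not existing Druid classes, and the sketch assumes n counts how many earlier segments already exist in the same bucket.

```java
// Hypothetical helper, not part of Druid. It only illustrates the rule
// partition ID = n * numBuckets + bucket ID from the paragraph above.
final class BucketAlignment
{
  private BucketAlignment() {}

  /** Partition ID of the n-th segment (n = 0 for the first one) in the given bucket. */
  static int partitionId(int n, int numBuckets, int bucketId)
  {
    if (n < 0 || numBuckets <= 0 || bucketId < 0 || bucketId >= numBuckets) {
      throw new IllegalArgumentException("invalid n, numBuckets, or bucketId");
    }
    return n * numBuckets + bucketId;
  }

  /** Restores the bucket ID from an aligned partition ID. */
  static int bucketId(int partitionId, int numBuckets)
  {
    if (partitionId < 0 || numBuckets <= 0) {
      throw new IllegalArgumentException("invalid partitionId or numBuckets");
    }
    return partitionId % numBuckets;
  }
}
```

For example, with numBuckets = 3 the initial segments get partition IDs 0, 1, and 2. Appending two more segments to bucket 1 gives partition IDs 4 and 7, so the time chunk ends up with the IDs 0, 1, 2, 4, 7, which are not contiguous.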

This may cause some issues with the minor compaction because the minor compaction can compact segments only when their partition IDs are adjacent. See the Future work section for this issue.

Changes in SingleDimensionShardSpec

A new field, numBuckets, should be added to SingleDimensionShardSpec as shown below.

  private static final short UNKNOWN_NUM_BUCKETS = -1;

  /**
   * @param dimension    partition dimension
   * @param start        inclusive start of this range
   * @param end          exclusive end of this range
   * @param partitionNum unique ID for this shard
   * @param numBuckets   number of range-partitioned buckets
   */
  @JsonCreator
  public SingleDimensionShardSpec(
      @JsonProperty("dimension") String dimension,
      @JsonProperty("start") @Nullable String start,
      @JsonProperty("end") @Nullable String end,
      @JsonProperty("partitionNum") int partitionNum,
      @JsonProperty("numBuckets") @Nullable Short numBuckets
  )
  {
    Preconditions.checkArgument(partitionNum >= 0, "partitionNum >= 0");
    this.dimension = Preconditions.checkNotNull(dimension, "dimension");
    this.start = start;
    this.end = end;
    this.partitionNum = partitionNum;
    this.numBuckets = numBuckets == null ? UNKNOWN_NUM_BUCKETS : numBuckets;
  }

Changes in the Simple task and the Parallel task

The index task needs to allow partitionsSpec when forceGuaranteedRollup = false because an append doesn't necessarily guarantee perfect rollup across the entire datasource. If appendToExisting = true and forceGuaranteedRollup = false in the ingestionSpec, the indexing task will create perfectly rolled up segments from the input and append them to the existing datasource. This is to create more compact segments, but it might be useful to not create perfectly rolled up segments in some cases. Please see the Future work section for more details.

If appendToExisting = true, the partitionsSpec is optional.

  • If it's an empty time chunk, the given partitionsSpec will be used.
  • If it's not an empty time chunk, the given partitionsSpec will be ignored with a warning in the task logs.
    • When the parallel index task appends to a non-empty datasource with the range partitioning, it can skip the first phase to find the best partitioning.
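The rule in the list above could be sketched roughly as follows. PartitionsSpecChoice is a hypothetical helper, the partitioning is represented as a plain String only to keep the example short, and the warnings list stands in for the task log. The case where the time chunk is empty and no partitionsSpec is given is not covered by this proposal, so the sketch simply returns null there.

```java
import java.util.List;

// Hypothetical helper, not part of Druid. Strings stand in for real partitionsSpec objects.
final class PartitionsSpecChoice
{
  private PartitionsSpecChoice() {}

  /**
   * @param existing  partitioning of the segments already in the time chunk, or null if the chunk is empty
   * @param requested partitionsSpec given in the ingestion spec, or null if it was omitted
   * @param warnings  collects the messages a real task would write to its log
   * @return the partitioning to use, or null if neither is available (not covered by the proposal)
   */
  static String choose(String existing, String requested, List<String> warnings)
  {
    if (existing == null) {
      // Empty time chunk: the given partitionsSpec is used.
      return requested;
    }
    if (requested != null) {
      // Non-empty time chunk: the given partitionsSpec is ignored with a warning.
      warnings.add("Ignoring the given partitionsSpec because the time chunk already has segments");
    }
    return existing;
  }
}
```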

If appendToExisting = true, the indexing task should use the action-based segment allocator, which is a centralized segment allocator in the overlord, to find proper partition IDs for new segments. When the indexing task requests new segment IDs, it should send the proper bucket ID to the overlord. As a result, SegmentAllocateAction would be:

  @JsonCreator
  public SegmentAllocateAction(
      @JsonProperty("dataSource") String dataSource,
      @JsonProperty("timestamp") DateTime timestamp,
      @JsonProperty("queryGranularity") Granularity queryGranularity,
      @JsonProperty("preferredSegmentGranularity") Granularity preferredSegmentGranularity,
      @JsonProperty("sequenceName") String sequenceName,
      @JsonProperty("previousSegmentId") String previousSegmentId,
      @JsonProperty("skipSegmentLineageCheck") boolean skipSegmentLineageCheck,
      @JsonProperty("bucketId") @Nullable Short bucketId, // new field
      @JsonProperty("shardSpecFactory") @Nullable ShardSpecFactory shardSpecFactory, // nullable for backward compatibility
      @JsonProperty("lockGranularity") @Nullable LockGranularity lockGranularity // nullable for backward compatibility
  )
...

The SegmentAllocateAction will return a proper partition ID that is aligned with the given bucket ID.
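One way the overlord could pick such an aligned partition ID is sketched below. AlignedPartitionAllocator is a hypothetical class, not the actual allocation code, and it assumes the allocator knows the partition IDs already used in the time chunk. It ignores everything else a real allocation has to handle, such as pending segments, versions, and locks.

```java
import java.util.Collection;

// Hypothetical sketch, not the overlord's actual allocation logic.
final class AlignedPartitionAllocator
{
  private AlignedPartitionAllocator() {}

  /**
   * Returns the next partition ID for the given bucket so that
   * partition ID % numBuckets == bucketId still holds.
   */
  static int nextPartitionId(Collection<Integer> usedPartitionIds, int numBuckets, int bucketId)
  {
    if (numBuckets <= 0 || bucketId < 0 || bucketId >= numBuckets) {
      throw new IllegalArgumentException("invalid numBuckets or bucketId");
    }
    // Find the largest partition ID already used in the same bucket.
    int maxInBucket = -1;
    for (int id : usedPartitionIds) {
      if (id % numBuckets == bucketId) {
        maxInBucket = Math.max(maxInBucket, id);
      }
    }
    // The first segment of a bucket gets the bucket ID itself; later ones step by numBuckets.
    return maxInBucket < 0 ? bucketId : maxInBucket + numBuckets;
  }
}
```

With used partition IDs 0, 1, 2 and numBuckets = 3, a request for bucket 1 would get partition ID 4, and a following request for the same bucket would get 7.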

Backward compatibility

  • If the bucket ID is missing in the shardSpec,
    • it's 0 for the linear partitioning.
    • it's partition ID % core partition size for the hash/range partitioning.
  • If the bucket ID is missing in the SegmentAllocateAction,
    • it's 0 for the linear partitioning.
    • it's partition ID % core partition size for the hash/range partitioning.
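The fallback rules above can be written down as a small sketch. BucketIdFallback and its Partitioning enum are hypothetical names for illustration only, and corePartitionSize is assumed to be known to the caller.

```java
// Hypothetical helper, not part of Druid. It mirrors the fallback rules listed above.
final class BucketIdFallback
{
  enum Partitioning { LINEAR, HASH, RANGE }

  private BucketIdFallback() {}

  /**
   * @param bucketId          bucket ID read from the shardSpec or the action, or null if missing
   * @param partitioning      partitioning method of the segment
   * @param partitionId       partition ID of the segment
   * @param corePartitionSize core partition size, used only for hash/range partitioning
   */
  static int resolve(Short bucketId, Partitioning partitioning, int partitionId, int corePartitionSize)
  {
    if (bucketId != null) {
      return bucketId;
    }
    if (partitioning == Partitioning.LINEAR) {
      // Linear partitioning always uses bucket 0.
      return 0;
    }
    if (corePartitionSize <= 0) {
      throw new IllegalArgumentException("corePartitionSize must be positive");
    }
    return partitionId % corePartitionSize;
  }
}
```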

Rationale

These are dropped ideas.

  • The "Multi" input source
    • Just like the Hadoop task, the native task can support the Multi input source to read data from two or more input sources. This indirectly supports a sort of "append" by specifying both the existing segments and new data in the input source spec. However, the main use case of the Multi input source would be reading from multiple input sources rather than append.
  • Independent partition ID and bucket ID
    • To make the partition ID and the bucket ID independent, the VersionedIntervalTimeline should be able to efficiently filter out the segments of the same bucket. This requires an extra index or data structure in memory of the brokers and the coordinator.

Operational impact

  • The broker and the coordinator can use a bit more memory because of the new field in the shardSpec.
  • Deprecate appending linearly partitioned segments to hash partitioned datasources
    • HashBasedNumberedShardSpec is currently used to allow appending linearly partitioned segments to hash partitioned datasources. However, I don't think this will be very useful once we support appending hash partitioned segments to hash partitioned datasources.
  • There should be no issues in rolling upgrades.
  • Once you upgrade the cluster and append segments to a range-partitioned datasource, you cannot downgrade back to an old version anymore because you will lose the new field numBuckets in the SingleDimensionShardSpec.

Test plan

  • New integration tests will be added.
  • Will test in a real Druid cluster.
  • Will verify there is no issue in rolling upgrades/downgrades.

Future work

  • When a task creates new segments with the hash/range partitioning, it can split a bucket into multiple segments based on the segment size (or number of rows). This can be thought of as supporting fixed linear partitioning as the tertiary partitioning.
  • Partition-aware minor compaction
    • The minor compaction should be able to compact the segments of the same bucket. This may require changing the meaning of the "adjacent partition IDs" per partitioning method.
  • In this proposal, when appendToExisting = true and forceGuaranteedRollup = false, the indexing task always creates new segments which are perfectly rolled up. We may want to add a new configuration, partitionWithShuffle, to control this behavior.

Semi-related, the segment pruning in brokers should be supported for hash partitioned datasources as well.
