Skip to content

DRILL-5457: Spill implementation for Hash Aggregate#822

Closed
Ben-Zvi wants to merge 1 commit into
apache:masterfrom
Ben-Zvi:hashagg-spill
Closed

DRILL-5457: Spill implementation for Hash Aggregate#822
Ben-Zvi wants to merge 1 commit into
apache:masterfrom
Ben-Zvi:hashagg-spill

Conversation

@Ben-Zvi
Copy link
Copy Markdown
Contributor

@Ben-Zvi Ben-Zvi commented May 2, 2017

This pull-request is for the work on enabling memory spill for the Hash Aggregate Operator.

To assist in reviewing this extensive code change, listed below are various topics/issues that describe the implementation decisions and give some code pointers. The reviewer can read these items and peek into their relevant code, or read all the items first (and comment on the design decisions as well).

The last topic is not "least": It describes many issues and solutions related to the need to estimate the memory size of batches (and hash tables, etc.) This work took a significant amount of time, and will need some more to get better.

(Most of the code changes are in HashAggTemplate.java, hence this file is not mentioned specifically below)

Aggregation phase:

The code was changed to pass the aggregation phase information (whether this is a Single phase, or 1st of two phase, or 2nd of two phase) from the planner to the HAG operator code.
(See HashAggregate.java, AggPrelBase.java, HashAggPrel.java )

Partitioning:

The data (rows/groups) coming into the HAG is partitioned into (a power of 2) number of partitions, based on the N least significant bits of the hash value (computed out of the row's key columns).
Each partition can be handled independently of the others. Ideally each partition should fit into the available memory. The number of partitions is initialized from the option "drill.exec.hashagg.num_partitions", and scaled down if the available memory seems too small (each partition needs to hold at least one batch in memory).
The scaling down uses the formula: AVAIL_MEMORY > NUM_PARTITIONS * ( K * EST_BATCH_SIZE + 8M )
(see delayedSetup() ) where K is the option drill.exec.hashagg.min_batches_per_partition -- see below).
Computing the number of partitions is delayed till actuall data arrives on incoming (in order to get an accurate sizing on varchars). See delayedSetup(). There is also special code for cases data never arrives (empty batches) hence no partitions (see beginning of outputCurrentBatch(), cleanUp(), delayedSetup() ).
Many of the code changes made in order to implement multi-partitions follow the original code, only changing scalar members (of HashAggTemplate) into arrays, like "htable" becomes "htables[]".
Each partition has its own hash table. After each time it is spilled, its hash table is freed and reallocated.

Hash Code:

The hash code computation result was extracted from the HashTable (needed for the partitions), and added as a parameter to the put() method. Thus for each new incoming row, first the hash code is computed, and the low N bits are used to select the partition, then the hash code is right shifted by N, and the result is passed back to the put() method.
After spilling, the hash codes are not kept. When reading the rows (groups) from the spill, the hash codes are computed again (and right shifted before use - once per each cycle of spilling - thus repartitioning).
(See more about "spilling cycle" below).

Hash Table put():

The put() method for the hash table was rewriten and simplified. In addition to the hash-code parameter change, it was changed to return the PutStatus, with two new states: NEW_BATCH_ADDED notifies the caller that a new batch was created internally, hence a new batch (only needed for Hash Agg) is needed (prior code was getting this from comparing the returned index against the prior number of batches).
A second new state is KEY_ADDED_LAST, which notifies that a batch was just filled, hence it is time for checking memory availability (because a new batch would be allocated soon).
Similar rewriting was done for the hash table containsKey() method (and addBatchifNeeded() ).

Memory pressure check:

Logically the place to check for a memory pressure is when a new memory is needed (i.e., when a new group needs to be created.) However the code structure does not let this easily (e.g., a new batch is allocated inside the hash table object when a new group is detected, or the hash table structure is doubled in size), thus instead the check is done AFTER a new group was added, in case this was the last group added to that batch (see in checkGroupAndAggrValues() - checking for a new status KEY_ADDED_LAST )
This memory availability check checks if there is enough memory left between the allocated so far and the limit.
Spill is initiated when: MEMORY_USED + MAX_MEMORY_NEEDED > MEMORY_LIMIT (see checkGroupAndAggrValues() )
where the memory needed is: (EST_BATCH_SIZE + 64K * (4+4)) * K * PLANNED_BATCHES + MAX_HASH_TABLES_RESIZE
(See K above, under Partitioning, and the rest well below, under memory estimations).

When can not spill:

Single phase HAG can not spill. Also under memory duress 2nd phase may end up with only a single partition, which can not allow spilling (no progress is made). In these two cases, the memory check is skipped, and the operator functions like the old code -- if runs out of memory then it will OOM. A try-catch was added into the code to provide more detail on the OOM (see getOOMErrorMsg() ).
Also in case of a single partition the allocator's memory limit is set to 10GB, to be compatible with the prior code.
Another "can't spill" situation is when choosing a partition to spill, but no partition has more than 1 batch (hence memory can not be gained, as after spilling 1 batch need to reinitialize that partition with a new batch). See chooseAPartitionToFlush(). In such a case the code "crosses its fingers" and continues without spilling.

1st phase - Early return:

The 1st phase HAG does not spill to disk. When the 1st detects a memory pressure it picks the current partition (the one whose last batch just got full) and returns that partition downstream (just like regular return, only early). Afterwards that partition is (deallocated and) initialized. Note the boolean "earlyOutput" in the code which controls special processing in this case - when turned on the code switches to output (e.g., innerNext() in HashAggBatch.java), and turned off when done (see outputCurrentBatch() ).

Spilling:

Using the SpillSet (like the External Sort does) for the actual IO. Each partition spills into a single
file. Changes to SpillSet: Generalize it for any kind of "buffered memory" operator (pass in the operator's type). Also small changes to the spill file name.

2nd phase - Flushing/spilling:

When a memory pressure is detected (while reading and processing the incoming), one of the
partitions is selected ( see chooseAPartitionToFlush() ) and flushed ( spillAPartition() ), and then its memory is freed and that partition is re-initialized ( reinitPartition() ). The choice of a partition gives some small priority to the current partition (since its last batch is full, unlike the others), and priority by a factor of 4 to partitions that are already spilled (i.e., a spilled partition with 10 batches would be chosen vs a pristine/non-spilled with 39 batches.)

Partition spilling (spillAPartition() ): For each batch in the partition - Allocate an outgoing container, link the values and the keys into this container, and write it to the file/stream.

2nd phase - End of incoming: After the last batch was read from the incoming - the original code ( next() ) returned a status of NONE. The new code - after spilling can't return NONE, so instead returning a special status of RESTART (see outputCurrentBatch() ). This RESTART is captured by the caller of the next() ( innerNext() in HashAggBatch.java ) which continues to drive the aggregation (instead of returning).

After the end of the incoming, all the (partially) spilled partitions finish spilling all their remaining in-memory batches to disk (see outputCurrentBatch() ). This is done to simplify the later processing of each spilled partition, as well as freeing memory which may be needed as partitions are processed. The spilled partitions are added into a list (spilledPartitionsList) to allow for later processing.

2nd phase reading of the spill:

Reading of each spilled partition/file is performed like reading the incoming. For this purpose, a new class was added: SpilledRecordbatch. The main method there is next() which reads a batch from the stream -- first time it uses the original readFromStream() method, which creates a new container; subsequent calls use the new readFromStreamWithContainer() method, which is similar - only reuses the prior container. (This was done because many places in the code have references into the container).

Spilling cycles:

Reading a spilled partition "just like incoming" allows for that input to spill again (and again ...);
this was termed SECONDARY spilling (and TERTIARY ...). As the spilled partitions are kept in a FIFO list, processing of SECONDARY partitions would start only after all the regular spilled ones, etc. Hence a member "cycleNum" was created, incremented every time that processing the spilled list advances to another "cycle" (see outputCurrentBatch() ).
The "cycleNum" is used for the hash-code computation; the same hash-code is computed at every cycle, but the cycle tells how much to right-shift that code so that different bits would be used (for partitioning and hash-table bucket).

Configuration options:

  • drill.exec.hashagg.num_partitions: Initial number of partitions in each HAG operator (the number may be down adjusted in case too little memory is available). Default value: 32 , allowed range 1 - 128 , where a value of 1 means "No spilling" (and thus setting 10GB limit).
  • drill.exec.hashagg.min_batches_per_partition: Range 2--5. Default 3. Used for internal initial estimate of number of partitions, and later when predicting memory needed (to avoid a spill).
    (A value of 2 may be better, but it evokes some bug which would be addressed separately).

Also using options common to all the "buffered" operators (can be overriden, per operator):

  • drill.exe.spill.fs: File system for spilling into.
  • drill.exec.spill.directories: (Array of) directories to spill into.
    (To override, per-operator: for the (managed) External Sort: "drill.exec.sort.external.spill.fs" and
    "drill.exec.sort.external.spill.directories", and for the Hash Aggregate:
    "drill.exec.hashagg.spill.fs" and "drill.exec.hashagg.spill.directories")

For testing:

  • drill.exec.hashagg.mem_limit: Limit the memory for each HAG operator (also sets this number in the allocator, hence this is a "hard limit").

Also for testing (or for a customer workaround ??):

  • planner.force_2phase_aggr: Forces the aggregation to be two phase.

Stats and metrics:

The hash-table stats were modified to sum the stats across all the partitions' hash tables. (This only applies to the first spilling cycle; does not count for SECONDARY, TERTIARY spilling etc.).
New metrics added:

  • "num partitions" (actual number; may have been scaled down due to memory pressure)
  • "spilled partitions" (number that has spilled)
  • "MB spilled" (in case of 1st phase - that's the total data returned early).
    All the above three refer to the end of input into the HAG (does not include handling of spills, secondary spills, etc.)
  • "cycle": 0 - no spill (or 1st phase), 1 - regular spill, 2 - Secondary, 3 - Tertiary ...)

Memory Allocation Utilities:

Extended for all "buffered operators", not only for Sort. (Hash Join will be added later as well, etc.)

Changes to Unit tests:

  • New TestHashAggSpill : Runs two hash agg queries - One spills some of its partitions (1 out of 2), and the other test forces a smaller memory hence gets into a Secondary and Tertiary spills.
  • TestBugFixes.testDRILL4884: This test implicitly relied on rows returned in order (the old Hash agg, plus the Parquet file).
    With the new division into partitions, that order was broken. Fix: added an "order by".
  • TestTpchDistributedConcurrent.testConcurrentQueries: Needed longer timeout (probably spilled).

MISC

  • After a spill, check again if enough memory was freed, else spill (another partition) again. (Not sure if needed.)
  • Suggestion not implemented: Scaling down the initial hash-table sizes by the number of partitions (e.g. when 4 partitions, each hash-table starts with 1/4 of the initial size). Reason for not changing: starting with a small size immediately causes doubling and another doubling etc. Better allocate a little more and save that work.
  • The RecordBatchSizer had a recent change to handle MAPs (recursively). Merged this change with the modified measureColumn() which returns an int (the est size).

MEMORY SIZE ESTIMATIONS

As described above, we need to get good estimate of the memory needs in order to decide initially on the number of partitions, and later to decide each time (a batch gets filled) wheter to spill or not.
These estimates are complicated due to:
(1) Possible changes in the incoming data batches (e.g., varchar(12) in the first batch becomes varchar(200) in the second incoming batch). This may invalidate prior estimates.
(2) Arbitrary setting of length 50 for varchar type (when sizing the allocation of DrillBufs)
(3) Allocation size aligned up to nearest power of 2 (DrillBufs for varchars)
(4) When an internal batch gets filled, and estimation shows ample memory -- a second batch may get filled before the first one's partition allocated a new batch (hence may cause "double booking").
(5) Inserting a single value may cause the "start indices" (the real actual "hash table") to double in size. This structure can get pretty large (few MB).
(6) Does the size of the incoming batch being charged against the HAG's memory allocator limit ? (Not sure; usually not a problem as the prior batch is deallocated before the next one comes; unless the next one is "much bigger")
(7) For varchars: The memory is allocated as a power of 2 (e.g. doubled via setSafe()). This can cause a big memory waste, like if the total memory needed for 64k varchars is ~5MB, then 8MB is allocated, wasting 3MB).
(8) The varchar value vector uses an internal "offset vector" that allocates "size+1", hence for 64K it allocates 512kb, of which 256kb are wasted (see DRILL-5446).

Solutions for the memory estimation issues:

(1)+(6) above: Monitor the size of each incoming batch. Resize batch size estimate if the incoming batch is bigger (see doWork() )

(5) When estimating memory needs, take into account hash table size doubling in all partitions (using the new hash table method extraMemoryNeededForResize() ).

(4) Track "plannedBatches"; when "promising" few partitions a new batch each, take this into account when checking for available memory. (Though "more than 1" situation seems very rare).

(2)+(3) Idealy tracking the size of EACH varchar column could work better, but not simple to implement. Instead -- just find the maximum size of any of the incoming columns (for simplicity - not only varchars), and use this value (capped at 50, min value of 8; rounded up to the next power of 2 if needed). This addresses the common situation of multiple short varchar key columns but not the (very rare) situation of a huge varchar KEY column, plus few short ones.

(7) Update RecordBatchSizer.java -- added a method netRowWidthCap50() which takes into account the rounding up (per each column in a row), plus nulls arrays as needed, for each row (will multiply that by 64K in updateEstMaxBatchSize() ).

==== END ====

Copy link
Copy Markdown

@amansinha100 amansinha100 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sending a first batch of review comments.

* @return Size of extra memory needed if the HT (i.e. startIndices) is doubled
*/
@Override
public int extraMemoryNeededForResize() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return a long ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good observation !! Fixed .....

return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
htIdxHolder.value = currentIdx;
return addedBatch ? PutStatus.NEW_BATCH_ADDED :
( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does KEY_ADDED_LAST indicate the last key in a single batch or last key across all batches added so far ? It seems it is the latter but would it be sufficient to check whether the last batch is filled up because the previous batches would have already been filled up ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The former: Last key in a batch. This information is needed at the Hash Aggr to initiate a check for memory pressure (could calculate this information, but looks cleaner/simpler to get that as a special status from the hash table). Hopefully we'd never have to deal with batches of size 1 (row), as it would conflict - both "last row" and "new batch" at the same insertion.

}
*/
/*
if ( handlingSpills && ( incomingRowIdx == 0 ) ) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could enclose this using the EXTRA_DEBUG_1 flag that is used elsewhere.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This debugging code is written for a specific schema (VarChar,VarChar,Bigint); placing it inside EXTRA_DEBUG_1 may create the impression that it is a generic debug code. Leaving it as a comment/sample allows for easy rewriting to print out data (of any other schema).

logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem);
estMaxBatchSize = totalAddedMem;
}
} catch (OutOfMemoryException exc) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which call in this try{} block is throwing an OOM ? The addBatchHolder() does not and the other function calls are essentially computing the allocated memory.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addBatchHolder() calls newBatchHolder(), which should allocate the new batch using the allocator, hence (I think) may OOM if not enough memory left.

addBatchHolder(currentPartition);

if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch
long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume totalAddedMem is the memory added for the group-by keys. Would it be better to rename it to something like totalAddedMemGroupBy and allocatedBefore to allocatedBeforeGroupBy such that it is explicit ? Also, why not use allocatedBeforeAggCol here instead of calling allocator.getAllocatedMemory() again ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totalAddedMem is for both the group-by keys and the aggr columns.
allocatedBefore is the initial size of the allocation, then the keys are added to the hash table, (then allocatedBeforeAggCol keeps the size), then a batch holder for the agg-columns is added, and the total is computed.
The total is important, like if the incoming batch becomes bigger, and we try to adjust the estimate for the batch size.
The allocatedBeforeAggCol only gives some tracing refinement to tell which batch grew (the one in the hash table or the one for the agg columns).


// calculate the (max) new memory needed now
long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize
for ( HashTable ht : htables ) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not immediately obvious why all partitions need to be asked for extra memory needed for resizing. If a new row causes doubling of a hash table, only 1 partition would be affected. But if you are considering all rows from an incoming batch then yes it could in the worst case affect all partitions.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for memory pressure takes place whenever any new batch becomes full (because soon after a new batch would be needed by that partition). However the current incoming batch may still hold many unprocessed rows, destined to all the partitions, and these rows could trigger resizing at each and every partition (in the worst case).

rowsInPartition += numPendingOutput; // for logging
rowsSpilled += numPendingOutput;

allocateOutgoing(numPendingOutput);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppose numPendingOutput is small (e.g 100 rows) but there are several such batches, we would end up calling allocateOutgoing() multiple times for small amounts...shouldn't we try to combine these into smaller number of outgoing batches ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When spilling a partition, all its batches (except sometimes the last one) are full (i.e. 64K); hence no need to combine. (These are not the incoming batches, but new batches constructed inside the Hash Aggr).

numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
if ( numPartitions == 1 ) {
canSpill = false;
logger.warn("Spilling was disabled");
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could add the reason why it was disabled in this case since it happened without user intervention.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a user intervention - the setting of num_partitions to 1 !! Though for clarity, the message was changed now to "Spilling was disabled due to configuration setting of num_partitions to 1"

try {
this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
this.htables[i].setMaxVarcharSize(maxColumnWidth);
} catch (IllegalStateException ise) {} // ignore
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ignore this exception ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmm .. don't remember; I'll remove this catch (i.e. throw a DrillRuntimeException in case IllegalState shows.).

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed the "secondary" files outside of the hash agg itself. Will review the "core" files separately.

String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";

// Spill Options common to all spilling operators
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spill boot-time config options...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, boot-time. Wish Drill had a clear way to distinguish boot-time from session options.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a solution is coming. See DRILL-5547.


// Hash Aggregate Options

String HASHAGG_NUM_PARTITIONS_KEY = "drill.exec.hashagg.num_partitions";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For config options, the path is usually "drill.exec.something". But, for system/session options, there is no need for the top "drill." namespace, so they usually are of the form "exec.something" or even shorter. Check out some of the other names and you'll see the pattern.

Or, am I confused? Normally, the option name appears as "key", then a validator, using that name, appears below. But, in the validator, we have a different name (one that does, in fact, follow the usual option rules.) So, we we have both a config and a system/session option?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed all three to have both the config option ("drill.exec.hashagg....") and its matching _KEY (same, sans the prefix "drill.") which is only used to create the validator.
Hence all three can be set either in the config file, or for the session:
Number of partitions, max memory, and minimum batches per partitions.

// Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer
String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "drill.exec.hashagg.min_batches_per_partition";
LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION = new RangeLongValidator("exec.hashagg.min_batches_per_partition", 2, 5, 3);
String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe label which are config options and which session/system options.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All three are both config and session options.

va = container;
}

// Like above, only preserve the original container and list of value-vectors
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was refactored in the external sort unit test PR. Let's coordinate on merging the two sets of changes.

* @param maxAllocation The max memory allocation to be set
*/
@Override
public void setMaxAllocation(long maxAllocation) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a problem with this method, which is why it was commented out. Trying to remember... Something about the internal, temporary serializations in planning causing values to be overwritten. Jackson will see this method and use it for deserializing the value. May have to fiddle around a bit to remember the issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what happens when you give a software "intelligence" to make decisions for you ..... shall we pick another method name to fool Jackson ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just want to fool Jackson we can use @JsonIgnore. Would be better to track down the root cause -- whatever it was.

* @param queryContext
*/
public static void setupSortMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change looks good. Very clean.

// Use plain Java compilation where available
prefer_plain_java: false
},
spill: {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nicely done! Good use of inheritance and references.

threshold: 40000,
// File system to use. Local file system by default.
fs: "file:///"
fs: ${drill.exec.spill.fs},
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: explain what we are doing:

In Drill versions prior to 1.11, only sort spilled. Starting in 1.11, other operators spill.
Spilling is now configured in the spill group above. For backward compatibility, existing configuration files will still configure spilling here. Please move spill configuration to the new group above.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Added:

    // -- The two options below can be used to override the options common
    // -- for all spilling operators (see "spill" above).
    // -- This is done for backward compatibility; in the future they
    // -- would be deprecated (you should be using only the common ones)

// Use plain Java compilation where available
prefer_plain_java: false
},
spill: {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please modify distribution/resources/drill-override-example.conf to explain how to set up spilling. Add a comment (see below) about deprecating the sort-specific config, but that it will still work for a few more releases.

This file (drill-module.conf) is visible only to developers. drill-override.conf is the file users modify. drill-override-example.conf is the example "starter" file that new users start with.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added "spill" and "hashagg" sections in the override example file, with some comments:

spill: {
# These options are common to all spilling operators.
# They can be overriden, per operator (but this is just for
# backward compatibility, and may be deprecated in the future)
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
hashagg: {
# The partitions divide the work inside the hashagg, to ease
# handling spilling. This initial figure is tuned down when
# memory is limited.
# Setting this option to 1 disables spilling !
num_partitions: 32,
spill: {
# The 2 options below override the common ones
# they should be deprecated in the future
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
},

*
* @throws Exception
*/
@Test
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: are two tests sufficient to test all the code paths in the modified code? Might we need test for specific cases? Will provide detailed suggestions as I review the actual spill code.

@rchallapalli
Copy link
Copy Markdown

Based on the current design, if the code senses that there is not sufficient memory then it goes back to the old code. Now I have encountered a case where this happened and the old agg did not respect the memory constraints imposed by me. I gave 116MB memory and the old hash agg code consumed ~130MB and completed the query. This doesn't play well with the overall resource management plan

@rchallapalli
Copy link
Copy Markdown

And based on my initial testing, I observed that the hash-agg is only using half of the memory allocated. Since the planner by default uses a 2-phase agg, the memory computation logic divides the allocated memory between the 2 hash-agg operators in the plan. This is grossly in-efficient. And every test written would need to be modified once this issue gets resolved. Hence I would push for a fix sooner than later

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another partial review, mostly about code structure, exceptions and so on. Need to make another pass to really understand the core algorithm.

String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";

// Spill Options common to all spilling operators
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a solution is coming. See DRILL-5547.

* @param maxAllocation The max memory allocation to be set
*/
@Override
public void setMaxAllocation(long maxAllocation) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just want to fool Jackson we can use @JsonIgnore. Would be better to track down the root cause -- whatever it was.

// or: 1st phase need to return (not fully grouped) partial output due to memory pressure
aggregator.earlyOutput()) {
// then output the next batch downstream
IterOutcome out = aggregator.outputCurrentBatch();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since HashAggregator is not an operator executor (AKA record batch), it does not have to follow the iterator protocol and use the IterOutcome enum. Instead, you can define your own. You won't need the OK_NEW_SCHEMA, OUT_OF_MEMORY, FAIL or NOT_YET values. All you seem to need is OK, NONE and RESTART.

This approach will avoid the need to change the IterOutcome enum and export your states to all of the Drill iterator protocol.

Did something similar in Sort for the iterator class that returns either in-memory or merged spilled batches.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done !! Looks so simple in retrospective ...

import org.apache.hadoop.fs.Path;

import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is impressive how you were able to slide spilling into the existing code structure. Performance and modularity are never required, of course, but it may be worth at least considering them.

Putting so much code in a template has a large drawback. Our current byte-code based code generation performs at its worst when templates are large. This template is the base for the generated code. In traditional Java, the size of a subclass is independent of the size of the superclass. So, if we used "plain old" Java, the size of this template would have no performance impact.

But, with byte-code manipulation, each generated class contains a complete copy of the byte code for the template class. With a huge template, we make a huge copy every time. We pay a cost in terms of the time it takes to make the copy, then analyze the resulting byte codes. Also, we fill up the code cache with many copies of the same code.

Three solutions.

  1. Ignore the problem. (Which is probably the right choice until performance becomes a concern.)
  2. Refactor the code so that the bulk of logic is in a non-template class, with only a thin layer of code in the template.
  3. Use "plain old" Java compilation for this class to avoid the many large copies described above.

The other problem is that this class has so many state variables that full testing and understanding will be hard. There is value in smaller chunks to reduce the cost of testing and maintenance.

private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
updateAggrValuesInternal(incomingRowIdx, idxWithinBatch);
try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); }
catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably IllegalStateException to clearly state that this is a "this should never occur" kind of error.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but only when %100 sure; else if there's a chance for a SchemaChange -- Better use UnsupportedOperationException ...
Code change done !!

spilledPartitionsList.add(sp);
try {
reinitPartition(nextPartitionToReturn); // free the memory
} catch (Exception e) {throw new RuntimeException(e);}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See note above about caching specific exceptions and mapping to the proper UserException. Else, it is really hard to know what went wrong by looking at logs. And, the user has no clue: just "something went wrong."

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually now seems that reinitPartition() need not throw any exception !!
Cleaned up all the catches, etc.

//
if ( isSpilled(nextPartitionToReturn) ) {
spillAPartition(nextPartitionToReturn); // spill the rest
SpilledPartition sp = new SpilledPartition();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can SpilledPartition take on some of the behavior that is inline here? Can it, for example, take a partition state instance and do the spilling? Can it handle the reading later?

Not yet entirely clear how this works, so take this suggestion with a grain of salt...

reinitPartition(partitionToReturn);
} catch (SchemaChangeException sce) {
throw new DrillRuntimeException("Hash Aggregation can not handle schema changes.");
} catch (Exception e) { /* just ignore */ }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even, say, an OOM?

// deallocate memory used by this partition, and re-initialize
reinitPartition(partitionToReturn);
} catch (SchemaChangeException sce) {
throw new DrillRuntimeException("Hash Aggregation can not handle schema changes.");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a real schema change, it is a bogus one that is an artifact of the way the low level functions work. Throw an IllegalStateException.

throw new DrillRuntimeException("Hash Aggregation can not handle schema changes.");
} catch (Exception e) { /* just ignore */ }

if ( earlyOutput ) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

General comment. This code is very long and very complex. It has a high "cyclomatic complexity." It is unlikely that we can generate system-level tests that hit all the if conditions. And, if some new developer had to modify this code, how would they know if they broke anything?

This stuff probably needs to be divided up into smaller chunks that can be unit tested (and understood.)

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this commit LGTM. Thanks!

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit 3 also LGTM. Thanks.

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commit 4 LGTM.

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed all commits since my initial review. While there are more things we could discuss, what is here seems to work well. Makes sense to go ahead and commit work done so far and discuss other improvements as follow-on PRs.

+1

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few small comments, suggestions. None are critical.

The explanation that kicked off this PR is wonderful; very detailed and worth reading multiple times.

Once this PR is committed, that information will become hard to find. As part of a follow-on PR, I wonder if that material can migrate into Javadoc in a package-info.java file.

The code continues to cry out for simplification. The many flags, modes and so on will be quite hard to test and maintain. Not in this PR, but in some subsequent one, it would be to do three things:

  • Create true unit tests for all cases.
  • To do that, we'll probably find that the work will go faster if the code is divided up into smaller, more-focused classes, with a driver layer implementing what has become a sophisticated state machine.
  • The bulk of the code remains in the template where it is byte-code copied into generated code for each query. Performance in this area might not be a priority now, but will be eventually.

Again, all these can be follow-on improvements once the basic functionality is delivered in this PR.

private int currentIndex = 0;
private IterOutcome outcome;
private int numGroupedRecords = 0;
private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that the very small savings in time is worth the complexity of keeping a cached copy in sync. If needed for an inner loop, can it be a local variable instead?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getRecordCount() virtual method is called per each record ! And in some cases this method performs several checks. Unfortunately other inefficiencies indeed dwarf this savings. A local variable won't work, as execution may return and come back (e.g. spill) midway processing the incoming batch.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation.

private int originalPartition = -1; // the partition a secondary reads from

private class SpilledPartition { public int spilledBatches; public String spillFile /* Path filePath */; int cycleNum; int origPartn; int prevOrigPartn; }
private class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private static class since you don't have any methods and so have no use for the "inner this" pointer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks.

if (EXTRA_DEBUG_1) {
logger.debug("Processed {} records", underlyingIndex);
// Cleanup the previous batch since we are done processing it.
for (VectorWrapper<?> v : incoming) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two comments/questions here.

First, can we be sure that there is always a Selection Vector Remover between the hash agg and anything that can emit a batch that uses an SV4 (such as sort)? Otherwise, the getValueVector() method won't work.

Second, is it certain that no other code references the vectors from this container?

If both those invariants are met, then, yes, it is the job of this operator to release buffers created by the upstream.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mmmm... this is the original Hash Agg code. Hence probably the two invariants are met.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is original code, then clearly it does the right thing else we'd have seen problems by now. So, fine to leave as-is.

// Handle various results from getting the next batch
switch (outcome) {
case OUT_OF_MEMORY:
case NOT_YET:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The OUT_OF_MEMORY is really an error condition. NOT_YET is not yet supported. is the RETURN_OUTCOME the proper return value? Or, should this throw a user exception?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again - this is the original Hash Agg code (just re-indented, as the while(true) and try blocks were removed). The "outcome" comes from getting the next incoming batch, hence OUT_OF_MEMORY may (?) occur if a grossly abnormal incoming batch shows up unexpectedly.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with OUT_OF_MEMORY is that it basically does not work. But, if this is original code, then we can address that problem another time.


// Get the next RecordBatch from the incoming
IterOutcome out = outgoing.next(0, incoming);
case OK_NEW_SCHEMA:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this operator handle a new schema from upstream, except on the first batch? Can this occur when reading from a spill file? (The spill file was for a prior schema, say, that has since shifted to a new schema, so reading the spill file shifts back to the old schema?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case leads to immediate failure; even the first batch does not get here. The error message returned is "Hash aggregate does not support schema change" . (Again -- all is original code).

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for addressing all the comments.
LGTM
+1

Copy link
Copy Markdown
Contributor

@paul-rogers paul-rogers left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LTGM
Minor style changes suggested.
Otherwise, +1

}

AggOutcome out = aggregator.doWork();
if ( wasKilled ) { // if kill() was called before, then finish up
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spaces, here and below.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Read incoming batches and process their records
//
out = aggregator.doWork();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

while (aggregator.doWork() == AggOutcome.CALL_WORK_AGAIN) {
  // Nothing to do
}

?

In one of your reviews you said you didn't like empty loops, but sometimes they are handy...

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that, I see you need the value of "out". So:

  AggOutcome out;
  do {
      //
      //  Read incoming batches and process their records
      //
      out = aggregator.doWork();
  } while (out == AggOutcome.CALL_WORK_AGAIN) {

Or Even:

  //  Read incoming batches and process their records
  AggOutcome out;
  while ((out = aggregator.doWork()) == AggOutcome.CALL_WORK_AGAIN) {
    // Nothing to do
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done ( do while ...)

paul-rogers pushed a commit to paul-rogers/drill that referenced this pull request Jun 20, 2017
@asfgit asfgit closed this in c16e5f8 Jun 21, 2017
@Ben-Zvi Ben-Zvi deleted the hashagg-spill branch July 10, 2018 00:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants