diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index b9d09a8fb43..8010f85cc57 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -142,6 +142,13 @@ drill.exec: { } }, cache.hazel.subnets: ["*.*.*.*"], + spill: { + # These options are common to all spilling operators. + # They can be overridden, per operator (but this is just for + # backward compatibility, and may be deprecated in the future) + directories : [ "/tmp/drill/spill" ], + fs : "file:///" + } sort: { purge.threshold : 100, external: { @@ -150,11 +157,26 @@ drill.exec: { batch.size : 4000, group.size : 100, threshold : 200, + # The 2 options below override the common ones + # they should be deprecated in the future directories : [ "/tmp/drill/spill" ], fs : "file:///" } } }, + hashagg: { + # The partitions divide the work inside the hashagg, to ease + # handling spilling. This initial figure is tuned down when + # memory is limited. + # Setting this option to 1 disables spilling ! 
+ num_partitions: 32, + spill: { + # The 2 options below override the common ones + # they should be deprecated in the future + directories : [ "/tmp/drill/spill" ], + fs : "file:///" + } + }, memory: { top.max: 1000000000000, operator: { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 18f69d53d2d..537377d2dab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -64,6 +64,12 @@ public interface ExecConstants { String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size"; String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold"; + // Spill boot-time Options common to all spilling operators + // (Each individual operator may override the common options) + + String SPILL_FILESYSTEM = "drill.exec.spill.fs"; + String SPILL_DIRS = "drill.exec.spill.directories"; + // External Sort Boot configuration String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size"; @@ -86,6 +92,22 @@ public interface ExecConstants { BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false); + // Hash Aggregate Options + + String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions"; + String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions"; + LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128, 32); // 1 means - no spilling + String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit"; + String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit"; + LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE, 0); + // min batches is used for tuning (each partition needs so many batches when planning the number of partitions, + // or reserve this number when 
calculating whether the remaining available memory is too small and requires a spill.) + // Low value may OOM (e.g., when incoming rows become wider), higher values use fewer partitions but are safer + String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition"; + String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition"; + LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5, 3); + String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories"; + String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs"; String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size"; String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index 9d0182fec37..d569ae5754b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -35,6 +35,8 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import com.codahale.metrics.MetricRegistry; @@ -138,6 +140,60 @@ private void readVectors(InputStream input, RecordBatchDef batchDef) throws IOEx va = container; } + // Like above, only preserve the original container and list of value-vectors + public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException { + final VectorContainer container = 
new VectorContainer(); + final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input); + recordCount = batchDef.getRecordCount(); + if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) { + + if (sv2 == null) { + sv2 = new SelectionVector2(allocator); + } + sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE); + sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE); + svMode = BatchSchema.SelectionVectorMode.TWO_BYTE; + } + final List vectorList = Lists.newArrayList(); + final List fieldList = batchDef.getFieldList(); + for (SerializedField metaData : fieldList) { + final int dataLength = metaData.getBufferLength(); + final MaterializedField field = MaterializedField.create(metaData); + final DrillBuf buf = allocator.buffer(dataLength); + final ValueVector vector; + try { + buf.writeBytes(input, dataLength); + vector = TypeHelper.getNewVector(field, allocator); + vector.load(metaData, buf); + } finally { + buf.release(); + } + vectorList.add(vector); + } + container.addCollection(vectorList); + container.setRecordCount(recordCount); + myContainer.transferIn(container); // transfer the vectors + myContainer.buildSchema(svMode); + myContainer.setRecordCount(recordCount); + /* + // for debugging -- show values from the first row + Object tmp0 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector(); + Object tmp1 = (myContainer).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector(); + Object tmp2 = (myContainer).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector(); + if (tmp0 != null && tmp1 != null && tmp2 != null) { + NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0); + NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1); + NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2); + + try { + logger.info("HASH AGG: Got a row = {} , {} , {}", vv0.getAccessor().get(0), 
vv1.getAccessor().get(0), vv2.getAccessor().get(0)); + } catch (Exception e) { logger.info("HASH AGG: Got an exception = {}",e); } + } + else { logger.info("HASH AGG: got nulls !!!"); } + */ + va = myContainer; + } + public void writeToStreamAndRetain(OutputStream output) throws IOException { retain = true; writeToStream(output); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index a547e26a088..6f422507b2d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.base; +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -102,17 +104,31 @@ public void setCost(double cost) { this.cost = cost; } - // Not available. Presumably because Drill does not currently use - // this value, though it does appear in some test physical plans. 
-// public void setMaxAllocation(long alloc) { -// maxAllocation = alloc; -// } - @Override public long getMaxAllocation() { return maxAllocation; } + /** + * Any operator that supports spilling should override this method + * @param maxAllocation The max memory allocation to be set + */ + @Override + public void setMaxAllocation(long maxAllocation) { + this.maxAllocation = maxAllocation; + /*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/ + } + + /** + * Any operator that supports spilling should override this method (and return true) + * @return false + */ + @Override @JsonIgnore + public boolean isBufferedOperator() { return false; } + + // @Override + // public void setBufferedOperator(boolean bo) {} + @Override public String getUserName() { return userName; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index b1954ca06d7..980f32c766c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -83,6 +83,21 @@ public interface PhysicalOperator extends GraphValue { */ public long getMaxAllocation(); + /** + * + * @param maxAllocation The max memory allocation to be set + */ + public void setMaxAllocation(long maxAllocation); + + /** + * + * @return True iff this operator manages its memory (including disk spilling) + */ + @JsonIgnore + public boolean isBufferedOperator(); + + // public void setBufferedOperator(boolean bo); + @JsonProperty("@id") public int getOperatorId(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java index 17848d0af85..cb9679d1831 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java @@ -49,12 +49,19 @@ public int getOperatorType() { return CoreOperatorType.EXTERNAL_SORT_VALUE; } - // Set here, rather than the base class, because this is the only - // operator, at present, that makes use of the maximum allocation. - // Remove this, in favor of the base class version, when Drill - // sets the memory allocation for all operators. - + /** + * + * @param maxAllocation The max memory allocation to be set + */ + @Override public void setMaxAllocation(long maxAllocation) { this.maxAllocation = maxAllocation; } + + /** + * The External Sort operator supports spilling + * @return true + */ + @Override + public boolean isBufferedOperator() { return true; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java index 4dafbe8a202..0614dc45da4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashAggregate.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.physical.base.AbstractSingle; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.planner.physical.AggPrelBase; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import com.fasterxml.jackson.annotation.JsonCreator; @@ -34,6 +35,7 @@ public class HashAggregate extends AbstractSingle { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregate.class); + private final AggPrelBase.OperatorPhase aggPhase; private final List groupByExprs; private final List aggrExprs; @@ -41,15 +43,19 @@ public class HashAggregate extends AbstractSingle { 
@JsonCreator public HashAggregate(@JsonProperty("child") PhysicalOperator child, + @JsonProperty("phase") AggPrelBase.OperatorPhase aggPhase, @JsonProperty("keys") List groupByExprs, @JsonProperty("exprs") List aggrExprs, @JsonProperty("cardinality") float cardinality) { super(child); + this.aggPhase = aggPhase; this.groupByExprs = groupByExprs; this.aggrExprs = aggrExprs; this.cardinality = cardinality; } + public AggPrelBase.OperatorPhase getAggPhase() { return aggPhase; } + public List getGroupByExprs() { return groupByExprs; } @@ -69,7 +75,9 @@ public T accept(PhysicalVisitor physicalVis @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new HashAggregate(child, groupByExprs, aggrExprs, cardinality); + HashAggregate newHAG = new HashAggregate(child, aggPhase, groupByExprs, aggrExprs, cardinality); + newHAG.setMaxAllocation(getMaxAllocation()); + return newHAG; } @Override @@ -77,5 +85,18 @@ public int getOperatorType() { return CoreOperatorType.HASH_AGGREGATE_VALUE; } - + /** + * + * @param maxAllocation The max memory allocation to be set + */ + @Override + public void setMaxAllocation(long maxAllocation) { + this.maxAllocation = maxAllocation; + } + /** + * The Hash Aggregate operator supports spilling + * @return true + */ + @Override + public boolean isBufferedOperator() { return true; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index dc913b16697..97e059921e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.List; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import 
org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; @@ -55,7 +56,6 @@ import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; -import com.google.common.collect.Lists; import com.sun.codemodel.JExpr; import com.sun.codemodel.JVar; @@ -63,12 +63,13 @@ public class HashAggBatch extends AbstractRecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatch.class); private HashAggregator aggregator; - private final RecordBatch incoming; + private RecordBatch incoming; private LogicalExpression[] aggrExprs; private TypedFieldId[] groupByOutFieldIds; private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch private final List comparators; private BatchSchema incomingSchema; + private boolean wasKilled; private final GeneratorMapping UPDATE_AGGR_INSIDE = GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */, @@ -87,6 +88,7 @@ public class HashAggBatch extends AbstractRecordBatch { public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException { super(popConfig, context); this.incoming = incoming; + wasKilled = false; final int numGrpByExprs = popConfig.getGroupByExprs().size(); comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs); @@ -136,15 +138,36 @@ public IterOutcome innerNext() { return IterOutcome.NONE; } - if (aggregator.buildComplete() && !aggregator.allFlushed()) { - // aggregation is complete and not all records have been output yet - return aggregator.outputCurrentBatch(); + // if aggregation is complete and not all records have been output yet + if (aggregator.buildComplete() || + // or: 1st phase need to return (not fully grouped) partial output due to memory pressure + aggregator.earlyOutput()) { + // then output the next batch downstream + HashAggregator.AggIterOutcome aggOut = 
aggregator.outputCurrentBatch(); + // if Batch returned, or end of data - then return the appropriate iter outcome + if ( aggOut == HashAggregator.AggIterOutcome.AGG_NONE ) { return IterOutcome.NONE; } + if ( aggOut == HashAggregator.AggIterOutcome.AGG_OK ) { return IterOutcome.OK; } + // if RESTART - continue below with doWork() - read some spilled partition, just like reading incoming + incoming = aggregator.getNewIncoming(); // Restart - incoming was just changed + } + + if (wasKilled) { // if kill() was called before, then finish up + aggregator.cleanup(); + incoming.kill(false); + return IterOutcome.NONE; } - logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount()); + // Read and aggregate records + // ( may need to run again if the spilled partition that was read + // generated new partitions that were all spilled ) + AggOutcome out; + do { + // + // Read incoming batches and process their records + // + out = aggregator.doWork(); + } while (out == AggOutcome.CALL_WORK_AGAIN); - AggOutcome out = aggregator.doWork(); - logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); switch (out) { case CLEANUP_AND_RETURN: container.zeroVectors(); @@ -153,6 +176,7 @@ public IterOutcome innerNext() { // fall through case RETURN_OUTCOME: return aggregator.getOutcome(); + case UPDATE_AGGREGATOR: context.fail(UserException.unsupportedError() .message(SchemaChangeException.schemaChanged( @@ -175,7 +199,6 @@ public IterOutcome innerNext() { * @return true if the aggregator was setup successfully. false if there was a failure. 
*/ private boolean createAggregator() { - logger.debug("Creating new aggregator."); try { stats.startSetup(); this.aggregator = createAggregatorInternal(); @@ -198,7 +221,7 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, ClassGenerator cgInner = cg.getInnerGenerator("BatchHolder"); top.plainJavaCapable(true); // Uncomment out this line to debug the generated code. -// top.saveCodeForDebugging(true); + // top.saveCodeForDebugging(true); container.clear(); @@ -266,7 +289,7 @@ private HashAggregator createAggregatorInternal() throws SchemaChangeException, HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators); agg.setup(popConfig, htConfig, context, this.stats, - oContext.getAllocator(), incoming, this, + oContext, incoming, this, aggrExprs, cgInner.getWorkspaceTypes(), groupByOutFieldIds, @@ -314,6 +337,7 @@ public void close() { @Override protected void killIncoming(boolean sendUpstream) { + wasKilled = true; incoming.kill(sendUpstream); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 1615200f274..38f0222cbad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -18,82 +18,155 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; +import java.io.OutputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.TimeUnit; import javax.inject.Named; +import com.google.common.base.Stopwatch; + +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import 
org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.cache.VectorAccessibleSerializable; import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; + +import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.common.IndexPointer; + +import org.apache.drill.exec.physical.impl.spill.RecordBatchSizer; + +import org.apache.drill.exec.physical.impl.spill.SpillSet; +import org.apache.drill.exec.planner.physical.AggPrelBase; + +import org.apache.drill.exec.proto.UserBitShared; + import org.apache.drill.exec.record.MaterializedField; + import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatch.IterOutcome; -import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.BatchSchema; + import org.apache.drill.exec.record.VectorContainer; + +import org.apache.drill.exec.record.TypedFieldId; + +import 
org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; + import org.apache.drill.exec.vector.AllocationHelper; + import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ObjectVector; import org.apache.drill.exec.vector.ValueVector; + import org.apache.drill.exec.vector.VariableWidthVector; +import static org.apache.drill.exec.record.RecordBatch.MAX_BATCH_SIZE; + public abstract class HashAggTemplate implements HashAggregator { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); + protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); -// private static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024; -// private static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000; - private static final int VARIABLE_WIDTH_VALUE_SIZE = 50; + private static final int VARIABLE_MAX_WIDTH_VALUE_SIZE = 50; + private static final int VARIABLE_MIN_WIDTH_VALUE_SIZE = 8; private static final boolean EXTRA_DEBUG_1 = false; private static final boolean EXTRA_DEBUG_2 = false; -// private static final String TOO_BIG_ERROR = -// "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field."; -// private boolean newSchema = false; + private static final boolean EXTRA_DEBUG_SPILL = false; + + // Fields needed for partitioning (the groups into partitions) + private int numPartitions = 0; // must be 2 to the power of bitsInMask (set in setup()) + private int partitionMask; // numPartitions - 1 + private int bitsInMask; // number of bits in the MASK + private int nextPartitionToReturn = 0; // which partition to return the next batch from + // The following members are used for logging, metrics, etc. 
+ private int rowsInPartition = 0; // counts #rows in each partition + private int rowsNotSpilled = 0; + private int rowsSpilled = 0; + private int rowsSpilledReturned = 0; + private int rowsReturnedEarly = 0; + + private boolean isTwoPhase = false; // 1 phase or 2 phase aggr? + private boolean is2ndPhase = false; + private boolean canSpill = true; // make it false in case can not spill + private ChainedHashTable baseHashTable; + private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory + private int earlyPartition = 0; // which partition to return early + + private long memoryLimit; // max memory to be used by this operator + private long estMaxBatchSize = 0; // used for adjusting #partitions + private long estRowWidth = 0; + private int maxColumnWidth = VARIABLE_MIN_WIDTH_VALUE_SIZE; // to control memory allocation for varchars + private long minBatchesPerPartition; // for tuning - num partitions and spill decision + private long plannedBatches = 0; // account for planned, but not yet allocated batches + private int underlyingIndex = 0; private int currentIndex = 0; private IterOutcome outcome; -// private int outputCount = 0; private int numGroupedRecords = 0; - private int outBatchIndex = 0; + private int currentBatchRecordCount = 0; // Performance: Avoid repeated calls to getRecordCount() + private int lastBatchOutputCount = 0; private RecordBatch incoming; -// private BatchSchema schema; + private BatchSchema schema; private HashAggBatch outgoing; private VectorContainer outContainer; -// private FragmentContext context; + + private FragmentContext context; + private OperatorContext oContext; private BufferAllocator allocator; -// private HashAggregate hashAggrConfig; - private HashTable htable; - private ArrayList batchHolders; + private HashTable htables[]; + private ArrayList batchHolders[]; + private int outBatchIndex[]; + + // For handling spilling + private SpillSet spillSet; + SpilledRecordbatch newIncoming; // when 
reading a spilled file - work like an "incoming" + private OutputStream outputStream[]; // an output stream for each spilled partition + private int spilledBatchesCount[]; // count number of batches spilled, in each partition + private String spillFiles[]; + private int cycleNum = 0; // primary, secondary, tertiary, etc. + private int originalPartition = -1; // the partition a secondary reads from + + private static class SpilledPartition { public int spilledBatches; public String spillFile; int cycleNum; int origPartn; int prevOrigPartn; } + + private ArrayList spilledPartitionsList; + private int operatorId; // for the spill file name + private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put() private IndexPointer outStartIdxHolder; private IndexPointer outNumRecordsHolder; private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields - - ErrorCollector collector = new ErrorCollectorImpl(); + private TypedFieldId[] groupByOutFieldIds; private MaterializedField[] materializedValueFields; private boolean allFlushed = false; private boolean buildComplete = false; + private boolean handlingSpills = false; // True once starting to process spill files private OperatorStats stats = null; private HashTableStats htStats = new HashTableStats(); @@ -103,7 +176,15 @@ public enum Metric implements MetricDef { NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, - RESIZING_TIME; + RESIZING_TIME, + NUM_PARTITIONS, + SPILLED_PARTITIONS, // number of partitions spilled to disk + SPILL_MB, // Number of MB of data spilled to disk. This amount is first written, + // then later re-read. So, disk I/O is twice this amount. + // For first phase aggr -- this is an estimate of the amount of data + // returned early (analogous to a spill in the 2nd phase). 
+ SPILL_CYCLE // 0 - no spill, 1 - spill, 2 - SECONDARY, 3 - TERTIARY + ; // duplicate for hash ag @@ -121,7 +202,6 @@ public class BatchHolder { private int batchOutputCount = 0; private int capacity = Integer.MAX_VALUE; - private boolean allocatedNextBatch = false; @SuppressWarnings("resource") public BatchHolder() { @@ -145,8 +225,8 @@ public BatchHolder() { if (vector instanceof FixedWidthVector) { ((FixedWidthVector) vector).allocateNew(HashTable.BATCH_SIZE); } else if (vector instanceof VariableWidthVector) { - ((VariableWidthVector) vector).allocateNew(HashTable.VARIABLE_WIDTH_VECTOR_SIZE * HashTable.BATCH_SIZE, - HashTable.BATCH_SIZE); + // This case is never used .... a varchar falls under ObjectVector which is allocated on the heap ! + ((VariableWidthVector) vector).allocateNew(maxColumnWidth, HashTable.BATCH_SIZE); } else if (vector instanceof ObjectVector) { ((ObjectVector) vector).allocateNew(HashTable.BATCH_SIZE); } else { @@ -166,20 +246,23 @@ public BatchHolder() { } private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) { - updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); + try { updateAggrValuesInternal(incomingRowIdx, idxWithinBatch); } + catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc); } maxOccupiedIdx = Math.max(maxOccupiedIdx, idxWithinBatch); return true; } private void setup() { - setupInterior(incoming, outgoing, aggrValuesContainer); + try { setupInterior(incoming, outgoing, aggrValuesContainer); } + catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);} } private void outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) { outStartIdxHolder.value = batchOutputCount; outNumRecordsHolder.value = 0; for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) { - outputRecordValues(i, batchOutputCount); + try { outputRecordValues(i, batchOutputCount); } + catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);} if 
(EXTRA_DEBUG_2) { logger.debug("Outputting values to output index: {}", batchOutputCount); } @@ -204,24 +287,23 @@ private int getNumPendingOutput() { @RuntimeOverridden public void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing, - @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) { + @Named("aggrValuesContainer") VectorContainer aggrValuesContainer) throws SchemaChangeException { } @RuntimeOverridden - public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + public void updateAggrValuesInternal(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException{ } @RuntimeOverridden - public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) { + public void outputRecordValues(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException{ } } - @Override public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, - OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, - LogicalExpression[] valueExprs, List valueFieldIds, TypedFieldId[] groupByOutFieldIds, - VectorContainer outContainer) throws SchemaChangeException, ClassTransformationException, IOException { + OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, + LogicalExpression[] valueExprs, List valueFieldIds, TypedFieldId[] groupByOutFieldIds, + VectorContainer outContainer) throws SchemaChangeException, IOException { if (valueExprs == null || valueFieldIds == null) { throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables."); @@ -230,15 +312,34 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme throw new IllegalArgumentException("Wrong number of workspace variables."); } -// this.context = 
context; + this.context = context; this.stats = stats; - this.allocator = allocator; + this.allocator = oContext.getAllocator(); + this.oContext = oContext; this.incoming = incoming; -// this.schema = incoming.getSchema(); this.outgoing = outgoing; this.outContainer = outContainer; + this.operatorId = hashAggrConfig.getOperatorId(); + + is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2; + isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1; + canSpill = isTwoPhase; // single phase can not spill + + // Typically for testing - force a spill after a partition has more than so many batches + minBatchesPerPartition = context.getConfig().getLong(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION); + + // Set the memory limit + memoryLimit = allocator.getLimit(); + // Optional configured memory limit, typically used only for testing. + long configLimit = context.getConfig().getLong(ExecConstants.HASHAGG_MAX_MEMORY); + if (configLimit > 0) { + logger.warn("Memory limit was changed to {}",configLimit); + memoryLimit = Math.min(memoryLimit, configLimit); + allocator.setLimit(memoryLimit); // enforce at the allocator + } -// this.hashAggrConfig = hashAggrConfig; + // All the settings that require the number of partitions were moved into delayedSetup() + // which would be called later, after the actual data first arrives // currently, hash aggregation is only applicable if there are group-by expressions. 
// For non-grouped (a.k.a Plain) aggregations that don't involve DISTINCT, there is no @@ -266,112 +367,278 @@ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, Fragme } } - ChainedHashTable ht = + spillSet = new SpillSet(context,hashAggrConfig, UserBitShared.CoreOperatorType.HASH_AGGREGATE); + baseHashTable = new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); - this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); - + this.groupByOutFieldIds = groupByOutFieldIds; // retain these for delayedSetup, and to allow recreating hash tables (after a spill) numGroupByOutFields = groupByOutFieldIds.length; - batchHolders = new ArrayList(); - // First BatchHolder is created when the first put request is received. doSetup(incoming); } + /** + * Delayed setup are the parts from setup() that can only be set after actual data arrives in incoming + * This data is used to compute the number of partitions. + */ + private void delayedSetup() { + + // Set the number of partitions from the configuration (raise to a power of two, if needed) + numPartitions = context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS); + if ( numPartitions == 1 ) { + canSpill = false; + logger.warn("Spilling was disabled due to configuration setting of num_partitions to 1"); + } + numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 + + if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an empty batch + else { + // Estimate the max batch size; should use actual data (e.g. 
lengths of varchars) + updateEstMaxBatchSize(incoming); + } + long memAvail = memoryLimit - allocator.getAllocatedMemory(); + if ( !canSpill ) { // single phase, or spill disabled by configuration + numPartitions = 1; // single phase should use only a single partition (to save memory) + } else { // two phase + // Adjust down the number of partitions if needed - when the memory available can not hold as + // many batches (configurable option), plus overhead (e.g. hash table, links, hash values)) + while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 8 * 1024 * 1024) > memAvail ) { + numPartitions /= 2; + if ( numPartitions < 2) { + if ( is2ndPhase ) { + canSpill = false; // 2nd phase needs at least 2 to make progress + logger.warn("Spilling was disabled - not enough memory available for internal partitioning"); + } + break; + } + } + } + logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", + numPartitions, canSpill ? "Can" : "Cannot"); + + // The following initial safety check should be revisited once we can lower the number of rows in a batch + // In cases of very tight memory -- need at least memory to process one batch, plus overhead (e.g. hash table) + if ( numPartitions == 1 ) { + // if too little memory - behave like the old code -- no memory limit for hash aggregate + allocator.setLimit(AbstractBase.MAX_ALLOCATION); // 10_000_000_000L + } + // Based on the number of partitions: Set the mask and bit count + partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F + bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 + + // Create arrays (one entry per partition) + htables = new HashTable[numPartitions] ; + batchHolders = (ArrayList[]) new ArrayList[numPartitions] ; + outBatchIndex = new int[numPartitions] ; + outputStream = new OutputStream[numPartitions]; + spilledBatchesCount = new int[numPartitions]; + spillFiles = new String[numPartitions]; + spilledPartitionsList = new ArrayList(); + + plannedBatches = numPartitions; // each partition should allocate its first batch + + // initialize every (per partition) entry in the arrays + for (int i = 0; i < numPartitions; i++ ) { + try { + this.htables[i] = baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions); + this.htables[i].setMaxVarcharSize(maxColumnWidth); + } catch (ClassTransformationException e) { + throw UserException.unsupportedError(e) + .message("Code generation error - likely an error in the code.") + .build(logger); + } catch (IOException e) { + throw UserException.resourceError(e) + .message("IO Error while creating a hash table.") + .build(logger); + } catch (SchemaChangeException sce) { + throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce); + } + this.batchHolders[i] = new ArrayList(); // First BatchHolder is created when the first put request is received. 
+ } + } + /** + * get new incoming: (when reading spilled files like an "incoming") + * @return The (newly replaced) incoming + */ + @Override + public RecordBatch getNewIncoming() { return newIncoming; } + + private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException, IOException { + baseHashTable.updateIncoming(newIncoming); // after a spill - a new incoming + this.incoming = newIncoming; + currentBatchRecordCount = newIncoming.getRecordCount(); // first batch in this spill file + nextPartitionToReturn = 0; + for (int i = 0; i < numPartitions; i++ ) { + htables[i].reinit(newIncoming); + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { + bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = new ArrayList(); + } + outBatchIndex[i] = 0; + outputStream[i] = null; + spilledBatchesCount[i] = 0; + spillFiles[i] = null; + } + } + + /** + * Update the estimated max batch size to be used in the Hash Aggr Op. + * using the record batch size to get the row width. + * @param incoming + */ + private void updateEstMaxBatchSize(RecordBatch incoming) { + if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change + RecordBatchSizer sizer = new RecordBatchSizer(incoming); + logger.trace("Incoming sizer: {}",sizer); + // An empty batch only has the schema, can not tell actual length of varchars + // else use the actual varchars length, each capped at 50 (to match the space allocation) + estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50(); + estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE; + + // Get approx max (varchar) column width to get better memory allocation + maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE); + maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE); + + logger.trace("{} phase. 
Estimated row width: {} batch size: {} memory limit: {} max column width: {}", + isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth); + + if ( estMaxBatchSize > memoryLimit ) { + logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,memoryLimit); + } + } + + /** + * Read and process (i.e., insert into the hash table and aggregate) records from the current batch. + * Once complete, get the incoming NEXT batch and process it as well, etc. + * For 1st phase, may return when an early output needs to be performed. + * + * @return Agg outcome status + */ @Override public AggOutcome doWork() { - try { - // Note: Keeping the outer and inner try blocks here to maintain some similarity with - // StreamingAggregate which does somethings conditionally in the outer try block. - // In the future HashAggregate may also need to perform some actions conditionally - // in the outer try block. - - outside: - while (true) { - // loop through existing records, aggregating the values as necessary. - if (EXTRA_DEBUG_1) { - logger.debug("Starting outer loop of doWork()..."); + + while (true) { + + // This would be called only once - first time actual data arrives on incoming + if ( schema == null && incoming.getRecordCount() > 0 ) { + this.schema = incoming.getSchema(); + currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch + // Calculate the number of partitions based on actual incoming data + delayedSetup(); + } + + // + // loop through existing records in this batch, aggregating the values as necessary. 
+ // + if (EXTRA_DEBUG_1) { + logger.debug("Starting outer loop of doWork()..."); + } + for (; underlyingIndex < currentBatchRecordCount; incIndex()) { + if (EXTRA_DEBUG_2) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); } - for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if (EXTRA_DEBUG_2) { - logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); - } - checkGroupAndAggrValues(currentIndex); + checkGroupAndAggrValues(currentIndex); + // If adding a group discovered a memory pressure during 1st phase, then start + // outputting some partition downstream in order to free memory. + if ( earlyOutput ) { + outputCurrentBatch(); + incIndex(); // next time continue with the next incoming row + return AggOutcome.RETURN_OUTCOME; } + } + + if (EXTRA_DEBUG_1) { + logger.debug("Processed {} records", underlyingIndex); + } - if (EXTRA_DEBUG_1) { - logger.debug("Processed {} records", underlyingIndex); + // Cleanup the previous batch since we are done processing it. + for (VectorWrapper v : incoming) { + v.getValueVector().clear(); + } + // + // Get the NEXT input batch, initially from the upstream, later (if there was a spill) + // from one of the spill files (The spill case is handled differently here to avoid + // collecting stats on the spilled records) + // + if ( handlingSpills ) { + outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP; + } else { + long beforeAlloc = allocator.getAllocatedMemory(); + + // Get the next RecordBatch from the incoming (i.e. 
upstream operator) + outcome = outgoing.next(0, incoming); + + // If incoming batch is bigger than our estimate - adjust the estimate to match + long afterAlloc = allocator.getAllocatedMemory(); + long incomingBatchSize = afterAlloc - beforeAlloc; + if ( estMaxBatchSize < incomingBatchSize) { + logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize); + estMaxBatchSize = incomingBatchSize; } + } - try { + if (EXTRA_DEBUG_1) { + logger.debug("Received IterOutcome of {}", outcome); + } - while (true) { - // Cleanup the previous batch since we are done processing it. - for (VectorWrapper v : incoming) { - v.getValueVector().clear(); - } - IterOutcome out = outgoing.next(0, incoming); - if (EXTRA_DEBUG_1) { - logger.debug("Received IterOutcome of {}", out); - } - switch (out) { - case OUT_OF_MEMORY: - case NOT_YET: - this.outcome = out; - return AggOutcome.RETURN_OUTCOME; - - case OK_NEW_SCHEMA: - if (EXTRA_DEBUG_1) { - logger.debug("Received new schema. 
Batch has {} records.", incoming.getRecordCount()); - } -// newSchema = true; - this.cleanup(); - // TODO: new schema case needs to be handled appropriately - return AggOutcome.UPDATE_AGGREGATOR; - - case OK: - resetIndex(); - if (incoming.getRecordCount() == 0) { - continue; - } else { - checkGroupAndAggrValues(currentIndex); - incIndex(); - - if (EXTRA_DEBUG_1) { - logger.debug("Continuing outside loop"); - } - continue outside; - } - - case NONE: - // outcome = out; - - buildComplete = true; - - updateStats(htable); - - // output the first batch; remaining batches will be output - // in response to each next() call by a downstream operator - - outputCurrentBatch(); - - // return setOkAndReturn(); - return AggOutcome.RETURN_OUTCOME; - - case STOP: - default: - outcome = out; - return AggOutcome.CLEANUP_AND_RETURN; - } + // Handle various results from getting the next batch + switch (outcome) { + case OUT_OF_MEMORY: + case NOT_YET: + return AggOutcome.RETURN_OUTCOME; + + case OK_NEW_SCHEMA: + if (EXTRA_DEBUG_1) { + logger.debug("Received new schema. Batch has {} records.", incoming.getRecordCount()); } + this.cleanup(); + // TODO: new schema case needs to be handled appropriately + return AggOutcome.UPDATE_AGGREGATOR; - } finally { - // placeholder... 
- } + case OK: + currentBatchRecordCount = incoming.getRecordCount(); // size of next batch + + resetIndex(); // initialize index (a new batch needs to be processed) + + if (EXTRA_DEBUG_1) { + logger.debug("Continue to start processing the next batch"); + } + break; + + case NONE: + resetIndex(); // initialize index (in case spill files need to be processed) + + buildComplete = true; + + updateStats(htables); + + // output the first batch; remaining batches will be output + // in response to each next() call by a downstream operator + AggIterOutcome aggOutcome = outputCurrentBatch(); + + if ( aggOutcome == AggIterOutcome.AGG_RESTART ) { + // Output of first batch returned a RESTART (all new partitions were spilled) + return AggOutcome.CALL_WORK_AGAIN; // need to read/process the next partition + } + + if ( aggOutcome != AggIterOutcome.AGG_NONE ) { outcome = IterOutcome.OK; } + + return AggOutcome.RETURN_OUTCOME; + + case STOP: + default: + return AggOutcome.CLEANUP_AND_RETURN; } - } finally { } } + /** + * Allocate space for the returned aggregate columns + * (Note DRILL-5588: Maybe can eliminate this allocation (and copy)) + * @param records + */ private void allocateOutgoing(int records) { // Skip the keys and only allocate for outputting the workspace values // (keys will be output through splitAndTransfer) @@ -382,14 +649,8 @@ private void allocateOutgoing(int records) { while (outgoingIter.hasNext()) { @SuppressWarnings("resource") ValueVector vv = outgoingIter.next().getValueVector(); -// MajorType type = vv.getField().getType(); - /* - * In build schema we use the allocation model that specifies exact record count - * so we need to stick with that allocation model until DRILL-2211 is resolved. Using - * 50 as the average bytes per value as is used in HashTable. 
- */ - AllocationHelper.allocatePrecomputedChildCount(vv, records, VARIABLE_WIDTH_VALUE_SIZE, 0); + AllocationHelper.allocatePrecomputedChildCount(vv, records, maxColumnWidth, 0); } } @@ -400,45 +661,82 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { - // return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { - if (htable != null) { - htable.clear(); - htable = null; + if ( schema == null ) { return; } // not set up; nothing to clean + if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) { + stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled + (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); + } + // clean (and deallocate) each partition + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { + bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { + try { + outputStream[i].close(); + outputStream[i] = null; + spillSet.delete(spillFiles[i]); + spillFiles[i] = null; + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); + } + } } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { + SpilledPartition sp = spilledPartitionsList.remove(0); + try { + spillSet.delete(sp.spillFile); + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); + } + } + // Delete the currently handled (if any) spilled file + if ( newIncoming != null ) { newIncoming.close(); } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } - if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) /* throws SchemaChangeException /*, IOException */ { + assert htables[part] != null; + htables[part].reset(); + if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } + batchHolders[part] = new ArrayList(); // First BatchHolder is created when the first put request is received. 
+ } -// private final AggOutcome setOkAndReturn() { -// this.outcome = IterOutcome.OK; -// for (VectorWrapper v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -// } -// return AggOutcome.RETURN_OUTCOME; -// } - private final void incIndex() { underlyingIndex++; - if (underlyingIndex >= incoming.getRecordCount()) { + if (underlyingIndex >= currentBatchRecordCount) { currentIndex = Integer.MAX_VALUE; return; } - currentIndex = getVectorIndex(underlyingIndex); + try { currentIndex = getVectorIndex(underlyingIndex); } + catch (SchemaChangeException sc) { throw new UnsupportedOperationException(sc);} } private final void resetIndex() { @@ -446,71 +744,337 @@ private final void resetIndex() { incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { + return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? + * - The current partition (to which a new batch holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int currPart) { + if ( ! 
is2ndPhase ) { return currPart; } // 1st phase: just use the current partition + int currPartSize = batchHolders[currPart].size(); + if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1 + // first find the largest spilled partition + int maxSizeSpilled = -1; + int indexMaxSpilled = -1; + for (int isp = 0; isp < numPartitions; isp++ ) { + if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) { + maxSizeSpilled = batchHolders[isp].size(); + indexMaxSpilled = isp; + } + } + // Give the current (if already spilled) some priority + if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) { + maxSizeSpilled = currPartSize ; + indexMaxSpilled = currPart; + } + // now find the largest non-spilled partition + int maxSize = -1; + int indexMax = -1; + // Use the largest spilled (if found) as a base line, with a factor of 4 + if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) { + indexMax = indexMaxSpilled; + maxSize = 4 * maxSizeSpilled ; + } + for ( int insp = 0; insp < numPartitions; insp++) { + if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) { + indexMax = insp; + maxSize = batchHolders[insp].size(); + } + } + // again - priority to the current partition + if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) { + return currPart; + } + if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch! 
+ return -1; // try skipping this spill + } + return indexMax; + } + + /** + * Iterate through the batches of the given partition, writing them to a file + * + * @param part The partition (number) to spill + */ + private void spillAPartition(int part) { + + ArrayList currPartition = batchHolders[part]; + rowsInPartition = 0; + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size()); + } + + if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill + + // If this is the first spill for this partition, create an output stream + if ( ! isSpilled(part) ) { + + spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null); + + try { + outputStream[part] = spillSet.openForOutput(spillFiles[part]); + } catch (IOException ioe) { + throw UserException.resourceError(ioe) + .message("Hash Aggregation failed to open spill file: " + spillFiles[part]) + .build(logger); + } + } + + for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) { + + // get the number of records in the batch holder that are pending output + int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput(); + + rowsInPartition += numPendingOutput; // for logging + rowsSpilled += numPendingOutput; + + allocateOutgoing(numPendingOutput); + + currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); + int numOutputRecords = outNumRecordsHolder.value; + + this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); + + // set the value count for outgoing batch value vectors + /* int i = 0; */ + for (VectorWrapper v : outgoing) { + v.getValueVector().getMutator().setValueCount(numOutputRecords); + /* + // print out the first row to be spilled ( varchar, varchar, bigint ) + try { + if (i++ < 2) { + 
NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector()); + logger.info("FIRST ROW = {}", vv.getAccessor().get(0)); + } else { + NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector()); + logger.info("FIRST ROW = {}", vv.getAccessor().get(0)); + } + } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); } + */ + } + + outContainer.setRecordCount(numPendingOutput); + WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false); + VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator); + Stopwatch watch = Stopwatch.createStarted(); + try { + outputBatch.writeToStream(outputStream[part]); + } catch (IOException ioe) { + throw UserException.dataWriteError(ioe) + .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString()) + .build(logger); + } + outContainer.zeroVectors(); + logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput); + } + + spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches + + logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part); + } + + private void addBatchHolder(int part) { + BatchHolder bh = newBatchHolder(); - batchHolders.add(bh); + batchHolders[part].add(bh); if (EXTRA_DEBUG_1) { - logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); + logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size()); } bh.setup(); } - // Overridden in the generated class when created as plain Java code. - + // These methods are overridden in the generated class when created as plain Java code. 
protected BatchHolder newBatchHolder() { return new BatchHolder(); } + /** + * Output the next batch from partition "nextPartitionToReturn" + * + * @return iteration outcome (e.g., OK, NONE ...) + */ @Override - public IterOutcome outputCurrentBatch() { - if (outBatchIndex >= batchHolders.size()) { - this.outcome = IterOutcome.NONE; - return outcome; + public AggIterOutcome outputCurrentBatch() { + + // when incoming was an empty batch, just finish up + if ( schema == null ) { + logger.trace("Incoming was empty; output is an empty batch."); + this.outcome = IterOutcome.NONE; // no records were read + allFlushed = true; + return AggIterOutcome.AGG_NONE; } - // get the number of records in the batch holder that are pending output - int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput(); + // Initialization (covers the case of early output) + ArrayList currPartition = batchHolders[earlyPartition]; + int currOutBatchIndex = outBatchIndex[earlyPartition]; + int partitionToReturn = earlyPartition; + + if ( ! 
earlyOutput ) { + // Update the next partition to return (if needed) + // skip fully returned (or spilled) partitions + while (nextPartitionToReturn < numPartitions) { + // + // If this partition was spilled - spill the rest of it and skip it + // + if ( isSpilled(nextPartitionToReturn) ) { + spillAPartition(nextPartitionToReturn); // spill the rest + SpilledPartition sp = new SpilledPartition(); + sp.spillFile = spillFiles[nextPartitionToReturn]; + sp.spilledBatches = spilledBatchesCount[nextPartitionToReturn]; + sp.cycleNum = cycleNum; // remember the current cycle + sp.origPartn = nextPartitionToReturn; // for debugging / filename + sp.prevOrigPartn = originalPartition; // for debugging / filename + spilledPartitionsList.add(sp); + + reinitPartition(nextPartitionToReturn); // free the memory + long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]); + spillSet.tallyWriteBytes(posn); // for the IO stats + try { + outputStream[nextPartitionToReturn].close(); + } catch (IOException ioe) { + throw UserException.resourceError(ioe) + .message("IO Error while closing output stream") + .build(logger); + } + outputStream[nextPartitionToReturn] = null; + } + else { + currPartition = batchHolders[nextPartitionToReturn]; + currOutBatchIndex = outBatchIndex[nextPartitionToReturn]; + // If curr batch (partition X index) is not empty - proceed to return it + if (currOutBatchIndex < currPartition.size() && 0 != currPartition.get(currOutBatchIndex).getNumPendingOutput()) { + break; + } + } + nextPartitionToReturn++; // else check next partition + } + + // if passed the last partition - either done or need to restart and read spilled partitions + if (nextPartitionToReturn >= numPartitions) { + // The following "if" is probably never used; due to a similar check at the end of this method + if ( spilledPartitionsList.isEmpty() ) { // and no spilled partitions + allFlushed = true; + this.outcome = IterOutcome.NONE; + if ( is2ndPhase ) { + 
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled + (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); + } + return AggIterOutcome.AGG_NONE; // then return NONE + } + // Else - there are still spilled partitions to process - pick one and handle just like a new incoming + buildComplete = false; // go back and call doWork() again + handlingSpills = true; // beginning to work on the spill files + // pick a spilled partition; set a new incoming ... + SpilledPartition sp = spilledPartitionsList.remove(0); + // Create a new "incoming" out of the spilled partition spill file + newIncoming = new SpilledRecordbatch(sp.spillFile, sp.spilledBatches, context, schema, oContext, spillSet); + originalPartition = sp.origPartn; // used for the filename + logger.trace("Reading back spilled original partition {} as an incoming",originalPartition); + // Initialize .... new incoming, new set of partitions + try { initializeSetup(newIncoming); } catch (Exception e) { throw new RuntimeException(e); } + // update the cycle num if needed + // The current cycle num should always be one larger than in the spilled partition + if ( cycleNum == sp.cycleNum ) { + cycleNum = 1 + sp.cycleNum; + stats.setLongStat(Metric.SPILL_CYCLE, cycleNum); // update stats + // report first spill or memory stressful situations + if ( cycleNum == 1 ) { logger.info("Started reading spilled records "); } + if ( cycleNum == 2 ) { logger.info("SECONDARY SPILLING "); } + if ( cycleNum == 3 ) { logger.warn("TERTIARY SPILLING "); } + if ( cycleNum == 4 ) { logger.warn("QUATERNARY SPILLING "); } + if ( cycleNum == 5 ) { logger.warn("QUINARY SPILLING "); } + } + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {} batches). 
More {} spilled partitions left.", + sp.origPartn, sp.prevOrigPartn, sp.cycleNum, sp.spilledBatches, spilledPartitionsList.size()); + } + return AggIterOutcome.AGG_RESTART; + } + + partitionToReturn = nextPartitionToReturn ; - if (numPendingOutput == 0) { - this.outcome = IterOutcome.NONE; - return outcome; } + // get the number of records in the batch holder that are pending output + int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput(); + + // The following accounting is for logging, metrics, etc. + rowsInPartition += numPendingOutput ; + if ( ! handlingSpills ) { rowsNotSpilled += numPendingOutput; } + else { rowsSpilledReturned += numPendingOutput; } + if ( earlyOutput ) { rowsReturnedEarly += numPendingOutput; } + allocateOutgoing(numPendingOutput); - batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); + currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); int numOutputRecords = outNumRecordsHolder.value; if (EXTRA_DEBUG_1) { logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value); } - this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); + + this.htables[partitionToReturn].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); // set the value count for outgoing batch value vectors for (VectorWrapper v : outgoing) { v.getValueVector().getMutator().setValueCount(numOutputRecords); } -// outputCount += numOutputRecords; - this.outcome = IterOutcome.OK; - logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, numOutputRecords); + if ( EXTRA_DEBUG_SPILL && is2ndPhase ) { + logger.debug("So far returned {} + SpilledReturned {} total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned, + rowsNotSpilled+rowsSpilledReturned, + rowsSpilled); + } 
lastBatchOutputCount = numOutputRecords; - outBatchIndex++; - if (outBatchIndex == batchHolders.size()) { - allFlushed = true; + outBatchIndex[partitionToReturn]++; + // if just flushed the last batch in the partition + if (outBatchIndex[partitionToReturn] == currPartition.size()) { + + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HashAggregate: {} Flushed partition {} with {} batches total {} rows", + earlyOutput ? "(Early)" : "", + partitionToReturn, outBatchIndex[partitionToReturn], rowsInPartition); + } + rowsInPartition = 0; // reset to count for the next partition + + // deallocate memory used by this partition, and re-initialize + reinitPartition(partitionToReturn); - logger.debug("HashAggregate: All batches flushed."); + if ( earlyOutput ) { - // cleanup my internal state since there is nothing more to return - this.cleanup(); + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HASH AGG: Finished (early) re-init partition {}, mem allocated: {}", earlyPartition, allocator.getAllocatedMemory()); + } + outBatchIndex[earlyPartition] = 0; // reset, for next time + earlyOutput = false ; // done with early output + } + else if ( (partitionToReturn + 1 == numPartitions) && spilledPartitionsList.isEmpty() ) { // last partition ? + + allFlushed = true; // next next() call will return NONE + + logger.trace("HashAggregate: All batches flushed."); + + // cleanup my internal state since there is nothing more to return + this.cleanup(); + } } - return this.outcome; + return AggIterOutcome.AGG_OK; } @Override @@ -522,11 +1086,33 @@ public boolean allFlushed() { public boolean buildComplete() { return buildComplete; } + @Override + public boolean earlyOutput() { return earlyOutput; } public int numGroupedRecords() { return numGroupedRecords; } + /** + * Generate a detailed error message in case of "Out Of Memory" + * @return err msg + */ + private String getOOMErrorMsg() { + String errmsg; + if ( !isTwoPhase ) { + errmsg = "Single Phase Hash Aggregate operator can not spill." 
; + } else if ( ! canSpill ) { // 2nd phase, with only 1 partition + errmsg = "Too little memory available to operator to facilitate spilling."; + } else { // a bug ? + errmsg = "OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + numPartitions + + ". Estimated batch size: " + estMaxBatchSize + ". Planned batches: " + plannedBatches; + if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; } + } + errmsg += " Memory limit: " + allocator.getLimit() + " so far allocated: " + allocator.getAllocatedMemory() + ". "; + + return errmsg; + } + // Check if a group is present in the hash table; if not, insert it in the hash table. // The htIdxHolder contains the index of the group in the hash table container; this same // index is also used for the aggregation values maintained by the hash aggregate. @@ -535,6 +1121,8 @@ private void checkGroupAndAggrValues(int incomingRowIdx) { throw new IllegalArgumentException("Invalid incoming row index."); } + assert ! 
earlyOutput; + /** for debugging Object tmp = (incoming).getValueAccessorById(0, BigIntVector.class).getValueVector(); BigIntVector vv0 = null; @@ -546,44 +1134,189 @@ private void checkGroupAndAggrValues(int incomingRowIdx) { holder.value = vv0.getAccessor().get(incomingRowIdx) ; } */ + /* + if ( handlingSpills && ( incomingRowIdx == 0 ) ) { + // for debugging -- show the first row from a spilled batch + Object tmp0 = (incoming).getValueAccessorById(NullableVarCharVector.class, 0).getValueVector(); + Object tmp1 = (incoming).getValueAccessorById(NullableVarCharVector.class, 1).getValueVector(); + Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, 2).getValueVector(); + + if (tmp0 != null && tmp1 != null && tmp2 != null) { + NullableVarCharVector vv0 = ((NullableVarCharVector) tmp0); + NullableVarCharVector vv1 = ((NullableVarCharVector) tmp1); + NullableBigIntVector vv2 = ((NullableBigIntVector) tmp2); + logger.debug("The first row = {} , {} , {}", vv0.getAccessor().get(incomingRowIdx), vv1.getAccessor().get(incomingRowIdx), vv2.getAccessor().get(incomingRowIdx)); + } + } + */ + // The hash code is computed once, then its lower bits are used to determine the + // partition to use, and the higher bits determine the location in the hash table. + int hashCode; + try { + htables[0].updateBatches(); + hashCode = htables[0].getHashCode(incomingRowIdx); + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException("Unexpected schema change", e); + } - htable.put(incomingRowIdx, htIdxHolder, 1 /* retry count */); + // right shift hash code for secondary (or tertiary...) 
spilling + for (int i = 0; i < cycleNum; i++) { hashCode >>>= bitsInMask; } - int currentIdx = htIdxHolder.value; + int currentPartition = hashCode & partitionMask ; + hashCode >>>= bitsInMask; + HashTable.PutStatus putStatus = null; + long allocatedBefore = allocator.getAllocatedMemory(); - // get the batch index and index within the batch - if (currentIdx >= batchHolders.size() * HashTable.BATCH_SIZE) { - addBatchHolder(); + // Insert the key columns into the hash table + try { + putStatus = htables[currentPartition].put(incomingRowIdx, htIdxHolder, hashCode); + } catch (OutOfMemoryException exc) { + throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill + } catch (SchemaChangeException e) { + throw new UnsupportedOperationException("Unexpected schema change", e); } - BatchHolder bh = batchHolders.get((currentIdx >>> 16) & HashTable.BATCH_MASK); - int idxWithinBatch = currentIdx & HashTable.BATCH_MASK; + int currentIdx = htIdxHolder.value; - // Check if we have almost filled up the workspace vectors and add a batch if necessary - if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) { - htable.addNewKeyBatch(); - addBatchHolder(); - bh.allocatedNextBatch = true; + long addedMem = allocator.getAllocatedMemory() - allocatedBefore; + if ( addedMem > 0 ) { + logger.trace("MEMORY CHECK HT: allocated {} added {} partition {}",allocatedBefore,addedMem,currentPartition); } + // Check if put() added a new batch (for the keys) inside the hash table, hence a matching batch + // (for the aggregate columns) needs to be created + if ( putStatus == HashTable.PutStatus.NEW_BATCH_ADDED ) { + try { + long allocatedBeforeAggCol = allocator.getAllocatedMemory(); + + addBatchHolder(currentPartition); + + if ( plannedBatches > 0 ) { plannedBatches--; } // just allocated a planned batch + long totalAddedMem = allocator.getAllocatedMemory() - allocatedBefore; + logger.trace("MEMORY CHECK AGG: added {} total (with HT) added 
{}",allocator.getAllocatedMemory()-allocatedBeforeAggCol,totalAddedMem); + // resize the batch estimate if needed (e.g., varchars may take more memory than estimated) + if ( totalAddedMem > estMaxBatchSize ) { + logger.trace("Adjusting Batch size estimate from {} to {}",estMaxBatchSize,totalAddedMem); + estMaxBatchSize = totalAddedMem; + } + } catch (OutOfMemoryException exc) { + throw new OutOfMemoryException(getOOMErrorMsg(), exc); // may happen when can not spill + } + } + BatchHolder bh = batchHolders[currentPartition].get((currentIdx >>> 16) & HashTable.BATCH_MASK); + int idxWithinBatch = currentIdx & HashTable.BATCH_MASK; if (bh.updateAggrValues(incomingRowIdx, idxWithinBatch)) { numGroupedRecords++; } + + // =================================================================================== + // If the last batch just became full - that is the time to check the memory limits !! + // If exceeded, then need to spill (if 2nd phase) or output early (1st) + // (Skip this if cannot spill; in such case an OOM may be encountered later) + // =================================================================================== + if ( putStatus == HashTable.PutStatus.KEY_ADDED_LAST && canSpill ) { + + plannedBatches++; // planning to allocate one more batch + + // calculate the (max) new memory needed now + long hashTableDoublingSizeNeeded = 0; // in case the hash table(s) would resize + for ( HashTable ht : htables ) { + hashTableDoublingSizeNeeded += ht.extraMemoryNeededForResize(); + } + + // Plan ahead for at least MIN batches, to account for size changing, and some overhead + long maxMemoryNeeded = minBatchesPerPartition * plannedBatches * + ( estMaxBatchSize + MAX_BATCH_SIZE * ( 4 + 4 /* links + hash-values */) ) + + hashTableDoublingSizeNeeded; + + // log a detailed debug message explaining why a spill may be needed + logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. 
" + + "Memory needed {}, Est batch size {}, mem limit {}", + allocator.getAllocatedMemory(), isTwoPhase?(is2ndPhase?"2ND":"1ST"):"Single", currentPartition, + batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, memoryLimit); + // + // Spill if the allocated memory plus the memory needed exceeds the memory limit. + // + if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) { + + // Pick a "victim" partition to spill or return + int victimPartition = chooseAPartitionToFlush(currentPartition); + + // In case no partition has more than one batch -- try and "push the limits"; maybe next + // time the spill could work. + if ( victimPartition < 0 ) { return; } + + if ( is2ndPhase ) { + long before = allocator.getAllocatedMemory(); + + spillAPartition(victimPartition); + logger.trace("RAN OUT OF MEMORY: Spilled partition {}",victimPartition); + + // Re-initialize (free memory, then recreate) the partition just spilled/returned + reinitPartition(victimPartition); + + // in some "edge" cases (e.g. 
testing), spilling one partition may not be enough + if ( allocator.getAllocatedMemory() + maxMemoryNeeded > memoryLimit ) { + int victimPartition2 = chooseAPartitionToFlush(victimPartition); + if ( victimPartition2 < 0 ) { return; } + long after = allocator.getAllocatedMemory(); + spillAPartition(victimPartition2); + reinitPartition(victimPartition2); + logger.warn("A Second Spill was Needed: allocated before {}, after first spill {}, after second {}, memory needed {}", + before, after, allocator.getAllocatedMemory(), maxMemoryNeeded); + logger.trace("Second Partition Spilled: {}",victimPartition2); + } + } + else { + // 1st phase need to return a partition early in order to free some memory + earlyOutput = true; + earlyPartition = victimPartition; + + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("picked partition {} for early output", victimPartition); + } + } + } + } } - private void updateStats(HashTable htable) { - htable.getStats(htStats); + /** + * Updates the stats at the time after all the input was read. + * Note: For spilled partitions, their hash-table stats from before the spill are lost. + * And the SPILLED_PARTITIONS only counts the spilled partitions in the primary, not SECONDARY etc. 
+ * @param htables + */ + private void updateStats(HashTable[] htables) { + if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files + long numSpilled = 0; + HashTableStats newStats = new HashTableStats(); + // sum the stats from all the partitions + for (int ind = 0; ind < numPartitions; ind++) { + htables[ind].getStats(newStats); + htStats.addStats(newStats); + if (isSpilled(ind)) { + numSpilled++; + } + } this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets); this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries); this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing); this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime); + this.stats.setLongStat(Metric.NUM_PARTITIONS, numPartitions); + if ( is2ndPhase ) { + this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled); + } + if ( rowsReturnedEarly > 0 ) { + stats.setLongStat(Metric.SPILL_MB, // update stats - est. total MB returned early + (int) Math.round( rowsReturnedEarly * estRowWidth / 1024.0D / 1024.0)); + } } // Code-generated methods (implemented in HashAggBatch) - public abstract void doSetup(@Named("incoming") RecordBatch incoming); + public abstract void doSetup(@Named("incoming") RecordBatch incoming) throws SchemaChangeException; - public abstract int getVectorIndex(@Named("recordIndex") int recordIndex); + public abstract int getVectorIndex(@Named("recordIndex") int recordIndex) throws SchemaChangeException; - public abstract boolean resetValues(); + public abstract boolean resetValues() throws SchemaChangeException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 7cc43adca2e..21d5a4a4ffc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -24,8 +24,8 @@ import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.HashTableConfig; @@ -40,13 +40,17 @@ public interface HashAggregator { new TemplateClassDefinition(HashAggregator.class, HashAggTemplate.class); public static enum AggOutcome { - RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR + RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN } + // For returning results from outputCurrentBatch + // OK - batch returned, NONE - end of data, RESTART - call again + public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART } + public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, - OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, - LogicalExpression[] valueExprs, List valueFieldIds, TypedFieldId[] keyFieldIds, - VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException; + OperatorStats stats, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, + LogicalExpression[] valueExprs, List valueFieldIds, TypedFieldId[] keyFieldIds, + VectorContainer outContainer) throws SchemaChangeException, IOException, ClassTransformationException; public abstract IterOutcome getOutcome(); @@ -60,6 +64,9 @@ public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfi public abstract boolean buildComplete(); - public abstract IterOutcome 
outputCurrentBatch(); + public abstract AggIterOutcome outputCurrentBatch(); + + public abstract boolean earlyOutput(); + public abstract RecordBatch getNewIncoming(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java new file mode 100644 index 00000000000..b05353ecac7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.aggregate; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.cache.VectorAccessibleSerializable; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.impl.spill.SpillSet; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; + +/** + * A class to replace "incoming" - instead scanning a spilled partition file + */ +public class SpilledRecordbatch implements CloseableRecordBatch { + private VectorContainer container; + private InputStream spillStream; + private int spilledBatches; + private FragmentContext context; + private BatchSchema schema; + private OperatorContext oContext; + private SpillSet spillSet; + // Path spillStreamPath; + private String spillFile; + VectorAccessibleSerializable vas; + + public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { + this.context = context; + this.schema = schema; + this.spilledBatches = spilledBatches; + this.oContext = oContext; + this.spillSet = spillSet; + //this.spillStreamPath = spillStreamPath; + this.spillFile = spillFile; + vas = new VectorAccessibleSerializable(oContext.getAllocator()); + container = vas.get(); + + try { + this.spillStream = this.spillSet.openForInput(spillFile); + } catch 
(IOException e) { throw new RuntimeException(e);} + + next(); // initialize the container + } + + @Override + public SelectionVector2 getSelectionVector2() { + throw new UnsupportedOperationException(); + } + + @Override + public SelectionVector4 getSelectionVector4() { + throw new UnsupportedOperationException(); + } + + @Override + public TypedFieldId getValueVectorId(SchemaPath path) { + return container.getValueVectorId(path); + } + + @Override + public VectorWrapper getValueAccessorById(Class clazz, int... ids) { + return container.getValueAccessorById(clazz, ids); + } + + @Override + public Iterator> iterator() { + return container.iterator(); + } + + @Override + public FragmentContext getContext() { return context; } + + @Override + public BatchSchema getSchema() { return schema; } + + @Override + public WritableBatch getWritableBatch() { + return WritableBatch.get(this); + } + + @Override + public VectorContainer getOutgoingContainer() { return container; } + + @Override + public int getRecordCount() { return container.getRecordCount(); } + + @Override + public void kill(boolean sendUpstream) { + this.close(); // delete the current spill file + } + + /** + * Read the next batch from the spill file + * + * @return IterOutcome + */ + @Override + public IterOutcome next() { + + if ( spilledBatches <= 0 ) { // no more batches to read in this partition + this.close(); + return IterOutcome.NONE; + } + + if ( spillStream == null ) { + throw new IllegalStateException("Spill stream was null"); + }; + + if ( spillSet.getPosition(spillStream) < 0 ) { + HashAggTemplate.logger.warn("Position is {} for stream {}", spillSet.getPosition(spillStream), spillStream.toString()); + } + + try { + if ( container.getNumberOfColumns() > 0 ) { // container already initialized + // Pass our container to the reader because other classes (e.g. 
HashAggBatch, HashTable) + // may have a reference to this container (as an "incoming") + vas.readFromStreamWithContainer(container, spillStream); + } + else { // first time - create a container + vas.readFromStream(spillStream); + container = vas.get(); + } + } catch (IOException e) { + throw UserException.dataReadError(e).addContext("Failed reading from a spill file").build(HashAggTemplate.logger); + } + + spilledBatches-- ; // one less batch to read + return IterOutcome.OK; + } + + @Override + public void close() { + container.clear(); + try { + if (spillStream != null) { + spillStream.close(); + spillStream = null; + } + + spillSet.delete(spillFile); + } + catch (IOException e) { + throw new RuntimeException(e); + } finally { + spillSet.close(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 77ebb0d7f51..436480ecf25 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -114,7 +114,7 @@ public class ChainedHashTable { private HashTableConfig htConfig; private final FragmentContext context; private final BufferAllocator allocator; - private final RecordBatch incomingBuild; + private RecordBatch incomingBuild; private final RecordBatch incomingProbe; private final RecordBatch outgoing; @@ -129,14 +129,18 @@ public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, Buffe this.outgoing = outgoing; } - public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException, + public void updateIncoming(RecordBatch incomingBuild) { + this.incomingBuild = incomingBuild; + } + + public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds, int numPartitions) throws 
ClassTransformationException, IOException, SchemaChangeException { CodeGenerator top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getFunctionRegistry(), context.getOptions()); top.plainJavaCapable(true); // Uncomment out this line to debug the generated code. // This code is called from generated code, so to step into this code, // persist the code generated in HashAggBatch also. -// top.saveCodeForDebugging(true); + // top.saveCodeForDebugging(true); ClassGenerator cg = top.getRoot(); ClassGenerator cgInner = cg.getInnerGenerator("BatchHolder"); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index ef7dadfc72e..9c93c1658c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.common; import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; @@ -43,7 +44,7 @@ public interface HashTable { */ static final public float DEFAULT_LOAD_FACTOR = 0.75f; - static public enum PutStatus {KEY_PRESENT, KEY_ADDED, PUT_FAILED;} + static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;} /** * The batch size used for internal batch holders @@ -51,30 +52,35 @@ static public enum PutStatus {KEY_PRESENT, KEY_ADDED, PUT_FAILED;} static final public int BATCH_SIZE = Character.MAX_VALUE + 1; static final public int BATCH_MASK = 0x0000FFFF; - /** Variable width vector size in bytes */ - public static final int VARIABLE_WIDTH_VECTOR_SIZE = 50 * BATCH_SIZE; + public void 
setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - RecordBatch incomingBuild, RecordBatch incomingProbe, - RecordBatch outgoing, VectorContainer htContainerOrig); + public void updateBatches() throws SchemaChangeException; - public void updateBatches(); + public int getHashCode(int incomingRowIdx) throws SchemaChangeException; - public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount); + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException; - public int containsKey(int incomingRowIdx, boolean isProbe); + public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException; public void getStats(HashTableStats stats); + public long extraMemoryNeededForResize(); + public int size(); public boolean isEmpty(); public void clear(); + public void reinit(RecordBatch newIncoming); + + public void reset(); + + public void setMaxVarcharSize(int size); + public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords); - public void addNewKeyBatch(); + // public void addNewKeyBatch(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java index c494c857bde..7baa9d91f92 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableStats.java @@ -26,6 +26,13 @@ public class HashTableStats { public HashTableStats() { } + + public void addStats (HashTableStats newStats) { + this.numBuckets += newStats.numBuckets ; + this.numEntries += 
newStats.numEntries ; + this.numResizing += newStats.numResizing ; + this.resizingTime += newStats.resizingTime ; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 96f9422f920..3209c27200a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -25,6 +25,7 @@ import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.compile.sig.RuntimeOverridden; +import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -50,14 +51,19 @@ public abstract class HashTableTemplate implements HashTable { // A hash 'bucket' consists of the start index to indicate start of a hash chain // Array of start indexes. start index is a global index across all batch holders + // This is the "classic hash table", where Hash-Value % size-of-table yields + // the offset/position (in the startIndices) of the beginning of the hash chain. 
private IntVector startIndices; // Array of batch holders..each batch holder can hold up to BATCH_SIZE entries private ArrayList batchHolders; - // Size of the hash table in terms of number of buckets + // Current size of the hash table in terms of number of buckets private int tableSize = 0; + // Original size of the hash table (needed when re-initializing) + private int originalTableSize; + // Threshold after which we rehash; It must be the tableSize * loadFactor private int threshold; @@ -95,6 +101,8 @@ public abstract class HashTableTemplate implements HashTable { private int resizingTime = 0; + private int maxVarcharSize = 8; // for varchar allocation + // This class encapsulates the links, keys and values for up to BATCH_SIZE // *unique* records. Thus, suppose there are N incoming record batches, each // of size BATCH_SIZE..but they have M unique keys altogether, the number of @@ -134,7 +142,9 @@ public BatchHolder(int idx) { if (vv instanceof FixedWidthVector) { ((FixedWidthVector) vv).allocateNew(BATCH_SIZE); } else if (vv instanceof VariableWidthVector) { - ((VariableWidthVector) vv).allocateNew(VARIABLE_WIDTH_VECTOR_SIZE, BATCH_SIZE); + long beforeMem = allocator.getAllocatedMemory(); + ((VariableWidthVector) vv).allocateNew(maxVarcharSize * BATCH_SIZE, BATCH_SIZE); + logger.trace("HT allocated {} for varchar of max width {}",allocator.getAllocatedMemory() - beforeMem, maxVarcharSize); } else { vv.allocateNew(); } @@ -166,7 +176,7 @@ private void init(IntVector links, IntVector hashValues, int size) { hashValues.getMutator().setValueCount(size); } - protected void setup() { + protected void setup() throws SchemaChangeException { setupInterior(incomingBuild, incomingProbe, outgoing, htContainer); } @@ -175,7 +185,7 @@ protected void setup() { // currentIdxHolder with the index of the next link. 
private boolean isKeyMatch(int incomingRowIdx, IndexPointer currentIdxHolder, - boolean isProbe) { + boolean isProbe) throws SchemaChangeException { int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; boolean match = false; @@ -201,7 +211,7 @@ private boolean isKeyMatch(int incomingRowIdx, // Insert a new entry coming from the incoming batch into the hash table // container at the specified index - private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) { + private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdxWithinBatch) throws SchemaChangeException { int currentIdxWithinBatch = currentIdx & BATCH_MASK; setValue(incomingRowIdx, currentIdxWithinBatch); @@ -405,36 +415,34 @@ protected void setupInterior( @Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe, @Named("outgoing") RecordBatch outgoing, - @Named("htContainer") VectorContainer htContainer) { + @Named("htContainer") VectorContainer htContainer) throws SchemaChangeException { } @RuntimeOverridden protected boolean isKeyMatchInternalBuild( - @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException { return false; } @RuntimeOverridden protected boolean isKeyMatchInternalProbe( - @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + @Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException { return false; } @RuntimeOverridden - protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) { + protected void setValue(@Named("incomingRowIdx") int incomingRowIdx, @Named("htRowIdx") int htRowIdx) throws SchemaChangeException { } @RuntimeOverridden - protected void 
outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) { + protected void outputRecordKeys(@Named("htRowIdx") int htRowIdx, @Named("outRowIdx") int outRowIdx) throws SchemaChangeException { } } // class BatchHolder @Override - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - RecordBatch incomingBuild, RecordBatch incomingProbe, - RecordBatch outgoing, VectorContainer htContainerOrig) { + public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { float loadf = htConfig.getLoadFactor(); int initialCap = htConfig.getInitialCapacity(); @@ -465,6 +473,7 @@ public void setup(HashTableConfig htConfig, FragmentContext context, BufferAlloc if (tableSize > MAXIMUM_CAPACITY) { tableSize = MAXIMUM_CAPACITY; } + originalTableSize = tableSize ; // retain original size threshold = (int) Math.ceil(tableSize * loadf); @@ -476,13 +485,17 @@ public void setup(HashTableConfig htConfig, FragmentContext context, BufferAlloc batchHolders = new ArrayList(); // First BatchHolder is created when the first put request is received. - doSetup(incomingBuild, incomingProbe); + try { + doSetup(incomingBuild, incomingProbe); + } catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); + } currentIdxHolder = new IndexPointer(); } @Override - public void updateBatches() { + public void updateBatches() throws SchemaChangeException { doSetup(incomingBuild, incomingProbe); for (BatchHolder batchHolder : batchHolders) { batchHolder.setup(); @@ -497,6 +510,21 @@ public int numResizing() { return numResizing; } + /** + * + * @return Size of extra memory needed if the HT (i.e. 
startIndices) is doubled + */ + @Override + public long extraMemoryNeededForResize() { + if (tableSize == MAXIMUM_CAPACITY) { return 0; } // will not resize + int newSize = roundUpToPowerOf2(2 * tableSize); + + if (newSize > MAXIMUM_CAPACITY) { + newSize = MAXIMUM_CAPACITY; + } + return newSize * 4L /* sizeof(int); multiply as long so the product cannot overflow int */; + } + @Override public int size() { return numEntries; @@ -526,7 +554,7 @@ public void clear() { batchHolders = null; } startIndices.clear(); - currentIdxHolder = null; + // currentIdxHolder = null; // keep IndexPointer in case HT is reused numEntries = 0; } @@ -544,86 +572,69 @@ private static int roundUpToPowerOf2(int number) { return rounded; } - @Override - public void put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) { - put(incomingRowIdx, htIdxHolder); + public int getHashCode(int incomingRowIdx) throws SchemaChangeException { + return getHashBuild(incomingRowIdx); } - private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) { + /** put() uses the hash code (from getHashCode() above) to insert the key(s) from the incoming + * row into the hash table. The code selects the bucket in the startIndices, then the keys are + * placed into the chained list - by storing the key values into a batch, and updating its + * "links" member. Last it modifies the index holder to the batch offset so that the caller + * can store the remaining parts of the row into a matching batch (outside the hash table).
+ * The returned status also tells the caller whether a new batch was added, or whether the inserted key was the last one in the current batch. + * + * @param incomingRowIdx - position of the incoming row + * @param htIdxHolder - to return batch + batch-offset (for caller to manage a matching batch) + * @param hashCode - computed over the key(s) by calling getHashCode() + * @return Status - KEY_PRESENT if the key(s) already existed; otherwise NEW_BATCH_ADDED, KEY_ADDED_LAST or KEY_ADDED + */ + @Override + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException { - int hash = getHashBuild(incomingRowIdx); - int i = getBucketIndex(hash, numBuckets()); - int startIdx = startIndices.getAccessor().get(i); + int bucketIndex = getBucketIndex(hashCode, numBuckets()); + int startIdx = startIndices.getAccessor().get(bucketIndex); int currentIdx; - int currentIdxWithinBatch; - BatchHolder bh; BatchHolder lastEntryBatch = null; int lastEntryIdxWithinBatch = EMPTY_SLOT; + // if startIdx is non-empty, follow the hash chain links until we find a matching + // key or reach the end of the chain (and remember the last link there) + for ( currentIdxHolder.value = startIdx; + currentIdxHolder.value != EMPTY_SLOT; + /* isKeyMatch() below also advances the currentIdxHolder to the next link */) { - if (startIdx == EMPTY_SLOT) { - // this is the first entry in this bucket; find the first available slot in the - // container of keys and values - currentIdx = freeIndex++; - addBatchIfNeeded(currentIdx); + // remember the current link, which would be the last when the next link is empty + lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK); + lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; - if (EXTRA_DEBUG) { - logger.debug("Empty bucket index = {}.
incomingRowIdx = {}; inserting new entry at currentIdx = {}.", i, - incomingRowIdx, currentIdx); + if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) { + htIdxHolder.value = currentIdxHolder.value; + return PutStatus.KEY_PRESENT; } - - insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch); - // update the start index array - startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx); - htIdxHolder.value = currentIdx; - return PutStatus.KEY_ADDED; } - currentIdx = startIdx; - boolean found = false; - - bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK); - currentIdxHolder.value = currentIdx; - - // if startIdx is non-empty, follow the hash chain links until we find a matching - // key or reach the end of the chain - while (true) { - currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; + // no match was found, so insert a new entry + currentIdx = freeIndex++; + boolean addedBatch = addBatchIfNeeded(currentIdx); - if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) { - htIdxHolder.value = currentIdxHolder.value; - found = true; - break; - } else if (currentIdxHolder.value == EMPTY_SLOT) { - lastEntryBatch = bh; - lastEntryIdxWithinBatch = currentIdxWithinBatch; - break; - } else { - bh = batchHolders.get((currentIdxHolder.value >>> 16) & HashTable.BATCH_MASK); - lastEntryBatch = bh; - } + if (EXTRA_DEBUG) { + logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx); } - if (!found) { - // no match was found, so insert a new entry - currentIdx = freeIndex++; - addBatchIfNeeded(currentIdx); + insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, lastEntryIdxWithinBatch); - if (EXTRA_DEBUG) { - logger.debug("No match was found for incomingRowIdx = {}; inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx); - } - - insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, 
lastEntryIdxWithinBatch); - htIdxHolder.value = currentIdx; - return PutStatus.KEY_ADDED; + // if there was no hash chain at this bucket, need to update the start index array + if (startIdx == EMPTY_SLOT) { + startIndices.getMutator().setSafe(getBucketIndex(hashCode, numBuckets()), currentIdx); } - - return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED; + htIdxHolder.value = currentIdx; + return addedBatch ? PutStatus.NEW_BATCH_ADDED : + ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ? + PutStatus.KEY_ADDED_LAST : // the last key in the batch + PutStatus.KEY_ADDED; // otherwise } - private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) { - - addBatchIfNeeded(currentIdx); + private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, BatchHolder lastEntryBatch, int lastEntryIdx) throws SchemaChangeException { BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK); @@ -640,60 +651,39 @@ private void insertEntry(int incomingRowIdx, int currentIdx, int hashValue, Batc // Return -1 if key is not found in the hash table. Otherwise, return the global index of the key @Override - public int containsKey(int incomingRowIdx, boolean isProbe) { + public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException { int hash = isProbe ? 
getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx); - int i = getBucketIndex(hash, numBuckets()); - - int currentIdx = startIndices.getAccessor().get(i); - - if (currentIdx == EMPTY_SLOT) { - return -1; - } - - BatchHolder bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK); - currentIdxHolder.value = currentIdx; + int bucketIndex = getBucketIndex(hash, numBuckets()); - boolean found = false; - - while (true) { + for ( currentIdxHolder.value = startIndices.getAccessor().get(bucketIndex); + currentIdxHolder.value != EMPTY_SLOT; ) { + BatchHolder bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK); if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, isProbe)) { - found = true; - break; - } else if (currentIdxHolder.value == EMPTY_SLOT) { - break; - } else { - bh = batchHolders.get((currentIdxHolder.value >>> 16) & BATCH_MASK); + return currentIdxHolder.value; } } - - return found ? currentIdxHolder.value : -1; + return -1; } // Add a new BatchHolder to the list of batch holders if needed. This is based on the supplied // currentIdx; since each BatchHolder can hold up to BATCH_SIZE entries, if the currentIdx exceeds - // the capacity, we will add a new BatchHolder. - private BatchHolder addBatchIfNeeded(int currentIdx) { + // the capacity, we will add a new BatchHolder. Return true if a new batch was added. + private boolean addBatchIfNeeded(int currentIdx) throws SchemaChangeException { int totalBatchSize = batchHolders.size() * BATCH_SIZE; if (currentIdx >= totalBatchSize) { - BatchHolder bh = addBatchHolder(); + BatchHolder bh = newBatchHolder(batchHolders.size()); + batchHolders.add(bh); + bh.setup(); if (EXTRA_DEBUG) { logger.debug("HashTable: Added new batch. 
Num batches = {}.", batchHolders.size()); } - return bh; - } else { - return batchHolders.get(batchHolders.size() - 1); + return true; } + return false; } - private BatchHolder addBatchHolder() { - BatchHolder bh = newBatchHolder(batchHolders.size()); - batchHolders.add(bh); - bh.setup(); - return bh; - } - - protected BatchHolder newBatchHolder(int index) { + protected BatchHolder newBatchHolder(int index) { // special method to allow debugging of gen code return new BatchHolder(index); } @@ -755,6 +745,34 @@ private void resizeAndRehashIfNeeded() { numResizing++; } + /** + * Reinitialize the hash table to its original size, and clear all of its prior batch holders + * + */ + public void reset() { + // long before = allocator.getAllocatedMemory(); + this.clear(); // Clear all current batch holders and hash table (i.e. free their memory) + // long after = allocator.getAllocatedMemory(); + + // logger.debug("Reinit Hash Table: Memory before {} After {} Percent after: {}",before,after, (100 * after ) / before); + + freeIndex = 0; // all batch holders are gone + // reallocate batch holders, and the hash table to the original size + batchHolders = new ArrayList(); + startIndices = allocMetadataVector(originalTableSize, EMPTY_SLOT); + } + public void reinit(RecordBatch newIncoming) { + incomingBuild = newIncoming; + reset(); + try { + updateBatches(); // Needed ? (to update the new incoming?)
+ } catch (SchemaChangeException e) { + throw new IllegalStateException("Unexpected schema change", e); + } catch(IndexOutOfBoundsException ioob) { + throw new IllegalStateException("reinit update batches", ioob); + } + } + @Override public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords) { assert batchIdx < batchHolders.size(); @@ -775,17 +793,20 @@ private IntVector allocMetadataVector(int size, int initialValue) { } @Override + public void setMaxVarcharSize(int size) { maxVarcharSize = size; } + +/* @Override public void addNewKeyBatch() { int numberOfBatches = batchHolders.size(); this.addBatchHolder(); freeIndex = numberOfBatches * BATCH_SIZE; } - +*/ // These methods will be code-generated in the context of the outer class - protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe); + protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe) throws SchemaChangeException; - protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx); + protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException; - protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx); + protected abstract int getHashProbe(@Named("incomingRowIdx") int incomingRowIdx) throws SchemaChangeException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index e2c016b6571..4af1664562b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -315,7 +315,7 @@ public void setupHashTable() throws IOException, SchemaChangeException, 
ClassTra // Create the chained hash table final ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null); - hashTable = ht.createAndSetupHashTable(null); + hashTable = ht.createAndSetupHashTable(null, 1); } public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException { @@ -374,7 +374,8 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio // For every record in the build batch , hash the key columns for (int i = 0; i < currentRecordCount; i++) { - hashTable.put(i, htIndex, 1 /* retry count */); + int hashCode = hashTable.getHashCode(i); + hashTable.put(i, htIndex, hashCode); /* Use the global index returned by the hash table, to store * the current record index and batch index. This will be used diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java index 4cb2bae8a2a..a1b81695d41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/RecordBatchSizer.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; +import org.apache.drill.exec.vector.VariableWidthVector; /** * Given a record batch or vector container, determines the actual memory @@ -68,14 +69,14 @@ public static class ColumnSize { public int capacity; public int density; public int dataSize; + public boolean variableWidth; - public ColumnSize(ValueVector v) { - metadata = v.getField(); + public ColumnSize(ValueVector vv) { + metadata = vv.getField(); stdSize = TypeHelper.getSize(metadata.getType()); // Can't get size estimates if this is an empty batch. 
- - int rowCount = v.getAccessor().getValueCount(); + int rowCount = vv.getAccessor().getValueCount(); if (rowCount == 0) { estSize = stdSize; return; @@ -84,17 +85,17 @@ public ColumnSize(ValueVector v) { // Total size taken by all vectors (and underlying buffers) // associated with this vector. - totalSize = v.getAllocatedByteCount(); + totalSize = vv.getAllocatedByteCount(); // Capacity is the number of values that the vector could // contain. This is useful only for fixed-length vectors. - capacity = v.getValueCapacity(); + capacity = vv.getValueCapacity(); // The amount of memory consumed by the payload: the actual // data stored in the vectors. - dataSize = v.getPayloadByteCount(); + dataSize = vv.getPayloadByteCount(); // Determine "density" the number of rows compared to potential // capacity. Low-density batches occur at block boundaries, ends @@ -105,6 +106,7 @@ public ColumnSize(ValueVector v) { density = roundUp(dataSize * 100, totalSize); estSize = roundUp(dataSize, rowCount); + variableWidth = vv instanceof VariableWidthVector ; } @Override @@ -155,6 +157,7 @@ public String toString() { * vectors are partially full; prevents overestimating row width. 
*/ private int netRowWidth; + private int netRowWidthCap50; private boolean hasSv2; private int sv2Size; private int avgDensity; @@ -167,6 +170,18 @@ public RecordBatchSizer(RecordBatch batch) { batch.getSelectionVector2() : null); } + /** + * Maximum width of a column; used for memory estimation in case of Varchars + */ + public int maxSize; + /** + * Count the nullable columns; used for memory estimation + */ + public int numNullables; + /** + * Sizes the given vectors; equivalent to passing a null selection vector. + * @param va the vectors to measure + */ public RecordBatchSizer(VectorAccessible va) { this(va, null); } @@ -174,7 +189,9 @@ public RecordBatchSizer(VectorAccessible va) { public RecordBatchSizer(VectorAccessible va, SelectionVector2 sv2) { rowCount = va.getRecordCount(); for (VectorWrapper vw : va) { - measureColumn(vw); + int size = measureColumn(vw.getValueVector()); + if ( size > maxSize ) { maxSize = size; } + if ( vw.getField().isNullable() ) { numNullables++; } } if (rowCount > 0) { @@ -208,32 +225,45 @@ public void applySv2() { totalBatchSize += sv2Size; } - private void measureColumn(VectorWrapper vw) { - measureColumn(vw.getValueVector()); + /** + * Round up (if needed) to the next power of 2; the result is at least 2 and at most 64 + * @param arg Number to round up (any value above 32 yields 64) + * @return power of 2 result, between 2 and 64 + */ + private int roundUpToPowerOf2(int arg) { + if ( arg <= 2 ) { return 2; } + if ( arg <= 4 ) { return 4; } + if ( arg <= 8 ) { return 8; } + if ( arg <= 16 ) { return 16; } + if ( arg <= 32 ) { return 32; } + return 64; } - private void measureColumn(ValueVector v) { - + private int measureColumn(ValueVector vv) { // Maps consume no size themselves. However, their contained // vectors do consume space, so visit columns recursively.
- - if (v.getField().getType().getMinorType() == MinorType.MAP) { - expandMap((AbstractMapVector) v); - return; + if (vv.getField().getType().getMinorType() == MinorType.MAP) { + return expandMap((AbstractMapVector) vv); } - ColumnSize colSize = new ColumnSize(v); + + ColumnSize colSize = new ColumnSize(vv); columnSizes.add(colSize); stdRowWidth += colSize.stdSize; totalBatchSize += colSize.totalSize; netBatchSize += colSize.dataSize; netRowWidth += colSize.estSize; + netRowWidthCap50 += ! colSize.variableWidth ? colSize.estSize : + 8 /* offset vector */ + roundUpToPowerOf2( Math.min(colSize.estSize,50) ); + // above change 8 to 4 after DRILL-5446 is fixed + return colSize.estSize; } - private void expandMap(AbstractMapVector mapVector) { + private int expandMap(AbstractMapVector mapVector) { + int accum = 0; for (ValueVector vector : mapVector) { - measureColumn(vector); + accum += measureColumn(vector); } + return accum; } public static int roundUp(int num, int denom) { @@ -247,10 +277,18 @@ public static int roundUp(int num, int denom) { public int stdRowWidth() { return stdRowWidth; } public int grossRowWidth() { return grossRowWidth; } public int netRowWidth() { return netRowWidth; } + /** + * Compute the "real" width of the row, taking into account each varchar column size + * (historically capped at 50, and rounded up to power of 2 to match drill buf allocation) + * and null marking columns. 
+ * @return "real" width of the row + */ + public int netRowWidthCap50() { return netRowWidthCap50 + numNullables; } public int actualSize() { return totalBatchSize; } public boolean hasSv2() { return hasSv2; } public int avgDensity() { return avgDensity; } public int netSize() { return netBatchSize; } + public int maxSize() { return maxSize; } public static final int MAX_VECTOR_SIZE = 16 * 1024 * 1024; // 16 MiB diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java index 74e1fb5674e..87eebc668d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java @@ -30,11 +30,13 @@ import java.util.Set; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -84,7 +86,7 @@ private interface FileManager { * Given a manager-specific input stream, return the current read position. * Used to report total read bytes. 
* - * @param outputStream input stream created by the file manager + * @param inputStream input stream created by the file manager * @return */ long getReadBytes(InputStream inputStream); @@ -346,7 +348,6 @@ public long getReadBytes(InputStream inputStream) { */ private final String spillDirName; - private final String spillFileName; private int fileCount = 0; @@ -356,16 +357,34 @@ public long getReadBytes(InputStream inputStream) { private long writeBytes; - public SpillSet(FragmentContext context, PhysicalOperator popConfig) { - this(context, popConfig, null, "spill"); - } - - public SpillSet(FragmentContext context, PhysicalOperator popConfig, - String opName, String fileName) { + public SpillSet(FragmentContext context, PhysicalOperator popConfig, UserBitShared.CoreOperatorType optype) { FragmentHandle handle = context.getHandle(); + String operName = "Unknown"; + + // Set the spill options from the configuration DrillConfig config = context.getConfig(); - spillFileName = fileName; - List dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS); + String spillFs; + List dirList; + + // Set the operator name (used as part of the spill file name), + // and set oper. 
specific options (the config file defaults to using the + // common options; users may override those - per operator) + switch (optype) { + case EXTERNAL_SORT: + operName = "Sort"; + spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM); + dirList = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS); + break; + case HASH_AGGREGATE: + operName = "HashAgg"; + spillFs = config.getString(ExecConstants.HASHAGG_SPILL_FILESYSTEM); + dirList = config.getStringList(ExecConstants.HASHAGG_SPILL_DIRS); + break; + default: // just use the common ones + spillFs = config.getString(ExecConstants.SPILL_FILESYSTEM); + dirList = config.getStringList(ExecConstants.SPILL_DIRS); + } + dirs = Iterators.cycle(dirList); // If more than one directory, semi-randomly choose an offset into @@ -386,23 +405,18 @@ public SpillSet(FragmentContext context, PhysicalOperator popConfig, // system is selected and impersonation is off. (We use that // as a proxy for a non-production Drill setup.) - String spillFs = config.getString(ExecConstants.EXTERNAL_SORT_SPILL_FILESYSTEM); boolean impersonationEnabled = config.getBoolean(ExecConstants.IMPERSONATION_ENABLED); if (spillFs.startsWith("file:///") && ! impersonationEnabled) { fileManager = new LocalFileManager(spillFs); } else { fileManager = new HadoopFileManager(spillFs); } - spillDirName = String.format( - "%s_major%d_minor%d_op%d%s", - QueryIdHelper.getQueryId(handle.getQueryId()), - handle.getMajorFragmentId(), - handle.getMinorFragmentId(), - popConfig.getOperatorId(), - (opName == null) ? "" : "_" + opName); + + spillDirName = String.format("%s_%s_%s-%s_minor%s", QueryIdHelper.getQueryId(handle.getQueryId()), + operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId()); } - public String getNextSpillFile() { + public String getNextSpillFile(String extraName) { // Identify the next directory from the round-robin list to // the file created from this round of spilling. 
The directory must @@ -411,7 +425,12 @@ public String getNextSpillFile() { String spillDir = dirs.next(); String currSpillPath = Joiner.on("/").join(spillDir, spillDirName); currSpillDirs.add(currSpillPath); - String outputFile = Joiner.on("/").join(currSpillPath, spillFileName + ++fileCount); + + String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++fileCount); + if ( extraName != null ) { + outputFile += "_" + extraName; + } + try { fileManager.deleteOnExit(currSpillPath); } catch (IOException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 69e9b4ce9a3..4d5f2901520 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -39,6 +39,8 @@ import org.apache.drill.exec.physical.impl.xsort.SingleBatchSorter; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.InputBatch; import org.apache.drill.exec.physical.impl.xsort.managed.BatchGroup.SpilledRun; + +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -399,7 +401,7 @@ public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, Record allocator = oContext.getAllocator(); opCodeGen = new OperatorCodeGenerator(context, popConfig); - spillSet = new SpillSet(context, popConfig, "sort", "run"); + spillSet = new SpillSet(context, popConfig, UserBitShared.CoreOperatorType.EXTERNAL_SORT); copierHolder = new CopierHolder(context, allocator, opCodeGen); configure(context.getConfig()); } @@ -1390,7 +1392,7 @@ private BatchGroup.SpilledRun doMergeAndSpill(LinkedList b // 
spill file. After each write, we release the memory associated // with the just-written batch. - String outputFile = spillSet.getNextSpillFile(); + String outputFile = spillSet.getNextSpillFile(null); stats.setLongStat(Metric.SPILL_COUNT, spillSet.getFileCount()); BatchGroup.SpilledRun newGroup = null; try (AutoCloseable ignored = AutoCloseables.all(batchesToSpill); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java index 732ff15f5ef..8c69930e94c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java @@ -51,7 +51,7 @@ public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel { - protected static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}; + public static enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}; protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1 ; // default phase protected List keys = Lists.newArrayList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java index b911f6bee82..460ee8a4716 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPruleBase.java @@ -61,6 +61,9 @@ protected List getDistributionField(DrillAggregateRel rel, bo // currently won't generate a 2 phase plan. 
protected boolean create2PhasePlan(RelOptRuleCall call, DrillAggregateRel aggregate) { PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner()); + if ( settings.isForce2phaseAggr() ) { // for testing - force 2 phase aggr + return true; + } RelNode child = call.rel(0).getInputs().get(0); boolean smallInput = child.getRows() < settings.getSliceTarget(); if (! settings.isMultiPhaseAggEnabled() || settings.isSingleMode() || smallInput) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java index c382af6c216..09d33fd8a6b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrel.java @@ -73,7 +73,7 @@ public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) { public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { Prel child = (Prel) this.getInput(); - HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), keys, aggExprs, 1.0f); + HashAggregate g = new HashAggregate(child.getPhysicalOperator(creator), operPhase, keys, aggExprs, 1.0f); return creator.addMetadata(this, g); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 648adb7aa35..15314bacbca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -133,6 +133,9 @@ public class PlannerSettings implements Context{ the need to turn off join optimization may go away. 
*/ public static final BooleanValidator JOIN_OPTIMIZATION = new BooleanValidator("planner.enable_join_optimization", true); + // for testing purpose + public static final String FORCE_2PHASE_AGGR_KEY = "planner.force_2phase_aggr"; + public static final BooleanValidator FORCE_2PHASE_AGGR = new BooleanValidator(FORCE_2PHASE_AGGR_KEY, false); public OptionManager options = null; public FunctionImplementationRegistry functionImplementationRegistry = null; @@ -274,6 +277,8 @@ public boolean isTypeInferenceEnabled() { return options.getOption(TYPE_INFERENCE); } + public boolean isForce2phaseAggr() { return options.getOption(FORCE_2PHASE_AGGR);} // for testing + public long getInSubqueryThreshold() { return options.getOption(IN_SUBQUERY_THRESHOLD); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 04cf8fca308..0daa6b3c979 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -34,7 +34,7 @@ *

* A key thing to know is that the Iterator provided by a record batch must * align with the rank positions of the field IDs provided using - * {@link getValueVectorId}. + * {@link #getValueVectorId}. *

*/ public interface RecordBatch extends VectorAccessible { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 8492f3664b0..c2a4d651d96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -93,6 +93,10 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD, PlannerSettings.QUOTING_IDENTIFIERS, PlannerSettings.JOIN_OPTIMIZATION, + PlannerSettings.FORCE_2PHASE_AGGR, // for testing + ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR, + ExecConstants.HASHAGG_MAX_MEMORY_VALIDATOR, + ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR, // for tuning ExecConstants.CAST_TO_NULLABLE_NUMERIC_OPTION, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java index d06424e9c7f..79b49e46bab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java @@ -26,7 +26,6 @@ import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.server.options.OptionManager; public class MemoryAllocationUtilities { @@ -40,7 +39,7 @@ public class MemoryAllocationUtilities { * @param plan * @param queryContext */ - public static void setupSortMemoryAllocations(final PhysicalPlan 
plan, final QueryContext queryContext) { + public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) { // Test plans may already have a pre-defined memory plan. // Otherwise, determine memory allocation. @@ -49,30 +48,30 @@ public static void setupSortMemoryAllocations(final PhysicalPlan plan, final Que return; } // look for external sorts - final List sortList = new LinkedList<>(); + final List bufferedOpList = new LinkedList<>(); for (final PhysicalOperator op : plan.getSortedOperators()) { - if (op instanceof ExternalSort) { - sortList.add((ExternalSort) op); + if ( op.isBufferedOperator() ) { + bufferedOpList.add(op); } } // if there are any sorts, compute the maximum allocation, and set it on them - if (sortList.size() > 0) { + if (bufferedOpList.size() > 0) { final OptionManager optionManager = queryContext.getOptions(); final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val; long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC)); maxAllocPerNode = Math.min(maxAllocPerNode, optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); - final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode); - logger.debug("Max sort alloc: {}", maxSortAlloc); + final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode); + logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc); - for(final ExternalSort externalSort : sortList) { + for(final PhysicalOperator op : bufferedOpList) { // Ensure that the sort receives the minimum memory needed to make progress. // Without this, the math might work out to allocate too little memory. 
- long alloc = Math.max(maxSortAlloc, externalSort.getInitialAllocation()); - externalSort.setMaxAllocation(alloc); + long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation()); + op.setMaxAllocation(alloc); } } plan.getProperties().hasResourcePlan = true; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 5e5fef09243..62c230778f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -433,7 +433,7 @@ private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupExc private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException { validatePlan(plan); - MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext); + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); //Marking endTime of Planning queryManager.markPlanningEndTime(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java index 7ffb22441a8..2f945d85f16 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java @@ -97,7 +97,7 @@ private List getFragments(final DrillbitContext dContext, final Ge throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType"); } - MemoryAllocationUtilities.setupSortMemoryAllocations(plan, queryContext); + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf 
index c2a2bf0a554..8aedaf64684 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -207,6 +207,33 @@ drill.exec: { // java ... -ea -Ddrill.exec.debug.validate_vectors=true ... validate_vectors: false }, + spill: { + // *** Options common to all the operators that may spill + // File system to use. Local file system by default. + fs: "file:///", + // List of directories to use. Directories are created + // if they do not exist. + directories: [ "/tmp/drill/spill" ] + }, + hashagg: { + // An internal tuning; should not be changed + min_batches_per_partition: 3, + // An option for testing - force a memory limit + mem_limit: 0, + // The max number of partitions in each hashagg operator + // This number is tuned down when memory is limited + // Setting it to 1 means: No spilling + num_partitions: 32, + spill: { + // -- The 2 options below can be used to override the common ones + // -- (common to all spilling operators) + // File system to use. Local file system by default. + fs: ${drill.exec.spill.fs}, + // List of directories to use. Directories are created + // if they do not exist. + directories: ${drill.exec.spill.directories}, + } + }, sort: { purge.threshold : 1000, external: { @@ -232,11 +259,15 @@ drill.exec: { group.size: 40000, // Deprecated for managed xsort; used only by legacy xsort threshold: 40000, + // -- The two options below can be used to override the options common + // -- for all spilling operators (see "spill" above). + // -- This is done for backward compatibility; in the future they + // -- would be deprecated (you should be using only the common ones) // File system to use. Local file system by default. - fs: "file:///" + fs: ${drill.exec.spill.fs}, // List of directories to use. Directories are created // if they do not exist. 
- directories: [ "/tmp/drill/spill" ], + directories: ${drill.exec.spill.directories}, // Size of the batches written to, and read from, the spill files. // Determines the ratio of memory to input data size for a single- // generation sort. Smaller values give larger ratios, but at a diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java index 27df7100ebe..1a4d63b8046 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestBugFixes.java @@ -211,12 +211,13 @@ public void testDRILL4884() throws Exception { int limit = 65536; ImmutableList.Builder> baselineBuilder = ImmutableList.builder(); for (int i = 0; i < limit; i++) { - baselineBuilder.add(Collections.singletonMap("`id`", String.valueOf(i + 1))); + baselineBuilder.add(Collections.singletonMap("`id`", /*String.valueOf */ (i + 1))); } List> baseline = baselineBuilder.build(); testBuilder() - .sqlQuery(String.format("select id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id limit %s", TEST_RES_PATH, limit)) + .sqlQuery(String.format("select cast(id as int) as id from dfs_test.`%s/bugs/DRILL-4884/limit_test_parquet/test0_0_0.parquet` group by id order by 1 limit %s", + TEST_RES_PATH, limit)) .unOrdered() .baselineRecords(baseline) .go(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java index 66b75710e06..f15e75790cf 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java @@ -46,7 +46,7 @@ * any particular order of execution. We ignore the results. 
*/ public class TestTpchDistributedConcurrent extends BaseTestQuery { - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual. + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(180000); // Longer timeout than usual. /* * Valid test names taken from TestTpchDistributed. Fuller path prefixes are diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java new file mode 100644 index 00000000000..fe6fcbcf3c5 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/agg/TestHashAggrSpill.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.physical.impl.agg; + +import ch.qos.logback.classic.Level; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.physical.impl.aggregate.HashAggTemplate; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.test.ClientFixture; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.FixtureBuilder; +import org.apache.drill.test.LogFixture; +import org.apache.drill.test.ProfileParser; +import org.apache.drill.test.QueryBuilder; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * Test spilling for the Hash Aggr operator (using the mock reader) + */ +public class TestHashAggrSpill extends BaseTestQuery { + + private void runAndDump(ClientFixture client, String sql, long expectedRows, long spillCycle, long spilledPartitions) throws Exception { + String plan = client.queryBuilder().sql(sql).explainJson(); + + QueryBuilder.QuerySummary summary = client.queryBuilder().sql(sql).run(); + if ( expectedRows > 0 ) { + assertEquals(expectedRows, summary.recordCount()); + } + // System.out.println(String.format("======== \n Results: %,d records, %d batches, %,d ms\n ========", summary.recordCount(), summary.batchCount(), summary.runTimeMs() ) ); + + //System.out.println("Query ID: " + summary.queryIdString()); + ProfileParser profile = client.parseProfile(summary.queryIdString()); + //profile.print(); + List ops = profile.getOpsOfType(UserBitShared.CoreOperatorType.HASH_AGGREGATE_VALUE); + + assertTrue( ! 
ops.isEmpty() ); + // check for the first op only + ProfileParser.OperatorProfile hag0 = ops.get(0); + long opCycle = hag0.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal()); + assertEquals(spillCycle, opCycle); + long op_spilled_partitions = hag0.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal()); + assertEquals(spilledPartitions, op_spilled_partitions); + /* assertEquals(3, ops.size()); + for ( int i = 0; i < ops.size(); i++ ) { + ProfileParser.OperatorProfile hag = ops.get(i); + long cycle = hag.getMetric(HashAggTemplate.Metric.SPILL_CYCLE.ordinal()); + long num_partitions = hag.getMetric(HashAggTemplate.Metric.NUM_PARTITIONS.ordinal()); + long spilled_partitions = hag.getMetric(HashAggTemplate.Metric.SPILLED_PARTITIONS.ordinal()); + long mb_spilled = hag.getMetric(HashAggTemplate.Metric.SPILL_MB.ordinal()); + System.out.println(String.format("(%d) Spill cycle: %d, num partitions: %d, spilled partitions: %d, MB spilled: %d", i,cycle, num_partitions, spilled_partitions, + mb_spilled)); + } */ + } + + /** + * Test "normal" spilling: Only 2 partitions (out of 4) would require spilling + * ("normal spill" means spill-cycle = 1 ) + * + * @throws Exception + */ + @Test + public void testHashAggrSpill() throws Exception { + LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder() + .toConsole() + .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG) + ; + + FixtureBuilder builder = ClusterFixture.builder() + .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,74_000_000) + .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16) + .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3) + .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true) + // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true) + .maxParallelization(2) + .saveProfiles() + //.keepLocalFiles() + ; + try (LogFixture logs = logBuilder.build(); + ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + 
String sql = "SELECT empid_s17, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1200K` GROUP BY empid_s17, dept_i, branch_i"; + runAndDump(client, sql, 1_200_000, 1, 1); + } + } + + /** + * Test "secondary" and "tertiary" spilling -- Some of the spilled partitions cause more spilling as they are read back + * (Hence spill-cycle = 3 ) + * + * @throws Exception + */ + @Test + public void testHashAggrSecondaryTertiarySpill() throws Exception { + LogFixture.LogFixtureBuilder logBuilder = LogFixture.builder() + .toConsole() + .logger("org.apache.drill.exec.physical.impl.aggregate", Level.DEBUG) + .logger("org.apache.drill.exec.cache", Level.INFO) + ; + + FixtureBuilder builder = ClusterFixture.builder() + .configProperty(ExecConstants.HASHAGG_MAX_MEMORY,58_000_000) + .configProperty(ExecConstants.HASHAGG_NUM_PARTITIONS,16) + .configProperty(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION,3) + .sessionOption(PlannerSettings.FORCE_2PHASE_AGGR_KEY,true) + .sessionOption(PlannerSettings.STREAMAGG.getOptionName(),false) + // .sessionOption(PlannerSettings.EXCHANGE.getOptionName(), true) + .maxParallelization(1) + .saveProfiles() + //.keepLocalFiles() + ; + try (LogFixture logs = logBuilder.build(); + ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + String sql = "SELECT empid_s44, dept_i, branch_i, AVG(salary_i) FROM `mock`.`employee_1100K` GROUP BY empid_s44, dept_i, branch_i"; + runAndDump(client, sql, 1_100_000, 3, 2); + } + } +} \ No newline at end of file diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java index e39a6443ed6..66588b19e3f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/BasicPhysicalOpUnitTest.java @@ -34,6 +34,7 @@ import
org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.TopN; +import org.apache.drill.exec.planner.physical.AggPrelBase; import org.junit.Ignore; import org.junit.Test; @@ -125,7 +126,7 @@ public void testSimpleMergeJoin() { @Test public void testSimpleHashAgg() { - HashAggregate aggConf = new HashAggregate(null, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f); + HashAggregate aggConf = new HashAggregate(null, AggPrelBase.OperatorPhase.PHASE_1of1, parseExprs("a", "a"), parseExprs("sum(b)", "b_sum"), 1.0f); List inputJsonBatches = Lists.newArrayList( "[{\"a\": 5, \"b\" : 1 }]", "[{\"a\": 5, \"b\" : 5},{\"a\": 3, \"b\" : 8}]"); diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml index cb0c517e8ca..eecbdfaa07c 100644 --- a/exec/jdbc/pom.xml +++ b/exec/jdbc/pom.xml @@ -119,6 +119,7 @@ **/.checkstyle **/.buildpath **/*.json + **/*.iml **/git.properties **/donuts-output-data.txt **/*.tbl diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index aa713f87c62..deed7a71f22 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -192,8 +192,8 @@ public void send(Response r) { r.pBody, r.dBodies); if (RpcConstants.EXTRA_DEBUGGING) { logger.debug("Adding message to outbound buffer. 
{}", outMessage); + logger.debug("Sending response with Sender {}", System.identityHashCode(this)); } - logger.debug("Sending response with Sender {}", System.identityHashCode(this)); connection.getChannel().writeAndFlush(outMessage); } diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java index 105ea471e09..581a9f819aa 100644 --- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java @@ -377,6 +377,7 @@ public void reAlloc() { throw new OversizedAllocationException("Unable to expand the buffer. Max allowed buffer size is reached."); } + logger.trace("Reallocating VarChar, new size {}",newAllocationSize); final DrillBuf newBuf = allocator.buffer((int)newAllocationSize); newBuf.setBytes(0, data, 0, data.capacity()); data.release();