Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions distribution/src/resources/drill-override-example.conf
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ drill.exec: {
}
},
cache.hazel.subnets: ["*.*.*.*"],
spill: {
# These options are common to all spilling operators.
# They can be overridden per operator (but this is only for
# backward compatibility, and may be deprecated in the future).
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
sort: {
purge.threshold : 100,
external: {
Expand All @@ -150,11 +157,26 @@ drill.exec: {
batch.size : 4000,
group.size : 100,
threshold : 200,
# The two options below override the common spill options;
# they should be deprecated in the future.
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
}
},
hashagg: {
# The partitions divide the work inside the hashagg, to ease
# the handling of spilling. This initial figure is tuned down when
# memory is limited.
# Setting this option to 1 disables spilling!
num_partitions: 32,
spill: {
# The two options below override the common spill options;
# they should be deprecated in the future.
directories : [ "/tmp/drill/spill" ],
fs : "file:///"
}
},
memory: {
top.max: 1000000000000,
operator: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public interface ExecConstants {
String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size";
String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold";

// Boot-time spill options common to all spilling operators.
// (Each individual operator may override these common options.)

// Config key naming the file system used for spilling.
String SPILL_FILESYSTEM = "drill.exec.spill.fs";
// Config key naming the list of spill directories.
String SPILL_DIRS = "drill.exec.spill.directories";

// External Sort Boot configuration

String EXTERNAL_SORT_TARGET_SPILL_BATCH_SIZE = "drill.exec.sort.external.spill.batch.size";
Expand All @@ -86,6 +92,22 @@ public interface ExecConstants {

BooleanValidator EXTERNAL_SORT_DISABLE_MANAGED_OPTION = new BooleanValidator("exec.sort.disable_managed", false);

// Hash Aggregate Options
//
// Naming convention used below: the plain constant is the boot-time config key
// ("drill.exec.hashagg.*"), while the *_KEY constant is the option name handed
// to the matching validator ("exec.hashagg.*").

// Number of partitions used inside the hash aggregate.
String HASHAGG_NUM_PARTITIONS = "drill.exec.hashagg.num_partitions";
String HASHAGG_NUM_PARTITIONS_KEY = "exec.hashagg.num_partitions";
LongValidator HASHAGG_NUM_PARTITIONS_VALIDATOR = new RangeLongValidator(HASHAGG_NUM_PARTITIONS_KEY, 1, 128, 32); // 1 means - no spilling

// Memory limit for the hash aggregate.
String HASHAGG_MAX_MEMORY = "drill.exec.hashagg.mem_limit";
String HASHAGG_MAX_MEMORY_KEY = "exec.hashagg.mem_limit";
LongValidator HASHAGG_MAX_MEMORY_VALIDATOR = new RangeLongValidator(HASHAGG_MAX_MEMORY_KEY, 0, Integer.MAX_VALUE, 0);

// Min batches per partition is used for tuning: each partition is assumed to need this many
// batches when planning the number of partitions, and this many batches are reserved when
// deciding whether the remaining available memory is too small and a spill is required.
// A low value may OOM (e.g., when incoming rows become wider); higher values use fewer
// partitions but are safer.
String HASHAGG_MIN_BATCHES_PER_PARTITION = "drill.exec.hashagg.min_batches_per_partition";
// Option name follows the same "exec.hashagg.*" form as the other *_KEY constants
// (it previously duplicated the boot-time key above).
String HASHAGG_MIN_BATCHES_PER_PARTITION_KEY = "exec.hashagg.min_batches_per_partition";
LongValidator HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR = new RangeLongValidator(HASHAGG_MIN_BATCHES_PER_PARTITION_KEY, 2, 5, 3);

// Hash-aggregate specific overrides of the common spill directories / file system keys.
String HASHAGG_SPILL_DIRS = "drill.exec.hashagg.spill.directories";
String HASHAGG_SPILL_FILESYSTEM = "drill.exec.hashagg.spill.fs";

String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;

import com.codahale.metrics.MetricRegistry;
Expand Down Expand Up @@ -138,6 +140,60 @@ private void readVectors(InputStream input, RecordBatchDef batchDef) throws IOEx
va = container;
}

// Like above, only preserve the original container and list of value-vectors
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class was refactored in the external sort unit test PR. Let's coordinate on merging the two sets of changes.

/**
 * Reads one serialized record batch from {@code input} and moves its vectors into
 * {@code myContainer}, so that the caller's container object is preserved.
 *
 * <p>The batch definition is parsed from the stream first. When it carries a two-byte
 * selection vector, that vector is read into {@code sv2} (allocated on first use) and
 * the selection-vector mode is switched to {@code TWO_BYTE}. Each serialized field is
 * then loaded into a freshly created value vector; the vectors are gathered in a
 * temporary container and transferred into {@code myContainer}, whose schema and
 * record count are rebuilt afterwards.
 *
 * <p>If reading or loading fails part-way, every vector created so far is cleared
 * before the exception propagates, so their buffers are not leaked.
 *
 * @param myContainer the container that receives the vectors read from the stream
 * @param input the stream positioned at the start of a serialized batch
 * @throws IOException if reading from {@code input} fails
 */
public void readFromStreamWithContainer(VectorContainer myContainer, InputStream input) throws IOException {
  final UserBitShared.RecordBatchDef batchDef = UserBitShared.RecordBatchDef.parseDelimitedFrom(input);
  recordCount = batchDef.getRecordCount();
  if (batchDef.hasCarriesTwoByteSelectionVector() && batchDef.getCarriesTwoByteSelectionVector()) {
    if (sv2 == null) {
      sv2 = new SelectionVector2(allocator);
    }
    sv2.allocateNew(recordCount * SelectionVector2.RECORD_SIZE);
    sv2.getBuffer().setBytes(0, input, recordCount * SelectionVector2.RECORD_SIZE);
    svMode = BatchSchema.SelectionVectorMode.TWO_BYTE;
  }

  final List<ValueVector> vectorList = Lists.newArrayList();
  boolean allLoaded = false;
  try {
    for (SerializedField metaData : batchDef.getFieldList()) {
      final int dataLength = metaData.getBufferLength();
      final MaterializedField field = MaterializedField.create(metaData);
      final DrillBuf buf = allocator.buffer(dataLength);
      try {
        buf.writeBytes(input, dataLength);
        final ValueVector vector = TypeHelper.getNewVector(field, allocator);
        // Register the vector before loading it, so a failed load is cleaned up too.
        vectorList.add(vector);
        vector.load(metaData, buf);
      } finally {
        // The staging buffer is always released here, whether or not the load succeeded.
        buf.release();
      }
    }
    allLoaded = true;
  } finally {
    if (!allLoaded) {
      // Nothing else references these vectors yet; release whatever they hold.
      for (ValueVector loaded : vectorList) {
        loaded.clear();
      }
    }
  }

  final VectorContainer container = new VectorContainer();
  container.addCollection(vectorList);
  container.setRecordCount(recordCount);
  myContainer.transferIn(container); // transfer the vectors
  myContainer.buildSchema(svMode);
  myContainer.setRecordCount(recordCount);
  va = myContainer;
}

public void writeToStreamAndRetain(OutputStream output) throws IOException {
retain = true;
writeToStream(output);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.base;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;

Expand Down Expand Up @@ -102,17 +104,31 @@ public void setCost(double cost) {
this.cost = cost;
}

// Not available. Presumably because Drill does not currently use
// this value, though it does appear in some test physical plans.
// public void setMaxAllocation(long alloc) {
// maxAllocation = alloc;
// }

/**
 * Returns the value currently stored in this operator's {@code maxAllocation} field.
 *
 * @return the max memory allocation recorded for this operator
 */
@Override
public long getMaxAllocation() {
return maxAllocation;
}

/**
* Any operator that supports spilling should override this method
* @param maxAllocation The max memory allocation to be set
*/
@Override
public void setMaxAllocation(long maxAllocation) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was a problem with this method, which is why it was commented out. Trying to remember... Something about the internal, temporary serializations in planning causing values to be overwritten. Jackson will see this method and use it for deserializing the value. May have to fiddle around a bit to remember the issue.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what happens when you give a software "intelligence" to make decisions for you ..... shall we pick another method name to fool Jackson ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we just want to fool Jackson we can use @JsonIgnore. Would be better to track down the root cause -- whatever it was.

this.maxAllocation = maxAllocation;
/*throw new DrillRuntimeException("Unsupported method: setMaxAllocation()");*/
}

/**
 * Reports whether this operator is a buffered operator. This base implementation
 * always answers {@code false}; any operator that supports spilling should
 * override this method and return {@code true}.
 *
 * @return {@code false}
 */
@JsonIgnore
@Override
public boolean isBufferedOperator() {
  return false;
}

// @Override
// public void setBufferedOperator(boolean bo) {}

@Override
public String getUserName() {
return userName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,21 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
*/
public long getMaxAllocation();

/**
 * Sets the maximum memory allocation for this operator.
 *
 * @param maxAllocation The max memory allocation to be set
 */
public void setMaxAllocation(long maxAllocation);

/**
 * Tells whether this operator is a buffered operator. The method is marked
 * {@code @JsonIgnore}, so Jackson does not treat it as a serializable property.
 *
 * @return True iff this operator manages its memory (including disk spilling)
 */
@JsonIgnore
public boolean isBufferedOperator();

// public void setBufferedOperator(boolean bo);

/**
 * Returns this operator's id, exposed to Jackson under the JSON property name {@code "@id"}.
 *
 * @return the operator id
 */
@JsonProperty("@id")
public int getOperatorId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,19 @@ public int getOperatorType() {
return CoreOperatorType.EXTERNAL_SORT_VALUE;
}

// Set here, rather than the base class, because this is the only
// operator, at present, that makes use of the maximum allocation.
// Remove this, in favor of the base class version, when Drill
// sets the memory allocation for all operators.

/**
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can just remove the Javadoc so it is inherited from the base method.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But still need a local comment to be clear about this code .... so does not matter much.

*
* @param maxAllocation The max memory allocation to be set
*/
@Override
public void setMaxAllocation(long maxAllocation) {
this.maxAllocation = maxAllocation;
}

/**
 * Marks the External Sort as a buffered operator, since it supports spilling.
 *
 * @return {@code true}
 */
@Override
public boolean isBufferedOperator() {
  return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -34,22 +35,27 @@ public class HashAggregate extends AbstractSingle {

static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregate.class);

// Aggregation phase of this operator (JSON property "phase").
private final AggPrelBase.OperatorPhase aggPhase;
// Group-by key expressions (JSON property "keys").
private final List<NamedExpression> groupByExprs;
// Aggregate expressions (JSON property "exprs").
private final List<NamedExpression> aggrExprs;

// Cardinality value supplied at construction (JSON property "cardinality").
private final float cardinality;

/**
 * Creates a hash aggregate on top of {@code child}. Annotated with
 * {@code @JsonCreator}, so Jackson builds instances through this constructor
 * using the JSON property names given on each parameter.
 *
 * @param child the input operator, passed on to the superclass
 * @param aggPhase the aggregation phase (JSON property "phase")
 * @param groupByExprs the group-by key expressions (JSON property "keys")
 * @param aggrExprs the aggregate expressions (JSON property "exprs")
 * @param cardinality the cardinality value (JSON property "cardinality")
 */
@JsonCreator
public HashAggregate(
    @JsonProperty("child") PhysicalOperator child,
    @JsonProperty("phase") AggPrelBase.OperatorPhase aggPhase,
    @JsonProperty("keys") List<NamedExpression> groupByExprs,
    @JsonProperty("exprs") List<NamedExpression> aggrExprs,
    @JsonProperty("cardinality") float cardinality) {
  super(child);
  this.cardinality = cardinality;
  this.aggrExprs = aggrExprs;
  this.groupByExprs = groupByExprs;
  this.aggPhase = aggPhase;
}

/**
 * Returns the aggregation phase this operator was constructed with.
 *
 * @return the aggregation phase
 */
public AggPrelBase.OperatorPhase getAggPhase() {
  return aggPhase;
}

/**
 * Returns the group-by key expressions this operator was constructed with.
 *
 * @return the group-by expressions
 */
public List<NamedExpression> getGroupByExprs() {
return groupByExprs;
}
Expand All @@ -69,13 +75,28 @@ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVis

/**
 * Builds a copy of this hash aggregate over a different child operator. The copy
 * keeps this operator's aggregation phase, group-by expressions, aggregate
 * expressions and cardinality, and also carries over the current max allocation.
 *
 * @param child the child operator for the new instance
 * @return the new {@code HashAggregate}
 */
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
  final HashAggregate newHAG = new HashAggregate(child, aggPhase, groupByExprs, aggrExprs, cardinality);
  // The max allocation is not a constructor argument, so copy it explicitly.
  newHAG.setMaxAllocation(getMaxAllocation());
  return newHAG;
}

/**
 * Identifies this operator's type.
 *
 * @return {@code CoreOperatorType.HASH_AGGREGATE_VALUE}
 */
@Override
public int getOperatorType() {
return CoreOperatorType.HASH_AGGREGATE_VALUE;
}


/**
 * Sets the maximum memory allocation for this hash aggregate by storing the
 * given value in {@code maxAllocation}.
 *
 * @param maxAllocation The max memory allocation to be set
 */
@Override
public void setMaxAllocation(long maxAllocation) {
this.maxAllocation = maxAllocation;
}
/**
 * Marks the Hash Aggregate as a buffered operator, since it supports spilling.
 *
 * @return {@code true}
 */
@Override
public boolean isBufferedOperator() {
  return true;
}
}
Loading