Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion docs/TPC-H.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,25 @@ Connect to trino-cli:
cd ~/opt/trino-server
./bin/trino --server localhost:8080 --catalog pixels --schema tpch
```
Execute the TPC-H queries in trino-cli.
In trino-cli, select the ordered data layout by setting the two session properties:
```sql
set session pixels.ordered_path_enabled=true
set session pixels.compact_path_enabled=false
```
By default, both paths are enabled. You can also enable the compact path and disable the ordered path when [data compaction](#data-compaction) is done.
After selecting the data layout, execute the TPC-H queries in trino-cli.

## Data Compaction*
This is optional. It is only needed if we want to test the query performance on the compact layout.
In pixels-cli, use the following commands to compact the files in the ordered path of each table:
```bash
COMPACT -s tpch -t customer -n no -c 2
COMPACT -s tpch -t lineitem -n no -c 16
COMPACT -s tpch -t nation -n no -c 1
COMPACT -s tpch -t orders -n no -c 8
COMPACT -s tpch -t part -n no -c 1
COMPACT -s tpch -t partsupp -n no -c 8
COMPACT -s tpch -t region -n no -c 1
COMPACT -s tpch -t supplier -n no -c 1
```
The tables `nation` and `region` are too small, no need to compact them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,22 @@ public AggregationOperator(String name, List<AggregationInput> finalAggrInputs,
requireNonNull(finalAggrInputs, "finalAggrInputs is null");
checkArgument(!finalAggrInputs.isEmpty(), "finalAggrInputs is empty");
this.finalAggrInputs = ImmutableList.copyOf(finalAggrInputs);
for (AggregationInput aggrInput : this.finalAggrInputs)
{
aggrInput.setOperatorName(name);
}

if (scanInputs == null || scanInputs.isEmpty())
{
this.scanInputs = ImmutableList.of();
}
else
{
this.scanInputs = ImmutableList.copyOf(scanInputs);
for (ScanInput scanInput : this.scanInputs)
{
scanInput.setOperatorName(name);
}
}
}

Expand Down Expand Up @@ -122,7 +131,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
int i = 0;
for (AggregationInput preAggrInput : this.finalAggrInputs)
{
preAggrInput.setOperatorName(this.getName());
this.finalAggrOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.AGGREGATION).invoke(preAggrInput);
}
Expand Down Expand Up @@ -157,7 +165,6 @@ public CompletableFuture<Void> executePrev()
int i = 0;
for (ScanInput scanInput : this.scanInputs)
{
scanInput.setOperatorName(this.getName());
this.scanOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.SCAN).invoke(scanInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ public PartitionedJoinOperator(String name, List<PartitionInput> smallPartitionI
else
{
this.smallPartitionInputs = ImmutableList.copyOf(smallPartitionInputs);
for (PartitionInput partitionInput : this.smallPartitionInputs)
{
partitionInput.setOperatorName(name);
}
}
if (largePartitionInputs == null || largePartitionInputs.isEmpty())
{
Expand All @@ -74,6 +78,10 @@ public PartitionedJoinOperator(String name, List<PartitionInput> smallPartitionI
else
{
this.largePartitionInputs = ImmutableList.copyOf(largePartitionInputs);
for (PartitionInput partitionInput : this.largePartitionInputs)
{
partitionInput.setOperatorName(name);
}
}
}

Expand Down Expand Up @@ -139,7 +147,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
for (int i = 0; i < joinInputs.size(); ++i)
{
JoinInput joinInput = joinInputs.get(i);
joinInput.setOperatorName(this.getName());
if (joinAlgo == JoinAlgorithm.PARTITIONED)
{
joinOutputs[i] = InvokerFactory.Instance()
Expand Down Expand Up @@ -191,7 +198,6 @@ public CompletableFuture<Void> executePrev()
int i = 0;
for (PartitionInput partitionInput : largePartitionInputs)
{
partitionInput.setOperatorName(this.getName());
largePartitionOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
}
Expand All @@ -210,7 +216,6 @@ public CompletableFuture<Void> executePrev()
int i = 0;
for (PartitionInput partitionInput : smallPartitionInputs)
{
partitionInput.setOperatorName(this.getName());
smallPartitionOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
}
Expand All @@ -228,7 +233,6 @@ public CompletableFuture<Void> executePrev()
int i = 0;
for (PartitionInput partitionInput : smallPartitionInputs)
{
partitionInput.setOperatorName(this.getName());
smallPartitionOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
}
Expand All @@ -239,7 +243,6 @@ public CompletableFuture<Void> executePrev()
i = 0;
for (PartitionInput partitionInput : largePartitionInputs)
{
partitionInput.setOperatorName(this.getName());
largePartitionOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class SingleStageJoinOperator extends JoinOperator
public SingleStageJoinOperator(String name, JoinInput joinInput, JoinAlgorithm joinAlgo)
{
super(name);
// ImmutableList.of() add the reference of joinInput into the returned list
joinInput.setOperatorName(name);
this.joinInputs = ImmutableList.of(joinInput);
this.joinAlgo = joinAlgo;
}
Expand All @@ -59,6 +61,10 @@ public SingleStageJoinOperator(String name, List<JoinInput> joinInputs, JoinAlgo
{
super(name);
this.joinInputs = ImmutableList.copyOf(joinInputs);
for (JoinInput joinInput : this.joinInputs)
{
joinInput.setOperatorName(name);
}
this.joinAlgo = joinAlgo;
}

Expand Down Expand Up @@ -116,7 +122,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
for (int i = 0; i < joinInputs.size(); ++i)
{
JoinInput joinInput = joinInputs.get(i);
joinInput.setOperatorName(this.getName());
if (joinAlgo == JoinAlgorithm.BROADCAST)
{
joinOutputs[i] = InvokerFactory.Instance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,21 @@ public StarlingAggregationOperator(String name, List<AggregationInput> finalAggr
requireNonNull(finalAggrInputs, "finalAggrInputs is null");
checkArgument(!finalAggrInputs.isEmpty(), "finalAggrInputs is empty");
this.finalAggrInputs = ImmutableList.copyOf(finalAggrInputs);
for (AggregationInput aggrInput : this.finalAggrInputs)
{
aggrInput.setOperatorName(name);
}
if (partitionInputs == null || partitionInputs.isEmpty())
{
this.partitionInputs = ImmutableList.of();
}
else
{
this.partitionInputs = ImmutableList.copyOf(partitionInputs);
for (PartitionInput partitionInput : this.partitionInputs)
{
partitionInput.setOperatorName(name);
}
}
}

Expand Down Expand Up @@ -123,7 +131,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
int i = 0;
for (AggregationInput preAggrInput : this.finalAggrInputs)
{
preAggrInput.setOperatorName(this.getName());
this.finalAggrOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.AGGREGATION).invoke(preAggrInput);
}
Expand Down Expand Up @@ -158,7 +165,6 @@ public CompletableFuture<Void> executePrev()
int i = 0;
for (PartitionInput partitionInput : this.partitionInputs)
{
partitionInput.setOperatorName(this.getName());
this.partitionOutputs[i++] = InvokerFactory.Instance()
.getInvoker(WorkerType.PARTITION).invoke(partitionInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
*/
package io.pixelsdb.pixels.planner;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import io.pixelsdb.pixels.planner.plan.physical.input.ScanInput;
import io.pixelsdb.pixels.planner.plan.physical.output.ScanOutput;
import org.junit.Test;

Expand All @@ -42,4 +44,19 @@ public void testEncodeScanOutput()
assert json != null && !json.isEmpty();
System.out.println(json);
}

@Test
public void testEncodeScanInput()
{
ScanInput scanInput = new ScanInput();
scanInput.setTransId(1);
scanInput.setOperatorName("scan");
System.out.println(JSON.toJSONString(scanInput));
Gson gson = new Gson();
System.out.println(gson.toJson(scanInput));
ScanInput scanInput1 = JSON.parseObject(JSON.toJSONString(scanInput), ScanInput.class);
ScanInput scanInput2 = gson.fromJson(gson.toJson(scanInput), ScanInput.class);
System.out.println(scanInput1.getOperatorName());
System.out.println(scanInput2.getOperatorName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ public void execute(TurboProto.WorkerRequest request,
output = handler.handleRequest(input);
Utils.stopProfile(JFRFilename);

Utils.upload(JFRFilename, String.format("%s_%s/%s",
Utils.upload(JFRFilename, String.format("%s/%s/%s",
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JFRFilename));
log.info(String.format("upload JFR file to experiments/%s_%s/%s successfully",
log.info(String.format("upload JFR file to experiments/%s/%s/%s successfully",
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JFRFilename));
} else
{
Expand All @@ -83,10 +83,10 @@ public void execute(TurboProto.WorkerRequest request,
output = handler.handleRequest(input);
}
Utils.dump(JSONFilename, input, output);
Utils.upload(JSONFilename, String.format("%s_%s/%s",
input.getTransId(), notNullOrElse(input.getOperatorName(), "none"), JSONFilename));
log.info(String.format("upload JSON file to experiments/%s_%s/%s successfully",
input.getTransId(), notNullOrElse(input.getOperatorName(), "none"), JSONFilename));
Utils.upload(JSONFilename, String.format("%s/%s/%s",
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JSONFilename));
log.info(String.format("upload JSON file to experiments/%s/%s/%s successfully",
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JSONFilename));

log.info(String.format("get output successfully: %s", JSON.toJSONString(output)));
} catch (Exception e)
Expand Down