Skip to content

Commit 0bf317b

Browse files
authored
[Issue #468] improve operator name setting. (#470)
Ensure that the operator name is set for serverless worker inputs when the inputs are added to an operator.
1 parent d2718bb commit 0bf317b

File tree

7 files changed

+63
-17
lines changed

7 files changed

+63
-17
lines changed

docs/TPC-H.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,25 @@ Connect to trino-cli:
6868
cd ~/opt/trino-server
6969
./bin/trino --server localhost:8080 --catalog pixels --schema tpch
7070
```
71-
Execute the TPC-H queries in trino-cli.
71+
In trino-cli, select the ordered data layout by setting the two session properties:
72+
```sql
73+
set session pixels.ordered_path_enabled=true
74+
set session pixels.compact_path_enabled=false
75+
```
76+
By default, both paths are enabled. You can also enable the compact path and disable the ordered path when [data compaction](#data-compaction) is done.
77+
After selecting the data layout, execute the TPC-H queries in trino-cli.
7278

7379
## Data Compaction*
7480
This is optional. It is only needed if we want to test the query performance on the compact layout.
7581
In pixels-cli, use the following commands to compact the files in the ordered path of each table:
7682
```bash
7783
COMPACT -s tpch -t customer -n no -c 2
7884
COMPACT -s tpch -t lineitem -n no -c 16
85+
COMPACT -s tpch -t nation -n no -c 1
7986
COMPACT -s tpch -t orders -n no -c 8
8087
COMPACT -s tpch -t part -n no -c 1
8188
COMPACT -s tpch -t partsupp -n no -c 8
89+
COMPACT -s tpch -t region -n no -c 1
8290
COMPACT -s tpch -t supplier -n no -c 1
8391
```
8492
The tables `nation` and `region` are too small, no need to compact them.

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/AggregationOperator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,22 @@ public AggregationOperator(String name, List<AggregationInput> finalAggrInputs,
7070
requireNonNull(finalAggrInputs, "finalAggrInputs is null");
7171
checkArgument(!finalAggrInputs.isEmpty(), "finalAggrInputs is empty");
7272
this.finalAggrInputs = ImmutableList.copyOf(finalAggrInputs);
73+
for (AggregationInput aggrInput : this.finalAggrInputs)
74+
{
75+
aggrInput.setOperatorName(name);
76+
}
77+
7378
if (scanInputs == null || scanInputs.isEmpty())
7479
{
7580
this.scanInputs = ImmutableList.of();
7681
}
7782
else
7883
{
7984
this.scanInputs = ImmutableList.copyOf(scanInputs);
85+
for (ScanInput scanInput : this.scanInputs)
86+
{
87+
scanInput.setOperatorName(name);
88+
}
8089
}
8190
}
8291

@@ -122,7 +131,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
122131
int i = 0;
123132
for (AggregationInput preAggrInput : this.finalAggrInputs)
124133
{
125-
preAggrInput.setOperatorName(this.getName());
126134
this.finalAggrOutputs[i++] = InvokerFactory.Instance()
127135
.getInvoker(WorkerType.AGGREGATION).invoke(preAggrInput);
128136
}
@@ -157,7 +165,6 @@ public CompletableFuture<Void> executePrev()
157165
int i = 0;
158166
for (ScanInput scanInput : this.scanInputs)
159167
{
160-
scanInput.setOperatorName(this.getName());
161168
this.scanOutputs[i++] = InvokerFactory.Instance()
162169
.getInvoker(WorkerType.SCAN).invoke(scanInput);
163170
}

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/PartitionedJoinOperator.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ public PartitionedJoinOperator(String name, List<PartitionInput> smallPartitionI
6666
else
6767
{
6868
this.smallPartitionInputs = ImmutableList.copyOf(smallPartitionInputs);
69+
for (PartitionInput partitionInput : this.smallPartitionInputs)
70+
{
71+
partitionInput.setOperatorName(name);
72+
}
6973
}
7074
if (largePartitionInputs == null || largePartitionInputs.isEmpty())
7175
{
@@ -74,6 +78,10 @@ public PartitionedJoinOperator(String name, List<PartitionInput> smallPartitionI
7478
else
7579
{
7680
this.largePartitionInputs = ImmutableList.copyOf(largePartitionInputs);
81+
for (PartitionInput partitionInput : this.largePartitionInputs)
82+
{
83+
partitionInput.setOperatorName(name);
84+
}
7785
}
7886
}
7987

@@ -139,7 +147,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
139147
for (int i = 0; i < joinInputs.size(); ++i)
140148
{
141149
JoinInput joinInput = joinInputs.get(i);
142-
joinInput.setOperatorName(this.getName());
143150
if (joinAlgo == JoinAlgorithm.PARTITIONED)
144151
{
145152
joinOutputs[i] = InvokerFactory.Instance()
@@ -191,7 +198,6 @@ public CompletableFuture<Void> executePrev()
191198
int i = 0;
192199
for (PartitionInput partitionInput : largePartitionInputs)
193200
{
194-
partitionInput.setOperatorName(this.getName());
195201
largePartitionOutputs[i++] = InvokerFactory.Instance()
196202
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
197203
}
@@ -210,7 +216,6 @@ public CompletableFuture<Void> executePrev()
210216
int i = 0;
211217
for (PartitionInput partitionInput : smallPartitionInputs)
212218
{
213-
partitionInput.setOperatorName(this.getName());
214219
smallPartitionOutputs[i++] = InvokerFactory.Instance()
215220
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
216221
}
@@ -228,7 +233,6 @@ public CompletableFuture<Void> executePrev()
228233
int i = 0;
229234
for (PartitionInput partitionInput : smallPartitionInputs)
230235
{
231-
partitionInput.setOperatorName(this.getName());
232236
smallPartitionOutputs[i++] = InvokerFactory.Instance()
233237
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
234238
}
@@ -239,7 +243,6 @@ public CompletableFuture<Void> executePrev()
239243
i = 0;
240244
for (PartitionInput partitionInput : largePartitionInputs)
241245
{
242-
partitionInput.setOperatorName(this.getName());
243246
largePartitionOutputs[i++] = InvokerFactory.Instance()
244247
.getInvoker(WorkerType.PARTITION).invoke((partitionInput));
245248
}

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/SingleStageJoinOperator.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class SingleStageJoinOperator extends JoinOperator
5151
public SingleStageJoinOperator(String name, JoinInput joinInput, JoinAlgorithm joinAlgo)
5252
{
5353
super(name);
54+
// ImmutableList.of() add the reference of joinInput into the returned list
55+
joinInput.setOperatorName(name);
5456
this.joinInputs = ImmutableList.of(joinInput);
5557
this.joinAlgo = joinAlgo;
5658
}
@@ -59,6 +61,10 @@ public SingleStageJoinOperator(String name, List<JoinInput> joinInputs, JoinAlgo
5961
{
6062
super(name);
6163
this.joinInputs = ImmutableList.copyOf(joinInputs);
64+
for (JoinInput joinInput : this.joinInputs)
65+
{
66+
joinInput.setOperatorName(name);
67+
}
6268
this.joinAlgo = joinAlgo;
6369
}
6470

@@ -116,7 +122,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
116122
for (int i = 0; i < joinInputs.size(); ++i)
117123
{
118124
JoinInput joinInput = joinInputs.get(i);
119-
joinInput.setOperatorName(this.getName());
120125
if (joinAlgo == JoinAlgorithm.BROADCAST)
121126
{
122127
joinOutputs[i] = InvokerFactory.Instance()

pixels-planner/src/main/java/io/pixelsdb/pixels/planner/plan/physical/StarlingAggregationOperator.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,21 @@ public StarlingAggregationOperator(String name, List<AggregationInput> finalAggr
7171
requireNonNull(finalAggrInputs, "finalAggrInputs is null");
7272
checkArgument(!finalAggrInputs.isEmpty(), "finalAggrInputs is empty");
7373
this.finalAggrInputs = ImmutableList.copyOf(finalAggrInputs);
74+
for (AggregationInput aggrInput : this.finalAggrInputs)
75+
{
76+
aggrInput.setOperatorName(name);
77+
}
7478
if (partitionInputs == null || partitionInputs.isEmpty())
7579
{
7680
this.partitionInputs = ImmutableList.of();
7781
}
7882
else
7983
{
8084
this.partitionInputs = ImmutableList.copyOf(partitionInputs);
85+
for (PartitionInput partitionInput : this.partitionInputs)
86+
{
87+
partitionInput.setOperatorName(name);
88+
}
8189
}
8290
}
8391

@@ -123,7 +131,6 @@ public CompletableFuture<CompletableFuture<?>[]> execute()
123131
int i = 0;
124132
for (AggregationInput preAggrInput : this.finalAggrInputs)
125133
{
126-
preAggrInput.setOperatorName(this.getName());
127134
this.finalAggrOutputs[i++] = InvokerFactory.Instance()
128135
.getInvoker(WorkerType.AGGREGATION).invoke(preAggrInput);
129136
}
@@ -158,7 +165,6 @@ public CompletableFuture<Void> executePrev()
158165
int i = 0;
159166
for (PartitionInput partitionInput : this.partitionInputs)
160167
{
161-
partitionInput.setOperatorName(this.getName());
162168
this.partitionOutputs[i++] = InvokerFactory.Instance()
163169
.getInvoker(WorkerType.PARTITION).invoke(partitionInput);
164170
}

pixels-planner/src/test/java/io/pixelsdb/pixels/planner/TestOutput.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
*/
2020
package io.pixelsdb.pixels.planner;
2121

22+
import com.alibaba.fastjson.JSON;
2223
import com.google.gson.Gson;
24+
import io.pixelsdb.pixels.planner.plan.physical.input.ScanInput;
2325
import io.pixelsdb.pixels.planner.plan.physical.output.ScanOutput;
2426
import org.junit.Test;
2527

@@ -42,4 +44,19 @@ public void testEncodeScanOutput()
4244
assert json != null && !json.isEmpty();
4345
System.out.println(json);
4446
}
47+
48+
@Test
49+
public void testEncodeScanInput()
50+
{
51+
ScanInput scanInput = new ScanInput();
52+
scanInput.setTransId(1);
53+
scanInput.setOperatorName("scan");
54+
System.out.println(JSON.toJSONString(scanInput));
55+
Gson gson = new Gson();
56+
System.out.println(gson.toJson(scanInput));
57+
ScanInput scanInput1 = JSON.parseObject(JSON.toJSONString(scanInput), ScanInput.class);
58+
ScanInput scanInput2 = gson.fromJson(gson.toJson(scanInput), ScanInput.class);
59+
System.out.println(scanInput1.getOperatorName());
60+
System.out.println(scanInput2.getOperatorName());
61+
}
4562
}

pixels-turbo/pixels-worker-vhive/src/main/java/io/pixelsdb/pixels/worker/vhive/utils/ServiceImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,9 @@ public void execute(TurboProto.WorkerRequest request,
7272
output = handler.handleRequest(input);
7373
Utils.stopProfile(JFRFilename);
7474

75-
Utils.upload(JFRFilename, String.format("%s_%s/%s",
75+
Utils.upload(JFRFilename, String.format("%s/%s/%s",
7676
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JFRFilename));
77-
log.info(String.format("upload JFR file to experiments/%s_%s/%s successfully",
77+
log.info(String.format("upload JFR file to experiments/%s/%s/%s successfully",
7878
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JFRFilename));
7979
} else
8080
{
@@ -83,10 +83,10 @@ public void execute(TurboProto.WorkerRequest request,
8383
output = handler.handleRequest(input);
8484
}
8585
Utils.dump(JSONFilename, input, output);
86-
Utils.upload(JSONFilename, String.format("%s_%s/%s",
87-
input.getTransId(), notNullOrElse(input.getOperatorName(), "none"), JSONFilename));
88-
log.info(String.format("upload JSON file to experiments/%s_%s/%s successfully",
89-
input.getTransId(), notNullOrElse(input.getOperatorName(), "none"), JSONFilename));
86+
Utils.upload(JSONFilename, String.format("%s/%s/%s",
87+
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JSONFilename));
88+
log.info(String.format("upload JSON file to experiments/%s/%s/%s successfully",
89+
input.getTransId(), notNullOrElse(input.getOperatorName(), "default"), JSONFilename));
9090

9191
log.info(String.format("get output successfully: %s", JSON.toJSONString(output)));
9292
} catch (Exception e)

0 commit comments

Comments
 (0)