Skip to content

Commit 4bed357

Browse files
authored
[Issue pixelsdb#1236] revise checkpoint-related RPC interfaces (pixelsdb#1239)
In this PR, the query engine is responsible for checking and marking a long-running query, and sends RPC requests to transaction server and retina to push watermarks and checkpoint (offload) visibility structures.
1 parent 2a2ac44 commit 4bed357

File tree

10 files changed

+380
-241
lines changed

10 files changed

+380
-241
lines changed

pixels-common/src/main/java/io/pixelsdb/pixels/common/retina/RetinaService.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,4 +407,58 @@ public boolean addWriteBuffer(String schemaName, String tableName) throws Retina
407407
}
408408
return true;
409409
}
410+
411+
/**
412+
* Register a long-running query to be offloaded to disk checkpoint.
413+
*
414+
* @param timestamp the transaction timestamp
415+
* @return true on success
416+
* @throws RetinaException if the operation fails
417+
*/
418+
public boolean registerOffload(long timestamp) throws RetinaException
419+
{
420+
String token = UUID.randomUUID().toString();
421+
RetinaProto.RegisterOffloadRequest request = RetinaProto.RegisterOffloadRequest.newBuilder()
422+
.setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build())
423+
.setTimestamp(timestamp)
424+
.build();
425+
RetinaProto.RegisterOffloadResponse response = this.stub.registerOffload(request);
426+
if (response.getHeader().getErrorCode() != 0)
427+
{
428+
throw new RetinaException("Failed to register offload: " + response.getHeader().getErrorCode()
429+
+ " " + response.getHeader().getErrorMsg());
430+
}
431+
if (!response.getHeader().getToken().equals(token))
432+
{
433+
throw new RetinaException("Response token does not match");
434+
}
435+
return true;
436+
}
437+
438+
/**
439+
* Unregister a long-running query's offload checkpoint when the query completes.
440+
*
441+
* @param timestamp the transaction timestamp
442+
* @return true on success
443+
* @throws RetinaException if the operation fails
444+
*/
445+
public boolean unregisterOffload(long timestamp) throws RetinaException
446+
{
447+
String token = UUID.randomUUID().toString();
448+
RetinaProto.UnregisterOffloadRequest request = RetinaProto.UnregisterOffloadRequest.newBuilder()
449+
.setHeader(RetinaProto.RequestHeader.newBuilder().setToken(token).build())
450+
.setTimestamp(timestamp)
451+
.build();
452+
RetinaProto.UnregisterOffloadResponse response = this.stub.unregisterOffload(request);
453+
if (response.getHeader().getErrorCode() != 0)
454+
{
455+
throw new RetinaException("Failed to unregister offload: " + response.getHeader().getErrorCode()
456+
+ " " + response.getHeader().getErrorMsg());
457+
}
458+
if (!response.getHeader().getToken().equals(token))
459+
{
460+
throw new RetinaException("Response token does not match");
461+
}
462+
return true;
463+
}
410464
}

pixels-common/src/main/java/io/pixelsdb/pixels/common/transaction/TransService.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,4 +432,24 @@ public long getSafeGcTimestamp() throws TransException
432432
}
433433
return response.getTimestamp();
434434
}
435+
436+
/**
437+
* Mark a transaction as offloaded. This allows the transaction to be skipped when
438+
* calculating the minimum running transaction timestamp for garbage collection.
439+
*
440+
* @param transId the id of the transaction to mark as offloaded
441+
* @return true on success
442+
* @throws TransException if the operation fails
443+
*/
444+
public boolean markTransOffloaded(long transId) throws TransException
445+
{
446+
TransProto.MarkTransOffloadedRequest request = TransProto.MarkTransOffloadedRequest.newBuilder()
447+
.setTransId(transId).build();
448+
TransProto.MarkTransOffloadedResponse response = this.stub.markTransOffloaded(request);
449+
if (response.getErrorCode() != ErrorCode.SUCCESS)
450+
{
451+
throw new TransException("failed to mark transaction as offloaded, error code=" + response.getErrorCode());
452+
}
453+
return true;
454+
}
435455
}

pixels-common/src/main/resources/pixels.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ retina.reader.prefetch.threads=8
291291
# offloading threshold for long query in seconds
292292
pixels.transaction.offload.threshold=1800
293293
# snapshot storage directory
294-
pixels.retina.checkpoint.dir=/tmp/pixels-checkpoints
294+
pixels.retina.checkpoint.dir=file:///tmp/pixels-checkpoints
295295

296296
### pixels-sink ###
297297
sink.server.enabled=false

pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/retina/RetinaServerImpl.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,56 @@ public void getWriteBuffer(RetinaProto.GetWriteBufferRequest request,
640640
}
641641
}
642642

643+
@Override
644+
public void registerOffload(RetinaProto.RegisterOffloadRequest request,
645+
StreamObserver<RetinaProto.RegisterOffloadResponse> responseObserver)
646+
{
647+
RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder()
648+
.setToken(request.getHeader().getToken());
649+
650+
try
651+
{
652+
this.retinaResourceManager.registerOffload(request.getTimestamp());
653+
responseObserver.onNext(RetinaProto.RegisterOffloadResponse.newBuilder()
654+
.setHeader(headerBuilder.build()).build());
655+
} catch (RetinaException e)
656+
{
657+
logger.error("registerOffload failed for timestamp={}",
658+
request.getTimestamp(), e);
659+
headerBuilder.setErrorCode(1).setErrorMsg(e.getMessage());
660+
responseObserver.onNext(RetinaProto.RegisterOffloadResponse.newBuilder()
661+
.setHeader(headerBuilder.build()).build());
662+
} finally
663+
{
664+
responseObserver.onCompleted();
665+
}
666+
}
667+
668+
@Override
669+
public void unregisterOffload(RetinaProto.UnregisterOffloadRequest request,
670+
StreamObserver<RetinaProto.UnregisterOffloadResponse> responseObserver)
671+
{
672+
RetinaProto.ResponseHeader.Builder headerBuilder = RetinaProto.ResponseHeader.newBuilder()
673+
.setToken(request.getHeader().getToken());
674+
675+
try
676+
{
677+
this.retinaResourceManager.unregisterOffload(request.getTimestamp());
678+
responseObserver.onNext(RetinaProto.UnregisterOffloadResponse.newBuilder()
679+
.setHeader(headerBuilder.build()).build());
680+
} catch (Exception e)
681+
{
682+
logger.error("unregisterOffload failed for timestamp={}",
683+
request.getTimestamp(), e);
684+
headerBuilder.setErrorCode(1).setErrorMsg(e.getMessage());
685+
responseObserver.onNext(RetinaProto.UnregisterOffloadResponse.newBuilder()
686+
.setHeader(headerBuilder.build()).build());
687+
} finally
688+
{
689+
responseObserver.onCompleted();
690+
}
691+
}
692+
643693
/**
644694
* Check if the order or compact paths from pixels metadata is valid.
645695
*

pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransContextManager.java

Lines changed: 19 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.pixelsdb.pixels.common.transaction.TransContext;
2323
import io.pixelsdb.pixels.common.utils.ConfigFactory;
2424
import io.pixelsdb.pixels.daemon.TransProto;
25-
import io.pixelsdb.pixels.retina.RetinaResourceManager;
2625
import org.apache.logging.log4j.LogManager;
2726
import org.apache.logging.log4j.Logger;
2827

@@ -75,8 +74,6 @@ protected static TransContextManager Instance()
7574

7675
private final ReadWriteLock contextLock = new ReentrantReadWriteLock();
7776

78-
private static final long OFFLOAD_THRESHOLD = Long.parseLong(ConfigFactory.Instance().getProperty("pixels.transaction.offload.threshold"));
79-
8077
private TransContextManager() { }
8178

8279
/**
@@ -218,20 +215,7 @@ private boolean terminateTrans(long transId, TransProto.TransStatus status)
218215
* Adding the same lock in {@link #offloadLongRunningQueries()}
219216
* constitutes a mutually exclusive critical section.
220217
*/
221-
synchronized (context)
222-
{
223-
context.setStatus(status);
224-
if (context.isOffloaded())
225-
{
226-
try
227-
{
228-
RetinaResourceManager.Instance().unregisterOffload(context.getTransId(), context.getTimestamp());
229-
} catch (Exception e)
230-
{
231-
log.error("Unregister failed", e);
232-
}
233-
}
234-
}
218+
context.setStatus(status);
235219

236220
if (context.isReadOnly())
237221
{
@@ -254,52 +238,6 @@ private boolean terminateTrans(long transId, TransProto.TransStatus status)
254238
return false;
255239
}
256240

257-
/**
258-
* Offload long-running queries to disk.
259-
*/
260-
public void offloadLongRunningQueries()
261-
{
262-
long now = System.currentTimeMillis();
263-
boolean pushed = false;
264-
265-
for (TransContext ctx : runningReadOnlyTrans)
266-
{
267-
if (ctx.isOffloaded())
268-
{
269-
continue;
270-
}
271-
272-
if ((now - ctx.getStartTime()) > OFFLOAD_THRESHOLD)
273-
{
274-
try
275-
{
276-
// 1. Register and generate snapshot
277-
RetinaResourceManager.Instance().registerOffload(ctx.getTransId(), ctx.getTimestamp());
278-
279-
// 2. Double-checked locking
280-
synchronized (ctx)
281-
{
282-
if (ctx.getStatus() == TransProto.TransStatus.PENDING)
283-
{
284-
ctx.setOffloaded(true);
285-
pushed = true;
286-
} else
287-
{
288-
// Transaction has ended, rollback registration
289-
RetinaResourceManager.Instance().unregisterOffload(ctx.getTransId(), ctx.getTimestamp());
290-
}
291-
}
292-
} catch (Exception e)
293-
{
294-
log.error("Failed to offload transaction {}", ctx.getTransId(), e);
295-
}
296-
}
297-
}
298-
if (pushed)
299-
{
300-
TransServiceImpl.pushWatermarks(true);
301-
}
302-
}
303241

304242
/**
305243
* Dump the context of transactions in this manager to a history file and remove terminated transactions. This method
@@ -451,4 +389,22 @@ public int getQueryConcurrency(boolean readOnly)
451389
this.contextLock.writeLock().unlock();
452390
}
453391
}
392+
393+
/**
394+
* Mark a transaction as offloaded. This allows the transaction context manager to
395+
* skip it when calculating the minimum running transaction timestamp.
396+
*
397+
* @param transId the transaction id
398+
* @return true if the transaction exists and was marked as offloaded, false otherwise
399+
*/
400+
public boolean markTransOffloaded(long transId)
401+
{
402+
TransContext context = this.transIdToContext.get(transId);
403+
if (context != null)
404+
{
405+
context.setOffloaded(true);
406+
return true;
407+
}
408+
return false;
409+
}
454410
}

pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/transaction/TransServiceImpl.java

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -109,19 +109,6 @@ public class TransServiceImpl extends TransServiceGrpc.TransServiceImplBase
109109

110110
public TransServiceImpl()
111111
{
112-
/*
113-
* Initiate a background monitoring thread to periodically (every 5 minutes)
114-
* trigger the detection and offloading process for long-running queries,
115-
* thereby ensuring the persistent release of blockages on the Low Watermarks
116-
* and guaranteeing the proper functioning of the garbage collection mechanism.
117-
*/
118-
ScheduledExecutorService offloadScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
119-
Thread t = new Thread(r, "trans-offload-monitor");
120-
t.setDaemon(true);
121-
return t;
122-
});
123-
offloadScheduler.scheduleAtFixedRate(() ->
124-
TransContextManager.Instance().offloadLongRunningQueries(), 5, 5, TimeUnit.MINUTES);
125112
}
126113

127114
@Override
@@ -314,7 +301,7 @@ public void rollbackTrans(TransProto.RollbackTransRequest request,
314301
responseObserver.onCompleted();
315302
}
316303

317-
static void pushWatermarks(boolean readOnly)
304+
private void pushWatermarks(boolean readOnly)
318305
{
319306
long timestamp = TransContextManager.Instance().getMinRunningTransTimestamp(readOnly);
320307
if (readOnly)
@@ -497,4 +484,25 @@ public void getSafeGcTimestamp(com.google.protobuf.Empty request,
497484
responseObserver.onNext(response);
498485
responseObserver.onCompleted();
499486
}
487+
488+
@Override
489+
public void markTransOffloaded(TransProto.MarkTransOffloadedRequest request,
490+
StreamObserver<TransProto.MarkTransOffloadedResponse> responseObserver)
491+
{
492+
int error = ErrorCode.SUCCESS;
493+
boolean success = TransContextManager.Instance().markTransOffloaded(request.getTransId());
494+
if (!success)
495+
{
496+
logger.error("transaction id {} does not exist or failed to mark as offloaded", request.getTransId());
497+
error = ErrorCode.TRANS_ID_NOT_EXIST;
498+
}
499+
500+
// After marking, attempt to push low watermark.
501+
pushWatermarks(true);
502+
503+
TransProto.MarkTransOffloadedResponse response = TransProto.MarkTransOffloadedResponse.newBuilder()
504+
.setErrorCode(error).build();
505+
responseObserver.onNext(response);
506+
responseObserver.onCompleted();
507+
}
500508
}

0 commit comments

Comments
 (0)