Skip to content

Commit bae8196

Browse files
bonziniFam Zheng
authored andcommitted
blockjob: introduce .drain callback for jobs
This is required to decouple block jobs from running in an AioContext. With multiqueue block devices, a BlockDriverState does not really belong to a single AioContext. The solution is to first wait until all I/O operations are complete; then loop in the main thread for the block job to complete entirely. Signed-off-by: Paolo Bonzini <pbonzini@redhat.com> Reviewed-by: Fam Zheng <famz@redhat.com> Message-Id: <1477565348-5458-3-git-send-email-pbonzini@redhat.com> Signed-off-by: Fam Zheng <famz@redhat.com>
1 parent 50ab0e0 commit bae8196

File tree

4 files changed

+71
-25
lines changed

4 files changed

+71
-25
lines changed

block/backup.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,29 @@ void backup_cow_request_end(CowRequest *req)
300300
cow_request_end(req);
301301
}
302302

303+
static void backup_drain(BlockJob *job)
304+
{
305+
BackupBlockJob *s = container_of(job, BackupBlockJob, common);
306+
307+
/* Need to keep a reference in case blk_drain triggers execution
308+
* of backup_complete...
309+
*/
310+
if (s->target) {
311+
BlockBackend *target = s->target;
312+
blk_ref(target);
313+
blk_drain(target);
314+
blk_unref(target);
315+
}
316+
}
317+
303318
static const BlockJobDriver backup_job_driver = {
304319
.instance_size = sizeof(BackupBlockJob),
305320
.job_type = BLOCK_JOB_TYPE_BACKUP,
306321
.set_speed = backup_set_speed,
307322
.commit = backup_commit,
308323
.abort = backup_abort,
309324
.attached_aio_context = backup_attached_aio_context,
325+
.drain = backup_drain,
310326
};
311327

312328
static BlockErrorAction backup_error_action(BackupBlockJob *job,
@@ -331,6 +347,7 @@ static void backup_complete(BlockJob *job, void *opaque)
331347
BackupCompleteData *data = opaque;
332348

333349
blk_unref(s->target);
350+
s->target = NULL;
334351

335352
block_job_completed(job, data->ret);
336353
g_free(data);

block/mirror.c

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -469,7 +469,11 @@ static void mirror_free_init(MirrorBlockJob *s)
469469
}
470470
}
471471

472-
static void mirror_drain(MirrorBlockJob *s)
472+
/* This is also used for the .pause callback. There is no matching
473+
* mirror_resume() because mirror_run() will begin iterating again
474+
* when the job is resumed.
475+
*/
476+
static void mirror_wait_for_all_io(MirrorBlockJob *s)
473477
{
474478
while (s->in_flight > 0) {
475479
mirror_wait_for_io(s);
@@ -528,6 +532,7 @@ static void mirror_exit(BlockJob *job, void *opaque)
528532
g_free(s->replaces);
529533
bdrv_op_unblock_all(target_bs, s->common.blocker);
530534
blk_unref(s->target);
535+
s->target = NULL;
531536
block_job_completed(&s->common, data->ret);
532537
g_free(data);
533538
bdrv_drained_end(src);
@@ -582,7 +587,7 @@ static int coroutine_fn mirror_dirty_init(MirrorBlockJob *s)
582587
sector_num += nb_sectors;
583588
}
584589

585-
mirror_drain(s);
590+
mirror_wait_for_all_io(s);
586591
}
587592

588593
/* First part, loop on the sectors and initialize the dirty bitmap. */
@@ -787,7 +792,7 @@ static void coroutine_fn mirror_run(void *opaque)
787792
* the target is a copy of the source.
788793
*/
789794
assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
790-
mirror_drain(s);
795+
mirror_wait_for_all_io(s);
791796
}
792797

793798
assert(s->in_flight == 0);
@@ -872,14 +877,11 @@ static void mirror_complete(BlockJob *job, Error **errp)
872877
block_job_enter(&s->common);
873878
}
874879

875-
/* There is no matching mirror_resume() because mirror_run() will begin
876-
* iterating again when the job is resumed.
877-
*/
878-
static void coroutine_fn mirror_pause(BlockJob *job)
880+
static void mirror_pause(BlockJob *job)
879881
{
880882
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
881883

882-
mirror_drain(s);
884+
mirror_wait_for_all_io(s);
883885
}
884886

885887
static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
@@ -889,13 +891,29 @@ static void mirror_attached_aio_context(BlockJob *job, AioContext *new_context)
889891
blk_set_aio_context(s->target, new_context);
890892
}
891893

894+
static void mirror_drain(BlockJob *job)
895+
{
896+
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
897+
898+
/* Need to keep a reference in case blk_drain triggers execution
899+
* of mirror_complete...
900+
*/
901+
if (s->target) {
902+
BlockBackend *target = s->target;
903+
blk_ref(target);
904+
blk_drain(target);
905+
blk_unref(target);
906+
}
907+
}
908+
892909
static const BlockJobDriver mirror_job_driver = {
893910
.instance_size = sizeof(MirrorBlockJob),
894911
.job_type = BLOCK_JOB_TYPE_MIRROR,
895912
.set_speed = mirror_set_speed,
896913
.complete = mirror_complete,
897914
.pause = mirror_pause,
898915
.attached_aio_context = mirror_attached_aio_context,
916+
.drain = mirror_drain,
899917
};
900918

901919
static const BlockJobDriver commit_active_job_driver = {
@@ -905,6 +923,7 @@ static const BlockJobDriver commit_active_job_driver = {
905923
.complete = mirror_complete,
906924
.pause = mirror_pause,
907925
.attached_aio_context = mirror_attached_aio_context,
926+
.drain = mirror_drain,
908927
};
909928

910929
static void mirror_start_job(const char *job_id, BlockDriverState *bs,

blockjob.c

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,6 @@ BlockJob *block_job_get(const char *id)
7474
return NULL;
7575
}
7676

77-
/* Normally the job runs in its BlockBackend's AioContext. The exception is
78-
* block_job_defer_to_main_loop() where it runs in the QEMU main loop. Code
79-
* that supports both cases uses this helper function.
80-
*/
81-
static AioContext *block_job_get_aio_context(BlockJob *job)
82-
{
83-
return job->deferred_to_main_loop ?
84-
qemu_get_aio_context() :
85-
blk_get_aio_context(job->blk);
86-
}
87-
8877
static void block_job_attached_aio_context(AioContext *new_context,
8978
void *opaque)
9079
{
@@ -97,6 +86,17 @@ static void block_job_attached_aio_context(AioContext *new_context,
9786
block_job_resume(job);
9887
}
9988

89+
static void block_job_drain(BlockJob *job)
90+
{
91+
/* If job is !job->busy this kicks it into the next pause point. */
92+
block_job_enter(job);
93+
94+
blk_drain(job->blk);
95+
if (job->driver->drain) {
96+
job->driver->drain(job);
97+
}
98+
}
99+
100100
static void block_job_detach_aio_context(void *opaque)
101101
{
102102
BlockJob *job = opaque;
@@ -106,12 +106,8 @@ static void block_job_detach_aio_context(void *opaque)
106106

107107
block_job_pause(job);
108108

109-
if (!job->paused) {
110-
/* If job is !job->busy this kicks it into the next pause point. */
111-
block_job_enter(job);
112-
}
113109
while (!job->paused && !job->completed) {
114-
aio_poll(block_job_get_aio_context(job), true);
110+
block_job_drain(job);
115111
}
116112

117113
block_job_unref(job);
@@ -413,14 +409,21 @@ static int block_job_finish_sync(BlockJob *job,
413409
assert(blk_bs(job->blk)->job == job);
414410

415411
block_job_ref(job);
412+
416413
finish(job, &local_err);
417414
if (local_err) {
418415
error_propagate(errp, local_err);
419416
block_job_unref(job);
420417
return -EBUSY;
421418
}
419+
/* block_job_drain calls block_job_enter, and it should be enough to
420+
* induce progress until the job completes or moves to the main thread.
421+
*/
422+
while (!job->deferred_to_main_loop && !job->completed) {
423+
block_job_drain(job);
424+
}
422425
while (!job->completed) {
423-
aio_poll(block_job_get_aio_context(job), true);
426+
aio_poll(qemu_get_aio_context(), true);
424427
}
425428
ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
426429
block_job_unref(job);

include/block/blockjob.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ typedef struct BlockJobDriver {
9292
* besides job->blk to the new AioContext.
9393
*/
9494
void (*attached_aio_context)(BlockJob *job, AioContext *new_context);
95+
96+
/*
97+
* If the callback is not NULL, it will be invoked when the job has to be
98+
* synchronously cancelled or completed; it should drain BlockDriverStates
99+
* as required to ensure progress.
100+
*/
101+
void (*drain)(BlockJob *job);
95102
} BlockJobDriver;
96103

97104
/**

0 commit comments

Comments
 (0)