Skip to content

Commit 5ccac6f

Browse files
jnsnowcodyprime
authored andcommitted
blockjob: add block_job_start
Instead of automatically starting jobs at creation time via backup_start et al, we'd like to return a job object pointer that can be started manually at later point in time. For now, add the block_job_start mechanism and start the jobs automatically as we have been doing, with conversions job-by-job coming in later patches. Of note: cancellation of unstarted jobs will perform all the normal cleanup as if the job had started, particularly abort and clean. The only difference is that we will not emit any events, because the job never actually started. Signed-off-by: John Snow <jsnow@redhat.com> Message-id: 1478587839-9834-5-git-send-email-jsnow@redhat.com Signed-off-by: Jeff Cody <jcody@redhat.com>
1 parent a7815a7 commit 5ccac6f

File tree

8 files changed

+67
-32
lines changed

8 files changed

+67
-32
lines changed

block/backup.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -654,9 +654,8 @@ void backup_start(const char *job_id, BlockDriverState *bs,
654654

655655
block_job_add_bdrv(&job->common, target);
656656
job->common.len = len;
657-
job->common.co = qemu_coroutine_create(job->common.driver->start, job);
658657
block_job_txn_add_job(txn, &job->common);
659-
qemu_coroutine_enter(job->common.co);
658+
block_job_start(&job->common);
660659
return;
661660

662661
error:

block/commit.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -289,10 +289,9 @@ void commit_start(const char *job_id, BlockDriverState *bs,
289289
s->backing_file_str = g_strdup(backing_file_str);
290290

291291
s->on_error = on_error;
292-
s->common.co = qemu_coroutine_create(s->common.driver->start, s);
293292

294-
trace_commit_start(bs, base, top, s, s->common.co);
295-
qemu_coroutine_enter(s->common.co);
293+
trace_commit_start(bs, base, top, s);
294+
block_job_start(&s->common);
296295
}
297296

298297

block/mirror.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,9 +1009,8 @@ static void mirror_start_job(const char *job_id, BlockDriverState *bs,
10091009
}
10101010
}
10111011

1012-
s->common.co = qemu_coroutine_create(s->common.driver->start, s);
1013-
trace_mirror_start(bs, s, s->common.co, opaque);
1014-
qemu_coroutine_enter(s->common.co);
1012+
trace_mirror_start(bs, s, opaque);
1013+
block_job_start(&s->common);
10151014
}
10161015

10171016
void mirror_start(const char *job_id, BlockDriverState *bs,

block/stream.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,6 @@ void stream_start(const char *job_id, BlockDriverState *bs,
255255
s->bs_flags = orig_bs_flags;
256256

257257
s->on_error = on_error;
258-
s->common.co = qemu_coroutine_create(s->common.driver->start, s);
259-
trace_stream_start(bs, base, s, s->common.co);
260-
qemu_coroutine_enter(s->common.co);
258+
trace_stream_start(bs, base, s);
259+
block_job_start(&s->common);
261260
}

block/trace-events

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@ bdrv_co_do_copy_on_readv(void *bs, int64_t offset, unsigned int bytes, int64_t c
1919

2020
# block/stream.c
2121
stream_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
22-
stream_start(void *bs, void *base, void *s, void *co) "bs %p base %p s %p co %p"
22+
stream_start(void *bs, void *base, void *s) "bs %p base %p s %p"
2323

2424
# block/commit.c
2525
commit_one_iteration(void *s, int64_t sector_num, int nb_sectors, int is_allocated) "s %p sector_num %"PRId64" nb_sectors %d is_allocated %d"
26-
commit_start(void *bs, void *base, void *top, void *s, void *co) "bs %p base %p top %p s %p co %p"
26+
commit_start(void *bs, void *base, void *top, void *s) "bs %p base %p top %p s %p"
2727

2828
# block/mirror.c
29-
mirror_start(void *bs, void *s, void *co, void *opaque) "bs %p s %p co %p opaque %p"
29+
mirror_start(void *bs, void *s, void *opaque) "bs %p s %p opaque %p"
3030
mirror_restart_iter(void *s, int64_t cnt) "s %p dirty count %"PRId64
3131
mirror_before_flush(void *s) "s %p"
3232
mirror_before_drain(void *s, int64_t cnt) "s %p dirty count %"PRId64

blockjob.c

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,9 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
174174
job->blk = blk;
175175
job->cb = cb;
176176
job->opaque = opaque;
177-
job->busy = true;
177+
job->busy = false;
178+
job->paused = true;
179+
job->pause_count = 1;
178180
job->refcnt = 1;
179181
bs->job = job;
180182

@@ -202,6 +204,23 @@ bool block_job_is_internal(BlockJob *job)
202204
return (job->id == NULL);
203205
}
204206

207+
static bool block_job_started(BlockJob *job)
208+
{
209+
return job->co;
210+
}
211+
212+
void block_job_start(BlockJob *job)
213+
{
214+
assert(job && !block_job_started(job) && job->paused &&
215+
!job->busy && job->driver->start);
216+
job->co = qemu_coroutine_create(job->driver->start, job);
217+
if (--job->pause_count == 0) {
218+
job->paused = false;
219+
job->busy = true;
220+
qemu_coroutine_enter(job->co);
221+
}
222+
}
223+
205224
void block_job_ref(BlockJob *job)
206225
{
207226
++job->refcnt;
@@ -248,14 +267,18 @@ static void block_job_completed_single(BlockJob *job)
248267
if (job->cb) {
249268
job->cb(job->opaque, job->ret);
250269
}
251-
if (block_job_is_cancelled(job)) {
252-
block_job_event_cancelled(job);
253-
} else {
254-
const char *msg = NULL;
255-
if (job->ret < 0) {
256-
msg = strerror(-job->ret);
270+
271+
/* Emit events only if we actually started */
272+
if (block_job_started(job)) {
273+
if (block_job_is_cancelled(job)) {
274+
block_job_event_cancelled(job);
275+
} else {
276+
const char *msg = NULL;
277+
if (job->ret < 0) {
278+
msg = strerror(-job->ret);
279+
}
280+
block_job_event_completed(job, msg);
257281
}
258-
block_job_event_completed(job, msg);
259282
}
260283

261284
if (job->txn) {
@@ -363,7 +386,8 @@ void block_job_complete(BlockJob *job, Error **errp)
363386
{
364387
/* Should not be reachable via external interface for internal jobs */
365388
assert(job->id);
366-
if (job->pause_count || job->cancelled || !job->driver->complete) {
389+
if (job->pause_count || job->cancelled ||
390+
!block_job_started(job) || !job->driver->complete) {
367391
error_setg(errp, "The active block job '%s' cannot be completed",
368392
job->id);
369393
return;
@@ -395,6 +419,8 @@ bool block_job_user_paused(BlockJob *job)
395419

396420
void coroutine_fn block_job_pause_point(BlockJob *job)
397421
{
422+
assert(job && block_job_started(job));
423+
398424
if (!block_job_should_pause(job)) {
399425
return;
400426
}
@@ -446,9 +472,13 @@ void block_job_enter(BlockJob *job)
446472

447473
void block_job_cancel(BlockJob *job)
448474
{
449-
job->cancelled = true;
450-
block_job_iostatus_reset(job);
451-
block_job_enter(job);
475+
if (block_job_started(job)) {
476+
job->cancelled = true;
477+
block_job_iostatus_reset(job);
478+
block_job_enter(job);
479+
} else {
480+
block_job_completed(job, -ECANCELED);
481+
}
452482
}
453483

454484
bool block_job_is_cancelled(BlockJob *job)

include/block/blockjob.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,15 @@ void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs);
188188
*/
189189
void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
190190

191+
/**
192+
* block_job_start:
193+
* @job: A job that has not yet been started.
194+
*
195+
* Begins execution of a block job.
196+
* Takes ownership of one reference to the job object.
197+
*/
198+
void block_job_start(BlockJob *job);
199+
191200
/**
192201
* block_job_cancel:
193202
* @job: The job to be canceled.

tests/test-blockjob-txn.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,6 @@ typedef struct {
2424
int *result;
2525
} TestBlockJob;
2626

27-
static const BlockJobDriver test_block_job_driver = {
28-
.instance_size = sizeof(TestBlockJob),
29-
};
30-
3127
static void test_block_job_complete(BlockJob *job, void *opaque)
3228
{
3329
BlockDriverState *bs = blk_bs(job->blk);
@@ -77,6 +73,11 @@ static void test_block_job_cb(void *opaque, int ret)
7773
g_free(data);
7874
}
7975

76+
static const BlockJobDriver test_block_job_driver = {
77+
.instance_size = sizeof(TestBlockJob),
78+
.start = test_block_job_run,
79+
};
80+
8081
/* Create a block job that completes with a given return code after a given
8182
* number of event loop iterations. The return code is stored in the given
8283
* result pointer.
@@ -104,10 +105,9 @@ static BlockJob *test_block_job_start(unsigned int iterations,
104105
s->use_timer = use_timer;
105106
s->rc = rc;
106107
s->result = result;
107-
s->common.co = qemu_coroutine_create(test_block_job_run, s);
108108
data->job = s;
109109
data->result = result;
110-
qemu_coroutine_enter(s->common.co);
110+
block_job_start(&s->common);
111111
return &s->common;
112112
}
113113

0 commit comments

Comments
 (0)