Block layer patches:

- Fix some jobs/drain/aio_poll related hangs
 - commit: Add top-node/base-node options
 - linux-aio: Fix locking for qemu_laio_process_completions()
 - Fix use after free error in bdrv_open_inherit
 -----BEGIN PGP SIGNATURE-----
 
 iQIcBAABAgAGBQJbqj35AAoJEH8JsnLIjy/WEVwQAKi8nyi8Y4vifZB4HNWiusgy
 xvjqKrUN7zoL8nWmz4rgOsXkrf/H076mNg+sby3MNL6CY1dh/H5QjXJ9s0Zhb91a
 KW4CCawxtgILeIVbx7qDPl5DIxL49/ChbLuajE4NttEp/gQo3EqA4hAV4apIxHqv
 XLRr0Z+sewMpwClxiqnHn2rV9NQJmWa82dtXUMg8wBosGGY4/qeEhGLdMHJH+2kc
 vFYOxYyulVCIi9YBkxpQDbrjD0wudOeASngFHDRNd3HucuNgOUpuLnpanREC8ZqV
 WDiHK717hXrq4T6bzFUrRpLo13xpYcbAADXp91NhDRKy/36sBSyxeQPJvaCMgF6g
 s01YZLhM7x0qvattNyIo9dD3+ZQh8ktZ0W1gyI/51nV5GB3mLwKsQ3yhBICn+8ei
 QyNWkP/3mfCcdblo7+3xeSanMnd++iVtJKcRLj9w974l0noKlZIOyHTp0AaBJMcO
 ijCZuHiOq4b5Rsb0V0VxI+fMUQ1YqgrS5Bj4jDOLuJgsyTJlOurXQsPhHN6vtt4J
 xgyUNPLtZ5omLDw5apGpnikK18/EgNFqy23pHQyoS4tTpoOBWeNeg7B9ngVww3EB
 5IbT5UA9SHd4bR2kotHVo7lMvseF4nMoqbB7lxLbCVnG54tht3y82WBfv19QB0n9
 jRoR247n/VoZtGkmeF2o
 =c2Ll
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'kevin/tags/for-upstream' into block

Block layer patches:

- Fix some jobs/drain/aio_poll related hangs
- commit: Add top-node/base-node options
- linux-aio: Fix locking for qemu_laio_process_completions()
- Fix use after free error in bdrv_open_inherit

# gpg: Signature made Tue Sep 25 15:54:01 2018 CEST
# gpg:                using RSA key 7F09B272C88F2FD6
# gpg: Good signature from "Kevin Wolf <kwolf@redhat.com>"
# Primary key fingerprint: DC3D EB15 9A9A F95D 3D74  56FE 7F09 B272 C88F 2FD6

* kevin/tags/for-upstream: (26 commits)
  test-bdrv-drain: Test draining job source child and parent
  block: Use a single global AioWait
  test-bdrv-drain: Fix outdated comments
  test-bdrv-drain: AIO_WAIT_WHILE() in job .commit/.abort
  job: Avoid deadlocks in job_completed_txn_abort()
  test-bdrv-drain: Test nested poll in bdrv_drain_poll_top_level()
  block: Remove aio_poll() in bdrv_drain_poll variants
  blockjob: Lie better in child_job_drained_poll()
  block-backend: Decrease in_flight only after callback
  block-backend: Fix potential double blk_delete()
  block-backend: Add .drained_poll callback
  block: Add missing locking in bdrv_co_drain_bh_cb()
  test-bdrv-drain: Test AIO_WAIT_WHILE() in completion callback
  job: Use AIO_WAIT_WHILE() in job_finish_sync()
  test-blockjob: Acquire AioContext around job_cancel_sync()
  test-bdrv-drain: Drain with block jobs in an I/O thread
  aio-wait: Increase num_waiters even in home thread
  blockjob: Wake up BDS when job becomes idle
  job: Fix missing locking due to mismerge
  job: Fix nested aio_poll() hanging in job_txn_apply
  ...

Signed-off-by: Max Reitz <mreitz@redhat.com>
This commit is contained in:
Max Reitz 2018-09-25 16:12:41 +02:00
commit 9c76ff9c16
24 changed files with 527 additions and 114 deletions

View File

@ -2792,6 +2792,7 @@ static BlockDriverState *bdrv_open_inherit(const char *filename,
bdrv_parent_cb_change_media(bs, true);
qobject_unref(options);
options = NULL;
/* For snapshot=on, create a temporary qcow2 overlay. bs points to the
* temporary snapshot afterwards. */
@ -4885,11 +4886,6 @@ AioContext *bdrv_get_aio_context(BlockDriverState *bs)
return bs ? bs->aio_context : qemu_get_aio_context();
}
AioWait *bdrv_get_aio_wait(BlockDriverState *bs)
{
return bs ? &bs->wait : NULL;
}
void bdrv_coroutine_enter(BlockDriverState *bs, Coroutine *co)
{
aio_co_enter(bdrv_get_aio_context(bs), co);

View File

@ -88,7 +88,6 @@ struct BlockBackend {
* Accessed with atomic ops.
*/
unsigned int in_flight;
AioWait wait;
};
typedef struct BlockBackendAIOCB {
@ -121,6 +120,7 @@ static void blk_root_inherit_options(int *child_flags, QDict *child_options,
abort();
}
static void blk_root_drained_begin(BdrvChild *child);
static bool blk_root_drained_poll(BdrvChild *child);
static void blk_root_drained_end(BdrvChild *child);
static void blk_root_change_media(BdrvChild *child, bool load);
@ -294,6 +294,7 @@ static const BdrvChildRole child_root = {
.get_parent_desc = blk_root_get_parent_desc,
.drained_begin = blk_root_drained_begin,
.drained_poll = blk_root_drained_poll,
.drained_end = blk_root_drained_end,
.activate = blk_root_activate,
@ -433,6 +434,7 @@ int blk_get_refcnt(BlockBackend *blk)
*/
void blk_ref(BlockBackend *blk)
{
assert(blk->refcnt > 0);
blk->refcnt++;
}
@ -445,7 +447,13 @@ void blk_unref(BlockBackend *blk)
{
if (blk) {
assert(blk->refcnt > 0);
if (!--blk->refcnt) {
if (blk->refcnt > 1) {
blk->refcnt--;
} else {
blk_drain(blk);
/* blk_drain() cannot resurrect blk, nobody held a reference */
assert(blk->refcnt == 1);
blk->refcnt = 0;
blk_delete(blk);
}
}
@ -1289,7 +1297,7 @@ static void blk_inc_in_flight(BlockBackend *blk)
static void blk_dec_in_flight(BlockBackend *blk)
{
atomic_dec(&blk->in_flight);
aio_wait_kick(&blk->wait);
aio_wait_kick();
}
static void error_callback_bh(void *opaque)
@ -1330,8 +1338,8 @@ static const AIOCBInfo blk_aio_em_aiocb_info = {
static void blk_aio_complete(BlkAioEmAIOCB *acb)
{
if (acb->has_returned) {
blk_dec_in_flight(acb->rwco.blk);
acb->common.cb(acb->common.opaque, acb->rwco.ret);
blk_dec_in_flight(acb->rwco.blk);
qemu_aio_unref(acb);
}
}
@ -1590,9 +1598,8 @@ void blk_drain(BlockBackend *blk)
}
/* We may have -ENOMEDIUM completions in flight */
AIO_WAIT_WHILE(&blk->wait,
blk_get_aio_context(blk),
atomic_mb_read(&blk->in_flight) > 0);
AIO_WAIT_WHILE(blk_get_aio_context(blk),
atomic_mb_read(&blk->in_flight) > 0);
if (bs) {
bdrv_drained_end(bs);
@ -1611,8 +1618,7 @@ void blk_drain_all(void)
aio_context_acquire(ctx);
/* We may have -ENOMEDIUM completions in flight */
AIO_WAIT_WHILE(&blk->wait, ctx,
atomic_mb_read(&blk->in_flight) > 0);
AIO_WAIT_WHILE(ctx, atomic_mb_read(&blk->in_flight) > 0);
aio_context_release(ctx);
}
@ -2189,6 +2195,13 @@ static void blk_root_drained_begin(BdrvChild *child)
}
}
static bool blk_root_drained_poll(BdrvChild *child)
{
BlockBackend *blk = child->opaque;
assert(blk->quiesce_counter);
return !!blk->in_flight;
}
static void blk_root_drained_end(BdrvChild *child)
{
BlockBackend *blk = child->opaque;

View File

@ -38,8 +38,6 @@
/* Maximum bounce buffer for copy-on-read and write zeroes, in bytes */
#define MAX_BOUNCE_BUFFER (32768 << BDRV_SECTOR_BITS)
static AioWait drain_all_aio_wait;
static void bdrv_parent_cb_resize(BlockDriverState *bs);
static int coroutine_fn bdrv_co_do_pwrite_zeroes(BlockDriverState *bs,
int64_t offset, int bytes, BdrvRequestFlags flags);
@ -268,10 +266,6 @@ bool bdrv_drain_poll(BlockDriverState *bs, bool recursive,
static bool bdrv_drain_poll_top_level(BlockDriverState *bs, bool recursive,
BdrvChild *ignore_parent)
{
/* Execute pending BHs first and check everything else only after the BHs
* have executed. */
while (aio_poll(bs->aio_context, false));
return bdrv_drain_poll(bs, recursive, ignore_parent, false);
}
@ -288,6 +282,18 @@ static void bdrv_co_drain_bh_cb(void *opaque)
BlockDriverState *bs = data->bs;
if (bs) {
AioContext *ctx = bdrv_get_aio_context(bs);
AioContext *co_ctx = qemu_coroutine_get_aio_context(co);
/*
* When the coroutine yielded, the lock for its home context was
* released, so we need to re-acquire it here. If it explicitly
* acquired a different context, the lock is still held and we don't
* want to lock it a second time (or AIO_WAIT_WHILE() would hang).
*/
if (ctx == co_ctx) {
aio_context_acquire(ctx);
}
bdrv_dec_in_flight(bs);
if (data->begin) {
bdrv_do_drained_begin(bs, data->recursive, data->parent,
@ -296,6 +302,9 @@ static void bdrv_co_drain_bh_cb(void *opaque)
bdrv_do_drained_end(bs, data->recursive, data->parent,
data->ignore_bds_parents);
}
if (ctx == co_ctx) {
aio_context_release(ctx);
}
} else {
assert(data->begin);
bdrv_drain_all_begin();
@ -496,10 +505,6 @@ static bool bdrv_drain_all_poll(void)
BlockDriverState *bs = NULL;
bool result = false;
/* Execute pending BHs first (may modify the graph) and check everything
* else only after the BHs have executed. */
while (aio_poll(qemu_get_aio_context(), false));
/* bdrv_drain_poll() can't make changes to the graph and we are holding the
* main AioContext lock, so iterating bdrv_next_all_states() is safe. */
while ((bs = bdrv_next_all_states(bs))) {
@ -550,7 +555,7 @@ void bdrv_drain_all_begin(void)
}
/* Now poll the in-flight requests */
AIO_WAIT_WHILE(&drain_all_aio_wait, NULL, bdrv_drain_all_poll());
AIO_WAIT_WHILE(NULL, bdrv_drain_all_poll());
while ((bs = bdrv_next_all_states(bs))) {
bdrv_drain_assert_idle(bs);
@ -706,8 +711,7 @@ void bdrv_inc_in_flight(BlockDriverState *bs)
void bdrv_wakeup(BlockDriverState *bs)
{
aio_wait_kick(bdrv_get_aio_wait(bs));
aio_wait_kick(&drain_all_aio_wait);
aio_wait_kick();
}
void bdrv_dec_in_flight(BlockDriverState *bs)

View File

@ -234,9 +234,9 @@ static void qemu_laio_process_completions(LinuxAioState *s)
static void qemu_laio_process_completions_and_submit(LinuxAioState *s)
{
aio_context_acquire(s->aio_context);
qemu_laio_process_completions(s);
aio_context_acquire(s->aio_context);
if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
ioq_submit(s);
}

View File

@ -3214,7 +3214,9 @@ out:
}
void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
bool has_base_node, const char *base_node,
bool has_base, const char *base,
bool has_top_node, const char *top_node,
bool has_top, const char *top,
bool has_backing_file, const char *backing_file,
bool has_speed, int64_t speed,
@ -3275,7 +3277,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
/* default top_bs is the active layer */
top_bs = bs;
if (has_top && top) {
if (has_top_node && has_top) {
error_setg(errp, "'top-node' and 'top' are mutually exclusive");
goto out;
} else if (has_top_node) {
top_bs = bdrv_lookup_bs(NULL, top_node, errp);
if (top_bs == NULL) {
goto out;
}
if (!bdrv_chain_contains(bs, top_bs)) {
error_setg(errp, "'%s' is not in this backing file chain",
top_node);
goto out;
}
} else if (has_top && top) {
if (strcmp(bs->filename, top) != 0) {
top_bs = bdrv_find_backing_image(bs, top);
}
@ -3288,7 +3303,20 @@ void qmp_block_commit(bool has_job_id, const char *job_id, const char *device,
assert(bdrv_get_aio_context(top_bs) == aio_context);
if (has_base && base) {
if (has_base_node && has_base) {
error_setg(errp, "'base-node' and 'base' are mutually exclusive");
goto out;
} else if (has_base_node) {
base_bs = bdrv_lookup_bs(NULL, base_node, errp);
if (base_bs == NULL) {
goto out;
}
if (!bdrv_chain_contains(top_bs, base_bs)) {
error_setg(errp, "'%s' is not in this backing file chain",
base_node);
goto out;
}
} else if (has_base && base) {
base_bs = bdrv_find_backing_image(top_bs, base);
} else {
base_bs = bdrv_find_base(top_bs);

View File

@ -164,7 +164,7 @@ static bool child_job_drained_poll(BdrvChild *c)
/* An inactive or completed job doesn't have any pending requests. Jobs
* with !job->busy are either already paused or have a pause point after
* being reentered, so no job driver code will run before they pause. */
if (!job->busy || job_is_completed(job) || job->deferred_to_main_loop) {
if (!job->busy || job_is_completed(job)) {
return false;
}
@ -221,6 +221,11 @@ int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
return 0;
}
static void block_job_on_idle(Notifier *n, void *opaque)
{
aio_wait_kick();
}
bool block_job_is_internal(BlockJob *job)
{
return (job->job.id == NULL);
@ -416,6 +421,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
job->finalize_completed_notifier.notify = block_job_event_completed;
job->pending_notifier.notify = block_job_event_pending;
job->ready_notifier.notify = block_job_event_ready;
job->idle_notifier.notify = block_job_on_idle;
notifier_list_add(&job->job.on_finalize_cancelled,
&job->finalize_cancelled_notifier);
@ -423,6 +429,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
&job->finalize_completed_notifier);
notifier_list_add(&job->job.on_pending, &job->pending_notifier);
notifier_list_add(&job->job.on_ready, &job->ready_notifier);
notifier_list_add(&job->job.on_idle, &job->idle_notifier);
error_setg(&job->blocker, "block device is in use by block job: %s",
job_type_str(&job->job));

View File

@ -30,14 +30,15 @@
/**
* AioWait:
*
* An object that facilitates synchronous waiting on a condition. The main
* loop can wait on an operation running in an IOThread as follows:
* An object that facilitates synchronous waiting on a condition. A single
* global AioWait object (global_aio_wait) is used internally.
*
* The main loop can wait on an operation running in an IOThread as follows:
*
* AioWait *wait = ...;
* AioContext *ctx = ...;
* MyWork work = { .done = false };
* schedule_my_work_in_iothread(ctx, &work);
* AIO_WAIT_WHILE(wait, ctx, !work.done);
* AIO_WAIT_WHILE(ctx, !work.done);
*
* The IOThread must call aio_wait_kick() to notify the main loop when
* work.done changes:
@ -46,7 +47,7 @@
* {
* ...
* work.done = true;
* aio_wait_kick(wait);
* aio_wait_kick();
* }
*/
typedef struct {
@ -54,9 +55,10 @@ typedef struct {
unsigned num_waiters;
} AioWait;
extern AioWait global_aio_wait;
/**
* AIO_WAIT_WHILE:
* @wait: the aio wait object
* @ctx: the aio context, or NULL if multiple aio contexts (for which the
* caller does not hold a lock) are involved in the polling condition.
* @cond: wait while this conditional expression is true
@ -72,10 +74,12 @@ typedef struct {
* wait on conditions between two IOThreads since that could lead to deadlock,
* go via the main loop instead.
*/
#define AIO_WAIT_WHILE(wait, ctx, cond) ({ \
#define AIO_WAIT_WHILE(ctx, cond) ({ \
bool waited_ = false; \
AioWait *wait_ = (wait); \
AioWait *wait_ = &global_aio_wait; \
AioContext *ctx_ = (ctx); \
/* Increment wait_->num_waiters before evaluating cond. */ \
atomic_inc(&wait_->num_waiters); \
if (ctx_ && in_aio_context_home_thread(ctx_)) { \
while ((cond)) { \
aio_poll(ctx_, true); \
@ -84,8 +88,6 @@ typedef struct {
} else { \
assert(qemu_get_current_aio_context() == \
qemu_get_aio_context()); \
/* Increment wait_->num_waiters before evaluating cond. */ \
atomic_inc(&wait_->num_waiters); \
while ((cond)) { \
if (ctx_) { \
aio_context_release(ctx_); \
@ -96,20 +98,18 @@ typedef struct {
} \
waited_ = true; \
} \
atomic_dec(&wait_->num_waiters); \
} \
atomic_dec(&wait_->num_waiters); \
waited_; })
/**
* aio_wait_kick:
* @wait: the aio wait object that should re-evaluate its condition
*
* Wake up the main thread if it is waiting on AIO_WAIT_WHILE(). During
* synchronous operations performed in an IOThread, the main thread lets the
* IOThread's event loop run, waiting for the operation to complete. A
* aio_wait_kick() call will wake up the main thread.
*/
void aio_wait_kick(AioWait *wait);
void aio_wait_kick(void);
/**
* aio_wait_bh_oneshot:

View File

@ -410,13 +410,9 @@ void bdrv_drain_all_begin(void);
void bdrv_drain_all_end(void);
void bdrv_drain_all(void);
/* Returns NULL when bs == NULL */
AioWait *bdrv_get_aio_wait(BlockDriverState *bs);
#define BDRV_POLL_WHILE(bs, cond) ({ \
BlockDriverState *bs_ = (bs); \
AIO_WAIT_WHILE(bdrv_get_aio_wait(bs_), \
bdrv_get_aio_context(bs_), \
AIO_WAIT_WHILE(bdrv_get_aio_context(bs_), \
cond); })
int bdrv_pdiscard(BdrvChild *child, int64_t offset, int bytes);

View File

@ -794,9 +794,6 @@ struct BlockDriverState {
unsigned int in_flight;
unsigned int serialising_in_flight;
/* Kicked to signal main loop when a request completes. */
AioWait wait;
/* counter for nested bdrv_io_plug.
* Accessed with atomic ops.
*/

View File

@ -70,6 +70,9 @@ typedef struct BlockJob {
/** Called when the job transitions to READY */
Notifier ready_notifier;
/** Called when the job coroutine yields or terminates */
Notifier idle_notifier;
/** BlockDriverStates that are involved in this block job */
GSList *nodes;
} BlockJob;

View File

@ -89,6 +89,11 @@ void qemu_aio_coroutine_enter(AioContext *ctx, Coroutine *co);
*/
void coroutine_fn qemu_coroutine_yield(void);
/**
* Get the AioContext of the given coroutine
*/
AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co);
/**
* Get the currently executing coroutine
*/

View File

@ -76,6 +76,9 @@ typedef struct Job {
* Set to false by the job while the coroutine has yielded and may be
* re-entered by job_enter(). There may still be I/O or event loop activity
* pending. Accessed under block_job_mutex (in blockjob.c).
*
* When the job is deferred to the main loop, busy is true as long as the
* bottom half is still pending.
*/
bool busy;
@ -156,6 +159,9 @@ typedef struct Job {
/** Notifiers called when the job transitions to READY */
NotifierList on_ready;
/** Notifiers called when the job coroutine yields or terminates */
NotifierList on_idle;
/** Element of the list of jobs */
QLIST_ENTRY(Job) job_list;
@ -521,6 +527,8 @@ void job_user_cancel(Job *job, bool force, Error **errp);
*
* Returns the return value from the job if the job actually completed
* during the call, or -ECANCELED if it was canceled.
*
* Callers must hold the AioContext lock of job->aio_context.
*/
int job_cancel_sync(Job *job);
@ -538,6 +546,8 @@ void job_cancel_sync_all(void);
* function).
*
* Returns the return value from the job.
*
* Callers must hold the AioContext lock of job->aio_context.
*/
int job_complete_sync(Job *job, Error **errp);
@ -563,6 +573,8 @@ void job_dismiss(Job **job, Error **errp);
*
* Returns 0 if the job is successfully completed, -ECANCELED if the job was
* cancelled before completing, and -errno in other error cases.
*
* Callers must hold the AioContext lock of job->aio_context.
*/
int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp);

67
job.c
View File

@ -29,6 +29,7 @@
#include "qemu/job.h"
#include "qemu/id.h"
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
#include "trace-root.h"
#include "qapi/qapi-events-job.h"
@ -136,21 +137,13 @@ static void job_txn_del_job(Job *job)
}
}
static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock)
static int job_txn_apply(JobTxn *txn, int fn(Job *))
{
AioContext *ctx;
Job *job, *next;
int rc = 0;
QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
if (lock) {
ctx = job->aio_context;
aio_context_acquire(ctx);
}
rc = fn(job);
if (lock) {
aio_context_release(ctx);
}
if (rc) {
break;
}
@ -410,6 +403,11 @@ static void job_event_ready(Job *job)
notifier_list_notify(&job->on_ready, job);
}
static void job_event_idle(Job *job)
{
notifier_list_notify(&job->on_idle, job);
}
void job_enter_cond(Job *job, bool(*fn)(Job *job))
{
if (!job_started(job)) {
@ -455,6 +453,7 @@ static void coroutine_fn job_do_yield(Job *job, uint64_t ns)
timer_mod(&job->sleep_timer, ns);
}
job->busy = false;
job_event_idle(job);
job_unlock();
qemu_coroutine_yield();
@ -719,6 +718,7 @@ static void job_cancel_async(Job *job, bool force)
static void job_completed_txn_abort(Job *job)
{
AioContext *outer_ctx = job->aio_context;
AioContext *ctx;
JobTxn *txn = job->txn;
Job *other_job;
@ -732,23 +732,26 @@ static void job_completed_txn_abort(Job *job)
txn->aborting = true;
job_txn_ref(txn);
/* We are the first failed job. Cancel other jobs. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
ctx = other_job->aio_context;
aio_context_acquire(ctx);
}
/* We can only hold the single job's AioContext lock while calling
* job_finalize_single() because the finalization callbacks can involve
* calls of AIO_WAIT_WHILE(), which could deadlock otherwise. */
aio_context_release(outer_ctx);
/* Other jobs are effectively cancelled by us, set the status for
* them; this job, however, may or may not be cancelled, depending
* on the caller, so leave it. */
QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
if (other_job != job) {
ctx = other_job->aio_context;
aio_context_acquire(ctx);
job_cancel_async(other_job, false);
aio_context_release(ctx);
}
}
while (!QLIST_EMPTY(&txn->jobs)) {
other_job = QLIST_FIRST(&txn->jobs);
ctx = other_job->aio_context;
aio_context_acquire(ctx);
if (!job_is_completed(other_job)) {
assert(job_is_cancelled(other_job));
job_finish_sync(other_job, NULL, NULL);
@ -757,6 +760,8 @@ static void job_completed_txn_abort(Job *job)
aio_context_release(ctx);
}
aio_context_acquire(outer_ctx);
job_txn_unref(txn);
}
@ -780,11 +785,11 @@ static void job_do_finalize(Job *job)
assert(job && job->txn);
/* prepare the transaction to complete */
rc = job_txn_apply(job->txn, job_prepare, true);
rc = job_txn_apply(job->txn, job_prepare);
if (rc) {
job_completed_txn_abort(job);
} else {
job_txn_apply(job->txn, job_finalize_single, true);
job_txn_apply(job->txn, job_finalize_single);
}
}
@ -830,10 +835,10 @@ static void job_completed_txn_success(Job *job)
assert(other_job->ret == 0);
}
job_txn_apply(txn, job_transition_to_pending, false);
job_txn_apply(txn, job_transition_to_pending);
/* If no jobs need manual finalization, automatically do so */
if (job_txn_apply(txn, job_needs_finalize, false) == 0) {
if (job_txn_apply(txn, job_needs_finalize) == 0) {
job_do_finalize(job);
}
}
@ -855,7 +860,20 @@ static void job_completed(Job *job)
static void job_exit(void *opaque)
{
Job *job = (Job *)opaque;
AioContext *ctx = job->aio_context;
aio_context_acquire(ctx);
/* This is a lie, we're not quiescent, but still doing the completion
* callbacks. However, completion callbacks tend to involve operations that
* drain block nodes, and if .drained_poll still returned true, we would
* deadlock. */
job->busy = false;
job_event_idle(job);
job_completed(job);
aio_context_release(ctx);
}
/**
@ -870,6 +888,7 @@ static void coroutine_fn job_co_entry(void *opaque)
job_pause_point(job);
job->ret = job->driver->run(job, &job->err);
job->deferred_to_main_loop = true;
job->busy = true;
aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}
@ -971,14 +990,10 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
job_unref(job);
return -EBUSY;
}
/* job_drain calls job_enter, and it should be enough to induce progress
* until the job completes or moves to the main thread. */
while (!job->deferred_to_main_loop && !job_is_completed(job)) {
job_drain(job);
}
while (!job_is_completed(job)) {
aio_poll(qemu_get_aio_context(), true);
}
AIO_WAIT_WHILE(job->aio_context,
(job_drain(job), !job_is_completed(job)));
ret = (job_is_cancelled(job) && job->ret == 0) ? -ECANCELED : job->ret;
job_unref(job);
return ret;

View File

@ -1457,12 +1457,23 @@
#
# @device: the device name or node-name of a root node
#
# @base: The file name of the backing image to write data into.
# If not specified, this is the deepest backing image.
# @base-node: The node name of the backing image to write data into.
# If not specified, this is the deepest backing image.
# (since: 3.1)
#
# @top: The file name of the backing image within the image chain,
# which contains the topmost data to be committed down. If
# not specified, this is the active layer.
# @base: Same as @base-node, except that it is a file name rather than a node
# name. This must be the exact filename string that was used to open the
# node; other strings, even if addressing the same file, are not
# accepted (deprecated, use @base-node instead)
#
# @top-node: The node name of the backing image within the image chain
# which contains the topmost data to be committed down. If
# not specified, this is the active layer. (since: 3.1)
#
# @top: Same as @top-node, except that it is a file name rather than a node
# name. This must be the exact filename string that was used to open the
# node; other strings, even if addressing the same file, are not
# accepted (deprecated, use @base-node instead)
#
# @backing-file: The backing file string to write into the overlay
# image of 'top'. If 'top' is the active layer,
@ -1528,7 +1539,8 @@
#
##
{ 'command': 'block-commit',
'data': { '*job-id': 'str', 'device': 'str', '*base': 'str', '*top': 'str',
'data': { '*job-id': 'str', 'device': 'str', '*base-node': 'str',
'*base': 'str', '*top-node': 'str', '*top': 'str',
'*backing-file': 'str', '*speed': 'int',
'*filter-node-name': 'str',
'*auto-finalize': 'bool', '*auto-dismiss': 'bool' } }

View File

@ -57,9 +57,12 @@ class ImageCommitTestCase(iotests.QMPTestCase):
self.assert_no_active_block_jobs()
self.vm.shutdown()
def run_commit_test(self, top, base, need_ready=False):
def run_commit_test(self, top, base, need_ready=False, node_names=False):
self.assert_no_active_block_jobs()
result = self.vm.qmp('block-commit', device='drive0', top=top, base=base)
if node_names:
result = self.vm.qmp('block-commit', device='drive0', top_node=top, base_node=base)
else:
result = self.vm.qmp('block-commit', device='drive0', top=top, base=base)
self.assert_qmp(result, 'return', {})
self.wait_for_complete(need_ready)
@ -101,6 +104,11 @@ class TestSingleDrive(ImageCommitTestCase):
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed"))
def test_commit_node(self):
self.run_commit_test("mid", "base", node_names=True)
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xef 524288 524288', backing_img).find("verification failed"))
def test_device_not_found(self):
result = self.vm.qmp('block-commit', device='nonexistent', top='%s' % mid_img)
self.assert_qmp(result, 'error/class', 'DeviceNotFound')
@ -123,6 +131,30 @@ class TestSingleDrive(ImageCommitTestCase):
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', 'Base \'badfile\' not found')
def test_top_node_invalid(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('block-commit', device='drive0', top_node='badfile', base_node='base')
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile")
def test_base_node_invalid(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='badfile')
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', "Cannot find device= nor node_name=badfile")
def test_top_path_and_node(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', top='%s' % mid_img)
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', "'top-node' and 'top' are mutually exclusive")
def test_base_path_and_node(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('block-commit', device='drive0', top_node='mid', base_node='base', base='%s' % backing_img)
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', "'base-node' and 'base' are mutually exclusive")
def test_top_is_active(self):
self.run_commit_test(test_img, backing_img, need_ready=True)
self.assertEqual(-1, qemu_io('-f', 'raw', '-c', 'read -P 0xab 0 524288', backing_img).find("verification failed"))
@ -139,6 +171,22 @@ class TestSingleDrive(ImageCommitTestCase):
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', 'Base \'%s\' not found' % mid_img)
def test_top_and_base_node_reversed(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('block-commit', device='drive0', top_node='base', base_node='top')
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', "'top' is not in this backing file chain")
def test_top_node_in_wrong_chain(self):
self.assert_no_active_block_jobs()
result = self.vm.qmp('blockdev-add', driver='null-co', node_name='null')
self.assert_qmp(result, 'return', {})
result = self.vm.qmp('block-commit', device='drive0', top_node='null', base_node='base')
self.assert_qmp(result, 'error/class', 'GenericError')
self.assert_qmp(result, 'error/desc', "'null' is not in this backing file chain")
# When the job is running on a BB that is automatically deleted on hot
# unplug, the job is cancelled when the device disappears
def test_hot_unplug(self):

View File

@ -1,5 +1,5 @@
.............................
...........................................
----------------------------------------------------------------------
Ran 29 tests
Ran 43 tests
OK

View File

@ -354,6 +354,9 @@ printf %b "qemu-io $device_id \"write -P 0x33 0 4k\"\ncommit $device_id\n" |
$QEMU_IO -c "read -P 0x33 0 4k" "$TEST_IMG" | _filter_qemu_io
# Using snapshot=on with a non-existent TMPDIR
TMPDIR=/nonexistent run_qemu -drive driver=null-co,snapshot=on
# success, all done
echo "*** done"
rm -f $seq.full

View File

@ -455,4 +455,7 @@ wrote 4096/4096 bytes at offset 0
read 4096/4096 bytes at offset 0
4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
Testing: -drive driver=null-co,snapshot=on
QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory
*** done

View File

@ -527,4 +527,7 @@ wrote 4096/4096 bytes at offset 0
read 4096/4096 bytes at offset 0
4 KiB, X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)
Testing: -drive driver=null-co,snapshot=on
QEMU_PROG: -drive driver=null-co,snapshot=on: Could not get temporary filename: No such file or directory
*** done

View File

@ -174,6 +174,28 @@ static void do_drain_end(enum drain_type drain_type, BlockDriverState *bs)
}
}
static void do_drain_begin_unlocked(enum drain_type drain_type, BlockDriverState *bs)
{
if (drain_type != BDRV_DRAIN_ALL) {
aio_context_acquire(bdrv_get_aio_context(bs));
}
do_drain_begin(drain_type, bs);
if (drain_type != BDRV_DRAIN_ALL) {
aio_context_release(bdrv_get_aio_context(bs));
}
}
static void do_drain_end_unlocked(enum drain_type drain_type, BlockDriverState *bs)
{
if (drain_type != BDRV_DRAIN_ALL) {
aio_context_acquire(bdrv_get_aio_context(bs));
}
do_drain_end(drain_type, bs);
if (drain_type != BDRV_DRAIN_ALL) {
aio_context_release(bdrv_get_aio_context(bs));
}
}
static void test_drv_cb_common(enum drain_type drain_type, bool recursive)
{
BlockBackend *blk;
@ -614,6 +636,17 @@ static void test_iothread_aio_cb(void *opaque, int ret)
qemu_event_set(&done_event);
}
static void test_iothread_main_thread_bh(void *opaque)
{
struct test_iothread_data *data = opaque;
/* Test that the AioContext is not yet locked in a random BH that is
* executed during drain, otherwise this would deadlock. */
aio_context_acquire(bdrv_get_aio_context(data->bs));
bdrv_flush(data->bs);
aio_context_release(bdrv_get_aio_context(data->bs));
}
/*
* Starts an AIO request on a BDS that runs in the AioContext of iothread 1.
* The request involves a BH on iothread 2 before it can complete.
@ -683,6 +716,8 @@ static void test_iothread_common(enum drain_type drain_type, int drain_thread)
aio_context_acquire(ctx_a);
}
aio_bh_schedule_oneshot(ctx_a, test_iothread_main_thread_bh, &data);
/* The request is running on the IOThread a. Draining its block device
* will make sure that it has completed as far as the BDS is concerned,
* but the drain in this thread can continue immediately after
@ -749,23 +784,56 @@ static void test_iothread_drain_subtree(void)
typedef struct TestBlockJob {
BlockJob common;
int run_ret;
int prepare_ret;
bool running;
bool should_complete;
} TestBlockJob;
static int test_job_prepare(Job *job)
{
TestBlockJob *s = container_of(job, TestBlockJob, common.job);
/* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
blk_flush(s->common.blk);
return s->prepare_ret;
}
static void test_job_commit(Job *job)
{
TestBlockJob *s = container_of(job, TestBlockJob, common.job);
/* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
blk_flush(s->common.blk);
}
static void test_job_abort(Job *job)
{
TestBlockJob *s = container_of(job, TestBlockJob, common.job);
/* Provoke an AIO_WAIT_WHILE() call to verify there is no deadlock */
blk_flush(s->common.blk);
}
static int coroutine_fn test_job_run(Job *job, Error **errp)
{
TestBlockJob *s = container_of(job, TestBlockJob, common.job);
/* We are running the actual job code past the pause point in
* job_co_entry(). */
s->running = true;
job_transition_to_ready(&s->common.job);
while (!s->should_complete) {
/* Avoid block_job_sleep_ns() because it marks the job as !busy. We
* want to emulate some actual activity (probably some I/O) here so
* that drain has to wait for this acitivity to stop. */
qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 100000);
/* Avoid job_sleep_ns() because it marks the job as !busy. We want to
* emulate some actual activity (probably some I/O) here so that drain
* has to wait for this activity to stop. */
qemu_co_sleep_ns(QEMU_CLOCK_REALTIME, 1000000);
job_pause_point(&s->common.job);
}
return 0;
return s->run_ret;
}
static void test_job_complete(Job *job, Error **errp)
@ -782,36 +850,115 @@ BlockJobDriver test_job_driver = {
.drain = block_job_drain,
.run = test_job_run,
.complete = test_job_complete,
.prepare = test_job_prepare,
.commit = test_job_commit,
.abort = test_job_abort,
},
};
static void test_blockjob_common(enum drain_type drain_type)
enum test_job_result {
TEST_JOB_SUCCESS,
TEST_JOB_FAIL_RUN,
TEST_JOB_FAIL_PREPARE,
};
enum test_job_drain_node {
TEST_JOB_DRAIN_SRC,
TEST_JOB_DRAIN_SRC_CHILD,
TEST_JOB_DRAIN_SRC_PARENT,
};
static void test_blockjob_common_drain_node(enum drain_type drain_type,
bool use_iothread,
enum test_job_result result,
enum test_job_drain_node drain_node)
{
BlockBackend *blk_src, *blk_target;
BlockDriverState *src, *target;
BlockDriverState *src, *src_backing, *src_overlay, *target, *drain_bs;
BlockJob *job;
TestBlockJob *tjob;
IOThread *iothread = NULL;
AioContext *ctx;
int ret;
src = bdrv_new_open_driver(&bdrv_test, "source", BDRV_O_RDWR,
&error_abort);
src_backing = bdrv_new_open_driver(&bdrv_test, "source-backing",
BDRV_O_RDWR, &error_abort);
src_overlay = bdrv_new_open_driver(&bdrv_test, "source-overlay",
BDRV_O_RDWR, &error_abort);
bdrv_set_backing_hd(src_overlay, src, &error_abort);
bdrv_unref(src);
bdrv_set_backing_hd(src, src_backing, &error_abort);
bdrv_unref(src_backing);
blk_src = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
blk_insert_bs(blk_src, src, &error_abort);
blk_insert_bs(blk_src, src_overlay, &error_abort);
switch (drain_node) {
case TEST_JOB_DRAIN_SRC:
drain_bs = src;
break;
case TEST_JOB_DRAIN_SRC_CHILD:
drain_bs = src_backing;
break;
case TEST_JOB_DRAIN_SRC_PARENT:
drain_bs = src_overlay;
break;
default:
g_assert_not_reached();
}
if (use_iothread) {
iothread = iothread_new();
ctx = iothread_get_aio_context(iothread);
blk_set_aio_context(blk_src, ctx);
} else {
ctx = qemu_get_aio_context();
}
target = bdrv_new_open_driver(&bdrv_test, "target", BDRV_O_RDWR,
&error_abort);
blk_target = blk_new(BLK_PERM_ALL, BLK_PERM_ALL);
blk_insert_bs(blk_target, target, &error_abort);
job = block_job_create("job0", &test_job_driver, NULL, src, 0, BLK_PERM_ALL,
0, 0, NULL, NULL, &error_abort);
aio_context_acquire(ctx);
tjob = block_job_create("job0", &test_job_driver, NULL, src,
0, BLK_PERM_ALL,
0, 0, NULL, NULL, &error_abort);
job = &tjob->common;
block_job_add_bdrv(job, "target", target, 0, BLK_PERM_ALL, &error_abort);
switch (result) {
case TEST_JOB_SUCCESS:
break;
case TEST_JOB_FAIL_RUN:
tjob->run_ret = -EIO;
break;
case TEST_JOB_FAIL_PREPARE:
tjob->prepare_ret = -EIO;
break;
}
job_start(&job->job);
aio_context_release(ctx);
if (use_iothread) {
/* job_co_entry() is run in the I/O thread, wait for the actual job
* code to start (we don't want to catch the job in the pause point in
* job_co_entry(). */
while (!tjob->running) {
aio_poll(qemu_get_aio_context(), false);
}
}
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(job->job.busy); /* We're in job_sleep_ns() */
g_assert_true(tjob->running);
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
do_drain_begin(drain_type, src);
do_drain_begin_unlocked(drain_type, drain_bs);
if (drain_type == BDRV_DRAIN_ALL) {
/* bdrv_drain_all() drains both src and target */
@ -822,7 +969,14 @@ static void test_blockjob_common(enum drain_type drain_type)
g_assert_true(job->job.paused);
g_assert_false(job->job.busy); /* The job is paused */
do_drain_end(drain_type, src);
do_drain_end_unlocked(drain_type, drain_bs);
if (use_iothread) {
/* paused is reset in the I/O thread, wait for it */
while (job->job.paused) {
aio_poll(qemu_get_aio_context(), false);
}
}
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
@ -841,32 +995,113 @@ static void test_blockjob_common(enum drain_type drain_type)
do_drain_end(drain_type, target);
if (use_iothread) {
/* paused is reset in the I/O thread, wait for it */
while (job->job.paused) {
aio_poll(qemu_get_aio_context(), false);
}
}
g_assert_cmpint(job->job.pause_count, ==, 0);
g_assert_false(job->job.paused);
g_assert_true(job->job.busy); /* We're in job_sleep_ns() */
g_assert_true(job->job.busy); /* We're in qemu_co_sleep_ns() */
aio_context_acquire(ctx);
ret = job_complete_sync(&job->job, &error_abort);
g_assert_cmpint(ret, ==, 0);
g_assert_cmpint(ret, ==, (result == TEST_JOB_SUCCESS ? 0 : -EIO));
if (use_iothread) {
blk_set_aio_context(blk_src, qemu_get_aio_context());
}
aio_context_release(ctx);
blk_unref(blk_src);
blk_unref(blk_target);
bdrv_unref(src);
bdrv_unref(src_overlay);
bdrv_unref(target);
if (iothread) {
iothread_join(iothread);
}
}
static void test_blockjob_common(enum drain_type drain_type, bool use_iothread,
enum test_job_result result)
{
test_blockjob_common_drain_node(drain_type, use_iothread, result,
TEST_JOB_DRAIN_SRC);
test_blockjob_common_drain_node(drain_type, use_iothread, result,
TEST_JOB_DRAIN_SRC_CHILD);
if (drain_type == BDRV_SUBTREE_DRAIN) {
test_blockjob_common_drain_node(drain_type, use_iothread, result,
TEST_JOB_DRAIN_SRC_PARENT);
}
}
static void test_blockjob_drain_all(void)
{
test_blockjob_common(BDRV_DRAIN_ALL);
test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_SUCCESS);
}
static void test_blockjob_drain(void)
{
test_blockjob_common(BDRV_DRAIN);
test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_SUCCESS);
}
static void test_blockjob_drain_subtree(void)
{
test_blockjob_common(BDRV_SUBTREE_DRAIN);
test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_SUCCESS);
}
static void test_blockjob_error_drain_all(void)
{
test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_RUN);
test_blockjob_common(BDRV_DRAIN_ALL, false, TEST_JOB_FAIL_PREPARE);
}
static void test_blockjob_error_drain(void)
{
test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_RUN);
test_blockjob_common(BDRV_DRAIN, false, TEST_JOB_FAIL_PREPARE);
}
static void test_blockjob_error_drain_subtree(void)
{
test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_RUN);
test_blockjob_common(BDRV_SUBTREE_DRAIN, false, TEST_JOB_FAIL_PREPARE);
}
static void test_blockjob_iothread_drain_all(void)
{
test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_SUCCESS);
}
static void test_blockjob_iothread_drain(void)
{
test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_SUCCESS);
}
static void test_blockjob_iothread_drain_subtree(void)
{
test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_SUCCESS);
}
static void test_blockjob_iothread_error_drain_all(void)
{
test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_RUN);
test_blockjob_common(BDRV_DRAIN_ALL, true, TEST_JOB_FAIL_PREPARE);
}
static void test_blockjob_iothread_error_drain(void)
{
test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_RUN);
test_blockjob_common(BDRV_DRAIN, true, TEST_JOB_FAIL_PREPARE);
}
static void test_blockjob_iothread_error_drain_subtree(void)
{
test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_RUN);
test_blockjob_common(BDRV_SUBTREE_DRAIN, true, TEST_JOB_FAIL_PREPARE);
}
@ -1338,6 +1573,27 @@ int main(int argc, char **argv)
g_test_add_func("/bdrv-drain/blockjob/drain_subtree",
test_blockjob_drain_subtree);
g_test_add_func("/bdrv-drain/blockjob/error/drain_all",
test_blockjob_error_drain_all);
g_test_add_func("/bdrv-drain/blockjob/error/drain",
test_blockjob_error_drain);
g_test_add_func("/bdrv-drain/blockjob/error/drain_subtree",
test_blockjob_error_drain_subtree);
g_test_add_func("/bdrv-drain/blockjob/iothread/drain_all",
test_blockjob_iothread_drain_all);
g_test_add_func("/bdrv-drain/blockjob/iothread/drain",
test_blockjob_iothread_drain);
g_test_add_func("/bdrv-drain/blockjob/iothread/drain_subtree",
test_blockjob_iothread_drain_subtree);
g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_all",
test_blockjob_iothread_error_drain_all);
g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain",
test_blockjob_iothread_error_drain);
g_test_add_func("/bdrv-drain/blockjob/iothread/error/drain_subtree",
test_blockjob_iothread_error_drain_subtree);
g_test_add_func("/bdrv-drain/deletion/drain", test_delete_by_drain);
g_test_add_func("/bdrv-drain/detach/drain_all", test_detach_by_drain_all);
g_test_add_func("/bdrv-drain/detach/drain", test_detach_by_drain);

View File

@ -223,6 +223,10 @@ static void cancel_common(CancelJob *s)
BlockJob *job = &s->common;
BlockBackend *blk = s->blk;
JobStatus sts = job->job.status;
AioContext *ctx;
ctx = job->job.aio_context;
aio_context_acquire(ctx);
job_cancel_sync(&job->job);
if (sts != JOB_STATUS_CREATED && sts != JOB_STATUS_CONCLUDED) {
@ -232,6 +236,8 @@ static void cancel_common(CancelJob *s)
assert(job->job.status == JOB_STATUS_NULL);
job_unref(&job->job);
destroy_blk(blk);
aio_context_release(ctx);
}
static void test_cancel_created(void)

View File

@ -26,21 +26,22 @@
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
AioWait global_aio_wait;
static void dummy_bh_cb(void *opaque)
{
/* The point is to make AIO_WAIT_WHILE()'s aio_poll() return */
}
void aio_wait_kick(AioWait *wait)
void aio_wait_kick(void)
{
/* The barrier (or an atomic op) is in the caller. */
if (atomic_read(&wait->num_waiters)) {
if (atomic_read(&global_aio_wait.num_waiters)) {
aio_bh_schedule_oneshot(qemu_get_aio_context(), dummy_bh_cb, NULL);
}
}
typedef struct {
AioWait wait;
bool done;
QEMUBHFunc *cb;
void *opaque;
@ -54,7 +55,7 @@ static void aio_wait_bh(void *opaque)
data->cb(data->opaque);
data->done = true;
aio_wait_kick(&data->wait);
aio_wait_kick();
}
void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
@ -67,5 +68,5 @@ void aio_wait_bh_oneshot(AioContext *ctx, QEMUBHFunc *cb, void *opaque)
assert(qemu_get_current_aio_context() == qemu_get_aio_context());
aio_bh_schedule_oneshot(ctx, aio_wait_bh, &data);
AIO_WAIT_WHILE(&data.wait, ctx, !data.done);
AIO_WAIT_WHILE(ctx, !data.done);
}

View File

@ -400,7 +400,7 @@ static void co_schedule_bh_cb(void *opaque)
/* Protected by write barrier in qemu_aio_coroutine_enter */
atomic_set(&co->scheduled, NULL);
qemu_coroutine_enter(co);
qemu_aio_coroutine_enter(ctx, co);
aio_context_release(ctx);
}
}

View File

@ -198,3 +198,8 @@ bool qemu_coroutine_entered(Coroutine *co)
{
return co->caller;
}
AioContext *coroutine_fn qemu_coroutine_get_aio_context(Coroutine *co)
{
return co->ctx;
}