linux-aio: avoid deadlock in nested aio_poll() calls

If two Linux AIO request completions are fetched in the same
io_getevents() call, QEMU will deadlock if request A's callback waits
for request B to complete using an aio_poll() loop.  This was reported
to happen with the mirror blockjob.

This patch moves completion processing into a BH and makes it resumable.
Nested event loops can resume completion processing so that request B
will complete and the deadlock will not occur.

Cc: Kevin Wolf <kwolf@redhat.com>
Cc: Paolo Bonzini <pbonzini@redhat.com>
Cc: Ming Lei <ming.lei@canonical.com>
Cc: Marcin Gibuła <m.gibula@beyond.pl>
Reported-by: Marcin Gibuła <m.gibula@beyond.pl>
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
Tested-by: Marcin Gibuła <m.gibula@beyond.pl>
This commit is contained in:
Stefan Hajnoczi 2014-08-04 16:56:33 +01:00
parent 12ade76090
commit 2cdff7f620

View File

@ -51,6 +51,12 @@ struct qemu_laio_state {
/* io queue for submit at batch */
LaioQueue io_q;
/* I/O completion processing */
QEMUBH *completion_bh;
struct io_event events[MAX_EVENTS];
int event_idx;
int event_max;
};
static inline ssize_t io_event_ret(struct io_event *ev)
@ -86,27 +92,58 @@ static void qemu_laio_process_completion(struct qemu_laio_state *s,
qemu_aio_release(laiocb);
}
/* The completion BH fetches completed I/O requests and invokes their
* callbacks.
*
* The function is somewhat tricky because it supports nested event loops, for
* example when a request callback invokes aio_poll(). In order to do this,
* the completion events array and index are kept in qemu_laio_state. The BH
* reschedules itself as long as there are completions pending so it will
* either be called again in a nested event loop or will be called after all
* events have been completed. When there are no events left to complete, the
* BH returns without rescheduling.
*/
static void qemu_laio_completion_bh(void *opaque)
{
struct qemu_laio_state *s = opaque;
/* Fetch more completion events when empty */
if (s->event_idx == s->event_max) {
do {
struct timespec ts = { 0 };
s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
s->events, &ts);
} while (s->event_max == -EINTR);
s->event_idx = 0;
if (s->event_max <= 0) {
s->event_max = 0;
return; /* no more events */
}
}
/* Reschedule so nested event loops see currently pending completions */
qemu_bh_schedule(s->completion_bh);
/* Process completion events */
while (s->event_idx < s->event_max) {
struct iocb *iocb = s->events[s->event_idx].obj;
struct qemu_laiocb *laiocb =
container_of(iocb, struct qemu_laiocb, iocb);
laiocb->ret = io_event_ret(&s->events[s->event_idx]);
s->event_idx++;
qemu_laio_process_completion(s, laiocb);
}
}
static void qemu_laio_completion_cb(EventNotifier *e)
{
struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
while (event_notifier_test_and_clear(&s->e)) {
struct io_event events[MAX_EVENTS];
struct timespec ts = { 0 };
int nevents, i;
do {
nevents = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, events, &ts);
} while (nevents == -EINTR);
for (i = 0; i < nevents; i++) {
struct iocb *iocb = events[i].obj;
struct qemu_laiocb *laiocb =
container_of(iocb, struct qemu_laiocb, iocb);
laiocb->ret = io_event_ret(&events[i]);
qemu_laio_process_completion(s, laiocb);
}
if (event_notifier_test_and_clear(&s->e)) {
qemu_bh_schedule(s->completion_bh);
}
}
@ -272,12 +309,14 @@ void laio_detach_aio_context(void *s_, AioContext *old_context)
struct qemu_laio_state *s = s_;
aio_set_event_notifier(old_context, &s->e, NULL);
qemu_bh_delete(s->completion_bh);
}
void laio_attach_aio_context(void *s_, AioContext *new_context)
{
struct qemu_laio_state *s = s_;
s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
aio_set_event_notifier(new_context, &s->e, qemu_laio_completion_cb);
}