nbd/server: Quiesce coroutines on context switch

When switching between AIO contexts we need to make sure that both
recv_coroutine and send_coroutine are not scheduled to run. Otherwise,
QEMU may crash while attaching the new context with an error like
this one:

aio_co_schedule: Co-routine was already scheduled in 'aio_co_schedule'

To achieve this we need a local implementation of
'qio_channel_readv_all_eof' named 'nbd_read_eof' (a trick already done
by 'nbd/client.c') that allows us to interrupt the operation and to
know when recv_coroutine is yielding.

With this in place, we delegate detaching the AIO context to the
owning context with a BH ('nbd_aio_detach_bh') scheduled using
'aio_wait_bh_oneshot'. This BH signals that we need to quiesce the
channel by setting 'client->quiescing' to 'true', and either waits for
the coroutine to finish using AIO_WAIT_WHILE or, if it's yielding in
'nbd_read_eof', actively enters the coroutine to interrupt it.

RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1900326
Signed-off-by: Sergio Lopez <slp@redhat.com>
Reviewed-by: Eric Blake <eblake@redhat.com>
Message-Id: <20201214170519.223781-4-slp@redhat.com>
Signed-off-by: Eric Blake <eblake@redhat.com>
commit f148ae7d36
parent c7040ff64e
Author: Sergio Lopez, 2020-12-14 18:05:18 +01:00 (committed by Eric Blake)

--- a/nbd/server.c
+++ b/nbd/server.c
@@ -132,6 +132,9 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
+    bool read_yielding;
+    bool quiescing;
+
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -1352,14 +1355,60 @@ static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
     return 0;
 }
 
-static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
+/* nbd_read_eof
+ * Tries to read @size bytes from @ioc. This is a local implementation of
+ * qio_channel_readv_all_eof. We have it here because we need it to be
+ * interruptible and to know when the coroutine is yielding.
+ * Returns 1 on success
+ *         0 on eof, when no data was read (errp is not set)
+ *         negative errno on failure (errp is set)
+ */
+static inline int coroutine_fn
+nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
+{
+    bool partial = false;
+
+    assert(size);
+    while (size > 0) {
+        struct iovec iov = { .iov_base = buffer, .iov_len = size };
+        ssize_t len;
+
+        len = qio_channel_readv(client->ioc, &iov, 1, errp);
+        if (len == QIO_CHANNEL_ERR_BLOCK) {
+            client->read_yielding = true;
+            qio_channel_yield(client->ioc, G_IO_IN);
+            client->read_yielding = false;
+            if (client->quiescing) {
+                return -EAGAIN;
+            }
+            continue;
+        } else if (len < 0) {
+            return -EIO;
+        } else if (len == 0) {
+            if (partial) {
+                error_setg(errp,
+                           "Unexpected end-of-file before all bytes were read");
+                return -EIO;
+            } else {
+                return 0;
+            }
+        }
+
+        partial = true;
+        size -= len;
+        buffer = (uint8_t *) buffer + len;
+    }
+    return 1;
+}
+
+static int nbd_receive_request(NBDClient *client, NBDRequest *request,
                                Error **errp)
 {
     uint8_t buf[NBD_REQUEST_SIZE];
     uint32_t magic;
     int ret;
 
-    ret = nbd_read(ioc, buf, sizeof(buf), "request", errp);
+    ret = nbd_read_eof(client, buf, sizeof(buf), errp);
     if (ret < 0) {
         return ret;
     }
@@ -1480,11 +1529,37 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
         qio_channel_attach_aio_context(client->ioc, ctx);
-        if (client->recv_coroutine) {
-            aio_co_schedule(ctx, client->recv_coroutine);
-        }
+
+        assert(client->recv_coroutine == NULL);
+        assert(client->send_coroutine == NULL);
+
+        if (client->quiescing) {
+            client->quiescing = false;
+            nbd_client_receive_next_request(client);
+        }
+    }
+}
+
+static void nbd_aio_detach_bh(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        qio_channel_detach_aio_context(client->ioc);
+
+        client->quiescing = true;
+
+        if (client->recv_coroutine) {
+            if (client->read_yielding) {
+                qemu_aio_coroutine_enter(exp->common.ctx,
+                                         client->recv_coroutine);
+            } else {
+                AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
+            }
+        }
         if (client->send_coroutine) {
-            aio_co_schedule(ctx, client->send_coroutine);
+            AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
         }
     }
 }
@@ -1492,13 +1567,10 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
 static void blk_aio_detach(void *opaque)
 {
     NBDExport *exp = opaque;
-    NBDClient *client;
 
     trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
 
-    QTAILQ_FOREACH(client, &exp->clients, next) {
-        qio_channel_detach_aio_context(client->ioc);
-    }
+    aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
 
     exp->common.ctx = NULL;
 }
@@ -2151,20 +2223,23 @@ static int nbd_co_send_bitmap(NBDClient *client, uint64_t handle,
 
 /* nbd_co_receive_request
  * Collect a client request. Return 0 if request looks valid, -EIO to drop
- * connection right away, and any other negative value to report an error to
- * the client (although the caller may still need to disconnect after reporting
- * the error).
+ * connection right away, -EAGAIN to indicate we were interrupted and the
+ * channel should be quiesced, and any other negative value to report an error
+ * to the client (although the caller may still need to disconnect after
+ * reporting the error).
  */
 static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
                                   Error **errp)
 {
     NBDClient *client = req->client;
     int valid_flags;
+    int ret;
 
     g_assert(qemu_in_coroutine());
     assert(client->recv_coroutine == qemu_coroutine_self());
-    if (nbd_receive_request(client->ioc, request, errp) < 0) {
-        return -EIO;
+    ret = nbd_receive_request(client, request, errp);
+    if (ret < 0) {
+        return ret;
     }
 
     trace_nbd_co_receive_request_decode_type(request->handle, request->type,
@@ -2507,6 +2582,17 @@ static coroutine_fn void nbd_trip(void *opaque)
         return;
     }
 
+    if (client->quiescing) {
+        /*
+         * We're switching between AIO contexts. Don't attempt to receive a new
+         * request and kick the main context which may be waiting for us.
+         */
+        nbd_client_put(client);
+        client->recv_coroutine = NULL;
+        aio_wait_kick();
+        return;
+    }
+
     req = nbd_request_get(client);
     ret = nbd_co_receive_request(req, &request, &local_err);
     client->recv_coroutine = NULL;
@@ -2519,6 +2605,11 @@ static coroutine_fn void nbd_trip(void *opaque)
         goto done;
     }
 
+    if (ret == -EAGAIN) {
+        assert(client->quiescing);
+        goto done;
+    }
+
     nbd_client_receive_next_request(client);
     if (ret == -EIO) {
         goto disconnect;
@@ -2565,7 +2656,8 @@ disconnect:
 
 static void nbd_client_receive_next_request(NBDClient *client)
 {
-    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
+        !client->quiescing) {
         nbd_client_get(client);
         client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
         aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
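
The patch above is the authoritative code. For readers who want to see the quiesce handshake in isolation, the following standalone C model is illustrative only: the pipe, thread, and identifiers below are inventions of this note, not QEMU API. A reader blocks for input the way recv_coroutine yields inside nbd_read_eof(), while the "detach" side sets a quiescing flag, pokes the reader awake, and waits for it to finish, mirroring what nbd_aio_detach_bh() does with qemu_aio_coroutine_enter() and AIO_WAIT_WHILE(). As in the patch, the flag is set before the wakeup, so the reader cannot miss it and go back to sleep.

/*
 * Standalone sketch of the quiesce handshake (not part of the patch,
 * not QEMU API).  Compile with: gcc -pthread quiesce_demo.c
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static int wake_pipe[2];         /* stand-in for the channel nbd_read_eof() yields on */
static atomic_bool quiescing;    /* stand-in for client->quiescing */

/* Reader: blocks for input, like recv_coroutine yielding in nbd_read_eof(). */
static void *reader(void *arg)
{
    char c;

    (void)arg;
    for (;;) {
        if (read(wake_pipe[0], &c, 1) <= 0) {
            break;                          /* EOF or error: stop */
        }
        if (atomic_load(&quiescing)) {
            printf("reader: quiesce requested, returning early (-EAGAIN)\n");
            break;                          /* like nbd_read_eof() returning -EAGAIN */
        }
        printf("reader: byte '%c' received, would handle a request here\n", c);
    }
    return NULL;
}

int main(void)
{
    pthread_t tid;

    if (pipe(wake_pipe) != 0) {
        return 1;
    }
    pthread_create(&tid, NULL, reader, NULL);

    write(wake_pipe[1], "r", 1);            /* normal traffic */
    usleep(100 * 1000);

    /* "Detach" path: set the flag, wake the blocked reader, wait for it. */
    atomic_store(&quiescing, true);
    write(wake_pipe[1], "q", 1);            /* like qemu_aio_coroutine_enter() */
    pthread_join(tid, NULL);                /* like AIO_WAIT_WHILE() */

    printf("main: reader quiesced, safe to switch contexts\n");
    return 0;
}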