net/colo-compare.c: Fix deadlock in compare_chr_send

The chr_out chardev is connected to a filter-redirector
running in the main loop. qemu_chr_fe_write_all might block
here in compare_chr_send if the (socket-)buffer is full.
If another filter-redirector in the main loop wants to
send data to chr_pri_in it might also block if the buffer
is full. This leads to a deadlock because both event loops
get blocked.

Fix this by converting compare_chr_send to a coroutine and
putting the packets in a send queue.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
Reviewed-by: Zhang Chen <chen.zhang@intel.com>
Tested-by: Zhang Chen <chen.zhang@intel.com>
Signed-off-by: Zhang Chen <chen.zhang@intel.com>
Signed-off-by: Jason Wang <jasowang@redhat.com>
This commit is contained in:
Lukas Straub 2020-05-22 15:53:53 +08:00 committed by Jason Wang
parent 2158fa1be7
commit 9c55fe9408
3 changed files with 162 additions and 51 deletions

View File

@ -32,6 +32,9 @@
#include "migration/migration.h" #include "migration/migration.h"
#include "util.h" #include "util.h"
#include "block/aio-wait.h"
#include "qemu/coroutine.h"
#define TYPE_COLO_COMPARE "colo-compare" #define TYPE_COLO_COMPARE "colo-compare"
#define COLO_COMPARE(obj) \ #define COLO_COMPARE(obj) \
OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE) OBJECT_CHECK(CompareState, (obj), TYPE_COLO_COMPARE)
@ -77,6 +80,23 @@ static int event_unhandled_count;
* |packet | |packet + |packet | |packet + * |packet | |packet + |packet | |packet +
* +--------+ +--------+ +--------+ +--------+ * +--------+ +--------+ +--------+ +--------+
*/ */
/*
 * Per-chardev send state: a queue of pending packets together with the
 * coroutine that drains it. CompareState embeds one instance for chr_out
 * (out_sendco) and one for chr_notify_dev (notify_sendco).
 */
typedef struct SendCo {
/* Draining coroutine; created in compare_chr_send(), reset to NULL on exit */
Coroutine *co;
/* Owning compare object; consulted for its vnet_hdr setting */
struct CompareState *s;
/* Chardev frontend that queued packets are written to */
CharBackend *chr;
/* Pending SendEntry items: pushed at the head, popped from the tail (FIFO) */
GQueue send_list;
/* True for the notify channel, which never sends the vnet header length */
bool notify_remote_frame;
/* True while no draining coroutine is active; a new one is started only then */
bool done;
/* Result of the most recent drain: 0 on success, negative on write failure */
int ret;
} SendCo;
/* One queued packet waiting to be written out by _compare_chr_send(). */
typedef struct SendEntry {
/* Payload length in bytes (host order; converted with htonl() when sent) */
uint32_t size;
/* vnet header length, sent ahead of the payload when vnet_hdr is enabled */
uint32_t vnet_hdr_len;
/* Payload buffer; owned by the entry and released with g_free() after use */
uint8_t *buf;
} SendEntry;
typedef struct CompareState { typedef struct CompareState {
Object parent; Object parent;
@ -91,6 +111,8 @@ typedef struct CompareState {
SocketReadState pri_rs; SocketReadState pri_rs;
SocketReadState sec_rs; SocketReadState sec_rs;
SocketReadState notify_rs; SocketReadState notify_rs;
SendCo out_sendco;
SendCo notify_sendco;
bool vnet_hdr; bool vnet_hdr;
uint32_t compare_timeout; uint32_t compare_timeout;
uint32_t expired_scan_cycle; uint32_t expired_scan_cycle;
@ -124,10 +146,11 @@ enum {
static int compare_chr_send(CompareState *s, static int compare_chr_send(CompareState *s,
const uint8_t *buf, uint8_t *buf,
uint32_t size, uint32_t size,
uint32_t vnet_hdr_len, uint32_t vnet_hdr_len,
bool notify_remote_frame); bool notify_remote_frame,
bool zero_copy);
static bool packet_matches_str(const char *str, static bool packet_matches_str(const char *str,
const uint8_t *buf, const uint8_t *buf,
@ -145,7 +168,7 @@ static void notify_remote_frame(CompareState *s)
char msg[] = "DO_CHECKPOINT"; char msg[] = "DO_CHECKPOINT";
int ret = 0; int ret = 0;
ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
if (ret < 0) { if (ret < 0) {
error_report("Notify Xen COLO-frame failed"); error_report("Notify Xen COLO-frame failed");
} }
@ -272,12 +295,13 @@ static void colo_release_primary_pkt(CompareState *s, Packet *pkt)
pkt->data, pkt->data,
pkt->size, pkt->size,
pkt->vnet_hdr_len, pkt->vnet_hdr_len,
false); false,
true);
if (ret < 0) { if (ret < 0) {
error_report("colo send primary packet failed"); error_report("colo send primary packet failed");
} }
trace_colo_compare_main("packet same and release packet"); trace_colo_compare_main("packet same and release packet");
packet_destroy(pkt, NULL); packet_destroy_partial(pkt, NULL);
} }
/* /*
@ -699,65 +723,115 @@ static void colo_compare_connection(void *opaque, void *user_data)
} }
} }
/*
 * Coroutine entry point that drains sendco->send_list.
 *
 * For every queued SendEntry it writes, in order: the payload size as a
 * 32-bit network-order value, then (only on the non-notify channel and only
 * when s->vnet_hdr is set) the vnet header length in the same encoding, and
 * finally the payload itself. Each entry and its buffer are freed once
 * handled, whether the write succeeded or not.
 *
 * On the first failed or short write all remaining queued entries are
 * dropped and sendco->ret is set to a negative value; otherwise it is set
 * to 0. In both cases the coroutine marks itself finished via sendco->done.
 *
 * @opaque: the SendCo to drain.
 */
static void coroutine_fn _compare_chr_send(void *opaque)
{
SendCo *sendco = opaque;
CompareState *s = sendco->s;
int ret = 0;
while (!g_queue_is_empty(&sendco->send_list)) {
/* Producers push at the head, so the tail is the oldest entry */
SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
uint32_t len = htonl(entry->size);
/* NOTE(review): presumably this yields instead of blocking when called from a coroutine; confirm */
ret = qemu_chr_fe_write_all(sendco->chr, (uint8_t *)&len, sizeof(len));
if (ret != sizeof(len)) {
g_free(entry->buf);
g_slice_free(SendEntry, entry);
goto err;
}
if (!sendco->notify_remote_frame && s->vnet_hdr) {
/*
* Send the vnet header length so that other modules (like
* filter-redirector) know how to parse the net packet correctly.
*/
len = htonl(entry->vnet_hdr_len);
ret = qemu_chr_fe_write_all(sendco->chr,
(uint8_t *)&len,
sizeof(len));
if (ret != sizeof(len)) {
g_free(entry->buf);
g_slice_free(SendEntry, entry);
goto err;
}
}
ret = qemu_chr_fe_write_all(sendco->chr,
(uint8_t *)entry->buf,
entry->size);
if (ret != entry->size) {
g_free(entry->buf);
g_slice_free(SendEntry, entry);
goto err;
}
g_free(entry->buf);
g_slice_free(SendEntry, entry);
}
sendco->ret = 0;
goto out;
err:
/* A write failed: discard everything that is still queued */
while (!g_queue_is_empty(&sendco->send_list)) {
SendEntry *entry = g_queue_pop_tail(&sendco->send_list);
g_free(entry->buf);
g_slice_free(SendEntry, entry);
}
/* A short write without an error code is reported as -EIO */
sendco->ret = ret < 0 ? ret : -EIO;
out:
sendco->co = NULL;
sendco->done = true;
/* Let any AIO_WAIT_WHILE() that is polling sendco->done re-check it */
aio_wait_kick();
}
static int compare_chr_send(CompareState *s, static int compare_chr_send(CompareState *s,
const uint8_t *buf, uint8_t *buf,
uint32_t size, uint32_t size,
uint32_t vnet_hdr_len, uint32_t vnet_hdr_len,
bool notify_remote_frame) bool notify_remote_frame,
bool zero_copy)
{ {
int ret = 0; SendCo *sendco;
uint32_t len = htonl(size); SendEntry *entry;
if (notify_remote_frame) {
sendco = &s->notify_sendco;
} else {
sendco = &s->out_sendco;
}
if (!size) { if (!size) {
return 0; return 0;
} }
if (notify_remote_frame) { entry = g_slice_new(SendEntry);
ret = qemu_chr_fe_write_all(&s->chr_notify_dev, entry->size = size;
(uint8_t *)&len, entry->vnet_hdr_len = vnet_hdr_len;
sizeof(len)); if (zero_copy) {
entry->buf = buf;
} else { } else {
ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)&len, sizeof(len)); entry->buf = g_malloc(size);
memcpy(entry->buf, buf, size);
} }
g_queue_push_head(&sendco->send_list, entry);
if (ret != sizeof(len)) { if (sendco->done) {
goto err; sendco->co = qemu_coroutine_create(_compare_chr_send, sendco);
} sendco->done = false;
qemu_coroutine_enter(sendco->co);
if (s->vnet_hdr) { if (sendco->done) {
/* /* report early errors */
* We send vnet header len make other module(like filter-redirector) return sendco->ret;
* know how to parse net packet correctly.
*/
len = htonl(vnet_hdr_len);
if (!notify_remote_frame) {
ret = qemu_chr_fe_write_all(&s->chr_out,
(uint8_t *)&len,
sizeof(len));
}
if (ret != sizeof(len)) {
goto err;
} }
} }
if (notify_remote_frame) { /* assume success */
ret = qemu_chr_fe_write_all(&s->chr_notify_dev,
(uint8_t *)buf,
size);
} else {
ret = qemu_chr_fe_write_all(&s->chr_out, (uint8_t *)buf, size);
}
if (ret != size) {
goto err;
}
return 0; return 0;
err:
return ret < 0 ? ret : -EIO;
} }
static int compare_chr_can_read(void *opaque) static int compare_chr_can_read(void *opaque)
@ -1063,6 +1137,7 @@ static void compare_pri_rs_finalize(SocketReadState *pri_rs)
pri_rs->buf, pri_rs->buf,
pri_rs->packet_len, pri_rs->packet_len,
pri_rs->vnet_hdr_len, pri_rs->vnet_hdr_len,
false,
false); false);
} else { } else {
/* compare packet in the specified connection */ /* compare packet in the specified connection */
@ -1093,7 +1168,7 @@ static void compare_notify_rs_finalize(SocketReadState *notify_rs)
if (packet_matches_str("COLO_USERSPACE_PROXY_INIT", if (packet_matches_str("COLO_USERSPACE_PROXY_INIT",
notify_rs->buf, notify_rs->buf,
notify_rs->packet_len)) { notify_rs->packet_len)) {
ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true); ret = compare_chr_send(s, (uint8_t *)msg, strlen(msg), 0, true, false);
if (ret < 0) { if (ret < 0) {
error_report("Notify Xen COLO-frame INIT failed"); error_report("Notify Xen COLO-frame INIT failed");
} }
@ -1199,6 +1274,20 @@ static void colo_compare_complete(UserCreatable *uc, Error **errp)
QTAILQ_INSERT_TAIL(&net_compares, s, next); QTAILQ_INSERT_TAIL(&net_compares, s, next);
s->out_sendco.s = s;
s->out_sendco.chr = &s->chr_out;
s->out_sendco.notify_remote_frame = false;
s->out_sendco.done = true;
g_queue_init(&s->out_sendco.send_list);
if (s->notify_dev) {
s->notify_sendco.s = s;
s->notify_sendco.chr = &s->chr_notify_dev;
s->notify_sendco.notify_remote_frame = true;
s->notify_sendco.done = true;
g_queue_init(&s->notify_sendco.send_list);
}
g_queue_init(&s->conn_list); g_queue_init(&s->conn_list);
qemu_mutex_init(&event_mtx); qemu_mutex_init(&event_mtx);
@ -1225,8 +1314,9 @@ static void colo_flush_packets(void *opaque, void *user_data)
pkt->data, pkt->data,
pkt->size, pkt->size,
pkt->vnet_hdr_len, pkt->vnet_hdr_len,
false); false,
packet_destroy(pkt, NULL); true);
packet_destroy_partial(pkt, NULL);
} }
while (!g_queue_is_empty(&conn->secondary_list)) { while (!g_queue_is_empty(&conn->secondary_list)) {
pkt = g_queue_pop_head(&conn->secondary_list); pkt = g_queue_pop_head(&conn->secondary_list);
@ -1297,10 +1387,23 @@ static void colo_compare_finalize(Object *obj)
} }
} }
AioContext *ctx = iothread_get_aio_context(s->iothread);
aio_context_acquire(ctx);
AIO_WAIT_WHILE(ctx, !s->out_sendco.done);
if (s->notify_dev) {
AIO_WAIT_WHILE(ctx, !s->notify_sendco.done);
}
aio_context_release(ctx);
/* Release all unhandled packets after compare thead exited */ /* Release all unhandled packets after compare thead exited */
g_queue_foreach(&s->conn_list, colo_flush_packets, s); g_queue_foreach(&s->conn_list, colo_flush_packets, s);
AIO_WAIT_WHILE(NULL, !s->out_sendco.done);
g_queue_clear(&s->conn_list); g_queue_clear(&s->conn_list);
g_queue_clear(&s->out_sendco.send_list);
if (s->notify_dev) {
g_queue_clear(&s->notify_sendco.send_list);
}
if (s->connection_track_table) { if (s->connection_track_table) {
g_hash_table_destroy(s->connection_track_table); g_hash_table_destroy(s->connection_track_table);

View File

@ -185,6 +185,13 @@ void packet_destroy(void *opaque, void *user_data)
g_slice_free(Packet, pkt); g_slice_free(Packet, pkt);
} }
/*
 * Free only the Packet structure itself; pkt->data is left untouched.
 *
 * colo-compare calls this after passing pkt->data to compare_chr_send()
 * with zero_copy set, where the queued send entry keeps using the buffer
 * and frees it once it has been handled.
 *
 * @opaque: the Packet to release.
 * @user_data: unused.
 */
void packet_destroy_partial(void *opaque, void *user_data)
{
Packet *pkt = opaque;
g_slice_free(Packet, pkt);
}
/* /*
* Clear hashtable, stop this hash growing really huge * Clear hashtable, stop this hash growing really huge
*/ */

View File

@ -102,5 +102,6 @@ bool connection_has_tracked(GHashTable *connection_track_table,
void connection_hashtable_reset(GHashTable *connection_track_table); void connection_hashtable_reset(GHashTable *connection_track_table);
Packet *packet_new(const void *data, int size, int vnet_hdr_len); Packet *packet_new(const void *data, int size, int vnet_hdr_len);
void packet_destroy(void *opaque, void *user_data); void packet_destroy(void *opaque, void *user_data);
void packet_destroy_partial(void *opaque, void *user_data);
#endif /* NET_COLO_H */ #endif /* NET_COLO_H */