From 5edfbeafb0f6bca2a613b52ca0d3329cd64b1684 Mon Sep 17 00:00:00 2001 From: Gregor Richards Date: Mon, 12 Sep 2016 07:42:35 -0400 Subject: [PATCH] Switched Netplay over to TCP. A lot of the stalling logic had to change for this, and in particular, it now sometimes stalls in a way that makes it very difficult to actually input anything (whoops :) ). Simply setting the sync frames higher avoids that. With supported cores, this is incredibly risilient, but when it fails, it mostly fails to freezing, which is less than ideal. TODO: Stall frames should be configurable. All the UDP code is still there but commented out, should be gutted. The original fast-forward code is now commented out, but really both fast-forward and stalling should be options; the only complication is that it needs to send simulated self-input for fast-forward. --- network/netplay/netplay.c | 159 +++++++++++++++++++++--------- network/netplay/netplay.h | 17 ++-- network/netplay/netplay_common.c | 3 + network/netplay/netplay_net.c | 28 ++++-- network/netplay/netplay_private.h | 18 ++-- 5 files changed, 157 insertions(+), 68 deletions(-) diff --git a/network/netplay/netplay.c b/network/netplay/netplay.c index d80728b8ac..3425c8c712 100644 --- a/network/netplay/netplay.c +++ b/network/netplay/netplay.c @@ -65,7 +65,9 @@ static void warn_hangup(void) bool check_netplay_synched(netplay_t* netplay) { retro_assert(netplay); - return netplay->self_frame_count < (netplay->flip_frame + 2 * UDP_FRAME_PACKETS); + /*return netplay->self_frame_count < (netplay->flip_frame + 2 * UDP_FRAME_PACKETS);*/ + /* FIXME */ + return true; } static bool netplay_info_cb(netplay_t* netplay, unsigned frames) { @@ -96,6 +98,7 @@ static bool netplay_can_poll(netplay_t *netplay) return netplay->can_poll; } +#if 0 static bool send_chunk(netplay_t *netplay) { const struct sockaddr *addr = NULL; @@ -128,6 +131,7 @@ static bool send_chunk(netplay_t *netplay) } return true; } +#endif /** * get_self_input_state: @@ -139,10 +143,11 @@ static bool send_chunk(netplay_t *netplay) **/ static bool get_self_input_state(netplay_t *netplay) { - uint32_t state[UDP_WORDS_PER_FRAME - 1] = {0}; + uint32_t state[WORDS_PER_FRAME - 1] = {0}; struct delta_frame *ptr = &netplay->buffer[netplay->self_ptr]; - if (!netplay_delta_frame_ready(netplay, ptr, netplay->self_frame_count)) return false; + if (!netplay_delta_frame_ready(netplay, ptr, netplay->self_frame_count)) + return false; if (!input_driver_is_libretro_input_blocked() && netplay->self_frame_count > 0) { @@ -181,18 +186,19 @@ static bool get_self_input_state(netplay_t *netplay) * } * * payload { - * ; To compat packet losses, send input in a sliding window - * frame redundancy_frames[UDP_FRAME_PACKETS]; + * cmd (CMD_INPUT) + * cmd_size (4 words) + * frame * } */ - memmove(netplay->packet_buffer, netplay->packet_buffer + UDP_WORDS_PER_FRAME, - sizeof (netplay->packet_buffer) - UDP_WORDS_PER_FRAME * sizeof(uint32_t)); - netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME] = htonl(netplay->self_frame_count); - netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 1] = htonl(state[0]); - netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 2] = htonl(state[1]); - netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 3] = htonl(state[2]); + netplay->packet_buffer[0] = htonl(NETPLAY_CMD_INPUT); + netplay->packet_buffer[1] = htonl(WORDS_PER_FRAME * sizeof(uint32_t)); + netplay->packet_buffer[2] = htonl(netplay->self_frame_count); + netplay->packet_buffer[3] = htonl(state[0]); + netplay->packet_buffer[4] = htonl(state[1]); + netplay->packet_buffer[5] = htonl(state[2]); - if (!send_chunk(netplay)) + if (!socket_send_all_blocking(netplay->fd, netplay->packet_buffer, sizeof(netplay->packet_buffer), false)) { warn_hangup(); netplay->has_connection = false; @@ -204,16 +210,32 @@ static bool get_self_input_state(netplay_t *netplay) return true; } +static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd, + const void *data, size_t size) +{ + uint32_t cmdbuf[2]; + + cmdbuf[0] = htonl(cmd); + cmdbuf[1] = htonl(size); + + if (!socket_send_all_blocking(netplay->fd, cmdbuf, sizeof(cmdbuf), false)) + return false; + + if (size > 0) + if (!socket_send_all_blocking(netplay->fd, data, size, false)) + return false; + + return true; +} + static bool netplay_cmd_ack(netplay_t *netplay) { - uint32_t cmd = htonl(NETPLAY_CMD_ACK); - return socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false); + return netplay_send_raw_cmd(netplay, NETPLAY_CMD_ACK, NULL, 0); } static bool netplay_cmd_nak(netplay_t *netplay) { - uint32_t cmd = htonl(NETPLAY_CMD_NAK); - return socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false); + return netplay_send_raw_cmd(netplay, NETPLAY_CMD_NAK, NULL, 0); } static bool netplay_get_response(netplay_t *netplay) @@ -231,6 +253,10 @@ static bool netplay_get_cmd(netplay_t *netplay) uint32_t flip_frame; uint32_t cmd_size; + /* If we're not ready for input, wait until we are. Could fill the TCP buffer, stalling the other side. */ + if (!netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->read_ptr], netplay->read_frame_count)) + return false; + if (!socket_receive_all_blocking(netplay->fd, &cmd, sizeof(cmd))) return false; @@ -243,6 +269,53 @@ static bool netplay_get_cmd(netplay_t *netplay) switch (cmd) { + case NETPLAY_CMD_ACK: + case NETPLAY_CMD_NAK: + /* Why are we even bothering? */ + return true; + + case NETPLAY_CMD_INPUT: + { + uint32_t buffer[WORDS_PER_FRAME]; + unsigned i; + + if (cmd_size != WORDS_PER_FRAME * sizeof(uint32_t)) + { + RARCH_ERR("NETPLAY_CMD_INPUT received an unexpected payload size.\n"); + return netplay_cmd_nak(netplay); + } + + if (!socket_receive_all_blocking(netplay->fd, buffer, sizeof(buffer))) + { + RARCH_ERR("Failed to receive NETPLAY_CMD_INPUT input.\n"); + return netplay_cmd_nak(netplay); + } + + for (i = 0; i < WORDS_PER_FRAME; i++) + buffer[i] = ntohl(buffer[i]); + + if (buffer[0] != netplay->read_frame_count) + { + /* FIXME: JUST drop it? */ + return netplay_cmd_nak(netplay); + } + + if (!netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->read_ptr], netplay->read_frame_count)) + { + /* FIXME: If we're here, we're desyncing. */ + netplay->must_fast_forward = true; + return netplay_cmd_nak(netplay); + } + + /* The data's good! */ + netplay->buffer[netplay->read_ptr].is_simulated = false; + memcpy(netplay->buffer[netplay->read_ptr].real_input_state, buffer + 1, sizeof(buffer) - sizeof(uint32_t)); + netplay->read_ptr = NEXT_PTR(netplay->read_ptr); + netplay->read_frame_count++; + netplay->timeout_cnt = 0; + return netplay_cmd_ack(netplay); + } + case NETPLAY_CMD_FLIP_PLAYERS: if (cmd_size != sizeof(uint32_t)) { @@ -305,6 +378,7 @@ static bool netplay_get_cmd(netplay_t *netplay) static int poll_input(netplay_t *netplay, bool block) { + bool had_input = false; int max_fd = (netplay->fd > netplay->udp_fd ? netplay->fd : netplay->udp_fd) + 1; struct timeval tv = {0}; @@ -318,6 +392,7 @@ static int poll_input(netplay_t *netplay, bool block) * Technically possible for select() to modify tmp_tv, so * we go paranoia mode. */ struct timeval tmp_tv = tv; + had_input = false; netplay->timeout_cnt++; @@ -330,31 +405,40 @@ static int poll_input(netplay_t *netplay, bool block) /* Somewhat hacky, * but we aren't using the TCP connection for anything useful atm. */ - if (FD_ISSET(netplay->fd, &fds) && !netplay_get_cmd(netplay)) - return -1; + if (FD_ISSET(netplay->fd, &fds)) + { + had_input = true; + if (!netplay_get_cmd(netplay)) + return -1; + } +#if 0 if (FD_ISSET(netplay->udp_fd, &fds)) return 1; +#endif if (!block) continue; +#if 0 if (!send_chunk(netplay)) { warn_hangup(); netplay->has_connection = false; return -1; } +#endif RARCH_LOG("Network is stalling, resending packet... Count %u of %d ...\n", netplay->timeout_cnt, MAX_RETRIES); - } while ((netplay->timeout_cnt < MAX_RETRIES) && block); + } while (had_input || (block && netplay->read_frame_count < netplay->self_frame_count)); - if (block) - return -1; + /*if (block) + return -1;*/ return 0; } +#if 0 static bool receive_data(netplay_t *netplay, uint32_t *buffer, size_t size) { socklen_t addrlen = sizeof(netplay->their_addr); @@ -400,6 +484,7 @@ static void parse_packet(netplay_t *netplay, uint32_t *buffer, unsigned size) netplay->timeout_cnt = 0; } } +#endif /* TODO: Somewhat better prediction. :P */ static void simulate_input(netplay_t *netplay) @@ -454,7 +539,7 @@ static bool netplay_poll(netplay_t *netplay) /* We might have reached the end of the buffer, where we * simply have to block. */ - res = poll_input(netplay, netplay->other_ptr == netplay->self_ptr); + res = poll_input(netplay, netplay->read_frame_count < netplay->self_frame_count - 10); /* FIXME: configure stalling intervals */ if (res == -1) { netplay->has_connection = false; @@ -462,6 +547,7 @@ static bool netplay_poll(netplay_t *netplay) return false; } +#if 0 if (res == 1) { uint32_t first_read = netplay->read_frame_count; @@ -489,8 +575,9 @@ static bool netplay_poll(netplay_t *netplay) return false; } } +#endif - if (netplay->read_frame_count <= netplay->self_frame_count) + if (netplay->read_frame_count < netplay->self_frame_count) simulate_input(netplay); else netplay->buffer[PREV_PTR(netplay->self_ptr)].used_real = true; @@ -839,8 +926,8 @@ netplay_t *netplay_new(const char *server, uint16_t port, { netplay_t *netplay = NULL; - if (frames > UDP_FRAME_PACKETS) - frames = UDP_FRAME_PACKETS; + /*if (frames > UDP_FRAME_PACKETS) + frames = UDP_FRAME_PACKETS;*/ netplay = (netplay_t*)calloc(1, sizeof(*netplay)); if (!netplay) @@ -880,26 +967,6 @@ error: return NULL; } -static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd, - const void *data, size_t size) -{ - uint32_t cmd_size; - - cmd = htonl(cmd); - cmd_size = htonl(size); - - if (!socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false)) - return false; - - if (!socket_send_all_blocking(netplay->fd, &cmd_size, sizeof(cmd_size), false)) - return false; - - if (!socket_send_all_blocking(netplay->fd, data, size, false)) - return false; - - return true; -} - /** * netplay_command: * @netplay : pointer to netplay object @@ -970,7 +1037,7 @@ error: **/ static void netplay_flip_users(netplay_t *netplay) { - uint32_t flip_frame = netplay->self_frame_count + 2 * UDP_FRAME_PACKETS; + uint32_t flip_frame = netplay->self_frame_count + 32; /* FIXME: This value is now arbitrary */ uint32_t flip_frame_net = htonl(flip_frame); bool command = netplay_command( netplay, NETPLAY_CMD_FLIP_PLAYERS, diff --git a/network/netplay/netplay.h b/network/netplay/netplay.h index b0888a2ba0..3aac5bf1c0 100644 --- a/network/netplay/netplay.h +++ b/network/netplay/netplay.h @@ -40,7 +40,7 @@ enum rarch_netplay_ctl_state enum netplay_cmd { - /* Miscellaneous commands */ + /* Basic commands */ /* Acknowlegement response */ NETPLAY_CMD_ACK = 0x0000, @@ -48,23 +48,28 @@ enum netplay_cmd /* Failed acknowlegement response */ NETPLAY_CMD_NAK = 0x0001, + /* Input data */ + NETPLAY_CMD_INPUT = 0x0002, + + /* Misc. commands */ + /* Swap inputs between player 1 and player 2 */ - NETPLAY_CMD_FLIP_PLAYERS = 0x0002, + NETPLAY_CMD_FLIP_PLAYERS = 0x0003, /* Toggle spectate/join mode */ - NETPLAY_CMD_SPECTATE = 0x0003, + NETPLAY_CMD_SPECTATE = 0x0004, /* Gracefully disconnects from host */ - NETPLAY_CMD_DISCONNECT = 0x0004, + NETPLAY_CMD_DISCONNECT = 0x0005, /* Sends multiple config requests over, * See enum netplay_cmd_cfg */ - NETPLAY_CMD_CFG = 0x0005, + NETPLAY_CMD_CFG = 0x0006, /* CMD_CFG streamlines sending multiple configurations. This acknowledges each one individually */ - NETPLAY_CMD_CFG_ACK = 0x0006, + NETPLAY_CMD_CFG_ACK = 0x0007, /* Loading and synchronization */ diff --git a/network/netplay/netplay_common.c b/network/netplay/netplay_common.c index ffdbddaa80..cbcceedf24 100644 --- a/network/netplay/netplay_common.c +++ b/network/netplay/netplay_common.c @@ -347,13 +347,16 @@ bool netplay_is_spectate(netplay_t* netplay) bool netplay_delta_frame_ready(netplay_t *netplay, struct delta_frame *delta, uint32_t frame) { + void *remember_state; if (delta->frame == frame) return true; if (netplay->other_frame_count <= delta->frame) { /* We haven't even replayed this frame yet, so we can't overwrite it! */ return false; } + remember_state = delta->state; memset(delta, 0, sizeof(struct delta_frame)); delta->frame = frame; + delta->state = remember_state; return true; } diff --git a/network/netplay/netplay_net.c b/network/netplay/netplay_net.c index c6ee585ce8..35bfae9866 100644 --- a/network/netplay/netplay_net.c +++ b/network/netplay/netplay_net.c @@ -32,6 +32,7 @@ static void netplay_net_pre_frame(netplay_t *netplay) if (netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->self_ptr], netplay->self_frame_count)) { + serial_info.data_const = NULL; serial_info.data = netplay->buffer[netplay->self_ptr].state; serial_info.size = netplay->state_size; @@ -79,13 +80,17 @@ static void netplay_net_post_frame(netplay_t *netplay) /* Replay frames. */ netplay->is_replay = true; - netplay->replay_ptr = netplay->other_ptr; - netplay->replay_frame_count = netplay->other_frame_count; + netplay->replay_ptr = PREV_PTR(netplay->other_ptr); + netplay->replay_frame_count = netplay->other_frame_count - 1; - serial_info.data_const = netplay->buffer[netplay->other_ptr].state; - serial_info.size = netplay->state_size; - - core_unserialize(&serial_info); + if (netplay->replay_frame_count < netplay->self_frame_count) + { + serial_info.data = NULL; + serial_info.data_const = netplay->buffer[netplay->replay_ptr].state; + serial_info.size = netplay->state_size; + + core_unserialize(&serial_info); + } while (netplay->replay_frame_count < netplay->self_frame_count) { @@ -106,11 +111,21 @@ static void netplay_net_post_frame(netplay_t *netplay) netplay->replay_frame_count++; } + /* For the remainder of the frames up to the read count, we can use the real data */ + while (netplay->replay_frame_count < netplay->read_frame_count) + { + netplay->buffer[netplay->replay_ptr].is_simulated = false; + netplay->buffer[netplay->replay_ptr].used_real = true; + netplay->replay_ptr = NEXT_PTR(netplay->replay_ptr); + netplay->replay_frame_count++; + } + netplay->other_ptr = netplay->read_ptr; netplay->other_frame_count = netplay->read_frame_count; netplay->is_replay = false; } +#if 0 /* And if the other side has gotten too far ahead of /us/, skip to catch up * FIXME: Make this configurable */ if (netplay->read_frame_count > netplay->self_frame_count + 10 || @@ -149,6 +164,7 @@ static void netplay_net_post_frame(netplay_t *netplay) netplay->other_frame_count = netplay->read_frame_count; netplay->is_replay = false; } +#endif } static bool netplay_net_init_buffers(netplay_t *netplay) diff --git a/network/netplay/netplay_private.h b/network/netplay/netplay_private.h index 13a7dbc904..2479376c0e 100644 --- a/network/netplay/netplay_private.h +++ b/network/netplay/netplay_private.h @@ -30,10 +30,9 @@ #define HAVE_IPV6 #endif -#define UDP_FRAME_PACKETS 16 -#define UDP_WORDS_PER_FRAME 4 /* Allows us to send 128 bits worth of state per frame. */ -#define MAX_SPECTATORS 16 -#define RARCH_DEFAULT_PORT 55435 +#define WORDS_PER_FRAME 4 /* Allows us to send 128 bits worth of state per frame. */ +#define MAX_SPECTATORS 16 +#define RARCH_DEFAULT_PORT 55435 #define NETPLAY_PROTOCOL_VERSION 1 @@ -46,9 +45,9 @@ struct delta_frame void *state; - uint32_t real_input_state[UDP_WORDS_PER_FRAME - 1]; - uint32_t simulated_input_state[UDP_WORDS_PER_FRAME - 1]; - uint32_t self_state[UDP_WORDS_PER_FRAME - 1]; + uint32_t real_input_state[WORDS_PER_FRAME - 1]; + uint32_t simulated_input_state[WORDS_PER_FRAME - 1]; + uint32_t self_state[WORDS_PER_FRAME - 1]; bool have_local; bool is_simulated; @@ -98,9 +97,8 @@ struct netplay /* If we end up having to drop remote frame data because it's ahead of us, fast-forward is URGENT */ bool must_fast_forward; - /* To compat UDP packet loss we also send - * old data along with the packets. */ - uint32_t packet_buffer[UDP_FRAME_PACKETS * UDP_WORDS_PER_FRAME]; + /* A buffer for outgoing input packets. */ + uint32_t packet_buffer[2 + WORDS_PER_FRAME]; uint32_t self_frame_count; uint32_t read_frame_count; uint32_t other_frame_count;