Switched Netplay over to TCP. A lot of the stalling logic had to change

for this, and in particular, it now sometimes stalls in a way that makes
it very difficult to actually input anything (whoops :) ). Simply
setting the sync frames higher avoids that. With supported cores, this
is incredibly risilient, but when it fails, it mostly fails to freezing,
which is less than ideal.

TODO: Stall frames should be configurable. All the UDP code is still
there but commented out, should be gutted. The original fast-forward
code is now commented out, but really both fast-forward and stalling
should be options; the only complication is that it needs to send
simulated self-input for fast-forward.
This commit is contained in:
Gregor Richards 2016-09-12 07:42:35 -04:00
parent 9a80a1bd7e
commit 5edfbeafb0
5 changed files with 157 additions and 68 deletions

View File

@ -65,7 +65,9 @@ static void warn_hangup(void)
bool check_netplay_synched(netplay_t* netplay)
{
retro_assert(netplay);
return netplay->self_frame_count < (netplay->flip_frame + 2 * UDP_FRAME_PACKETS);
/*return netplay->self_frame_count < (netplay->flip_frame + 2 * UDP_FRAME_PACKETS);*/
/* FIXME */
return true;
}
static bool netplay_info_cb(netplay_t* netplay, unsigned frames) {
@ -96,6 +98,7 @@ static bool netplay_can_poll(netplay_t *netplay)
return netplay->can_poll;
}
#if 0
static bool send_chunk(netplay_t *netplay)
{
const struct sockaddr *addr = NULL;
@ -128,6 +131,7 @@ static bool send_chunk(netplay_t *netplay)
}
return true;
}
#endif
/**
* get_self_input_state:
@ -139,10 +143,11 @@ static bool send_chunk(netplay_t *netplay)
**/
static bool get_self_input_state(netplay_t *netplay)
{
uint32_t state[UDP_WORDS_PER_FRAME - 1] = {0};
uint32_t state[WORDS_PER_FRAME - 1] = {0};
struct delta_frame *ptr = &netplay->buffer[netplay->self_ptr];
if (!netplay_delta_frame_ready(netplay, ptr, netplay->self_frame_count)) return false;
if (!netplay_delta_frame_ready(netplay, ptr, netplay->self_frame_count))
return false;
if (!input_driver_is_libretro_input_blocked() && netplay->self_frame_count > 0)
{
@ -181,18 +186,19 @@ static bool get_self_input_state(netplay_t *netplay)
* }
*
* payload {
* ; To compat packet losses, send input in a sliding window
* frame redundancy_frames[UDP_FRAME_PACKETS];
* cmd (CMD_INPUT)
* cmd_size (4 words)
* frame
* }
*/
memmove(netplay->packet_buffer, netplay->packet_buffer + UDP_WORDS_PER_FRAME,
sizeof (netplay->packet_buffer) - UDP_WORDS_PER_FRAME * sizeof(uint32_t));
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME] = htonl(netplay->self_frame_count);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 1] = htonl(state[0]);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 2] = htonl(state[1]);
netplay->packet_buffer[(UDP_FRAME_PACKETS - 1) * UDP_WORDS_PER_FRAME + 3] = htonl(state[2]);
netplay->packet_buffer[0] = htonl(NETPLAY_CMD_INPUT);
netplay->packet_buffer[1] = htonl(WORDS_PER_FRAME * sizeof(uint32_t));
netplay->packet_buffer[2] = htonl(netplay->self_frame_count);
netplay->packet_buffer[3] = htonl(state[0]);
netplay->packet_buffer[4] = htonl(state[1]);
netplay->packet_buffer[5] = htonl(state[2]);
if (!send_chunk(netplay))
if (!socket_send_all_blocking(netplay->fd, netplay->packet_buffer, sizeof(netplay->packet_buffer), false))
{
warn_hangup();
netplay->has_connection = false;
@ -204,16 +210,32 @@ static bool get_self_input_state(netplay_t *netplay)
return true;
}
static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd,
const void *data, size_t size)
{
uint32_t cmdbuf[2];
cmdbuf[0] = htonl(cmd);
cmdbuf[1] = htonl(size);
if (!socket_send_all_blocking(netplay->fd, cmdbuf, sizeof(cmdbuf), false))
return false;
if (size > 0)
if (!socket_send_all_blocking(netplay->fd, data, size, false))
return false;
return true;
}
static bool netplay_cmd_ack(netplay_t *netplay)
{
uint32_t cmd = htonl(NETPLAY_CMD_ACK);
return socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false);
return netplay_send_raw_cmd(netplay, NETPLAY_CMD_ACK, NULL, 0);
}
static bool netplay_cmd_nak(netplay_t *netplay)
{
uint32_t cmd = htonl(NETPLAY_CMD_NAK);
return socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false);
return netplay_send_raw_cmd(netplay, NETPLAY_CMD_NAK, NULL, 0);
}
static bool netplay_get_response(netplay_t *netplay)
@ -231,6 +253,10 @@ static bool netplay_get_cmd(netplay_t *netplay)
uint32_t flip_frame;
uint32_t cmd_size;
/* If we're not ready for input, wait until we are. Could fill the TCP buffer, stalling the other side. */
if (!netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->read_ptr], netplay->read_frame_count))
return false;
if (!socket_receive_all_blocking(netplay->fd, &cmd, sizeof(cmd)))
return false;
@ -243,6 +269,53 @@ static bool netplay_get_cmd(netplay_t *netplay)
switch (cmd)
{
case NETPLAY_CMD_ACK:
case NETPLAY_CMD_NAK:
/* Why are we even bothering? */
return true;
case NETPLAY_CMD_INPUT:
{
uint32_t buffer[WORDS_PER_FRAME];
unsigned i;
if (cmd_size != WORDS_PER_FRAME * sizeof(uint32_t))
{
RARCH_ERR("NETPLAY_CMD_INPUT received an unexpected payload size.\n");
return netplay_cmd_nak(netplay);
}
if (!socket_receive_all_blocking(netplay->fd, buffer, sizeof(buffer)))
{
RARCH_ERR("Failed to receive NETPLAY_CMD_INPUT input.\n");
return netplay_cmd_nak(netplay);
}
for (i = 0; i < WORDS_PER_FRAME; i++)
buffer[i] = ntohl(buffer[i]);
if (buffer[0] != netplay->read_frame_count)
{
/* FIXME: JUST drop it? */
return netplay_cmd_nak(netplay);
}
if (!netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->read_ptr], netplay->read_frame_count))
{
/* FIXME: If we're here, we're desyncing. */
netplay->must_fast_forward = true;
return netplay_cmd_nak(netplay);
}
/* The data's good! */
netplay->buffer[netplay->read_ptr].is_simulated = false;
memcpy(netplay->buffer[netplay->read_ptr].real_input_state, buffer + 1, sizeof(buffer) - sizeof(uint32_t));
netplay->read_ptr = NEXT_PTR(netplay->read_ptr);
netplay->read_frame_count++;
netplay->timeout_cnt = 0;
return netplay_cmd_ack(netplay);
}
case NETPLAY_CMD_FLIP_PLAYERS:
if (cmd_size != sizeof(uint32_t))
{
@ -305,6 +378,7 @@ static bool netplay_get_cmd(netplay_t *netplay)
static int poll_input(netplay_t *netplay, bool block)
{
bool had_input = false;
int max_fd = (netplay->fd > netplay->udp_fd ?
netplay->fd : netplay->udp_fd) + 1;
struct timeval tv = {0};
@ -318,6 +392,7 @@ static int poll_input(netplay_t *netplay, bool block)
* Technically possible for select() to modify tmp_tv, so
* we go paranoia mode. */
struct timeval tmp_tv = tv;
had_input = false;
netplay->timeout_cnt++;
@ -330,31 +405,40 @@ static int poll_input(netplay_t *netplay, bool block)
/* Somewhat hacky,
* but we aren't using the TCP connection for anything useful atm. */
if (FD_ISSET(netplay->fd, &fds) && !netplay_get_cmd(netplay))
if (FD_ISSET(netplay->fd, &fds))
{
had_input = true;
if (!netplay_get_cmd(netplay))
return -1;
}
#if 0
if (FD_ISSET(netplay->udp_fd, &fds))
return 1;
#endif
if (!block)
continue;
#if 0
if (!send_chunk(netplay))
{
warn_hangup();
netplay->has_connection = false;
return -1;
}
#endif
RARCH_LOG("Network is stalling, resending packet... Count %u of %d ...\n",
netplay->timeout_cnt, MAX_RETRIES);
} while ((netplay->timeout_cnt < MAX_RETRIES) && block);
} while (had_input || (block && netplay->read_frame_count < netplay->self_frame_count));
if (block)
return -1;
/*if (block)
return -1;*/
return 0;
}
#if 0
static bool receive_data(netplay_t *netplay, uint32_t *buffer, size_t size)
{
socklen_t addrlen = sizeof(netplay->their_addr);
@ -400,6 +484,7 @@ static void parse_packet(netplay_t *netplay, uint32_t *buffer, unsigned size)
netplay->timeout_cnt = 0;
}
}
#endif
/* TODO: Somewhat better prediction. :P */
static void simulate_input(netplay_t *netplay)
@ -454,7 +539,7 @@ static bool netplay_poll(netplay_t *netplay)
/* We might have reached the end of the buffer, where we
* simply have to block. */
res = poll_input(netplay, netplay->other_ptr == netplay->self_ptr);
res = poll_input(netplay, netplay->read_frame_count < netplay->self_frame_count - 10); /* FIXME: configure stalling intervals */
if (res == -1)
{
netplay->has_connection = false;
@ -462,6 +547,7 @@ static bool netplay_poll(netplay_t *netplay)
return false;
}
#if 0
if (res == 1)
{
uint32_t first_read = netplay->read_frame_count;
@ -489,8 +575,9 @@ static bool netplay_poll(netplay_t *netplay)
return false;
}
}
#endif
if (netplay->read_frame_count <= netplay->self_frame_count)
if (netplay->read_frame_count < netplay->self_frame_count)
simulate_input(netplay);
else
netplay->buffer[PREV_PTR(netplay->self_ptr)].used_real = true;
@ -839,8 +926,8 @@ netplay_t *netplay_new(const char *server, uint16_t port,
{
netplay_t *netplay = NULL;
if (frames > UDP_FRAME_PACKETS)
frames = UDP_FRAME_PACKETS;
/*if (frames > UDP_FRAME_PACKETS)
frames = UDP_FRAME_PACKETS;*/
netplay = (netplay_t*)calloc(1, sizeof(*netplay));
if (!netplay)
@ -880,26 +967,6 @@ error:
return NULL;
}
static bool netplay_send_raw_cmd(netplay_t *netplay, uint32_t cmd,
const void *data, size_t size)
{
uint32_t cmd_size;
cmd = htonl(cmd);
cmd_size = htonl(size);
if (!socket_send_all_blocking(netplay->fd, &cmd, sizeof(cmd), false))
return false;
if (!socket_send_all_blocking(netplay->fd, &cmd_size, sizeof(cmd_size), false))
return false;
if (!socket_send_all_blocking(netplay->fd, data, size, false))
return false;
return true;
}
/**
* netplay_command:
* @netplay : pointer to netplay object
@ -970,7 +1037,7 @@ error:
**/
static void netplay_flip_users(netplay_t *netplay)
{
uint32_t flip_frame = netplay->self_frame_count + 2 * UDP_FRAME_PACKETS;
uint32_t flip_frame = netplay->self_frame_count + 32; /* FIXME: This value is now arbitrary */
uint32_t flip_frame_net = htonl(flip_frame);
bool command = netplay_command(
netplay, NETPLAY_CMD_FLIP_PLAYERS,

View File

@ -40,7 +40,7 @@ enum rarch_netplay_ctl_state
enum netplay_cmd
{
/* Miscellaneous commands */
/* Basic commands */
/* Acknowlegement response */
NETPLAY_CMD_ACK = 0x0000,
@ -48,23 +48,28 @@ enum netplay_cmd
/* Failed acknowlegement response */
NETPLAY_CMD_NAK = 0x0001,
/* Input data */
NETPLAY_CMD_INPUT = 0x0002,
/* Misc. commands */
/* Swap inputs between player 1 and player 2 */
NETPLAY_CMD_FLIP_PLAYERS = 0x0002,
NETPLAY_CMD_FLIP_PLAYERS = 0x0003,
/* Toggle spectate/join mode */
NETPLAY_CMD_SPECTATE = 0x0003,
NETPLAY_CMD_SPECTATE = 0x0004,
/* Gracefully disconnects from host */
NETPLAY_CMD_DISCONNECT = 0x0004,
NETPLAY_CMD_DISCONNECT = 0x0005,
/* Sends multiple config requests over,
* See enum netplay_cmd_cfg */
NETPLAY_CMD_CFG = 0x0005,
NETPLAY_CMD_CFG = 0x0006,
/* CMD_CFG streamlines sending multiple
configurations. This acknowledges
each one individually */
NETPLAY_CMD_CFG_ACK = 0x0006,
NETPLAY_CMD_CFG_ACK = 0x0007,
/* Loading and synchronization */

View File

@ -347,13 +347,16 @@ bool netplay_is_spectate(netplay_t* netplay)
bool netplay_delta_frame_ready(netplay_t *netplay, struct delta_frame *delta, uint32_t frame)
{
void *remember_state;
if (delta->frame == frame) return true;
if (netplay->other_frame_count <= delta->frame)
{
/* We haven't even replayed this frame yet, so we can't overwrite it! */
return false;
}
remember_state = delta->state;
memset(delta, 0, sizeof(struct delta_frame));
delta->frame = frame;
delta->state = remember_state;
return true;
}

View File

@ -32,6 +32,7 @@ static void netplay_net_pre_frame(netplay_t *netplay)
if (netplay_delta_frame_ready(netplay, &netplay->buffer[netplay->self_ptr], netplay->self_frame_count))
{
serial_info.data_const = NULL;
serial_info.data = netplay->buffer[netplay->self_ptr].state;
serial_info.size = netplay->state_size;
@ -79,13 +80,17 @@ static void netplay_net_post_frame(netplay_t *netplay)
/* Replay frames. */
netplay->is_replay = true;
netplay->replay_ptr = netplay->other_ptr;
netplay->replay_frame_count = netplay->other_frame_count;
netplay->replay_ptr = PREV_PTR(netplay->other_ptr);
netplay->replay_frame_count = netplay->other_frame_count - 1;
serial_info.data_const = netplay->buffer[netplay->other_ptr].state;
if (netplay->replay_frame_count < netplay->self_frame_count)
{
serial_info.data = NULL;
serial_info.data_const = netplay->buffer[netplay->replay_ptr].state;
serial_info.size = netplay->state_size;
core_unserialize(&serial_info);
}
while (netplay->replay_frame_count < netplay->self_frame_count)
{
@ -106,11 +111,21 @@ static void netplay_net_post_frame(netplay_t *netplay)
netplay->replay_frame_count++;
}
/* For the remainder of the frames up to the read count, we can use the real data */
while (netplay->replay_frame_count < netplay->read_frame_count)
{
netplay->buffer[netplay->replay_ptr].is_simulated = false;
netplay->buffer[netplay->replay_ptr].used_real = true;
netplay->replay_ptr = NEXT_PTR(netplay->replay_ptr);
netplay->replay_frame_count++;
}
netplay->other_ptr = netplay->read_ptr;
netplay->other_frame_count = netplay->read_frame_count;
netplay->is_replay = false;
}
#if 0
/* And if the other side has gotten too far ahead of /us/, skip to catch up
* FIXME: Make this configurable */
if (netplay->read_frame_count > netplay->self_frame_count + 10 ||
@ -149,6 +164,7 @@ static void netplay_net_post_frame(netplay_t *netplay)
netplay->other_frame_count = netplay->read_frame_count;
netplay->is_replay = false;
}
#endif
}
static bool netplay_net_init_buffers(netplay_t *netplay)

View File

@ -30,8 +30,7 @@
#define HAVE_IPV6
#endif
#define UDP_FRAME_PACKETS 16
#define UDP_WORDS_PER_FRAME 4 /* Allows us to send 128 bits worth of state per frame. */
#define WORDS_PER_FRAME 4 /* Allows us to send 128 bits worth of state per frame. */
#define MAX_SPECTATORS 16
#define RARCH_DEFAULT_PORT 55435
@ -46,9 +45,9 @@ struct delta_frame
void *state;
uint32_t real_input_state[UDP_WORDS_PER_FRAME - 1];
uint32_t simulated_input_state[UDP_WORDS_PER_FRAME - 1];
uint32_t self_state[UDP_WORDS_PER_FRAME - 1];
uint32_t real_input_state[WORDS_PER_FRAME - 1];
uint32_t simulated_input_state[WORDS_PER_FRAME - 1];
uint32_t self_state[WORDS_PER_FRAME - 1];
bool have_local;
bool is_simulated;
@ -98,9 +97,8 @@ struct netplay
/* If we end up having to drop remote frame data because it's ahead of us, fast-forward is URGENT */
bool must_fast_forward;
/* To compat UDP packet loss we also send
* old data along with the packets. */
uint32_t packet_buffer[UDP_FRAME_PACKETS * UDP_WORDS_PER_FRAME];
/* A buffer for outgoing input packets. */
uint32_t packet_buffer[2 + WORDS_PER_FRAME];
uint32_t self_frame_count;
uint32_t read_frame_count;
uint32_t other_frame_count;