From 9db22fdfcf3bbc2e1956c0812361083097246602 Mon Sep 17 00:00:00 2001 From: Jacek Caban Date: Tue, 21 Mar 2017 13:03:03 +0100 Subject: [PATCH] server: Introduce pipe message queue. Signed-off-by: Jacek Caban Signed-off-by: Alexandre Julliard --- server/named_pipe.c | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/server/named_pipe.c b/server/named_pipe.c index 8113b18777..044bc97394 100644 --- a/server/named_pipe.c +++ b/server/named_pipe.c @@ -65,6 +65,14 @@ enum pipe_state struct named_pipe; +struct pipe_message +{ + struct list entry; /* entry in message queue */ + data_size_t read_pos; /* already read bytes */ + struct iosb *iosb; /* message iosb */ + struct async *async; /* async of pending write */ +}; + struct pipe_end { struct object obj; /* object header */ @@ -72,6 +80,7 @@ struct pipe_end unsigned int flags; /* pipe flags */ struct pipe_end *connection; /* the other end of the pipe */ data_size_t buffer_size;/* size of buffered data that doesn't block caller */ + struct list message_queue; }; struct pipe_server @@ -379,6 +388,13 @@ static void notify_empty( struct pipe_server *server ) fd_async_wake_up( server->pipe_end.fd, ASYNC_TYPE_WAIT, STATUS_SUCCESS ); } +static void free_message( struct pipe_message *message ) +{ + list_remove( &message->entry ); + if (message->iosb) release_object( message->iosb ); + free( message ); +} + static void pipe_end_disconnect( struct pipe_end *pipe_end, unsigned int status ) { struct pipe_end *connection = pipe_end->connection; @@ -409,6 +425,18 @@ static void do_disconnect( struct pipe_server *server ) server->pipe_end.fd = NULL; } +static void pipe_end_destroy( struct pipe_end *pipe_end ) +{ + struct pipe_message *message; + + while (!list_empty( &pipe_end->message_queue )) + { + message = LIST_ENTRY( list_head(&pipe_end->message_queue), struct pipe_message, entry ); + assert( !message->async ); + free_message( message ); + } +} + static void pipe_server_destroy( struct object *obj) { struct pipe_server *server = (struct pipe_server *)obj; @@ -423,6 +451,7 @@ static void pipe_server_destroy( struct object *obj) do_disconnect( server ); } + pipe_end_destroy( &server->pipe_end ); if (server->client) { server->client->server = NULL; @@ -467,6 +496,8 @@ static void pipe_client_destroy( struct object *obj) server->client = NULL; client->server = NULL; } + + pipe_end_destroy( &client->pipe_end ); if (client->pipe_end.fd) release_object( client->pipe_end.fd ); } @@ -550,7 +581,7 @@ static int pipe_data_remaining( struct pipe_server *server ) assert( server->client ); if (use_server_io( &server->pipe_end )) - return 0; + return !list_empty( &server->client->pipe_end.message_queue ); fd = get_unix_fd( server->client->pipe_end.fd ); if (fd < 0) @@ -721,6 +752,7 @@ static void init_pipe_end( struct pipe_end *pipe_end, unsigned int pipe_flags, d pipe_end->flags = pipe_flags; pipe_end->connection = NULL; pipe_end->buffer_size = buffer_size; + list_init( &pipe_end->message_queue ); } static struct pipe_server *create_pipe_server( struct named_pipe *pipe, unsigned int options,