From 54d3b160d7593634f23ee67b45fd0e2218279285 Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Sun, 30 Jun 2013 10:55:14 -0700 Subject: [PATCH] Fix threading issues regarding flush DURING write not being blocked. Set process-shared attribute on mutexes in socket. --- src/libguac/guacamole/socket.h | 22 +++++++++++++ src/libguac/socket.c | 58 ++++++++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/src/libguac/guacamole/socket.h b/src/libguac/guacamole/socket.h index 74eeb881..a9d63d74 100644 --- a/src/libguac/guacamole/socket.h +++ b/src/libguac/guacamole/socket.h @@ -205,6 +205,11 @@ struct guac_socket { */ pthread_mutex_t __instruction_write_lock; + /** + * Lock which is acquired when the buffer is being modified or flushed. + */ + pthread_mutex_t __buffer_lock; + }; /** @@ -251,6 +256,23 @@ void guac_socket_instruction_begin(guac_socket* socket); */ void guac_socket_instruction_end(guac_socket* socket); +/** + * Marks the beginning of a socket's buffer modification. If threadsafety is + * enabled on the socket, other functions which modify the buffer will be + * blocked until this modification is complete. + * + * @param socket The guac_socket whose buffer is being updated. + */ +void guac_socket_update_buffer_begin(guac_socket* socket); + +/** + * Marks the end of a socket's buffer modification. If threadsafety is enabled + * on the socket, other functions which modify the buffer will now be allowed + * to continue. + * + * @param socket The guac_socket whose buffer is done being updated. + */ +void guac_socket_update_buffer_end(guac_socket* socket); /** * Allocates and initializes a new guac_socket object with the given open diff --git a/src/libguac/socket.c b/src/libguac/socket.c index af7bba14..e25c5f27 100644 --- a/src/libguac/socket.c +++ b/src/libguac/socket.c @@ -125,6 +125,7 @@ int guac_socket_select(guac_socket* socket, int usec_timeout) { guac_socket* guac_socket_alloc() { + pthread_mutexattr_t lock_attributes; guac_socket* socket = malloc(sizeof(guac_socket)); /* If no memory available, return with error */ @@ -155,10 +156,14 @@ guac_socket* guac_socket_alloc() { socket->__instructionbuf_parse_start = 0; socket->__instructionbuf_elementc = 0; - /* Default to unsafe threading */ socket->__threadsafe_instructions = 0; - pthread_mutex_init(&(socket->__instruction_write_lock), NULL); + + pthread_mutexattr_init(&lock_attributes); + pthread_mutexattr_setpshared(&lock_attributes, PTHREAD_PROCESS_SHARED); + + pthread_mutex_init(&(socket->__instruction_write_lock), &lock_attributes); + pthread_mutex_init(&(socket->__buffer_lock), &lock_attributes); /* No handlers yet */ socket->read_handler = NULL; @@ -184,12 +189,28 @@ void guac_socket_instruction_begin(guac_socket* socket) { void guac_socket_instruction_end(guac_socket* socket) { - /* Lock writes if threadsafety enabled */ + /* Unlock writes if threadsafety enabled */ if (socket->__threadsafe_instructions) pthread_mutex_unlock(&(socket->__instruction_write_lock)); } +void guac_socket_update_buffer_begin(guac_socket* socket) { + + /* Lock if threadsafety enabled */ + if (socket->__threadsafe_instructions) + pthread_mutex_lock(&(socket->__buffer_lock)); + +} + +void guac_socket_update_buffer_end(guac_socket* socket) { + + /* Unlock if threadsafety enabled */ + if (socket->__threadsafe_instructions) + pthread_mutex_unlock(&(socket->__buffer_lock)); + +} + void guac_socket_free(guac_socket* socket) { /* Call free handler if defined */ @@ -214,6 +235,8 @@ ssize_t guac_socket_write_string(guac_socket* socket, const char* str) { char* __out_buf = socket->__out_buf; + guac_socket_update_buffer_begin(socket); + for (; *str != '\0'; str++) { __out_buf[socket->__written++] = *str; @@ -221,10 +244,12 @@ ssize_t guac_socket_write_string(guac_socket* socket, const char* str) { /* Flush when necessary, return on error. Note that we must flush within 4 bytes of boundary because * __guac_socket_write_base64_triplet ALWAYS writes four bytes, and would otherwise potentially overflow * the buffer. */ - if (socket->__written >= GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) { + if (socket->__written > GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) { - if (guac_socket_write(socket, __out_buf, socket->__written)) + if (guac_socket_write(socket, __out_buf, socket->__written)) { + guac_socket_update_buffer_end(socket); return 1; + } socket->__written = 0; @@ -232,6 +257,7 @@ ssize_t guac_socket_write_string(guac_socket* socket, const char* str) { } + guac_socket_update_buffer_end(socket); return 0; } @@ -264,7 +290,7 @@ ssize_t __guac_socket_write_base64_triplet(guac_socket* socket, int a, int b, in /* At this point, 4 bytes have been socket->__written */ /* Flush when necessary, return on error */ - if (socket->__written >= GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) { + if (socket->__written > GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) { if (guac_socket_write(socket, __out_buf, socket->__written)) return -1; @@ -309,14 +335,18 @@ ssize_t guac_socket_write_base64(guac_socket* socket, const void* buf, size_t co const unsigned char* char_buf = (const unsigned char*) buf; const unsigned char* end = char_buf + count; + guac_socket_update_buffer_begin(socket); while (char_buf < end) { retval = __guac_socket_write_base64_byte(socket, *(char_buf++)); - if (retval < 0) + if (retval < 0) { + guac_socket_update_buffer_end(socket); return retval; + } } + guac_socket_update_buffer_end(socket); return 0; } @@ -324,14 +354,18 @@ ssize_t guac_socket_write_base64(guac_socket* socket, const void* buf, size_t co ssize_t guac_socket_flush(guac_socket* socket) { /* Flush remaining bytes in buffer */ + guac_socket_update_buffer_begin(socket); if (socket->__written > 0) { - if (guac_socket_write(socket, socket->__out_buf, socket->__written)) + if (guac_socket_write(socket, socket->__out_buf, socket->__written)) { + guac_socket_update_buffer_end(socket); return 1; + } socket->__written = 0; } + guac_socket_update_buffer_end(socket); return 0; } @@ -341,12 +375,18 @@ ssize_t guac_socket_flush_base64(guac_socket* socket) { int retval; /* Flush triplet to output buffer */ + guac_socket_update_buffer_begin(socket); while (socket->__ready > 0) { + retval = __guac_socket_write_base64_byte(socket, -1); - if (retval < 0) + if (retval < 0) { + guac_socket_update_buffer_end(socket); return retval; + } + } + guac_socket_update_buffer_end(socket); return 0; }