Fix threading issues regarding flush DURING write not being blocked. Set process-shared attribute on mutexes in socket.

This commit is contained in:
Michael Jumper 2013-06-30 10:55:14 -07:00
parent 59b058be3d
commit 54d3b160d7
2 changed files with 71 additions and 9 deletions

View File

@ -205,6 +205,11 @@ struct guac_socket {
*/ */
pthread_mutex_t __instruction_write_lock; pthread_mutex_t __instruction_write_lock;
/**
* Lock which is acquired when the buffer is being modified or flushed.
*/
pthread_mutex_t __buffer_lock;
}; };
/** /**
@ -251,6 +256,23 @@ void guac_socket_instruction_begin(guac_socket* socket);
*/ */
void guac_socket_instruction_end(guac_socket* socket); void guac_socket_instruction_end(guac_socket* socket);
/**
* Marks the beginning of a socket's buffer modification. If threadsafety is
* enabled on the socket, other functions which modify the buffer will be
* blocked until this modification is complete.
*
* @param socket The guac_socket whose buffer is being updated.
*/
void guac_socket_update_buffer_begin(guac_socket* socket);
/**
* Marks the end of a socket's buffer modification. If threadsafety is enabled
* on the socket, other functions which modify the buffer will now be allowed
* to continue.
*
* @param socket The guac_socket whose buffer is done being updated.
*/
void guac_socket_update_buffer_end(guac_socket* socket);
/** /**
* Allocates and initializes a new guac_socket object with the given open * Allocates and initializes a new guac_socket object with the given open

View File

@ -125,6 +125,7 @@ int guac_socket_select(guac_socket* socket, int usec_timeout) {
guac_socket* guac_socket_alloc() { guac_socket* guac_socket_alloc() {
pthread_mutexattr_t lock_attributes;
guac_socket* socket = malloc(sizeof(guac_socket)); guac_socket* socket = malloc(sizeof(guac_socket));
/* If no memory available, return with error */ /* If no memory available, return with error */
@ -155,10 +156,14 @@ guac_socket* guac_socket_alloc() {
socket->__instructionbuf_parse_start = 0; socket->__instructionbuf_parse_start = 0;
socket->__instructionbuf_elementc = 0; socket->__instructionbuf_elementc = 0;
/* Default to unsafe threading */ /* Default to unsafe threading */
socket->__threadsafe_instructions = 0; socket->__threadsafe_instructions = 0;
pthread_mutex_init(&(socket->__instruction_write_lock), NULL);
pthread_mutexattr_init(&lock_attributes);
pthread_mutexattr_setpshared(&lock_attributes, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&(socket->__instruction_write_lock), &lock_attributes);
pthread_mutex_init(&(socket->__buffer_lock), &lock_attributes);
/* No handlers yet */ /* No handlers yet */
socket->read_handler = NULL; socket->read_handler = NULL;
@ -184,12 +189,28 @@ void guac_socket_instruction_begin(guac_socket* socket) {
void guac_socket_instruction_end(guac_socket* socket) { void guac_socket_instruction_end(guac_socket* socket) {
/* Lock writes if threadsafety enabled */ /* Unlock writes if threadsafety enabled */
if (socket->__threadsafe_instructions) if (socket->__threadsafe_instructions)
pthread_mutex_unlock(&(socket->__instruction_write_lock)); pthread_mutex_unlock(&(socket->__instruction_write_lock));
} }
void guac_socket_update_buffer_begin(guac_socket* socket) {
/* Lock if threadsafety enabled */
if (socket->__threadsafe_instructions)
pthread_mutex_lock(&(socket->__buffer_lock));
}
void guac_socket_update_buffer_end(guac_socket* socket) {
/* Unlock if threadsafety enabled */
if (socket->__threadsafe_instructions)
pthread_mutex_unlock(&(socket->__buffer_lock));
}
void guac_socket_free(guac_socket* socket) { void guac_socket_free(guac_socket* socket) {
/* Call free handler if defined */ /* Call free handler if defined */
@ -214,6 +235,8 @@ ssize_t guac_socket_write_string(guac_socket* socket, const char* str) {
char* __out_buf = socket->__out_buf; char* __out_buf = socket->__out_buf;
guac_socket_update_buffer_begin(socket);
for (; *str != '\0'; str++) { for (; *str != '\0'; str++) {
__out_buf[socket->__written++] = *str; __out_buf[socket->__written++] = *str;
@ -221,10 +244,12 @@ ssize_t guac_socket_write_string(guac_socket* socket, const char* str) {
/* Flush when necessary, return on error. Note that we must flush within 4 bytes of boundary because /* Flush when necessary, return on error. Note that we must flush within 4 bytes of boundary because
* __guac_socket_write_base64_triplet ALWAYS writes four bytes, and would otherwise potentially overflow * __guac_socket_write_base64_triplet ALWAYS writes four bytes, and would otherwise potentially overflow
* the buffer. */ * the buffer. */
if (socket->__written >= GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) { if (socket->__written > GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) {
if (guac_socket_write(socket, __out_buf, socket->__written)) if (guac_socket_write(socket, __out_buf, socket->__written)) {
guac_socket_update_buffer_end(socket);
return 1; return 1;
}
socket->__written = 0; socket->__written = 0;
@ -232,6 +257,7 @@ ssize_t guac_socket_write_string(guac_socket* socket, const char* str) {
} }
guac_socket_update_buffer_end(socket);
return 0; return 0;
} }
@ -264,7 +290,7 @@ ssize_t __guac_socket_write_base64_triplet(guac_socket* socket, int a, int b, in
/* At this point, 4 bytes have been socket->__written */ /* At this point, 4 bytes have been socket->__written */
/* Flush when necessary, return on error */ /* Flush when necessary, return on error */
if (socket->__written >= GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) { if (socket->__written > GUAC_SOCKET_OUTPUT_BUFFER_SIZE - 4) {
if (guac_socket_write(socket, __out_buf, socket->__written)) if (guac_socket_write(socket, __out_buf, socket->__written))
return -1; return -1;
@ -309,14 +335,18 @@ ssize_t guac_socket_write_base64(guac_socket* socket, const void* buf, size_t co
const unsigned char* char_buf = (const unsigned char*) buf; const unsigned char* char_buf = (const unsigned char*) buf;
const unsigned char* end = char_buf + count; const unsigned char* end = char_buf + count;
guac_socket_update_buffer_begin(socket);
while (char_buf < end) { while (char_buf < end) {
retval = __guac_socket_write_base64_byte(socket, *(char_buf++)); retval = __guac_socket_write_base64_byte(socket, *(char_buf++));
if (retval < 0) if (retval < 0) {
guac_socket_update_buffer_end(socket);
return retval; return retval;
}
} }
guac_socket_update_buffer_end(socket);
return 0; return 0;
} }
@ -324,14 +354,18 @@ ssize_t guac_socket_write_base64(guac_socket* socket, const void* buf, size_t co
ssize_t guac_socket_flush(guac_socket* socket) { ssize_t guac_socket_flush(guac_socket* socket) {
/* Flush remaining bytes in buffer */ /* Flush remaining bytes in buffer */
guac_socket_update_buffer_begin(socket);
if (socket->__written > 0) { if (socket->__written > 0) {
if (guac_socket_write(socket, socket->__out_buf, socket->__written)) if (guac_socket_write(socket, socket->__out_buf, socket->__written)) {
guac_socket_update_buffer_end(socket);
return 1; return 1;
}
socket->__written = 0; socket->__written = 0;
} }
guac_socket_update_buffer_end(socket);
return 0; return 0;
} }
@ -341,12 +375,18 @@ ssize_t guac_socket_flush_base64(guac_socket* socket) {
int retval; int retval;
/* Flush triplet to output buffer */ /* Flush triplet to output buffer */
guac_socket_update_buffer_begin(socket);
while (socket->__ready > 0) { while (socket->__ready > 0) {
retval = __guac_socket_write_base64_byte(socket, -1); retval = __guac_socket_write_base64_byte(socket, -1);
if (retval < 0) if (retval < 0) {
guac_socket_update_buffer_end(socket);
return retval; return retval;
} }
}
guac_socket_update_buffer_end(socket);
return 0; return 0;
} }