From f35517b3ff42f268e20f84282bc93130fa86e98d Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Mon, 10 Sep 2018 01:26:13 -0700 Subject: [PATCH] GUACAMOLE-623: Add outbound message buffer. --- src/protocols/kubernetes/client.c | 13 +----- src/protocols/kubernetes/kubernetes.c | 31 ++++++++++---- src/protocols/kubernetes/kubernetes.h | 58 +++++++++++++++++++++++++++ 3 files changed, 84 insertions(+), 18 deletions(-) diff --git a/src/protocols/kubernetes/client.c b/src/protocols/kubernetes/client.c index 1b5d175d..77aa6473 100644 --- a/src/protocols/kubernetes/client.c +++ b/src/protocols/kubernetes/client.c @@ -67,17 +67,8 @@ int guac_kubernetes_client_free_handler(guac_client* client) { guac_kubernetes_client* kubernetes_client = (guac_kubernetes_client*) client->data; - /* Clean up recording, if in progress */ - if (kubernetes_client->recording != NULL) - guac_common_recording_free(kubernetes_client->recording); - - /* Kill terminal */ - guac_terminal_free(kubernetes_client->term); - - /* TODO: Wait for and free WebSocket session, if connected */ - /*if (kubernetes_client->websocket != NULL) { - pthread_join(kubernetes_client->client_thread, NULL); - }*/ + /* Wait client thread to terminate */ + pthread_join(kubernetes_client->client_thread, NULL); /* Free settings */ if (kubernetes_client->settings != NULL) diff --git a/src/protocols/kubernetes/kubernetes.c b/src/protocols/kubernetes/kubernetes.c index e38d5e37..62fb6ee7 100644 --- a/src/protocols/kubernetes/kubernetes.c +++ b/src/protocols/kubernetes/kubernetes.c @@ -206,6 +206,8 @@ static void* guac_kubernetes_input_thread(void* data) { void* guac_kubernetes_client_thread(void* data) { + struct lws_context* context = NULL; + guac_client* client = (guac_client*) data; guac_kubernetes_client* kubernetes_client = (guac_kubernetes_client*) client->data; @@ -236,7 +238,7 @@ void* guac_kubernetes_client_thread(void* data) { if (kubernetes_client->term == NULL) { guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, "Terminal initialization failed"); - return NULL; + goto fail; } /* Set up typescript, if requested */ @@ -295,11 +297,11 @@ void* guac_kubernetes_client_thread(void* data) { } /* Create libwebsockets context */ - struct lws_context* context = lws_create_context(&context_info); + context = lws_create_context(&context_info); if (!context) { guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, "Initialization of libwebsockets failed"); - return NULL; + goto fail; } /* FIXME: Generate path dynamically */ @@ -311,13 +313,16 @@ void* guac_kubernetes_client_thread(void* data) { if (kubernetes_client->wsi == NULL) { guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, "Connection via libwebsockets failed"); - return NULL; + goto fail; } + /* Init outbound message buffer */ + pthread_mutex_init(&(kubernetes_client->outbound_message_lock), NULL); + /* Start input thread */ if (pthread_create(&(input_thread), NULL, guac_kubernetes_input_thread, (void*) client)) { guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, "Unable to start input thread"); - return NULL; + goto fail; } /* As long as client is connected, continue polling libwebsockets */ @@ -330,11 +335,23 @@ void* guac_kubernetes_client_thread(void* data) { } /* Kill client and Wait for input thread to die */ + guac_terminal_stop(kubernetes_client->term); guac_client_stop(client); pthread_join(input_thread, NULL); - /* All done with libwebsockets */ - lws_context_destroy(context); +fail: + + /* Kill and free terminal, if allocated */ + if (kubernetes_client->term != NULL) + guac_terminal_free(kubernetes_client->term); + + /* Clean up recording, if in progress */ + if (kubernetes_client->recording != NULL) + guac_common_recording_free(kubernetes_client->recording); + + /* Free WebSocket context if successfully allocated */ + if (context != NULL) + lws_context_destroy(context); guac_client_log(client, GUAC_LOG_INFO, "Kubernetes connection ended."); return NULL; diff --git a/src/protocols/kubernetes/kubernetes.h b/src/protocols/kubernetes/kubernetes.h index 89b6678f..8fb917db 100644 --- a/src/protocols/kubernetes/kubernetes.h +++ b/src/protocols/kubernetes/kubernetes.h @@ -55,6 +55,37 @@ */ #define GUAC_KUBERNETES_CHANNEL_RESIZE 4 +/** + * The maximum number of messages to allow within the outbound message buffer. + * If messages are sent despite the buffer being full, those messages will be + * dropped. + */ +#define GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES 8 + +/** + * An outbound message to be received by Kubernetes over WebSocket. + */ +typedef struct guac_kubernetes_message { + + /** + * The index of the channel receiving the data, such as + * GUAC_KUBERNETES_CHANNEL_STDIN. + */ + uint8_t channel; + + /** + * The data that should be sent to Kubernetes (along with the channel + * index). + */ + char data[1024]; + + /** + * The length of the data to be sent, excluding the channel index. + */ + int length; + +} guac_kubernetes_message; + /** * Kubernetes-specific client data. */ @@ -70,6 +101,33 @@ typedef struct guac_kubernetes_client { */ struct lws* wsi; + /** + * Outbound message ring buffer for outbound WebSocket messages. As + * libwebsockets uses an event loop for all operations, outbound messages + * may be sent only in context of a particular event received via a + * callback. Until that event is received, pending data must accumulate in + * a buffer. + */ + guac_kubernetes_message outbound_messages[GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES]; + + /** + * The number of messages currently waiting in the outbound message + * buffer. + */ + int outbound_messages_waiting; + + /** + * The index of the oldest entry in the outbound message buffer. Newer + * messages follow this entry. + */ + int outbound_messages_top; + + /** + * Lock which is acquired when the outbound message buffer is being read + * or manipulated. + */ + pthread_mutex_t outbound_message_lock; + /** * The Kubernetes client thread. */