GUACAMOLE-623: Add outbound message buffer.

This commit is contained in:
Michael Jumper 2018-09-10 01:26:13 -07:00
parent cbe593503f
commit f35517b3ff
3 changed files with 84 additions and 18 deletions

View File

@ -67,17 +67,8 @@ int guac_kubernetes_client_free_handler(guac_client* client) {
guac_kubernetes_client* kubernetes_client = guac_kubernetes_client* kubernetes_client =
(guac_kubernetes_client*) client->data; (guac_kubernetes_client*) client->data;
/* Clean up recording, if in progress */ /* Wait client thread to terminate */
if (kubernetes_client->recording != NULL)
guac_common_recording_free(kubernetes_client->recording);
/* Kill terminal */
guac_terminal_free(kubernetes_client->term);
/* TODO: Wait for and free WebSocket session, if connected */
/*if (kubernetes_client->websocket != NULL) {
pthread_join(kubernetes_client->client_thread, NULL); pthread_join(kubernetes_client->client_thread, NULL);
}*/
/* Free settings */ /* Free settings */
if (kubernetes_client->settings != NULL) if (kubernetes_client->settings != NULL)

View File

@ -206,6 +206,8 @@ static void* guac_kubernetes_input_thread(void* data) {
void* guac_kubernetes_client_thread(void* data) { void* guac_kubernetes_client_thread(void* data) {
struct lws_context* context = NULL;
guac_client* client = (guac_client*) data; guac_client* client = (guac_client*) data;
guac_kubernetes_client* kubernetes_client = guac_kubernetes_client* kubernetes_client =
(guac_kubernetes_client*) client->data; (guac_kubernetes_client*) client->data;
@ -236,7 +238,7 @@ void* guac_kubernetes_client_thread(void* data) {
if (kubernetes_client->term == NULL) { if (kubernetes_client->term == NULL) {
guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR,
"Terminal initialization failed"); "Terminal initialization failed");
return NULL; goto fail;
} }
/* Set up typescript, if requested */ /* Set up typescript, if requested */
@ -295,11 +297,11 @@ void* guac_kubernetes_client_thread(void* data) {
} }
/* Create libwebsockets context */ /* Create libwebsockets context */
struct lws_context* context = lws_create_context(&context_info); context = lws_create_context(&context_info);
if (!context) { if (!context) {
guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR,
"Initialization of libwebsockets failed"); "Initialization of libwebsockets failed");
return NULL; goto fail;
} }
/* FIXME: Generate path dynamically */ /* FIXME: Generate path dynamically */
@ -311,13 +313,16 @@ void* guac_kubernetes_client_thread(void* data) {
if (kubernetes_client->wsi == NULL) { if (kubernetes_client->wsi == NULL) {
guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR,
"Connection via libwebsockets failed"); "Connection via libwebsockets failed");
return NULL; goto fail;
} }
/* Init outbound message buffer */
pthread_mutex_init(&(kubernetes_client->outbound_message_lock), NULL);
/* Start input thread */ /* Start input thread */
if (pthread_create(&(input_thread), NULL, guac_kubernetes_input_thread, (void*) client)) { if (pthread_create(&(input_thread), NULL, guac_kubernetes_input_thread, (void*) client)) {
guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, "Unable to start input thread"); guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, "Unable to start input thread");
return NULL; goto fail;
} }
/* As long as client is connected, continue polling libwebsockets */ /* As long as client is connected, continue polling libwebsockets */
@ -330,10 +335,22 @@ void* guac_kubernetes_client_thread(void* data) {
} }
/* Kill client and Wait for input thread to die */ /* Kill client and Wait for input thread to die */
guac_terminal_stop(kubernetes_client->term);
guac_client_stop(client); guac_client_stop(client);
pthread_join(input_thread, NULL); pthread_join(input_thread, NULL);
/* All done with libwebsockets */ fail:
/* Kill and free terminal, if allocated */
if (kubernetes_client->term != NULL)
guac_terminal_free(kubernetes_client->term);
/* Clean up recording, if in progress */
if (kubernetes_client->recording != NULL)
guac_common_recording_free(kubernetes_client->recording);
/* Free WebSocket context if successfully allocated */
if (context != NULL)
lws_context_destroy(context); lws_context_destroy(context);
guac_client_log(client, GUAC_LOG_INFO, "Kubernetes connection ended."); guac_client_log(client, GUAC_LOG_INFO, "Kubernetes connection ended.");

View File

@ -55,6 +55,37 @@
*/ */
#define GUAC_KUBERNETES_CHANNEL_RESIZE 4 #define GUAC_KUBERNETES_CHANNEL_RESIZE 4
/**
* The maximum number of messages to allow within the outbound message buffer.
* If messages are sent despite the buffer being full, those messages will be
* dropped.
*/
#define GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES 8
/**
* An outbound message to be received by Kubernetes over WebSocket.
*/
typedef struct guac_kubernetes_message {
/**
* The index of the channel receiving the data, such as
* GUAC_KUBERNETES_CHANNEL_STDIN.
*/
uint8_t channel;
/**
* The data that should be sent to Kubernetes (along with the channel
* index).
*/
char data[1024];
/**
* The length of the data to be sent, excluding the channel index.
*/
int length;
} guac_kubernetes_message;
/** /**
* Kubernetes-specific client data. * Kubernetes-specific client data.
*/ */
@ -70,6 +101,33 @@ typedef struct guac_kubernetes_client {
*/ */
struct lws* wsi; struct lws* wsi;
/**
* Outbound message ring buffer for outbound WebSocket messages. As
* libwebsockets uses an event loop for all operations, outbound messages
* may be sent only in context of a particular event received via a
* callback. Until that event is received, pending data must accumulate in
* a buffer.
*/
guac_kubernetes_message outbound_messages[GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES];
/**
* The number of messages currently waiting in the outbound message
* buffer.
*/
int outbound_messages_waiting;
/**
* The index of the oldest entry in the outbound message buffer. Newer
* messages follow this entry.
*/
int outbound_messages_top;
/**
* Lock which is acquired when the outbound message buffer is being read
* or manipulated.
*/
pthread_mutex_t outbound_message_lock;
/** /**
* The Kubernetes client thread. * The Kubernetes client thread.
*/ */