From b7c938c239f0c61b6122c817c6ce44309d61bd1f Mon Sep 17 00:00:00 2001 From: Michael Jumper Date: Mon, 10 Sep 2018 02:16:36 -0700 Subject: [PATCH] GUACAMOLE-623: Send typed data to Kubernetes via the STDIN channel. --- src/protocols/kubernetes/kubernetes.c | 151 ++++++++++++++++++++++++-- src/protocols/kubernetes/kubernetes.h | 26 ++++- 2 files changed, 164 insertions(+), 13 deletions(-) diff --git a/src/protocols/kubernetes/kubernetes.c b/src/protocols/kubernetes/kubernetes.c index 62fb6ee7..aadb448d 100644 --- a/src/protocols/kubernetes/kubernetes.c +++ b/src/protocols/kubernetes/kubernetes.c @@ -77,6 +77,127 @@ static void guac_kubernetes_receive_data(guac_client* client, } +/** + * Requests that the given data be sent along the given channel to the + * Kubernetes server when the WebSocket connection is next available for + * writing. If the WebSocket connection has not been available for writing for + * long enough that the outbound message buffer is full, the request to send + * this particular message will be dropped. + * + * @param client + * The guac_client associated with the Kubernetes connection. + * + * @param channel + * The Kubernetes channel on which to send the message, + * such as GUAC_KUBERNETES_CHANNEL_STDIN. + * + * @param data + * A buffer containing the data to send. + * + * @param length + * The number of bytes to send. + */ +static void guac_kubernetes_send_message(guac_client* client, + int channel, const char* data, int length) { + + guac_kubernetes_client* kubernetes_client = + (guac_kubernetes_client*) client->data; + + pthread_mutex_lock(&(kubernetes_client->outbound_message_lock)); + + /* Add message to buffer if space is available */ + if (kubernetes_client->outbound_messages_waiting + < GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES) { + + /* Calculate storage position of next message */ + int index = (kubernetes_client->outbound_messages_top + + kubernetes_client->outbound_messages_waiting) + % GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES; + + /* Obtain pointer to message slot at calculated position */ + guac_kubernetes_message* message = + &(kubernetes_client->outbound_messages[index]); + + /* Copy details of message into buffer */ + message->channel = channel; + memcpy(message->data, data, length); + message->length = length; + + /* One more message is now waiting */ + kubernetes_client->outbound_messages_waiting++; + + /* Notify libwebsockets that we need a callback to send pending + * messages */ + lws_callback_on_writable(kubernetes_client->wsi); + lws_cancel_service(kubernetes_client->context); + + } + + /* Warn if data has to be dropped */ + else + guac_client_log(client, GUAC_LOG_WARNING, "Send buffer could not be " + "flushed in time to handle additional data. Outbound " + "message dropped."); + + pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock)); + +} + +/** + * Writes the oldest pending message within the outbound message queue, + * as scheduled with guac_kubernetes_send_message(), removing that message + * from the queue. This function MAY NOT be invoked outside the libwebsockets + * event callback and MUST only be invoked in the context of a + * LWS_CALLBACK_CLIENT_WRITEABLE event. If no messages are pending, this + * function has no effect. + * + * @param client + * The guac_client associated with the Kubernetes connection. + * + * @return + * true if messages still remain to be written within the outbound message + * queue, false otherwise. + */ +static bool guac_kubernetes_write_pending_message(guac_client* client) { + + bool messages_remain; + guac_kubernetes_client* kubernetes_client = + (guac_kubernetes_client*) client->data; + + pthread_mutex_lock(&(kubernetes_client->outbound_message_lock)); + + /* Send one message from top of buffer */ + if (kubernetes_client->outbound_messages_waiting > 0) { + + /* Obtain pointer to message at top */ + int top = kubernetes_client->outbound_messages_top; + guac_kubernetes_message* message = + &(kubernetes_client->outbound_messages[top]); + + /* Write message including channel index */ + lws_write(kubernetes_client->wsi, + ((unsigned char*) message) + LWS_PRE, + message->length + 1, LWS_WRITE_BINARY); + + /* Advance top to next message */ + kubernetes_client->outbound_messages_top++; + kubernetes_client->outbound_messages_top %= + GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES; + + /* One less message is waiting */ + kubernetes_client->outbound_messages_waiting--; + + } + + /* Record whether messages remained at time of completion */ + messages_remain = (kubernetes_client->outbound_messages_waiting > 0); + + pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock)); + + return messages_remain; + +} + /** * Callback invoked by libwebsockets for events related to a WebSocket being * used for communicating with an attached Kubernetes pod. @@ -132,8 +253,14 @@ static int guac_kubernetes_lws_callback(struct lws* wsi, guac_kubernetes_receive_data(client, (const char*) in, length); break; - /* TODO: Only send data here. Request callback for writing via lws_callback_on_writable(some struct lws*) */ + /* WebSocket is ready for writing */ case LWS_CALLBACK_CLIENT_WRITEABLE: + + /* Send any pending messages, requesting another callback if + * yet more messages remain */ + if (guac_kubernetes_write_pending_message(client)) + lws_callback_on_writable(wsi); + break; /* TODO: Add configure test */ @@ -189,14 +316,15 @@ static void* guac_kubernetes_input_thread(void* data) { guac_kubernetes_client* kubernetes_client = (guac_kubernetes_client*) client->data; - char buffer[8192]; + char buffer[GUAC_KUBERNETES_MAX_MESSAGE_SIZE]; int bytes_read; /* Write all data read */ while ((bytes_read = guac_terminal_read_stdin(kubernetes_client->term, buffer, sizeof(buffer))) > 0) { - /* TODO: Send to Kubernetes */ - guac_terminal_write(kubernetes_client->term, buffer, bytes_read); + /* Send received data to Kubernetes along STDIN channel */ + guac_kubernetes_send_message(client, GUAC_KUBERNETES_CHANNEL_STDIN, + buffer, bytes_read); } @@ -206,8 +334,6 @@ static void* guac_kubernetes_input_thread(void* data) { void* guac_kubernetes_client_thread(void* data) { - struct lws_context* context = NULL; - guac_client* client = (guac_client*) data; guac_kubernetes_client* kubernetes_client = (guac_kubernetes_client*) client->data; @@ -297,15 +423,15 @@ void* guac_kubernetes_client_thread(void* data) { } /* Create libwebsockets context */ - context = lws_create_context(&context_info); - if (!context) { + kubernetes_client->context = lws_create_context(&context_info); + if (!kubernetes_client->context) { guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR, "Initialization of libwebsockets failed"); goto fail; } /* FIXME: Generate path dynamically */ - connection_info.context = context; + connection_info.context = kubernetes_client->context; connection_info.path = "/api/v1/namespaces/default/pods/my-shell-68974bb7f7-rpjgr/attach?container=my-shell&stdin=true&stdout=true&tty=true"; /* Open WebSocket connection to Kubernetes */ @@ -329,7 +455,8 @@ void* guac_kubernetes_client_thread(void* data) { while (client->state == GUAC_CLIENT_RUNNING) { /* Cease polling libwebsockets if an error condition is signalled */ - if (lws_service(context, 1000) < 0) + if (lws_service(kubernetes_client->context, + GUAC_KUBERNETES_SERVICE_INTERVAL) < 0) break; } @@ -350,8 +477,8 @@ fail: guac_common_recording_free(kubernetes_client->recording); /* Free WebSocket context if successfully allocated */ - if (context != NULL) - lws_context_destroy(context); + if (kubernetes_client->context != NULL) + lws_context_destroy(kubernetes_client->context); guac_client_log(client, GUAC_LOG_INFO, "Kubernetes connection ended."); return NULL; diff --git a/src/protocols/kubernetes/kubernetes.h b/src/protocols/kubernetes/kubernetes.h index 8fb917db..761a897c 100644 --- a/src/protocols/kubernetes/kubernetes.h +++ b/src/protocols/kubernetes/kubernetes.h @@ -55,6 +55,13 @@ */ #define GUAC_KUBERNETES_CHANNEL_RESIZE 4 +/** + * The maximum amount of data to include in any particular WebSocket message + * to Kubernetes. This excludes the storage space required for the channel + * index. + */ +#define GUAC_KUBERNETES_MAX_MESSAGE_SIZE 1024 + /** * The maximum number of messages to allow within the outbound message buffer. * If messages are sent despite the buffer being full, those messages will be @@ -62,11 +69,23 @@ */ #define GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES 8 +/** + * The maximum number of milliseconds to wait for a libwebsockets event to + * occur before entering another iteration of the libwebsockets event loop. + */ +#define GUAC_KUBERNETES_SERVICE_INTERVAL 1000 + /** * An outbound message to be received by Kubernetes over WebSocket. */ typedef struct guac_kubernetes_message { + /** + * lws_write() requires leading padding of LWS_PRE bytes to provide + * scratch space for WebSocket framing. + */ + uint8_t _padding[LWS_PRE]; + /** * The index of the channel receiving the data, such as * GUAC_KUBERNETES_CHANNEL_STDIN. @@ -77,7 +96,7 @@ typedef struct guac_kubernetes_message { * The data that should be sent to Kubernetes (along with the channel * index). */ - char data[1024]; + char data[GUAC_KUBERNETES_MAX_MESSAGE_SIZE]; /** * The length of the data to be sent, excluding the channel index. @@ -96,6 +115,11 @@ typedef struct guac_kubernetes_client { */ guac_kubernetes_settings* settings; + /** + * The libwebsockets context associated with the connected WebSocket. + */ + struct lws_context* context; + /** * The connected WebSocket. */