diff --git a/src/protocols/kubernetes/Makefile.am b/src/protocols/kubernetes/Makefile.am index 9e50feb6..e818ff72 100644 --- a/src/protocols/kubernetes/Makefile.am +++ b/src/protocols/kubernetes/Makefile.am @@ -26,6 +26,7 @@ libguac_client_kubernetes_la_SOURCES = \ client.c \ clipboard.c \ input.c \ + io.c \ pipe.c \ settings.c \ kubernetes.c \ @@ -36,6 +37,7 @@ noinst_HEADERS = \ client.h \ clipboard.h \ input.h \ + io.h \ pipe.h \ settings.h \ kubernetes.h \ diff --git a/src/protocols/kubernetes/io.c b/src/protocols/kubernetes/io.c new file mode 100644 index 00000000..bfa37b1d --- /dev/null +++ b/src/protocols/kubernetes/io.c @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "kubernetes.h" +#include "terminal/terminal.h" + +#include +#include + +#include +#include +#include + +void guac_kubernetes_receive_data(guac_client* client, + const char* buffer, size_t length) { + + guac_kubernetes_client* kubernetes_client = + (guac_kubernetes_client*) client->data; + + /* Strip channel index from beginning of buffer */ + int channel = *(buffer++); + length--; + + switch (channel) { + + /* Write STDOUT / STDERR directly to terminal as output */ + case GUAC_KUBERNETES_CHANNEL_STDOUT: + case GUAC_KUBERNETES_CHANNEL_STDERR: + guac_terminal_write(kubernetes_client->term, buffer, length); + break; + + /* Ignore data on other channels */ + default: + guac_client_log(client, GUAC_LOG_DEBUG, "Received %i bytes along " + "channel %i.", length, channel); + + } + +} + +void guac_kubernetes_send_message(guac_client* client, + int channel, const char* data, int length) { + + guac_kubernetes_client* kubernetes_client = + (guac_kubernetes_client*) client->data; + + pthread_mutex_lock(&(kubernetes_client->outbound_message_lock)); + + /* Add message to buffer if space is available */ + if (kubernetes_client->outbound_messages_waiting + < GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES) { + + /* Calculate storage position of next message */ + int index = (kubernetes_client->outbound_messages_top + + kubernetes_client->outbound_messages_waiting) + % GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES; + + /* Obtain pointer to message slot at calculated position */ + guac_kubernetes_message* message = + &(kubernetes_client->outbound_messages[index]); + + /* Copy details of message into buffer */ + message->channel = channel; + memcpy(message->data, data, length); + message->length = length; + + /* One more message is now waiting */ + kubernetes_client->outbound_messages_waiting++; + + /* Notify libwebsockets that we need a callback to send pending + * messages */ + lws_callback_on_writable(kubernetes_client->wsi); + lws_cancel_service(kubernetes_client->context); + + } + + /* Warn if data has to be dropped */ + else + guac_client_log(client, GUAC_LOG_WARNING, "Send buffer could not be " + "flushed in time to handle additional data. Outbound " + "message dropped."); + + pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock)); + +} + +bool guac_kubernetes_write_pending_message(guac_client* client) { + + bool messages_remain; + guac_kubernetes_client* kubernetes_client = + (guac_kubernetes_client*) client->data; + + pthread_mutex_lock(&(kubernetes_client->outbound_message_lock)); + + /* Send one message from top of buffer */ + if (kubernetes_client->outbound_messages_waiting > 0) { + + /* Obtain pointer to message at top */ + int top = kubernetes_client->outbound_messages_top; + guac_kubernetes_message* message = + &(kubernetes_client->outbound_messages[top]); + + /* Write message including channel index */ + lws_write(kubernetes_client->wsi, + ((unsigned char*) message) + LWS_PRE, + message->length + 1, LWS_WRITE_BINARY); + + /* Advance top to next message */ + kubernetes_client->outbound_messages_top++; + kubernetes_client->outbound_messages_top %= + GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES; + + /* One less message is waiting */ + kubernetes_client->outbound_messages_waiting--; + + } + + /* Record whether messages remained at time of completion */ + messages_remain = (kubernetes_client->outbound_messages_waiting > 0); + + pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock)); + + return messages_remain; + +} + + diff --git a/src/protocols/kubernetes/io.h b/src/protocols/kubernetes/io.h new file mode 100644 index 00000000..40f2c69a --- /dev/null +++ b/src/protocols/kubernetes/io.h @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef GUAC_KUBERNETES_IO_H +#define GUAC_KUBERNETES_IO_H + +#include +#include + +#include +#include + +/** + * The maximum amount of data to include in any particular WebSocket message + * to Kubernetes. This excludes the storage space required for the channel + * index. + */ +#define GUAC_KUBERNETES_MAX_MESSAGE_SIZE 1024 + +/** + * The index of the Kubernetes channel used for STDIN. + */ +#define GUAC_KUBERNETES_CHANNEL_STDIN 0 + +/** + * The index of the Kubernetes channel used for STDOUT. + */ +#define GUAC_KUBERNETES_CHANNEL_STDOUT 1 + +/** + * The index of the Kubernetes channel used for STDERR. + */ +#define GUAC_KUBERNETES_CHANNEL_STDERR 2 + +/** + * The index of the Kubernetes channel used for terminal resize messages. + */ +#define GUAC_KUBERNETES_CHANNEL_RESIZE 4 + +/** + * An outbound message to be received by Kubernetes over WebSocket. + */ +typedef struct guac_kubernetes_message { + + /** + * lws_write() requires leading padding of LWS_PRE bytes to provide + * scratch space for WebSocket framing. + */ + uint8_t _padding[LWS_PRE]; + + /** + * The index of the channel receiving the data, such as + * GUAC_KUBERNETES_CHANNEL_STDIN. + */ + uint8_t channel; + + /** + * The data that should be sent to Kubernetes (along with the channel + * index). + */ + char data[GUAC_KUBERNETES_MAX_MESSAGE_SIZE]; + + /** + * The length of the data to be sent, excluding the channel index. + */ + int length; + +} guac_kubernetes_message; + + +/** + * Handles data received from Kubernetes over WebSocket, decoding the channel + * index of the received data and forwarding that data accordingly. + * + * @param client + * The guac_client associated with the connection to Kubernetes. + * + * @param buffer + * The data received from Kubernetes. + * + * @param length + * The size of the data received from Kubernetes, in bytes. + */ +void guac_kubernetes_receive_data(guac_client* client, + const char* buffer, size_t length); + +/** + * Requests that the given data be sent along the given channel to the + * Kubernetes server when the WebSocket connection is next available for + * writing. If the WebSocket connection has not been available for writing for + * long enough that the outbound message buffer is full, the request to send + * this particular message will be dropped. + * + * @param client + * The guac_client associated with the Kubernetes connection. + * + * @param channel + * The Kubernetes channel on which to send the message, + * such as GUAC_KUBERNETES_CHANNEL_STDIN. + * + * @param data + * A buffer containing the data to send. + * + * @param length + * The number of bytes to send. + */ +void guac_kubernetes_send_message(guac_client* client, + int channel, const char* data, int length); + +/** + * Writes the oldest pending message within the outbound message queue, + * as scheduled with guac_kubernetes_send_message(), removing that message + * from the queue. This function MAY NOT be invoked outside the libwebsockets + * event callback and MUST only be invoked in the context of a + * LWS_CALLBACK_CLIENT_WRITEABLE event. If no messages are pending, this + * function has no effect. + * + * @param client + * The guac_client associated with the Kubernetes connection. + * + * @return + * true if messages still remain to be written within the outbound message + * queue, false otherwise. + */ +bool guac_kubernetes_write_pending_message(guac_client* client); + +#endif + diff --git a/src/protocols/kubernetes/kubernetes.c b/src/protocols/kubernetes/kubernetes.c index 3644d6ec..4e7928ed 100644 --- a/src/protocols/kubernetes/kubernetes.c +++ b/src/protocols/kubernetes/kubernetes.c @@ -18,8 +18,9 @@ */ #include "config.h" -#include "kubernetes.h" #include "common/recording.h" +#include "io.h" +#include "kubernetes.h" #include "terminal/terminal.h" #include "url.h" @@ -30,168 +31,6 @@ #include #include #include -#include - -/** - * Handles data received from Kubernetes over WebSocket, decoding the channel - * index of the received data and forwarding that data accordingly. - * - * @param client - * The guac_client associated with the connection to Kubernetes. - * - * @param buffer - * The data received from Kubernetes. - * - * @param length - * The size of the data received from Kubernetes, in bytes. - */ -static void guac_kubernetes_receive_data(guac_client* client, - const char* buffer, size_t length) { - - guac_kubernetes_client* kubernetes_client = - (guac_kubernetes_client*) client->data; - - /* Strip channel index from beginning of buffer */ - int channel = *(buffer++); - length--; - - switch (channel) { - - /* Write STDOUT / STDERR directly to terminal as output */ - case GUAC_KUBERNETES_CHANNEL_STDOUT: - case GUAC_KUBERNETES_CHANNEL_STDERR: - guac_terminal_write(kubernetes_client->term, buffer, length); - break; - - /* Ignore data on other channels */ - default: - guac_client_log(client, GUAC_LOG_DEBUG, "Received %i bytes along " - "channel %i.", length, channel); - - } - -} - -/** - * Requests that the given data be sent along the given channel to the - * Kubernetes server when the WebSocket connection is next available for - * writing. If the WebSocket connection has not been available for writing for - * long enough that the outbound message buffer is full, the request to send - * this particular message will be dropped. - * - * @param client - * The guac_client associated with the Kubernetes connection. - * - * @param channel - * The Kubernetes channel on which to send the message, - * such as GUAC_KUBERNETES_CHANNEL_STDIN. - * - * @param data - * A buffer containing the data to send. - * - * @param length - * The number of bytes to send. - */ -static void guac_kubernetes_send_message(guac_client* client, - int channel, const char* data, int length) { - - guac_kubernetes_client* kubernetes_client = - (guac_kubernetes_client*) client->data; - - pthread_mutex_lock(&(kubernetes_client->outbound_message_lock)); - - /* Add message to buffer if space is available */ - if (kubernetes_client->outbound_messages_waiting - < GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES) { - - /* Calculate storage position of next message */ - int index = (kubernetes_client->outbound_messages_top - + kubernetes_client->outbound_messages_waiting) - % GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES; - - /* Obtain pointer to message slot at calculated position */ - guac_kubernetes_message* message = - &(kubernetes_client->outbound_messages[index]); - - /* Copy details of message into buffer */ - message->channel = channel; - memcpy(message->data, data, length); - message->length = length; - - /* One more message is now waiting */ - kubernetes_client->outbound_messages_waiting++; - - /* Notify libwebsockets that we need a callback to send pending - * messages */ - lws_callback_on_writable(kubernetes_client->wsi); - lws_cancel_service(kubernetes_client->context); - - } - - /* Warn if data has to be dropped */ - else - guac_client_log(client, GUAC_LOG_WARNING, "Send buffer could not be " - "flushed in time to handle additional data. Outbound " - "message dropped."); - - pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock)); - -} - -/** - * Writes the oldest pending message within the outbound message queue, - * as scheduled with guac_kubernetes_send_message(), removing that message - * from the queue. This function MAY NOT be invoked outside the libwebsockets - * event callback and MUST only be invoked in the context of a - * LWS_CALLBACK_CLIENT_WRITEABLE event. If no messages are pending, this - * function has no effect. - * - * @param client - * The guac_client associated with the Kubernetes connection. - * - * @return - * true if messages still remain to be written within the outbound message - * queue, false otherwise. - */ -static bool guac_kubernetes_write_pending_message(guac_client* client) { - - bool messages_remain; - guac_kubernetes_client* kubernetes_client = - (guac_kubernetes_client*) client->data; - - pthread_mutex_lock(&(kubernetes_client->outbound_message_lock)); - - /* Send one message from top of buffer */ - if (kubernetes_client->outbound_messages_waiting > 0) { - - /* Obtain pointer to message at top */ - int top = kubernetes_client->outbound_messages_top; - guac_kubernetes_message* message = - &(kubernetes_client->outbound_messages[top]); - - /* Write message including channel index */ - lws_write(kubernetes_client->wsi, - ((unsigned char*) message) + LWS_PRE, - message->length + 1, LWS_WRITE_BINARY); - - /* Advance top to next message */ - kubernetes_client->outbound_messages_top++; - kubernetes_client->outbound_messages_top %= - GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES; - - /* One less message is waiting */ - kubernetes_client->outbound_messages_waiting--; - - } - - /* Record whether messages remained at time of completion */ - messages_remain = (kubernetes_client->outbound_messages_waiting > 0); - - pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock)); - - return messages_remain; - -} /** * Callback invoked by libwebsockets for events related to a WebSocket being diff --git a/src/protocols/kubernetes/kubernetes.h b/src/protocols/kubernetes/kubernetes.h index 8c9a25e7..c37ca4cf 100644 --- a/src/protocols/kubernetes/kubernetes.h +++ b/src/protocols/kubernetes/kubernetes.h @@ -22,6 +22,7 @@ #include "common/clipboard.h" #include "common/recording.h" +#include "io.h" #include "settings.h" #include "terminal/terminal.h" @@ -29,7 +30,6 @@ #include #include -#include /** * The name of the WebSocket protocol specific to Kubernetes which should be @@ -37,33 +37,6 @@ */ #define GUAC_KUBERNETES_LWS_PROTOCOL "v4.channel.k8s.io" -/** - * The index of the Kubernetes channel used for STDIN. - */ -#define GUAC_KUBERNETES_CHANNEL_STDIN 0 - -/** - * The index of the Kubernetes channel used for STDOUT. - */ -#define GUAC_KUBERNETES_CHANNEL_STDOUT 1 - -/** - * The index of the Kubernetes channel used for STDERR. - */ -#define GUAC_KUBERNETES_CHANNEL_STDERR 2 - -/** - * The index of the Kubernetes channel used for terminal resize messages. - */ -#define GUAC_KUBERNETES_CHANNEL_RESIZE 4 - -/** - * The maximum amount of data to include in any particular WebSocket message - * to Kubernetes. This excludes the storage space required for the channel - * index. - */ -#define GUAC_KUBERNETES_MAX_MESSAGE_SIZE 1024 - /** * The maximum number of messages to allow within the outbound message buffer. * If messages are sent despite the buffer being full, those messages will be @@ -77,36 +50,6 @@ */ #define GUAC_KUBERNETES_SERVICE_INTERVAL 1000 -/** - * An outbound message to be received by Kubernetes over WebSocket. - */ -typedef struct guac_kubernetes_message { - - /** - * lws_write() requires leading padding of LWS_PRE bytes to provide - * scratch space for WebSocket framing. - */ - uint8_t _padding[LWS_PRE]; - - /** - * The index of the channel receiving the data, such as - * GUAC_KUBERNETES_CHANNEL_STDIN. - */ - uint8_t channel; - - /** - * The data that should be sent to Kubernetes (along with the channel - * index). - */ - char data[GUAC_KUBERNETES_MAX_MESSAGE_SIZE]; - - /** - * The length of the data to be sent, excluding the channel index. - */ - int length; - -} guac_kubernetes_message; - /** * Kubernetes-specific client data. */