GUACAMOLE-623: Send typed data to Kubernetes via the STDIN channel.
This commit is contained in:
parent
f35517b3ff
commit
b7c938c239
@ -77,6 +77,127 @@ static void guac_kubernetes_receive_data(guac_client* client,
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Requests that the given data be sent along the given channel to the
|
||||||
|
* Kubernetes server when the WebSocket connection is next available for
|
||||||
|
* writing. If the WebSocket connection has not been available for writing for
|
||||||
|
* long enough that the outbound message buffer is full, the request to send
|
||||||
|
* this particular message will be dropped.
|
||||||
|
*
|
||||||
|
* @param client
|
||||||
|
* The guac_client associated with the Kubernetes connection.
|
||||||
|
*
|
||||||
|
* @param channel
|
||||||
|
* The Kubernetes channel on which to send the message,
|
||||||
|
* such as GUAC_KUBERNETES_CHANNEL_STDIN.
|
||||||
|
*
|
||||||
|
* @param data
|
||||||
|
* A buffer containing the data to send.
|
||||||
|
*
|
||||||
|
* @param length
|
||||||
|
* The number of bytes to send.
|
||||||
|
*/
|
||||||
|
static void guac_kubernetes_send_message(guac_client* client,
|
||||||
|
int channel, const char* data, int length) {
|
||||||
|
|
||||||
|
guac_kubernetes_client* kubernetes_client =
|
||||||
|
(guac_kubernetes_client*) client->data;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&(kubernetes_client->outbound_message_lock));
|
||||||
|
|
||||||
|
/* Add message to buffer if space is available */
|
||||||
|
if (kubernetes_client->outbound_messages_waiting
|
||||||
|
< GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES) {
|
||||||
|
|
||||||
|
/* Calculate storage position of next message */
|
||||||
|
int index = (kubernetes_client->outbound_messages_top
|
||||||
|
+ kubernetes_client->outbound_messages_waiting)
|
||||||
|
% GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES;
|
||||||
|
|
||||||
|
/* Obtain pointer to message slot at calculated position */
|
||||||
|
guac_kubernetes_message* message =
|
||||||
|
&(kubernetes_client->outbound_messages[index]);
|
||||||
|
|
||||||
|
/* Copy details of message into buffer */
|
||||||
|
message->channel = channel;
|
||||||
|
memcpy(message->data, data, length);
|
||||||
|
message->length = length;
|
||||||
|
|
||||||
|
/* One more message is now waiting */
|
||||||
|
kubernetes_client->outbound_messages_waiting++;
|
||||||
|
|
||||||
|
/* Notify libwebsockets that we need a callback to send pending
|
||||||
|
* messages */
|
||||||
|
lws_callback_on_writable(kubernetes_client->wsi);
|
||||||
|
lws_cancel_service(kubernetes_client->context);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Warn if data has to be dropped */
|
||||||
|
else
|
||||||
|
guac_client_log(client, GUAC_LOG_WARNING, "Send buffer could not be "
|
||||||
|
"flushed in time to handle additional data. Outbound "
|
||||||
|
"message dropped.");
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Writes the oldest pending message within the outbound message queue,
|
||||||
|
* as scheduled with guac_kubernetes_send_message(), removing that message
|
||||||
|
* from the queue. This function MAY NOT be invoked outside the libwebsockets
|
||||||
|
* event callback and MUST only be invoked in the context of a
|
||||||
|
* LWS_CALLBACK_CLIENT_WRITEABLE event. If no messages are pending, this
|
||||||
|
* function has no effect.
|
||||||
|
*
|
||||||
|
* @param client
|
||||||
|
* The guac_client associated with the Kubernetes connection.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
* true if messages still remain to be written within the outbound message
|
||||||
|
* queue, false otherwise.
|
||||||
|
*/
|
||||||
|
static bool guac_kubernetes_write_pending_message(guac_client* client) {
|
||||||
|
|
||||||
|
bool messages_remain;
|
||||||
|
guac_kubernetes_client* kubernetes_client =
|
||||||
|
(guac_kubernetes_client*) client->data;
|
||||||
|
|
||||||
|
pthread_mutex_lock(&(kubernetes_client->outbound_message_lock));
|
||||||
|
|
||||||
|
/* Send one message from top of buffer */
|
||||||
|
if (kubernetes_client->outbound_messages_waiting > 0) {
|
||||||
|
|
||||||
|
/* Obtain pointer to message at top */
|
||||||
|
int top = kubernetes_client->outbound_messages_top;
|
||||||
|
guac_kubernetes_message* message =
|
||||||
|
&(kubernetes_client->outbound_messages[top]);
|
||||||
|
|
||||||
|
/* Write message including channel index */
|
||||||
|
lws_write(kubernetes_client->wsi,
|
||||||
|
((unsigned char*) message) + LWS_PRE,
|
||||||
|
message->length + 1, LWS_WRITE_BINARY);
|
||||||
|
|
||||||
|
/* Advance top to next message */
|
||||||
|
kubernetes_client->outbound_messages_top++;
|
||||||
|
kubernetes_client->outbound_messages_top %=
|
||||||
|
GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES;
|
||||||
|
|
||||||
|
/* One less message is waiting */
|
||||||
|
kubernetes_client->outbound_messages_waiting--;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Record whether messages remained at time of completion */
|
||||||
|
messages_remain = (kubernetes_client->outbound_messages_waiting > 0);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&(kubernetes_client->outbound_message_lock));
|
||||||
|
|
||||||
|
return messages_remain;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback invoked by libwebsockets for events related to a WebSocket being
|
* Callback invoked by libwebsockets for events related to a WebSocket being
|
||||||
* used for communicating with an attached Kubernetes pod.
|
* used for communicating with an attached Kubernetes pod.
|
||||||
@ -132,8 +253,14 @@ static int guac_kubernetes_lws_callback(struct lws* wsi,
|
|||||||
guac_kubernetes_receive_data(client, (const char*) in, length);
|
guac_kubernetes_receive_data(client, (const char*) in, length);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
/* TODO: Only send data here. Request callback for writing via lws_callback_on_writable(some struct lws*) */
|
/* WebSocket is ready for writing */
|
||||||
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
case LWS_CALLBACK_CLIENT_WRITEABLE:
|
||||||
|
|
||||||
|
/* Send any pending messages, requesting another callback if
|
||||||
|
* yet more messages remain */
|
||||||
|
if (guac_kubernetes_write_pending_message(client))
|
||||||
|
lws_callback_on_writable(wsi);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
/* TODO: Add configure test */
|
/* TODO: Add configure test */
|
||||||
@ -189,14 +316,15 @@ static void* guac_kubernetes_input_thread(void* data) {
|
|||||||
guac_kubernetes_client* kubernetes_client =
|
guac_kubernetes_client* kubernetes_client =
|
||||||
(guac_kubernetes_client*) client->data;
|
(guac_kubernetes_client*) client->data;
|
||||||
|
|
||||||
char buffer[8192];
|
char buffer[GUAC_KUBERNETES_MAX_MESSAGE_SIZE];
|
||||||
int bytes_read;
|
int bytes_read;
|
||||||
|
|
||||||
/* Write all data read */
|
/* Write all data read */
|
||||||
while ((bytes_read = guac_terminal_read_stdin(kubernetes_client->term, buffer, sizeof(buffer))) > 0) {
|
while ((bytes_read = guac_terminal_read_stdin(kubernetes_client->term, buffer, sizeof(buffer))) > 0) {
|
||||||
|
|
||||||
/* TODO: Send to Kubernetes */
|
/* Send received data to Kubernetes along STDIN channel */
|
||||||
guac_terminal_write(kubernetes_client->term, buffer, bytes_read);
|
guac_kubernetes_send_message(client, GUAC_KUBERNETES_CHANNEL_STDIN,
|
||||||
|
buffer, bytes_read);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,8 +334,6 @@ static void* guac_kubernetes_input_thread(void* data) {
|
|||||||
|
|
||||||
void* guac_kubernetes_client_thread(void* data) {
|
void* guac_kubernetes_client_thread(void* data) {
|
||||||
|
|
||||||
struct lws_context* context = NULL;
|
|
||||||
|
|
||||||
guac_client* client = (guac_client*) data;
|
guac_client* client = (guac_client*) data;
|
||||||
guac_kubernetes_client* kubernetes_client =
|
guac_kubernetes_client* kubernetes_client =
|
||||||
(guac_kubernetes_client*) client->data;
|
(guac_kubernetes_client*) client->data;
|
||||||
@ -297,15 +423,15 @@ void* guac_kubernetes_client_thread(void* data) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Create libwebsockets context */
|
/* Create libwebsockets context */
|
||||||
context = lws_create_context(&context_info);
|
kubernetes_client->context = lws_create_context(&context_info);
|
||||||
if (!context) {
|
if (!kubernetes_client->context) {
|
||||||
guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR,
|
guac_client_abort(client, GUAC_PROTOCOL_STATUS_SERVER_ERROR,
|
||||||
"Initialization of libwebsockets failed");
|
"Initialization of libwebsockets failed");
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* FIXME: Generate path dynamically */
|
/* FIXME: Generate path dynamically */
|
||||||
connection_info.context = context;
|
connection_info.context = kubernetes_client->context;
|
||||||
connection_info.path = "/api/v1/namespaces/default/pods/my-shell-68974bb7f7-rpjgr/attach?container=my-shell&stdin=true&stdout=true&tty=true";
|
connection_info.path = "/api/v1/namespaces/default/pods/my-shell-68974bb7f7-rpjgr/attach?container=my-shell&stdin=true&stdout=true&tty=true";
|
||||||
|
|
||||||
/* Open WebSocket connection to Kubernetes */
|
/* Open WebSocket connection to Kubernetes */
|
||||||
@ -329,7 +455,8 @@ void* guac_kubernetes_client_thread(void* data) {
|
|||||||
while (client->state == GUAC_CLIENT_RUNNING) {
|
while (client->state == GUAC_CLIENT_RUNNING) {
|
||||||
|
|
||||||
/* Cease polling libwebsockets if an error condition is signalled */
|
/* Cease polling libwebsockets if an error condition is signalled */
|
||||||
if (lws_service(context, 1000) < 0)
|
if (lws_service(kubernetes_client->context,
|
||||||
|
GUAC_KUBERNETES_SERVICE_INTERVAL) < 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -350,8 +477,8 @@ fail:
|
|||||||
guac_common_recording_free(kubernetes_client->recording);
|
guac_common_recording_free(kubernetes_client->recording);
|
||||||
|
|
||||||
/* Free WebSocket context if successfully allocated */
|
/* Free WebSocket context if successfully allocated */
|
||||||
if (context != NULL)
|
if (kubernetes_client->context != NULL)
|
||||||
lws_context_destroy(context);
|
lws_context_destroy(kubernetes_client->context);
|
||||||
|
|
||||||
guac_client_log(client, GUAC_LOG_INFO, "Kubernetes connection ended.");
|
guac_client_log(client, GUAC_LOG_INFO, "Kubernetes connection ended.");
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -55,6 +55,13 @@
|
|||||||
*/
|
*/
|
||||||
#define GUAC_KUBERNETES_CHANNEL_RESIZE 4
|
#define GUAC_KUBERNETES_CHANNEL_RESIZE 4
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum amount of data to include in any particular WebSocket message
|
||||||
|
* to Kubernetes. This excludes the storage space required for the channel
|
||||||
|
* index.
|
||||||
|
*/
|
||||||
|
#define GUAC_KUBERNETES_MAX_MESSAGE_SIZE 1024
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The maximum number of messages to allow within the outbound message buffer.
|
* The maximum number of messages to allow within the outbound message buffer.
|
||||||
* If messages are sent despite the buffer being full, those messages will be
|
* If messages are sent despite the buffer being full, those messages will be
|
||||||
@ -62,11 +69,23 @@
|
|||||||
*/
|
*/
|
||||||
#define GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES 8
|
#define GUAC_KUBERNETES_MAX_OUTBOUND_MESSAGES 8
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The maximum number of milliseconds to wait for a libwebsockets event to
|
||||||
|
* occur before entering another iteration of the libwebsockets event loop.
|
||||||
|
*/
|
||||||
|
#define GUAC_KUBERNETES_SERVICE_INTERVAL 1000
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An outbound message to be received by Kubernetes over WebSocket.
|
* An outbound message to be received by Kubernetes over WebSocket.
|
||||||
*/
|
*/
|
||||||
typedef struct guac_kubernetes_message {
|
typedef struct guac_kubernetes_message {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* lws_write() requires leading padding of LWS_PRE bytes to provide
|
||||||
|
* scratch space for WebSocket framing.
|
||||||
|
*/
|
||||||
|
uint8_t _padding[LWS_PRE];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The index of the channel receiving the data, such as
|
* The index of the channel receiving the data, such as
|
||||||
* GUAC_KUBERNETES_CHANNEL_STDIN.
|
* GUAC_KUBERNETES_CHANNEL_STDIN.
|
||||||
@ -77,7 +96,7 @@ typedef struct guac_kubernetes_message {
|
|||||||
* The data that should be sent to Kubernetes (along with the channel
|
* The data that should be sent to Kubernetes (along with the channel
|
||||||
* index).
|
* index).
|
||||||
*/
|
*/
|
||||||
char data[1024];
|
char data[GUAC_KUBERNETES_MAX_MESSAGE_SIZE];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The length of the data to be sent, excluding the channel index.
|
* The length of the data to be sent, excluding the channel index.
|
||||||
@ -96,6 +115,11 @@ typedef struct guac_kubernetes_client {
|
|||||||
*/
|
*/
|
||||||
guac_kubernetes_settings* settings;
|
guac_kubernetes_settings* settings;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The libwebsockets context associated with the connected WebSocket.
|
||||||
|
*/
|
||||||
|
struct lws_context* context;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The connected WebSocket.
|
* The connected WebSocket.
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user