GUACAMOLE-1201: Throttle outbound audio data to avoid overflowing RDP server audio input buffer.

The RDP specification for the AUDIO_INPUT channel requires that all
audio be sent in packets of a specific size. Guacamole does correctly
limit itself to sending packets of this size to the RDP server, but will
send quite a few of these packets all at once if it has received more
audio data than the RDP packet size. This is OK in principle (the
Guacamole client should be able to send audio in packets of whatever
size it chooses), but may overwhelm the software running within the RDP
server if the amount of data received exceeds the available buffer
space, resulting in dropped samples.

As there is no way to know the size of the remote audio buffer, we need
to instead ensure that audio is streamed as close to real time as
possible, with each audio packet of N bytes not being sent until roughly
the amount of time represented by those N bytes has elapsed since the
last packet. This throttling ensures that software expecting to process
audio in real time should never run out of buffer space.

That said, if we never exceed the per-packet data rate and occasionally
send a packet earlier than real time would dictate, unavoidable latency
in sending/receiving audio data would accumulate over time. For example,
if each audio packet represents 10ms of audio data, but we receive that
audio packet 10.1ms after the previous packet, we need to adjust the
timing of the next audio packet(s) to account for that additional 0.1ms.
Simply waiting 10ms after sending each packet would cause that 0.1ms to
accumulate each time it occurs, eventually resulting in noticable
latency and finally running out of buffer space.

Thus, these changes:

1. Leverage a flush thread and per-packet scheduling to ensure that each
   flushed audio packet does not exceed the equivalent real time rate.
2. Calculate the amount of additional latency from the amount of data
   received beyond the required packet size, and amortize scheduling
   corrections to account for that latency over the next several audio
   packets.

This ensures that audio is streamed exactly as it is received if the
audio matches the packet size of the RDP server, and audio that is
received in a different size or varying sizes is buffered and throttled
to keep things within the expectations of software running within the
RDP server.
This commit is contained in:
Michael Jumper 2021-04-28 18:32:39 -07:00
parent bf6922b31e
commit 189d8fab30
2 changed files with 348 additions and 20 deletions

View File

@ -24,17 +24,253 @@
#include <guacamole/protocol.h> #include <guacamole/protocol.h>
#include <guacamole/socket.h> #include <guacamole/socket.h>
#include <guacamole/stream.h> #include <guacamole/stream.h>
#include <guacamole/timestamp.h>
#include <guacamole/user.h> #include <guacamole/user.h>
#include <assert.h> #include <assert.h>
#include <errno.h>
#include <pthread.h> #include <pthread.h>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <time.h>
/**
* The number of nanoseconds in one second.
*/
#define NANOS_PER_SECOND 1000000000L
/**
* Returns whether the given timespec represents a point in time in the future
* relative to the current system time.
*
* @param ts
* The timespec to test.
*
* @return
* Non-zero if the given timespec is in the future relative to the current
* system time, zero otherwise.
*/
static int guac_rdp_audio_buffer_is_future(const struct timespec* ts) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
if (now.tv_sec != ts->tv_sec)
return now.tv_sec < ts->tv_sec;
return now.tv_nsec < ts->tv_nsec;
}
/**
* Returns whether the given audio buffer may be flushed. An audio buffer may
* be flushed if the audio buffer is not currently being freed, at least one
* packet of audio data is available within the buffer, and flushing the next
* packet of audio data now would not violate scheduling/throttling rules for
* outbound audio data.
*
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer
* The guac_rdp_audio_buffer to test.
*
* @return
* Non-zero if the given audio buffer may be flushed, zero if the audio
* buffer cannot be flushed for any reason.
*/
static int guac_rdp_audio_buffer_may_flush(guac_rdp_audio_buffer* audio_buffer) {
return !audio_buffer->stopping
&& audio_buffer->packet_size > 0
&& audio_buffer->bytes_written >= audio_buffer->packet_size
&& !guac_rdp_audio_buffer_is_future(&audio_buffer->next_flush);
}
/**
* Returns the duration of the given quantity of audio data in milliseconds.
*
* @param format
* The format of the audio data in question.
*
* @param length
* The number of bytes of audio data.
*
* @return
* The duration of the audio data in milliseconds.
*/
static int guac_rdp_audio_buffer_duration(const guac_rdp_audio_format* format, int length) {
return length * 1000 / format->rate / format->bps / format->channels;
}
/**
* Returns the number of bytes required to store audio data in the given format
* covering the given length of time.
*
* @param format
* The format of the audio data in question.
*
* @param duration
* The duration of the audio data in milliseconds.
*
* @return
* The number of bytes required to store audio data in the given format
* covering the given length of time.
*/
static int guac_rdp_audio_buffer_length(const guac_rdp_audio_format* format, int duration) {
return duration * format->rate * format->bps * format->channels / 1000;
}
/**
* Notifies the given guac_rdp_audio_buffer that a single packet of audio data
* has just been flushed, updating the scheduled time of the next flush. The
* timing of the next flush will be set such that the overall real time audio
* generation rate is not exceeded, but will be adjusted as necessary to
* compensate for latency induced by differences in audio packet size/duration.
*
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer
* The guac_rdp_audio_buffer to update.
*/
static void guac_rdp_audio_buffer_schedule_flush(guac_rdp_audio_buffer* audio_buffer) {
struct timespec next_flush;
clock_gettime(CLOCK_REALTIME, &next_flush);
/* Calculate the point in time that the next flush would be allowed,
* assuming that the remote server processes data no faster than
* real time */
uint64_t delta_nsecs = audio_buffer->packet_size * NANOS_PER_SECOND
/ audio_buffer->out_format.rate
/ audio_buffer->out_format.bps
/ audio_buffer->out_format.channels;
/* Amortize the additional latency from packet data buffered beyond the
* desired packet size over each remaining packet such that we gradually
* approach an effective additional latency of 0 */
int packets_remaining = audio_buffer->bytes_written / audio_buffer->packet_size;
if (packets_remaining > 1)
delta_nsecs = delta_nsecs * (packets_remaining - 1) / packets_remaining;
uint64_t nsecs = next_flush.tv_nsec + delta_nsecs;
next_flush.tv_sec += nsecs / NANOS_PER_SECOND;
next_flush.tv_nsec = nsecs % NANOS_PER_SECOND;
audio_buffer->next_flush = next_flush;
}
/**
* Waits for additional data to be available for flush within the given audio
* buffer. If data is available but insufficient time has elapsed since the
* last flush, this function may block until sufficient time has elapsed. If
* the state of the audio buffer changes in any way while waiting for
* additional data, or if the audio buffer is being freed, this function will
* return immediately.
*
* It is the responsibility of the caller to check the state of the audio
* buffer after this function returns to verify whether the desired state
* change has occurred and re-invoke the function if needed.
*
* @param audio_buffer
* The guac_rdp_audio_buffer to wait for.
*/
static void guac_rdp_audio_buffer_wait(guac_rdp_audio_buffer* audio_buffer) {
pthread_mutex_lock(&(audio_buffer->lock));
/* Do not wait if audio_buffer is already closed */
if (!audio_buffer->stopping) {
/* If sufficient data exists for a flush, wait until next possible
* flush OR until some other state change occurs (such as the buffer
* being closed) */
if (audio_buffer->bytes_written && audio_buffer->bytes_written >= audio_buffer->packet_size)
pthread_cond_timedwait(&audio_buffer->modified, &audio_buffer->lock,
&audio_buffer->next_flush);
/* If sufficient data DOES NOT exist, we should wait indefinitely */
else
pthread_cond_wait(&audio_buffer->modified, &audio_buffer->lock);
}
pthread_mutex_unlock(&(audio_buffer->lock));
}
/**
* Regularly and automatically flushes audio packets by invoking the flush
* handler of the associated audio buffer. Packets are scheduled automatically
* to avoid potentially exceeding the processing and buffering capabilities of
* the software running within the RDP server. Once started, this thread runs
* until the associated audio buffer is freed via guac_rdp_audio_buffer_free().
*
* @param data
* A pointer to the guac_rdp_audio_buffer that should be flushed.
*
* @return
* Always NULL.
*/
static void* guac_rdp_audio_buffer_flush_thread(void* data) {
guac_rdp_audio_buffer* audio_buffer = (guac_rdp_audio_buffer*) data;
while (!audio_buffer->stopping) {
pthread_mutex_lock(&(audio_buffer->lock));
if (!guac_rdp_audio_buffer_may_flush(audio_buffer)) {
pthread_mutex_unlock(&(audio_buffer->lock));
/* Wait for additional data if we aren't able to flush */
guac_rdp_audio_buffer_wait(audio_buffer);
/* We might still not be able to flush (buffer might be closed,
* some other state change might occur that isn't receipt of data,
* data might be received but not enough for a flush, etc.) */
continue;
}
guac_client_log(audio_buffer->client, GUAC_LOG_TRACE, "Current audio input latency: %i ms (%i bytes waiting in buffer)",
guac_rdp_audio_buffer_duration(&audio_buffer->out_format, audio_buffer->bytes_written),
audio_buffer->bytes_written);
/* Only actually invoke if defined */
if (audio_buffer->flush_handler) {
guac_rdp_audio_buffer_schedule_flush(audio_buffer);
audio_buffer->flush_handler(audio_buffer,
audio_buffer->packet_size);
}
/* Shift buffer back by one packet */
audio_buffer->bytes_written -= audio_buffer->packet_size;
memmove(audio_buffer->packet, audio_buffer->packet + audio_buffer->packet_size, audio_buffer->bytes_written);
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock));
}
return NULL;
}
guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) { guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) {
guac_rdp_audio_buffer* buffer = calloc(1, sizeof(guac_rdp_audio_buffer)); guac_rdp_audio_buffer* buffer = calloc(1, sizeof(guac_rdp_audio_buffer));
pthread_mutex_init(&(buffer->lock), NULL); pthread_mutex_init(&(buffer->lock), NULL);
pthread_cond_init(&(buffer->modified), NULL);
buffer->client = client; buffer->client = client;
/* Begin automated, throttled flush of future data */
pthread_create(&(buffer->flush_thread), NULL,
guac_rdp_audio_buffer_flush_thread, (void*) buffer);
return buffer; return buffer;
} }
@ -46,6 +282,9 @@ guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) {
* instruction nor been associated with an "ack" having an error code), and is * instruction nor been associated with an "ack" having an error code), and is
* associated with an active RDP AUDIO_INPUT channel. * associated with an active RDP AUDIO_INPUT channel.
* *
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer * @param audio_buffer
* The audio buffer associated with the guac_stream for which the "ack" * The audio buffer associated with the guac_stream for which the "ack"
* instruction should be sent, if any. If there is no associated * instruction should be sent, if any. If there is no associated
@ -98,6 +337,7 @@ void guac_rdp_audio_buffer_set_stream(guac_rdp_audio_buffer* audio_buffer,
audio_buffer->in_format.rate, audio_buffer->in_format.rate,
audio_buffer->in_format.bps); audio_buffer->in_format.bps);
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -112,6 +352,7 @@ void guac_rdp_audio_buffer_set_output(guac_rdp_audio_buffer* audio_buffer,
audio_buffer->out_format.channels = channels; audio_buffer->out_format.channels = channels;
audio_buffer->out_format.bps = bps; audio_buffer->out_format.bps = bps;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -132,14 +373,30 @@ void guac_rdp_audio_buffer_begin(guac_rdp_audio_buffer* audio_buffer,
* audio_buffer->out_format.channels * audio_buffer->out_format.channels
* audio_buffer->out_format.bps; * audio_buffer->out_format.bps;
/* Ensure outbound buffer includes enough space for at least 250ms of
* audio */
int ideal_size = guac_rdp_audio_buffer_length(&audio_buffer->out_format,
GUAC_RDP_AUDIO_BUFFER_MIN_DURATION);
/* Round up to nearest whole packet */
int ideal_packets = (ideal_size + audio_buffer->packet_size - 1) / audio_buffer->packet_size;
/* Allocate new buffer */ /* Allocate new buffer */
free(audio_buffer->packet); audio_buffer->packet_buffer_size = ideal_packets * audio_buffer->packet_size;
audio_buffer->packet = malloc(audio_buffer->packet_size); audio_buffer->packet = malloc(audio_buffer->packet_buffer_size);
guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Output buffer for "
"audio input is %i bytes (up to %i ms).", audio_buffer->packet_buffer_size,
guac_rdp_audio_buffer_duration(&audio_buffer->out_format, audio_buffer->packet_buffer_size));
/* Next flush can occur as soon as data is received */
clock_gettime(CLOCK_REALTIME, &audio_buffer->next_flush);
/* Acknowledge stream creation (if stream is ready to receive) */ /* Acknowledge stream creation (if stream is ready to receive) */
guac_rdp_audio_buffer_ack(audio_buffer, guac_rdp_audio_buffer_ack(audio_buffer,
"OK", GUAC_PROTOCOL_STATUS_SUCCESS); "OK", GUAC_PROTOCOL_STATUS_SUCCESS);
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -152,6 +409,9 @@ void guac_rdp_audio_buffer_begin(guac_rdp_audio_buffer* audio_buffer,
* input and output formats, the number of bytes sent thus far, and the * input and output formats, the number of bytes sent thus far, and the
* number of bytes received (excluding the contents of the buffer). * number of bytes received (excluding the contents of the buffer).
* *
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer * @param audio_buffer
* The audio buffer dictating the format of the given data buffer, as * The audio buffer dictating the format of the given data buffer, as
* well as the offset from which the sample should be read. * well as the offset from which the sample should be read.
@ -238,12 +498,26 @@ void guac_rdp_audio_buffer_write(guac_rdp_audio_buffer* audio_buffer,
pthread_mutex_lock(&(audio_buffer->lock)); pthread_mutex_lock(&(audio_buffer->lock));
guac_client_log(audio_buffer->client, GUAC_LOG_TRACE, "Received %i bytes (%i ms) of audio data",
length, guac_rdp_audio_buffer_duration(&audio_buffer->in_format, length));
/* Ignore packet if there is no buffer */ /* Ignore packet if there is no buffer */
if (audio_buffer->packet_size == 0 || audio_buffer->packet == NULL) { if (audio_buffer->packet_buffer_size == 0 || audio_buffer->packet == NULL) {
guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Dropped %i "
"bytes of received audio data (buffer full or closed).", length);
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
return; return;
} }
/* Truncate received samples if exceeding size of buffer */
int available = audio_buffer->packet_buffer_size - audio_buffer->bytes_written;
if (length > available) {
guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Truncating %i "
"bytes of received audio data to %i bytes (insufficient space "
"in buffer).", length, available);
length = available;
}
int out_bps = audio_buffer->out_format.bps; int out_bps = audio_buffer->out_format.bps;
/* Continuously write packets until no data remains */ /* Continuously write packets until no data remains */
@ -266,24 +540,12 @@ void guac_rdp_audio_buffer_write(guac_rdp_audio_buffer* audio_buffer,
audio_buffer->bytes_written += out_bps; audio_buffer->bytes_written += out_bps;
audio_buffer->total_bytes_sent += out_bps; audio_buffer->total_bytes_sent += out_bps;
/* Invoke flush handler if full */
if (audio_buffer->bytes_written == audio_buffer->packet_size) {
/* Only actually invoke if defined */
if (audio_buffer->flush_handler)
audio_buffer->flush_handler(audio_buffer,
audio_buffer->bytes_written);
/* Reset buffer in all cases */
audio_buffer->bytes_written = 0;
}
} /* end packet write loop */ } /* end packet write loop */
/* Track current position in audio stream */ /* Track current position in audio stream */
audio_buffer->total_bytes_received += length; audio_buffer->total_bytes_received += length;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -292,6 +554,12 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) {
pthread_mutex_lock(&(audio_buffer->lock)); pthread_mutex_lock(&(audio_buffer->lock));
/* Ignore if stream is already closed */
if (audio_buffer->stream == NULL) {
pthread_mutex_unlock(&(audio_buffer->lock));
return;
}
/* The stream is now closed */ /* The stream is now closed */
guac_rdp_audio_buffer_ack(audio_buffer, guac_rdp_audio_buffer_ack(audio_buffer,
"CLOSED", GUAC_PROTOCOL_STATUS_RESOURCE_CLOSED); "CLOSED", GUAC_PROTOCOL_STATUS_RESOURCE_CLOSED);
@ -303,6 +571,7 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) {
/* Reset buffer state */ /* Reset buffer state */
audio_buffer->bytes_written = 0; audio_buffer->bytes_written = 0;
audio_buffer->packet_size = 0; audio_buffer->packet_size = 0;
audio_buffer->packet_buffer_size = 0;
audio_buffer->flush_handler = NULL; audio_buffer->flush_handler = NULL;
/* Reset I/O counters */ /* Reset I/O counters */
@ -313,13 +582,27 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) {
free(audio_buffer->packet); free(audio_buffer->packet);
audio_buffer->packet = NULL; audio_buffer->packet = NULL;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
void guac_rdp_audio_buffer_free(guac_rdp_audio_buffer* audio_buffer) { void guac_rdp_audio_buffer_free(guac_rdp_audio_buffer* audio_buffer) {
guac_rdp_audio_buffer_end(audio_buffer);
/* Signal termination of flush thread */
pthread_mutex_lock(&(audio_buffer->lock));
audio_buffer->stopping = 1;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock));
/* Clean up flush thread */
pthread_join(audio_buffer->flush_thread, NULL);
pthread_mutex_destroy(&(audio_buffer->lock)); pthread_mutex_destroy(&(audio_buffer->lock));
free(audio_buffer->packet); pthread_cond_destroy(&(audio_buffer->modified));
free(audio_buffer); free(audio_buffer);
} }

View File

@ -23,6 +23,16 @@
#include <guacamole/stream.h> #include <guacamole/stream.h>
#include <guacamole/user.h> #include <guacamole/user.h>
#include <pthread.h> #include <pthread.h>
#include <time.h>
/**
* The minimum number of milliseconds of audio data that each instance of
* guac_rdp_audio_buffer should provide storage for. This buffer space does not
* induce additional latency, but is required to compensate for latency and
* functions as an upper limit on the amount of latency the buffer will
* compensate for.
*/
#define GUAC_RDP_AUDIO_BUFFER_MIN_DURATION 250
/** /**
* A buffer of arbitrary audio data. Received audio data can be written to this * A buffer of arbitrary audio data. Received audio data can be written to this
@ -73,10 +83,18 @@ struct guac_rdp_audio_buffer {
/** /**
* Lock which is acquired/released to ensure accesses to the audio buffer * Lock which is acquired/released to ensure accesses to the audio buffer
* are atomic. * are atomic. This lock is also bound to the modified pthread_cond_t,
* which should be signalled whenever the audio buffer structure has been
* modified.
*/ */
pthread_mutex_t lock; pthread_mutex_t lock;
/**
* Condition which is signalled when any part of the audio buffer structure
* has been modified.
*/
pthread_cond_t modified;
/** /**
* The guac_client instance handling the relevant RDP connection. * The guac_client instance handling the relevant RDP connection.
*/ */
@ -84,13 +102,15 @@ struct guac_rdp_audio_buffer {
/** /**
* The user from which this audio buffer will receive data. If no user has * The user from which this audio buffer will receive data. If no user has
* yet opened an associated audio stream, this will be NULL. * yet opened an associated audio stream, or if the audio stream has been
* closed, this will be NULL.
*/ */
guac_user* user; guac_user* user;
/** /**
* The stream from which this audio buffer will receive data. If no user * The stream from which this audio buffer will receive data. If no user
* has yet opened an associated audio stream, this will be NULL. * has yet opened an associated audio stream, or if the audio stream has
* been closed, this will be NULL.
*/ */
guac_stream* stream; guac_stream* stream;
@ -114,6 +134,11 @@ struct guac_rdp_audio_buffer {
*/ */
int packet_size; int packet_size;
/**
* The total number of bytes available within the packet buffer.
*/
int packet_buffer_size;
/** /**
* The number of bytes currently stored within the packet buffer. * The number of bytes currently stored within the packet buffer.
*/ */
@ -136,6 +161,20 @@ struct guac_rdp_audio_buffer {
*/ */
char* packet; char* packet;
/**
* Thread which flushes the audio buffer at a rate that does not exceed the
* the audio sample rate (which might result in dropped samples due to
* overflow of the remote audio buffer).
*/
pthread_t flush_thread;
/**
* The absolute point in time that the next packet of audio data sould be
* flushed. Another packet of received data should not be flushed prior to
* this time.
*/
struct timespec next_flush;
/** /**
* Handler function which will be invoked when a full audio packet is * Handler function which will be invoked when a full audio packet is
* ready to be flushed to the AUDIO_INPUT channel, if defined. If NULL, * ready to be flushed to the AUDIO_INPUT channel, if defined. If NULL,
@ -143,6 +182,12 @@ struct guac_rdp_audio_buffer {
*/ */
guac_rdp_audio_buffer_flush_handler* flush_handler; guac_rdp_audio_buffer_flush_handler* flush_handler;
/**
* Whether guac_rdp_audio_buffer_free() has been invoked and the audio
* buffer is being cleaned up.
*/
int stopping;
/** /**
* Arbitrary data assigned by the AUDIO_INPUT plugin implementation. * Arbitrary data assigned by the AUDIO_INPUT plugin implementation.
*/ */