GUACAMOLE-1201: Merge throttle outbound audio data to avoid overflowing RDP server audio input buffer.

This commit is contained in:
Virtually Nick 2021-05-01 22:10:11 -04:00 committed by GitHub
commit a6a7e8ac26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 348 additions and 20 deletions

View File

@ -24,17 +24,253 @@
#include <guacamole/protocol.h> #include <guacamole/protocol.h>
#include <guacamole/socket.h> #include <guacamole/socket.h>
#include <guacamole/stream.h> #include <guacamole/stream.h>
#include <guacamole/timestamp.h>
#include <guacamole/user.h> #include <guacamole/user.h>
#include <assert.h> #include <assert.h>
#include <errno.h>
#include <pthread.h> #include <pthread.h>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
#include <time.h>
/**
* The number of nanoseconds in one second.
*/
#define NANOS_PER_SECOND 1000000000L
/**
* Returns whether the given timespec represents a point in time in the future
* relative to the current system time.
*
* @param ts
* The timespec to test.
*
* @return
* Non-zero if the given timespec is in the future relative to the current
* system time, zero otherwise.
*/
static int guac_rdp_audio_buffer_is_future(const struct timespec* ts) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
if (now.tv_sec != ts->tv_sec)
return now.tv_sec < ts->tv_sec;
return now.tv_nsec < ts->tv_nsec;
}
/**
* Returns whether the given audio buffer may be flushed. An audio buffer may
* be flushed if the audio buffer is not currently being freed, at least one
* packet of audio data is available within the buffer, and flushing the next
* packet of audio data now would not violate scheduling/throttling rules for
* outbound audio data.
*
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer
* The guac_rdp_audio_buffer to test.
*
* @return
* Non-zero if the given audio buffer may be flushed, zero if the audio
* buffer cannot be flushed for any reason.
*/
static int guac_rdp_audio_buffer_may_flush(guac_rdp_audio_buffer* audio_buffer) {
return !audio_buffer->stopping
&& audio_buffer->packet_size > 0
&& audio_buffer->bytes_written >= audio_buffer->packet_size
&& !guac_rdp_audio_buffer_is_future(&audio_buffer->next_flush);
}
/**
* Returns the duration of the given quantity of audio data in milliseconds.
*
* @param format
* The format of the audio data in question.
*
* @param length
* The number of bytes of audio data.
*
* @return
* The duration of the audio data in milliseconds.
*/
static int guac_rdp_audio_buffer_duration(const guac_rdp_audio_format* format, int length) {
return length * 1000 / format->rate / format->bps / format->channels;
}
/**
* Returns the number of bytes required to store audio data in the given format
* covering the given length of time.
*
* @param format
* The format of the audio data in question.
*
* @param duration
* The duration of the audio data in milliseconds.
*
* @return
* The number of bytes required to store audio data in the given format
* covering the given length of time.
*/
static int guac_rdp_audio_buffer_length(const guac_rdp_audio_format* format, int duration) {
return duration * format->rate * format->bps * format->channels / 1000;
}
/**
* Notifies the given guac_rdp_audio_buffer that a single packet of audio data
* has just been flushed, updating the scheduled time of the next flush. The
* timing of the next flush will be set such that the overall real time audio
* generation rate is not exceeded, but will be adjusted as necessary to
* compensate for latency induced by differences in audio packet size/duration.
*
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer
* The guac_rdp_audio_buffer to update.
*/
static void guac_rdp_audio_buffer_schedule_flush(guac_rdp_audio_buffer* audio_buffer) {
struct timespec next_flush;
clock_gettime(CLOCK_REALTIME, &next_flush);
/* Calculate the point in time that the next flush would be allowed,
* assuming that the remote server processes data no faster than
* real time */
uint64_t delta_nsecs = audio_buffer->packet_size * NANOS_PER_SECOND
/ audio_buffer->out_format.rate
/ audio_buffer->out_format.bps
/ audio_buffer->out_format.channels;
/* Amortize the additional latency from packet data buffered beyond the
* desired packet size over each remaining packet such that we gradually
* approach an effective additional latency of 0 */
int packets_remaining = audio_buffer->bytes_written / audio_buffer->packet_size;
if (packets_remaining > 1)
delta_nsecs = delta_nsecs * (packets_remaining - 1) / packets_remaining;
uint64_t nsecs = next_flush.tv_nsec + delta_nsecs;
next_flush.tv_sec += nsecs / NANOS_PER_SECOND;
next_flush.tv_nsec = nsecs % NANOS_PER_SECOND;
audio_buffer->next_flush = next_flush;
}
/**
* Waits for additional data to be available for flush within the given audio
* buffer. If data is available but insufficient time has elapsed since the
* last flush, this function may block until sufficient time has elapsed. If
* the state of the audio buffer changes in any way while waiting for
* additional data, or if the audio buffer is being freed, this function will
* return immediately.
*
* It is the responsibility of the caller to check the state of the audio
* buffer after this function returns to verify whether the desired state
* change has occurred and re-invoke the function if needed.
*
* @param audio_buffer
* The guac_rdp_audio_buffer to wait for.
*/
static void guac_rdp_audio_buffer_wait(guac_rdp_audio_buffer* audio_buffer) {
pthread_mutex_lock(&(audio_buffer->lock));
/* Do not wait if audio_buffer is already closed */
if (!audio_buffer->stopping) {
/* If sufficient data exists for a flush, wait until next possible
* flush OR until some other state change occurs (such as the buffer
* being closed) */
if (audio_buffer->bytes_written && audio_buffer->bytes_written >= audio_buffer->packet_size)
pthread_cond_timedwait(&audio_buffer->modified, &audio_buffer->lock,
&audio_buffer->next_flush);
/* If sufficient data DOES NOT exist, we should wait indefinitely */
else
pthread_cond_wait(&audio_buffer->modified, &audio_buffer->lock);
}
pthread_mutex_unlock(&(audio_buffer->lock));
}
/**
* Regularly and automatically flushes audio packets by invoking the flush
* handler of the associated audio buffer. Packets are scheduled automatically
* to avoid potentially exceeding the processing and buffering capabilities of
* the software running within the RDP server. Once started, this thread runs
* until the associated audio buffer is freed via guac_rdp_audio_buffer_free().
*
* @param data
* A pointer to the guac_rdp_audio_buffer that should be flushed.
*
* @return
* Always NULL.
*/
static void* guac_rdp_audio_buffer_flush_thread(void* data) {
guac_rdp_audio_buffer* audio_buffer = (guac_rdp_audio_buffer*) data;
while (!audio_buffer->stopping) {
pthread_mutex_lock(&(audio_buffer->lock));
if (!guac_rdp_audio_buffer_may_flush(audio_buffer)) {
pthread_mutex_unlock(&(audio_buffer->lock));
/* Wait for additional data if we aren't able to flush */
guac_rdp_audio_buffer_wait(audio_buffer);
/* We might still not be able to flush (buffer might be closed,
* some other state change might occur that isn't receipt of data,
* data might be received but not enough for a flush, etc.) */
continue;
}
guac_client_log(audio_buffer->client, GUAC_LOG_TRACE, "Current audio input latency: %i ms (%i bytes waiting in buffer)",
guac_rdp_audio_buffer_duration(&audio_buffer->out_format, audio_buffer->bytes_written),
audio_buffer->bytes_written);
/* Only actually invoke if defined */
if (audio_buffer->flush_handler) {
guac_rdp_audio_buffer_schedule_flush(audio_buffer);
audio_buffer->flush_handler(audio_buffer,
audio_buffer->packet_size);
}
/* Shift buffer back by one packet */
audio_buffer->bytes_written -= audio_buffer->packet_size;
memmove(audio_buffer->packet, audio_buffer->packet + audio_buffer->packet_size, audio_buffer->bytes_written);
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock));
}
return NULL;
}
guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) { guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) {
guac_rdp_audio_buffer* buffer = calloc(1, sizeof(guac_rdp_audio_buffer)); guac_rdp_audio_buffer* buffer = calloc(1, sizeof(guac_rdp_audio_buffer));
pthread_mutex_init(&(buffer->lock), NULL); pthread_mutex_init(&(buffer->lock), NULL);
pthread_cond_init(&(buffer->modified), NULL);
buffer->client = client; buffer->client = client;
/* Begin automated, throttled flush of future data */
pthread_create(&(buffer->flush_thread), NULL,
guac_rdp_audio_buffer_flush_thread, (void*) buffer);
return buffer; return buffer;
} }
@ -46,6 +282,9 @@ guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) {
* instruction nor been associated with an "ack" having an error code), and is * instruction nor been associated with an "ack" having an error code), and is
* associated with an active RDP AUDIO_INPUT channel. * associated with an active RDP AUDIO_INPUT channel.
* *
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer * @param audio_buffer
* The audio buffer associated with the guac_stream for which the "ack" * The audio buffer associated with the guac_stream for which the "ack"
* instruction should be sent, if any. If there is no associated * instruction should be sent, if any. If there is no associated
@ -98,6 +337,7 @@ void guac_rdp_audio_buffer_set_stream(guac_rdp_audio_buffer* audio_buffer,
audio_buffer->in_format.rate, audio_buffer->in_format.rate,
audio_buffer->in_format.bps); audio_buffer->in_format.bps);
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -112,6 +352,7 @@ void guac_rdp_audio_buffer_set_output(guac_rdp_audio_buffer* audio_buffer,
audio_buffer->out_format.channels = channels; audio_buffer->out_format.channels = channels;
audio_buffer->out_format.bps = bps; audio_buffer->out_format.bps = bps;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -132,14 +373,30 @@ void guac_rdp_audio_buffer_begin(guac_rdp_audio_buffer* audio_buffer,
* audio_buffer->out_format.channels * audio_buffer->out_format.channels
* audio_buffer->out_format.bps; * audio_buffer->out_format.bps;
/* Ensure outbound buffer includes enough space for at least 250ms of
* audio */
int ideal_size = guac_rdp_audio_buffer_length(&audio_buffer->out_format,
GUAC_RDP_AUDIO_BUFFER_MIN_DURATION);
/* Round up to nearest whole packet */
int ideal_packets = (ideal_size + audio_buffer->packet_size - 1) / audio_buffer->packet_size;
/* Allocate new buffer */ /* Allocate new buffer */
free(audio_buffer->packet); audio_buffer->packet_buffer_size = ideal_packets * audio_buffer->packet_size;
audio_buffer->packet = malloc(audio_buffer->packet_size); audio_buffer->packet = malloc(audio_buffer->packet_buffer_size);
guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Output buffer for "
"audio input is %i bytes (up to %i ms).", audio_buffer->packet_buffer_size,
guac_rdp_audio_buffer_duration(&audio_buffer->out_format, audio_buffer->packet_buffer_size));
/* Next flush can occur as soon as data is received */
clock_gettime(CLOCK_REALTIME, &audio_buffer->next_flush);
/* Acknowledge stream creation (if stream is ready to receive) */ /* Acknowledge stream creation (if stream is ready to receive) */
guac_rdp_audio_buffer_ack(audio_buffer, guac_rdp_audio_buffer_ack(audio_buffer,
"OK", GUAC_PROTOCOL_STATUS_SUCCESS); "OK", GUAC_PROTOCOL_STATUS_SUCCESS);
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -152,6 +409,9 @@ void guac_rdp_audio_buffer_begin(guac_rdp_audio_buffer* audio_buffer,
* input and output formats, the number of bytes sent thus far, and the * input and output formats, the number of bytes sent thus far, and the
* number of bytes received (excluding the contents of the buffer). * number of bytes received (excluding the contents of the buffer).
* *
* IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when
* invoking this function.
*
* @param audio_buffer * @param audio_buffer
* The audio buffer dictating the format of the given data buffer, as * The audio buffer dictating the format of the given data buffer, as
* well as the offset from which the sample should be read. * well as the offset from which the sample should be read.
@ -238,12 +498,26 @@ void guac_rdp_audio_buffer_write(guac_rdp_audio_buffer* audio_buffer,
pthread_mutex_lock(&(audio_buffer->lock)); pthread_mutex_lock(&(audio_buffer->lock));
guac_client_log(audio_buffer->client, GUAC_LOG_TRACE, "Received %i bytes (%i ms) of audio data",
length, guac_rdp_audio_buffer_duration(&audio_buffer->in_format, length));
/* Ignore packet if there is no buffer */ /* Ignore packet if there is no buffer */
if (audio_buffer->packet_size == 0 || audio_buffer->packet == NULL) { if (audio_buffer->packet_buffer_size == 0 || audio_buffer->packet == NULL) {
guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Dropped %i "
"bytes of received audio data (buffer full or closed).", length);
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
return; return;
} }
/* Truncate received samples if exceeding size of buffer */
int available = audio_buffer->packet_buffer_size - audio_buffer->bytes_written;
if (length > available) {
guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Truncating %i "
"bytes of received audio data to %i bytes (insufficient space "
"in buffer).", length, available);
length = available;
}
int out_bps = audio_buffer->out_format.bps; int out_bps = audio_buffer->out_format.bps;
/* Continuously write packets until no data remains */ /* Continuously write packets until no data remains */
@ -266,24 +540,12 @@ void guac_rdp_audio_buffer_write(guac_rdp_audio_buffer* audio_buffer,
audio_buffer->bytes_written += out_bps; audio_buffer->bytes_written += out_bps;
audio_buffer->total_bytes_sent += out_bps; audio_buffer->total_bytes_sent += out_bps;
/* Invoke flush handler if full */
if (audio_buffer->bytes_written == audio_buffer->packet_size) {
/* Only actually invoke if defined */
if (audio_buffer->flush_handler)
audio_buffer->flush_handler(audio_buffer,
audio_buffer->bytes_written);
/* Reset buffer in all cases */
audio_buffer->bytes_written = 0;
}
} /* end packet write loop */ } /* end packet write loop */
/* Track current position in audio stream */ /* Track current position in audio stream */
audio_buffer->total_bytes_received += length; audio_buffer->total_bytes_received += length;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
@ -292,6 +554,12 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) {
pthread_mutex_lock(&(audio_buffer->lock)); pthread_mutex_lock(&(audio_buffer->lock));
/* Ignore if stream is already closed */
if (audio_buffer->stream == NULL) {
pthread_mutex_unlock(&(audio_buffer->lock));
return;
}
/* The stream is now closed */ /* The stream is now closed */
guac_rdp_audio_buffer_ack(audio_buffer, guac_rdp_audio_buffer_ack(audio_buffer,
"CLOSED", GUAC_PROTOCOL_STATUS_RESOURCE_CLOSED); "CLOSED", GUAC_PROTOCOL_STATUS_RESOURCE_CLOSED);
@ -303,6 +571,7 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) {
/* Reset buffer state */ /* Reset buffer state */
audio_buffer->bytes_written = 0; audio_buffer->bytes_written = 0;
audio_buffer->packet_size = 0; audio_buffer->packet_size = 0;
audio_buffer->packet_buffer_size = 0;
audio_buffer->flush_handler = NULL; audio_buffer->flush_handler = NULL;
/* Reset I/O counters */ /* Reset I/O counters */
@ -313,13 +582,27 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) {
free(audio_buffer->packet); free(audio_buffer->packet);
audio_buffer->packet = NULL; audio_buffer->packet = NULL;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock)); pthread_mutex_unlock(&(audio_buffer->lock));
} }
void guac_rdp_audio_buffer_free(guac_rdp_audio_buffer* audio_buffer) { void guac_rdp_audio_buffer_free(guac_rdp_audio_buffer* audio_buffer) {
guac_rdp_audio_buffer_end(audio_buffer);
/* Signal termination of flush thread */
pthread_mutex_lock(&(audio_buffer->lock));
audio_buffer->stopping = 1;
pthread_cond_broadcast(&(audio_buffer->modified));
pthread_mutex_unlock(&(audio_buffer->lock));
/* Clean up flush thread */
pthread_join(audio_buffer->flush_thread, NULL);
pthread_mutex_destroy(&(audio_buffer->lock)); pthread_mutex_destroy(&(audio_buffer->lock));
free(audio_buffer->packet); pthread_cond_destroy(&(audio_buffer->modified));
free(audio_buffer); free(audio_buffer);
} }

View File

@ -23,6 +23,16 @@
#include <guacamole/stream.h> #include <guacamole/stream.h>
#include <guacamole/user.h> #include <guacamole/user.h>
#include <pthread.h> #include <pthread.h>
#include <time.h>
/**
* The minimum number of milliseconds of audio data that each instance of
* guac_rdp_audio_buffer should provide storage for. This buffer space does not
* induce additional latency, but is required to compensate for latency and
* functions as an upper limit on the amount of latency the buffer will
* compensate for.
*/
#define GUAC_RDP_AUDIO_BUFFER_MIN_DURATION 250
/** /**
* A buffer of arbitrary audio data. Received audio data can be written to this * A buffer of arbitrary audio data. Received audio data can be written to this
@ -73,10 +83,18 @@ struct guac_rdp_audio_buffer {
/** /**
* Lock which is acquired/released to ensure accesses to the audio buffer * Lock which is acquired/released to ensure accesses to the audio buffer
* are atomic. * are atomic. This lock is also bound to the modified pthread_cond_t,
* which should be signalled whenever the audio buffer structure has been
* modified.
*/ */
pthread_mutex_t lock; pthread_mutex_t lock;
/**
* Condition which is signalled when any part of the audio buffer structure
* has been modified.
*/
pthread_cond_t modified;
/** /**
* The guac_client instance handling the relevant RDP connection. * The guac_client instance handling the relevant RDP connection.
*/ */
@ -84,13 +102,15 @@ struct guac_rdp_audio_buffer {
/** /**
* The user from which this audio buffer will receive data. If no user has * The user from which this audio buffer will receive data. If no user has
* yet opened an associated audio stream, this will be NULL. * yet opened an associated audio stream, or if the audio stream has been
* closed, this will be NULL.
*/ */
guac_user* user; guac_user* user;
/** /**
* The stream from which this audio buffer will receive data. If no user * The stream from which this audio buffer will receive data. If no user
* has yet opened an associated audio stream, this will be NULL. * has yet opened an associated audio stream, or if the audio stream has
* been closed, this will be NULL.
*/ */
guac_stream* stream; guac_stream* stream;
@ -114,6 +134,11 @@ struct guac_rdp_audio_buffer {
*/ */
int packet_size; int packet_size;
/**
* The total number of bytes available within the packet buffer.
*/
int packet_buffer_size;
/** /**
* The number of bytes currently stored within the packet buffer. * The number of bytes currently stored within the packet buffer.
*/ */
@ -136,6 +161,20 @@ struct guac_rdp_audio_buffer {
*/ */
char* packet; char* packet;
/**
* Thread which flushes the audio buffer at a rate that does not exceed the
* the audio sample rate (which might result in dropped samples due to
* overflow of the remote audio buffer).
*/
pthread_t flush_thread;
/**
* The absolute point in time that the next packet of audio data sould be
* flushed. Another packet of received data should not be flushed prior to
* this time.
*/
struct timespec next_flush;
/** /**
* Handler function which will be invoked when a full audio packet is * Handler function which will be invoked when a full audio packet is
* ready to be flushed to the AUDIO_INPUT channel, if defined. If NULL, * ready to be flushed to the AUDIO_INPUT channel, if defined. If NULL,
@ -143,6 +182,12 @@ struct guac_rdp_audio_buffer {
*/ */
guac_rdp_audio_buffer_flush_handler* flush_handler; guac_rdp_audio_buffer_flush_handler* flush_handler;
/**
* Whether guac_rdp_audio_buffer_free() has been invoked and the audio
* buffer is being cleaned up.
*/
int stopping;
/** /**
* Arbitrary data assigned by the AUDIO_INPUT plugin implementation. * Arbitrary data assigned by the AUDIO_INPUT plugin implementation.
*/ */