diff --git a/src/protocols/rdp/channels/audio-input/audio-buffer.c b/src/protocols/rdp/channels/audio-input/audio-buffer.c index 5e5a7259..617cb2c8 100644 --- a/src/protocols/rdp/channels/audio-input/audio-buffer.c +++ b/src/protocols/rdp/channels/audio-input/audio-buffer.c @@ -24,17 +24,253 @@ #include #include #include +#include #include #include +#include #include #include #include +#include + +/** + * The number of nanoseconds in one second. + */ +#define NANOS_PER_SECOND 1000000000L + +/** + * Returns whether the given timespec represents a point in time in the future + * relative to the current system time. + * + * @param ts + * The timespec to test. + * + * @return + * Non-zero if the given timespec is in the future relative to the current + * system time, zero otherwise. + */ +static int guac_rdp_audio_buffer_is_future(const struct timespec* ts) { + + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + + if (now.tv_sec != ts->tv_sec) + return now.tv_sec < ts->tv_sec; + + return now.tv_nsec < ts->tv_nsec; + +} + +/** + * Returns whether the given audio buffer may be flushed. An audio buffer may + * be flushed if the audio buffer is not currently being freed, at least one + * packet of audio data is available within the buffer, and flushing the next + * packet of audio data now would not violate scheduling/throttling rules for + * outbound audio data. + * + * IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when + * invoking this function. + * + * @param audio_buffer + * The guac_rdp_audio_buffer to test. + * + * @return + * Non-zero if the given audio buffer may be flushed, zero if the audio + * buffer cannot be flushed for any reason. + */ +static int guac_rdp_audio_buffer_may_flush(guac_rdp_audio_buffer* audio_buffer) { + return !audio_buffer->stopping + && audio_buffer->packet_size > 0 + && audio_buffer->bytes_written >= audio_buffer->packet_size + && !guac_rdp_audio_buffer_is_future(&audio_buffer->next_flush); +} + +/** + * Returns the duration of the given quantity of audio data in milliseconds. + * + * @param format + * The format of the audio data in question. + * + * @param length + * The number of bytes of audio data. + * + * @return + * The duration of the audio data in milliseconds. + */ +static int guac_rdp_audio_buffer_duration(const guac_rdp_audio_format* format, int length) { + return length * 1000 / format->rate / format->bps / format->channels; +} + +/** + * Returns the number of bytes required to store audio data in the given format + * covering the given length of time. + * + * @param format + * The format of the audio data in question. + * + * @param duration + * The duration of the audio data in milliseconds. + * + * @return + * The number of bytes required to store audio data in the given format + * covering the given length of time. + */ +static int guac_rdp_audio_buffer_length(const guac_rdp_audio_format* format, int duration) { + return duration * format->rate * format->bps * format->channels / 1000; +} + +/** + * Notifies the given guac_rdp_audio_buffer that a single packet of audio data + * has just been flushed, updating the scheduled time of the next flush. The + * timing of the next flush will be set such that the overall real time audio + * generation rate is not exceeded, but will be adjusted as necessary to + * compensate for latency induced by differences in audio packet size/duration. + * + * IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when + * invoking this function. + * + * @param audio_buffer + * The guac_rdp_audio_buffer to update. + */ +static void guac_rdp_audio_buffer_schedule_flush(guac_rdp_audio_buffer* audio_buffer) { + + struct timespec next_flush; + clock_gettime(CLOCK_REALTIME, &next_flush); + + /* Calculate the point in time that the next flush would be allowed, + * assuming that the remote server processes data no faster than + * real time */ + uint64_t delta_nsecs = audio_buffer->packet_size * NANOS_PER_SECOND + / audio_buffer->out_format.rate + / audio_buffer->out_format.bps + / audio_buffer->out_format.channels; + + /* Amortize the additional latency from packet data buffered beyond the + * desired packet size over each remaining packet such that we gradually + * approach an effective additional latency of 0 */ + int packets_remaining = audio_buffer->bytes_written / audio_buffer->packet_size; + if (packets_remaining > 1) + delta_nsecs = delta_nsecs * (packets_remaining - 1) / packets_remaining; + + uint64_t nsecs = next_flush.tv_nsec + delta_nsecs; + + next_flush.tv_sec += nsecs / NANOS_PER_SECOND; + next_flush.tv_nsec = nsecs % NANOS_PER_SECOND; + + audio_buffer->next_flush = next_flush; + +} + +/** + * Waits for additional data to be available for flush within the given audio + * buffer. If data is available but insufficient time has elapsed since the + * last flush, this function may block until sufficient time has elapsed. If + * the state of the audio buffer changes in any way while waiting for + * additional data, or if the audio buffer is being freed, this function will + * return immediately. + * + * It is the responsibility of the caller to check the state of the audio + * buffer after this function returns to verify whether the desired state + * change has occurred and re-invoke the function if needed. + * + * @param audio_buffer + * The guac_rdp_audio_buffer to wait for. + */ +static void guac_rdp_audio_buffer_wait(guac_rdp_audio_buffer* audio_buffer) { + + pthread_mutex_lock(&(audio_buffer->lock)); + + /* Do not wait if audio_buffer is already closed */ + if (!audio_buffer->stopping) { + + /* If sufficient data exists for a flush, wait until next possible + * flush OR until some other state change occurs (such as the buffer + * being closed) */ + if (audio_buffer->bytes_written && audio_buffer->bytes_written >= audio_buffer->packet_size) + pthread_cond_timedwait(&audio_buffer->modified, &audio_buffer->lock, + &audio_buffer->next_flush); + + /* If sufficient data DOES NOT exist, we should wait indefinitely */ + else + pthread_cond_wait(&audio_buffer->modified, &audio_buffer->lock); + + } + + pthread_mutex_unlock(&(audio_buffer->lock)); + +} + +/** + * Regularly and automatically flushes audio packets by invoking the flush + * handler of the associated audio buffer. Packets are scheduled automatically + * to avoid potentially exceeding the processing and buffering capabilities of + * the software running within the RDP server. Once started, this thread runs + * until the associated audio buffer is freed via guac_rdp_audio_buffer_free(). + * + * @param data + * A pointer to the guac_rdp_audio_buffer that should be flushed. + * + * @return + * Always NULL. + */ +static void* guac_rdp_audio_buffer_flush_thread(void* data) { + + guac_rdp_audio_buffer* audio_buffer = (guac_rdp_audio_buffer*) data; + while (!audio_buffer->stopping) { + + pthread_mutex_lock(&(audio_buffer->lock)); + + if (!guac_rdp_audio_buffer_may_flush(audio_buffer)) { + + pthread_mutex_unlock(&(audio_buffer->lock)); + + /* Wait for additional data if we aren't able to flush */ + guac_rdp_audio_buffer_wait(audio_buffer); + + /* We might still not be able to flush (buffer might be closed, + * some other state change might occur that isn't receipt of data, + * data might be received but not enough for a flush, etc.) */ + continue; + + } + + guac_client_log(audio_buffer->client, GUAC_LOG_TRACE, "Current audio input latency: %i ms (%i bytes waiting in buffer)", + guac_rdp_audio_buffer_duration(&audio_buffer->out_format, audio_buffer->bytes_written), + audio_buffer->bytes_written); + + /* Only actually invoke if defined */ + if (audio_buffer->flush_handler) { + guac_rdp_audio_buffer_schedule_flush(audio_buffer); + audio_buffer->flush_handler(audio_buffer, + audio_buffer->packet_size); + } + + /* Shift buffer back by one packet */ + audio_buffer->bytes_written -= audio_buffer->packet_size; + memmove(audio_buffer->packet, audio_buffer->packet + audio_buffer->packet_size, audio_buffer->bytes_written); + + pthread_cond_broadcast(&(audio_buffer->modified)); + pthread_mutex_unlock(&(audio_buffer->lock)); + + } + + return NULL; + +} guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) { + guac_rdp_audio_buffer* buffer = calloc(1, sizeof(guac_rdp_audio_buffer)); + pthread_mutex_init(&(buffer->lock), NULL); + pthread_cond_init(&(buffer->modified), NULL); buffer->client = client; + + /* Begin automated, throttled flush of future data */ + pthread_create(&(buffer->flush_thread), NULL, + guac_rdp_audio_buffer_flush_thread, (void*) buffer); + return buffer; } @@ -46,6 +282,9 @@ guac_rdp_audio_buffer* guac_rdp_audio_buffer_alloc(guac_client* client) { * instruction nor been associated with an "ack" having an error code), and is * associated with an active RDP AUDIO_INPUT channel. * + * IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when + * invoking this function. + * * @param audio_buffer * The audio buffer associated with the guac_stream for which the "ack" * instruction should be sent, if any. If there is no associated @@ -98,6 +337,7 @@ void guac_rdp_audio_buffer_set_stream(guac_rdp_audio_buffer* audio_buffer, audio_buffer->in_format.rate, audio_buffer->in_format.bps); + pthread_cond_broadcast(&(audio_buffer->modified)); pthread_mutex_unlock(&(audio_buffer->lock)); } @@ -112,6 +352,7 @@ void guac_rdp_audio_buffer_set_output(guac_rdp_audio_buffer* audio_buffer, audio_buffer->out_format.channels = channels; audio_buffer->out_format.bps = bps; + pthread_cond_broadcast(&(audio_buffer->modified)); pthread_mutex_unlock(&(audio_buffer->lock)); } @@ -132,14 +373,30 @@ void guac_rdp_audio_buffer_begin(guac_rdp_audio_buffer* audio_buffer, * audio_buffer->out_format.channels * audio_buffer->out_format.bps; + /* Ensure outbound buffer includes enough space for at least 250ms of + * audio */ + int ideal_size = guac_rdp_audio_buffer_length(&audio_buffer->out_format, + GUAC_RDP_AUDIO_BUFFER_MIN_DURATION); + + /* Round up to nearest whole packet */ + int ideal_packets = (ideal_size + audio_buffer->packet_size - 1) / audio_buffer->packet_size; + /* Allocate new buffer */ - free(audio_buffer->packet); - audio_buffer->packet = malloc(audio_buffer->packet_size); + audio_buffer->packet_buffer_size = ideal_packets * audio_buffer->packet_size; + audio_buffer->packet = malloc(audio_buffer->packet_buffer_size); + + guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Output buffer for " + "audio input is %i bytes (up to %i ms).", audio_buffer->packet_buffer_size, + guac_rdp_audio_buffer_duration(&audio_buffer->out_format, audio_buffer->packet_buffer_size)); + + /* Next flush can occur as soon as data is received */ + clock_gettime(CLOCK_REALTIME, &audio_buffer->next_flush); /* Acknowledge stream creation (if stream is ready to receive) */ guac_rdp_audio_buffer_ack(audio_buffer, "OK", GUAC_PROTOCOL_STATUS_SUCCESS); + pthread_cond_broadcast(&(audio_buffer->modified)); pthread_mutex_unlock(&(audio_buffer->lock)); } @@ -152,6 +409,9 @@ void guac_rdp_audio_buffer_begin(guac_rdp_audio_buffer* audio_buffer, * input and output formats, the number of bytes sent thus far, and the * number of bytes received (excluding the contents of the buffer). * + * IMPORTANT: The guac_rdp_audio_buffer's lock MUST already be held when + * invoking this function. + * * @param audio_buffer * The audio buffer dictating the format of the given data buffer, as * well as the offset from which the sample should be read. @@ -238,12 +498,26 @@ void guac_rdp_audio_buffer_write(guac_rdp_audio_buffer* audio_buffer, pthread_mutex_lock(&(audio_buffer->lock)); + guac_client_log(audio_buffer->client, GUAC_LOG_TRACE, "Received %i bytes (%i ms) of audio data", + length, guac_rdp_audio_buffer_duration(&audio_buffer->in_format, length)); + /* Ignore packet if there is no buffer */ - if (audio_buffer->packet_size == 0 || audio_buffer->packet == NULL) { + if (audio_buffer->packet_buffer_size == 0 || audio_buffer->packet == NULL) { + guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Dropped %i " + "bytes of received audio data (buffer full or closed).", length); pthread_mutex_unlock(&(audio_buffer->lock)); return; } + /* Truncate received samples if exceeding size of buffer */ + int available = audio_buffer->packet_buffer_size - audio_buffer->bytes_written; + if (length > available) { + guac_client_log(audio_buffer->client, GUAC_LOG_DEBUG, "Truncating %i " + "bytes of received audio data to %i bytes (insufficient space " + "in buffer).", length, available); + length = available; + } + int out_bps = audio_buffer->out_format.bps; /* Continuously write packets until no data remains */ @@ -266,24 +540,12 @@ void guac_rdp_audio_buffer_write(guac_rdp_audio_buffer* audio_buffer, audio_buffer->bytes_written += out_bps; audio_buffer->total_bytes_sent += out_bps; - /* Invoke flush handler if full */ - if (audio_buffer->bytes_written == audio_buffer->packet_size) { - - /* Only actually invoke if defined */ - if (audio_buffer->flush_handler) - audio_buffer->flush_handler(audio_buffer, - audio_buffer->bytes_written); - - /* Reset buffer in all cases */ - audio_buffer->bytes_written = 0; - - } - } /* end packet write loop */ /* Track current position in audio stream */ audio_buffer->total_bytes_received += length; + pthread_cond_broadcast(&(audio_buffer->modified)); pthread_mutex_unlock(&(audio_buffer->lock)); } @@ -292,6 +554,12 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) { pthread_mutex_lock(&(audio_buffer->lock)); + /* Ignore if stream is already closed */ + if (audio_buffer->stream == NULL) { + pthread_mutex_unlock(&(audio_buffer->lock)); + return; + } + /* The stream is now closed */ guac_rdp_audio_buffer_ack(audio_buffer, "CLOSED", GUAC_PROTOCOL_STATUS_RESOURCE_CLOSED); @@ -303,6 +571,7 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) { /* Reset buffer state */ audio_buffer->bytes_written = 0; audio_buffer->packet_size = 0; + audio_buffer->packet_buffer_size = 0; audio_buffer->flush_handler = NULL; /* Reset I/O counters */ @@ -313,13 +582,27 @@ void guac_rdp_audio_buffer_end(guac_rdp_audio_buffer* audio_buffer) { free(audio_buffer->packet); audio_buffer->packet = NULL; + pthread_cond_broadcast(&(audio_buffer->modified)); pthread_mutex_unlock(&(audio_buffer->lock)); } void guac_rdp_audio_buffer_free(guac_rdp_audio_buffer* audio_buffer) { + + guac_rdp_audio_buffer_end(audio_buffer); + + /* Signal termination of flush thread */ + pthread_mutex_lock(&(audio_buffer->lock)); + audio_buffer->stopping = 1; + pthread_cond_broadcast(&(audio_buffer->modified)); + pthread_mutex_unlock(&(audio_buffer->lock)); + + /* Clean up flush thread */ + pthread_join(audio_buffer->flush_thread, NULL); + pthread_mutex_destroy(&(audio_buffer->lock)); - free(audio_buffer->packet); + pthread_cond_destroy(&(audio_buffer->modified)); free(audio_buffer); + } diff --git a/src/protocols/rdp/channels/audio-input/audio-buffer.h b/src/protocols/rdp/channels/audio-input/audio-buffer.h index b67f9fd5..723ca6a6 100644 --- a/src/protocols/rdp/channels/audio-input/audio-buffer.h +++ b/src/protocols/rdp/channels/audio-input/audio-buffer.h @@ -23,6 +23,16 @@ #include #include #include +#include + +/** + * The minimum number of milliseconds of audio data that each instance of + * guac_rdp_audio_buffer should provide storage for. This buffer space does not + * induce additional latency, but is required to compensate for latency and + * functions as an upper limit on the amount of latency the buffer will + * compensate for. + */ +#define GUAC_RDP_AUDIO_BUFFER_MIN_DURATION 250 /** * A buffer of arbitrary audio data. Received audio data can be written to this @@ -73,10 +83,18 @@ struct guac_rdp_audio_buffer { /** * Lock which is acquired/released to ensure accesses to the audio buffer - * are atomic. + * are atomic. This lock is also bound to the modified pthread_cond_t, + * which should be signalled whenever the audio buffer structure has been + * modified. */ pthread_mutex_t lock; + /** + * Condition which is signalled when any part of the audio buffer structure + * has been modified. + */ + pthread_cond_t modified; + /** * The guac_client instance handling the relevant RDP connection. */ @@ -84,13 +102,15 @@ struct guac_rdp_audio_buffer { /** * The user from which this audio buffer will receive data. If no user has - * yet opened an associated audio stream, this will be NULL. + * yet opened an associated audio stream, or if the audio stream has been + * closed, this will be NULL. */ guac_user* user; /** * The stream from which this audio buffer will receive data. If no user - * has yet opened an associated audio stream, this will be NULL. + * has yet opened an associated audio stream, or if the audio stream has + * been closed, this will be NULL. */ guac_stream* stream; @@ -114,6 +134,11 @@ struct guac_rdp_audio_buffer { */ int packet_size; + /** + * The total number of bytes available within the packet buffer. + */ + int packet_buffer_size; + /** * The number of bytes currently stored within the packet buffer. */ @@ -136,6 +161,20 @@ struct guac_rdp_audio_buffer { */ char* packet; + /** + * Thread which flushes the audio buffer at a rate that does not exceed the + * the audio sample rate (which might result in dropped samples due to + * overflow of the remote audio buffer). + */ + pthread_t flush_thread; + + /** + * The absolute point in time that the next packet of audio data sould be + * flushed. Another packet of received data should not be flushed prior to + * this time. + */ + struct timespec next_flush; + /** * Handler function which will be invoked when a full audio packet is * ready to be flushed to the AUDIO_INPUT channel, if defined. If NULL, @@ -143,6 +182,12 @@ struct guac_rdp_audio_buffer { */ guac_rdp_audio_buffer_flush_handler* flush_handler; + /** + * Whether guac_rdp_audio_buffer_free() has been invoked and the audio + * buffer is being cleaned up. + */ + int stopping; + /** * Arbitrary data assigned by the AUDIO_INPUT plugin implementation. */