Ported a patch from afsql to afmongodb that makes the writer thread
wake up only when it's synced. This makes threaded(yes) work more
reliably.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/afmongodb.c | 18 ++++++++++++++++--
 1 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 9e155ed..0f01f05 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -497,6 +497,17 @@ afmongodb_dd_free(LogPipe *d)
 }
 
 static void
+afmongodb_dd_queue_notify(gpointer user_data)
+{
+  MongoDBDestDriver *self = (MongoDBDestDriver *)user_data;
+
+  g_mutex_lock(self->queue_mutex);
+  g_cond_signal(self->writer_thread_wakeup_cond);
+  log_queue_reset_parallel_push(self->queue);
+  g_mutex_unlock(self->queue_mutex);
+}
+
+static void
 afmongodb_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options, gpointer user_data)
 {
   MongoDBDestDriver *self = (MongoDBDestDriver *)s;
@@ -509,12 +520,15 @@ afmongodb_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_optio
   g_mutex_lock(self->queue_mutex);
   self->last_msg_stamp = cached_g_current_time_sec ();
   queue_was_empty = log_queue_get_length(self->queue) == 0;
-  log_queue_push_tail(self->queue, msg, path_options);
   g_mutex_unlock(self->queue_mutex);
+
+  log_queue_push_tail(self->queue, msg, path_options);
 
   g_mutex_lock(self->suspend_mutex);
   if (queue_was_empty && !self->writer_thread_suspended)
-    g_cond_signal(self->writer_thread_wakeup_cond);
+    {
+      log_queue_set_parallel_push(self->queue, 1, afmongodb_dd_queue_notify, self, NULL);
+    }
   g_mutex_unlock(self->suspend_mutex);
 }
 
-- 
1.7.0.4