When we received a message, woke up the worker thread, and then
received two more messages before the worker finished, there was a high
chance of triggering an assertion: the insert function ended up being
called before the parallel_push machinery had managed to wake up the
thread and clear its callbacks.

Now we explicitly reset parallel_push before popping the queue head,
since we no longer need it at that point.

Many thanks to Balazs Scheidler <bazsi@balabit.hu> for pointing out
what happens.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/afmongodb.c |    1 +
 modules/afsql/afsql.c         |    1 +
 2 files changed, 2 insertions(+), 0 deletions(-)

diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 0f01f05..b2bd615 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -263,6 +263,7 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
   afmongodb_dd_connect(self, TRUE);
 
   g_mutex_lock(self->queue_mutex);
+  log_queue_reset_parallel_push(self->queue);
   success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, FALSE);
   g_mutex_unlock(self->queue_mutex);
   if (!success)
diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 20db131..6fbd656 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -682,6 +682,7 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
   else
     {
       g_mutex_lock(self->db_thread_mutex);
+      log_queue_reset_parallel_push(self->queue);
       success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE);
       g_mutex_unlock(self->db_thread_mutex);
       if (!success)
--
1.7.6.3
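
For illustration only, here is a minimal single-threaded sketch of the ordering the patch establishes: disarm the wake-up callback first, then pop. Everything in it (ToyQueue, toy_queue_arm, toy_queue_reset, toy_queue_push, toy_worker_insert, toy_wakeup) is hypothetical and is not the syslog-ng LogQueue API. The delayed wake-up is modelled by a push that never fires the callback, and the sketch does not reproduce the real assertion, whose location the patch does not show.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a queue with a one-shot wake-up callback. */
typedef void (*ToyNotify)(void *user_data);

typedef struct {
    int length;          /* messages waiting in the queue */
    ToyNotify notify;    /* armed wake-up callback, NULL when disarmed */
    void *user_data;
} ToyQueue;

/* Arm the wake-up callback; the worker does this before going to sleep. */
static void toy_queue_arm(ToyQueue *q, ToyNotify cb, void *user_data)
{
    q->notify = cb;
    q->user_data = user_data;
}

/* Disarm the callback; plays the role of log_queue_reset_parallel_push(). */
static void toy_queue_reset(ToyQueue *q)
{
    q->notify = NULL;
    q->user_data = NULL;
}

/* Producer side: only enqueue. Delivery of the wake-up is modelled as
 * delayed, so the callback stays armed after the push. */
static void toy_queue_push(ToyQueue *q)
{
    q->length++;
}

/* Worker side: pop one message. Returns false if the queue was empty.
 * *stale reports whether a callback was still armed at pop time, which
 * is the state the patch rules out. The real drivers hold a mutex
 * around these steps; the toy model has no threads, so it has no lock. */
static bool toy_worker_insert(ToyQueue *q, bool reset_first, bool *stale)
{
    if (reset_first)
        toy_queue_reset(q);
    *stale = (q->notify != NULL);
    if (q->length == 0)
        return false;
    q->length--;
    return true;
}

/* Placeholder callback; a real one would wake the worker thread. */
static void toy_wakeup(void *user_data)
{
    (void) user_data;
}
```

After arming toy_wakeup and pushing two messages, toy_worker_insert(&q, false, &stale) pops with the callback still armed and sets stale to true, while toy_worker_insert(&q, true, &stale) disarms it first and sets stale to false.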