[PATCH] afmongodb, afsql: Fix a race condition in the queue handling.
On occasions where we received a message, woke up the worker thread, then received two others before the worker finished, we had a high chance of triggering an assertion, because we ended up calling the insert before the parallel_push stuff managed to wake up the thread and clear the callbacks.

Now we explicitly clear parallel_push before popping the queue head, for the simple reason that we do not need it at that point anymore.

Many thanks to Balazs Scheidler <bazsi@balabit.hu> for pointing out what was happening.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/afmongodb.c |    1 +
 modules/afsql/afsql.c         |    1 +
 2 files changed, 2 insertions(+), 0 deletions(-)

diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 0f01f05..b2bd615 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -263,6 +263,7 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
   afmongodb_dd_connect(self, TRUE);
 
   g_mutex_lock(self->queue_mutex);
+  log_queue_reset_parallel_push(self->queue);
   success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, FALSE);
   g_mutex_unlock(self->queue_mutex);
   if (!success)
diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 20db131..6fbd656 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -682,6 +682,7 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
   else
     {
       g_mutex_lock(self->db_thread_mutex);
+      log_queue_reset_parallel_push(self->queue);
       success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE);
       g_mutex_unlock(self->db_thread_mutex);
       if (!success)
-- 
1.7.6.3
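To make the ordering concrete, here is a toy model in C. It is not syslog-ng's LogQueue, and the commit message does not say which assertion fires. The sketch simply assumes a queue that holds at most one one-shot wake-up callback (standing in for parallel_push) and treats registering a second one as the failure, reported as -1 instead of aborting. All toy_* names are hypothetical. Pushes in the toy do not touch the callback at all, which models the window in which the wake-up has not yet cleared it. toy_replay() replays the sequence from the commit message with and without the reset-before-pop step that the patch adds.

```c
/* Toy model: toy_* names are hypothetical, NOT syslog-ng's LogQueue API. */
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

typedef void (*toy_wakeup_fn)(void *user_data);

typedef struct {
  pthread_mutex_t lock;
  int len;                   /* number of queued messages; payload omitted */
  toy_wakeup_fn wakeup;      /* registered one-shot wake-up callback, or NULL */
  void *wakeup_data;
} toy_queue;

/* Caller holds q->lock. Returns 1 if messages are waiting, 0 after registering
 * fn, and -1 if a callback is still registered (stands in for the assertion). */
static int toy_check_items(toy_queue *q, toy_wakeup_fn fn, void *data)
{
  if (q->len > 0)
    return 1;
  if (q->wakeup != NULL)
    return -1;
  q->wakeup = fn;
  q->wakeup_data = data;
  return 0;
}

/* Caller holds q->lock. Plays the role of log_queue_reset_parallel_push(). */
static void toy_reset_wakeup(toy_queue *q)
{
  q->wakeup = NULL;
  q->wakeup_data = NULL;
}

static void toy_noop_wakeup(void *data)
{
  (void) data;               /* a real callback would signal the worker thread */
}

/* One insert round: pop a single message under the lock, optionally clearing
 * the callback first, as the patch does. Returns 0 once the queue is empty. */
static int toy_worker_insert(toy_queue *q, int reset_first)
{
  int success;

  pthread_mutex_lock(&q->lock);
  if (reset_first)
    toy_reset_wakeup(q);     /* the worker is awake, so the callback is not needed */
  success = q->len > 0;
  if (success)
    q->len--;                /* the message would be inserted into the database here */
  pthread_mutex_unlock(&q->lock);
  return success;
}

/* Replay: the worker registers a callback on an empty queue, then three
 * messages arrive while that registration is still in place. The worker drains
 * them and re-registers; the result of that re-registration is returned. */
static int toy_replay(int reset_first)
{
  toy_queue q = { .len = 0 };
  int rc;

  pthread_mutex_init(&q.lock, NULL);
  pthread_mutex_lock(&q.lock);
  rc = toy_check_items(&q, toy_noop_wakeup, NULL);
  assert(rc == 0);           /* empty queue: callback registered */
  q.len += 3;                /* the wake-up has not cleared the callback yet */
  pthread_mutex_unlock(&q.lock);

  while (toy_worker_insert(&q, reset_first))
    ;

  pthread_mutex_lock(&q.lock);
  rc = toy_check_items(&q, toy_noop_wakeup, NULL);
  pthread_mutex_unlock(&q.lock);
  pthread_mutex_destroy(&q.lock);
  return rc;
}
```

With reset_first set to 0, toy_replay() returns -1 because the worker's old callback is still registered when it re-arms after draining. With 1, the stale registration is gone and re-arming succeeds. That is the idea behind calling log_queue_reset_parallel_push() under the queue lock before log_queue_pop_head().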
On Thu, 2011-09-22 at 19:21 +0200, Gergely Nagy wrote:
> On occasions where we received a message, woke up the worker thread, then received two others before the worker finished, we had a high chance of triggering an assertion, because we ended up calling the insert before the parallel_push stuff managed to wake up the thread and clear the callbacks.
>
> Now we explicitly clear parallel_push before popping the queue head, for the simple reason that we do not need it at that point anymore.
>
> Many thanks to Balazs Scheidler <bazsi@balabit.hu> for pointing out what was happening.
I've applied this. Although this is only a workaround, it is one that gets the job done.

The real issue is that our threaded destinations (sql, mongodb) should be using the asynchronous LogQueue interfaces, preferably wrapped behind a synchronous one so that not every destination has to do it on its own.

What I roughly have in mind is:

log_queue_check_items() would probably best be renamed to log_queue_poll_items() to better reflect what it does. Then define:

 - gboolean log_queue_wait_items(LogQueue *q, gint batch_size)

   to wait synchronously for a batch of messages to arrive, sleeping until they do. Uses log_queue_poll_items() under the hood. Returns TRUE when items are available, and FALSE when the wait was cancelled.

 - log_queue_cancel_wait(LogQueue *q)

   this can be used to cancel a wait started with log_queue_wait_items().

Then our threaded destinations would:

 - in their thread function, perform each iteration of work as follows:
   - use log_queue_wait_items() to block until enough items arrive (e.g. the value of flush_lines())
   - use log_queue_pop_head() to fetch items from the queue, just like normal destinations
 - simply use log_queue_push_tail() in their queue() method

I'll try to code this once I get there, but others may beat me to it :)

-- 
Bazsi
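The proposed wait/cancel wrapper could be sketched with a mutex and a condition variable. This is a minimal sketch under stated assumptions, not syslog-ng code. sketch_queue and every sketch_* function are hypothetical stand-ins for LogQueue, log_queue_push_tail(), log_queue_pop_head(), and the proposed log_queue_wait_items()/log_queue_cancel_wait(). It uses POSIX threads rather than GLib, plain ints instead of LogMessage, and a fixed-size ring buffer. It also leaves out the asynchronous poll (log_queue_poll_items()) the proposal would build on; here the producer simply broadcasts on the condition variable.

```c
/* Hypothetical sketch_* stand-ins, NOT syslog-ng's LogQueue API; build with -pthread. */
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define SKETCH_CAP 64
typedef struct {
  pthread_mutex_t lock;
  pthread_cond_t cond;      /* broadcast whenever len or cancelled changes */
  int items[SKETCH_CAP];
  int head, len;
  bool cancelled;
} sketch_queue;

/* Stand-in for log_queue_push_tail(), i.e. what the queue() method would call. */
static void sketch_push_tail(sketch_queue *q, int msg)
{
  pthread_mutex_lock(&q->lock);
  assert(q->len < SKETCH_CAP);   /* a real queue would drop or block instead */
  q->items[(q->head + q->len) % SKETCH_CAP] = msg;
  q->len++;
  pthread_cond_broadcast(&q->cond);
  pthread_mutex_unlock(&q->lock);
}

/* Stand-in for log_queue_pop_head(). */
static bool sketch_pop_head(sketch_queue *q, int *msg)
{
  pthread_mutex_lock(&q->lock);
  bool ok = q->len > 0;
  if (ok) {
    *msg = q->items[q->head];
    q->head = (q->head + 1) % SKETCH_CAP;
    q->len--;
    pthread_cond_broadcast(&q->cond);
  }
  pthread_mutex_unlock(&q->lock);
  return ok;
}

/* Like the proposed log_queue_wait_items(): true once batch_size items wait, false on cancel. */
static bool sketch_wait_items(sketch_queue *q, int batch_size)
{
  assert(batch_size > 0 && batch_size <= SKETCH_CAP);
  pthread_mutex_lock(&q->lock);
  while (!q->cancelled && q->len < batch_size)
    pthread_cond_wait(&q->cond, &q->lock);
  bool ready = !q->cancelled;
  pthread_mutex_unlock(&q->lock);
  return ready;
}

/* Like the proposed log_queue_cancel_wait(): wake the waiter and make it return false. */
static void sketch_cancel_wait(sketch_queue *q)
{
  pthread_mutex_lock(&q->lock);
  q->cancelled = true;
  pthread_cond_broadcast(&q->cond);
  pthread_mutex_unlock(&q->lock);
}

typedef struct {
  sketch_queue *q;
  int batch_size;           /* would come from flush_lines() */
  int inserted;             /* messages handed to the (imaginary) database */
} sketch_worker;

/* Thread function: block until a batch is queued, then pop it like a normal destination. */
static void *sketch_worker_main(void *arg)
{
  sketch_worker *w = arg;
  int msg;
  while (sketch_wait_items(w->q, w->batch_size))
    for (int i = 0; i < w->batch_size && sketch_pop_head(w->q, &msg); i++)
      w->inserted++;        /* a real destination would insert msg here */
  return NULL;
}

/* Demo: n must be a multiple of batch_size and at most SKETCH_CAP. */
static int sketch_demo(int n, int batch_size)
{
  sketch_queue q = { .len = 0 };
  sketch_worker w = { &q, batch_size, 0 };
  pthread_t tid;
  pthread_mutex_init(&q.lock, NULL);
  pthread_cond_init(&q.cond, NULL);
  if (pthread_create(&tid, NULL, sketch_worker_main, &w) != 0)
    return -1;
  for (int i = 0; i < n; i++)
    sketch_push_tail(&q, i);
  pthread_mutex_lock(&q.lock);      /* wait until the worker has drained the queue */
  while (q.len > 0)
    pthread_cond_wait(&q.cond, &q.lock);
  pthread_mutex_unlock(&q.lock);
  sketch_cancel_wait(&q);           /* shutdown: wake the worker, make its wait return false */
  pthread_join(tid, NULL);
  pthread_cond_destroy(&q.cond);
  pthread_mutex_destroy(&q.lock);
  return w.inserted;
}
```

sketch_demo(12, 4) starts one worker with a batch size of 4, queues 12 messages, waits for them to be drained, and then cancels the wait so that the thread exits. The worker only wakes up once a full batch is queued. A real destination would presumably also want a timeout so that a partial batch still gets flushed, which this sketch does not attempt.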
participants (2)
- Balazs Scheidler
- Gergely Nagy