On Thu, 2011-09-22 at 19:21 +0200, Gergely Nagy wrote:
On occasions where we received a message, woke up the worker thread, then received two others before the worker finished, we had a high chance of triggering an assertion, because we ended up calling the insert before the parallel_push stuff managed to wake up the thread and clear the callbacks.
Now we explicitly clear parallel_push before popping the queue head, for the simple reason that we do not need it at that point anymore.
Many thanks to Balazs Scheidler <bazsi@balabit.hu> for pointing out what happens.
I've applied this. It is only a workaround, but one that gets the job done.

The real issue is that our threaded destinations (sql, mongodb) should be using the asynchronous LogQueue interfaces, preferably wrapped behind a synchronous one so that not every destination has to do it on its own.

What I roughly have in mind is:

log_queue_check_items() would probably best be renamed to log_queue_poll_items() to better reflect what it does.

Then define:

  - gboolean log_queue_wait_items(LogQueue *q, gint batch_size)

    Waits synchronously for a batch of messages, sleeping until they arrive. Uses log_queue_poll_items() under the hood. Returns TRUE when items are available, and FALSE when the wait was cancelled.

  - log_queue_cancel_wait(LogQueue *q)

    Cancels the wait implemented above.

Then our threaded destinations would:

  - in their thread function, on each iteration that performs work:
    - use log_queue_wait_items() to block until enough items arrive (e.g. the value of flush_lines())
    - use log_queue_pop_head() to fetch items from the queue, just like normal destinations
  - simply use log_queue_push_tail() in their queue() method

I'll try to code this once I get there, but others may beat me to this :)

-- 
Bazsi
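
For illustration only, here is a rough sketch of how the wait/cancel pair proposed above could sit on top of a poll-style check. It does not use the real LogQueue: SketchQueue, SketchNotifyFunc and every sketch_queue_* function are hypothetical stand-ins, the queue only counts items instead of holding messages, and plain pthreads with bool/int replace GLib and gboolean/gint. The one-shot callback is a simplified assumption about how the poll call might notify a waiter.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical one-shot notification, invoked when enough items are queued. */
typedef void (*SketchNotifyFunc)(void *user_data);

/* Hypothetical stand-in for LogQueue: it only counts queued items. */
typedef struct SketchQueue
{
  pthread_mutex_t lock;
  pthread_cond_t wakeup;     /* signalled by the callback and by cancel */
  int length;                /* number of queued items */
  int notify_limit;          /* batch size the registered callback waits for */
  SketchNotifyFunc notify;   /* NULL when no callback is registered */
  void *notify_data;
  bool cancelled;
} SketchQueue;

void sketch_queue_init(SketchQueue *q)
{
  pthread_mutex_init(&q->lock, NULL);
  pthread_cond_init(&q->wakeup, NULL);
  q->length = 0;
  q->notify_limit = 0;
  q->notify = NULL;
  q->notify_data = NULL;
  q->cancelled = false;
}

/* Non-blocking check, loosely modelled on the proposed log_queue_poll_items():
 * returns true if batch_size items are ready, otherwise registers a one-shot
 * callback and returns false. The caller must hold q->lock. */
static bool sketch_queue_poll_items(SketchQueue *q, int batch_size,
                                    SketchNotifyFunc notify, void *user_data)
{
  if (q->length >= batch_size)
    return true;
  q->notify_limit = batch_size;
  q->notify = notify;
  q->notify_data = user_data;
  return false;
}

/* Callback used by the synchronous wrapper: just wake the sleeping thread. */
static void sketch_queue_wake_waiter(void *user_data)
{
  SketchQueue *q = user_data;
  pthread_cond_broadcast(&q->wakeup);
}

/* Blocks until batch_size items are queued (returns true) or the wait is
 * cancelled (returns false). */
bool sketch_queue_wait_items(SketchQueue *q, int batch_size)
{
  bool ready = false;

  pthread_mutex_lock(&q->lock);
  while (!q->cancelled &&
         !(ready = sketch_queue_poll_items(q, batch_size,
                                           sketch_queue_wake_waiter, q)))
    pthread_cond_wait(&q->wakeup, &q->lock);
  q->notify = NULL;       /* drop any pending callback before the caller pops */
  q->cancelled = false;   /* a cancel request is consumed by one wait */
  pthread_mutex_unlock(&q->lock);
  return ready;
}

/* Wakes a thread blocked in sketch_queue_wait_items() and makes it return false. */
void sketch_queue_cancel_wait(SketchQueue *q)
{
  pthread_mutex_lock(&q->lock);
  q->cancelled = true;
  pthread_cond_broadcast(&q->wakeup);
  pthread_mutex_unlock(&q->lock);
}

/* Producer side: enqueue one item and fire the callback once the batch is full. */
void sketch_queue_push_tail(SketchQueue *q)
{
  pthread_mutex_lock(&q->lock);
  q->length++;
  if (q->notify && q->length >= q->notify_limit)
    {
      SketchNotifyFunc notify = q->notify;
      q->notify = NULL;   /* one-shot: unregister before invoking */
      notify(q->notify_data);
    }
  pthread_mutex_unlock(&q->lock);
}

/* Consumer side: remove one item; returns false if the queue was empty. */
bool sketch_queue_pop_head(SketchQueue *q)
{
  bool popped = false;

  pthread_mutex_lock(&q->lock);
  if (q->length > 0)
    {
      q->length--;
      popped = true;
    }
  pthread_mutex_unlock(&q->lock);
  return popped;
}
```

With something along these lines, a destination's worker thread would loop on sketch_queue_wait_items(), pop and process items while it returns true, and exit when it returns false after another thread called sketch_queue_cancel_wait() at shutdown. The queue() method would do nothing more than sketch_queue_push_tail().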