[PATCH] afsql: Fix the sql destination getting stuck
[sql] Send a signal to the database thread only when the queue is synchronised

Otherwise, in threaded mode, the database thread is woken up unnecessarily many times.

This also fixes the SQL destination getting stuck when explicit commits are turned on and flush_lines, log_iw_size, fetch_limit and log_fifo_size all have the same value. In that case the queue became full, but the database thread could not pop any messages from it: the reader could finish its io_job and get suspended while the database thread was still waiting for a signal, having read 0 messages.

Signed-off-by: Viktor Juhasz <jviktor@balabit.hu>

diff --git a/lib/logqueue.c b/lib/logqueue.c
index 29c39e4..43ed69e 100644
--- a/lib/logqueue.c
+++ b/lib/logqueue.c
@@ -55,6 +55,17 @@ log_queue_reset_parallel_push(LogQueue *self)
   g_static_mutex_unlock(&self->lock);
 }
 
+void
+log_queue_set_parallel_push(LogQueue *self, gint notify_limit, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy)
+{
+  g_static_mutex_lock(&self->lock);
+  self->parallel_push_notify = parallel_push_notify;
+  self->parallel_push_data = user_data;
+  self->parallel_push_notify_limit = notify_limit;
+  self->parallel_push_data_destroy = user_data_destroy;
+  g_static_mutex_unlock(&self->lock);
+}
+
 /*
  *
  * @batch_items: the number of items processed in a batch (e.g. the number of items the consumer is preferred to process at a single invocation)
diff --git a/lib/logqueue.h b/lib/logqueue.h
index 1d86f3f..ef04c99 100644
--- a/lib/logqueue.h
+++ b/lib/logqueue.h
@@ -139,6 +139,7 @@ log_queue_set_throttle(LogQueue *self, gint throttle)
 
 void log_queue_push_notify(LogQueue *self);
 void log_queue_reset_parallel_push(LogQueue *self);
+void log_queue_set_parallel_push(LogQueue *self, gint notify_limit, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy);
 gboolean log_queue_check_items(LogQueue *self, gint batch_items, gboolean *partial_batch, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy);
 void log_queue_set_counters(LogQueue *self, StatsCounterItem *stored_messages, StatsCounterItem *dropped_messages);
 void log_queue_init_instance(LogQueue *self, const gchar *persist_name);
diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 039270d..bbdd1d2 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -116,6 +116,7 @@ typedef struct _AFSqlDestDriver
   dbi_conn dbi_ctx;
   GHashTable *validated_tables;
   guint32 failed_message_counter;
+
 } AFSqlDestDriver;
 
 static gboolean dbi_initialized = FALSE;
@@ -685,11 +686,12 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
       success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE);
       g_mutex_unlock(self->db_thread_mutex);
       if (!success)
-        return TRUE;
+        {
+          return TRUE;
+        }
     }
 
   msg_set_context(msg);
-
   table = g_string_sized_new(32);
   value = g_string_sized_new(256);
   query_string = g_string_sized_new(512);
@@ -767,7 +769,6 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
   if (success && self->flush_lines_queued != -1)
     {
       self->flush_lines_queued++;
-
       if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self, TRUE))
         return FALSE;
     }
@@ -1155,6 +1156,16 @@ afsql_dd_deinit(LogPipe *s)
 }
 
 static void
+afsql_dd_queue_notify(gpointer user_data)
+{
+  AFSqlDestDriver *self = (AFSqlDestDriver *) user_data;
+  g_mutex_lock(self->db_thread_mutex);
+  g_cond_signal(self->db_thread_wakeup_cond);
+  log_queue_reset_parallel_push(self->queue);
+  g_mutex_unlock(self->db_thread_mutex);
+}
+
+static void
 afsql_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options, gpointer user_data)
 {
   AFSqlDestDriver *self = (AFSqlDestDriver *) s;
@@ -1165,7 +1176,9 @@ afsql_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options,
   log_queue_push_tail(self->queue, msg, path_options);
   if (queue_was_empty && !self->db_thread_suspended)
-    g_cond_signal(self->db_thread_wakeup_cond);
+    {
+      log_queue_set_parallel_push(self->queue, 1, afsql_dd_queue_notify, self, NULL);
+    }
   g_mutex_unlock(self->db_thread_mutex);
 }
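The stuck state described above (the database thread waiting for a signal while the reader is already suspended) is a lost-wakeup problem. The sketch below is not afsql code: `Mailbox`, `mailbox_push()` and `mailbox_pop()` are hypothetical names, POSIX threads stand in for the GLib `g_mutex`/`g_cond` calls, and a single consumer is assumed. It only shows the general idea of signalling "only when synchronised": the producer decides whether to signal while holding the same lock under which the consumer checks the queue length, so a push can neither be missed nor cause a needless wakeup.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical queue used only to illustrate the idea; not syslog-ng code. */
typedef struct
{
  pthread_mutex_t lock;    /* protects every field below */
  pthread_cond_t  wakeup;  /* signalled when the waiting consumer may proceed */
  int             length;  /* number of queued items */
  bool            waiting; /* consumer is blocked (or about to block) on wakeup */
  int             signals; /* wakeups actually sent, to show none are wasted */
} Mailbox;

void mailbox_init(Mailbox *mb)
{
  pthread_mutex_init(&mb->lock, NULL);
  pthread_cond_init(&mb->wakeup, NULL);
  mb->length = 0;
  mb->waiting = false;
  mb->signals = 0;
}

/* Producer side: the decision to signal is made under the same lock the
 * consumer holds while checking the queue, so a push cannot slip in between
 * "queue is empty" and "start waiting" unnoticed. */
void mailbox_push(Mailbox *mb)
{
  pthread_mutex_lock(&mb->lock);
  mb->length++;
  if (mb->waiting)
    {
      mb->waiting = false;
      mb->signals++;
      pthread_cond_signal(&mb->wakeup);
    }
  pthread_mutex_unlock(&mb->lock);
}

/* Consumer side (single consumer assumed): block only while the queue is
 * empty, re-checking after every wakeup to cope with spurious wakeups. */
void mailbox_pop(Mailbox *mb)
{
  pthread_mutex_lock(&mb->lock);
  while (mb->length == 0)
    {
      mb->waiting = true;
      pthread_cond_wait(&mb->wakeup, &mb->lock);
    }
  assert(mb->length > 0);
  mb->length--;
  pthread_mutex_unlock(&mb->lock);
}
```

Pushes made while the consumer is busy do not signal at all (`signals` stays 0); only a push that finds the consumer registered as waiting sends a wakeup, which is the behaviour the patch aims for in threaded mode.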
Hi,

Sorry, the previous patch is not correct: it causes a deadlock when running without multithreading. The corrected patch is the following:

[afsql] Fix afsql deadlock caused by the previous patch (send a signal to the database thread only when the queue is synchronised)

Without multithreading, log_queue_push_tail() calls the push notify callback directly. That callback (afsql_dd_queue_notify) locks db_thread_mutex, which afsql_dd_queue() was still holding, so the sql destination deadlocked. The mutex is now released around the push.

Signed-off-by: Viktor Juhasz (jviktor@balabit.hu)

diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index bbdd1d2..27fff99 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -1173,8 +1173,10 @@ afsql_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options,
   g_mutex_lock(self->db_thread_mutex);
   queue_was_empty = log_queue_get_length(self->queue) == 0;
+  g_mutex_unlock(self->db_thread_mutex);
   log_queue_push_tail(self->queue, msg, path_options);
+  g_mutex_lock(self->db_thread_mutex);
   if (queue_was_empty && !self->db_thread_suspended)
     {
       log_queue_set_parallel_push(self->queue, 1, afsql_dd_queue_notify, self, NULL);

Juhasz Viktor wrote:
------------------------------------------------------------------------
______________________________________________________________________________
Member info: https://lists.balabit.hu/mailman/listinfo/syslog-ng
Documentation: http://www.balabit.com/support/documentation/?product=syslog-ng
FAQ: http://www.balabit.com/wiki/syslog-ng-faq
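The deadlock fixed by the follow-up patch can be reproduced in a single-threaded sketch. Everything here is hypothetical (`ToyQueue`, `ToyDriver`, `toy_driver_queue()` and friends); it assumes only what the message states, namely that in non-threaded mode the push itself runs the notify callback synchronously. The callback uses `pthread_mutex_trylock()` so that "caller still holds the mutex" is recorded in a flag instead of hanging, as a blocking lock on a non-recursive mutex would.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

typedef void (*NotifyFunc)(void *user_data);

/* Hypothetical stand-ins, single-threaded, modelling the non-threaded mode. */
typedef struct
{
  int        length;
  NotifyFunc notify;       /* one-shot callback, in the spirit of parallel push */
  void      *notify_data;
} ToyQueue;

typedef struct
{
  pthread_mutex_t db_thread_mutex;  /* default (non-recursive) mutex */
  ToyQueue        queue;
  int             wakeups;          /* times the notify callback completed */
  bool            would_deadlock;   /* set if the callback found the mutex held */
} ToyDriver;

void toy_driver_init(ToyDriver *self)
{
  pthread_mutex_init(&self->db_thread_mutex, NULL);
  self->queue.length = 0;
  self->queue.notify = NULL;
  self->queue.notify_data = NULL;
  self->wakeups = 0;
  self->would_deadlock = false;
}

/* The registered callback runs synchronously, in the caller's thread,
 * before the push returns. */
void toy_queue_push_tail(ToyQueue *q)
{
  q->length++;
  if (q->notify)
    {
      NotifyFunc fn = q->notify;
      void *data = q->notify_data;

      q->notify = NULL;  /* one-shot: clear before calling */
      q->notify_data = NULL;
      fn(data);
    }
}

/* Plays the role of afsql_dd_queue_notify(), which locks db_thread_mutex. */
void toy_driver_notify(void *user_data)
{
  ToyDriver *self = user_data;

  if (pthread_mutex_trylock(&self->db_thread_mutex) != 0)
    {
      self->would_deadlock = true;  /* a blocking lock would hang here */
      return;
    }
  self->wakeups++;
  pthread_mutex_unlock(&self->db_thread_mutex);
}

/* Ordering of the follow-up patch: sample the queue under the mutex, release
 * it around the push (which may run the callback), then take it again. */
void toy_driver_queue(ToyDriver *self)
{
  pthread_mutex_lock(&self->db_thread_mutex);
  bool queue_was_empty = self->queue.length == 0;
  pthread_mutex_unlock(&self->db_thread_mutex);

  toy_queue_push_tail(&self->queue);

  pthread_mutex_lock(&self->db_thread_mutex);
  if (queue_was_empty)
    {
      /* register the one-shot callback for the next push */
      self->queue.notify = toy_driver_notify;
      self->queue.notify_data = self;
    }
  pthread_mutex_unlock(&self->db_thread_mutex);
}
```

Moving the first `pthread_mutex_unlock()` in `toy_driver_queue()` below the push reproduces the original bug: on the second call the callback finds the mutex held and sets `would_deadlock`.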
Hi,

I've finally integrated this patch, even though I don't fully understand all the intents behind it, and I'm almost certain that it still has some subtle issues.

I'd like to revamp the way blocking (e.g. threaded) destinations work, and move threading and synchronization into the core, rather than having to implement them in every plugin. Currently both the mongodb and the sql destinations have similar, but slightly different, implementations of the threading part, and I'm afraid the same bug that this patch fixes affects mongodb too (even though mongodb doesn't support flush-lines yet).

Also, I think we'll have more blocking-style destination drivers, and I wouldn't want to duplicate code into them (one is already available in the SMTP plugin, which I guess is structured similarly, and I do have some further ideas, like an SNMP trap sender). But since this fixes a concrete bug, and I'd like to push 3.3 out of the door, I don't want to start that refactoring now.

Here is the commit that integrates both this and the follow-up patch into a single commit:

commit 3d8c32601151343506be61a3f5d0af9888da785b
Author: Balazs Scheidler <bazsi@balabit.hu>
Date:   Thu Aug 11 09:34:05 2011 +0200

    [sql] send a signal to the database thread only in case when the queue is syncronised

    because in other cases the database thread wake up unnecessary many times in case of threaded mode
    And the sql stucked if the explicit commit is turned on and flush_lines and log_iw_size and fetch_limit and log_fifo_size have the same value.
    The reason of the stuck was that the queue become full but the database thread can't pop any messages form the queue.
    So it can be occurred that the reader finished the io_job and suspended and the database thread wait for the signal with 0 read message.

    Signed-off-by: Viktor Juhasz <jviktor@balabit.hu>

On Mon, 2011-06-27 at 12:42 +0200, Juhasz Viktor wrote:
-- Bazsi
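The idea of moving threading and synchronization into the core could take many shapes; one possibility is sketched below. It is purely illustrative: `ThreadedDest`, `DestCallbacks` and the `threaded_dest_*` functions are hypothetical and do not correspond to any existing syslog-ng API, and a plain `int` stands in for a log message. The core owns the queue, the mutex, the condition variable and the worker thread, while a driver such as sql or mongodb would supply only `insert` and `flush` callbacks, with flush-lines batching handled once in the core.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

#define QUEUE_CAP 64  /* tiny fixed-size ring, for illustration only */

/* Hypothetical plugin interface: a driver would supply only these callbacks. */
typedef struct
{
  void (*insert)(void *plugin, int msg);  /* write one message */
  void (*flush)(void *plugin);            /* commit the current batch */
  void *plugin;
} DestCallbacks;

/* Hypothetical core-owned worker: queue, lock, wakeup and thread live here. */
typedef struct
{
  pthread_mutex_t lock;
  pthread_cond_t  wakeup;
  int             ring[QUEUE_CAP];
  int             head, count, flush_lines;
  bool            stop;
  pthread_t       thread;
  DestCallbacks   cb;
} ThreadedDest;

static void *threaded_dest_main(void *arg)
{
  ThreadedDest *self = arg;
  int since_flush = 0;
  pthread_mutex_lock(&self->lock);
  for (;;)
    {
      /* Checked under the lock producers signal with, so no wakeup is lost. */
      while (self->count == 0 && !self->stop)
        pthread_cond_wait(&self->wakeup, &self->lock);
      if (self->count == 0)
        break;  /* stop requested and queue drained */
      int msg = self->ring[self->head];
      self->head = (self->head + 1) % QUEUE_CAP;
      self->count--;
      /* Call the plugin unlocked so a slow database never blocks producers. */
      pthread_mutex_unlock(&self->lock);
      self->cb.insert(self->cb.plugin, msg);
      if (++since_flush == self->flush_lines)
        {
          self->cb.flush(self->cb.plugin);
          since_flush = 0;
        }
      pthread_mutex_lock(&self->lock);
    }
  pthread_mutex_unlock(&self->lock);
  if (since_flush > 0)
    self->cb.flush(self->cb.plugin);  /* commit the partial batch on shutdown */
  return NULL;
}

int threaded_dest_start(ThreadedDest *self, DestCallbacks cb, int flush_lines)
{
  assert(flush_lines > 0);
  pthread_mutex_init(&self->lock, NULL);
  pthread_cond_init(&self->wakeup, NULL);
  self->head = self->count = 0;
  self->flush_lines = flush_lines;
  self->stop = false;
  self->cb = cb;
  return pthread_create(&self->thread, NULL, threaded_dest_main, self);
}

bool threaded_dest_push(ThreadedDest *self, int msg)
{
  pthread_mutex_lock(&self->lock);
  bool queued = self->count < QUEUE_CAP;  /* a real core would apply flow control */
  if (queued)
    {
      self->ring[(self->head + self->count) % QUEUE_CAP] = msg;
      self->count++;
      pthread_cond_signal(&self->wakeup);
    }
  pthread_mutex_unlock(&self->lock);
  return queued;
}

void threaded_dest_stop(ThreadedDest *self)
{
  pthread_mutex_lock(&self->lock);
  self->stop = true;
  pthread_cond_signal(&self->wakeup);
  pthread_mutex_unlock(&self->lock);
  pthread_join(self->thread, NULL);  /* the worker drains the queue first */
  pthread_cond_destroy(&self->wakeup);
  pthread_mutex_destroy(&self->lock);
}

/* Example plugin that counts calls instead of talking to a database. */
typedef struct { int inserted, flushed; } CountingPlugin;
void counting_insert(void *p, int msg) { (void) msg; ((CountingPlugin *) p)->inserted++; }
void counting_flush(void *p) { ((CountingPlugin *) p)->flushed++; }
```

With `flush_lines` set to 4 and 10 messages pushed, the counting plugin sees 10 inserts and 3 flushes (two full batches plus the partial one committed on shutdown). The sketch deliberately omits time-based flushing and flow control, both of which a real core implementation would need.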