[sql] Send a signal to the database thread only when the queue is synchronised, because otherwise the database thread wakes up unnecessarily many times in threaded mode. In addition, the SQL destination got stuck if explicit commit is turned on and flush_lines, log_iw_size, fetch_limit and log_fifo_size all have the same value. The reason for the hang was that the queue became full but the database thread could not pop any messages from the queue. So it could happen that the reader finished the io_job and was suspended while the database thread was waiting for the signal with 0 read messages. Signed-off-by: Viktor Juhasz diff --git a/lib/logqueue.c b/lib/logqueue.c index 29c39e4..43ed69e 100644 --- a/lib/logqueue.c +++ b/lib/logqueue.c @@ -55,6 +55,17 @@ log_queue_reset_parallel_push(LogQueue *self) g_static_mutex_unlock(&self->lock); } +void +log_queue_set_parallel_push(LogQueue *self, gint notify_limit, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy) +{ + g_static_mutex_lock(&self->lock); + self->parallel_push_notify = parallel_push_notify; + self->parallel_push_data = user_data; + self->parallel_push_notify_limit = notify_limit; + self->parallel_push_data_destroy = user_data_destroy; + g_static_mutex_unlock(&self->lock); +} + /* * * @batch_items: the number of items processed in a batch (e.g. 
the number of items the consumer is preferred to process at a single invocation) diff --git a/lib/logqueue.h b/lib/logqueue.h index 1d86f3f..ef04c99 100644 --- a/lib/logqueue.h +++ b/lib/logqueue.h @@ -139,6 +139,7 @@ log_queue_set_throttle(LogQueue *self, gint throttle) void log_queue_push_notify(LogQueue *self); void log_queue_reset_parallel_push(LogQueue *self); +void log_queue_set_parallel_push(LogQueue *self, gint notify_limit, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy); gboolean log_queue_check_items(LogQueue *self, gint batch_items, gboolean *partial_batch, gint *timeout, LogQueuePushNotifyFunc parallel_push_notify, gpointer user_data, GDestroyNotify user_data_destroy); void log_queue_set_counters(LogQueue *self, StatsCounterItem *stored_messages, StatsCounterItem *dropped_messages); void log_queue_init_instance(LogQueue *self, const gchar *persist_name); diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c index 039270d..bbdd1d2 100644 --- a/modules/afsql/afsql.c +++ b/modules/afsql/afsql.c @@ -116,6 +116,7 @@ typedef struct _AFSqlDestDriver dbi_conn dbi_ctx; GHashTable *validated_tables; guint32 failed_message_counter; + } AFSqlDestDriver; static gboolean dbi_initialized = FALSE; @@ -685,11 +686,12 @@ afsql_dd_insert_db(AFSqlDestDriver *self) success = log_queue_pop_head(self->queue, &msg, &path_options, (self->flags & AFSQL_DDF_EXPLICIT_COMMITS), FALSE); g_mutex_unlock(self->db_thread_mutex); if (!success) - return TRUE; + { + return TRUE; + } } msg_set_context(msg); - table = g_string_sized_new(32); value = g_string_sized_new(256); query_string = g_string_sized_new(512); @@ -767,7 +769,6 @@ afsql_dd_insert_db(AFSqlDestDriver *self) if (success && self->flush_lines_queued != -1) { self->flush_lines_queued++; - if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self, TRUE)) return FALSE; } @@ -1155,6 +1156,16 @@ afsql_dd_deinit(LogPipe *s) } static 
void +afsql_dd_queue_notify(gpointer user_data) +{ + AFSqlDestDriver *self = (AFSqlDestDriver *) user_data; + g_mutex_lock(self->db_thread_mutex); + g_cond_signal(self->db_thread_wakeup_cond); + log_queue_reset_parallel_push(self->queue); + g_mutex_unlock(self->db_thread_mutex); +} + +static void afsql_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options, gpointer user_data) { AFSqlDestDriver *self = (AFSqlDestDriver *) s; @@ -1165,7 +1176,9 @@ afsql_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_options, log_queue_push_tail(self->queue, msg, path_options); if (queue_was_empty && !self->db_thread_suspended) - g_cond_signal(self->db_thread_wakeup_cond); + { + log_queue_set_parallel_push(self->queue, 1, afsql_dd_queue_notify, self, NULL); + } g_mutex_unlock(self->db_thread_mutex); }