This patch converts syslog-ng to use upstream ivykis, unmodified. Apart from the boring struct and type renaming stuff, there is one major change: iv_fd_pollable() is gone, we have iv_fd_register_try(). As such, the logic in log_reader_start_watches() and log_writer_start_watches() was updated to use the upstream API instead. Many thanks to Lennert Buytenhek <buytenh@wantstofly.org> and Balazs Scheidler <bazsi@balabit.hu> for their invaluable help. Signed-off-by: Gergely Nagy <algernon@balabit.hu> --- .gitmodules | 2 +- lib/ivykis | 2 +- lib/logmsg.c | 2 +- lib/logmsg.h | 2 +- lib/logqueue-fifo.c | 52 +++++++++++++++++++++++++-------------------------- lib/logreader.c | 31 ++++++++++++++++++------------ lib/logwriter.c | 15 ++++++++------- lib/mainloop.c | 36 +++++++++++++++++------------------ lib/mainloop.h | 6 +++--- 9 files changed, 78 insertions(+), 70 deletions(-) diff --git a/.gitmodules b/.gitmodules index 6254dc1..68956f3 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,7 +3,7 @@ url = git://git.balabit.hu/bazsi/libmongo-client.git [submodule "lib/ivykis"] path = lib/ivykis - url = git://git.balabit.hu/bazsi/ivykis.git + url = git://github.com/buytenh/ivykis.git [submodule "lib/eventlog"] path = lib/eventlog url = git://git.balabit.hu/bazsi/eventlog-1.0.git diff --git a/lib/ivykis b/lib/ivykis index 73c0c27..905fa75 160000 --- a/lib/ivykis +++ b/lib/ivykis @@ -1 +1 @@ -Subproject commit 73c0c27a19755f700b268081a5a59acdd04b421e +Subproject commit 905fa75c813f0058d383100fd4819a39bc10f45b diff --git a/lib/logmsg.c b/lib/logmsg.c index 31d9c4c..e912c73 100644 --- a/lib/logmsg.c +++ b/lib/logmsg.c @@ -416,7 +416,7 @@ log_msg_is_handle_match(NVHandle handle) static void log_msg_init_queue_node(LogMessage *msg, LogMessageQueueNode *node, const LogPathOptions *path_options) { - INIT_LIST_HEAD(&node->list); + INIT_IV_LIST_HEAD(&node->list); node->ack_needed = path_options->ack_needed; node->msg = log_msg_ref(msg); log_msg_write_protect(msg); diff --git a/lib/logmsg.h b/lib/logmsg.h index 4f0e93f..d322141 100644 --- a/lib/logmsg.h +++ b/lib/logmsg.h @@ -113,7 +113,7 @@ enum typedef struct _LogMessageQueueNode { - struct list_head list; + struct iv_list_head list; LogMessage *msg; gboolean ack_needed:1, embedded:1; } LogMessageQueueNode; diff --git a/lib/logqueue-fifo.c b/lib/logqueue-fifo.c index 9c27a2b..f7a2386 100644 --- a/lib/logqueue-fifo.c +++ b/lib/logqueue-fifo.c @@ -75,18 +75,18 @@ typedef struct _LogQueueFifo LogQueue super; /* scalable qoverflow implementation */ - struct list_head qoverflow_output; - struct list_head qoverflow_wait; + struct iv_list_head qoverflow_output; + struct iv_list_head qoverflow_wait; gint qoverflow_wait_len; gint qoverflow_output_len; gint qoverflow_size; /* in number of elements */ - struct list_head qbacklog; /* entries that were sent but not acked yet */ + struct iv_list_head qbacklog; /* entries that were sent but not acked yet */ gint qbacklog_len; struct { - struct list_head items; + struct iv_list_head items; MainLoopIOWorkerFinishCallback cb; guint16 len; guint16 finish_cb_registered; @@ -150,10 +150,10 @@ log_queue_fifo_move_input_unlocked(LogQueueFifo *self, gint thread_id) for (i = 0; i < n; i++) { - LogMessageQueueNode *node = list_entry(self->qoverflow_input[thread_id].items.next, LogMessageQueueNode, list); + LogMessageQueueNode *node = iv_list_entry(self->qoverflow_input[thread_id].items.next, LogMessageQueueNode, list); LogMessage *msg = node->msg; - list_del(&node->list); + iv_list_del(&node->list); self->qoverflow_input[thread_id].len--; path_options.ack_needed = node->ack_needed; stats_counter_inc(self->super.dropped_messages); @@ -167,7 +167,7 @@ log_queue_fifo_move_input_unlocked(LogQueueFifo *self, gint thread_id) NULL); } stats_counter_add(self->super.stored_messages, self->qoverflow_input[thread_id].len); - list_splice_tail_init(&self->qoverflow_input[thread_id].items, &self->qoverflow_wait); + iv_list_splice_tail_init(&self->qoverflow_input[thread_id].items, &self->qoverflow_wait); self->qoverflow_wait_len += self->qoverflow_input[thread_id].len; self->qoverflow_input[thread_id].len = 0; } @@ -239,7 +239,7 @@ log_queue_fifo_push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *pat } node = log_msg_alloc_queue_node(msg, path_options); - list_add_tail(&node->list, &self->qoverflow_input[thread_id].items); + iv_list_add_tail(&node->list, &self->qoverflow_input[thread_id].items); self->qoverflow_input[thread_id].len++; log_msg_unref(msg); return; @@ -256,7 +256,7 @@ log_queue_fifo_push_tail(LogQueue *s, LogMessage *msg, const LogPathOptions *pat { node = log_msg_alloc_queue_node(msg, path_options); - list_add_tail(&node->list, &self->qoverflow_wait); + iv_list_add_tail(&node->list, &self->qoverflow_wait); self->qoverflow_wait_len++; log_queue_push_notify(&self->super); @@ -297,7 +297,7 @@ log_queue_fifo_push_head(LogQueue *s, LogMessage *msg, const LogPathOptions *pat log_queue_assert_output_thread(s); node = log_msg_alloc_dynamic_queue_node(msg, path_options); - list_add(&node->list, &self->qoverflow_output); + iv_list_add(&node->list, &self->qoverflow_output); self->qoverflow_output_len++; stats_counter_inc(self->super.stored_messages); @@ -323,7 +323,7 @@ log_queue_fifo_pop_head(LogQueue *s, LogMessage **msg, LogPathOptions *path_opti { /* slow path, output queue is empty, get some elements from the wait queue */ g_static_mutex_lock(&self->super.lock); - list_splice_tail_init(&self->qoverflow_wait, &self->qoverflow_output); + iv_list_splice_tail_init(&self->qoverflow_wait, &self->qoverflow_output); self->qoverflow_output_len = self->qoverflow_wait_len; self->qoverflow_wait_len = 0; g_static_mutex_unlock(&self->super.lock); @@ -331,19 +331,19 @@ log_queue_fifo_pop_head(LogQueue *s, LogMessage **msg, LogPathOptions *path_opti if (self->qoverflow_output_len > 0) { - node = list_entry(self->qoverflow_output.next, LogMessageQueueNode, list); + node = iv_list_entry(self->qoverflow_output.next, LogMessageQueueNode, list); *msg = node->msg; path_options->ack_needed = node->ack_needed; self->qoverflow_output_len--; if (!push_to_backlog) { - list_del(&node->list); + iv_list_del(&node->list); log_msg_free_queue_node(node); } else { - list_del_init(&node->list); + iv_list_del_init(&node->list); } } else @@ -363,7 +363,7 @@ log_queue_fifo_pop_head(LogQueue *s, LogMessage **msg, LogPathOptions *path_opti if (push_to_backlog) { log_msg_ref(*msg); - list_add_tail(&node->list, &self->qbacklog); + iv_list_add_tail(&node->list, &self->qbacklog); self->qbacklog_len++; } if (!ignore_throttle && self->super.throttle_buckets > 0) @@ -391,11 +391,11 @@ log_queue_fifo_ack_backlog(LogQueue *s, gint n) { LogMessageQueueNode *node; - node = list_entry(self->qbacklog.next, LogMessageQueueNode, list); + node = iv_list_entry(self->qbacklog.next, LogMessageQueueNode, list); msg = node->msg; path_options.ack_needed = node->ack_needed; - list_del(&node->list); + iv_list_del(&node->list); log_msg_free_queue_node(node); self->qbacklog_len--; @@ -422,23 +422,23 @@ log_queue_fifo_rewind_backlog(LogQueue *s) log_queue_assert_output_thread(s); - list_splice_tail_init(&self->qbacklog, &self->qoverflow_output); + iv_list_splice_tail_init(&self->qbacklog, &self->qoverflow_output); self->qoverflow_output_len += self->qbacklog_len; stats_counter_add(self->super.stored_messages, self->qbacklog_len); self->qbacklog_len = 0; } static void -log_queue_fifo_free_queue(struct list_head *q) +log_queue_fifo_free_queue(struct iv_list_head *q) { - while (!list_empty(q)) + while (!iv_list_empty(q)) { LogMessageQueueNode *node; LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; LogMessage *msg; - node = list_entry(q->next, LogMessageQueueNode, list); - list_del(&node->list); + node = iv_list_entry(q->next, LogMessageQueueNode, list); + iv_list_del(&node->list); path_options.ack_needed = node->ack_needed; msg = node->msg; @@ -484,14 +484,14 @@ log_queue_fifo_new(gint qoverflow_size, const gchar *persist_name) for (i = 0; i < log_queue_max_threads; i++) { - INIT_LIST_HEAD(&self->qoverflow_input[i].items); + INIT_IV_LIST_HEAD(&self->qoverflow_input[i].items); main_loop_io_worker_finish_callback_init(&self->qoverflow_input[i].cb); self->qoverflow_input[i].cb.user_data = self; self->qoverflow_input[i].cb.func = log_queue_fifo_move_input; } - INIT_LIST_HEAD(&self->qoverflow_wait); - INIT_LIST_HEAD(&self->qoverflow_output); - INIT_LIST_HEAD(&self->qbacklog); + INIT_IV_LIST_HEAD(&self->qoverflow_wait); + INIT_IV_LIST_HEAD(&self->qoverflow_output); + INIT_IV_LIST_HEAD(&self->qbacklog); self->qoverflow_size = qoverflow_size; return &self->super; diff --git a/lib/logreader.c b/lib/logreader.c index 562edd9..5a35e06 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -329,9 +329,6 @@ log_reader_start_watches(LogReader *self) log_proto_prepare(self->proto, &fd, &cond); - if (self->pollable_state < 0 && fd >= 0) - self->pollable_state = iv_fd_pollable(fd); - if (self->options->follow_freq > 0) { /* follow freq specified (only the file source does that, go into timed polling */ @@ -345,18 +342,28 @@ log_reader_start_watches(LogReader *self) NULL); return FALSE; } - else if (self->pollable_state > 0) + else { /* we have an FD, it is possible to poll it, register it */ self->fd_watch.fd = fd; - iv_fd_register(&self->fd_watch); - } - else - { - msg_error("Unable to determine how to monitor this fd, follow_freq() not set and it is not possible to poll it with the current ivykis polling method, try changing IV_EXCLUDE_POLL_METHOD environment variable", - evt_tag_int("fd", fd), - NULL); - return FALSE; + if (self->pollable_state < 0) + { + if (iv_fd_register_try(&self->fd_watch) == 0) + self->pollable_state = 1; + else + self->pollable_state = 0; + } + else if (self->pollable_state > 0) + { + iv_fd_register(&self->fd_watch); + } + else + { + msg_error("Unable to determine how to monitor this fd, follow_freq() not set and it is not possible to poll it with the current ivykis polling method, try changing IV_EXCLUDE_POLL_METHOD environment variable", + evt_tag_int("fd", fd), + NULL); + return FALSE; + } } log_reader_update_watches(self); diff --git a/lib/logwriter.c b/lib/logwriter.c index 91a2b27..6d0c28f 100644 --- a/lib/logwriter.c +++ b/lib/logwriter.c @@ -418,19 +418,20 @@ log_writer_start_watches(LogWriter *self) { log_proto_prepare(self->proto, &fd, &cond); + self->fd_watch.fd = fd; + if (self->pollable_state < 0) { + /* auto-detect if the fd can be polled using ivykis */ if (is_file_regular(fd)) self->pollable_state = 0; + else if (iv_fd_register_try(&self->fd_watch) == 0) + self->pollable_state = 1; else - self->pollable_state = iv_fd_pollable(fd); - } - - if (self->pollable_state) - { - self->fd_watch.fd = fd; - iv_fd_register(&self->fd_watch); + self->pollable_state = 0; } + else if (self->pollable_state > 0) + iv_fd_register(&self->fd_watch); log_writer_update_watches(self); self->watches_running = TRUE; diff --git a/lib/mainloop.c b/lib/mainloop.c index c9fc8b0..35a6836 100644 --- a/lib/mainloop.c +++ b/lib/mainloop.c @@ -116,7 +116,7 @@ static GlobalConfig *current_configuration; typedef struct _MainLoopTaskCallSite MainLoopTaskCallSite; struct _MainLoopTaskCallSite { - struct list_head list; + struct iv_list_head list; MainLoopTaskFunc func; gpointer user_data; gpointer result; @@ -135,7 +135,7 @@ TLS_BLOCK_START TLS_BLOCK_END; static GStaticMutex main_task_lock = G_STATIC_MUTEX_INIT; -static struct list_head main_task_queue = LIST_HEAD_INIT(main_task_queue); +static struct iv_list_head main_task_queue = IV_LIST_HEAD_INIT(main_task_queue); static struct iv_event main_task_posted; GThread *main_thread_handle; @@ -166,14 +166,14 @@ main_loop_call(MainLoopTaskFunc func, gpointer user_data, gboolean wait) } /* call_info.lock is no longer needed, since we're the only ones using call_info now */ - INIT_LIST_HEAD(&call_info.list); + INIT_IV_LIST_HEAD(&call_info.list); call_info.pending = TRUE; call_info.func = func; call_info.user_data = user_data; call_info.wait = wait; if (!call_info.cond) call_info.cond = g_cond_new(); - list_add(&call_info.list, &main_task_queue); + iv_list_add(&call_info.list, &main_task_queue); iv_event_post(&main_task_posted); if (wait) { @@ -188,13 +188,13 @@ static void main_loop_call_handler(gpointer user_data) { g_static_mutex_lock(&main_task_lock); - while (!list_empty(&main_task_queue)) + while (!iv_list_empty(&main_task_queue)) { MainLoopTaskCallSite *site; gpointer result; - site = list_entry(main_task_queue.next, MainLoopTaskCallSite, list); - list_del_init(&site->list); + site = iv_list_entry(main_task_queue.next, MainLoopTaskCallSite, list); + iv_list_del_init(&site->list); g_static_mutex_unlock(&main_task_lock); result = site->func(site->user_data); @@ -362,21 +362,21 @@ main_loop_io_worker_job_submit(MainLoopIOWorkerJob *self) static void main_loop_io_worker_job_start(MainLoopIOWorkerJob *self) { - struct list_head *lh, *lh2; + struct iv_list_head *lh, *lh2; g_assert(main_loop_current_job == NULL); main_loop_current_job = self; self->work(self->user_data); - list_for_each_safe(lh, lh2, &self->finish_callbacks) + iv_list_for_each_safe(lh, lh2, &self->finish_callbacks) { - MainLoopIOWorkerFinishCallback *cb = list_entry(lh, MainLoopIOWorkerFinishCallback, list); + MainLoopIOWorkerFinishCallback *cb = iv_list_entry(lh, MainLoopIOWorkerFinishCallback, list); cb->func(cb->user_data); - list_del_init(&cb->list); + iv_list_del_init(&cb->list); } - g_assert(list_empty(&self->finish_callbacks)); + g_assert(iv_list_empty(&self->finish_callbacks)); main_loop_current_job = NULL; } @@ -432,7 +432,7 @@ main_loop_io_worker_register_finish_callback(MainLoopIOWorkerFinishCallback *cb) { g_assert(main_loop_current_job != NULL); - list_add(&cb->list, &main_loop_current_job->finish_callbacks); + iv_list_add(&cb->list, &main_loop_current_job->finish_callbacks); } void @@ -442,7 +442,7 @@ main_loop_io_worker_job_init(MainLoopIOWorkerJob *self) self->work_item.cookie = self; self->work_item.work = (void (*)(void *)) main_loop_io_worker_job_start; self->work_item.completion = (void (*)(void *)) main_loop_io_worker_job_complete; - INIT_LIST_HEAD(&self->finish_callbacks); + INIT_IV_LIST_HEAD(&self->finish_callbacks); } static void @@ -705,26 +705,26 @@ main_loop_run(void) IV_SIGNAL_INIT(&sighup_poll); sighup_poll.signum = SIGHUP; - sighup_poll.exclusive = 1; + sighup_poll.flags = IV_SIGNAL_FLAG_EXCLUSIVE; sighup_poll.cookie = NULL; sighup_poll.handler = sig_hup_handler; iv_signal_register(&sighup_poll); IV_SIGNAL_INIT(&sigchild_poll); sigchild_poll.signum = SIGCHLD; - sigchild_poll.exclusive = 1; + sigchild_poll.flags = IV_SIGNAL_FLAG_EXCLUSIVE; sigchild_poll.handler = sig_child_handler; iv_signal_register(&sigchild_poll); IV_SIGNAL_INIT(&sigterm_poll); sigterm_poll.signum = SIGTERM; - sigterm_poll.exclusive = 1; + sigterm_poll.flags = IV_SIGNAL_FLAG_EXCLUSIVE; sigterm_poll.handler = sig_term_handler; iv_signal_register(&sigterm_poll); IV_SIGNAL_INIT(&sigint_poll); sigint_poll.signum = SIGINT; - sigint_poll.exclusive = 1; + sigint_poll.flags = IV_SIGNAL_FLAG_EXCLUSIVE; sigint_poll.handler = sig_term_handler; iv_signal_register(&sigint_poll); diff --git a/lib/mainloop.h b/lib/mainloop.h index c432718..cb02d8a 100644 --- a/lib/mainloop.h +++ b/lib/mainloop.h @@ -36,7 +36,7 @@ typedef gpointer (*MainLoopTaskFunc)(gpointer user_data); typedef struct _MainLoopIOWorkerFinishCallback { - struct list_head list; + struct iv_list_head list; MainLoopTaskFunc func; gpointer user_data; } MainLoopIOWorkerFinishCallback; @@ -44,7 +44,7 @@ typedef struct _MainLoopIOWorkerFinishCallback static inline void main_loop_io_worker_finish_callback_init(MainLoopIOWorkerFinishCallback *self) { - INIT_LIST_HEAD(&self->list); + INIT_IV_LIST_HEAD(&self->list); } typedef struct _MainLoopIOWorkerJob @@ -56,7 +56,7 @@ typedef struct _MainLoopIOWorkerJob struct iv_work_item work_item; /* function to be called back when the current job is finished. */ - struct list_head finish_callbacks; + struct iv_list_head finish_callbacks; } MainLoopIOWorkerJob; static inline gboolean -- 1.7.10