Re: [syslog-ng] Filtering duplicate messages
Here's the updated patch. I've ditched the "suppress" name in favour of the more appropriate "summary" option and "last_msg" function naming. References are used instead of making a copy of the last message. Timer based flushing is now implemented. Implement "Last message repeated N times" functionality. This behaviour can be enabled by adding the new "summary(<num>)" option to an output configuration e.g. destination tologfile { file("/var/log/messages" template(t) summary(30)); }; As a log message is added to the queue it is remembered as the last message seen. When a new message is added to the queue it is checked against the last message. If its contents are the same the message is dropped and a counter incremented. A message summary indicating the value of the last message counter and a snippet of the message will be inserted into the log queue if a new message that differs is seen or if the configurable timeout period expires. --- src/cfg-grammar.y | 3 +- src/cfg-lex.l | 3 +- src/logmsg.c | 19 ++++++++++ src/logmsg.h | 2 +- src/logwriter.c | 100 ++++++++++++++++++++++++++++++++++++++++++++++++++++- src/logwriter.h | 5 +++ src/messages.c | 4 +-- 7 files changed, 129 insertions(+), 7 deletions(-) diff --git a/src/cfg-grammar.y b/src/cfg-grammar.y index 39ff172..99b033f 100644 --- a/src/cfg-grammar.y +++ b/src/cfg-grammar.y @@ -62,7 +62,7 @@ gint last_addr_family = AF_INET; %token KW_USER KW_DOOR KW_SUN_STREAMS KW_PROGRAM /* option items */ -%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE +%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_SUMMARY KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE %token KW_CHAIN_HOSTNAMES KW_NORMALIZE_HOSTNAMES KW_KEEP_HOSTNAME KW_CHECK_HOSTNAME KW_BAD_HOSTNAME %token KW_KEEP_TIMESTAMP @@ -671,6 +671,7 @@ dest_writer_option : KW_FLAGS '(' dest_writer_options_flags ')' { last_writer_options->options = $3; } | KW_LOG_FIFO_SIZE 
'(' NUMBER ')' { last_writer_options->fifo_size = $3; } | KW_FLUSH_LINES '(' NUMBER ')' { last_writer_options->flush_lines = $3; } + | KW_SUMMARY '(' NUMBER ')' { last_writer_options->summary= $3; } | KW_FLUSH_TIMEOUT '(' NUMBER ')' { last_writer_options->flush_timeout = $3; } | KW_TEMPLATE '(' string ')' { last_writer_options->template = cfg_lookup_template(configuration, $3); diff --git a/src/cfg-lex.l b/src/cfg-lex.l index 5c5f54f..d8418b6 100644 --- a/src/cfg-lex.l +++ b/src/cfg-lex.l @@ -6,7 +6,7 @@ /*************************************************************************** * - * Copyright (c) 1999 Balzs Scheidler + * Copyright (c) 1999 Bal�zs Scheidler * Copyright (c) 1999-2007 BalaBit IT Ltd. * * This program is free software; you can redistribute it and/or modify @@ -86,6 +86,7 @@ static struct keyword keywords[] = { { "flush_timeout", KW_FLUSH_TIMEOUT }, { "sync_freq", KW_FLUSH_LINES }, /* obsolete */ { "sync", KW_FLUSH_LINES }, /* obsolete */ + { "summary", KW_SUMMARY }, { "fsync", KW_FSYNC }, { "long_hostnames", KW_CHAIN_HOSTNAMES }, { "chain_hostnames", KW_CHAIN_HOSTNAMES }, diff --git a/src/logmsg.c b/src/logmsg.c index 05354b5..9219a01 100644 --- a/src/logmsg.c +++ b/src/logmsg.c @@ -653,6 +653,25 @@ log_msg_new(gchar *msg, gint length, GSockAddr *saddr, guint flags, regex_t *bad } /** + * log_msg_new_internal: + * @prio: message priority (LOG_*) + * @msg: message text + * @flags: parse flags (LP_*) + * + * This function creates a new log message for messages originating + * internally to syslog-ng + **/ +LogMessage * +log_msg_new_internal(gint prio, gchar *msg, guint flags) +{ + gchar *buf = g_strdup_printf("<%d> syslog-ng[%d]: %s\n", prio, getpid(), msg); + LogMessage *self = log_msg_new(buf, strlen(buf), NULL, flags, NULL); + g_free(buf); + + return self; +} + +/** * log_msg_new_mark: * * This function returns a new MARK message.
MARK messages have the LF_MARK diff --git a/src/logmsg.h b/src/logmsg.h index 420a711..d780282 100644 --- a/src/logmsg.h +++ b/src/logmsg.h @@ -100,7 +100,7 @@ void log_msg_unref(LogMessage *m); LogMessage *log_msg_new(gchar *msg, gint length, GSockAddr *saddr, guint flags, regex_t *bad_hostname); LogMessage *log_msg_new_mark(void); - +LogMessage *log_msg_new_internal(gint prio, gchar *msg, guint flags); void log_msg_ack_block_inc(LogMessage *m); void log_msg_ack_block_start(LogMessage *m, LMAckFunc func, gpointer user_data); void log_msg_ack_block_end(LogMessage *m); diff --git a/src/logwriter.c b/src/logwriter.c index bb82b43..864b193 100644 --- a/src/logwriter.c +++ b/src/logwriter.c @@ -179,10 +179,104 @@ log_writer_watch_new(LogWriter *writer, FDWrite *fd) } static void +log_writer_last_msg_release(LogWriter *self) +{ + if(self->last_msg_timerid) + g_source_remove(self->last_msg_timerid); + + if(self->last_msg) + log_msg_unref(self->last_msg); + + self->last_msg = NULL; + self->last_msg_count = 0; + self->last_msg_timerid = 0; +} + +static void +log_writer_last_msg_flush(LogWriter *self) +{ + gchar *msg; + LogMessage *m; + + msg = g_strdup_printf("Last message '%.20s' repeated %d times\n", + self->last_msg->msg.str, + self->last_msg_count); + + m = log_msg_new_internal(self->last_msg->pri, msg, 0); + + g_string_assign(&m->host,self->last_msg->host.str ); + g_string_assign(&m->host_from,self->last_msg->host_from.str ); + g_queue_push_tail(self->queue, m); + g_queue_push_tail(self->queue, GUINT_TO_POINTER(0x80000000)); + g_free(msg); + + log_writer_last_msg_release(self); +} + +static gboolean +last_msg_timer(gpointer pt) +{ + LogWriter *self = (LogWriter *)pt; + + if(self->last_msg_count) + log_writer_last_msg_flush(self); + else + log_writer_last_msg_release(self); + + return FALSE; +} + +static void +log_writer_last_msg_record(LogWriter *self, LogMessage *lm) +{ + if(self->last_msg) + log_msg_unref(self->last_msg); + + log_msg_ref(lm); + self->last_msg = lm; 
+ self->last_msg_count = 0; + self->last_msg_timerid = g_timeout_add(self->options->summary * 1000, last_msg_timer, self); +} + +static gboolean +log_writer_last_msg_check(LogWriter *self, LogMessage *lm, gint path_flags) +{ + if (self->options->summary <= 0) + return FALSE; + + if(self->last_msg) + { + if(strcmp(self->last_msg->msg.str, lm->msg.str) == 0 && + strcmp(self->last_msg->host.str, lm->host.str) == 0) + { + if (self->dropped_messages) + (*self->dropped_messages)++; + msg_debug("Dropping duplicate message", + NULL); + self->last_msg_count++; + log_msg_drop(lm, path_flags); + return TRUE; + } + + if (self->last_msg_count) + log_writer_last_msg_flush(self); + else + log_writer_last_msg_release(self); + } + + log_writer_last_msg_record(self, lm); + + return FALSE; +} + +static void log_writer_queue(LogPipe *s, LogMessage *lm, gint path_flags) { LogWriter *self = (LogWriter *) s; - + + if(log_writer_last_msg_check(self, lm, path_flags)) + return; + if ((self->queue->length / 2) == self->options->fifo_size) { /* drop incoming message, we must ack here, otherwise the sender might @@ -462,6 +556,9 @@ log_writer_new(guint32 flags, LogPipe *control, LogWriterOptions *options) self->queue = g_queue_new(); self->flags = flags; self->control = control; + self->last_msg = NULL; + self->last_msg_count = 0; + self->last_msg_timerid = 0; return &self->super; } @@ -476,6 +573,+ options->summary = 0; } void diff --git a/src/logwriter.h b/src/logwriter.h index 284899b..452693c 100644 --- a/src/logwriter.h +++ b/src/logwriter.h @@ -66,6 +66,7 @@ typedef struct _LogWriterOptions gshort ts_format; glong zone_offset; gshort frac_digits; + gint summary; } LogWriterOptions; typedef struct _LogWriter @@ -79,6 +80,10 @@ typedef struct _LogWriter gint partial_pos; LogPipe *control; LogWriterOptions *options; + LogMessage *last_msg; + guint32 last_msg_count; + guint last_msg_timerid; + } LogWriter; void log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions 
*options); diff --git a/src/messages.c b/src/messages.c index 6c69f48..206206a 100644 --- a/src/messages.c +++ b/src/messages.c @@ -56,10 +56,8 @@ msg_send_internal_message(int prio, const char *msg) if (G_LIKELY(internal_msg_queue)) { - buf = g_strdup_printf("<%d> syslog-ng[%d]: %s\n", prio, getpid(), msg); - m = log_msg_new(buf, strlen(buf), NULL, LP_INTERNAL | LP_LOCAL, NULL); + m = log_msg_new_internal(prio, msg, LP_INTERNAL | LP_LOCAL); g_queue_push_tail(internal_msg_queue, m); - g_free(buf); } } } -- 1.5.4.5
Hi, On Fri, 2008-05-30 at 12:16 +1200, chris packham wrote:
Here's the updated patch. I've ditched the "suppress" name in favour of the more appropriate "summary" option and "last_msg" function naming. References are used instead of making a copy of the last message. Timer based flushing is now implemented.
I actually liked the "suppress" name better: people looking for this functionality will most probably search for that word in the documentation or via Google. The patch generally looks nice. There are some minor nitpicks here and there, but I'll fix those when integrating the patch; it's faster that way. I'll have to wait for the decision on the CLA question before integrating it. Release-wise, I wanted to do a 2.1beta2 release today, but it would be nice to add this patch before that, and I'm going to the Nordic Nagios Meet in Sweden next week (some nice people organized a conference in Sweden and invited me to give a syslog-ng talk). I don't want to do another beta release after this one, so I'll hurry the legal guys, and if I get a green light I'll do a release today or during the weekend. Thanks again for your contribution; the patch is nicely done. -- Bazsi
participants (2)
-
Balazs Scheidler
-
chris packham