Re: [syslog-ng] Filtering duplicate messages
Here's a rough cut of my patch. It still needs a bit of cleanup, and I have yet to implement the time-based behaviour; I just wanted to get some feedback. One particular thing I wanted to clarify is where in the log message pipeline it would be best to attach this behaviour. I've currently attached it to the LogWriter. The disadvantage of that is that the new message I generate does not get passed through the filters or templates (and we've already done a bunch of wasted processing on a message that then gets dropped). The behaviour implemented is as follows. As a log message is output to a destination, it is remembered as the last message seen. When a new message is to be output, it is checked against the last message seen. If its contents (message text, including hostname and pid if present) are the same, the message is dropped and a counter is incremented. If the message is different from the last message seen, a new log message indicating the value of the last message's counter will be inserted into the queue and the new message will be processed. 
TODO: Implement a time based flush of last seen messages --- src/cfg-grammar.y | 3 +- src/cfg-lex.l | 3 +- src/logmsg.c | 26 +++++++++++++++++++++++ src/logmsg.h | 2 + src/logwriter.c | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++- src/logwriter.h | 9 ++++++++ 6 files changed, 99 insertions(+), 3 deletions(-) diff --git a/src/cfg-grammar.y b/src/cfg-grammar.y index 39ff172..386c231 100644 --- a/src/cfg-grammar.y +++ b/src/cfg-grammar.y @@ -62,7 +62,7 @@ gint last_addr_family = AF_INET; %token KW_USER KW_DOOR KW_SUN_STREAMS KW_PROGRAM /* option items */ -%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE +%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_SUPPRESS KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE %token KW_CHAIN_HOSTNAMES KW_NORMALIZE_HOSTNAMES KW_KEEP_HOSTNAME KW_CHECK_HOSTNAME KW_BAD_HOSTNAME %token KW_KEEP_TIMESTAMP @@ -671,6 +671,7 @@ dest_writer_option : KW_FLAGS '(' dest_writer_options_flags ')' { last_writer_options->options = $3; } | KW_LOG_FIFO_SIZE '(' NUMBER ')' { last_writer_options->fifo_size = $3; } | KW_FLUSH_LINES '(' NUMBER ')' { last_writer_options->flush_lines = $3; } + | KW_SUPPRESS '(' yesno ')' { last_writer_options->suppress= $3; } | KW_FLUSH_TIMEOUT '(' NUMBER ')' { last_writer_options->flush_timeout = $3; } | KW_TEMPLATE '(' string ')' { last_writer_options->template = cfg_lookup_template(configuration, $3); diff --git a/src/cfg-lex.l b/src/cfg-lex.l index 5c5f54f..4eb0c63 100644 --- a/src/cfg-lex.l +++ b/src/cfg-lex.l @@ -6,7 +6,7 @@ /*************************************************************************** * - * Copyright (c) 1999 Balzs Scheidler + * Copyright (c) 1999 Bal�zs Scheidler * Copyright (c) 1999-2007 BalaBit IT Ltd. 
* * This program is free software; you can redistribute it and/or modify @@ -86,6 +86,7 @@ static struct keyword keywords[] = { { "flush_timeout", KW_FLUSH_TIMEOUT }, { "sync_freq", KW_FLUSH_LINES }, /* obsolete */ { "sync", KW_FLUSH_LINES }, /* obsolete */ + { "suppress", KW_SUPPRESS }, { "fsync", KW_FSYNC }, { "long_hostnames", KW_CHAIN_HOSTNAMES }, { "chain_hostnames", KW_CHAIN_HOSTNAMES }, diff --git a/src/logmsg.c b/src/logmsg.c index 05354b5..4da3af5 100644 --- a/src/logmsg.c +++ b/src/logmsg.c @@ -766,3 +766,29 @@ log_msg_drop(LogMessage *m, guint path_flags) log_msg_ack(m); log_msg_unref(m); } + +/** + * log_msg_dup: + * @m: LogMessage instance + * + * This function duplicates an existing log message. The new message is not parsed. + */ +LogMessage * +log_msg_dup(LogMessage *m) +{ + LogMessage *nm = g_new0(LogMessage, 1); + + log_msg_init(nm, m->saddr); + nm->flags = m->flags; + nm->pri = m->pri; + g_string_assign_len(&nm->date, m->date.str, m->date.len); + g_string_assign_len(&nm->host, m->host.str, m->host.len); + g_string_assign_len(&nm->host_from, m->host_from.str, m->host_from.len); + g_string_assign_len(&nm->program, m->program.str, m->program.len); + g_string_assign_len(&nm->msg, m->msg.str, m->msg.len); + memcpy(&nm->stamp, &m->stamp, sizeof(LogStamp)); + memcpy(&nm->recvd, &m->recvd, sizeof(LogStamp)); + + return nm; +} + diff --git a/src/logmsg.h b/src/logmsg.h index 420a711..9dbb886 100644 --- a/src/logmsg.h +++ b/src/logmsg.h @@ -107,6 +107,8 @@ void log_msg_ack_block_end(LogMessage *m); void log_msg_ack(LogMessage *msg); void log_msg_drop(LogMessage *msg, guint path_flags); void log_msg_clear_matches(LogMessage *self); +LogMessage *log_msg_dup(LogMessage *m); + gchar *log_msg_find_cr_or_lf(gchar *s, gsize n); diff --git a/src/logwriter.c b/src/logwriter.c index bb82b43..4843100 100644 --- a/src/logwriter.c +++ b/src/logwriter.c @@ -178,11 +178,63 @@ log_writer_watch_new(LogWriter *writer, FDWrite *fd) return &self->super; } +static gboolean 
+log_writer_supress_duplicate(LogWriter *self, LogMessage *lm, gint path_flags) +{ + if (!self->options->suppress) + return FALSE; + + if(self->suppress.hash) + { + guint hash = g_string_hash(&lm->msg); + if(self->suppress.hash == hash) + { + msg_debug("Dropping duplicate message", + NULL); + self->suppress.count ++; + log_msg_drop(lm, path_flags); + return TRUE; + } + + if (self->suppress.count) + { + gchar *buf; + LogMessage *m; + GString *line = g_string_sized_new(128); + + buf = g_strdup_printf("<%d> syslog-ng[%d]: Last message '%.20s' repeated %d times\n", + self->suppress.msg->pri, + getpid(), + self->suppress.msg->msg.str, + self->suppress.count); + m = log_msg_new(buf, strlen(buf), self->suppress.msg->saddr, 0, NULL); + g_queue_push_tail(self->queue, m); + g_queue_push_tail(self->queue, GUINT_TO_POINTER(0x80000000)); + g_free(buf); + } + log_msg_drop(self->suppress.msg, 0); + self->suppress.msg = log_msg_dup(lm); + self->suppress.hash = g_string_hash(&lm->msg); + self->suppress.count = 0; + } + else + { + self->suppress.msg = log_msg_dup(lm); + self->suppress.hash = g_string_hash(&lm->msg); + self->suppress.count = 0; + } + + return FALSE; +} + static void log_writer_queue(LogPipe *s, LogMessage *lm, gint path_flags) { LogWriter *self = (LogWriter *) s; - + + if(log_writer_supress_duplicate(self, lm, path_flags)) + return; + if ((self->queue->length / 2) == self->options->fifo_size) { /* drop incoming message, we must ack here, otherwise the sender might @@ -462,6 +514,9 @@ log_writer_new(guint32 flags, LogPipe *control, LogWriterOptions *options) self->queue = g_queue_new(); self->flags = flags; self->control = control; + self->suppress.msg = NULL; + self->suppress.hash = 0; + self->suppress.count = 0; return &self->super; } @@ -476,6 +531,8 @@ log_writer_options_defaults(LogWriterOptions *options) options->ts_format = -1; options->zone_offset = -1; options->frac_digits = -1; + options->suppress = FALSE; + } void diff --git a/src/logwriter.h 
b/src/logwriter.h index 284899b..d3a136f 100644 --- a/src/logwriter.h +++ b/src/logwriter.h @@ -41,6 +41,13 @@ /* several writers use the same counter */ #define LWOF_SHARE_STATS 0x0002 +typedef struct _LogSuppress +{ + LogMessage *msg; + guint hash; + guint32 count; +} LogSuppress; + glong zone_offset; gshort frac_digits; + gboolean suppress; } LogWriterOptions; typedef struct _LogWriter @@ -79,6 +87,7 @@ typedef struct _LogWriter gint partial_pos; LogPipe *control; LogWriterOptions *options; + LogSuppress suppress; } LogWriter; void log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options); -- 1.5.4.5
Hi Chris, Thanks for your contribution. Please find my comments in-line. Please note that in order to get this integrated into syslog-ng, you'll need to sign a "Contributory License Agreement" which is available on our website here: http://www.balabit.com/network-security/syslog-ng/opensource-logging-system/... Thanks. So after the legalese, let's see your notes and patch below. On Thu, 2008-05-29 at 12:57 +1200, chris packham wrote:
Here's a rough cut of my patch. It still needs a bit of cleanup and I have yet to implement the time-based behaviour. I just wanted to get some feedback.
One particular thing I wanted to clarify is where in the log message pipeline it would be best to attach this behaviour. I've currently attached it to the LogWriter. The disadvantage of that is that the new message I generate does not get passed through the filters or templates (and we've already done a bunch of wasted processing on a message that then gets dropped).
There are basically three options: 1) in the sources; 2) as a processing element between sources and destinations (for example filters, or LogProcess in syslog-ng 2.2); 3) in the LogWriter. All three have pros and cons: 1) suppress on input: pros: * drops messages as early as possible * every destination would receive the same suppressions from the same host cons: * would need a hashtable indexed by hostname to store host-specific suppression info, which costs memory and CPU 2) suppress as a processing element: pros: * probably the most flexible; the user could choose which log pipes have suppression and which don't cons: * it requires the same amount of state as suppressing on input * it is more complex than any other alternative, and the flexibility it provides is probably not needed 3) suppress in the LogWriter: pros: * the easiest to implement * no need for per-host state; it uses per-destination state, which is already available (every destination has a separate writer) cons: * different destinations will see a different set of suppressions, depending on the set of sources that feed the given destination. I'd say that doing it in the LogWriter is a sane choice. We could do better, but that would cost us memory and CPU for added flexibility which is probably not needed.
The behaviour implemented is as follows.
As a log message is output to a destination it is remembered as the last message seen.
When a new message is to be output, it is checked against the last message seen. If its contents (message text, including hostname and pid if present) are the same, the message is dropped and a counter is incremented. If the message is different from the last message seen, a new log message indicating the value of the last message's counter will be inserted into the queue and the new message will be processed.
TODO: Implement a time based flush of last seen messages
--- src/cfg-grammar.y | 3 +- src/cfg-lex.l | 3 +- src/logmsg.c | 26 +++++++++++++++++++++++ src/logmsg.h | 2 + src/logwriter.c | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++- src/logwriter.h | 9 ++++++++ 6 files changed, 99 insertions(+), 3 deletions(-)
diff --git a/src/cfg-grammar.y b/src/cfg-grammar.y index 39ff172..386c231 100644 --- a/src/cfg-grammar.y +++ b/src/cfg-grammar.y @@ -62,7 +62,7 @@ gint last_addr_family = AF_INET; %token KW_USER KW_DOOR KW_SUN_STREAMS KW_PROGRAM
/* option items */ -%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE +%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_SUPPRESS KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE
%token KW_CHAIN_HOSTNAMES KW_NORMALIZE_HOSTNAMES KW_KEEP_HOSTNAME KW_CHECK_HOSTNAME KW_BAD_HOSTNAME %token KW_KEEP_TIMESTAMP @@ -671,6 +671,7 @@ dest_writer_option : KW_FLAGS '(' dest_writer_options_flags ')' { last_writer_options->options = $3; } | KW_LOG_FIFO_SIZE '(' NUMBER ')' { last_writer_options->fifo_size = $3; } | KW_FLUSH_LINES '(' NUMBER ')' { last_writer_options->flush_lines = $3; } + | KW_SUPPRESS '(' yesno ')' { last_writer_options->suppress= $3; } | KW_FLUSH_TIMEOUT '(' NUMBER ')' { last_writer_options->flush_timeout = $3; } | KW_TEMPLATE '(' string ')' {
last_writer_options->template = cfg_lookup_template(configuration, $3);
ok.
diff --git a/src/cfg-lex.l b/src/cfg-lex.l index 5c5f54f..4eb0c63 100644 --- a/src/cfg-lex.l +++ b/src/cfg-lex.l @@ -6,7 +6,7 @@
/*************************************************************************** * - * Copyright (c) 1999 Balzs Scheidler + * Copyright (c) 1999 Bal�zs Scheidler * Copyright (c) 1999-2007 BalaBit IT Ltd. * * This program is free software; you can redistribute it and/or modify @@ -86,6 +86,7 @@ static struct keyword keywords[] = { { "flush_timeout", KW_FLUSH_TIMEOUT }, { "sync_freq", KW_FLUSH_LINES }, /* obsolete */ { "sync", KW_FLUSH_LINES }, /* obsolete */ + { "suppress", KW_SUPPRESS }, { "fsync", KW_FSYNC }, { "long_hostnames", KW_CHAIN_HOSTNAMES }, { "chain_hostnames", KW_CHAIN_HOSTNAMES },
ok.
diff --git a/src/logmsg.c b/src/logmsg.c index 05354b5..4da3af5 100644 --- a/src/logmsg.c +++ b/src/logmsg.c @@ -766,3 +766,29 @@ log_msg_drop(LogMessage *m, guint path_flags) log_msg_ack(m); log_msg_unref(m); } + +/** + * log_msg_dup: + * @m: LogMessage instance + * + * This function duplicates an existing log message. The new message is not parsed. + */ +LogMessage * +log_msg_dup(LogMessage *m) +{ + LogMessage *nm = g_new0(LogMessage, 1); + + log_msg_init(nm, m->saddr); + nm->flags = m->flags; + nm->pri = m->pri; + g_string_assign_len(&nm->date, m->date.str, m->date.len); + g_string_assign_len(&nm->host, m->host.str, m->host.len); + g_string_assign_len(&nm->host_from, m->host_from.str, m->host_from.len); + g_string_assign_len(&nm->program, m->program.str, m->program.len); + g_string_assign_len(&nm->msg, m->msg.str, m->msg.len); + memcpy(&nm->stamp, &m->stamp, sizeof(LogStamp)); + memcpy(&nm->recvd, &m->recvd, sizeof(LogStamp)); + + return nm; +}
I don't think this is needed, but see below.
+ diff --git a/src/logmsg.h b/src/logmsg.h index 420a711..9dbb886 100644 --- a/src/logmsg.h +++ b/src/logmsg.h @@ -107,6 +107,8 @@ void log_msg_ack_block_end(LogMessage *m); void log_msg_ack(LogMessage *msg); void log_msg_drop(LogMessage *msg, guint path_flags); void log_msg_clear_matches(LogMessage *self); +LogMessage *log_msg_dup(LogMessage *m); +
gchar *log_msg_find_cr_or_lf(gchar *s, gsize n);
diff --git a/src/logwriter.c b/src/logwriter.c index bb82b43..4843100 100644 --- a/src/logwriter.c +++ b/src/logwriter.c @@ -178,11 +178,63 @@ log_writer_watch_new(LogWriter *writer, FDWrite *fd) return &self->super; }
+static gboolean +log_writer_supress_duplicate(LogWriter *self, LogMessage *lm, gint path_flags) +{ + if (!self->options->suppress) + return FALSE; + + if(self->suppress.hash) + { + guint hash = g_string_hash(&lm->msg); + if(self->suppress.hash == hash) + { + msg_debug("Dropping duplicate message", + NULL); + self->suppress.count ++; + log_msg_drop(lm, path_flags); + return TRUE; + }
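I don't think g_string_hash buys us anything; in fact, it might well be slower than doing a direct string comparison (strcmp is typically performed in chunks of sizeof(long) bytes, while g_string_hash iterates over the characters individually). Moreover, equal hashes do not prove that two messages are identical: two different strings can collide, and the patch would then silently drop a distinct message. You mentioned that you are also checking the hostname, but that is not the case: apart from the message text, lm->msg contains only the application name and pid, not the hostname. I think we should also check the hostname.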
+ + if (self->suppress.count) + { + gchar *buf; + LogMessage *m; + GString *line = g_string_sized_new(128); + + buf = g_strdup_printf("<%d> syslog-ng[%d]: Last message '%.20s' repeated %d times\n", + self->suppress.msg->pri, + getpid(), + self->suppress.msg->msg.str, + self->suppress.count);
We'd probably need a log_msg_new_internal() function that would produce the same message without having to parse it. Both this code and the code in messages.c would be direct users of that function.
+ m = log_msg_new(buf, strlen(buf), self->suppress.msg->saddr, 0, NULL); + g_queue_push_tail(self->queue, m); + g_queue_push_tail(self->queue, GUINT_TO_POINTER(0x80000000)); + g_free(buf); + } + log_msg_drop(self->suppress.msg, 0); + self->suppress.msg = log_msg_dup(lm);
As a LogMessage is read-only (well, mostly; I'm steering syslog-ng 2.2 towards making LogMessage fully read-only), I think it is enough to store a reference to it; there is no need to copy the complete message, which is too expensive.
+ self->suppress.hash = g_string_hash(&lm->msg); + self->suppress.count = 0; + } + else + { + self->suppress.msg = log_msg_dup(lm); + self->suppress.hash = g_string_hash(&lm->msg); + self->suppress.count = 0; + } + + return FALSE; +} + static void log_writer_queue(LogPipe *s, LogMessage *lm, gint path_flags) { LogWriter *self = (LogWriter *) s; - + + if(log_writer_supress_duplicate(self, lm, path_flags)) + return; + if ((self->queue->length / 2) == self->options->fifo_size) { /* drop incoming message, we must ack here, otherwise the sender might @@ -462,6 +514,9 @@ log_writer_new(guint32 flags, LogPipe *control, LogWriterOptions *options) self->queue = g_queue_new(); self->flags = flags; self->control = control; + self->suppress.msg = NULL; + self->suppress.hash = 0; + self->suppress.count = 0; return &self->super; }
@@ -476,6 +531,8 @@ log_writer_options_defaults(LogWriterOptions *options) options->ts_format = -1; options->zone_offset = -1; options->frac_digits = -1; + options->suppress = FALSE; + }
void diff --git a/src/logwriter.h b/src/logwriter.h index 284899b..d3a136f 100644 --- a/src/logwriter.h +++ b/src/logwriter.h @@ -41,6 +41,13 @@ /* several writers use the same counter */ #define LWOF_SHARE_STATS 0x0002
+typedef struct _LogSuppress +{ + LogMessage *msg; + guint hash; + guint32 count; +} LogSuppress;
As I've removed the hash from the picture, I don't think we need a separate structure for this; just add two fields to LogWriter.
+ glong zone_offset; gshort frac_digits; + gboolean suppress; } LogWriterOptions;
typedef struct _LogWriter @@ -79,6 +87,7 @@ typedef struct _LogWriter gint partial_pos; LogPipe *control; LogWriterOptions *options; + LogSuppress suppress; } LogWriter;
void log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options); -- 1.5.4.5
______________________________________________________________________________ Member info: https://lists.balabit.hu/mailman/listinfo/syslog-ng Documentation: http://www.balabit.com/support/documentation/?product=syslog-ng FAQ: http://www.campin.net/syslog-ng/faq.html
-- Bazsi
participants (2)
-
Balazs Scheidler
-
chris packham