[syslog-ng] Filtering duplicate messages
chris packham
chris.packham at alliedtelesis.co.nz
Fri May 30 02:16:11 CEST 2008
Here's the updated patch. I've ditched the "suppress" name in favour of
the more appropriate "summary" option and "last_msg" function naming.
References are now used instead of making a copy of the last message, and
timer-based flushing is now implemented.
Implement "Last message repeated N times" functionality.
This behaviour can be enabled by adding the new "summary(<num>)" option,
where <num> is the summary timeout in seconds, to an output
configuration, e.g.
destination tologfile { file("/var/log/messages" template(t)
summary(30)); };
As a log message is added to the queue it is remembered as the last
message seen.
When a new message is added to the queue it is checked against the last
message. If its text and host are the same, the message is dropped and a
counter is incremented.
A summary message giving the value of the last-message counter and a
snippet (the first 20 characters) of the repeated message is inserted
into the log queue when a new message that differs is seen or when the
configurable timeout period expires.
---
src/cfg-grammar.y | 3 +-
src/cfg-lex.l | 3 +-
src/logmsg.c | 19 ++++++++++
src/logmsg.h | 2 +-
src/logwriter.c | 100
++++++++++++++++++++++++++++++++++++++++++++++++++++-
src/logwriter.h | 5 +++
src/messages.c | 4 +--
7 files changed, 129 insertions(+), 7 deletions(-)
diff --git a/src/cfg-grammar.y b/src/cfg-grammar.y
index 39ff172..99b033f 100644
--- a/src/cfg-grammar.y
+++ b/src/cfg-grammar.y
@@ -62,7 +62,7 @@ gint last_addr_family = AF_INET;
%token KW_USER KW_DOOR KW_SUN_STREAMS KW_PROGRAM
/* option items */
-%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES
KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE
+%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_SUMMARY
KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE
%token KW_CHAIN_HOSTNAMES KW_NORMALIZE_HOSTNAMES KW_KEEP_HOSTNAME
KW_CHECK_HOSTNAME KW_BAD_HOSTNAME
%token KW_KEEP_TIMESTAMP
@@ -671,6 +671,7 @@ dest_writer_option
: KW_FLAGS '(' dest_writer_options_flags ')' {
last_writer_options->options = $3; }
| KW_LOG_FIFO_SIZE '(' NUMBER ')' {
last_writer_options->fifo_size = $3; }
| KW_FLUSH_LINES '(' NUMBER ')' {
last_writer_options->flush_lines = $3; }
+ | KW_SUMMARY '(' NUMBER ')' {
last_writer_options->summary= $3; }
| KW_FLUSH_TIMEOUT '(' NUMBER ')' {
last_writer_options->flush_timeout = $3; }
| KW_TEMPLATE '(' string ')' {
last_writer_options->template = cfg_lookup_template(configuration, $3);
diff --git a/src/cfg-lex.l b/src/cfg-lex.l
index 5c5f54f..d8418b6 100644
--- a/src/cfg-lex.l
+++ b/src/cfg-lex.l
@@ -6,7 +6,7 @@
/***************************************************************************
*
- * Copyright (c) 1999 Balzs Scheidler
+ * Copyright (c) 1999 Balázs Scheidler
* Copyright (c) 1999-2007 BalaBit IT Ltd.
*
* This program is free software; you can redistribute it and/or modify
@@ -86,6 +86,7 @@ static struct keyword keywords[] = {
{ "flush_timeout", KW_FLUSH_TIMEOUT },
{ "sync_freq", KW_FLUSH_LINES }, /* obsolete */
{ "sync", KW_FLUSH_LINES }, /* obsolete */
+ { "summary", KW_SUMMARY },
{ "fsync", KW_FSYNC },
{ "long_hostnames", KW_CHAIN_HOSTNAMES },
{ "chain_hostnames", KW_CHAIN_HOSTNAMES },
diff --git a/src/logmsg.c b/src/logmsg.c
index 05354b5..9219a01 100644
--- a/src/logmsg.c
+++ b/src/logmsg.c
@@ -653,6 +653,25 @@ log_msg_new(gchar *msg, gint length, GSockAddr
*saddr, guint flags, regex_t *bad
}
/**
+ * log_msg_new_internal:
+ * @prio: message priority (LOG_*)
+ * @msg: message text
+ * @flags: parse flags (LP_*)
+ *
+ * This function creates a new log message for messages originating
+ * internally to syslog-ng
+ **/
+LogMessage *
+log_msg_new_internal(gint prio, gchar *msg, guint flags)
+{
+ gchar *buf = g_strdup_printf("<%d> syslog-ng[%d]: %s\n", prio,
getpid(), msg);
+ LogMessage *self = log_msg_new(buf, strlen(buf), NULL, flags, NULL);
+ g_free(buf);
+
+ return self;
+}
+
+/**
* log_msg_new_mark:
*
* This function returns a new MARK message. MARK messages have the
LF_MARK
diff --git a/src/logmsg.h b/src/logmsg.h
index 420a711..d780282 100644
--- a/src/logmsg.h
+++ b/src/logmsg.h
@@ -100,7 +100,7 @@ void log_msg_unref(LogMessage *m);
LogMessage *log_msg_new(gchar *msg, gint length, GSockAddr *saddr,
guint flags, regex_t *bad_hostname);
LogMessage *log_msg_new_mark(void);
-
+LogMessage *log_msg_new_internal(gint prio, gchar *msg, guint flags);
void log_msg_ack_block_inc(LogMessage *m);
void log_msg_ack_block_start(LogMessage *m, LMAckFunc func, gpointer
user_data);
void log_msg_ack_block_end(LogMessage *m);
diff --git a/src/logwriter.c b/src/logwriter.c
index bb82b43..864b193 100644
--- a/src/logwriter.c
+++ b/src/logwriter.c
@@ -179,10 +179,104 @@ log_writer_watch_new(LogWriter *writer, FDWrite
*fd)
}
static void
+log_writer_last_msg_release(LogWriter *self)
+{
+ if(self->last_msg_timerid)
+ g_source_remove(self->last_msg_timerid);
+
+ if(self->last_msg)
+ log_msg_unref(self->last_msg);
+
+ self->last_msg = NULL;
+ self->last_msg_count = 0;
+ self->last_msg_timerid = 0;
+}
+
+static void
+log_writer_last_msg_flush(LogWriter *self)
+{
+ gchar *msg;
+ LogMessage *m;
+
+ msg = g_strdup_printf("Last message '%.20s' repeated %d times\n",
+ self->last_msg->msg.str,
+ self->last_msg_count);
+
+ m = log_msg_new_internal(self->last_msg->pri, msg, 0);
+
+ g_string_assign(&m->host,self->last_msg->host.str );
+ g_string_assign(&m->host_from,self->last_msg->host_from.str );
+ g_queue_push_tail(self->queue, m);
+ g_queue_push_tail(self->queue, GUINT_TO_POINTER(0x80000000));
+ g_free(msg);
+
+ log_writer_last_msg_release(self);
+}
+
+static gboolean
+last_msg_timer(gpointer pt)
+{
+ LogWriter *self = (LogWriter *)pt;
+
+ if(self->last_msg_count)
+ log_writer_last_msg_flush(self);
+ else
+ log_writer_last_msg_release(self);
+
+ return FALSE;
+}
+
+static void
+log_writer_last_msg_record(LogWriter *self, LogMessage *lm)
+{
+ if(self->last_msg)
+ log_msg_unref(self->last_msg);
+
+ log_msg_ref(lm);
+ self->last_msg = lm;
+ self->last_msg_count = 0;
+ self->last_msg_timerid = g_timeout_add(self->options->summary * 1000,
last_msg_timer, self);
+}
+
+static gboolean
+log_writer_last_msg_check(LogWriter *self, LogMessage *lm, gint
path_flags)
+{
+ if (self->options->summary <= 0)
+ return FALSE;
+
+ if(self->last_msg)
+ {
+ if(strcmp(self->last_msg->msg.str, lm->msg.str) == 0 &&
+ strcmp(self->last_msg->host.str, lm->host.str) == 0)
+ {
+ if (self->dropped_messages)
+ (*self->dropped_messages)++;
+ msg_debug("Dropping duplicate message",
+ NULL);
+ self->last_msg_count++;
+ log_msg_drop(lm, path_flags);
+ return TRUE;
+ }
+
+ if (self->last_msg_count)
+ log_writer_last_msg_flush(self);
+ else
+ log_writer_last_msg_release(self);
+ }
+
+ log_writer_last_msg_record(self, lm);
+
+ return FALSE;
+}
+
+static void
log_writer_queue(LogPipe *s, LogMessage *lm, gint path_flags)
{
LogWriter *self = (LogWriter *) s;
-
+
+ if(log_writer_last_msg_check(self, lm, path_flags))
+ return;
+
if ((self->queue->length / 2) == self->options->fifo_size)
{
/* drop incoming message, we must ack here, otherwise the sender
might
@@ -462,6 +556,9 @@ log_writer_new(guint32 flags, LogPipe *control,
LogWriterOptions *options)
self->queue = g_queue_new();
self->flags = flags;
self->control = control;
+ self->last_msg = NULL;
+ self->last_msg_count = 0;
+ self->last_msg_timerid = 0;
return &self->super;
}
@@ -476,6 +573,7 @@
+ options->summary = 0;
}
void
diff --git a/src/logwriter.h b/src/logwriter.h
index 284899b..452693c 100644
--- a/src/logwriter.h
+++ b/src/logwriter.h
@@ -66,6 +66,7 @@ typedef struct _LogWriterOptions
gshort ts_format;
glong zone_offset;
gshort frac_digits;
+ gint summary;
} LogWriterOptions;
typedef struct _LogWriter
@@ -79,6 +80,10 @@ typedef struct _LogWriter
gint partial_pos;
LogPipe *control;
LogWriterOptions *options;
+ LogMessage *last_msg;
+ guint32 last_msg_count;
+ guint last_msg_timerid;
+
} LogWriter;
void log_writer_set_options(LogWriter *self, LogPipe *control,
LogWriterOptions *options);
diff --git a/src/messages.c b/src/messages.c
index 6c69f48..206206a 100644
--- a/src/messages.c
+++ b/src/messages.c
@@ -56,10 +56,8 @@ msg_send_internal_message(int prio, const char *msg)
if (G_LIKELY(internal_msg_queue))
{
- buf = g_strdup_printf("<%d> syslog-ng[%d]: %s\n", prio,
getpid(), msg);
- m = log_msg_new(buf, strlen(buf), NULL, LP_INTERNAL |
LP_LOCAL, NULL);
+ m = log_msg_new_internal(prio, msg, LP_INTERNAL | LP_LOCAL);
g_queue_push_tail(internal_msg_queue, m);
- g_free(buf);
}
}
}
--
1.5.4.5
More information about the syslog-ng
mailing list