[syslog-ng] Filtering duplicate messages

chris packham chris.packham at alliedtelesis.co.nz
Fri May 30 02:16:11 CEST 2008


Here's the updated patch. I've ditched the "suppress" name in favour of
the more appropriate "summary" option and "last_msg" function naming.
References are used instead of making a copy of the last message. Timer
based flushing is now implemented.

Implement "Last message repeated N times" functionality.

This behaviour can be enabled by adding the new "summary(<num>)" option to
an output configuration, e.g.

destination tologfile { file("/var/log/messages" template(t) summary(30)); };

As a log message is added to the queue it is remembered as the last
message seen.

When a new message is added to the queue it is checked against the last
message. If its contents are the same, the message is dropped and a
counter is incremented.

A message summary indicating the value of the last message counter and a
snippet of the message will be inserted into the log queue if a new
message that differs is seen or if the configurable timeout period expires.
---
 src/cfg-grammar.y |    3 +-
 src/cfg-lex.l     |    3 +-
 src/logmsg.c      |   19 ++++++++++
 src/logmsg.h      |    2 +-
 src/logwriter.c   |  100 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 src/logwriter.h   |    5 +++
 src/messages.c    |    4 +--
 7 files changed, 129 insertions(+), 7 deletions(-)

diff --git a/src/cfg-grammar.y b/src/cfg-grammar.y
index 39ff172..99b033f 100644
--- a/src/cfg-grammar.y
+++ b/src/cfg-grammar.y
@@ -62,7 +62,7 @@ gint last_addr_family = AF_INET;
 %token  KW_USER KW_DOOR KW_SUN_STREAMS KW_PROGRAM
 
 /* option items */
-%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE
+%token KW_FSYNC KW_MARK_FREQ KW_STATS_FREQ KW_FLUSH_LINES KW_SUMMARY KW_FLUSH_TIMEOUT KW_LOG_MSG_SIZE KW_FILE_TEMPLATE KW_PROTO_TEMPLATE
 
 %token KW_CHAIN_HOSTNAMES KW_NORMALIZE_HOSTNAMES KW_KEEP_HOSTNAME KW_CHECK_HOSTNAME KW_BAD_HOSTNAME
 %token KW_KEEP_TIMESTAMP
@@ -671,6 +671,7 @@ dest_writer_option
 	: KW_FLAGS '(' dest_writer_options_flags ')' { last_writer_options->options = $3; }
 	| KW_LOG_FIFO_SIZE '(' NUMBER ')'	{ last_writer_options->fifo_size = $3; }
 	| KW_FLUSH_LINES '(' NUMBER ')'		{ last_writer_options->flush_lines = $3; }
+	| KW_SUMMARY '(' NUMBER ')'		{ last_writer_options->summary= $3; }
 	| KW_FLUSH_TIMEOUT '(' NUMBER ')'	{ last_writer_options->flush_timeout = $3; }
 	| KW_TEMPLATE '(' string ')'       	{ 
 	                                         
last_writer_options->template = cfg_lookup_template(configuration, $3);
diff --git a/src/cfg-lex.l b/src/cfg-lex.l
index 5c5f54f..d8418b6 100644
--- a/src/cfg-lex.l
+++ b/src/cfg-lex.l
@@ -6,7 +6,7 @@
 

/***************************************************************************
  *
- * Copyright (c) 1999 Balzs Scheidler
+ * Copyright (c) 1999 Balázs Scheidler
  * Copyright (c) 1999-2007 BalaBit IT Ltd.
  * 
  * This program is free software; you can redistribute it and/or modify
@@ -86,6 +86,7 @@ static struct keyword keywords[] = {
 	{ "flush_timeout", 	KW_FLUSH_TIMEOUT },
 	{ "sync_freq", 		KW_FLUSH_LINES },  /* obsolete */
 	{ "sync", 		KW_FLUSH_LINES },  /* obsolete */
+	{ "summary", 		KW_SUMMARY },
 	{ "fsync",		KW_FSYNC },
 	{ "long_hostnames",	KW_CHAIN_HOSTNAMES },
         { "chain_hostnames",    KW_CHAIN_HOSTNAMES },
diff --git a/src/logmsg.c b/src/logmsg.c
index 05354b5..9219a01 100644
--- a/src/logmsg.c
+++ b/src/logmsg.c
@@ -653,6 +653,25 @@ log_msg_new(gchar *msg, gint length, GSockAddr *saddr, guint flags, regex_t *bad
 }
 
 /**
+ * log_msg_new_internal:
+ * @prio: message priority (LOG_*)
+ * @msg: message text
+ * @flags: parse flags (LP_*)
+ *
+ * This function creates a new log message for messages originating 
+ * internally to syslog-ng
+ **/
+LogMessage *
+log_msg_new_internal(gint prio, gchar *msg, guint flags)
+{
+  gchar *buf = g_strdup_printf("<%d> syslog-ng[%d]: %s\n", prio, getpid(), msg);
+  LogMessage *self = log_msg_new(buf, strlen(buf), NULL, flags, NULL);
+  g_free(buf);
+
+  return self;
+}
+
+/**
  * log_msg_new_mark:
  * 
  * This function returns a new MARK message. MARK messages have the LF_MARK
diff --git a/src/logmsg.h b/src/logmsg.h
index 420a711..d780282 100644
--- a/src/logmsg.h
+++ b/src/logmsg.h
@@ -100,7 +100,7 @@ void log_msg_unref(LogMessage *m);
 
 LogMessage *log_msg_new(gchar *msg, gint length, GSockAddr *saddr, guint flags, regex_t *bad_hostname);
 LogMessage *log_msg_new_mark(void);
-
+LogMessage *log_msg_new_internal(gint prio, gchar *msg, guint flags);
 void log_msg_ack_block_inc(LogMessage *m);
 void log_msg_ack_block_start(LogMessage *m, LMAckFunc func, gpointer user_data);
 void log_msg_ack_block_end(LogMessage *m);
diff --git a/src/logwriter.c b/src/logwriter.c
index bb82b43..864b193 100644
--- a/src/logwriter.c
+++ b/src/logwriter.c
@@ -179,10 +179,104 @@ log_writer_watch_new(LogWriter *writer, FDWrite *fd)
 }
 
 static void
+log_writer_last_msg_release(LogWriter *self)
+{
+  if(self->last_msg_timerid)
+    g_source_remove(self->last_msg_timerid);
+
+  if(self->last_msg)
+    log_msg_unref(self->last_msg);
+
+  self->last_msg = NULL;
+  self->last_msg_count = 0;
+  self->last_msg_timerid = 0;
+}
+
+static void
+log_writer_last_msg_flush(LogWriter *self)
+{
+  gchar *msg;
+  LogMessage *m;
+
+  msg = g_strdup_printf("Last message '%.20s' repeated %d times\n",
+                        self->last_msg->msg.str,
+                        self->last_msg_count);
+
+  m = log_msg_new_internal(self->last_msg->pri, msg, 0);
+
+  g_string_assign(&m->host,self->last_msg->host.str );
+  g_string_assign(&m->host_from,self->last_msg->host_from.str );
+  g_queue_push_tail(self->queue, m);
+  g_queue_push_tail(self->queue, GUINT_TO_POINTER(0x80000000));
+  g_free(msg);
+
+  log_writer_last_msg_release(self);
+}
+
+static gboolean
+last_msg_timer(gpointer pt)
+{
+  LogWriter *self = (LogWriter *)pt;
+
+  if(self->last_msg_count)
+    log_writer_last_msg_flush(self);
+  else
+    log_writer_last_msg_release(self);
+
+  return FALSE;
+}
+
+static void
+log_writer_last_msg_record(LogWriter *self, LogMessage *lm)
+{
+  if(self->last_msg)
+    log_msg_unref(self->last_msg);
+
+  log_msg_ref(lm);
+  self->last_msg = lm;
+  self->last_msg_count = 0;
+  self->last_msg_timerid = g_timeout_add(self->options->summary * 1000, last_msg_timer, self);
+}
+
+static gboolean
+log_writer_last_msg_check(LogWriter *self, LogMessage *lm, gint path_flags)
+{
+  if (self->options->summary <= 0)
+    return FALSE;
+
+  if(self->last_msg)
+    {
+      if(strcmp(self->last_msg->msg.str, lm->msg.str) == 0 &&
+         strcmp(self->last_msg->host.str, lm->host.str) == 0)
+        {
+          if (self->dropped_messages)
+            (*self->dropped_messages)++;
+          msg_debug("Dropping duplicate message",
+                    NULL);
+          self->last_msg_count++;
+          log_msg_drop(lm, path_flags);
+          return TRUE;
+        }
+
+      if (self->last_msg_count)
+        log_writer_last_msg_flush(self);
+      else
+        log_writer_last_msg_release(self);
+    }
+
+  log_writer_last_msg_record(self, lm);
+
+  return FALSE;
+}
+
+static void
 log_writer_queue(LogPipe *s, LogMessage *lm, gint path_flags)
 {
   LogWriter *self = (LogWriter *) s;
-  
+
+  if(log_writer_last_msg_check(self, lm, path_flags))
+    return;
+
   if ((self->queue->length / 2) == self->options->fifo_size)
     {
       /* drop incoming message, we must ack here, otherwise the sender might
@@ -462,6 +556,9 @@ log_writer_new(guint32 flags, LogPipe *control, LogWriterOptions *options)
   self->queue = g_queue_new();
   self->flags = flags;
   self->control = control;
+  self->last_msg = NULL;
+  self->last_msg_count = 0;
+  self->last_msg_timerid = 0;
   return &self->super;
 }
 
@@ -476,6 +573,+  options->summary = 0;
 }
 
 void 
diff --git a/src/logwriter.h b/src/logwriter.h
index 284899b..452693c 100644
--- a/src/logwriter.h
+++ b/src/logwriter.h
@@ -66,6 +66,7 @@ typedef struct _LogWriterOptions
   gshort ts_format;
   glong zone_offset;
   gshort frac_digits;
+  gint summary;
 } LogWriterOptions;
 
 typedef struct _LogWriter
@@ -79,6 +80,10 @@ typedef struct _LogWriter
   gint partial_pos;
   LogPipe *control;
   LogWriterOptions *options;
+  LogMessage *last_msg;
+  guint32 last_msg_count;
+  guint last_msg_timerid;
+
 } LogWriter;
 
 void log_writer_set_options(LogWriter *self, LogPipe *control, LogWriterOptions *options);
diff --git a/src/messages.c b/src/messages.c
index 6c69f48..206206a 100644
--- a/src/messages.c
+++ b/src/messages.c
@@ -56,10 +56,8 @@ msg_send_internal_message(int prio, const char *msg)
       
       if (G_LIKELY(internal_msg_queue))
         {
-          buf = g_strdup_printf("<%d> syslog-ng[%d]: %s\n", prio, getpid(), msg);
-          m = log_msg_new(buf, strlen(buf), NULL, LP_INTERNAL | LP_LOCAL, NULL);
+          m = log_msg_new_internal(prio, msg, LP_INTERNAL | LP_LOCAL);
           g_queue_push_tail(internal_msg_queue, m);
-          g_free(buf);
         }
     }
 }
-- 
1.5.4.5



More information about the syslog-ng mailing list