[syslog-ng] [PATCH 2/3] afmongodb: Port to value-pairs().

Gergely Nagy algernon at balabit.hu
Sun Apr 24 11:29:52 CEST 2011


Ported from custom, explicit key/value pairs to the far more flexible
value-pairs() solution. By default, the driver uses a custom scope:
selected_macros and nv_pairs, with a few patterns excluded.

The patch also turns the collection() parameter into a plain string
(from a templatable string), to considerably reduce the work needed to
insert messages into the database.

Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
 modules/afmongodb/afmongodb-grammar.ym |    4 +-
 modules/afmongodb/afmongodb-parser.c   |    2 -
 modules/afmongodb/afmongodb.c          |  154 +++++++++----------------------
 modules/afmongodb/afmongodb.h          |    4 +-
 4 files changed, 48 insertions(+), 116 deletions(-)

diff --git a/modules/afmongodb/afmongodb-grammar.ym b/modules/afmongodb/afmongodb-grammar.ym
index a19e2b4..7b5e680 100644
--- a/modules/afmongodb/afmongodb-grammar.ym
+++ b/modules/afmongodb/afmongodb-grammar.ym
@@ -48,7 +48,6 @@ extern LogDriver *last_driver;
 
 %token KW_MONGODB
 %token KW_COLLECTION
-%token KW_KEYS
 
 %%
 
@@ -70,10 +69,9 @@ afmongodb_option
         | KW_PORT '(' LL_NUMBER ')'		{ afmongodb_dd_set_port(last_driver, $3); }
 	| KW_DATABASE '(' string ')'		{ afmongodb_dd_set_database(last_driver, $3); free($3); }
 	| KW_COLLECTION '(' string ')'		{ afmongodb_dd_set_collection(last_driver, $3); free($3); }
-	| KW_VALUES '(' string_list ')'		{ afmongodb_dd_set_values(last_driver, $3); }
-	| KW_KEYS '(' string_list ')'		{ afmongodb_dd_set_keys(last_driver, $3); }
 	| KW_USERNAME '(' string ')'		{ afmongodb_dd_set_user(last_driver, $3); free($3); }
 	| KW_PASSWORD '(' string ')'		{ afmongodb_dd_set_password(last_driver, $3); free($3); }
+	| value_pair_stmt			{ afmongodb_dd_set_value_pairs(last_driver, last_value_pairs); }
 	| dest_driver_option
         ;
 
diff --git a/modules/afmongodb/afmongodb-parser.c b/modules/afmongodb/afmongodb-parser.c
index 5f9161e..67a9b0b 100644
--- a/modules/afmongodb/afmongodb-parser.c
+++ b/modules/afmongodb/afmongodb-parser.c
@@ -34,8 +34,6 @@ static CfgLexerKeyword afmongodb_keywords[] = {
   { "port",			KW_PORT },
   { "database",			KW_DATABASE },
   { "collection",		KW_COLLECTION },
-  { "keys",			KW_KEYS },
-  { "values",			KW_VALUES },
   { "username",			KW_USERNAME },
   { "password",			KW_PASSWORD },
   { "log_fifo_size",		KW_LOG_FIFO_SIZE  },
diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 284066a..94f0ce1 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -47,7 +47,7 @@ typedef struct
   /* Shared between main/writer; only read by the writer, never
      written */
   gchar *db;
-  LogTemplate *coll;
+  gchar *coll;
 
   gchar *host;
   gint port;
@@ -55,12 +55,6 @@ typedef struct
   gchar *user;
   gchar *password;
 
-  GList *keys;
-  GList *values;
-
-  gint num_fields;
-  MongoDBField *fields;
-
   time_t time_reopen;
 
   guint32 *dropped_messages;
@@ -68,6 +62,8 @@ typedef struct
 
   time_t last_msg_stamp;
 
+  ValuePairs *vp;
+
   /* Thread related stuff; shared */
   GThread *writer_thread;
   GMutex *queue_mutex;
@@ -84,11 +80,9 @@ typedef struct
   mongo_connection *conn;
   gint32 seq_num;
 
-  GString *current_namespace;
-  gint ns_prefix_len;
+  gchar *ns;
 
   GString *current_value;
-
   bson *bson_sel, *bson_upd, *bson_set;
 } MongoDBDestDriver;
 
@@ -145,26 +139,17 @@ afmongodb_dd_set_collection(LogDriver *d, const gchar *collection)
 {
   MongoDBDestDriver *self = (MongoDBDestDriver *)d;
 
-  log_template_unref(self->coll);
-  self->coll = log_template_new(log_pipe_get_config(&d->super), NULL, collection);
+  g_free(self->coll);
+  self->coll = g_strdup(collection);
 }
 
 void
-afmongodb_dd_set_keys(LogDriver *d, GList *keys)
+afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp)
 {
   MongoDBDestDriver *self = (MongoDBDestDriver *)d;
 
-  string_list_free(self->keys);
-  self->keys = keys;
-}
-
-void
-afmongodb_dd_set_values(LogDriver *d, GList *values)
-{
-  MongoDBDestDriver *self = (MongoDBDestDriver *)d;
-
-  string_list_free(self->values);
-  self->values = values;
+  value_pairs_free (self->vp);
+  self->vp = vp;
 }
 
 /*
@@ -177,7 +162,7 @@ afmongodb_dd_format_stats_instance(MongoDBDestDriver *self)
   static gchar persist_name[1024];
 
   g_snprintf(persist_name, sizeof(persist_name),
-	     "mongodb,%s,%u,%s,%s", self->host, self->port, self->db, self->coll->template);
+	     "mongodb,%s,%u,%s,%s", self->host, self->port, self->db, self->coll);
   return persist_name;
 }
 
@@ -187,7 +172,7 @@ afmongodb_dd_format_persist_name(MongoDBDestDriver *self)
   static gchar persist_name[1024];
 
   g_snprintf(persist_name, sizeof(persist_name),
-	     "afmongodb(%s,%u,%s,%s)", self->host, self->port, self->db, self->coll->template);
+	     "afmongodb(%s,%u,%s,%s)", self->host, self->port, self->db, self->coll);
   return persist_name;
 }
 
@@ -244,11 +229,23 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
 /*
  * Worker thread
  */
+static gboolean
+afmongodb_vp_foreach (const gchar *name, const gchar *value,
+		      gpointer user_data)
+{
+  bson *bson_set = (bson *)user_data;
+
+  if (name[0] == '.')
+    bson_append_string (bson_set, name + 1, value, -1);
+  else
+    bson_append_string (bson_set, name, value, -1);
+
+  return FALSE;
+}
 
 static gboolean
 afmongodb_worker_insert (MongoDBDestDriver *self)
 {
-  gint i;
   gboolean success;
   mongo_packet *p;
   guint8 *oid;
@@ -274,25 +271,15 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
   g_free (oid);
   bson_finish (self->bson_sel);
 
-  g_string_truncate(self->current_namespace, self->ns_prefix_len);
-  log_template_append_format(self->coll, msg, NULL, LTZ_LOCAL,
-			     self->seq_num, NULL, self->current_namespace);
-
-  for (i = 0; i < self->num_fields; i++)
-    {
-      log_template_format(self->fields[i].value, msg, NULL, LTZ_SEND,
-			  self->seq_num, NULL, self->current_value);
-      if (self->current_value->len)
-	bson_append_string(self->bson_set, self->fields[i].name,
-			   self->current_value->str, -1);
-    }
+  value_pairs_foreach (self->vp, afmongodb_vp_foreach,
+		       msg, self->seq_num, self->bson_set);
 
   bson_finish (self->bson_set);
 
   bson_append_document (self->bson_upd, "$set", self->bson_set);
   bson_finish (self->bson_upd);
 
-  p = mongo_wire_cmd_update (1, self->current_namespace->str, 1,
+  p = mongo_wire_cmd_update (1, self->ns, 1,
 			     self->bson_sel, self->bson_upd);
 
   if (!mongo_packet_send (self->conn, p))
@@ -335,12 +322,8 @@ afmongodb_worker_thread (gpointer arg)
 	     NULL);
 
   success = afmongodb_dd_connect(self, FALSE);
-  self->current_namespace = g_string_sized_new(64);
-  self->ns_prefix_len = strlen (self->db) + 1;
 
-  self->current_namespace =
-    g_string_append_c (g_string_assign (self->current_namespace,
-					self->db), '.');
+  self->ns = g_strconcat (self->db, ".", self->coll, NULL);
 
   self->current_value = g_string_sized_new(256);
 
@@ -383,7 +366,7 @@ afmongodb_worker_thread (gpointer arg)
 
   afmongodb_dd_disconnect(self);
 
-  g_string_free (self->current_namespace, TRUE);
+  g_free (self->ns);
   g_string_free (self->current_value, TRUE);
 
   bson_free (self->bson_sel);
@@ -421,15 +404,25 @@ afmongodb_dd_init(LogPipe *s)
   MongoDBDestDriver *self = (MongoDBDestDriver *)s;
   GlobalConfig *cfg = log_pipe_get_config(s);
 
-  gint num_keys, num_values, i;
-  GList *key, *value;
-
   if (!log_dest_driver_init_method(s))
     return FALSE;
 
   if (cfg)
     self->time_reopen = cfg->time_reopen;
 
+  if (!self->vp)
+    {
+      self->vp = value_pairs_new(cfg);
+      value_pairs_add_scope(self->vp, VALUE_PAIR_SCOPE_SELECTED_MACROS);
+      value_pairs_add_scope(self->vp, VALUE_PAIR_SCOPE_NV_PAIRS);
+      value_pairs_exclude_glob_pattern(self->vp, "R_*");
+      value_pairs_exclude_glob_pattern(self->vp, "S_*");
+      value_pairs_exclude_glob_pattern(self->vp, "HOST_FROM");
+      value_pairs_exclude_glob_pattern(self->vp, "LEGACY_MSGHDR");
+      value_pairs_exclude_glob_pattern(self->vp, "MSG");
+      value_pairs_exclude_glob_pattern(self->vp, "SDATA");
+    }
+
   msg_verbose("Initializing MongoDB destination",
 	      evt_tag_str("host", self->host),
 	      evt_tag_int("port", self->port),
@@ -438,29 +431,6 @@ afmongodb_dd_init(LogPipe *s)
 
   self->queue = log_dest_driver_acquire_queue(&self->super, afmongodb_dd_format_persist_name(self));
 
-  if (!self->fields)
-    {
-      num_keys = g_list_length(self->keys);
-      num_values = g_list_length(self->values);
-
-      if (num_keys != num_values)
-	{
-	  msg_error("The number of keys and values do not match",
-		    evt_tag_int("num_keys", num_keys),
-		    evt_tag_int("num_values", num_values),
-		    NULL);
-	  return FALSE;
-	}
-      self->num_fields = num_keys;
-      self->fields = g_new0(MongoDBField, num_keys);
-
-      for (i = 0, key = self->keys, value = self->values; key && value; i++, key = key->next, value = value->next)
-	{
-	  self->fields[i].name = g_strdup(key->data);
-	  self->fields[i].value = log_template_new(cfg, NULL, (gchar *)value->data);
-	}
-    }
-
   stats_register_counter(0, SCS_MONGODB | SCS_DESTINATION, self->super.super.id,
 			 afmongodb_dd_format_stats_instance(self),
 			 SC_TYPE_STORED, &self->stored_messages);
@@ -499,7 +469,6 @@ static void
 afmongodb_dd_free(LogPipe *d)
 {
   MongoDBDestDriver *self = (MongoDBDestDriver *)d;
-  gint i;
 
   g_mutex_free(self->suspend_mutex);
   g_mutex_free(self->queue_mutex);
@@ -508,20 +477,13 @@ afmongodb_dd_free(LogPipe *d)
   if (self->queue)
     log_queue_unref(self->queue);
 
-  for (i = 0; i < self->num_fields; i++)
-    {
-      g_free(self->fields[i].name);
-      log_template_unref(self->fields[i].value);
-    }
-
-  g_free(self->fields);
   g_free(self->db);
-  log_template_unref(self->coll);
+  g_free(self->coll);
   g_free(self->user);
   g_free(self->password);
   g_free(self->host);
-  string_list_free(self->keys);
-  string_list_free(self->values);
+
+  value_pairs_free(self->vp);
 
   log_dest_driver_free(d);
 }
@@ -548,30 +510,6 @@ afmongodb_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_optio
  * Plugin glue.
  */
 
-const gchar *default_keys[] =
-{
-  "date",
-  "facility",
-  "level",
-  "host",
-  "program",
-  "pid",
-  "message",
-  NULL
-};
-
-const gchar *default_values[] =
-{
-  "${R_YEAR}-${R_MONTH}-${R_DAY} ${R_HOUR}:${R_MIN}:${R_SEC}",
-  "$FACILITY",
-  "$LEVEL",
-  "$HOST",
-  "$PROGRAM",
-  "$PID",
-  "$MSGONLY",
-  NULL
-};
-
 LogDriver *
 afmongodb_dd_new(void)
 {
@@ -589,8 +527,6 @@ afmongodb_dd_new(void)
   afmongodb_dd_set_port((LogDriver *)self, 27017);
   afmongodb_dd_set_database((LogDriver *)self, "syslog");
   afmongodb_dd_set_collection((LogDriver *)self, "messages");
-  afmongodb_dd_set_keys((LogDriver *)self, string_array_to_list(default_keys));
-  afmongodb_dd_set_values((LogDriver *)self, string_array_to_list(default_values));
 
   init_sequence_number(&self->seq_num);
 
diff --git a/modules/afmongodb/afmongodb.h b/modules/afmongodb/afmongodb.h
index e54faf2..bd7b258 100644
--- a/modules/afmongodb/afmongodb.h
+++ b/modules/afmongodb/afmongodb.h
@@ -25,6 +25,7 @@
 #define AFMONGODB_H_INCLUDED
 
 #include "driver.h"
+#include "value-pairs.h"
 
 LogDriver *afmongodb_dd_new(void);
 
@@ -32,9 +33,8 @@ void afmongodb_dd_set_host(LogDriver *d, const gchar *host);
 void afmongodb_dd_set_port(LogDriver *d, gint port);
 void afmongodb_dd_set_database(LogDriver *d, const gchar *database);
 void afmongodb_dd_set_collection(LogDriver *d, const gchar *collection);
-void afmongodb_dd_set_values(LogDriver *d, GList *values);
-void afmongodb_dd_set_keys(LogDriver *d, GList *keys);
 void afmongodb_dd_set_user(LogDriver *d, const gchar *user);
 void afmongodb_dd_set_password(LogDriver *d, const gchar *password);
+void afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp);
 
 #endif
-- 
1.7.2.5




More information about the syslog-ng mailing list