[syslog-ng] [PATCH 5/7] afmongodb: Use insert instead of upserts

Gergely Nagy algernon at balabit.hu
Fri Sep 14 11:51:59 CEST 2012


With the recent changes to value pairs, we can now turn our internal
dotted notation into structures, so do that, and use inserts instead
of upserts!

This - at the moment - comes with a noticable performance cost, as
we'll do extra work on the syslog-ng side (which was previously done
on the server side). However, this paves the way for bulk inserts,
which will improve performance a lot. (And making the value pairs
walker more performant will also help, but that's independent of
mongodb.)

Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
 modules/afmongodb/afmongodb.c |   97 +++++++++++++++++++++++++++++------------
 1 file changed, 68 insertions(+), 29 deletions(-)

diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index ad06dd4..205ab98 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -31,6 +31,7 @@
 #include "stats.h"
 #include "nvtable.h"
 #include "logqueue.h"
+#include "value-pairs.h"
 
 #include "mongo.h"
 
@@ -86,7 +87,7 @@ typedef struct
   gchar *ns;
 
   GString *current_value;
-  bson *bson_sel, *bson_upd, *bson_set;
+  bson *bson;
 } MongoDBDestDriver;
 
 /*
@@ -316,10 +317,56 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
  * Worker thread
  */
 static gboolean
-afmongodb_vp_foreach (const gchar *name, const gchar *value,
-		      gpointer user_data)
+afmongodb_vp_obj_start(const gchar *name,
+                       const gchar *prefix, gpointer *prefix_data,
+                       const gchar *prev, gpointer *prev_data,
+                       gpointer user_data)
 {
-  bson *bson_set = (bson *)user_data;
+  bson *o;
+
+  if (prefix_data)
+    {
+      o = bson_new();
+      *prefix_data = o;
+    }
+  return FALSE;
+}
+
+static gboolean
+afmongodb_vp_obj_end(const gchar *name,
+                     const gchar *prefix, gpointer *prefix_data,
+                     const gchar *prev, gpointer *prev_data,
+                     gpointer user_data)
+{
+  bson *root;
+
+  if (prev_data)
+    root = (bson *)*prev_data;
+  else
+    root = (bson *)user_data;
+
+  if (prefix_data)
+    {
+      bson *d = (bson *)*prefix_data;
+
+      bson_finish (d);
+      bson_append_document (root, name, d);
+      bson_free (d);
+    }
+  return FALSE;
+}
+
+static gboolean
+afmongodb_vp_process_value(const gchar *name, const gchar *prefix,
+                           const gchar *value,
+                           gpointer *prefix_data, gpointer user_data)
+{
+  bson *o;
+
+  if (prefix_data)
+    o = (bson *)*prefix_data;
+  else
+    o = (bson *)user_data;
 
   if (name[0] == '.')
     {
@@ -328,10 +375,10 @@ afmongodb_vp_foreach (const gchar *name, const gchar *value,
       tx_name[0] = '_';
       strncpy(&tx_name[1], name + 1, sizeof(tx_name) - 1);
       tx_name[sizeof(tx_name) - 1] = 0;
-      bson_append_string (bson_set, tx_name, value, -1);
+      bson_append_string (o, tx_name, value, -1);
     }
   else
-    bson_append_string (bson_set, name, value, -1);
+    bson_append_string (o, name, value, -1);
 
   return FALSE;
 }
@@ -355,29 +402,25 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
 
   msg_set_context(msg);
 
-  bson_reset (self->bson_sel);
-  bson_reset (self->bson_upd);
-  bson_reset (self->bson_set);
+  bson_reset (self->bson);
 
   oid = mongo_util_oid_new_with_time (self->last_msg_stamp, self->seq_num);
-  bson_append_oid (self->bson_sel, "_id", oid);
+  bson_append_oid (self->bson, "_id", oid);
   g_free (oid);
-  bson_finish (self->bson_sel);
 
-  value_pairs_foreach (self->vp, afmongodb_vp_foreach,
-		       msg, self->seq_num, self->bson_set);
+  value_pairs_walk(self->vp,
+                   afmongodb_vp_obj_start,
+                   afmongodb_vp_process_value,
+                   afmongodb_vp_obj_end,
+                   msg, self->seq_num, self->bson);
+  bson_finish (self->bson);
 
-  bson_finish (self->bson_set);
-
-  bson_append_document (self->bson_upd, "$set", self->bson_set);
-  bson_finish (self->bson_upd);
-
-  if (!mongo_sync_cmd_update (self->conn, self->ns, MONGO_WIRE_FLAG_UPDATE_UPSERT,
-			      self->bson_sel, self->bson_upd))
+  if (!mongo_sync_cmd_insert_n(self->conn, self->ns, 1,
+                               (const bson **)&self->bson))
     {
-      msg_error ("Network error while inserting into MongoDB",
-		 evt_tag_int("time_reopen", self->time_reopen),
-		 NULL);
+      msg_error("Network error while inserting into MongoDB",
+                evt_tag_int("time_reopen", self->time_reopen),
+                NULL);
       success = FALSE;
     }
 
@@ -416,9 +459,7 @@ afmongodb_worker_thread (gpointer arg)
 
   self->current_value = g_string_sized_new(256);
 
-  self->bson_sel = bson_new_sized(64);
-  self->bson_upd = bson_new_sized(512);
-  self->bson_set = bson_new_sized(512);
+  self->bson = bson_new_sized(4096);
 
   while (!self->writer_thread_terminate)
     {
@@ -462,9 +503,7 @@ afmongodb_worker_thread (gpointer arg)
   g_free (self->ns);
   g_string_free (self->current_value, TRUE);
 
-  bson_free (self->bson_sel);
-  bson_free (self->bson_upd);
-  bson_free (self->bson_set);
+  bson_free (self->bson);
 
   msg_debug ("Worker thread finished",
 	     evt_tag_str("driver", self->super.super.id),
-- 
1.7.10.4




More information about the syslog-ng mailing list