[syslog-ng] [PATCH 5/7] afmongodb: Use insert instead of upserts
Gergely Nagy
algernon at balabit.hu
Fri Sep 14 11:51:59 CEST 2012
With the recent changes to value pairs, we can now turn our internal
dotted notation into structures, so do that, and use inserts instead
of upserts!
This - at the moment - comes with a noticeable performance cost, as
we'll do extra work on the syslog-ng side (which was previously done
on the server side). However, this paves the way for bulk inserts,
which will improve performance a lot. (And making the value pairs
walker more performant will also help, but that's independent of
mongodb.)
Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
modules/afmongodb/afmongodb.c | 97 +++++++++++++++++++++++++++++------------
1 file changed, 68 insertions(+), 29 deletions(-)
diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index ad06dd4..205ab98 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -31,6 +31,7 @@
#include "stats.h"
#include "nvtable.h"
#include "logqueue.h"
+#include "value-pairs.h"
#include "mongo.h"
@@ -86,7 +87,7 @@ typedef struct
gchar *ns;
GString *current_value;
- bson *bson_sel, *bson_upd, *bson_set;
+ bson *bson;
} MongoDBDestDriver;
/*
@@ -316,10 +317,56 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
* Worker thread
*/
static gboolean
-afmongodb_vp_foreach (const gchar *name, const gchar *value,
- gpointer user_data)
+afmongodb_vp_obj_start(const gchar *name,
+ const gchar *prefix, gpointer *prefix_data,
+ const gchar *prev, gpointer *prev_data,
+ gpointer user_data)
{
- bson *bson_set = (bson *)user_data;
+ bson *o;
+
+ if (prefix_data)
+ {
+ o = bson_new();
+ *prefix_data = o;
+ }
+ return FALSE;
+}
+
+static gboolean
+afmongodb_vp_obj_end(const gchar *name,
+ const gchar *prefix, gpointer *prefix_data,
+ const gchar *prev, gpointer *prev_data,
+ gpointer user_data)
+{
+ bson *root;
+
+ if (prev_data)
+ root = (bson *)*prev_data;
+ else
+ root = (bson *)user_data;
+
+ if (prefix_data)
+ {
+ bson *d = (bson *)*prefix_data;
+
+ bson_finish (d);
+ bson_append_document (root, name, d);
+ bson_free (d);
+ }
+ return FALSE;
+}
+
+static gboolean
+afmongodb_vp_process_value(const gchar *name, const gchar *prefix,
+ const gchar *value,
+ gpointer *prefix_data, gpointer user_data)
+{
+ bson *o;
+
+ if (prefix_data)
+ o = (bson *)*prefix_data;
+ else
+ o = (bson *)user_data;
if (name[0] == '.')
{
@@ -328,10 +375,10 @@ afmongodb_vp_foreach (const gchar *name, const gchar *value,
tx_name[0] = '_';
strncpy(&tx_name[1], name + 1, sizeof(tx_name) - 1);
tx_name[sizeof(tx_name) - 1] = 0;
- bson_append_string (bson_set, tx_name, value, -1);
+ bson_append_string (o, tx_name, value, -1);
}
else
- bson_append_string (bson_set, name, value, -1);
+ bson_append_string (o, name, value, -1);
return FALSE;
}
@@ -355,29 +402,25 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
msg_set_context(msg);
- bson_reset (self->bson_sel);
- bson_reset (self->bson_upd);
- bson_reset (self->bson_set);
+ bson_reset (self->bson);
oid = mongo_util_oid_new_with_time (self->last_msg_stamp, self->seq_num);
- bson_append_oid (self->bson_sel, "_id", oid);
+ bson_append_oid (self->bson, "_id", oid);
g_free (oid);
- bson_finish (self->bson_sel);
- value_pairs_foreach (self->vp, afmongodb_vp_foreach,
- msg, self->seq_num, self->bson_set);
+ value_pairs_walk(self->vp,
+ afmongodb_vp_obj_start,
+ afmongodb_vp_process_value,
+ afmongodb_vp_obj_end,
+ msg, self->seq_num, self->bson);
+ bson_finish (self->bson);
- bson_finish (self->bson_set);
-
- bson_append_document (self->bson_upd, "$set", self->bson_set);
- bson_finish (self->bson_upd);
-
- if (!mongo_sync_cmd_update (self->conn, self->ns, MONGO_WIRE_FLAG_UPDATE_UPSERT,
- self->bson_sel, self->bson_upd))
+ if (!mongo_sync_cmd_insert_n(self->conn, self->ns, 1,
+ (const bson **)&self->bson))
{
- msg_error ("Network error while inserting into MongoDB",
- evt_tag_int("time_reopen", self->time_reopen),
- NULL);
+ msg_error("Network error while inserting into MongoDB",
+ evt_tag_int("time_reopen", self->time_reopen),
+ NULL);
success = FALSE;
}
@@ -416,9 +459,7 @@ afmongodb_worker_thread (gpointer arg)
self->current_value = g_string_sized_new(256);
- self->bson_sel = bson_new_sized(64);
- self->bson_upd = bson_new_sized(512);
- self->bson_set = bson_new_sized(512);
+ self->bson = bson_new_sized(4096);
while (!self->writer_thread_terminate)
{
@@ -462,9 +503,7 @@ afmongodb_worker_thread (gpointer arg)
g_free (self->ns);
g_string_free (self->current_value, TRUE);
- bson_free (self->bson_sel);
- bson_free (self->bson_upd);
- bson_free (self->bson_set);
+ bson_free (self->bson);
msg_debug ("Worker thread finished",
evt_tag_str("driver", self->super.super.id),
--
1.7.10.4
More information about the syslog-ng
mailing list