With the recent changes to value pairs, we can now turn our internal dotted notation into structures, so do that, and use inserts instead of upserts! This - at the moment - comes with a noticable performance cost, as we'll do extra work on the syslog-ng side (which was previously done on the server side). However, this paves the way for bulk inserts, which will improve performance a lot. (And making the value pairs walker more performant will also help, but that's independent of mongodb.) Signed-off-by: Gergely Nagy <algernon@balabit.hu> --- modules/afmongodb/afmongodb.c | 97 +++++++++++++++++++++++++++++------------ 1 file changed, 68 insertions(+), 29 deletions(-) diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c index ad06dd4..205ab98 100644 --- a/modules/afmongodb/afmongodb.c +++ b/modules/afmongodb/afmongodb.c @@ -31,6 +31,7 @@ #include "stats.h" #include "nvtable.h" #include "logqueue.h" +#include "value-pairs.h" #include "mongo.h" @@ -86,7 +87,7 @@ typedef struct gchar *ns; GString *current_value; - bson *bson_sel, *bson_upd, *bson_set; + bson *bson; } MongoDBDestDriver; /* @@ -316,10 +317,56 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect) * Worker thread */ static gboolean -afmongodb_vp_foreach (const gchar *name, const gchar *value, - gpointer user_data) +afmongodb_vp_obj_start(const gchar *name, + const gchar *prefix, gpointer *prefix_data, + const gchar *prev, gpointer *prev_data, + gpointer user_data) { - bson *bson_set = (bson *)user_data; + bson *o; + + if (prefix_data) + { + o = bson_new(); + *prefix_data = o; + } + return FALSE; +} + +static gboolean +afmongodb_vp_obj_end(const gchar *name, + const gchar *prefix, gpointer *prefix_data, + const gchar *prev, gpointer *prev_data, + gpointer user_data) +{ + bson *root; + + if (prev_data) + root = (bson *)*prev_data; + else + root = (bson *)user_data; + + if (prefix_data) + { + bson *d = (bson *)*prefix_data; + + bson_finish (d); + bson_append_document (root, name, d); + bson_free (d); + } + return FALSE; +} + +static gboolean +afmongodb_vp_process_value(const gchar *name, const gchar *prefix, + const gchar *value, + gpointer *prefix_data, gpointer user_data) +{ + bson *o; + + if (prefix_data) + o = (bson *)*prefix_data; + else + o = (bson *)user_data; if (name[0] == '.') { @@ -328,10 +375,10 @@ afmongodb_vp_foreach (const gchar *name, const gchar *value, tx_name[0] = '_'; strncpy(&tx_name[1], name + 1, sizeof(tx_name) - 1); tx_name[sizeof(tx_name) - 1] = 0; - bson_append_string (bson_set, tx_name, value, -1); + bson_append_string (o, tx_name, value, -1); } else - bson_append_string (bson_set, name, value, -1); + bson_append_string (o, name, value, -1); return FALSE; } @@ -355,29 +402,25 @@ afmongodb_worker_insert (MongoDBDestDriver *self) msg_set_context(msg); - bson_reset (self->bson_sel); - bson_reset (self->bson_upd); - bson_reset (self->bson_set); + bson_reset (self->bson); oid = mongo_util_oid_new_with_time (self->last_msg_stamp, self->seq_num); - bson_append_oid (self->bson_sel, "_id", oid); + bson_append_oid (self->bson, "_id", oid); g_free (oid); - bson_finish (self->bson_sel); - value_pairs_foreach (self->vp, afmongodb_vp_foreach, - msg, self->seq_num, self->bson_set); + value_pairs_walk(self->vp, + afmongodb_vp_obj_start, + afmongodb_vp_process_value, + afmongodb_vp_obj_end, + msg, self->seq_num, self->bson); + bson_finish (self->bson); - bson_finish (self->bson_set); - - bson_append_document (self->bson_upd, "$set", self->bson_set); - bson_finish (self->bson_upd); - - if (!mongo_sync_cmd_update (self->conn, self->ns, MONGO_WIRE_FLAG_UPDATE_UPSERT, - self->bson_sel, self->bson_upd)) + if (!mongo_sync_cmd_insert_n(self->conn, self->ns, 1, + (const bson **)&self->bson)) { - msg_error ("Network error while inserting into MongoDB", - evt_tag_int("time_reopen", self->time_reopen), - NULL); + msg_error("Network error while inserting into MongoDB", + evt_tag_int("time_reopen", self->time_reopen), + NULL); success = FALSE; } @@ -416,9 +459,7 @@ afmongodb_worker_thread (gpointer arg) self->current_value = g_string_sized_new(256); - self->bson_sel = bson_new_sized(64); - self->bson_upd = bson_new_sized(512); - self->bson_set = bson_new_sized(512); + self->bson = bson_new_sized(4096); while (!self->writer_thread_terminate) { @@ -462,9 +503,7 @@ afmongodb_worker_thread (gpointer arg) g_free (self->ns); g_string_free (self->current_value, TRUE); - bson_free (self->bson_sel); - bson_free (self->bson_upd); - bson_free (self->bson_set); + bson_free (self->bson); msg_debug ("Worker thread finished", evt_tag_str("driver", self->super.super.id), -- 1.7.10.4