[PATCH (3.4) 0/5] afmongodb cleanups & replica set support
Following this email, a few patches will come: various minor touch-ups (such as the cleanup of the default value-pairs config, and the removal of the TODO file) and replica-set support for afmongodb.

The replica-set support is an incompatible change compared to 3.3, as it removes the host() and port() options from the driver, replacing both with a servers() option.

After the patches, the driver will support replica sets, which means that when it is connected to a replica set and the primary goes down, the driver will reconnect to the new master automatically. Most of this is handled transparently by the underlying libmongo-client library, so the syslog-ng side doesn't actually get much more complicated.

These patches have been sitting in my syslog-ng module collection repo since about July, and I kinda forgot about them. It's about time I submit them. (And consequently, the module-collection repo will be abandoned, as it pretty much has been for the past few months anyway. Turns out it's easier for me to maintain module updates within my syslog-ng repo, especially now that I have 3.3 and 3.4 within the same git repo.)

All of these are available from the feature/3.4/afmongodb/lmc-sync branch of my git repository.
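As a rough picture of what "reconnect to the new master" involves, here is a minimal simulation in C. It is only a conceptual sketch: the fake_member type and find_primary() are made up for illustration and are not part of libmongo-client or syslog-ng, and the real library may discover the primary quite differently.

```c
#include <stdbool.h>
#include <stddef.h>

/* Illustrative stand-in for one replica-set member (hypothetical type). */
typedef struct
{
  const char *addr;   /* "host:port", as it might be listed in servers() */
  bool reachable;     /* would a connection attempt succeed? */
  bool is_primary;    /* does this member currently accept writes? */
} fake_member;

/* Return the index of the first reachable primary, or -1 if there is none.
 * A driver with replica-set support would repeat a search like this after
 * losing its connection, instead of retrying only the original host. */
int
find_primary(const fake_member *members, size_t count)
{
  size_t i;

  for (i = 0; i < count; i++)
    {
      if (members[i].reachable && members[i].is_primary)
        return (int) i;
    }
  return -1;
}
```

With three members where the first is the primary, find_primary() returns 0; once that member goes away and another one is promoted, the same call returns the index of the promoted member.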
Cleaned up the default value-pairs() set, so no needless excludes are
present by default.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/afmongodb.c |    6 ------
 1 files changed, 0 insertions(+), 6 deletions(-)

diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 0314481..f526bc1 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -424,12 +424,6 @@ afmongodb_dd_init(LogPipe *s)
       self->vp = value_pairs_new();
       value_pairs_add_scope(self->vp, "selected-macros");
       value_pairs_add_scope(self->vp, "nv-pairs");
-      value_pairs_add_glob_pattern(self->vp, "R_*", FALSE);
-      value_pairs_add_glob_pattern(self->vp, "S_*", FALSE);
-      value_pairs_add_glob_pattern(self->vp, "HOST_FROM", FALSE);
-      value_pairs_add_glob_pattern(self->vp, "LEGACY_MSGHDR", FALSE);
-      value_pairs_add_glob_pattern(self->vp, "MSG", FALSE);
-      value_pairs_add_glob_pattern(self->vp, "SDATA", FALSE);
     }
 
   msg_verbose("Initializing MongoDB destination",
-- 
1.7.7.3
In preparation to support other things, and to slightly reduce code
complexity, convert the MongoDB destination driver to use the
mongo_sync family of functions.

Signed-Off-By: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/afmongodb.c |   15 +++++----------
 1 files changed, 5 insertions(+), 10 deletions(-)

diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index f526bc1..48df68f 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -77,7 +77,7 @@ typedef struct
   LogQueue *queue;
 
   /* Writer-only stuff */
-  mongo_connection *conn;
+  mongo_sync_connection *conn;
   gint32 seq_num;
 
   gchar *ns;
@@ -189,7 +189,7 @@ afmongodb_dd_suspend(MongoDBDestDriver *self)
 static void
 afmongodb_dd_disconnect(MongoDBDestDriver *self)
 {
-  mongo_disconnect(self->conn);
+  mongo_sync_disconnect(self->conn);
   self->conn = NULL;
 }
 
@@ -199,7 +199,7 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
   if (reconnect && self->conn)
     return TRUE;
 
-  self->conn = mongo_connect(self->host, self->port);
+  self->conn = mongo_sync_connect(self->host, self->port, FALSE);
 
   if (!self->conn)
     {
@@ -255,7 +255,6 @@ static gboolean
 afmongodb_worker_insert (MongoDBDestDriver *self)
 {
   gboolean success;
-  mongo_packet *p;
   guint8 *oid;
   LogMessage *msg;
   LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
@@ -288,10 +287,8 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
       bson_append_document (self->bson_upd, "$set", self->bson_set);
       bson_finish (self->bson_upd);
 
-      p = mongo_wire_cmd_update (1, self->ns, 1,
-                                 self->bson_sel, self->bson_upd);
-
-      if (!mongo_packet_send (self->conn, p))
+      if (!mongo_sync_cmd_update (self->conn, self->ns, MONGO_WIRE_FLAG_UPDATE_UPSERT,
+                                  self->bson_sel, self->bson_upd))
         {
           msg_error ("Network error while inserting into MongoDB",
                      evt_tag_int("time_reopen", self->time_reopen),
@@ -299,8 +296,6 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
           success = FALSE;
         }
 
-      mongo_wire_packet_free (p);
-
       msg_set_context(NULL);
 
       if (success)
-- 
1.7.7.3
The servers can now be specified with a servers() parameter, where the
first will be the primary (used in the stats for example), the rest
will be the seeds.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/afmongodb-grammar.ym |    4 +-
 modules/afmongodb/afmongodb-parser.c   |    4 +--
 modules/afmongodb/afmongodb.c          |   53 ++++++++++++++++++++++++--------
 modules/afmongodb/afmongodb.h          |    3 +-
 4 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/modules/afmongodb/afmongodb-grammar.ym b/modules/afmongodb/afmongodb-grammar.ym
index 36d69c4..1a929ac 100644
--- a/modules/afmongodb/afmongodb-grammar.ym
+++ b/modules/afmongodb/afmongodb-grammar.ym
@@ -50,6 +50,7 @@ extern ValuePairsTransformSet *last_vp_transset;
 
 %token KW_MONGODB
 %token KW_COLLECTION
+%token KW_SERVERS
 
 %%
 
@@ -67,8 +68,7 @@ afmongodb_options
         ;
 
 afmongodb_option
-        : KW_HOST '(' string ')'        { afmongodb_dd_set_host(last_driver, $3); free($3); }
-        | KW_PORT '(' LL_NUMBER ')'     { afmongodb_dd_set_port(last_driver, $3); }
+        : KW_SERVERS '(' string_list ')' { afmongodb_dd_set_servers(last_driver, $3); }
         | KW_DATABASE '(' string ')'    { afmongodb_dd_set_database(last_driver, $3); free($3); }
         | KW_COLLECTION '(' string ')'  { afmongodb_dd_set_collection(last_driver, $3); free($3); }
         | KW_USERNAME '(' string ')'    { afmongodb_dd_set_user(last_driver, $3); free($3); }
diff --git a/modules/afmongodb/afmongodb-parser.c b/modules/afmongodb/afmongodb-parser.c
index 67a9b0b..b589fe8 100644
--- a/modules/afmongodb/afmongodb-parser.c
+++ b/modules/afmongodb/afmongodb-parser.c
@@ -30,13 +30,11 @@ int afmongodb_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg);
 
 static CfgLexerKeyword afmongodb_keywords[] = {
   { "mongodb",        KW_MONGODB },
-  { "host",           KW_HOST },
-  { "port",           KW_PORT },
+  { "servers",        KW_SERVERS },
   { "database",       KW_DATABASE },
   { "collection",     KW_COLLECTION },
   { "username",       KW_USERNAME },
   { "password",       KW_PASSWORD },
-  { "log_fifo_size",  KW_LOG_FIFO_SIZE },
   { NULL }
 };
 
diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 48df68f..01d5dc8 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -49,6 +49,7 @@ typedef struct
   gchar *db;
   gchar *coll;
 
+  GList *servers;
   gchar *host;
   gint port;
 
@@ -109,20 +110,12 @@ afmongodb_dd_set_password(LogDriver *d, const gchar *password)
 }
 
 void
-afmongodb_dd_set_host(LogDriver *d, const gchar *host)
+afmongodb_dd_set_servers(LogDriver *d, GList *servers)
 {
   MongoDBDestDriver *self = (MongoDBDestDriver *)d;
 
-  g_free(self->host);
-  self->host = g_strdup (host);
-}
-
-void
-afmongodb_dd_set_port(LogDriver *d, gint port)
-{
-  MongoDBDestDriver *self = (MongoDBDestDriver *)d;
-
-  self->port = (int)port;
+  string_list_free(self->servers);
+  self->servers = servers;
 }
 
 void
@@ -196,6 +189,8 @@ afmongodb_dd_disconnect(MongoDBDestDriver *self)
 static gboolean
 afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
 {
+  GList *l;
+
   if (reconnect && self->conn)
     return TRUE;
 
@@ -207,6 +202,27 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
       return FALSE;
     }
 
+  l = self->servers;
+  while ((l = g_list_next(l)) != NULL)
+    {
+      gchar *host = NULL;
+      gint port = 27017;
+
+      if (!mongo_util_parse_addr(l->data, &host, &port))
+        {
+          msg_warning("Cannot parse MongoDB server address, ignoring",
+                      evt_tag_str("address", l->data),
+                      NULL);
+          continue;
+        }
+      mongo_sync_conn_seed_add (self->conn, host, port);
+      msg_verbose("Added MongoDB server seed",
+                  evt_tag_str("host", host),
+                  evt_tag_int("port", port),
+                  NULL);
+      g_free(host);
+    }
+
   /*
   if (self->user || self->password)
     {
@@ -421,6 +437,17 @@ afmongodb_dd_init(LogPipe *s)
       value_pairs_add_scope(self->vp, "nv-pairs");
     }
 
+  self->host = NULL;
+  self->port = 27017;
+  if (!mongo_util_parse_addr(g_list_nth_data(self->servers, 0), &self->host,
+                             &self->port))
+    {
+      msg_error("Cannot parse the primary host",
+                evt_tag_str("primary", g_list_nth_data(self->servers, 0)),
+                NULL);
+      return FALSE;
+    }
+
   msg_verbose("Initializing MongoDB destination",
               evt_tag_str("host", self->host),
               evt_tag_int("port", self->port),
@@ -484,6 +511,7 @@ afmongodb_dd_free(LogPipe *d)
   g_free(self->user);
   g_free(self->password);
   g_free(self->host);
+  string_list_free(self->servers);
   value_pairs_free(self->vp);
   log_dest_driver_free(d);
 }
@@ -541,8 +569,7 @@ afmongodb_dd_new(void)
   self->super.super.super.queue = afmongodb_dd_queue;
   self->super.super.super.free_fn = afmongodb_dd_free;
 
-  afmongodb_dd_set_host((LogDriver *)self, "127.0.0.1");
-  afmongodb_dd_set_port((LogDriver *)self, 27017);
+  afmongodb_dd_set_servers((LogDriver *)self, g_list_append (NULL, g_strdup ("127.0.0.1:27017")));
   afmongodb_dd_set_database((LogDriver *)self, "syslog");
   afmongodb_dd_set_collection((LogDriver *)self, "messages");
 
diff --git a/modules/afmongodb/afmongodb.h b/modules/afmongodb/afmongodb.h
index bd7b258..4aa47ef 100644
--- a/modules/afmongodb/afmongodb.h
+++ b/modules/afmongodb/afmongodb.h
@@ -29,8 +29,7 @@
 
 LogDriver *afmongodb_dd_new(void);
 
-void afmongodb_dd_set_host(LogDriver *d, const gchar *host);
-void afmongodb_dd_set_port(LogDriver *d, gint port);
+void afmongodb_dd_set_servers(LogDriver *d, GList *servers);
 void afmongodb_dd_set_database(LogDriver *d, const gchar *database);
 void afmongodb_dd_set_collection(LogDriver *d, const gchar *collection);
 void afmongodb_dd_set_user(LogDriver *d, const gchar *user);
-- 
1.7.7.3
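The patch hands each servers() entry to mongo_util_parse_addr() after pre-setting the port to 27017, which suggests that an entry may be either "host" or "host:port". The sketch below shows one way such splitting could work in plain C. parse_server_addr() is a hypothetical stand-in written for this illustration; it is not the libmongo-client function, it does not handle IPv6 literals, and the assumption that the port is left untouched when absent is inferred from the patch rather than confirmed.

```c
#include <errno.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper for illustration only; this is NOT
 * libmongo-client's mongo_util_parse_addr().
 *
 * Splits "host" or "host:port". On success returns 1, stores a newly
 * allocated host string in *host (caller frees it), and overwrites *port
 * only if a port was present, so the caller can pre-set a default.
 * On failure returns 0 and leaves *host and *port untouched. */
int
parse_server_addr(const char *addr, char **host, int *port)
{
  const char *colon;
  char *copy;
  char *end;
  long parsed = -1;
  size_t host_len;

  if (addr == NULL || host == NULL || port == NULL)
    return 0;

  colon = strrchr(addr, ':');
  if (colon == NULL)
    {
      host_len = strlen(addr);
    }
  else
    {
      host_len = (size_t) (colon - addr);
      errno = 0;
      parsed = strtol(colon + 1, &end, 10);
      /* Reject empty, non-numeric, or out-of-range ports. */
      if (errno != 0 || end == colon + 1 || *end != '\0' ||
          parsed < 1 || parsed > 65535)
        return 0;
    }

  if (host_len == 0)
    return 0;

  copy = malloc(host_len + 1);
  if (copy == NULL)
    return 0;
  memcpy(copy, addr, host_len);
  copy[host_len] = '\0';

  *host = copy;
  if (colon != NULL)
    *port = (int) parsed;
  return 1;
}
```

A caller would mirror the patch: set the port variable to 27017, call the helper on the first list entry to obtain the primary, then repeat for the remaining entries and register each as a seed.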
On Tue, 2012-01-03 at 01:55 +0100, Gergely Nagy wrote:
> The servers can now be specified with a servers() parameter, where the
> first will be the primary (used in the stats for example), the rest
> will be the seeds.
>
> Signed-off-by: Gergely Nagy <algernon@balabit.hu>
> ---
>  modules/afmongodb/afmongodb-grammar.ym |    4 +-
>  modules/afmongodb/afmongodb-parser.c   |    4 +--
>  modules/afmongodb/afmongodb.c          |   53 ++++++++++++++++++++++++--------
>  modules/afmongodb/afmongodb.h          |    3 +-
>  4 files changed, 44 insertions(+), 20 deletions(-)
I'm applying this, but can you please make this backward compatible? You can even make the old options deprecated and the new ones appear only in "@version: 3.4" mode.

People will use the mongodb driver in 3.3 too and I wouldn't like to break their environment.

Thanks.

-- 
Bazsi
Balazs Scheidler <bazsi@balabit.hu> writes:
> On Tue, 2012-01-03 at 01:55 +0100, Gergely Nagy wrote:
>> The servers can now be specified with a servers() parameter, where the
>> first will be the primary (used in the stats for example), the rest
>> will be the seeds.
>>
>> Signed-off-by: Gergely Nagy <algernon@balabit.hu>
>> ---
>>  modules/afmongodb/afmongodb-grammar.ym |    4 +-
>>  modules/afmongodb/afmongodb-parser.c   |    4 +--
>>  modules/afmongodb/afmongodb.c          |   53 ++++++++++++++++++++++++--------
>>  modules/afmongodb/afmongodb.h          |    3 +-
>>  4 files changed, 44 insertions(+), 20 deletions(-)
>
> I'm applying this, but can you please make this backward compatible?
> You can even make the old options deprecated and the new ones appear
> only in "@version: 3.4" mode.
>
> People will use the mongodb driver in 3.3 too and I wouldn't like to
> break their environment.
Yes, that's on my TODO list, but I didn't get around to coding it yet. I'll send a patch once it's ready.

-- 
|8]
This adds a new option to the mongodb destination driver:
safe-mode(yes|no), which, as the name implies, can be used to turn
safe-mode on and off. For performance reasons, it is off by default.

Safe mode basically involves an extra check after each insert, which
verifies that the last insert succeeded; the insert is only treated as
a complete success when that second check succeeds too.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/afmongodb-grammar.ym |    2 ++
 modules/afmongodb/afmongodb-parser.c   |    1 +
 modules/afmongodb/afmongodb.c          |   14 +++++++++++++-
 modules/afmongodb/afmongodb.h          |    1 +
 4 files changed, 17 insertions(+), 1 deletions(-)

diff --git a/modules/afmongodb/afmongodb-grammar.ym b/modules/afmongodb/afmongodb-grammar.ym
index 1a929ac..a21e23c 100644
--- a/modules/afmongodb/afmongodb-grammar.ym
+++ b/modules/afmongodb/afmongodb-grammar.ym
@@ -51,6 +51,7 @@ extern ValuePairsTransformSet *last_vp_transset;
 %token KW_MONGODB
 %token KW_COLLECTION
 %token KW_SERVERS
+%token KW_SAFE_MODE
 
 %%
 
@@ -73,6 +74,7 @@ afmongodb_option
         | KW_COLLECTION '(' string ')'  { afmongodb_dd_set_collection(last_driver, $3); free($3); }
         | KW_USERNAME '(' string ')'    { afmongodb_dd_set_user(last_driver, $3); free($3); }
         | KW_PASSWORD '(' string ')'    { afmongodb_dd_set_password(last_driver, $3); free($3); }
+        | KW_SAFE_MODE '(' yesno ')'    { afmongodb_dd_set_safe_mode(last_driver, $3); }
         | value_pair_option             { afmongodb_dd_set_value_pairs(last_driver, $1); }
         | dest_driver_option
         ;
diff --git a/modules/afmongodb/afmongodb-parser.c b/modules/afmongodb/afmongodb-parser.c
index b589fe8..b8c9ead 100644
--- a/modules/afmongodb/afmongodb-parser.c
+++ b/modules/afmongodb/afmongodb-parser.c
@@ -35,6 +35,7 @@ static CfgLexerKeyword afmongodb_keywords[] = {
   { "collection",     KW_COLLECTION },
   { "username",       KW_USERNAME },
   { "password",       KW_PASSWORD },
+  { "safe_mode",      KW_SAFE_MODE },
   { NULL }
 };
 
diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 01d5dc8..1a7dd0e 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -53,6 +53,8 @@ typedef struct
   gchar *host;
   gint port;
 
+  gboolean safe_mode;
+
   gchar *user;
   gchar *password;
 
@@ -146,6 +148,14 @@ afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp)
   self->vp = vp;
 }
 
+void
+afmongodb_dd_set_safe_mode(LogDriver *d, gboolean state)
+{
+  MongoDBDestDriver *self = (MongoDBDestDriver *)d;
+
+  self->safe_mode = state;
+}
+
 /*
  * Utilities
  */
@@ -195,13 +205,14 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
     return TRUE;
 
   self->conn = mongo_sync_connect(self->host, self->port, FALSE);
-
   if (!self->conn)
     {
       msg_error ("Error connecting to MongoDB", NULL);
       return FALSE;
     }
 
+  mongo_sync_conn_set_safe_mode(self->conn, self->safe_mode);
+
   l = self->servers;
   while ((l = g_list_next(l)) != NULL)
     {
@@ -572,6 +583,7 @@ afmongodb_dd_new(void)
   afmongodb_dd_set_servers((LogDriver *)self, g_list_append (NULL, g_strdup ("127.0.0.1:27017")));
   afmongodb_dd_set_database((LogDriver *)self, "syslog");
   afmongodb_dd_set_collection((LogDriver *)self, "messages");
+  afmongodb_dd_set_safe_mode((LogDriver *)self, FALSE);
 
   init_sequence_number(&self->seq_num);
 
diff --git a/modules/afmongodb/afmongodb.h b/modules/afmongodb/afmongodb.h
index 4aa47ef..9950d26 100644
--- a/modules/afmongodb/afmongodb.h
+++ b/modules/afmongodb/afmongodb.h
@@ -35,5 +35,6 @@ void afmongodb_dd_set_collection(LogDriver *d, const gchar *collection);
 void afmongodb_dd_set_user(LogDriver *d, const gchar *user);
 void afmongodb_dd_set_password(LogDriver *d, const gchar *password);
 void afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp);
+void afmongodb_dd_set_safe_mode(LogDriver *d, gboolean state);
 
 #endif
-- 
1.7.7.3
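The "extra check after each insert" can be pictured with a small simulation. Everything below is a stand-in invented for illustration (fake_conn, fake_send_update, fake_get_last_error, checked_update); none of it is libmongo-client API, and in the real driver the check is presumably performed inside the library once mongo_sync_conn_set_safe_mode() has been called.

```c
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for a database connection; purely illustrative. */
typedef struct
{
  bool safe_mode;          /* verify every write when true */
  bool link_up;            /* can requests be sent at all? */
  const char *last_error;  /* server-side error of the last write, or NULL */
  unsigned round_trips;    /* number of requests sent so far */
} fake_conn;

/* Pretend to send an update; only fails if the link is down. */
static bool
fake_send_update(fake_conn *c)
{
  if (!c->link_up)
    return false;
  c->round_trips++;
  return true;
}

/* Pretend to ask the server about the previous write. */
static const char *
fake_get_last_error(fake_conn *c)
{
  c->round_trips++;
  return c->last_error;
}

/* Without safe mode, "sent" counts as success. With safe mode, the write
 * succeeds only if the follow-up check reports no error, at the cost of
 * one more request per write. */
bool
checked_update(fake_conn *c)
{
  if (!fake_send_update(c))
    return false;
  if (!c->safe_mode)
    return true;
  return fake_get_last_error(c) == NULL;
}
```

The round_trips counter makes the trade-off visible: with safe mode off, a write that the server rejected still looks successful after one request, while with safe mode on the same write is reported as failed after two.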
The TODO file was old and stale, remove it.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afmongodb/TODO |   20 --------------------
 1 files changed, 0 insertions(+), 20 deletions(-)
 delete mode 100644 modules/afmongodb/TODO

diff --git a/modules/afmongodb/TODO b/modules/afmongodb/TODO
deleted file mode 100644
index b5b14fc..0000000
--- a/modules/afmongodb/TODO
+++ /dev/null
@@ -1,20 +0,0 @@
-# -*- org -*-
-
-* Finish migrating to libmongo-client
-While the bulk of the code has been migrated, authentication does not
-work with libmongo-client yet.
-
-* Date type handling
-It would be very useful if we could either recognise the various date
-macros and store them as appropriate date-typed bson objects, or, if
-that would be too intrusive, offer an option to put the timestamp
-into a specified key.
-
-* Documentation
-There is some documentation in the syslog-ng@ mailing list archives
-and on the project page at
-http://asylum.madhouse-project.org/projects/syslog-ng/mongodb/, but
-something closer to the driver would be preferable.
-
-However, if/when the driver gets merged, this will stop being my
-problem!
-- 
1.7.7.3