[syslog-ng] [PATCH 2/3] afmongodb: Port to value-pairs().
Gergely Nagy
algernon at balabit.hu
Sun Apr 24 11:29:52 CEST 2011
Ported from custom, explicit key/value pairs to the far more flexible
value-pairs() solution. By default, the driver uses a custom scope:
selected_macros and nv_pairs, with a few patterns excluded.
The patch also turns the collection() parameter into a plain string
(from a templatable string), to considerably reduce the work needed to
insert messages into the database.
Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
modules/afmongodb/afmongodb-grammar.ym | 4 +-
modules/afmongodb/afmongodb-parser.c | 2 -
modules/afmongodb/afmongodb.c | 154 +++++++++----------------------
modules/afmongodb/afmongodb.h | 4 +-
4 files changed, 48 insertions(+), 116 deletions(-)
diff --git a/modules/afmongodb/afmongodb-grammar.ym b/modules/afmongodb/afmongodb-grammar.ym
index a19e2b4..7b5e680 100644
--- a/modules/afmongodb/afmongodb-grammar.ym
+++ b/modules/afmongodb/afmongodb-grammar.ym
@@ -48,7 +48,6 @@ extern LogDriver *last_driver;
%token KW_MONGODB
%token KW_COLLECTION
-%token KW_KEYS
%%
@@ -70,10 +69,9 @@ afmongodb_option
| KW_PORT '(' LL_NUMBER ')' { afmongodb_dd_set_port(last_driver, $3); }
| KW_DATABASE '(' string ')' { afmongodb_dd_set_database(last_driver, $3); free($3); }
| KW_COLLECTION '(' string ')' { afmongodb_dd_set_collection(last_driver, $3); free($3); }
- | KW_VALUES '(' string_list ')' { afmongodb_dd_set_values(last_driver, $3); }
- | KW_KEYS '(' string_list ')' { afmongodb_dd_set_keys(last_driver, $3); }
| KW_USERNAME '(' string ')' { afmongodb_dd_set_user(last_driver, $3); free($3); }
| KW_PASSWORD '(' string ')' { afmongodb_dd_set_password(last_driver, $3); free($3); }
+ | value_pair_stmt { afmongodb_dd_set_value_pairs(last_driver, last_value_pairs); }
| dest_driver_option
;
diff --git a/modules/afmongodb/afmongodb-parser.c b/modules/afmongodb/afmongodb-parser.c
index 5f9161e..67a9b0b 100644
--- a/modules/afmongodb/afmongodb-parser.c
+++ b/modules/afmongodb/afmongodb-parser.c
@@ -34,8 +34,6 @@ static CfgLexerKeyword afmongodb_keywords[] = {
{ "port", KW_PORT },
{ "database", KW_DATABASE },
{ "collection", KW_COLLECTION },
- { "keys", KW_KEYS },
- { "values", KW_VALUES },
{ "username", KW_USERNAME },
{ "password", KW_PASSWORD },
{ "log_fifo_size", KW_LOG_FIFO_SIZE },
diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c
index 284066a..94f0ce1 100644
--- a/modules/afmongodb/afmongodb.c
+++ b/modules/afmongodb/afmongodb.c
@@ -47,7 +47,7 @@ typedef struct
/* Shared between main/writer; only read by the writer, never
written */
gchar *db;
- LogTemplate *coll;
+ gchar *coll;
gchar *host;
gint port;
@@ -55,12 +55,6 @@ typedef struct
gchar *user;
gchar *password;
- GList *keys;
- GList *values;
-
- gint num_fields;
- MongoDBField *fields;
-
time_t time_reopen;
guint32 *dropped_messages;
@@ -68,6 +62,8 @@ typedef struct
time_t last_msg_stamp;
+ ValuePairs *vp;
+
/* Thread related stuff; shared */
GThread *writer_thread;
GMutex *queue_mutex;
@@ -84,11 +80,9 @@ typedef struct
mongo_connection *conn;
gint32 seq_num;
- GString *current_namespace;
- gint ns_prefix_len;
+ gchar *ns;
GString *current_value;
-
bson *bson_sel, *bson_upd, *bson_set;
} MongoDBDestDriver;
@@ -145,26 +139,17 @@ afmongodb_dd_set_collection(LogDriver *d, const gchar *collection)
{
MongoDBDestDriver *self = (MongoDBDestDriver *)d;
- log_template_unref(self->coll);
- self->coll = log_template_new(log_pipe_get_config(&d->super), NULL, collection);
+ g_free(self->coll);
+ self->coll = g_strdup(collection);
}
void
-afmongodb_dd_set_keys(LogDriver *d, GList *keys)
+afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp)
{
MongoDBDestDriver *self = (MongoDBDestDriver *)d;
- string_list_free(self->keys);
- self->keys = keys;
-}
-
-void
-afmongodb_dd_set_values(LogDriver *d, GList *values)
-{
- MongoDBDestDriver *self = (MongoDBDestDriver *)d;
-
- string_list_free(self->values);
- self->values = values;
+ value_pairs_free (self->vp);
+ self->vp = vp;
}
/*
@@ -177,7 +162,7 @@ afmongodb_dd_format_stats_instance(MongoDBDestDriver *self)
static gchar persist_name[1024];
g_snprintf(persist_name, sizeof(persist_name),
- "mongodb,%s,%u,%s,%s", self->host, self->port, self->db, self->coll->template);
+ "mongodb,%s,%u,%s,%s", self->host, self->port, self->db, self->coll);
return persist_name;
}
@@ -187,7 +172,7 @@ afmongodb_dd_format_persist_name(MongoDBDestDriver *self)
static gchar persist_name[1024];
g_snprintf(persist_name, sizeof(persist_name),
- "afmongodb(%s,%u,%s,%s)", self->host, self->port, self->db, self->coll->template);
+ "afmongodb(%s,%u,%s,%s)", self->host, self->port, self->db, self->coll);
return persist_name;
}
@@ -244,11 +229,23 @@ afmongodb_dd_connect(MongoDBDestDriver *self, gboolean reconnect)
/*
* Worker thread
*/
+static gboolean
+afmongodb_vp_foreach (const gchar *name, const gchar *value,
+ gpointer user_data)
+{
+ bson *bson_set = (bson *)user_data;
+
+ if (name[0] == '.')
+ bson_append_string (bson_set, name + 1, value, -1);
+ else
+ bson_append_string (bson_set, name, value, -1);
+
+ return FALSE;
+}
static gboolean
afmongodb_worker_insert (MongoDBDestDriver *self)
{
- gint i;
gboolean success;
mongo_packet *p;
guint8 *oid;
@@ -274,25 +271,15 @@ afmongodb_worker_insert (MongoDBDestDriver *self)
g_free (oid);
bson_finish (self->bson_sel);
- g_string_truncate(self->current_namespace, self->ns_prefix_len);
- log_template_append_format(self->coll, msg, NULL, LTZ_LOCAL,
- self->seq_num, NULL, self->current_namespace);
-
- for (i = 0; i < self->num_fields; i++)
- {
- log_template_format(self->fields[i].value, msg, NULL, LTZ_SEND,
- self->seq_num, NULL, self->current_value);
- if (self->current_value->len)
- bson_append_string(self->bson_set, self->fields[i].name,
- self->current_value->str, -1);
- }
+ value_pairs_foreach (self->vp, afmongodb_vp_foreach,
+ msg, self->seq_num, self->bson_set);
bson_finish (self->bson_set);
bson_append_document (self->bson_upd, "$set", self->bson_set);
bson_finish (self->bson_upd);
- p = mongo_wire_cmd_update (1, self->current_namespace->str, 1,
+ p = mongo_wire_cmd_update (1, self->ns, 1,
self->bson_sel, self->bson_upd);
if (!mongo_packet_send (self->conn, p))
@@ -335,12 +322,8 @@ afmongodb_worker_thread (gpointer arg)
NULL);
success = afmongodb_dd_connect(self, FALSE);
- self->current_namespace = g_string_sized_new(64);
- self->ns_prefix_len = strlen (self->db) + 1;
- self->current_namespace =
- g_string_append_c (g_string_assign (self->current_namespace,
- self->db), '.');
+ self->ns = g_strconcat (self->db, ".", self->coll, NULL);
self->current_value = g_string_sized_new(256);
@@ -383,7 +366,7 @@ afmongodb_worker_thread (gpointer arg)
afmongodb_dd_disconnect(self);
- g_string_free (self->current_namespace, TRUE);
+ g_free (self->ns);
g_string_free (self->current_value, TRUE);
bson_free (self->bson_sel);
@@ -421,15 +404,25 @@ afmongodb_dd_init(LogPipe *s)
MongoDBDestDriver *self = (MongoDBDestDriver *)s;
GlobalConfig *cfg = log_pipe_get_config(s);
- gint num_keys, num_values, i;
- GList *key, *value;
-
if (!log_dest_driver_init_method(s))
return FALSE;
if (cfg)
self->time_reopen = cfg->time_reopen;
+ if (!self->vp)
+ {
+ self->vp = value_pairs_new(cfg);
+ value_pairs_add_scope(self->vp, VALUE_PAIR_SCOPE_SELECTED_MACROS);
+ value_pairs_add_scope(self->vp, VALUE_PAIR_SCOPE_NV_PAIRS);
+ value_pairs_exclude_glob_pattern(self->vp, "R_*");
+ value_pairs_exclude_glob_pattern(self->vp, "S_*");
+ value_pairs_exclude_glob_pattern(self->vp, "HOST_FROM");
+ value_pairs_exclude_glob_pattern(self->vp, "LEGACY_MSGHDR");
+ value_pairs_exclude_glob_pattern(self->vp, "MSG");
+ value_pairs_exclude_glob_pattern(self->vp, "SDATA");
+ }
+
msg_verbose("Initializing MongoDB destination",
evt_tag_str("host", self->host),
evt_tag_int("port", self->port),
@@ -438,29 +431,6 @@ afmongodb_dd_init(LogPipe *s)
self->queue = log_dest_driver_acquire_queue(&self->super, afmongodb_dd_format_persist_name(self));
- if (!self->fields)
- {
- num_keys = g_list_length(self->keys);
- num_values = g_list_length(self->values);
-
- if (num_keys != num_values)
- {
- msg_error("The number of keys and values do not match",
- evt_tag_int("num_keys", num_keys),
- evt_tag_int("num_values", num_values),
- NULL);
- return FALSE;
- }
- self->num_fields = num_keys;
- self->fields = g_new0(MongoDBField, num_keys);
-
- for (i = 0, key = self->keys, value = self->values; key && value; i++, key = key->next, value = value->next)
- {
- self->fields[i].name = g_strdup(key->data);
- self->fields[i].value = log_template_new(cfg, NULL, (gchar *)value->data);
- }
- }
-
stats_register_counter(0, SCS_MONGODB | SCS_DESTINATION, self->super.super.id,
afmongodb_dd_format_stats_instance(self),
SC_TYPE_STORED, &self->stored_messages);
@@ -499,7 +469,6 @@ static void
afmongodb_dd_free(LogPipe *d)
{
MongoDBDestDriver *self = (MongoDBDestDriver *)d;
- gint i;
g_mutex_free(self->suspend_mutex);
g_mutex_free(self->queue_mutex);
@@ -508,20 +477,13 @@ afmongodb_dd_free(LogPipe *d)
if (self->queue)
log_queue_unref(self->queue);
- for (i = 0; i < self->num_fields; i++)
- {
- g_free(self->fields[i].name);
- log_template_unref(self->fields[i].value);
- }
-
- g_free(self->fields);
g_free(self->db);
- log_template_unref(self->coll);
+ g_free(self->coll);
g_free(self->user);
g_free(self->password);
g_free(self->host);
- string_list_free(self->keys);
- string_list_free(self->values);
+
+ value_pairs_free(self->vp);
log_dest_driver_free(d);
}
@@ -548,30 +510,6 @@ afmongodb_dd_queue(LogPipe *s, LogMessage *msg, const LogPathOptions *path_optio
* Plugin glue.
*/
-const gchar *default_keys[] =
-{
- "date",
- "facility",
- "level",
- "host",
- "program",
- "pid",
- "message",
- NULL
-};
-
-const gchar *default_values[] =
-{
- "${R_YEAR}-${R_MONTH}-${R_DAY} ${R_HOUR}:${R_MIN}:${R_SEC}",
- "$FACILITY",
- "$LEVEL",
- "$HOST",
- "$PROGRAM",
- "$PID",
- "$MSGONLY",
- NULL
-};
-
LogDriver *
afmongodb_dd_new(void)
{
@@ -589,8 +527,6 @@ afmongodb_dd_new(void)
afmongodb_dd_set_port((LogDriver *)self, 27017);
afmongodb_dd_set_database((LogDriver *)self, "syslog");
afmongodb_dd_set_collection((LogDriver *)self, "messages");
- afmongodb_dd_set_keys((LogDriver *)self, string_array_to_list(default_keys));
- afmongodb_dd_set_values((LogDriver *)self, string_array_to_list(default_values));
init_sequence_number(&self->seq_num);
diff --git a/modules/afmongodb/afmongodb.h b/modules/afmongodb/afmongodb.h
index e54faf2..bd7b258 100644
--- a/modules/afmongodb/afmongodb.h
+++ b/modules/afmongodb/afmongodb.h
@@ -25,6 +25,7 @@
#define AFMONGODB_H_INCLUDED
#include "driver.h"
+#include "value-pairs.h"
LogDriver *afmongodb_dd_new(void);
@@ -32,9 +33,8 @@ void afmongodb_dd_set_host(LogDriver *d, const gchar *host);
void afmongodb_dd_set_port(LogDriver *d, gint port);
void afmongodb_dd_set_database(LogDriver *d, const gchar *database);
void afmongodb_dd_set_collection(LogDriver *d, const gchar *collection);
-void afmongodb_dd_set_values(LogDriver *d, GList *values);
-void afmongodb_dd_set_keys(LogDriver *d, GList *keys);
void afmongodb_dd_set_user(LogDriver *d, const gchar *user);
void afmongodb_dd_set_password(LogDriver *d, const gchar *password);
+void afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp);
#endif
--
1.7.2.5
More information about the syslog-ng
mailing list