For TTL collections primarly (but useful for other scenarios aswell), introduce a new option: datetime(FIELD_NAME). This allows one to mark a single field anywhere in the structure as a datetime field, and the driver will attempt to convert it to a proper datetime, by assuming it is a unix timestamp. If the conversion fails, or the field is not found, it will not be inserted. Usage: mongodb(datetime("some.date") value-pairs( key("MESSAGE") pair("some.date", "$UNIXTIME"))) This will result in something like: { "_id" : ObjectId("509b8ffc1a1006213a000001"), "some" : { "date" : ISODate("2012-11-08T10:56:59Z") }, "MESSAGE" : "foobar" } Signed-off-by: Gergely Nagy <algernon@balabit.hu> --- modules/afmongodb/afmongodb-grammar.ym | 2 ++ modules/afmongodb/afmongodb-parser.c | 1 + modules/afmongodb/afmongodb.c | 61 +++++++++++++++++++++++++++++--- modules/afmongodb/afmongodb.h | 1 + 4 files changed, 61 insertions(+), 4 deletions(-) diff --git a/modules/afmongodb/afmongodb-grammar.ym b/modules/afmongodb/afmongodb-grammar.ym index f108f26..40eccc0 100644 --- a/modules/afmongodb/afmongodb-grammar.ym +++ b/modules/afmongodb/afmongodb-grammar.ym @@ -53,6 +53,7 @@ extern ValuePairsTransformSet *last_vp_transset; %token KW_SERVERS %token KW_SAFE_MODE %token KW_PATH +%token KW_DATETIME %% @@ -97,6 +98,7 @@ afmongodb_option | KW_USERNAME '(' string ')' { afmongodb_dd_set_user(last_driver, $3); free($3); } | KW_PASSWORD '(' string ')' { afmongodb_dd_set_password(last_driver, $3); free($3); } | KW_SAFE_MODE '(' yesno ')' { afmongodb_dd_set_safe_mode(last_driver, $3); } + | KW_DATETIME '(' string ')' { afmongodb_dd_set_datetime(last_driver, $3); free($3); } | value_pair_option { afmongodb_dd_set_value_pairs(last_driver, $1); } | dest_driver_option ; diff --git a/modules/afmongodb/afmongodb-parser.c b/modules/afmongodb/afmongodb-parser.c index 8aa55c1..1c51154 100644 --- a/modules/afmongodb/afmongodb-parser.c +++ b/modules/afmongodb/afmongodb-parser.c @@ -39,6 +39,7 @@ static CfgLexerKeyword afmongodb_keywords[] = { { "host", KW_HOST }, { "port", KW_PORT }, { "path", KW_PATH }, + { "datetime", KW_DATETIME }, { NULL } }; diff --git a/modules/afmongodb/afmongodb.c b/modules/afmongodb/afmongodb.c index 205ab98..bb267a5 100644 --- a/modules/afmongodb/afmongodb.c +++ b/modules/afmongodb/afmongodb.c @@ -22,6 +22,7 @@ */ #include <time.h> +#include <stdlib.h> #include "afmongodb.h" #include "afmongodb-parser.h" @@ -32,6 +33,7 @@ #include "nvtable.h" #include "logqueue.h" #include "value-pairs.h" +#include "scratch-buffers.h" #include "mongo.h" @@ -55,6 +57,7 @@ typedef struct gint port; gboolean safe_mode; + gchar *datetime_field; gchar *user; gchar *password; @@ -207,6 +210,15 @@ afmongodb_dd_set_safe_mode(LogDriver *d, gboolean state) self->safe_mode = state; } +void +afmongodb_dd_set_datetime(LogDriver *d, const gchar *field) +{ + MongoDBDestDriver *self = (MongoDBDestDriver *)d; + + g_free(self->datetime_field); + self->datetime_field = g_strdup(field); +} + /* * Utilities */ @@ -343,7 +355,7 @@ afmongodb_vp_obj_end(const gchar *name, if (prev_data) root = (bson *)*prev_data; else - root = (bson *)user_data; + root = (bson *)(((gpointer *)user_data)[0]); if (prefix_data) { @@ -357,6 +369,30 @@ afmongodb_vp_obj_end(const gchar *name, } static gboolean +afmongodb_field_is_datetime(const gchar *prefix, const gchar *name, + const gchar *datetime_field) +{ + size_t plen = 0; + + if (!datetime_field) + return FALSE; + + if (prefix) + plen = strlen(prefix); + + if (strncmp(datetime_field, prefix, plen) != 0) + return FALSE; + + if (datetime_field[plen] != '.') + return FALSE; + + if (strcmp(&datetime_field[plen + 1], name) != 0) + return FALSE; + + return TRUE; +} + +static gboolean afmongodb_vp_process_value(const gchar *name, const gchar *prefix, const gchar *value, gpointer *prefix_data, gpointer user_data) @@ -366,7 +402,7 @@ afmongodb_vp_process_value(const gchar *name, const gchar *prefix, if (prefix_data) o = (bson *)*prefix_data; else - o = (bson *)user_data; + o = (bson *)(((gpointer *)user_data)[0]); if (name[0] == '.') { @@ -378,7 +414,20 @@ afmongodb_vp_process_value(const gchar *name, const gchar *prefix, bson_append_string (o, tx_name, value, -1); } else - bson_append_string (o, name, value, -1); + { + MongoDBDestDriver *self = (MongoDBDestDriver *)(((gpointer *)user_data)[1]); + + if (afmongodb_field_is_datetime(prefix, name, self->datetime_field)) + { + gchar *endptr; + gint64 datetime = (gint64)strtoul(value, &endptr, 10) * 1000; + + if (endptr[0] == '\0') + bson_append_utc_datetime(o, name, datetime); + } + else + bson_append_string (o, name, value, -1); + } return FALSE; } @@ -390,6 +439,7 @@ afmongodb_worker_insert (MongoDBDestDriver *self) guint8 *oid; LogMessage *msg; LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; + gpointer user_data[] = { NULL, self }; afmongodb_dd_connect(self, TRUE); @@ -408,11 +458,13 @@ afmongodb_worker_insert (MongoDBDestDriver *self) bson_append_oid (self->bson, "_id", oid); g_free (oid); + user_data[0] = self->bson; + value_pairs_walk(self->vp, afmongodb_vp_obj_start, afmongodb_vp_process_value, afmongodb_vp_obj_end, - msg, self->seq_num, self->bson); + msg, self->seq_num, user_data); bson_finish (self->bson); if (!mongo_sync_cmd_insert_n(self->conn, self->ns, 1, @@ -650,6 +702,7 @@ afmongodb_dd_free(LogPipe *d) string_list_free(self->servers); if (self->vp) value_pairs_free(self->vp); + g_free(self->datetime_field); log_dest_driver_free(d); } diff --git a/modules/afmongodb/afmongodb.h b/modules/afmongodb/afmongodb.h index 6a61616..76f8b83 100644 --- a/modules/afmongodb/afmongodb.h +++ b/modules/afmongodb/afmongodb.h @@ -39,6 +39,7 @@ void afmongodb_dd_set_password(LogDriver *d, const gchar *password); void afmongodb_dd_set_value_pairs(LogDriver *d, ValuePairs *vp); void afmongodb_dd_set_safe_mode(LogDriver *d, gboolean state); void afmongodb_dd_set_path(LogDriver *d, const gchar *path); +void afmongodb_dd_set_datetime(LogDriver *d, const gchar *field); gboolean afmongodb_dd_check_address(LogDriver *d, gboolean local); -- 1.7.10.4