[PATCH (3.4) 0/3] afsql code cleanups
The patches following this mail are my attempts at cleaning up the afsql driver here and there, mostly afsql_dd_insert_db(), in preparation for moving towards LogThreadedDestDriver (which will be posted separately a bit later).

The migration is possible without these patches, but these are worth applying even without the LogThreadedDestDriver patches, as these present no real architectural change, just a little refactoring.

I think the code could be cleaned up further, but for the time being, I only did what I thought necessary to prepare for migration to LogThreadedDestDriver.
To make the code clearer, move the code to connect to a database out of afsql_dd_insert_db() into afsql_dd_connect().

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afsql/afsql.c |  138 +++++++++++++++++++++++++------------------------
 1 file changed, 71 insertions(+), 67 deletions(-)

diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 16cf253..8b5cc2d 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -651,6 +651,76 @@ afsql_dd_set_dbd_opt_numeric(gpointer key, gpointer value, gpointer user_data)
                               GPOINTER_TO_INT(value));
 }
 
+static gboolean
+afsql_dd_connect(AFSqlDestDriver *self)
+{
+  if (self->dbi_ctx)
+    return TRUE;
+
+  self->dbi_ctx = dbi_conn_new(self->type);
+  if (!self->dbi_ctx)
+    {
+      msg_error("No such DBI driver",
+                evt_tag_str("type", self->type),
+                NULL);
+      return FALSE;
+    }
+
+  dbi_conn_set_option(self->dbi_ctx, "host", self->host);
+  if (strcmp(self->type, "mysql"))
+    dbi_conn_set_option(self->dbi_ctx, "port", self->port);
+  else
+    dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port));
+  dbi_conn_set_option(self->dbi_ctx, "username", self->user);
+  dbi_conn_set_option(self->dbi_ctx, "password", self->password);
+  dbi_conn_set_option(self->dbi_ctx, "dbname", self->database);
+  dbi_conn_set_option(self->dbi_ctx, "encoding", self->encoding);
+  dbi_conn_set_option(self->dbi_ctx, "auto-commit", self->flags & AFSQL_DDF_EXPLICIT_COMMITS ? "false" : "true");
+
+  /* database specific hacks */
+  dbi_conn_set_option(self->dbi_ctx, "sqlite_dbdir", "");
+  dbi_conn_set_option(self->dbi_ctx, "sqlite3_dbdir", "");
+
+  /* Set user-specified options */
+  g_hash_table_foreach(self->dbd_options, afsql_dd_set_dbd_opt, self->dbi_ctx);
+  g_hash_table_foreach(self->dbd_options_numeric, afsql_dd_set_dbd_opt_numeric, self->dbi_ctx);
+
+  if (dbi_conn_connect(self->dbi_ctx) < 0)
+    {
+      const gchar *dbi_error;
+
+      dbi_conn_error(self->dbi_ctx, &dbi_error);
+
+      msg_error("Error establishing SQL connection",
+                evt_tag_str("type", self->type),
+                evt_tag_str("host", self->host),
+                evt_tag_str("port", self->port),
+                evt_tag_str("username", self->user),
+                evt_tag_str("database", self->database),
+                evt_tag_str("error", dbi_error),
+                NULL);
+      return FALSE;
+    }
+
+  if (self->session_statements != NULL)
+    {
+      GList *l;
+
+      for (l = self->session_statements; l; l = l->next)
+        {
+          if (!afsql_dd_run_query(self, (gchar *) l->data, FALSE, NULL))
+            {
+              msg_error("Error executing SQL connection statement",
+                        evt_tag_str("statement", (gchar *) l->data),
+                        NULL);
+              return FALSE;
+            }
+        }
+    }
+
+  return TRUE;
+}
+
 /**
  * afsql_dd_insert_db:
  *
@@ -668,73 +738,7 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
   gint i;
   LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
 
-  if (!self->dbi_ctx)
-    {
-      self->dbi_ctx = dbi_conn_new(self->type);
-      if (self->dbi_ctx)
-        {
-          dbi_conn_set_option(self->dbi_ctx, "host", self->host);
-          if (strcmp(self->type, "mysql"))
-            dbi_conn_set_option(self->dbi_ctx, "port", self->port);
-          else
-            dbi_conn_set_option_numeric(self->dbi_ctx, "port", atoi(self->port));
-          dbi_conn_set_option(self->dbi_ctx, "username", self->user);
-          dbi_conn_set_option(self->dbi_ctx, "password", self->password);
-          dbi_conn_set_option(self->dbi_ctx, "dbname", self->database);
-          dbi_conn_set_option(self->dbi_ctx, "encoding", self->encoding);
-          dbi_conn_set_option(self->dbi_ctx, "auto-commit", self->flags & AFSQL_DDF_EXPLICIT_COMMITS ? "false" : "true");
-
-          /* database specific hacks */
-          dbi_conn_set_option(self->dbi_ctx, "sqlite_dbdir", "");
-          dbi_conn_set_option(self->dbi_ctx, "sqlite3_dbdir", "");
-
-          /* Set user-specified options */
-          g_hash_table_foreach(self->dbd_options, afsql_dd_set_dbd_opt, self->dbi_ctx);
-          g_hash_table_foreach(self->dbd_options_numeric, afsql_dd_set_dbd_opt_numeric, self->dbi_ctx);
-
-          if (dbi_conn_connect(self->dbi_ctx) < 0)
-            {
-              const gchar *dbi_error;
-
-              dbi_conn_error(self->dbi_ctx, &dbi_error);
-
-              msg_error("Error establishing SQL connection",
-                        evt_tag_str("type", self->type),
-                        evt_tag_str("host", self->host),
-                        evt_tag_str("port", self->port),
-                        evt_tag_str("username", self->user),
-                        evt_tag_str("database", self->database),
-                        evt_tag_str("error", dbi_error),
-                        NULL);
-              return FALSE;
-            }
-        }
-      else
-        {
-          msg_error("No such DBI driver",
-                    evt_tag_str("type", self->type),
-                    NULL);
-          return FALSE;
-        }
-
-      if (self->session_statements != NULL)
-        {
-          GList *l;
-
-          for (l = self->session_statements; l; l = l->next)
-            {
-              if (!afsql_dd_run_query(self, (gchar *) l->data, FALSE, NULL))
-                {
-                  msg_error("Error executing SQL connection statement",
-                            evt_tag_str("statement", (gchar *) l->data),
-                            NULL);
-                  return FALSE;
-                }
-            }
-        }
-    }
-
-  /* connection established, try to insert a message */
+  afsql_dd_connect(self);
 
   if (self->pending_msg)
     {
-- 
1.7.10
To make things a tiny bit easier, move the final error handling out of _insert_db() into a separate function: afsql_dd_insert_fail_handler().

Furthermore, change afsql_dd_validate_table() to allocate the GString for the table name itself, and resolve the template too, if need be.

The above two changes allow us to simplify afsql_dd_insert_db() a little: when validating the table, we can immediately bail out, without a goto, since we only need to free up the bare minimum. And in the end, a monstrous if (success) {} else {} branch is replaced by something a lot shorter and clearer.

All of these together make the flow of afsql_dd_insert_db() easier to understand and follow.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afsql/afsql.c |  158 ++++++++++++++++++++++++++-----------------------
 1 file changed, 84 insertions(+), 74 deletions(-)

diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 8b5cc2d..4f5ffec 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -1,6 +1,6 @@
 /*
- * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
- * Copyright (c) 1998-2010 Balázs Scheidler
+ * Copyright (c) 2002-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 1998-2012 Balázs Scheidler
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
@@ -458,24 +458,27 @@ afsql_dd_create_index(AFSqlDestDriver *self, gchar *table, gchar *column)
  *
  * NOTE: This function can only be called from the database thread.
  **/
-static gboolean
-afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
+static GString *
+afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg)
 {
-  GString *query_string;
+  GString *query_string, *table;
   dbi_result db_res;
   gboolean success = FALSE;
   gint i;
 
+  table = g_string_sized_new(32);
+  log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
+
   if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
-    return TRUE;
+    return table;
 
-  afsql_dd_check_sql_identifier(table, TRUE);
+  afsql_dd_check_sql_identifier(table->str, TRUE);
 
-  if (g_hash_table_lookup(self->validated_tables, table))
-    return TRUE;
+  if (g_hash_table_lookup(self->validated_tables, table->str))
+    return table;
 
   query_string = g_string_sized_new(32);
-  g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table);
+  g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str);
   if (afsql_dd_run_query(self, query_string->str, TRUE, &db_res))
     {
 
@@ -487,11 +490,11 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
             {
               GList *l;
               /* field does not exist, add this column */
-              g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table, self->fields[i].name, self->fields[i].type);
+              g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table->str, self->fields[i].name, self->fields[i].type);
               if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
                 {
                   msg_error("Error adding missing column, giving up",
-                            evt_tag_str("table", table),
+                            evt_tag_str("table", table->str),
                             evt_tag_str("column", self->fields[i].name),
                             NULL);
                   success = FALSE;
@@ -502,7 +505,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
                   if (strcmp((gchar *) l->data, self->fields[i].name) == 0)
                     {
                       /* this is an indexed column, create index */
-                      afsql_dd_create_index(self, table, self->fields[i].name);
+                      afsql_dd_create_index(self, table->str, self->fields[i].name);
                     }
                 }
             }
@@ -513,7 +516,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
     {
       /* table does not exist, create it */
 
-      g_string_printf(query_string, "CREATE TABLE %s (", table);
+      g_string_printf(query_string, "CREATE TABLE %s (", table->str);
       for (i = 0; i < self->fields_len; i++)
         {
           g_string_append_printf(query_string, "%s %s", self->fields[i].name, self->fields[i].type);
@@ -528,23 +531,29 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
           success = TRUE;
           for (l = self->indexes; l; l = l->next)
             {
-              afsql_dd_create_index(self, table, (gchar *) l->data);
+              afsql_dd_create_index(self, table->str, (gchar *) l->data);
             }
         }
       else
         {
           msg_error("Error creating table, giving up",
-                    evt_tag_str("table", table),
+                    evt_tag_str("table", table->str),
                     NULL);
         }
     }
   if (success)
     {
       /* we have successfully created/altered the destination table, record this information */
-      g_hash_table_insert(self->validated_tables, g_strdup(table), GUINT_TO_POINTER(TRUE));
+      g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE));
+    }
+  else
+    {
+      g_string_free(table, TRUE);
+      table = NULL;
     }
   g_string_free(query_string, TRUE);
-  return success;
+
+  return table;
 }
 
 /**
@@ -721,6 +730,45 @@ afsql_dd_connect(AFSqlDestDriver *self)
   return TRUE;
 }
 
+static gboolean
+afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
+                             LogPathOptions *path_options)
+{
+  if (self->failed_message_counter < self->num_retries - 1)
+    {
+      self->pending_msg = msg;
+      self->pending_msg_ack_needed = path_options->ack_needed;
+
+      /* database connection status sanity check after failed query */
+      if (dbi_conn_ping(self->dbi_ctx) != 1)
+        {
+          const gchar *dbi_error;
+
+          dbi_conn_error(self->dbi_ctx, &dbi_error);
+          msg_error("Error, no SQL connection after failed query attempt",
+                    evt_tag_str("type", self->type),
+                    evt_tag_str("host", self->host),
+                    evt_tag_str("port", self->port),
+                    evt_tag_str("username", self->user),
+                    evt_tag_str("database", self->database),
+                    evt_tag_str("error", dbi_error),
+                    NULL);
+          return FALSE;
+        }
+
+      self->failed_message_counter++;
+      return FALSE;
+    }
+
+  msg_error("Multiple failures while inserting this record into the database, message dropped",
+            evt_tag_int("attempts", self->num_retries),
+            NULL);
+  stats_counter_inc(self->dropped_messages);
+  log_msg_drop(msg, path_options);
+  self->failed_message_counter = 0;
+  return TRUE;
+}
+
 /**
  * afsql_dd_insert_db:
  *
@@ -767,22 +815,21 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
 
   msg_set_context(msg);
 
-  table = g_string_sized_new(32);
-  value = g_string_sized_new(256);
-  query_string = g_string_sized_new(512);
-
-  log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
-
-  if (!afsql_dd_validate_table(self, table->str))
+  table = afsql_dd_validate_table(self, msg);
+  if (!table)
     {
       /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
       msg_error("Error checking table, disconnecting from database, trying again shortly",
                 evt_tag_int("time_reopen", self->time_reopen),
                 NULL);
-      success = FALSE;
-      goto error;
+      msg_set_context(NULL);
+      g_string_free(table, TRUE);
+      return afsql_dd_insert_fail_handler(self, msg, &path_options);
     }
 
+  value = g_string_sized_new(256);
+  query_string = g_string_sized_new(512);
+
   g_string_printf(query_string, "INSERT INTO %s (", table->str);
   for (i = 0; i < self->fields_len; i++)
     {
@@ -848,63 +895,26 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
       if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self, TRUE))
         return FALSE;
     }
- error:
+
   g_string_free(table, TRUE);
   g_string_free(value, TRUE);
   g_string_free(query_string, TRUE);
 
   msg_set_context(NULL);
 
-  if (success)
-    {
-      /* we only ACK if each INSERT is a separate transaction */
-      if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
-        log_msg_ack(msg, &path_options);
-      log_msg_unref(msg);
-      step_sequence_number(&self->seq_num);
-      self->failed_message_counter = 0;
-    }
-  else
-    {
-      if (self->failed_message_counter < self->num_retries - 1)
-        {
-          self->pending_msg = msg;
-          self->pending_msg_ack_needed = path_options.ack_needed;
+  if (!success)
+    return afsql_dd_insert_fail_handler(self, msg, &path_options);
 
-          /* database connection status sanity check after failed query */
-          if (dbi_conn_ping(self->dbi_ctx) != 1)
-            {
-              const gchar *dbi_error;
-
-              dbi_conn_error(self->dbi_ctx, &dbi_error);
-              msg_error("Error, no SQL connection after failed query attempt",
-                        evt_tag_str("type", self->type),
-                        evt_tag_str("host", self->host),
-                        evt_tag_str("port", self->port),
-                        evt_tag_str("username", self->user),
-                        evt_tag_str("database", self->database),
-                        evt_tag_str("error", dbi_error),
-                        NULL);
-              return FALSE;
-            }
+  /* we only ACK if each INSERT is a separate transaction */
+  if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
+    log_msg_ack(msg, &path_options);
+  log_msg_unref(msg);
+  step_sequence_number(&self->seq_num);
+  self->failed_message_counter = 0;
 
-          self->failed_message_counter++;
-        }
-      else
-        {
-          msg_error("Multiple failures while inserting this record into the database, message dropped",
-                    evt_tag_int("attempts", self->num_retries),
-                    NULL);
-          stats_counter_inc(self->dropped_messages);
-          log_msg_drop(msg, &path_options);
-          self->failed_message_counter = 0;
-          success = TRUE;
-        }
-    }
-  return success;
+  return TRUE;
 }
 
-
 /**
  * afsql_dd_database_thread:
  *
-- 
1.7.10
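The retry accounting in afsql_dd_insert_fail_handler() is easier to reason about when looked at in isolation. What follows is a minimal standalone sketch of just the counter logic, not code from afsql.c: RetryState and retry_or_drop() are hypothetical names, the dbi_conn_ping() check is left out, and the message itself is reduced to a "pending" flag.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the few AFSqlDestDriver fields involved. */
typedef struct
{
  int  num_retries;             /* configured number of attempts */
  int  failed_message_counter;  /* failures seen for the current message */
  int  dropped_messages;        /* simplified statistics counter */
  bool pending;                 /* true while a message waits for a retry */
} RetryState;

/*
 * Called after a failed insert. Returns false while the message is kept
 * for another attempt, true once it has been given up on and dropped.
 */
static bool
retry_or_drop(RetryState *s)
{
  assert(s != NULL);

  if (s->failed_message_counter < s->num_retries - 1)
    {
      /* keep the message around and try again on the next call */
      s->pending = true;
      s->failed_message_counter++;
      return false;
    }

  /* too many failures: drop the message and start counting afresh */
  s->dropped_messages++;
  s->pending = false;
  s->failed_message_counter = 0;
  return true;
}
```

With num_retries set to 3, the first two failures keep the message for a retry, and the third one drops it and resets the counter.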
To improve the flow of the afsql_dd_insert_db() function, move the code to construct the insert statement out into a new function: afsql_dd_construct_query().

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 modules/afsql/afsql.c |  125 +++++++++++++++++++++++++------------------------
 1 file changed, 65 insertions(+), 60 deletions(-)

diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 4f5ffec..bed2ca9 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -769,6 +769,68 @@ afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
   return TRUE;
 }
 
+static GString *
+afsql_dd_construct_query(AFSqlDestDriver *self, GString *table,
+                         LogMessage *msg)
+{
+  GString *value;
+  GString *query_string;
+  gint i;
+
+  value = g_string_sized_new(256);
+  query_string = g_string_sized_new(512);
+
+  g_string_printf(query_string, "INSERT INTO %s (", table->str);
+  for (i = 0; i < self->fields_len; i++)
+    {
+      g_string_append(query_string, self->fields[i].name);
+      if (i != self->fields_len - 1)
+        g_string_append(query_string, ", ");
+    }
+  g_string_append(query_string, ") VALUES (");
+
+  for (i = 0; i < self->fields_len; i++)
+    {
+      gchar *quoted;
+
+      if (self->fields[i].value == NULL)
+        {
+          /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */
+          g_string_append(query_string, "DEFAULT");
+        }
+      else
+        {
+          log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value);
+
+          if (self->null_value && strcmp(self->null_value, value->str) == 0)
+            {
+              g_string_append(query_string, "NULL");
+            }
+          else
+            {
+              dbi_conn_quote_string_copy(self->dbi_ctx, value->str, &quoted);
+              if (quoted)
+                {
+                  g_string_append(query_string, quoted);
+                  free(quoted);
+                }
+              else
+                {
+                  g_string_append(query_string, "''");
+                }
+            }
+        }
+
+      if (i != self->fields_len - 1)
+        g_string_append(query_string, ", ");
+    }
+  g_string_append(query_string, ")");
+
+  g_string_free(value, TRUE);
+
+  return query_string;
+}
+
 /**
  * afsql_dd_insert_db:
  *
@@ -780,10 +842,9 @@ afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
 static gboolean
 afsql_dd_insert_db(AFSqlDestDriver *self)
 {
-  GString *table, *query_string, *value;
+  GString *table, *query_string;
   LogMessage *msg;
   gboolean success;
-  gint i;
   LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
 
   afsql_dd_connect(self);
@@ -827,67 +888,12 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
       return afsql_dd_insert_fail_handler(self, msg, &path_options);
     }
 
-  value = g_string_sized_new(256);
-  query_string = g_string_sized_new(512);
-
-  g_string_printf(query_string, "INSERT INTO %s (", table->str);
-  for (i = 0; i < self->fields_len; i++)
-    {
-      g_string_append(query_string, self->fields[i].name);
-      if (i != self->fields_len - 1)
-        g_string_append(query_string, ", ");
-    }
-  g_string_append(query_string, ") VALUES (");
-
-  for (i = 0; i < self->fields_len; i++)
-    {
-      gchar *quoted;
-
-      if (self->fields[i].value == NULL)
-        {
-          /* the config used the 'default' value for this column -> the fields[i].value is NULL, use SQL default */
-          g_string_append(query_string, "DEFAULT");
-        }
-      else
-        {
-          log_template_format(self->fields[i].value, msg, &self->template_options, LTZ_SEND, self->seq_num, NULL, value);
-
-          if (self->null_value && strcmp(self->null_value, value->str) == 0)
-            {
-              g_string_append(query_string, "NULL");
-            }
-          else
-            {
-              dbi_conn_quote_string_copy(self->dbi_ctx, value->str, &quoted);
-              if (quoted)
-                {
-                  g_string_append(query_string, quoted);
-                  free(quoted);
-                }
-              else
-                {
-                  g_string_append(query_string, "''");
-                }
-            }
-        }
-
-      if (i != self->fields_len - 1)
-        g_string_append(query_string, ", ");
-    }
-  g_string_append(query_string, ")");
-
-  /* we have the INSERT statement ready in query_string */
+  query_string = afsql_dd_construct_query(self, table, msg);
 
   if (self->flush_lines_queued == 0 && !afsql_dd_begin_txn(self))
     return FALSE;
 
-  success = TRUE;
-  if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
-    {
-      /* error running INSERT on an already validated table, too bad. Try to reconnect. Maybe that helps. */
-      success = FALSE;
-    }
-
+  success = afsql_dd_run_query(self, query_string->str, FALSE, NULL);
   if (success && self->flush_lines_queued != -1)
     {
       self->flush_lines_queued++;
@@ -897,7 +903,6 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
     }
 
   g_string_free(table, TRUE);
-  g_string_free(value, TRUE);
   g_string_free(query_string, TRUE);
 
   msg_set_context(NULL);
-- 
1.7.10
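To see what kind of statement afsql_dd_construct_query() ends up producing, the same column/value walk can be sketched without GLib or libdbi. This is an illustration under stated assumptions only: Field, append() and build_insert() are hypothetical names, values are plain strings rather than resolved templates, and the quoting is a naive wrap in single quotes, whereas the driver relies on dbi_conn_quote_string_copy() for proper escaping.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical, simplified column description. */
typedef struct
{
  const char *name;
  const char *value;   /* NULL means "use the SQL DEFAULT" */
} Field;

/* Append s to a NUL-terminated buffer, truncating instead of overflowing. */
static void
append(char *buf, size_t size, const char *s)
{
  size_t used = strlen(buf);

  if (used < size - 1)
    snprintf(buf + used, size - used, "%s", s);
}

/* Build "INSERT INTO <table> (<names>) VALUES (<values>)" into buf. */
static void
build_insert(char *buf, size_t size, const char *table,
             const Field *fields, size_t n, const char *null_value)
{
  size_t i;

  assert(size > 0);
  buf[0] = '\0';

  append(buf, size, "INSERT INTO ");
  append(buf, size, table);
  append(buf, size, " (");
  for (i = 0; i < n; i++)
    {
      append(buf, size, fields[i].name);
      if (i != n - 1)
        append(buf, size, ", ");
    }
  append(buf, size, ") VALUES (");

  for (i = 0; i < n; i++)
    {
      if (fields[i].value == NULL)
        append(buf, size, "DEFAULT");
      else if (null_value && strcmp(null_value, fields[i].value) == 0)
        append(buf, size, "NULL");
      else
        {
          /* naive quoting, for illustration only: no escaping is done */
          append(buf, size, "'");
          append(buf, size, fields[i].value);
          append(buf, size, "'");
        }

      if (i != n - 1)
        append(buf, size, ", ");
    }
  append(buf, size, ")");
}
```

The three value branches mirror the patch: a missing value becomes DEFAULT, a value equal to the configured null marker becomes NULL, and anything else is quoted.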
Hi,

I generally like these changes and would apply them too. However, I know about some related work by the BalaBit syslog-ng team, and I'm afraid we would have an enormous conflict when trying to pull their stuff into our tree. Since this is an opportunity for larger changes, I'd love to integrate their stuff too.

Can you give it a peek?

PS: the thing I'd love to see is to use the LogQueue backlog instead of pending_msg in @self.

Thanks.

On Fri, 2012-04-27 at 13:53 +0200, Gergely Nagy wrote:
> The patches following this mail are my attempts at cleaning up the afsql driver here and there, mostly afsql_dd_insert_db(), in preparation for moving towards LogThreadedDestDriver (which will be posted separately a bit later).
>
> The migration is possible without these patches, but these are worth applying even without the LogThreadedDestDriver patches, as these present no real architectural change, just a little refactoring.
>
> I think the code could be cleaned up further, but for the time being, I only did what I thought necessary to prepare for migration to LogThreadedDestDriver.
-- 
Bazsi
Balazs Scheidler <bazsi@balabit.hu> writes:
> Hi,
>
> I generally like these changes and would apply them too. However, I know about some related work by the BalaBit syslog-ng team, and I'm afraid we would have an enormous conflict when trying to pull their stuff into our tree. Since this is an opportunity for larger changes, I'd love to integrate their stuff too.
>
> Can you give it a peek?
>
> PS: the thing I'd love to see is to use the LogQueue backlog instead of pending_msg in @self.
Good idea, thanks! I'll do that, and report back with patches.

-- 
|8]