To make things a tiny bit easier, move the final error handling out of afsql_dd_insert_db() into a separate function: afsql_dd_insert_fail_handler(). Furthermore, change afsql_dd_validate_table() to allocate the GString for the table name itself, and resolve the template too, if need be. These two changes allow us to simplify afsql_dd_insert_db() a little: when validating the table, we can immediately bail out, without a goto, since we only need to free up the bare minimum. And in the end, a monstrous if (success) {} else {} branch is replaced by something a lot shorter, and clearer. All of these together make the flow of afsql_dd_insert_db() easier to understand and follow. Signed-off-by: Gergely Nagy <algernon@balabit.hu> --- modules/afsql/afsql.c | 158 ++++++++++++++++++++++++++----------------------- 1 file changed, 84 insertions(+), 74 deletions(-) diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c index 8b5cc2d..4f5ffec 100644 --- a/modules/afsql/afsql.c +++ b/modules/afsql/afsql.c @@ -1,6 +1,6 @@ /* - * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary - * Copyright (c) 1998-2010 Balázs Scheidler + * Copyright (c) 2002-2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 1998-2012 Balázs Scheidler * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 as published @@ -458,24 +458,27 @@ afsql_dd_create_index(AFSqlDestDriver *self, gchar *table, gchar *column) * * NOTE: This function can only be called from the database thread. 
**/ -static gboolean -afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table) +static GString * +afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg) { - GString *query_string; + GString *query_string, *table; dbi_result db_res; gboolean success = FALSE; gint i; + table = g_string_sized_new(32); + log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); + if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES) - return TRUE; + return table; - afsql_dd_check_sql_identifier(table, TRUE); + afsql_dd_check_sql_identifier(table->str, TRUE); - if (g_hash_table_lookup(self->validated_tables, table)) - return TRUE; + if (g_hash_table_lookup(self->validated_tables, table->str)) + return table; query_string = g_string_sized_new(32); - g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table); + g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str); if (afsql_dd_run_query(self, query_string->str, TRUE, &db_res)) { @@ -487,11 +490,11 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table) { GList *l; /* field does not exist, add this column */ - g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table, self->fields[i].name, self->fields[i].type); + g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table->str, self->fields[i].name, self->fields[i].type); if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL)) { msg_error("Error adding missing column, giving up", - evt_tag_str("table", table), + evt_tag_str("table", table->str), evt_tag_str("column", self->fields[i].name), NULL); success = FALSE; @@ -502,7 +505,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table) if (strcmp((gchar *) l->data, self->fields[i].name) == 0) { /* this is an indexed column, create index */ - afsql_dd_create_index(self, table, self->fields[i].name); + afsql_dd_create_index(self, table->str, self->fields[i].name); } } } @@ -513,7 +516,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar 
*table) { /* table does not exist, create it */ - g_string_printf(query_string, "CREATE TABLE %s (", table); + g_string_printf(query_string, "CREATE TABLE %s (", table->str); for (i = 0; i < self->fields_len; i++) { g_string_append_printf(query_string, "%s %s", self->fields[i].name, self->fields[i].type); @@ -528,23 +531,29 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table) success = TRUE; for (l = self->indexes; l; l = l->next) { - afsql_dd_create_index(self, table, (gchar *) l->data); + afsql_dd_create_index(self, table->str, (gchar *) l->data); } } else { msg_error("Error creating table, giving up", - evt_tag_str("table", table), + evt_tag_str("table", table->str), NULL); } } if (success) { /* we have successfully created/altered the destination table, record this information */ - g_hash_table_insert(self->validated_tables, g_strdup(table), GUINT_TO_POINTER(TRUE)); + g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE)); + } + else + { + g_string_free(table, TRUE); + table = NULL; } g_string_free(query_string, TRUE); - return success; + + return table; } /** @@ -721,6 +730,45 @@ afsql_dd_connect(AFSqlDestDriver *self) return TRUE; } +static gboolean +afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg, + LogPathOptions *path_options) +{ + if (self->failed_message_counter < self->num_retries - 1) + { + self->pending_msg = msg; + self->pending_msg_ack_needed = path_options->ack_needed; + + /* database connection status sanity check after failed query */ + if (dbi_conn_ping(self->dbi_ctx) != 1) + { + const gchar *dbi_error; + + dbi_conn_error(self->dbi_ctx, &dbi_error); + msg_error("Error, no SQL connection after failed query attempt", + evt_tag_str("type", self->type), + evt_tag_str("host", self->host), + evt_tag_str("port", self->port), + evt_tag_str("username", self->user), + evt_tag_str("database", self->database), + evt_tag_str("error", dbi_error), + NULL); + return FALSE; + } + + 
self->failed_message_counter++; + return FALSE; + } + + msg_error("Multiple failures while inserting this record into the database, message dropped", + evt_tag_int("attempts", self->num_retries), + NULL); + stats_counter_inc(self->dropped_messages); + log_msg_drop(msg, path_options); + self->failed_message_counter = 0; + return TRUE; +} + /** * afsql_dd_insert_db: * @@ -767,22 +815,21 @@ afsql_dd_insert_db(AFSqlDestDriver *self) msg_set_context(msg); - table = g_string_sized_new(32); - value = g_string_sized_new(256); - query_string = g_string_sized_new(512); - - log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table); - - if (!afsql_dd_validate_table(self, table->str)) + table = afsql_dd_validate_table(self, msg); + if (!table) { /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */ msg_error("Error checking table, disconnecting from database, trying again shortly", evt_tag_int("time_reopen", self->time_reopen), NULL); - success = FALSE; - goto error; + msg_set_context(NULL); + g_string_free(table, TRUE); + return afsql_dd_insert_fail_handler(self, msg, &path_options); } + value = g_string_sized_new(256); + query_string = g_string_sized_new(512); + g_string_printf(query_string, "INSERT INTO %s (", table->str); for (i = 0; i < self->fields_len; i++) { @@ -848,63 +895,26 @@ afsql_dd_insert_db(AFSqlDestDriver *self) if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self, TRUE)) return FALSE; } - error: + g_string_free(table, TRUE); g_string_free(value, TRUE); g_string_free(query_string, TRUE); msg_set_context(NULL); - if (success) - { - /* we only ACK if each INSERT is a separate transaction */ - if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0) - log_msg_ack(msg, &path_options); - log_msg_unref(msg); - step_sequence_number(&self->seq_num); - self->failed_message_counter = 0; - } - else - { - if (self->failed_message_counter < self->num_retries 
- 1) - { - self->pending_msg = msg; - self->pending_msg_ack_needed = path_options.ack_needed; + if (!success) + return afsql_dd_insert_fail_handler(self, msg, &path_options); - /* database connection status sanity check after failed query */ - if (dbi_conn_ping(self->dbi_ctx) != 1) - { - const gchar *dbi_error; - - dbi_conn_error(self->dbi_ctx, &dbi_error); - msg_error("Error, no SQL connection after failed query attempt", - evt_tag_str("type", self->type), - evt_tag_str("host", self->host), - evt_tag_str("port", self->port), - evt_tag_str("username", self->user), - evt_tag_str("database", self->database), - evt_tag_str("error", dbi_error), - NULL); - return FALSE; - } + /* we only ACK if each INSERT is a separate transaction */ + if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0) + log_msg_ack(msg, &path_options); + log_msg_unref(msg); + step_sequence_number(&self->seq_num); + self->failed_message_counter = 0; - self->failed_message_counter++; - } - else - { - msg_error("Multiple failures while inserting this record into the database, message dropped", - evt_tag_int("attempts", self->num_retries), - NULL); - stats_counter_inc(self->dropped_messages); - log_msg_drop(msg, &path_options); - self->failed_message_counter = 0; - success = TRUE; - } - } - return success; + return TRUE; } - /** * afsql_dd_database_thread: * -- 1.7.10