[syslog-ng] [PATCH (3.4) 2/3] afsql: Simplify insert_db()'s table validation
Gergely Nagy
algernon at balabit.hu
Fri Apr 27 13:53:15 CEST 2012
To make things a tiny bit easier, move the final error handling out of
_insert_db() into a separate function: afsql_dd_insert_fail_handler().
Furthermore, change afsql_dd_validate_table() to allocate the GString
for the table name itself, and resolve the template too, if so need
be.
The above two allows us to simplify afsql_dd_insert_db() a little:
when validating the table, we can immediately bail out, without a
goto, since we only need to free up the bare minimum.
And in the end, a monstrous if (success) {} else {} branch is replaced
by something a lot shorter, and clearer.
All of these together make the flow of afsql_dd_insert_db() easier to
understand and follow.
Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
modules/afsql/afsql.c | 158 ++++++++++++++++++++++++++-----------------------
1 file changed, 84 insertions(+), 74 deletions(-)
diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 8b5cc2d..4f5ffec 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -1,6 +1,6 @@
/*
- * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
- * Copyright (c) 1998-2010 Balázs Scheidler
+ * Copyright (c) 2002-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 1998-2012 Balázs Scheidler
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
@@ -458,24 +458,27 @@ afsql_dd_create_index(AFSqlDestDriver *self, gchar *table, gchar *column)
*
* NOTE: This function can only be called from the database thread.
**/
-static gboolean
-afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
+static GString *
+afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg)
{
- GString *query_string;
+ GString *query_string, *table;
dbi_result db_res;
gboolean success = FALSE;
gint i;
+ table = g_string_sized_new(32);
+ log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
+
if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
- return TRUE;
+ return table;
- afsql_dd_check_sql_identifier(table, TRUE);
+ afsql_dd_check_sql_identifier(table->str, TRUE);
- if (g_hash_table_lookup(self->validated_tables, table))
- return TRUE;
+ if (g_hash_table_lookup(self->validated_tables, table->str))
+ return table;
query_string = g_string_sized_new(32);
- g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table);
+ g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str);
if (afsql_dd_run_query(self, query_string->str, TRUE, &db_res))
{
@@ -487,11 +490,11 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
{
GList *l;
/* field does not exist, add this column */
- g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table, self->fields[i].name, self->fields[i].type);
+ g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table->str, self->fields[i].name, self->fields[i].type);
if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
{
msg_error("Error adding missing column, giving up",
- evt_tag_str("table", table),
+ evt_tag_str("table", table->str),
evt_tag_str("column", self->fields[i].name),
NULL);
success = FALSE;
@@ -502,7 +505,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
if (strcmp((gchar *) l->data, self->fields[i].name) == 0)
{
/* this is an indexed column, create index */
- afsql_dd_create_index(self, table, self->fields[i].name);
+ afsql_dd_create_index(self, table->str, self->fields[i].name);
}
}
}
@@ -513,7 +516,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
{
/* table does not exist, create it */
- g_string_printf(query_string, "CREATE TABLE %s (", table);
+ g_string_printf(query_string, "CREATE TABLE %s (", table->str);
for (i = 0; i < self->fields_len; i++)
{
g_string_append_printf(query_string, "%s %s", self->fields[i].name, self->fields[i].type);
@@ -528,23 +531,29 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
success = TRUE;
for (l = self->indexes; l; l = l->next)
{
- afsql_dd_create_index(self, table, (gchar *) l->data);
+ afsql_dd_create_index(self, table->str, (gchar *) l->data);
}
}
else
{
msg_error("Error creating table, giving up",
- evt_tag_str("table", table),
+ evt_tag_str("table", table->str),
NULL);
}
}
if (success)
{
/* we have successfully created/altered the destination table, record this information */
- g_hash_table_insert(self->validated_tables, g_strdup(table), GUINT_TO_POINTER(TRUE));
+ g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE));
+ }
+ else
+ {
+ g_string_free(table, TRUE);
+ table = NULL;
}
g_string_free(query_string, TRUE);
- return success;
+
+ return table;
}
/**
@@ -721,6 +730,45 @@ afsql_dd_connect(AFSqlDestDriver *self)
return TRUE;
}
+static gboolean
+afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
+ LogPathOptions *path_options)
+{
+ if (self->failed_message_counter < self->num_retries - 1)
+ {
+ self->pending_msg = msg;
+ self->pending_msg_ack_needed = path_options->ack_needed;
+
+ /* database connection status sanity check after failed query */
+ if (dbi_conn_ping(self->dbi_ctx) != 1)
+ {
+ const gchar *dbi_error;
+
+ dbi_conn_error(self->dbi_ctx, &dbi_error);
+ msg_error("Error, no SQL connection after failed query attempt",
+ evt_tag_str("type", self->type),
+ evt_tag_str("host", self->host),
+ evt_tag_str("port", self->port),
+ evt_tag_str("username", self->user),
+ evt_tag_str("database", self->database),
+ evt_tag_str("error", dbi_error),
+ NULL);
+ return FALSE;
+ }
+
+ self->failed_message_counter++;
+ return FALSE;
+ }
+
+ msg_error("Multiple failures while inserting this record into the database, message dropped",
+ evt_tag_int("attempts", self->num_retries),
+ NULL);
+ stats_counter_inc(self->dropped_messages);
+ log_msg_drop(msg, path_options);
+ self->failed_message_counter = 0;
+ return TRUE;
+}
+
/**
* afsql_dd_insert_db:
*
@@ -767,22 +815,21 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
msg_set_context(msg);
- table = g_string_sized_new(32);
- value = g_string_sized_new(256);
- query_string = g_string_sized_new(512);
-
- log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
-
- if (!afsql_dd_validate_table(self, table->str))
+ table = afsql_dd_validate_table(self, msg);
+ if (!table)
{
/* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
msg_error("Error checking table, disconnecting from database, trying again shortly",
evt_tag_int("time_reopen", self->time_reopen),
NULL);
- success = FALSE;
- goto error;
+ msg_set_context(NULL);
+ g_string_free(table, TRUE);
+ return afsql_dd_insert_fail_handler(self, msg, &path_options);
}
+ value = g_string_sized_new(256);
+ query_string = g_string_sized_new(512);
+
g_string_printf(query_string, "INSERT INTO %s (", table->str);
for (i = 0; i < self->fields_len; i++)
{
@@ -848,63 +895,26 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self, TRUE))
return FALSE;
}
- error:
+
g_string_free(table, TRUE);
g_string_free(value, TRUE);
g_string_free(query_string, TRUE);
msg_set_context(NULL);
- if (success)
- {
- /* we only ACK if each INSERT is a separate transaction */
- if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
- log_msg_ack(msg, &path_options);
- log_msg_unref(msg);
- step_sequence_number(&self->seq_num);
- self->failed_message_counter = 0;
- }
- else
- {
- if (self->failed_message_counter < self->num_retries - 1)
- {
- self->pending_msg = msg;
- self->pending_msg_ack_needed = path_options.ack_needed;
+ if (!success)
+ return afsql_dd_insert_fail_handler(self, msg, &path_options);
- /* database connection status sanity check after failed query */
- if (dbi_conn_ping(self->dbi_ctx) != 1)
- {
- const gchar *dbi_error;
-
- dbi_conn_error(self->dbi_ctx, &dbi_error);
- msg_error("Error, no SQL connection after failed query attempt",
- evt_tag_str("type", self->type),
- evt_tag_str("host", self->host),
- evt_tag_str("port", self->port),
- evt_tag_str("username", self->user),
- evt_tag_str("database", self->database),
- evt_tag_str("error", dbi_error),
- NULL);
- return FALSE;
- }
+ /* we only ACK if each INSERT is a separate transaction */
+ if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
+ log_msg_ack(msg, &path_options);
+ log_msg_unref(msg);
+ step_sequence_number(&self->seq_num);
+ self->failed_message_counter = 0;
- self->failed_message_counter++;
- }
- else
- {
- msg_error("Multiple failures while inserting this record into the database, message dropped",
- evt_tag_int("attempts", self->num_retries),
- NULL);
- stats_counter_inc(self->dropped_messages);
- log_msg_drop(msg, &path_options);
- self->failed_message_counter = 0;
- success = TRUE;
- }
- }
- return success;
+ return TRUE;
}
-
/**
* afsql_dd_database_thread:
*
--
1.7.10
More information about the syslog-ng
mailing list