[syslog-ng] [PATCH (3.4) 2/3] afsql: Simplify insert_db()'s table validation

Gergely Nagy algernon at balabit.hu
Fri Apr 27 13:53:15 CEST 2012


To make things a tiny bit easier, move the final error handling out of
_insert_db() into a separate function: afsql_dd_insert_fail_handler().

Furthermore, change afsql_dd_validate_table() to allocate the GString
for the table name itself, and resolve the template too, if so need
be.

The above two allows us to simplify afsql_dd_insert_db() a little:
when validating the table, we can immediately bail out, without a
goto, since we only need to free up the bare minimum.

And in the end, a monstrous if (success) {} else {} branch is replaced
by something a lot shorter, and clearer.

All of these together make the flow of afsql_dd_insert_db() easier to
understand and follow.

Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
 modules/afsql/afsql.c |  158 ++++++++++++++++++++++++++-----------------------
 1 file changed, 84 insertions(+), 74 deletions(-)

diff --git a/modules/afsql/afsql.c b/modules/afsql/afsql.c
index 8b5cc2d..4f5ffec 100644
--- a/modules/afsql/afsql.c
+++ b/modules/afsql/afsql.c
@@ -1,6 +1,6 @@
 /*
- * Copyright (c) 2002-2010 BalaBit IT Ltd, Budapest, Hungary
- * Copyright (c) 1998-2010 Balázs Scheidler
+ * Copyright (c) 2002-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 1998-2012 Balázs Scheidler
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
@@ -458,24 +458,27 @@ afsql_dd_create_index(AFSqlDestDriver *self, gchar *table, gchar *column)
  *
  * NOTE: This function can only be called from the database thread.
  **/
-static gboolean
-afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
+static GString *
+afsql_dd_validate_table(AFSqlDestDriver *self, LogMessage *msg)
 {
-  GString *query_string;
+  GString *query_string, *table;
   dbi_result db_res;
   gboolean success = FALSE;
   gint i;
 
+  table = g_string_sized_new(32);
+  log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
+
   if (self->flags & AFSQL_DDF_DONT_CREATE_TABLES)
-    return TRUE;
+    return table;
 
-  afsql_dd_check_sql_identifier(table, TRUE);
+  afsql_dd_check_sql_identifier(table->str, TRUE);
 
-  if (g_hash_table_lookup(self->validated_tables, table))
-    return TRUE;
+  if (g_hash_table_lookup(self->validated_tables, table->str))
+    return table;
 
   query_string = g_string_sized_new(32);
-  g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table);
+  g_string_printf(query_string, "SELECT * FROM %s WHERE 0=1", table->str);
   if (afsql_dd_run_query(self, query_string->str, TRUE, &db_res))
     {
 
@@ -487,11 +490,11 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
             {
               GList *l;
               /* field does not exist, add this column */
-              g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table, self->fields[i].name, self->fields[i].type);
+              g_string_printf(query_string, "ALTER TABLE %s ADD %s %s", table->str, self->fields[i].name, self->fields[i].type);
               if (!afsql_dd_run_query(self, query_string->str, FALSE, NULL))
                 {
                   msg_error("Error adding missing column, giving up",
-                            evt_tag_str("table", table),
+                            evt_tag_str("table", table->str),
                             evt_tag_str("column", self->fields[i].name),
                             NULL);
                   success = FALSE;
@@ -502,7 +505,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
                   if (strcmp((gchar *) l->data, self->fields[i].name) == 0)
                     {
                       /* this is an indexed column, create index */
-                      afsql_dd_create_index(self, table, self->fields[i].name);
+                      afsql_dd_create_index(self, table->str, self->fields[i].name);
                     }
                 }
             }
@@ -513,7 +516,7 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
     {
       /* table does not exist, create it */
 
-      g_string_printf(query_string, "CREATE TABLE %s (", table);
+      g_string_printf(query_string, "CREATE TABLE %s (", table->str);
       for (i = 0; i < self->fields_len; i++)
         {
           g_string_append_printf(query_string, "%s %s", self->fields[i].name, self->fields[i].type);
@@ -528,23 +531,29 @@ afsql_dd_validate_table(AFSqlDestDriver *self, gchar *table)
           success = TRUE;
           for (l = self->indexes; l; l = l->next)
             {
-              afsql_dd_create_index(self, table, (gchar *) l->data);
+              afsql_dd_create_index(self, table->str, (gchar *) l->data);
             }
         }
       else
         {
           msg_error("Error creating table, giving up",
-                    evt_tag_str("table", table),
+                    evt_tag_str("table", table->str),
                     NULL);
         }
     }
   if (success)
     {
       /* we have successfully created/altered the destination table, record this information */
-      g_hash_table_insert(self->validated_tables, g_strdup(table), GUINT_TO_POINTER(TRUE));
+      g_hash_table_insert(self->validated_tables, g_strdup(table->str), GUINT_TO_POINTER(TRUE));
+    }
+  else
+    {
+      g_string_free(table, TRUE);
+      table = NULL;
     }
   g_string_free(query_string, TRUE);
-  return success;
+
+  return table;
 }
 
 /**
@@ -721,6 +730,45 @@ afsql_dd_connect(AFSqlDestDriver *self)
   return TRUE;
 }
 
+static gboolean
+afsql_dd_insert_fail_handler(AFSqlDestDriver *self, LogMessage *msg,
+                             LogPathOptions *path_options)
+{
+  if (self->failed_message_counter < self->num_retries - 1)
+    {
+      self->pending_msg = msg;
+      self->pending_msg_ack_needed = path_options->ack_needed;
+
+      /* database connection status sanity check after failed query */
+      if (dbi_conn_ping(self->dbi_ctx) != 1)
+        {
+          const gchar *dbi_error;
+
+          dbi_conn_error(self->dbi_ctx, &dbi_error);
+          msg_error("Error, no SQL connection after failed query attempt",
+                    evt_tag_str("type", self->type),
+                    evt_tag_str("host", self->host),
+                    evt_tag_str("port", self->port),
+                    evt_tag_str("username", self->user),
+                    evt_tag_str("database", self->database),
+                    evt_tag_str("error", dbi_error),
+                    NULL);
+          return FALSE;
+        }
+
+      self->failed_message_counter++;
+      return FALSE;
+    }
+
+  msg_error("Multiple failures while inserting this record into the database, message dropped",
+            evt_tag_int("attempts", self->num_retries),
+            NULL);
+  stats_counter_inc(self->dropped_messages);
+  log_msg_drop(msg, path_options);
+  self->failed_message_counter = 0;
+  return TRUE;
+}
+
 /**
  * afsql_dd_insert_db:
  *
@@ -767,22 +815,21 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
 
   msg_set_context(msg);
 
-  table = g_string_sized_new(32);
-  value = g_string_sized_new(256);
-  query_string = g_string_sized_new(512);
-
-  log_template_format(self->table, msg, &self->template_options, LTZ_LOCAL, 0, NULL, table);
-
-  if (!afsql_dd_validate_table(self, table->str))
+  table = afsql_dd_validate_table(self, msg);
+  if (!table)
     {
       /* If validate table is FALSE then close the connection and wait time_reopen time (next call) */
       msg_error("Error checking table, disconnecting from database, trying again shortly",
                 evt_tag_int("time_reopen", self->time_reopen),
                 NULL);
-      success = FALSE;
-      goto error;
+      msg_set_context(NULL);
+      g_string_free(table, TRUE);
+      return afsql_dd_insert_fail_handler(self, msg, &path_options);
     }
 
+  value = g_string_sized_new(256);
+  query_string = g_string_sized_new(512);
+
   g_string_printf(query_string, "INSERT INTO %s (", table->str);
   for (i = 0; i < self->fields_len; i++)
     {
@@ -848,63 +895,26 @@ afsql_dd_insert_db(AFSqlDestDriver *self)
       if (self->flush_lines && self->flush_lines_queued == self->flush_lines && !afsql_dd_commit_txn(self, TRUE))
         return FALSE;
     }
- error:
+
   g_string_free(table, TRUE);
   g_string_free(value, TRUE);
   g_string_free(query_string, TRUE);
 
   msg_set_context(NULL);
 
-  if (success)
-    {
-      /* we only ACK if each INSERT is a separate transaction */
-      if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
-        log_msg_ack(msg, &path_options);
-      log_msg_unref(msg);
-      step_sequence_number(&self->seq_num);
-      self->failed_message_counter = 0;
-    }
-  else
-    {
-      if (self->failed_message_counter < self->num_retries - 1)
-        {
-          self->pending_msg = msg;
-          self->pending_msg_ack_needed = path_options.ack_needed;
+  if (!success)
+    return afsql_dd_insert_fail_handler(self, msg, &path_options);
 
-          /* database connection status sanity check after failed query */
-          if (dbi_conn_ping(self->dbi_ctx) != 1)
-            {
-              const gchar *dbi_error;
-
-              dbi_conn_error(self->dbi_ctx, &dbi_error);
-              msg_error("Error, no SQL connection after failed query attempt",
-                        evt_tag_str("type", self->type),
-                        evt_tag_str("host", self->host),
-                        evt_tag_str("port", self->port),
-                        evt_tag_str("username", self->user),
-                        evt_tag_str("database", self->database),
-                        evt_tag_str("error", dbi_error),
-                        NULL);
-              return FALSE;
-            }
+  /* we only ACK if each INSERT is a separate transaction */
+  if ((self->flags & AFSQL_DDF_EXPLICIT_COMMITS) == 0)
+    log_msg_ack(msg, &path_options);
+  log_msg_unref(msg);
+  step_sequence_number(&self->seq_num);
+  self->failed_message_counter = 0;
 
-          self->failed_message_counter++;
-        }
-      else
-        {
-          msg_error("Multiple failures while inserting this record into the database, message dropped",
-                    evt_tag_int("attempts", self->num_retries),
-                    NULL);
-          stats_counter_inc(self->dropped_messages);
-          log_msg_drop(msg, &path_options);
-          self->failed_message_counter = 0;
-          success = TRUE;
-        }
-    }
-  return success;
+  return TRUE;
 }
 
-
 /**
  * afsql_dd_database_thread:
  *
-- 
1.7.10




More information about the syslog-ng mailing list