[syslog-ng] [PATCH 1/2] afamqp: New AMQP destination driver

Gergely Nagy algernon at balabit.hu
Tue Oct 9 18:13:05 CEST 2012


This driver implements an AMQP destination (based on the rabbitmq-c
library), supporting persistence, all the exchange types, and uses a
creative way to get messages across: all the name-value pairs
selected with the value-pairs() syntax will be sent as headers, while
the message payload can be set with the body() option (empty by
default).

Most settings have sensible defaults, except for a few, noted below:

@module afamqp
destination d_amqp {
    amqp(
        vhost("/")
        host("127.0.0.1")
        port(5672)
        username("guest") # mandatory, no default
        password("guest") # mandatory, no default
        exchange("syslog")
        exchange_type("fanout")
        #routing_key("")
        #body("")
        persistent(yes)
        value-pairs(
            scope("selected-macros" "nv-pairs" "sdata")
        )
    );
};

Publishing the name-value pairs as headers makes it possible to use a
headers exchange type and subscribe only to interesting log streams,
in a much more flexible way than using the routing_key() option.

The routing_key() and body() options can contain any template; they
will be expanded before publication.

Signed-off-by: Attila Nagy <bra at fsn.hu>
Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
 .gitmodules                      |    3 +
 autogen.sh                       |    2 +-
 configure.in                     |   52 +++
 lib/stats.c                      |    2 +
 lib/stats.h                      |    1 +
 modules/Makefile.am              |    2 +-
 modules/afamqp/Makefile.am       |   35 ++
 modules/afamqp/afamqp-grammar.ym |   92 +++++
 modules/afamqp/afamqp-parser.c   |   59 +++
 modules/afamqp/afamqp-parser.h   |   36 ++
 modules/afamqp/afamqp.c          |  738 ++++++++++++++++++++++++++++++++++++++
 modules/afamqp/afamqp.h          |   45 +++
 modules/afamqp/rabbitmq-c        |    1 +
 13 files changed, 1066 insertions(+), 2 deletions(-)
 create mode 100644 modules/afamqp/Makefile.am
 create mode 100644 modules/afamqp/afamqp-grammar.ym
 create mode 100644 modules/afamqp/afamqp-parser.c
 create mode 100644 modules/afamqp/afamqp-parser.h
 create mode 100644 modules/afamqp/afamqp.c
 create mode 100644 modules/afamqp/afamqp.h
 create mode 160000 modules/afamqp/rabbitmq-c

diff --git a/.gitmodules b/.gitmodules
index d000782..be86ee5 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,6 @@
 [submodule "lib/ivykis"]
 	path = lib/ivykis
         url = https://github.com/buytenh/ivykis.git
+[submodule "modules/afamqp/rabbitmq-c"]
+	path = modules/afamqp/rabbitmq-c
+	url = https://github.com/alanxz/rabbitmq-c.git
diff --git a/autogen.sh b/autogen.sh
index fb4151e..5027602 100755
--- a/autogen.sh
+++ b/autogen.sh
@@ -3,7 +3,7 @@
 # This script is needed to setup build environment from checked out
 # source tree. 
 #
-SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client"
+SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client modules/afamqp/rabbitmq-c"
 GIT=`which git`
 
 autogen_submodules()
diff --git a/configure.in b/configure.in
index a23c6e8..2edec69 100644
--- a/configure.in
+++ b/configure.in
@@ -28,6 +28,7 @@ IVYKIS_MIN_VERSION="0.30.1"
 JSON_C_MIN_VERSION="0.9"
 PCRE_MIN_VERSION="6.1"
 LMC_MIN_VERSION="0.1.6"
+LRMQ_MIN_VERSION="0.0.1"
 
 dnl ***************************************************************************
 dnl Initial setup
@@ -163,6 +164,15 @@ AC_ARG_WITH(libmongo-client,
                                          Link against the system supplied or the built-in libmongo-client library.]
               ,,with_libmongo_client="internal")
 
+AC_ARG_ENABLE(amqp,
+          [  --enable-amqp        Enable amqp destination (default: auto)]
+              ,,enable_amqp="auto")
+
+dnl The option name must match the with_librabbitmq_client variable that the
+dnl rest of this script tests, otherwise the "internal" default below would
+dnl overwrite a value given as --with-librabbitmq-client=...
+AC_ARG_WITH(librabbitmq-client,
+              [  --with-librabbitmq-client=[system/internal]
+                                         Link against the system supplied or the built-in librabbitmq library.]
+              ,,with_librabbitmq_client="internal")
+
 AC_ARG_WITH(ivykis,
               [  --with-ivykis=[system/internal]
                                          Link against the system supplied or the built-in ivykis library.]
@@ -827,6 +837,31 @@ if test "x$enable_smtp" != "xno" && test "x$with_libesmtp" != "no"; then
 fi
 
 dnl ***************************************************************************
+dnl rabbitmq-c headers/libraries
+dnl ***************************************************************************
+
+if test "x$with_librabbitmq_client" = "xinternal"; then
+    if test -f "$srcdir/modules/afamqp/rabbitmq-c/librabbitmq/amqp.h"; then
+        AC_CONFIG_SUBDIRS([modules/afamqp/rabbitmq-c])
+        # these can only be used in modules/afamqp, as they assume that
+        # rabbitmq-c is a direct subdirectory of the current directory
+
+        LIBRABBITMQ_LIBS="-L\$(builddir)/rabbitmq-c/librabbitmq -lrabbitmq"
+        LIBRABBITMQ_CFLAGS="-I\$(srcdir)/rabbitmq-c/librabbitmq -I\$(builddir)/rabbitmq-c/librabbitmq"
+        LIBRABBITMQ_SUBDIRS="rabbitmq-c"
+    else
+        AC_MSG_WARN([Internal librabbitmq-client sources not found in modules/afamqp/rabbitmq-c])
+        with_librabbitmq_client="no"
+    fi
+elif test "x$with_librabbitmq_client" = "xsystem"; then
+    PKG_CHECK_MODULES(LIBRABBITMQ, librabbitmq >= $LRMQ_MIN_VERSION,with_librabbitmq_client="yes",with_librabbitmq_client="no")
+fi
+
+if test "x$with_librabbitmq_client" = "xno"; then
+    enable_amqp="no"
+fi
+
+dnl ***************************************************************************
 dnl misc features to be enabled
 dnl ***************************************************************************
 
@@ -889,6 +924,16 @@ if test "x$enable_mongodb" = "xauto"; then
 	AC_MSG_RESULT([$enable_mongodb])
 fi
 
+# Resolve enable_amqp="auto": enable the destination unless librabbitmq
+# was found to be unavailable.
+if test "x$enable_amqp" = "xauto"; then
+    AC_MSG_CHECKING(whether to enable amqp destination support)
+    # the left-hand side carries an "x" prefix, so compare against "xno"
+    if test "x$with_librabbitmq_client" != "xno"; then
+        enable_amqp="yes"
+    else
+        enable_amqp="no"
+    fi
+    AC_MSG_RESULT([$enable_amqp])
+fi
+
 if test "x$enable_json" != "xno"; then
         JSON_LIBS=$JSON_C_LIBS
         JSON_CFLAGS=$JSON_C_CFLAGS
@@ -1068,6 +1113,7 @@ AM_CONDITIONAL(ENABLE_SUN_STREAMS, [test "$enable_sun_streams" = "yes"])
 AM_CONDITIONAL(ENABLE_PACCT, [test "$enable_pacct" = "yes"])
 AM_CONDITIONAL(ENABLE_MONGODB, [test "$enable_mongodb" = "yes"])
 AM_CONDITIONAL(ENABLE_SMTP, [test "$enable_smtp" = "yes"])
+AM_CONDITIONAL(ENABLE_AMQP, [test "$enable_amqp" = "yes"])
 AM_CONDITIONAL(ENABLE_JSON, [test "$enable_json" = "yes"])
 AM_CONDITIONAL(WITH_LIBSYSTEMD, [test "$with_libsystemd" = "yes"])
 
@@ -1102,6 +1148,9 @@ AC_SUBST(LIBMONGO_CFLAGS)
 AC_SUBST(LIBMONGO_SUBDIRS)
 AC_SUBST(LIBESMTP_CFLAGS)
 AC_SUBST(LIBESMTP_LIBS)
+AC_SUBST(LIBRABBITMQ_LIBS)
+AC_SUBST(LIBRABBITMQ_CFLAGS)
+AC_SUBST(LIBRABBITMQ_SUBDIRS)
 AC_SUBST(JSON_LIBS)
 AC_SUBST(JSON_CFLAGS)
 AC_SUBST(IVYKIS_SUBDIRS)
@@ -1132,6 +1181,7 @@ AC_OUTPUT(dist.conf
           modules/afuser/Makefile
 	  modules/afmongodb/Makefile
           modules/afsmtp/Makefile
+	  modules/afamqp/Makefile
           modules/dbparser/Makefile
           modules/dbparser/tests/Makefile
           modules/csvparser/Makefile
@@ -1171,6 +1221,7 @@ echo "  __thread keyword            : ${ac_cv_have_tls:=no}"
 echo " Submodules:"
 echo "  ivykis                      : $with_ivykis"
 echo "  libmongo-client             : $with_libmongo_client"
+echo "  librabbitmq                 : $with_librabbitmq_client"
 echo " Features:"
 echo "  Debug symbols               : ${enable_debug:=no}"
 echo "  GCC profiling               : ${enable_gprof:=no}"
@@ -1191,4 +1242,5 @@ echo "  PACCT module (EXPERIMENTAL) : ${enable_pacct:=no}"
 echo "  MongoDB destination (module): ${enable_mongodb:=no}"
 echo "  JSON support (module)       : ${enable_json:=no}"
 echo "  SMTP support (module)       : ${enable_smtp:=no}"
+echo "  AMQP destination (module)   : ${enable_amqp:=no}"
 
diff --git a/lib/stats.c b/lib/stats.c
index 0ccc283..f757597 100644
--- a/lib/stats.c
+++ b/lib/stats.c
@@ -383,6 +383,8 @@ const gchar *source_names[SCS_MAX] =
   "severity",
   "facility",
   "sender",
+  "smtp",
+  "amqp",
 };
 
 
diff --git a/lib/stats.h b/lib/stats.h
index 9174408..2550724 100644
--- a/lib/stats.h
+++ b/lib/stats.h
@@ -71,6 +71,7 @@ enum
   SCS_FACILITY       = 25,
   SCS_SENDER         = 26,
   SCS_SMTP           = 27,
+  SCS_AMQP           = 28,
   SCS_MAX,
   SCS_SOURCE_MASK    = 0xff
 };
diff --git a/modules/Makefile.am b/modules/Makefile.am
index 8f9d63b..1fce2d2 100644
--- a/modules/Makefile.am
+++ b/modules/Makefile.am
@@ -1 +1 @@
-SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
+SUBDIRS = afsocket afsql afstreams affile afprog afuser afamqp afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
diff --git a/modules/afamqp/Makefile.am b/modules/afamqp/Makefile.am
new file mode 100644
index 0000000..ccf20df
--- /dev/null
+++ b/modules/afamqp/Makefile.am
@@ -0,0 +1,35 @@
+
+SUBDIRS = @LIBRABBITMQ_SUBDIRS@
+DIST_SUBDIRS = rabbitmq-c
+
+moduledir = @moduledir@
+AM_CPPFLAGS = -I$(top_srcdir)/lib -I../../lib
+
+export top_srcdir
+
+if ENABLE_AMQP
+
+# declare the module only when the AMQP destination is enabled, so that
+# nothing is built or installed for it otherwise
+module_LTLIBRARIES = libafamqp.la
+libafamqp_la_CFLAGS = $(LIBRABBITMQ_CFLAGS)
+libafamqp_la_SOURCES = afamqp-grammar.y afamqp.c afamqp.h afamqp-parser.c afamqp-parser.h
+libafamqp_la_LIBADD = $(MODULE_DEPS_LIBS) $(LIBRABBITMQ_LIBS)
+libafamqp_la_LDFLAGS = $(MODULE_LDFLAGS)
+
+endif
+
+BUILT_SOURCES = afamqp-grammar.y afamqp-grammar.c afamqp-grammar.h
+EXTRA_DIST = $(BUILT_SOURCES) afamqp-grammar.ym
+
+include $(top_srcdir)/build/lex-rules.am
+
+# divert install/uninstall targets to avoid recursing into $(SUBDIRS)
+
+install:
+	$(MAKE) $(AM_MAKEFLAGS) all
+	$(MAKE) $(AM_MAKEFLAGS) install-am
+
+uninstall:
+	$(MAKE) $(AM_MAKEFLAGS) uninstall-am
+
+check:
+	echo "Make check disabled, since it requires a newer glib"
diff --git a/modules/afamqp/afamqp-grammar.ym b/modules/afamqp/afamqp-grammar.ym
new file mode 100644
index 0000000..b325b3d
--- /dev/null
+++ b/modules/afamqp/afamqp-grammar.ym
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+%code requires {
+
+#include "afamqp-parser.h"
+
+}
+
+%code {
+
+#include "cfg-parser.h"
+#include "afamqp-grammar.h"
+#include "plugin.h"
+#include "vptransform.h"
+
+extern LogDriver *last_driver;
+extern ValuePairs *last_value_pairs;
+extern ValuePairsTransformSet *last_vp_transset;
+}
+
+%name-prefix "afamqp_"
+%lex-param {CfgLexer *lexer}
+%parse-param {CfgLexer *lexer}
+%parse-param {LogDriver **instance}
+%parse-param {gpointer arg}
+
+
+/* INCLUDE_DECLS */
+
+%token KW_AMQP
+%token KW_EXCHANGE
+%token KW_EXCHANGE_TYPE
+%token KW_PERSISTENT
+%token KW_VHOST
+%token KW_ROUTING_KEY
+%token KW_BODY
+
+%%
+
+/*
+ * Entry point: on `amqp` in a destination context, create the driver with
+ * afamqp_dd_new() (stored in both *instance and last_driver), then parse the
+ * parenthesized option list and accept.
+ */
+start
+        : LL_CONTEXT_DESTINATION KW_AMQP
+          {
+            last_driver = *instance = afamqp_dd_new();
+          }
+          '(' afamqp_options ')'		{ YYACCEPT; }
+	;
+
+/* Zero or more options: a right-recursive list ended by the empty alternative. */
+afamqp_options
+        : afamqp_option afamqp_options
+	|
+	;
+
+/*
+ * A single option of the amqp() driver.  Each action forwards the parsed
+ * value to the matching afamqp_dd_set_*() function on last_driver; string
+ * arguments are released with free() right after the setter returns.
+ */
+afamqp_option
+        : KW_HOST '(' string ')'		{ afamqp_dd_set_host(last_driver, $3); free($3); }
+        | KW_PORT '(' LL_NUMBER ')'		{ afamqp_dd_set_port(last_driver, $3); }
+        | KW_VHOST '(' string ')'		{ afamqp_dd_set_vhost(last_driver, $3); free($3); }
+	| KW_EXCHANGE '(' string ')'		{ afamqp_dd_set_exchange(last_driver, $3); free($3); }
+	| KW_EXCHANGE_TYPE '(' string ')'	{ afamqp_dd_set_exchange_type(last_driver, $3); free($3); }
+	| KW_ROUTING_KEY '(' string ')'		{ afamqp_dd_set_routing_key(last_driver, $3); free($3); }
+        | KW_BODY '(' string ')'		{ afamqp_dd_set_body(last_driver, $3); free($3); }
+	| KW_PERSISTENT '(' yesno ')'		{ afamqp_dd_set_persistent(last_driver, $3); }
+	| KW_USERNAME '(' string ')'		{ afamqp_dd_set_user(last_driver, $3); free($3); }
+	| KW_PASSWORD '(' string ')'		{ afamqp_dd_set_password(last_driver, $3); free($3); }
+	| value_pair_option			{ afamqp_dd_set_value_pairs(last_driver, $1); }
+	| dest_driver_option
+        ;
+
+/* INCLUDE_RULES */
+
+%%
diff --git a/modules/afamqp/afamqp-parser.c b/modules/afamqp/afamqp-parser.c
new file mode 100644
index 0000000..f082f32
--- /dev/null
+++ b/modules/afamqp/afamqp-parser.c
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#include "afamqp.h"
+#include "cfg-parser.h"
+#include "afamqp-grammar.h"
+
+extern int afamqp_debug;
+int afamqp_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg);
+
+/*
+ * Keyword table of the afamqp parser: maps configuration keywords to the
+ * grammar's KW_* tokens.  The list is terminated by a NULL entry.
+ */
+static CfgLexerKeyword afamqp_keywords[] = {
+  { "amqp",			KW_AMQP },
+  { "vhost",			KW_VHOST },
+  { "host",			KW_HOST },
+  { "port",			KW_PORT },
+  { "exchange",			KW_EXCHANGE },
+  { "exchange_type",		KW_EXCHANGE_TYPE },
+  { "routing_key",		KW_ROUTING_KEY },
+  { "persistent",		KW_PERSISTENT },
+  { "username",			KW_USERNAME },
+  { "password",			KW_PASSWORD },
+  { "log_fifo_size",		KW_LOG_FIFO_SIZE  },
+  { "body",			KW_BODY },
+  { NULL }
+};
+
+/*
+ * Parser descriptor for the afamqp module: ties the keyword table to the
+ * generated afamqp_parse() function.  The cleanup hook releases a parsed
+ * instance with log_pipe_unref().
+ */
+CfgParser afamqp_parser =
+{
+#if ENABLE_DEBUG
+  .debug_flag = &afamqp_debug,
+#endif
+  .name = "afamqp",
+  .keywords = afamqp_keywords,
+  /* cast needed: afamqp_parse() takes LogDriver ** rather than gpointer * */
+  .parse = (int (*)(CfgLexer *lexer, gpointer *instance, gpointer)) afamqp_parse,
+  .cleanup = (void (*)(gpointer)) log_pipe_unref,
+};
+
+CFG_PARSER_IMPLEMENT_LEXER_BINDING(afamqp_, LogDriver **)
diff --git a/modules/afamqp/afamqp-parser.h b/modules/afamqp/afamqp-parser.h
new file mode 100644
index 0000000..d75de63
--- /dev/null
+++ b/modules/afamqp/afamqp-parser.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFAMQP_PARSER_H_INCLUDED
+#define AFAMQP_PARSER_H_INCLUDED
+
+#include "cfg-parser.h"
+#include "cfg-lexer.h"
+#include "afamqp.h"
+
+extern CfgParser afamqp_parser;
+
+CFG_PARSER_DECLARE_LEXER_BINDING(afamqp_, LogDriver **)
+
+#endif
diff --git a/modules/afamqp/afamqp.c b/modules/afamqp/afamqp.c
new file mode 100644
index 0000000..a9d65d4
--- /dev/null
+++ b/modules/afamqp/afamqp.c
@@ -0,0 +1,738 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#include "afamqp.h"
+#include "afamqp-parser.h"
+#include "plugin.h"
+#include "messages.h"
+#include "misc.h"
+#include "stats.h"
+#include "nvtable.h"
+#include "logqueue.h"
+#include "scratch-buffers.h"
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+/*
+ * State of one AMQP destination: the settings stored by the
+ * afamqp_dd_set_*() functions, statistics counters, the message queue and
+ * the bookkeeping of the writer thread.
+ */
+typedef struct
+{
+  LogDestDriver super;
+
+  /* Shared between main/writer; only read by the writer, never written */
+  gchar *exchange;
+  gchar *exchange_type;
+  LogTemplate *routing_key_template;
+  /* NULL until afamqp_dd_set_body() is called; a NULL template means an empty body */
+  LogTemplate *body_template;
+
+  /* copied into the delivery_mode message property: 2 if persistent, 1 otherwise */
+  gint persistent;
+
+  gchar *vhost;
+  gchar *host;
+  gint port;
+
+  gchar *user;
+  gchar *password;
+
+  /* seconds the writer stays suspended after an error */
+  time_t time_reopen;
+
+  StatsCounterItem *dropped_messages;
+  StatsCounterItem *stored_messages;
+
+  /* name-value pairs that are published as message headers */
+  ValuePairs *vp;
+
+  /* Thread related stuff; shared */
+  GThread *writer_thread;
+  GMutex *queue_mutex;
+  GMutex *suspend_mutex;
+  GCond *writer_thread_wakeup_cond;
+
+  gboolean writer_thread_terminate;
+  gboolean writer_thread_suspended;
+  /* absolute time at which a suspended writer resumes */
+  GTimeVal writer_thread_suspend_target;
+
+  LogQueue *queue;
+
+  /* Writer-only stuff */
+  amqp_connection_state_t conn;
+  /* header table entries reused across messages, and the array's capacity */
+  amqp_table_entry_t *entries;
+  gint32 max_entries;
+  gint32 seq_num;
+} AMQPDestDriver;
+
+/*
+ * Configuration
+ */
+
+/*
+ * Store a copy of the user name passed to amqp_login(); a previously
+ * stored name is released first.
+ */
+void
+afamqp_dd_set_user(LogDriver *d, const gchar *user)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  g_free(driver->user);
+  driver->user = g_strdup(user);
+}
+
+/*
+ * Store a copy of the password passed to amqp_login(); a previously
+ * stored password is released first.
+ */
+void
+afamqp_dd_set_password(LogDriver *d, const gchar *password)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  g_free(driver->password);
+  driver->password = g_strdup(password);
+}
+
+/*
+ * Store a copy of the virtual host passed to amqp_login(); a previously
+ * stored value is released first.
+ */
+void
+afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  g_free(driver->vhost);
+  driver->vhost = g_strdup(vhost);
+}
+
+/*
+ * Store a copy of the server host name used when opening the socket; a
+ * previously stored value is released first.
+ */
+void
+afamqp_dd_set_host(LogDriver *d, const gchar *host)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  g_free(driver->host);
+  driver->host = g_strdup(host);
+}
+
+/* Store the server port used when opening the socket. */
+void
+afamqp_dd_set_port(LogDriver *d, gint port)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  driver->port = port;
+}
+
+/*
+ * Store a copy of the name of the exchange that is declared on connect and
+ * published to; a previously stored name is released first.
+ */
+void
+afamqp_dd_set_exchange(LogDriver *d, const gchar *exchange)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  g_free(driver->exchange);
+  driver->exchange = g_strdup(exchange);
+}
+
+/*
+ * Store a copy of the exchange type used when declaring the exchange; a
+ * previously stored value is released first.
+ */
+void
+afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  g_free(driver->exchange_type);
+  driver->exchange_type = g_strdup(exchange_type);
+}
+
+/*
+ * Compile `routing_key` into the routing key template.
+ *
+ * Unlike afamqp_dd_set_body(), this does not create the template object;
+ * presumably the constructor allocates it -- TODO confirm.
+ */
+void
+afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+  LogTemplate *key_template = driver->routing_key_template;
+
+  log_template_compile(key_template, routing_key, NULL);
+}
+
+/*
+ * Compile `body` into the message body template, creating the template
+ * object on first use.
+ */
+void
+afamqp_dd_set_body(LogDriver *d, const gchar *body)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  if (driver->body_template == NULL)
+    {
+      driver->body_template = log_template_new(configuration, NULL);
+    }
+
+  log_template_compile(driver->body_template, body, NULL);
+}
+
+/*
+ * Select the delivery mode of published messages.  The stored value is
+ * later copied verbatim into the delivery_mode message property: 2 when
+ * `persistent` is true, 1 otherwise.
+ */
+void
+afamqp_dd_set_persistent(LogDriver *s, gboolean persistent)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) s;
+
+  driver->persistent = persistent ? 2 : 1;
+}
+
+/*
+ * Replace the set of value pairs published as headers.  The driver keeps
+ * the `vp` pointer itself (no copy is made) and frees the set it held
+ * before, if any.
+ */
+void
+afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp)
+{
+  AMQPDestDriver *driver = (AMQPDestDriver *) d;
+
+  if (driver->vp != NULL)
+    {
+      value_pairs_free(driver->vp);
+    }
+
+  driver->vp = vp;
+}
+
+/*
+ * Utilities
+ */
+
+/*
+ * Build the statistics instance name of this destination from its vhost,
+ * host, port, exchange and exchange type.
+ *
+ * The result lives in a function-local static buffer: it is overwritten by
+ * the next call and must not be freed by the caller.
+ */
+static gchar *
+afamqp_dd_format_stats_instance(AMQPDestDriver *self)
+{
+  static gchar stats_instance[1024];
+
+  g_snprintf(stats_instance, sizeof(stats_instance),
+             "amqp,%s,%s,%u,%s,%s",
+             self->vhost, self->host, self->port,
+             self->exchange, self->exchange_type);
+
+  return stats_instance;
+}
+
+/*
+ * Build the name under which the queue of this destination is acquired,
+ * from its vhost, host, port, exchange and exchange type.
+ *
+ * The result lives in a function-local static buffer: it is overwritten by
+ * the next call and must not be freed by the caller.
+ */
+static gchar *
+afamqp_dd_format_persist_name(AMQPDestDriver *self)
+{
+  static gchar name_buf[1024];
+
+  g_snprintf(name_buf, sizeof(name_buf),
+             "afamqp(%s,%s,%u,%s,%s)",
+             self->vhost, self->host, self->port,
+             self->exchange, self->exchange_type);
+
+  return name_buf;
+}
+
+/*
+ * Mark the writer thread as suspended and set the wake-up target to
+ * time_reopen seconds from now.
+ */
+static void
+afamqp_dd_suspend(AMQPDestDriver *self)
+{
+  self->writer_thread_suspended = TRUE;
+  g_get_current_time(&self->writer_thread_suspend_target);
+  /* add whole seconds directly: converting time_reopen to microseconds
+   * first could overflow where glong is 32 bits wide */
+  self->writer_thread_suspend_target.tv_sec += self->time_reopen;
+}
+
+/*
+ * Close channel 1 and the connection, then destroy the connection object
+ * and reset self->conn to NULL.  Does nothing when there is no connection,
+ * so it is safe to call more than once.
+ */
+static void
+afamqp_dd_disconnect(AMQPDestDriver *self)
+{
+  if (!self->conn)
+    return;
+
+  amqp_channel_close(self->conn, 1, AMQP_REPLY_SUCCESS);
+  amqp_connection_close(self->conn, AMQP_REPLY_SUCCESS);
+  amqp_destroy_connection(self->conn);
+  self->conn = NULL;
+}
+
+/*
+ * Check the result of an AMQP RPC call.
+ *
+ * Returns TRUE when the reply type is AMQP_RESPONSE_NORMAL; reply types not
+ * listed in the switch are passed through as TRUE as well.  For every other
+ * reply an error is logged with `context` as the message, the driver is
+ * suspended with afamqp_dd_suspend(), and FALSE is returned.
+ */
+static gboolean
+afamqp_is_ok(AMQPDestDriver *self, gchar *context, amqp_rpc_reply_t ret)
+{
+  switch (ret.reply_type)
+    {
+    case AMQP_RESPONSE_NORMAL:
+      break;
+
+    case AMQP_RESPONSE_NONE:
+      msg_error(context,
+                evt_tag_str("driver", self->super.super.id),
+                evt_tag_str("error", "missing RPC reply type"),
+                evt_tag_int("time_reopen", self->time_reopen),
+                NULL);
+      afamqp_dd_suspend(self);
+      return FALSE;
+
+    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+      {
+        gchar *errstr = amqp_error_string(ret.library_error);
+        msg_error(context,
+                  evt_tag_str("driver", self->super.super.id),
+                  evt_tag_str("error", errstr),
+                  evt_tag_int("time_reopen", self->time_reopen),
+                  NULL);
+        g_free (errstr);
+        afamqp_dd_suspend(self);
+        return FALSE;
+      }
+
+    case AMQP_RESPONSE_SERVER_EXCEPTION:
+      switch (ret.reply.id)
+        {
+        case AMQP_CONNECTION_CLOSE_METHOD:
+          {
+            amqp_connection_close_t *m =
+              (amqp_connection_close_t *) ret.reply.decoded;
+            /* reply_text is a length-delimited byte string without a
+             * terminating NUL, so its output is bounded by its length */
+            msg_error(context,
+                      evt_tag_str("driver", self->super.super.id),
+                      evt_tag_str("error", "server connection error"),
+                      evt_tag_int("code", m->reply_code),
+                      evt_tag_printf("text", "%.*s",
+                                     (int) m->reply_text.len,
+                                     (const gchar *) m->reply_text.bytes),
+                      evt_tag_int("time_reopen", self->time_reopen),
+                      NULL);
+            afamqp_dd_suspend(self);
+            return FALSE;
+          }
+        case AMQP_CHANNEL_CLOSE_METHOD:
+          {
+            amqp_channel_close_t *m =
+              (amqp_channel_close_t *) ret.reply.decoded;
+            /* same as above: reply_text is not NUL-terminated */
+            msg_error(context,
+                      evt_tag_str("driver", self->super.super.id),
+                      evt_tag_str("error", "server channel error"),
+                      evt_tag_int("code", m->reply_code),
+                      evt_tag_printf("text", "%.*s",
+                                     (int) m->reply_text.len,
+                                     (const gchar *) m->reply_text.bytes),
+                      evt_tag_int("time_reopen", self->time_reopen),
+                      NULL);
+            afamqp_dd_suspend(self);
+            return FALSE;
+          }
+        default:
+          msg_error(context,
+                    evt_tag_str("driver", self->super.super.id),
+                    evt_tag_str("error", "unknown server error"),
+                    evt_tag_printf("method id", "0x%08X", ret.reply.id),
+                    evt_tag_int("time_reopen", self->time_reopen),
+                    NULL);
+          afamqp_dd_suspend(self);
+          return FALSE;
+        }
+      return FALSE;
+    }
+  return TRUE;
+}
+
+/*
+ * Establish the AMQP connection: open the socket, log in, open channel 1
+ * and declare the configured exchange.
+ *
+ * When `reconnect` is TRUE and a connection exists whose most recent RPC
+ * reply was normal, that connection is kept and TRUE is returned at once.
+ * Otherwise any existing connection object is destroyed and a new one is
+ * set up.
+ *
+ * Returns TRUE on success and FALSE if any step fails.  After a failure
+ * self->conn still refers to the new, unusable connection object.
+ */
+static gboolean
+afamqp_dd_connect(AMQPDestDriver *self, gboolean reconnect)
+{
+  int sockfd;
+  amqp_rpc_reply_t ret;
+
+  if (reconnect && self->conn)
+    {
+      ret = amqp_get_rpc_reply(self->conn);
+      if (ret.reply_type == AMQP_RESPONSE_NORMAL)
+        return TRUE;
+    }
+
+  if (self->conn)
+    {
+      /* release the stale connection instead of leaking it when the
+       * pointer is overwritten below */
+      amqp_destroy_connection(self->conn);
+      self->conn = NULL;
+    }
+
+  self->conn = amqp_new_connection();
+  sockfd = amqp_open_socket(self->host, self->port);
+  if (sockfd < 0)
+    {
+      /* a negative return value is the negated library error code */
+      gchar *errstr = amqp_error_string(-sockfd);
+      msg_error("Error connecting to AMQP server",
+                evt_tag_str("driver", self->super.super.id),
+                evt_tag_str("error", errstr),
+                evt_tag_int("time_reopen", self->time_reopen),
+                NULL);
+      g_free(errstr);
+      return FALSE;
+    }
+  amqp_set_sockfd(self->conn, sockfd);
+
+  ret = amqp_login(self->conn, self->vhost, 0, 131072, 0,
+                   AMQP_SASL_METHOD_PLAIN, self->user, self->password);
+  if (!afamqp_is_ok(self, "Error during AMQP login", ret))
+    return FALSE;
+
+  amqp_channel_open(self->conn, 1);
+  ret = amqp_get_rpc_reply(self->conn);
+  if (!afamqp_is_ok(self, "Error during AMQP channel open", ret))
+    return FALSE;
+
+  amqp_exchange_declare(self->conn, 1, amqp_cstring_bytes(self->exchange),
+                        amqp_cstring_bytes(self->exchange_type), 0, 0, amqp_empty_table);
+  ret = amqp_get_rpc_reply(self->conn);
+  if (!afamqp_is_ok(self, "Error during AMQP exchange declaration", ret))
+    return FALSE;
+
+  msg_debug ("Connecting to AMQP succeeded",
+             evt_tag_str("driver", self->super.super.id),
+             NULL);
+
+  return TRUE;
+}
+
+/*
+ * Worker thread
+ */
+
+/*
+ * value_pairs_foreach() callback: append one name-value pair to the AMQP
+ * header table as a UTF-8 string field.
+ *
+ * `user_data` is an array of three pointers: the address of the entries
+ * array, the address of the fill position and the address of the array's
+ * capacity.  The array is grown when it is full.  Both `name` and `value`
+ * are duplicated with strdup(); the copies are released by the caller
+ * with amqp_bytes_free() once the message has been published.
+ *
+ * Always returns FALSE.
+ */
+static gboolean
+afamqp_vp_foreach(const gchar *name, const gchar *value,
+                  gpointer user_data)
+{
+  amqp_table_entry_t **entries = (amqp_table_entry_t **) ((gpointer *)user_data)[0];
+  gint *pos = (gint *) ((gpointer *)user_data)[1];
+  gint32 *max_size = (gint32 *) ((gpointer *)user_data)[2];
+
+  if (*pos >= *max_size)
+    {
+      /* doubling a capacity of zero would never make room, so start at 1 */
+      *max_size = (*max_size > 0) ? *max_size * 2 : 1;
+      *entries = g_renew(amqp_table_entry_t, *entries, *max_size);
+    }
+
+  (*entries)[*pos].key = amqp_cstring_bytes(strdup(name));
+  (*entries)[*pos].value.kind = AMQP_FIELD_KIND_UTF8;
+  (*entries)[*pos].value.value.bytes = amqp_cstring_bytes(strdup(value));
+
+  (*pos)++;
+
+  return FALSE;
+}
+
+/*
+ * Publish a single message on channel 1 to the configured exchange.
+ *
+ * The value pairs selected for `msg` are sent as message headers, the
+ * routing key is the expanded routing key template, and the body is the
+ * expanded body template (empty when no body template is set).
+ *
+ * Returns TRUE unless amqp_basic_publish() reports an error (negative
+ * return value), in which case an error is logged and FALSE is returned.
+ */
+static gboolean
+afamqp_worker_publish(AMQPDestDriver *self, LogMessage *msg)
+{
+  gint pos = 0, ret;
+  amqp_table_t table;
+  amqp_basic_properties_t props;
+  gboolean success = TRUE;
+  ScratchBuffer *routing_key = scratch_buffer_acquire();
+  ScratchBuffer *body = scratch_buffer_acquire();
+  amqp_bytes_t body_bytes = amqp_cstring_bytes("");
+
+  /* layout expected by afamqp_vp_foreach(): entries, fill position, capacity */
+  gpointer user_data[] = { &self->entries, &pos, &self->max_entries };
+
+  /* fills self->entries; afterwards pos is the number of entries written */
+  value_pairs_foreach(self->vp, afamqp_vp_foreach, msg, self->seq_num,
+                      user_data);
+
+  table.num_entries = pos;
+  table.entries = self->entries;
+
+  /* only the property fields flagged here are assigned below */
+  props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+    | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG;
+  props.content_type = amqp_cstring_bytes("text/plain");
+  props.delivery_mode = self->persistent;
+  props.headers = table;
+
+  log_template_format(self->routing_key_template, msg, NULL, LTZ_LOCAL,
+                      self->seq_num, NULL, sb_string(routing_key));
+
+  if (self->body_template)
+    {
+      log_template_format(self->body_template, msg, NULL, LTZ_LOCAL,
+                          self->seq_num, NULL, sb_string(body));
+      body_bytes = amqp_cstring_bytes(sb_string(body)->str);
+    }
+
+  ret = amqp_basic_publish(self->conn, 1, amqp_cstring_bytes(self->exchange),
+                           amqp_cstring_bytes(sb_string(routing_key)->str),
+                           0, 0, &props, body_bytes);
+
+  /* body_bytes and the routing key point into these buffers, so they are
+   * released only after the publish call */
+  scratch_buffer_release(routing_key);
+  scratch_buffer_release(body);
+
+  if (ret < 0)
+    {
+      msg_error("Network error while inserting into AMQP server",
+                evt_tag_int("time_reopen", self->time_reopen), NULL);
+      success = FALSE;
+    }
+
+  /* free the key/value copies made by afamqp_vp_foreach() */
+  while (--pos >= 0)
+    {
+      amqp_bytes_free(self->entries[pos].key);
+      amqp_bytes_free(self->entries[pos].value.value.bytes);
+    }
+
+  return success;
+}
+
+/*
+ * Publish the message at the head of the queue, (re)connecting first.
+ *
+ * Returns TRUE when a message was published or the queue was empty.
+ * Returns FALSE when the connection could not be established or when
+ * publishing failed; a message that failed to publish is pushed back to
+ * the head of the queue.
+ */
+static gboolean
+afamqp_worker_insert(AMQPDestDriver *self)
+{
+  gboolean success;
+  LogMessage *msg;
+  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
+
+  /* without a usable connection leave the queue untouched: publishing on
+   * a connection whose login or setup failed could lose the message */
+  if (!afamqp_dd_connect(self, TRUE))
+    return FALSE;
+
+  g_mutex_lock(self->queue_mutex);
+  log_queue_reset_parallel_push(self->queue);
+  success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, FALSE);
+  g_mutex_unlock(self->queue_mutex);
+  if (!success)
+    return TRUE;
+
+  msg_set_context(msg);
+  success = afamqp_worker_publish (self, msg);
+  msg_set_context(NULL);
+
+  if (success)
+    {
+      stats_counter_inc(self->stored_messages);
+      step_sequence_number(&self->seq_num);
+      log_msg_ack(msg, &path_options);
+      log_msg_unref(msg);
+    }
+  else
+    {
+      /* keep the message for the next attempt */
+      g_mutex_lock(self->queue_mutex);
+      log_queue_push_head(self->queue, msg, &path_options);
+      g_mutex_unlock(self->queue_mutex);
+    }
+
+  return success;
+}
+
+/*
+ * Main function of the writer thread; `arg` is the AMQPDestDriver.
+ *
+ * Until writer_thread_terminate is set, each iteration either sits out the
+ * suspension period (when the driver is suspended) or waits for the queue
+ * to become non-empty, and then tries to publish one message.  A failed
+ * attempt drops the connection and suspends the driver.
+ *
+ * Always returns NULL.
+ */
+static gpointer
+afamqp_worker_thread(gpointer arg)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) arg;
+
+  msg_debug("Worker thread started",
+            evt_tag_str("driver", self->super.super.id), NULL);
+
+  /* the result is deliberately not checked here: afamqp_worker_insert()
+   * connects again before every publish attempt */
+  afamqp_dd_connect(self, FALSE);
+
+  while (!self->writer_thread_terminate)
+    {
+      g_mutex_lock(self->suspend_mutex);
+      if (self->writer_thread_suspended)
+        {
+          /* wait until the suspend target is reached or we are woken up */
+          g_cond_timed_wait(self->writer_thread_wakeup_cond,
+                            self->suspend_mutex, &self->writer_thread_suspend_target);
+          self->writer_thread_suspended = FALSE;
+          g_mutex_unlock(self->suspend_mutex);
+        }
+      else
+        {
+          g_mutex_unlock(self->suspend_mutex);
+
+          g_mutex_lock(self->queue_mutex);
+          if (log_queue_get_length(self->queue) == 0)
+            {
+              g_cond_wait(self->writer_thread_wakeup_cond, self->queue_mutex);
+            }
+          g_mutex_unlock(self->queue_mutex);
+        }
+
+      if (self->writer_thread_terminate)
+        break;
+
+      if (!afamqp_worker_insert(self))
+        {
+          afamqp_dd_disconnect(self);
+          afamqp_dd_suspend(self);
+        }
+    }
+
+  /* the connection is already gone if the last publish attempt failed */
+  if (self->conn)
+    afamqp_dd_disconnect(self);
+
+  msg_debug("Worker thread finished",
+            evt_tag_str("driver", self->super.super.id), NULL);
+
+  return NULL;
+}
+
+/*
+ * Main thread
+ */
+
+/* Spawn the worker thread running afamqp_worker_thread() and keep its
+ * handle in the driver. */
+static void
+afamqp_dd_start_thread(AMQPDestDriver *self)
+{
+  GThread *worker;
+
+  worker = create_worker_thread(afamqp_worker_thread, self, TRUE, NULL);
+  self->writer_thread = worker;
+}
+
+/*
+ * Ask the worker thread to exit and wait until it has done so.
+ *
+ * The terminate flag is raised first, then the wakeup condition is
+ * signalled while holding queue_mutex so that a worker blocked in its
+ * wait gets a chance to notice the flag.  Blocks in g_thread_join()
+ * until the worker returns.
+ */
+static void
+afamqp_dd_stop_thread(AMQPDestDriver *self)
+{
+  self->writer_thread_terminate = TRUE;
+  g_mutex_lock(self->queue_mutex);
+  g_cond_signal(self->writer_thread_wakeup_cond);
+  g_mutex_unlock(self->queue_mutex);
+  g_thread_join(self->writer_thread);
+}
+
+/*
+ * LogPipe init hook for the AMQP destination.
+ *
+ * Runs the generic destination-driver init, copies time_reopen from the
+ * global configuration when one is available, installs a default
+ * value-pairs set if none was configured, acquires the queue, registers
+ * the stored/dropped counters and starts the worker thread.
+ *
+ * Returns FALSE only when the generic init fails, TRUE otherwise.
+ */
+static gboolean
+afamqp_dd_init(LogPipe *s)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) s;
+  GlobalConfig *config = log_pipe_get_config(s);
+
+  if (!log_dest_driver_init_method(s))
+    {
+      return FALSE;
+    }
+
+  if (config != NULL)
+    {
+      self->time_reopen = config->time_reopen;
+    }
+
+  if (self->vp == NULL)
+    {
+      /* No value-pairs() given: fall back to the three default scopes. */
+      ValuePairs *defaults = value_pairs_new();
+
+      value_pairs_add_scope(defaults, "selected-macros");
+      value_pairs_add_scope(defaults, "nv-pairs");
+      value_pairs_add_scope(defaults, "sdata");
+      self->vp = defaults;
+    }
+
+  msg_verbose("Initializing AMQP destination",
+              evt_tag_str("vhost", self->vhost),
+              evt_tag_str("host", self->host),
+              evt_tag_int("port", self->port),
+              evt_tag_str("exchange", self->exchange),
+              evt_tag_str("exchange_type", self->exchange_type),
+              NULL);
+
+  self->queue = log_dest_driver_acquire_queue(&self->super,
+                                              afamqp_dd_format_persist_name(self));
+
+  /* Both counters are registered under the same stats instance name. */
+  stats_lock();
+  stats_register_counter(0, SCS_AMQP | SCS_DESTINATION,
+                         self->super.super.id,
+                         afamqp_dd_format_stats_instance(self),
+                         SC_TYPE_STORED, &self->stored_messages);
+  stats_register_counter(0, SCS_AMQP | SCS_DESTINATION,
+                         self->super.super.id,
+                         afamqp_dd_format_stats_instance(self),
+                         SC_TYPE_DROPPED, &self->dropped_messages);
+  stats_unlock();
+
+  log_queue_set_counters(self->queue,
+                         self->stored_messages, self->dropped_messages);
+
+  /* Everything the worker needs is in place now. */
+  afamqp_dd_start_thread(self);
+
+  return TRUE;
+}
+
+/*
+ * LogPipe deinit hook: stops the worker thread, detaches the counters
+ * from the queue, unregisters them and finally runs the generic
+ * destination-driver deinit, whose success decides the return value.
+ */
+static gboolean
+afamqp_dd_deinit(LogPipe *s)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) s;
+
+  /* The worker is stopped before the counters are taken away. */
+  afamqp_dd_stop_thread(self);
+
+  log_queue_set_counters(self->queue, NULL, NULL);
+
+  stats_lock();
+  stats_unregister_counter(SCS_AMQP | SCS_DESTINATION,
+                           self->super.super.id,
+                           afamqp_dd_format_stats_instance(self),
+                           SC_TYPE_STORED, &self->stored_messages);
+  stats_unregister_counter(SCS_AMQP | SCS_DESTINATION,
+                           self->super.super.id,
+                           afamqp_dd_format_stats_instance(self),
+                           SC_TYPE_DROPPED, &self->dropped_messages);
+  stats_unlock();
+
+  return log_dest_driver_deinit_method(s) ? TRUE : FALSE;
+}
+
+/*
+ * LogPipe free hook: releases the synchronisation primitives, the queue
+ * reference, the configured strings and templates, the header entry
+ * array and the value-pairs set, then calls the generic driver free.
+ */
+static void
+afamqp_dd_free(LogPipe *d)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+  /* Synchronisation primitives. */
+  g_mutex_free(self->suspend_mutex);
+  g_mutex_free(self->queue_mutex);
+  g_cond_free(self->writer_thread_wakeup_cond);
+
+  if (self->queue != NULL)
+    {
+      log_queue_unref(self->queue);
+    }
+
+  /* Exchange settings and templates. */
+  g_free(self->exchange);
+  g_free(self->exchange_type);
+  log_template_unref(self->routing_key_template);
+  log_template_unref(self->body_template);
+
+  /* Connection settings. */
+  g_free(self->user);
+  g_free(self->password);
+  g_free(self->host);
+  g_free(self->vhost);
+
+  g_free(self->entries);
+
+  if (self->vp != NULL)
+    {
+      value_pairs_free(self->vp);
+    }
+
+  log_dest_driver_free(d);
+}
+
+/*
+ * Callback handed to log_queue_set_parallel_push() by afamqp_dd_queue().
+ *
+ * user_data is the AMQPDestDriver.  While holding queue_mutex, signals
+ * the worker's wakeup condition and resets the queue's parallel-push
+ * registration.
+ */
+static void
+afamqp_dd_queue_notify(gpointer user_data)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) user_data;
+
+  g_mutex_lock(self->queue_mutex);
+  g_cond_signal(self->writer_thread_wakeup_cond);
+  log_queue_reset_parallel_push(self->queue);
+  g_mutex_unlock(self->queue_mutex);
+}
+
+/*
+ * LogPipe queue hook: accepts a message for this destination and
+ * appends it to the driver's queue.
+ *
+ * s is the driver, msg the message, path_options the options it
+ * arrived with; user_data is not used.
+ */
+static void
+afamqp_dd_queue(LogPipe *s, LogMessage *msg,
+                const LogPathOptions *path_options, gpointer user_data)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) s;
+  LogPathOptions local_options;
+
+  /* Without flow control the message is queued with the path options
+   * returned by log_msg_break_ack(), backed by local_options. */
+  if (!path_options->flow_control_requested)
+    path_options = log_msg_break_ack(msg, path_options, &local_options);
+
+  /* Lock order: suspend_mutex first, then queue_mutex.  The notify
+   * callback is only (re)registered while the writer is not suspended.
+   * NOTE(review): the literal 1 is presumably the batch size after
+   * which the callback fires -- confirm against log_queue. */
+  g_mutex_lock(self->suspend_mutex);
+  g_mutex_lock(self->queue_mutex);
+  if (!self->writer_thread_suspended)
+    log_queue_set_parallel_push(self->queue, 1, afamqp_dd_queue_notify,
+                                self, NULL);
+  g_mutex_unlock(self->queue_mutex);
+  g_mutex_unlock(self->suspend_mutex);
+  /* The push itself happens after both mutexes have been released. */
+  log_queue_push_tail(self->queue, msg, path_options);
+}
+
+/*
+ * Plugin glue.
+ */
+
+/*
+ * Allocate a new AMQP destination driver with its defaults applied:
+ * vhost "/", host 127.0.0.1, port 5672, exchange "syslog" of type
+ * "fanout", an empty routing key and persistent delivery.  User and
+ * password are left unset.
+ *
+ * Returns the new driver as a LogDriver pointer.
+ */
+LogDriver *
+afamqp_dd_new(void)
+{
+  AMQPDestDriver *self = g_new0(AMQPDestDriver, 1);
+  LogDriver *driver = (LogDriver *) self;
+
+  log_dest_driver_init_instance(&self->super);
+
+  /* Hook up the LogPipe virtual methods. */
+  self->super.super.super.init = afamqp_dd_init;
+  self->super.super.super.deinit = afamqp_dd_deinit;
+  self->super.super.super.queue = afamqp_dd_queue;
+  self->super.super.super.free_fn = afamqp_dd_free;
+
+  /* The template object is created before the routing key is set below. */
+  self->routing_key_template = log_template_new(configuration, NULL);
+
+  /* Defaults, applied through the regular setters. */
+  afamqp_dd_set_vhost(driver, "/");
+  afamqp_dd_set_host(driver, "127.0.0.1");
+  afamqp_dd_set_port(driver, 5672);
+  afamqp_dd_set_exchange(driver, "syslog");
+  afamqp_dd_set_exchange_type(driver, "fanout");
+  afamqp_dd_set_routing_key(driver, "");
+  afamqp_dd_set_persistent(driver, TRUE);
+
+  init_sequence_number(&self->seq_num);
+
+  /* Synchronisation primitives shared with the worker thread. */
+  self->writer_thread_wakeup_cond = g_cond_new();
+  self->suspend_mutex = g_mutex_new();
+  self->queue_mutex = g_mutex_new();
+
+  /* Preallocated array of header table entries. */
+  self->max_entries = 256;
+  self->entries = g_new(amqp_table_entry_t, self->max_entries);
+
+  return driver;
+}
+
+/* Configuration parser for the amqp() destination, defined outside this
+ * file.  The declaration names the same symbol the plugin descriptor
+ * below actually references. */
+extern CfgParser afamqp_parser;
+
+/* Plugin descriptor: registers the "amqp" keyword in the destination
+ * context and binds it to the parser above. */
+static Plugin afamqp_plugin =
+{
+  .type = LL_CONTEXT_DESTINATION,
+  .name = "amqp",
+  .parser = &afamqp_parser
+};
+
+/*
+ * Module entry point: registers the single AMQP plugin with cfg.
+ * args is not used.  Always returns TRUE.
+ */
+gboolean
+afamqp_module_init(GlobalConfig *cfg, CfgArgs *args)
+{
+  Plugin *plugins = &afamqp_plugin;
+
+  plugin_register(cfg, plugins, 1);
+
+  return TRUE;
+}
+
+/* Module descriptor exported by this module; it lists the one plugin
+ * defined in this file. */
+const ModuleInfo module_info =
+{
+  .canonical_name = "afamqp",
+  .version = VERSION,
+  .description = "The afamqp module provides AMQP destination support for syslog-ng.",
+  .core_revision = SOURCE_REVISION,
+  .plugins = &afamqp_plugin,
+  .plugins_len = 1,
+};
diff --git a/modules/afamqp/afamqp.h b/modules/afamqp/afamqp.h
new file mode 100644
index 0000000..8e7d7e9
--- /dev/null
+++ b/modules/afamqp/afamqp.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFAMQP_H_INCLUDED
+#define AFAMQP_H_INCLUDED
+
+#include "driver.h"
+#include "value-pairs.h"
+
+/* Create a new AMQP destination driver instance. */
+LogDriver *afamqp_dd_new(void);
+
+/*
+ * Option setters, one per amqp() configuration option.  In each of
+ * them d is a driver obtained from afamqp_dd_new().
+ */
+
+/* Broker address and port. */
+void afamqp_dd_set_host(LogDriver *d, const gchar *host);
+void afamqp_dd_set_port(LogDriver *d, gint port);
+/* Name and type of the exchange to publish to. */
+void afamqp_dd_set_exchange(LogDriver *d, const gchar *exchange);
+void afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type);
+/* Virtual host on the broker. */
+void afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost);
+/* Routing key and message body; both may contain templates. */
+void afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key);
+void afamqp_dd_set_body(LogDriver *d, const gchar *body);
+/* Whether messages are published as persistent. */
+void afamqp_dd_set_persistent(LogDriver *d, gboolean persistent);
+/* Credentials; neither has a default. */
+void afamqp_dd_set_user(LogDriver *d, const gchar *user);
+void afamqp_dd_set_password(LogDriver *d, const gchar *password);
+/* Name-value pairs that are sent as message headers. */
+void afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp);
+
+#endif
diff --git a/modules/afamqp/rabbitmq-c b/modules/afamqp/rabbitmq-c
new file mode 160000
index 0000000..afdfc05
--- /dev/null
+++ b/modules/afamqp/rabbitmq-c
@@ -0,0 +1 @@
+Subproject commit afdfc05b0e974b470130e20b4be6c5728618afd4
-- 
1.7.10.4




More information about the syslog-ng mailing list