[syslog-ng] [PATCH 1/2] afamqp: New AMQP destination driver
Gergely Nagy
algernon at balabit.hu
Tue Oct 9 18:13:05 CEST 2012
This driver implements an AMQP destination (based on the rabbitmq-c
library), supporting persistence, all the exchange types, and uses a
creative way to get messages across: all the name-value pairs
selected with the value-pairs() syntax will be sent as headers, while
the message payload can be set with the body() option (empty by
default).
Most settings have sensible defaults, except for a few, noted below:
@module afamqp
destination d_amqp {
amqp(
vhost("/")
host("127.0.0.1")
port(5672)
username("guest") # mandatory, no default
password("guest") # mandatory, no default
exchange("syslog")
exchange_type("fanout")
#routing_key("")
#body("")
persistent(yes)
value-pairs(
scope("selected-macros" "nv-pairs" "sdata")
)
);
};
Publishing the name-value pairs as headers makes it possible to use a
headers exchange type and subscribe only to interesting log streams,
in a much more flexible way than using the routing_key() option.
The routing_key() and body() options can contain any template, they
will be expanded before publication.
Signed-off-by: Attila Nagy <bra at fsn.hu>
Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
.gitmodules | 3 +
autogen.sh | 2 +-
configure.in | 52 +++
lib/stats.c | 2 +
lib/stats.h | 1 +
modules/Makefile.am | 2 +-
modules/afamqp/Makefile.am | 35 ++
modules/afamqp/afamqp-grammar.ym | 92 +++++
modules/afamqp/afamqp-parser.c | 59 +++
modules/afamqp/afamqp-parser.h | 36 ++
modules/afamqp/afamqp.c | 738 ++++++++++++++++++++++++++++++++++++++
modules/afamqp/afamqp.h | 45 +++
modules/afamqp/rabbitmq-c | 1 +
13 files changed, 1066 insertions(+), 2 deletions(-)
create mode 100644 modules/afamqp/Makefile.am
create mode 100644 modules/afamqp/afamqp-grammar.ym
create mode 100644 modules/afamqp/afamqp-parser.c
create mode 100644 modules/afamqp/afamqp-parser.h
create mode 100644 modules/afamqp/afamqp.c
create mode 100644 modules/afamqp/afamqp.h
create mode 160000 modules/afamqp/rabbitmq-c
diff --git a/.gitmodules b/.gitmodules
index d000782..be86ee5 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,6 @@
[submodule "lib/ivykis"]
path = lib/ivykis
url = https://github.com/buytenh/ivykis.git
+[submodule "modules/afamqp/rabbitmq-c"]
+ path = modules/afamqp/rabbitmq-c
+ url = https://github.com/alanxz/rabbitmq-c.git
diff --git a/autogen.sh b/autogen.sh
index fb4151e..5027602 100755
--- a/autogen.sh
+++ b/autogen.sh
@@ -3,7 +3,7 @@
# This script is needed to setup build environment from checked out
# source tree.
#
-SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client"
+SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client modules/afamqp/rabbitmq-c"
GIT=`which git`
autogen_submodules()
diff --git a/configure.in b/configure.in
index a23c6e8..2edec69 100644
--- a/configure.in
+++ b/configure.in
@@ -28,6 +28,7 @@ IVYKIS_MIN_VERSION="0.30.1"
JSON_C_MIN_VERSION="0.9"
PCRE_MIN_VERSION="6.1"
LMC_MIN_VERSION="0.1.6"
+LRMQ_MIN_VERSION="0.0.1"
dnl ***************************************************************************
dnl Initial setup
@@ -163,6 +164,15 @@ AC_ARG_WITH(libmongo-client,
Link against the system supplied or the built-in libmongo-client library.]
,,with_libmongo_client="internal")
+AC_ARG_ENABLE(amqp,
+ [ --enable-amqp Enable amqp destination (default: auto)]
+ ,,enable_amqp="auto")
+
+AC_ARG_WITH(librabbitmq-client,
+ [ --with-librabbitmq-client=[system/internal]
+ Link against the system supplied or the built-in librabbitmq library.]
+ ,,with_librabbitmq_client="internal")
+
AC_ARG_WITH(ivykis,
[ --with-ivykis=[system/internal]
Link against the system supplied or the built-in ivykis library.]
@@ -827,6 +837,31 @@ if test "x$enable_smtp" != "xno" && test "x$with_libesmtp" != "no"; then
fi
dnl ***************************************************************************
+dnl rabbitmq-c headers/libraries
+dnl ***************************************************************************
+
+if test "x$with_librabbitmq_client" = "xinternal"; then
+ if test -f "$srcdir/modules/afamqp/rabbitmq-c/librabbitmq/amqp.h"; then
+ AC_CONFIG_SUBDIRS([modules/afamqp/rabbitmq-c])
+ # these can only be used in modules/afamqp, as they assume that
+ # rabbitmq-c is a direct subdirectory of the current directory
+
+ LIBRABBITMQ_LIBS="-L\$(builddir)/rabbitmq-c/librabbitmq -lrabbitmq"
+ LIBRABBITMQ_CFLAGS="-I\$(srcdir)/rabbitmq-c/librabbitmq -I\$(builddir)/rabbitmq-c/librabbitmq"
+ LIBRABBITMQ_SUBDIRS="rabbitmq-c"
+ else
+ AC_MSG_WARN([Internal librabbitmq-client sources not found in modules/afamqp/rabbitmq-c])
+ with_librabbitmq_client="no"
+ fi
+elif test "x$with_librabbitmq_client" = "xsystem"; then
+ PKG_CHECK_MODULES(LIBRABBITMQ, librabbitmq >= $LRMQ_MIN_VERSION,with_librabbitmq_client="yes",with_librabbitmq_client="no")
+fi
+
+if test "x$with_librabbitmq_client" = "xno"; then
+ enable_amqp="no"
+fi
+
+dnl ***************************************************************************
dnl misc features to be enabled
dnl ***************************************************************************
@@ -889,6 +924,16 @@ if test "x$enable_mongodb" = "xauto"; then
AC_MSG_RESULT([$enable_mongodb])
fi
+if test "x$enable_amqp" = "xauto"; then
+ AC_MSG_CHECKING(whether to enable amqp destination support)
+ if test "x$with_librabbitmq_client" != "xno"; then
+ enable_amqp="yes"
+ else
+ enable_amqp="no"
+ fi
+ AC_MSG_RESULT([$enable_amqp])
+fi
+
if test "x$enable_json" != "xno"; then
JSON_LIBS=$JSON_C_LIBS
JSON_CFLAGS=$JSON_C_CFLAGS
@@ -1068,6 +1113,7 @@ AM_CONDITIONAL(ENABLE_SUN_STREAMS, [test "$enable_sun_streams" = "yes"])
AM_CONDITIONAL(ENABLE_PACCT, [test "$enable_pacct" = "yes"])
AM_CONDITIONAL(ENABLE_MONGODB, [test "$enable_mongodb" = "yes"])
AM_CONDITIONAL(ENABLE_SMTP, [test "$enable_smtp" = "yes"])
+AM_CONDITIONAL(ENABLE_AMQP, [test "$enable_amqp" = "yes"])
AM_CONDITIONAL(ENABLE_JSON, [test "$enable_json" = "yes"])
AM_CONDITIONAL(WITH_LIBSYSTEMD, [test "$with_libsystemd" = "yes"])
@@ -1102,6 +1148,9 @@ AC_SUBST(LIBMONGO_CFLAGS)
AC_SUBST(LIBMONGO_SUBDIRS)
AC_SUBST(LIBESMTP_CFLAGS)
AC_SUBST(LIBESMTP_LIBS)
+AC_SUBST(LIBRABBITMQ_LIBS)
+AC_SUBST(LIBRABBITMQ_CFLAGS)
+AC_SUBST(LIBRABBITMQ_SUBDIRS)
AC_SUBST(JSON_LIBS)
AC_SUBST(JSON_CFLAGS)
AC_SUBST(IVYKIS_SUBDIRS)
@@ -1132,6 +1181,7 @@ AC_OUTPUT(dist.conf
modules/afuser/Makefile
modules/afmongodb/Makefile
modules/afsmtp/Makefile
+ modules/afamqp/Makefile
modules/dbparser/Makefile
modules/dbparser/tests/Makefile
modules/csvparser/Makefile
@@ -1171,6 +1221,7 @@ echo " __thread keyword : ${ac_cv_have_tls:=no}"
echo " Submodules:"
echo " ivykis : $with_ivykis"
echo " libmongo-client : $with_libmongo_client"
+echo " librabbitmq : $with_librabbitmq_client"
echo " Features:"
echo " Debug symbols : ${enable_debug:=no}"
echo " GCC profiling : ${enable_gprof:=no}"
@@ -1191,4 +1242,5 @@ echo " PACCT module (EXPERIMENTAL) : ${enable_pacct:=no}"
echo " MongoDB destination (module): ${enable_mongodb:=no}"
echo " JSON support (module) : ${enable_json:=no}"
echo " SMTP support (module) : ${enable_smtp:=no}"
+echo " AMQP destination (module) : ${enable_amqp:=no}"
diff --git a/lib/stats.c b/lib/stats.c
index 0ccc283..f757597 100644
--- a/lib/stats.c
+++ b/lib/stats.c
@@ -383,6 +383,8 @@ const gchar *source_names[SCS_MAX] =
"severity",
"facility",
"sender",
+ "smtp",
+ "amqp",
};
diff --git a/lib/stats.h b/lib/stats.h
index 9174408..2550724 100644
--- a/lib/stats.h
+++ b/lib/stats.h
@@ -71,6 +71,7 @@ enum
SCS_FACILITY = 25,
SCS_SENDER = 26,
SCS_SMTP = 27,
+ SCS_AMQP = 28,
SCS_MAX,
SCS_SOURCE_MASK = 0xff
};
diff --git a/modules/Makefile.am b/modules/Makefile.am
index 8f9d63b..1fce2d2 100644
--- a/modules/Makefile.am
+++ b/modules/Makefile.am
@@ -1 +1 @@
-SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
+SUBDIRS = afsocket afsql afstreams affile afprog afuser afamqp afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
diff --git a/modules/afamqp/Makefile.am b/modules/afamqp/Makefile.am
new file mode 100644
index 0000000..ccf20df
--- /dev/null
+++ b/modules/afamqp/Makefile.am
@@ -0,0 +1,35 @@
+
+SUBDIRS = @LIBRABBITMQ_SUBDIRS@
+DIST_SUBDIRS = rabbitmq-c
+
+moduledir = @moduledir@
+AM_CPPFLAGS = -I$(top_srcdir)/lib -I../../lib
+export top_srcdir
+
+if ENABLE_AMQP
+
+# only build and install the module when AMQP support is enabled
+module_LTLIBRARIES = libafamqp.la
+libafamqp_la_CFLAGS = $(LIBRABBITMQ_CFLAGS)
+libafamqp_la_SOURCES = afamqp-grammar.y afamqp.c afamqp.h afamqp-parser.c afamqp-parser.h
+libafamqp_la_LIBADD = $(MODULE_DEPS_LIBS) $(LIBRABBITMQ_LIBS)
+libafamqp_la_LDFLAGS = $(MODULE_LDFLAGS)
+
+endif
+
+BUILT_SOURCES = afamqp-grammar.y afamqp-grammar.c afamqp-grammar.h
+EXTRA_DIST = $(BUILT_SOURCES) afamqp-grammar.ym
+
+include $(top_srcdir)/build/lex-rules.am
+
+# divert install/uninstall targets to avoid recursing into $(SUBDIRS)
+
+install:
+ $(MAKE) $(AM_MAKEFLAGS) all
+ $(MAKE) $(AM_MAKEFLAGS) install-am
+
+uninstall:
+ $(MAKE) $(AM_MAKEFLAGS) uninstall-am
+
+check:
+ echo "Make check disabled, since it requires a newer glib"
diff --git a/modules/afamqp/afamqp-grammar.ym b/modules/afamqp/afamqp-grammar.ym
new file mode 100644
index 0000000..b325b3d
--- /dev/null
+++ b/modules/afamqp/afamqp-grammar.ym
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+%code requires {
+
+#include "afamqp-parser.h"
+
+}
+
+%code {
+
+#include "cfg-parser.h"
+#include "afamqp-grammar.h"
+#include "plugin.h"
+#include "vptransform.h"
+
+extern LogDriver *last_driver;
+extern ValuePairs *last_value_pairs;
+extern ValuePairsTransformSet *last_vp_transset;
+}
+
+%name-prefix "afamqp_"
+%lex-param {CfgLexer *lexer}
+%parse-param {CfgLexer *lexer}
+%parse-param {LogDriver **instance}
+%parse-param {gpointer arg}
+
+
+/* INCLUDE_DECLS */
+
+%token KW_AMQP
+%token KW_EXCHANGE
+%token KW_EXCHANGE_TYPE
+%token KW_PERSISTENT
+%token KW_VHOST
+%token KW_ROUTING_KEY
+%token KW_BODY
+
+%%
+
+start
+ : LL_CONTEXT_DESTINATION KW_AMQP
+ {
+ last_driver = *instance = afamqp_dd_new();
+ }
+ '(' afamqp_options ')' { YYACCEPT; }
+ ;
+
+afamqp_options
+ : afamqp_option afamqp_options
+ |
+ ;
+
+afamqp_option
+ : KW_HOST '(' string ')' { afamqp_dd_set_host(last_driver, $3); free($3); }
+ | KW_PORT '(' LL_NUMBER ')' { afamqp_dd_set_port(last_driver, $3); }
+ | KW_VHOST '(' string ')' { afamqp_dd_set_vhost(last_driver, $3); free($3); }
+ | KW_EXCHANGE '(' string ')' { afamqp_dd_set_exchange(last_driver, $3); free($3); }
+ | KW_EXCHANGE_TYPE '(' string ')' { afamqp_dd_set_exchange_type(last_driver, $3); free($3); }
+ | KW_ROUTING_KEY '(' string ')' { afamqp_dd_set_routing_key(last_driver, $3); free($3); }
+ | KW_BODY '(' string ')' { afamqp_dd_set_body(last_driver, $3); free($3); }
+ | KW_PERSISTENT '(' yesno ')' { afamqp_dd_set_persistent(last_driver, $3); }
+ | KW_USERNAME '(' string ')' { afamqp_dd_set_user(last_driver, $3); free($3); }
+ | KW_PASSWORD '(' string ')' { afamqp_dd_set_password(last_driver, $3); free($3); }
+ | value_pair_option { afamqp_dd_set_value_pairs(last_driver, $1); }
+ | dest_driver_option
+ ;
+
+/* INCLUDE_RULES */
+
+%%
diff --git a/modules/afamqp/afamqp-parser.c b/modules/afamqp/afamqp-parser.c
new file mode 100644
index 0000000..f082f32
--- /dev/null
+++ b/modules/afamqp/afamqp-parser.c
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#include "afamqp.h"
+#include "cfg-parser.h"
+#include "afamqp-grammar.h"
+
+extern int afamqp_debug;
+int afamqp_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg);
+
+static CfgLexerKeyword afamqp_keywords[] = {
+ { "amqp", KW_AMQP },
+ { "vhost", KW_VHOST },
+ { "host", KW_HOST },
+ { "port", KW_PORT },
+ { "exchange", KW_EXCHANGE },
+ { "exchange_type", KW_EXCHANGE_TYPE },
+ { "routing_key", KW_ROUTING_KEY },
+ { "persistent", KW_PERSISTENT },
+ { "username", KW_USERNAME },
+ { "password", KW_PASSWORD },
+ { "log_fifo_size", KW_LOG_FIFO_SIZE },
+ { "body", KW_BODY },
+ { NULL }
+};
+
+CfgParser afamqp_parser =
+{
+#if ENABLE_DEBUG
+ .debug_flag = &afamqp_debug,
+#endif
+ .name = "afamqp",
+ .keywords = afamqp_keywords,
+ .parse = (int (*)(CfgLexer *lexer, gpointer *instance, gpointer)) afamqp_parse,
+ .cleanup = (void (*)(gpointer)) log_pipe_unref,
+};
+
+CFG_PARSER_IMPLEMENT_LEXER_BINDING(afamqp_, LogDriver **)
diff --git a/modules/afamqp/afamqp-parser.h b/modules/afamqp/afamqp-parser.h
new file mode 100644
index 0000000..d75de63
--- /dev/null
+++ b/modules/afamqp/afamqp-parser.h
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFAMQP_PARSER_H_INCLUDED
+#define AFAMQP_PARSER_H_INCLUDED
+
+#include "cfg-parser.h"
+#include "cfg-lexer.h"
+#include "afamqp.h"
+
+extern CfgParser afamqp_parser;
+
+CFG_PARSER_DECLARE_LEXER_BINDING(afamqp_, LogDriver **)
+
+#endif
diff --git a/modules/afamqp/afamqp.c b/modules/afamqp/afamqp.c
new file mode 100644
index 0000000..a9d65d4
--- /dev/null
+++ b/modules/afamqp/afamqp.c
@@ -0,0 +1,738 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#include "afamqp.h"
+#include "afamqp-parser.h"
+#include "plugin.h"
+#include "messages.h"
+#include "misc.h"
+#include "stats.h"
+#include "nvtable.h"
+#include "logqueue.h"
+#include "scratch-buffers.h"
+
+#include <amqp.h>
+#include <amqp_framing.h>
+
+typedef struct
+{
+ LogDestDriver super;
+
+ /* Shared between main/writer; only read by the writer, never written */
+ gchar *exchange;
+ gchar *exchange_type;
+ LogTemplate *routing_key_template;
+ LogTemplate *body_template;
+
+ gint persistent;
+
+ gchar *vhost;
+ gchar *host;
+ gint port;
+
+ gchar *user;
+ gchar *password;
+
+ time_t time_reopen;
+
+ StatsCounterItem *dropped_messages;
+ StatsCounterItem *stored_messages;
+
+ ValuePairs *vp;
+
+ /* Thread related stuff; shared */
+ GThread *writer_thread;
+ GMutex *queue_mutex;
+ GMutex *suspend_mutex;
+ GCond *writer_thread_wakeup_cond;
+
+ gboolean writer_thread_terminate;
+ gboolean writer_thread_suspended;
+ GTimeVal writer_thread_suspend_target;
+
+ LogQueue *queue;
+
+ /* Writer-only stuff */
+ amqp_connection_state_t conn;
+ amqp_table_entry_t *entries;
+ gint32 max_entries;
+ gint32 seq_num;
+} AMQPDestDriver;
+
+/*
+ * Configuration
+ */
+
+void
+afamqp_dd_set_user(LogDriver *d, const gchar *user)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ g_free(self->user);
+ self->user = g_strdup(user);
+}
+
+void
+afamqp_dd_set_password(LogDriver *d, const gchar *password)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ g_free(self->password);
+ self->password = g_strdup(password);
+}
+
+void
+afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ g_free(self->vhost);
+ self->vhost = g_strdup(vhost);
+}
+
+void
+afamqp_dd_set_host(LogDriver *d, const gchar *host)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ g_free(self->host);
+ self->host = g_strdup(host);
+}
+
+void
+afamqp_dd_set_port(LogDriver *d, gint port)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ self->port = (int) port;
+}
+
+void
+afamqp_dd_set_exchange(LogDriver *d, const gchar *exchange)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ g_free(self->exchange);
+ self->exchange = g_strdup(exchange);
+}
+
+void
+afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ g_free(self->exchange_type);
+ self->exchange_type = g_strdup(exchange_type);
+}
+
+void
+afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ log_template_compile(self->routing_key_template, routing_key, NULL);
+}
+
+void
+afamqp_dd_set_body(LogDriver *d, const gchar *body)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ if (!self->body_template)
+ self->body_template = log_template_new(configuration, NULL);
+ log_template_compile(self->body_template, body, NULL);
+}
+
+void
+afamqp_dd_set_persistent(LogDriver *s, gboolean persistent)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) s;
+
+ if (persistent)
+ self->persistent = 2;
+ else
+ self->persistent = 1;
+}
+
+void
+afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ if (self->vp)
+ value_pairs_free(self->vp);
+ self->vp = vp;
+}
+
+/*
+ * Utilities
+ */
+
+static gchar *
+afamqp_dd_format_stats_instance(AMQPDestDriver *self)
+{
+ static gchar persist_name[1024];
+
+ g_snprintf(persist_name, sizeof(persist_name), "amqp,%s,%s,%u,%s,%s",
+ self->vhost, self->host, self->port, self->exchange,
+ self->exchange_type);
+ return persist_name;
+}
+
+static gchar *
+afamqp_dd_format_persist_name(AMQPDestDriver *self)
+{
+ static gchar persist_name[1024];
+
+ g_snprintf(persist_name, sizeof(persist_name), "afamqp(%s,%s,%u,%s,%s)",
+ self->vhost, self->host, self->port, self->exchange,
+ self->exchange_type);
+ return persist_name;
+}
+
+static void
+afamqp_dd_suspend(AMQPDestDriver *self)
+{
+ self->writer_thread_suspended = TRUE;
+ g_get_current_time(&self->writer_thread_suspend_target);
+ g_time_val_add(&self->writer_thread_suspend_target,
+ self->time_reopen * 1000000);
+}
+
+static void afamqp_dd_disconnect(AMQPDestDriver *self)
+{
+  if (!self->conn) return; /* already disconnected: the worker calls this again on exit */
+  amqp_channel_close(self->conn, 1, AMQP_REPLY_SUCCESS);
+  amqp_connection_close(self->conn, AMQP_REPLY_SUCCESS);
+  amqp_destroy_connection(self->conn);
+  self->conn = NULL;
+}
+
+static gboolean
+afamqp_is_ok(AMQPDestDriver *self, gchar *context, amqp_rpc_reply_t ret)
+{
+  switch (ret.reply_type)
+    {
+    case AMQP_RESPONSE_NORMAL:
+      break;
+
+    case AMQP_RESPONSE_NONE:
+      msg_error(context,
+                evt_tag_str("driver", self->super.super.id),
+                evt_tag_str("error", "missing RPC reply type"),
+                evt_tag_int("time_reopen", self->time_reopen),
+                NULL);
+      afamqp_dd_suspend(self);
+      return FALSE;
+
+    case AMQP_RESPONSE_LIBRARY_EXCEPTION:
+      {
+        gchar *errstr = amqp_error_string(ret.library_error);
+        msg_error(context,
+                  evt_tag_str("driver", self->super.super.id),
+                  evt_tag_str("error", errstr),
+                  evt_tag_int("time_reopen", self->time_reopen),
+                  NULL);
+        g_free (errstr);
+        afamqp_dd_suspend(self);
+        return FALSE;
+      }
+
+    case AMQP_RESPONSE_SERVER_EXCEPTION:
+      switch (ret.reply.id)
+        {
+        case AMQP_CONNECTION_CLOSE_METHOD: /* reply_text is length-counted, so print it with %.*s */
+          {
+            amqp_connection_close_t *m =
+              (amqp_connection_close_t *) ret.reply.decoded;
+            msg_error(context,
+                      evt_tag_str("driver", self->super.super.id),
+                      evt_tag_str("error", "server connection error"),
+                      evt_tag_int("code", m->reply_code),
+                      evt_tag_printf("text", "%.*s", (int) m->reply_text.len, (char *) m->reply_text.bytes),
+                      evt_tag_int("time_reopen", self->time_reopen),
+                      NULL);
+            afamqp_dd_suspend(self);
+            return FALSE;
+          }
+        case AMQP_CHANNEL_CLOSE_METHOD: /* reply_text is length-counted, so print it with %.*s */
+          {
+            amqp_channel_close_t *m =
+              (amqp_channel_close_t *) ret.reply.decoded;
+            msg_error(context,
+                      evt_tag_str("driver", self->super.super.id),
+                      evt_tag_str("error", "server channel error"),
+                      evt_tag_int("code", m->reply_code),
+                      evt_tag_printf("text", "%.*s", (int) m->reply_text.len, (char *) m->reply_text.bytes),
+                      evt_tag_int("time_reopen", self->time_reopen),
+                      NULL);
+            afamqp_dd_suspend(self);
+            return FALSE;
+          }
+        default:
+          msg_error(context,
+                    evt_tag_str("driver", self->super.super.id),
+                    evt_tag_str("error", "unknown server error"),
+                    evt_tag_printf("method id", "0x%08X", ret.reply.id),
+                    evt_tag_int("time_reopen", self->time_reopen),
+                    NULL);
+          afamqp_dd_suspend(self);
+          return FALSE;
+        }
+      return FALSE;
+    }
+  return TRUE;
+}
+
+static gboolean
+afamqp_dd_connect(AMQPDestDriver *self, gboolean reconnect)
+{
+ int sockfd;
+ amqp_rpc_reply_t ret;
+
+ if (reconnect && self->conn)
+ {
+ ret = amqp_get_rpc_reply(self->conn);
+ if (ret.reply_type == AMQP_RESPONSE_NORMAL)
+ return TRUE;
+ }
+
+ self->conn = amqp_new_connection();
+ sockfd = amqp_open_socket(self->host, self->port);
+ if (sockfd < 0)
+ {
+ gchar *errstr = amqp_error_string(-sockfd);
+ msg_error("Error connecting to AMQP server",
+ evt_tag_str("driver", self->super.super.id),
+ evt_tag_str("error", errstr),
+ evt_tag_int("time_reopen", self->time_reopen),
+ NULL);
+ g_free(errstr);
+ return FALSE;
+ }
+ amqp_set_sockfd(self->conn, sockfd);
+
+ ret = amqp_login(self->conn, self->vhost, 0, 131072, 0,
+ AMQP_SASL_METHOD_PLAIN, self->user, self->password);
+ if (!afamqp_is_ok(self, "Error during AMQP login", ret))
+ return FALSE;
+
+ amqp_channel_open(self->conn, 1);
+ ret = amqp_get_rpc_reply(self->conn);
+ if (!afamqp_is_ok(self, "Error during AMQP channel open", ret))
+ return FALSE;
+
+ amqp_exchange_declare(self->conn, 1, amqp_cstring_bytes(self->exchange),
+ amqp_cstring_bytes(self->exchange_type), 0, 0, amqp_empty_table);
+ ret = amqp_get_rpc_reply(self->conn);
+ if (!afamqp_is_ok(self, "Error during AMQP exchange declaration", ret))
+ return FALSE;
+
+ msg_debug ("Connecting to AMQP succeeded",
+ evt_tag_str("driver", self->super.super.id),
+ NULL);
+
+ return TRUE;
+}
+
+/*
+ * Worker thread
+ */
+
+static gboolean
+afamqp_vp_foreach(const gchar *name, const gchar *value,
+ gpointer user_data)
+{
+ amqp_table_entry_t **entries = (amqp_table_entry_t **) ((gpointer *)user_data)[0];
+ gint *pos = (gint *) ((gpointer *)user_data)[1];
+ gint32 *max_size = (gint32 *) ((gpointer *)user_data)[2];
+
+ if (*pos == *max_size)
+ {
+ *max_size *= 2;
+ *entries = g_renew(amqp_table_entry_t, *entries, *max_size);
+ }
+
+ (*entries)[*pos].key = amqp_cstring_bytes(strdup(name));
+ (*entries)[*pos].value.kind = AMQP_FIELD_KIND_UTF8;
+ (*entries)[*pos].value.value.bytes = amqp_cstring_bytes(strdup(value));
+
+ (*pos)++;
+
+ return FALSE;
+}
+
+static gboolean
+afamqp_worker_publish(AMQPDestDriver *self, LogMessage *msg)
+{
+ gint pos = 0, ret;
+ amqp_table_t table;
+ amqp_basic_properties_t props;
+ gboolean success = TRUE;
+ ScratchBuffer *routing_key = scratch_buffer_acquire();
+ ScratchBuffer *body = scratch_buffer_acquire();
+ amqp_bytes_t body_bytes = amqp_cstring_bytes("");
+
+ gpointer user_data[] = { &self->entries, &pos, &self->max_entries };
+
+ value_pairs_foreach(self->vp, afamqp_vp_foreach, msg, self->seq_num,
+ user_data);
+
+ table.num_entries = pos;
+ table.entries = self->entries;
+
+ props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG
+ | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG;
+ props.content_type = amqp_cstring_bytes("text/plain");
+ props.delivery_mode = self->persistent;
+ props.headers = table;
+
+ log_template_format(self->routing_key_template, msg, NULL, LTZ_LOCAL,
+ self->seq_num, NULL, sb_string(routing_key));
+
+ if (self->body_template)
+ {
+ log_template_format(self->body_template, msg, NULL, LTZ_LOCAL,
+ self->seq_num, NULL, sb_string(body));
+ body_bytes = amqp_cstring_bytes(sb_string(body)->str);
+ }
+
+ ret = amqp_basic_publish(self->conn, 1, amqp_cstring_bytes(self->exchange),
+ amqp_cstring_bytes(sb_string(routing_key)->str),
+ 0, 0, &props, body_bytes);
+
+ scratch_buffer_release(routing_key);
+ scratch_buffer_release(body);
+
+ if (ret < 0)
+ {
+ msg_error("Network error while inserting into AMQP server",
+ evt_tag_int("time_reopen", self->time_reopen), NULL);
+ success = FALSE;
+ }
+
+ while (--pos >= 0)
+ {
+ amqp_bytes_free(self->entries[pos].key);
+ amqp_bytes_free(self->entries[pos].value.value.bytes);
+ }
+
+ return success;
+}
+
+static gboolean
+afamqp_worker_insert(AMQPDestDriver *self)
+{
+  gboolean success;
+  LogMessage *msg;
+  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
+
+  if (!afamqp_dd_connect(self, TRUE)) /* no usable connection: leave the queue untouched */
+    return FALSE;
+  g_mutex_lock(self->queue_mutex);
+  log_queue_reset_parallel_push(self->queue);
+  success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, FALSE);
+  g_mutex_unlock(self->queue_mutex);
+  if (!success) /* nothing queued is not an error */
+    return TRUE;
+
+  msg_set_context(msg);
+  success = afamqp_worker_publish (self, msg);
+  msg_set_context(NULL);
+
+  if (success)
+    {
+      stats_counter_inc(self->stored_messages);
+      step_sequence_number(&self->seq_num);
+      log_msg_ack(msg, &path_options);
+      log_msg_unref(msg);
+    }
+  else
+    {
+      /* publishing failed: put the message back at the head of the queue */
+      g_mutex_lock(self->queue_mutex);
+      log_queue_push_head(self->queue, msg, &path_options);
+      g_mutex_unlock(self->queue_mutex);
+    }
+  return success;
+}
+
+static gpointer
+afamqp_worker_thread(gpointer arg)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) arg;
+ gboolean success;
+
+ msg_debug("Worker thread started",
+ evt_tag_str("driver", self->super.super.id), NULL);
+
+ success = afamqp_dd_connect(self, FALSE);
+
+ while (!self->writer_thread_terminate)
+ {
+ g_mutex_lock(self->suspend_mutex);
+ if (self->writer_thread_suspended)
+ {
+ g_cond_timed_wait(self->writer_thread_wakeup_cond,
+ self->suspend_mutex, &self->writer_thread_suspend_target);
+ self->writer_thread_suspended = FALSE;
+ g_mutex_unlock(self->suspend_mutex);
+ }
+ else
+ {
+ g_mutex_unlock(self->suspend_mutex);
+
+ g_mutex_lock(self->queue_mutex);
+ if (log_queue_get_length(self->queue) == 0)
+ {
+ g_cond_wait(self->writer_thread_wakeup_cond, self->queue_mutex);
+ }
+ g_mutex_unlock(self->queue_mutex);
+ }
+
+ if (self->writer_thread_terminate)
+ break;
+
+ if (!afamqp_worker_insert(self))
+ {
+ afamqp_dd_disconnect(self);
+ afamqp_dd_suspend(self);
+ }
+ }
+
+ afamqp_dd_disconnect(self);
+
+ msg_debug("Worker thread finished",
+ evt_tag_str("driver", self->super.super.id), NULL);
+
+ return NULL;
+}
+
+/*
+ * Main thread
+ */
+
+static void
+afamqp_dd_start_thread(AMQPDestDriver *self)
+{
+ self->writer_thread = create_worker_thread(afamqp_worker_thread, self,
+ TRUE, NULL);
+}
+
+static void
+afamqp_dd_stop_thread(AMQPDestDriver *self)
+{
+ self->writer_thread_terminate = TRUE;
+ g_mutex_lock(self->queue_mutex);
+ g_cond_signal(self->writer_thread_wakeup_cond);
+ g_mutex_unlock(self->queue_mutex);
+ g_thread_join(self->writer_thread);
+}
+
+static gboolean
+afamqp_dd_init(LogPipe *s)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) s;
+ GlobalConfig *cfg = log_pipe_get_config(s);
+
+ if (!log_dest_driver_init_method(s))
+ return FALSE;
+
+ if (cfg)
+ self->time_reopen = cfg->time_reopen;
+
+ if (!self->vp)
+ {
+ self->vp = value_pairs_new();
+ value_pairs_add_scope(self->vp, "selected-macros");
+ value_pairs_add_scope(self->vp, "nv-pairs");
+ value_pairs_add_scope(self->vp, "sdata");
+ }
+
+ msg_verbose("Initializing AMQP destination",
+ evt_tag_str("vhost", self->vhost),
+ evt_tag_str("host", self->host),
+ evt_tag_int("port", self->port),
+ evt_tag_str("exchange", self->exchange),
+ evt_tag_str("exchange_type", self->exchange_type),
+ NULL);
+
+ self->queue = log_dest_driver_acquire_queue(&self->super, afamqp_dd_format_persist_name(self));
+
+ stats_lock();
+ stats_register_counter(0, SCS_AMQP | SCS_DESTINATION,
+ self->super.super.id, afamqp_dd_format_stats_instance(self),
+ SC_TYPE_STORED, &self->stored_messages);
+ stats_register_counter(0, SCS_AMQP | SCS_DESTINATION,
+ self->super.super.id, afamqp_dd_format_stats_instance(self),
+ SC_TYPE_DROPPED, &self->dropped_messages);
+ stats_unlock();
+
+ log_queue_set_counters(self->queue, self->stored_messages,
+ self->dropped_messages);
+ afamqp_dd_start_thread(self);
+
+ return TRUE;
+}
+
+static gboolean
+afamqp_dd_deinit(LogPipe *s)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) s;
+
+ afamqp_dd_stop_thread(self);
+
+ log_queue_set_counters(self->queue, NULL, NULL);
+ stats_lock();
+ stats_unregister_counter(SCS_AMQP | SCS_DESTINATION,
+ self->super.super.id, afamqp_dd_format_stats_instance(self),
+ SC_TYPE_STORED, &self->stored_messages);
+ stats_unregister_counter(SCS_AMQP | SCS_DESTINATION,
+ self->super.super.id, afamqp_dd_format_stats_instance(self),
+ SC_TYPE_DROPPED, &self->dropped_messages);
+ stats_unlock();
+ if (!log_dest_driver_deinit_method(s))
+ return FALSE;
+
+ return TRUE;
+}
+
+static void
+afamqp_dd_free(LogPipe *d)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+ g_mutex_free(self->suspend_mutex);
+ g_mutex_free(self->queue_mutex);
+ g_cond_free(self->writer_thread_wakeup_cond);
+
+ if (self->queue)
+ log_queue_unref(self->queue);
+
+ g_free(self->exchange);
+ g_free(self->exchange_type);
+ log_template_unref(self->routing_key_template);
+ log_template_unref(self->body_template);
+ g_free(self->user);
+ g_free(self->password);
+ g_free(self->host);
+ g_free(self->vhost);
+ g_free(self->entries);
+ if (self->vp)
+ value_pairs_free(self->vp);
+ log_dest_driver_free(d);
+}
+
+static void
+afamqp_dd_queue_notify(gpointer user_data)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) user_data;
+
+ g_mutex_lock(self->queue_mutex);
+ g_cond_signal(self->writer_thread_wakeup_cond);
+ log_queue_reset_parallel_push(self->queue);
+ g_mutex_unlock(self->queue_mutex);
+}
+
+static void
+afamqp_dd_queue(LogPipe *s, LogMessage *msg,
+ const LogPathOptions *path_options, gpointer user_data)
+{
+ AMQPDestDriver *self = (AMQPDestDriver *) s;
+ LogPathOptions local_options;
+
+ if (!path_options->flow_control_requested)
+ path_options = log_msg_break_ack(msg, path_options, &local_options);
+
+ g_mutex_lock(self->suspend_mutex);
+ g_mutex_lock(self->queue_mutex);
+ if (!self->writer_thread_suspended)
+ log_queue_set_parallel_push(self->queue, 1, afamqp_dd_queue_notify,
+ self, NULL);
+ g_mutex_unlock(self->queue_mutex);
+ g_mutex_unlock(self->suspend_mutex);
+ log_queue_push_tail(self->queue, msg, path_options);
+}
+
+/*
+ * Plugin glue.
+ */
+
+LogDriver *
+afamqp_dd_new(void)
+{
+ AMQPDestDriver *self = g_new0(AMQPDestDriver, 1);
+
+ log_dest_driver_init_instance(&self->super);
+ self->super.super.super.init = afamqp_dd_init;
+ self->super.super.super.deinit = afamqp_dd_deinit;
+ self->super.super.super.queue = afamqp_dd_queue;
+ self->super.super.super.free_fn = afamqp_dd_free;
+
+ self->routing_key_template = log_template_new(configuration, NULL);
+
+ afamqp_dd_set_vhost((LogDriver *) self, "/");
+ afamqp_dd_set_host((LogDriver *) self, "127.0.0.1");
+ afamqp_dd_set_port((LogDriver *) self, 5672);
+ afamqp_dd_set_exchange((LogDriver *) self, "syslog");
+ afamqp_dd_set_exchange_type((LogDriver *) self, "fanout");
+ afamqp_dd_set_routing_key((LogDriver *) self, "");
+ afamqp_dd_set_persistent((LogDriver *) self, TRUE);
+
+ init_sequence_number(&self->seq_num);
+
+ self->writer_thread_wakeup_cond = g_cond_new();
+ self->suspend_mutex = g_mutex_new();
+ self->queue_mutex = g_mutex_new();
+
+ self->max_entries = 256;
+ self->entries = g_new(amqp_table_entry_t, self->max_entries);
+
+ return (LogDriver *) self;
+}
+
+extern CfgParser afamqp_parser; /* defined in afamqp-parser.c; referenced by afamqp_plugin */
+
+static Plugin afamqp_plugin =
+{
+ .type = LL_CONTEXT_DESTINATION,
+ .name = "amqp",
+ .parser = &afamqp_parser
+};
+
+gboolean
+afamqp_module_init(GlobalConfig *cfg, CfgArgs *args)
+{
+ plugin_register(cfg, &afamqp_plugin, 1);
+ return TRUE;
+}
+
+const ModuleInfo module_info =
+{
+ .canonical_name = "afamqp",
+ .version = VERSION,
+ .description = "The afamqp module provides AMQP destination support for syslog-ng.",
+ .core_revision = SOURCE_REVISION, .plugins = &afamqp_plugin,
+ .plugins_len = 1,
+};
diff --git a/modules/afamqp/afamqp.h b/modules/afamqp/afamqp.h
new file mode 100644
index 0000000..8e7d7e9
--- /dev/null
+++ b/modules/afamqp/afamqp.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra at fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFAMQP_H_INCLUDED
+#define AFAMQP_H_INCLUDED
+
+#include "driver.h"
+#include "value-pairs.h"
+
+LogDriver *afamqp_dd_new(void); /* new driver instance, defaults preset */
+/* Option setters; d is a driver obtained from afamqp_dd_new(). */
+void afamqp_dd_set_host(LogDriver *d, const gchar *host);
+void afamqp_dd_set_port(LogDriver *d, gint port);
+void afamqp_dd_set_exchange(LogDriver *d, const gchar *exchange);
+void afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type);
+void afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost);
+void afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key);
+void afamqp_dd_set_body(LogDriver *d, const gchar *body);
+void afamqp_dd_set_persistent(LogDriver *d, gboolean persistent);
+void afamqp_dd_set_user(LogDriver *d, const gchar *user);
+void afamqp_dd_set_password(LogDriver *d, const gchar *password);
+void afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp);
+
+#endif
diff --git a/modules/afamqp/rabbitmq-c b/modules/afamqp/rabbitmq-c
new file mode 160000
index 0000000..afdfc05
--- /dev/null
+++ b/modules/afamqp/rabbitmq-c
@@ -0,0 +1 @@
+Subproject commit afdfc05b0e974b470130e20b4be6c5728618afd4
--
1.7.10.4
More information about the syslog-ng
mailing list