This driver implements an AMQP destination (based on the rabbitmq-c library), supporting persistence, all the exchange types, and uses a creative way to get messages accross: all the name-value pairs selected with the value-pairs() syntax will be sent as headers, while the message payload can be set with the body() option (empty by default). Most settings have sensible defaults, except for a few, noted below: @module afamqp destination d_amqp { amqp( vhost("/") host("127.0.0.1") port(5672) username("guest") # mandatory, no default password("guest") # mandatory, no default exchange("syslog") exchange_type("fanout") #routing_key("") #body("") persistent(yes) value-pairs( scope("selected-macros" "nv-pairs" "sdata") ) ); }; Publishing the name-value pairs as headers makes it possible to use a headers exchange type and subscribe only to interesting log streams, in a much more flexible way than using the routing_key() option. The routing_key() and body() options can contain any template, they will be expanded before publication. Signed-off-by: Attila Nagy <bra@fsn.hu> Signed-off-by: Gergely Nagy <algernon@balabit.hu> --- .gitmodules | 3 + autogen.sh | 2 +- configure.in | 52 +++ lib/stats.c | 2 + lib/stats.h | 1 + modules/Makefile.am | 2 +- modules/afamqp/Makefile.am | 35 ++ modules/afamqp/afamqp-grammar.ym | 92 +++++ modules/afamqp/afamqp-parser.c | 59 +++ modules/afamqp/afamqp-parser.h | 36 ++ modules/afamqp/afamqp.c | 738 ++++++++++++++++++++++++++++++++++++++ modules/afamqp/afamqp.h | 45 +++ modules/afamqp/rabbitmq-c | 1 + 13 files changed, 1066 insertions(+), 2 deletions(-) create mode 100644 modules/afamqp/Makefile.am create mode 100644 modules/afamqp/afamqp-grammar.ym create mode 100644 modules/afamqp/afamqp-parser.c create mode 100644 modules/afamqp/afamqp-parser.h create mode 100644 modules/afamqp/afamqp.c create mode 100644 modules/afamqp/afamqp.h create mode 160000 modules/afamqp/rabbitmq-c diff --git a/.gitmodules b/.gitmodules index d000782..be86ee5 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "lib/ivykis"] path = lib/ivykis url = https://github.com/buytenh/ivykis.git +[submodule "modules/afamqp/rabbitmq-c"] + path = modules/afamqp/rabbitmq-c + url = https://github.com/alanxz/rabbitmq-c.git diff --git a/autogen.sh b/autogen.sh index fb4151e..5027602 100755 --- a/autogen.sh +++ b/autogen.sh @@ -3,7 +3,7 @@ # This script is needed to setup build environment from checked out # source tree. # -SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client" +SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client modules/afamqp/rabbitmq-c" GIT=`which git` autogen_submodules() diff --git a/configure.in b/configure.in index a23c6e8..2edec69 100644 --- a/configure.in +++ b/configure.in @@ -28,6 +28,7 @@ IVYKIS_MIN_VERSION="0.30.1" JSON_C_MIN_VERSION="0.9" PCRE_MIN_VERSION="6.1" LMC_MIN_VERSION="0.1.6" +LRMQ_MIN_VERSION="0.0.1" dnl *************************************************************************** dnl Initial setup @@ -163,6 +164,15 @@ AC_ARG_WITH(libmongo-client, Link against the system supplied or the built-in libmongo-client library.] ,,with_libmongo_client="internal") +AC_ARG_ENABLE(amqp, + [ --enable-amqp Enable amqp destination (default: auto)] + ,,enable_amqp="auto") + +AC_ARG_WITH(librabbitmq, + [ --with-librabbitmq-client=[system/internal] + Link against the system supplied or the built-in librabbitmq library.] + ,,with_librabbitmq_client="internal") + AC_ARG_WITH(ivykis, [ --with-ivykis=[system/internal] Link against the system supplied or the built-in ivykis library.] @@ -827,6 +837,31 @@ if test "x$enable_smtp" != "xno" && test "x$with_libesmtp" != "no"; then fi dnl *************************************************************************** +dnl rabbitmq-c headers/libraries +dnl *************************************************************************** + +if test "x$with_librabbitmq_client" = "xinternal"; then + if test -f "$srcdir/modules/afamqp/rabbitmq-c/librabbitmq/amqp.h"; then + AC_CONFIG_SUBDIRS([modules/afamqp/rabbitmq-c]) + # these can only be used in modules/amqp as it assumes + # the current directory just one below rabbitmq-c + + LIBRABBITMQ_LIBS="-L\$(builddir)/rabbitmq-c/librabbitmq -lrabbitmq" + LIBRABBITMQ_CFLAGS="-I\$(srcdir)/rabbitmq-c/librabbitmq -I\$(builddir)/rabbitmq-c/librabbitmq" + LIBRABBITMQ_SUBDIRS="rabbitmq-c" + else + AC_MSG_WARN([Internal librabbitmq-client sources not found in modules/afamqp/rabbitmq-c]) + with_librabbitmq_client="no" + fi +elif test "x$with_librabbitmq_client" = "xsystem"; then + PKG_CHECK_MODULES(LIBRABBITMQ, librabbitmq >= $LRMQ_MIN_VERSION,with_librabbitmq_client="yes",with_librabbitmq_client="no") +fi + +if test "x$with_librabbitmq_client" = "xno"; then + enable_amqp="no" +fi + +dnl *************************************************************************** dnl misc features to be enabled dnl *************************************************************************** @@ -889,6 +924,16 @@ if test "x$enable_mongodb" = "xauto"; then AC_MSG_RESULT([$enable_mongodb]) fi +if test "x$enable_amqp" = "xauto"; then + AC_MSG_CHECKING(whether to enable amqp destination support) + if test "x$with_librabbitmq_client" != "no"; then + enable_amqp="yes" + else + enable_amqp="no" + fi + AC_MSG_RESULT([$enable_amqp]) +fi + if test "x$enable_json" != "xno"; then JSON_LIBS=$JSON_C_LIBS JSON_CFLAGS=$JSON_C_CFLAGS @@ -1068,6 +1113,7 @@ AM_CONDITIONAL(ENABLE_SUN_STREAMS, [test "$enable_sun_streams" = "yes"]) AM_CONDITIONAL(ENABLE_PACCT, [test "$enable_pacct" = "yes"]) AM_CONDITIONAL(ENABLE_MONGODB, [test "$enable_mongodb" = "yes"]) AM_CONDITIONAL(ENABLE_SMTP, [test "$enable_smtp" = "yes"]) +AM_CONDITIONAL(ENABLE_AMQP, [test "$enable_amqp" = "yes"]) AM_CONDITIONAL(ENABLE_JSON, [test "$enable_json" = "yes"]) AM_CONDITIONAL(WITH_LIBSYSTEMD, [test "$with_libsystemd" = "yes"]) @@ -1102,6 +1148,9 @@ AC_SUBST(LIBMONGO_CFLAGS) AC_SUBST(LIBMONGO_SUBDIRS) AC_SUBST(LIBESMTP_CFLAGS) AC_SUBST(LIBESMTP_LIBS) +AC_SUBST(LIBRABBITMQ_LIBS) +AC_SUBST(LIBRABBITMQ_CFLAGS) +AC_SUBST(LIBRABBITMQ_SUBDIRS) AC_SUBST(JSON_LIBS) AC_SUBST(JSON_CFLAGS) AC_SUBST(IVYKIS_SUBDIRS) @@ -1132,6 +1181,7 @@ AC_OUTPUT(dist.conf modules/afuser/Makefile modules/afmongodb/Makefile modules/afsmtp/Makefile + modules/afamqp/Makefile modules/dbparser/Makefile modules/dbparser/tests/Makefile modules/csvparser/Makefile @@ -1171,6 +1221,7 @@ echo " __thread keyword : ${ac_cv_have_tls:=no}" echo " Submodules:" echo " ivykis : $with_ivykis" echo " libmongo-client : $with_libmongo_client" +echo " librabbitmq : $with_librabbitmq_client" echo " Features:" echo " Debug symbols : ${enable_debug:=no}" echo " GCC profiling : ${enable_gprof:=no}" @@ -1191,4 +1242,5 @@ echo " PACCT module (EXPERIMENTAL) : ${enable_pacct:=no}" echo " MongoDB destination (module): ${enable_mongodb:=no}" echo " JSON support (module) : ${enable_json:=no}" echo " SMTP support (module) : ${enable_smtp:=no}" +echo " AMQP destination (module) : ${enable_amqp:=no}" diff --git a/lib/stats.c b/lib/stats.c index 0ccc283..f757597 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -383,6 +383,8 @@ const gchar *source_names[SCS_MAX] = "severity", "facility", "sender", + "smtp", + "amqp", }; diff --git a/lib/stats.h b/lib/stats.h index 9174408..2550724 100644 --- a/lib/stats.h +++ b/lib/stats.h @@ -71,6 +71,7 @@ enum SCS_FACILITY = 25, SCS_SENDER = 26, SCS_SMTP = 27, + SCS_AMQP = 28, SCS_MAX, SCS_SOURCE_MASK = 0xff }; diff --git a/modules/Makefile.am b/modules/Makefile.am index 8f9d63b..1fce2d2 100644 --- a/modules/Makefile.am +++ b/modules/Makefile.am @@ -1 +1 @@ -SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json +SUBDIRS = afsocket afsql afstreams affile afprog afuser afamqp afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json diff --git a/modules/afamqp/Makefile.am b/modules/afamqp/Makefile.am new file mode 100644 index 0000000..ccf20df --- /dev/null +++ b/modules/afamqp/Makefile.am @@ -0,0 +1,35 @@ + +SUBDIRS = @LIBRABBITMQ_SUBDIRS@ +DIST_SUBDIRS = rabbitmq-c + +moduledir = @moduledir@ +AM_CPPFLAGS = -I$(top_srcdir)/lib -I../../lib +module_LTLIBRARIES = libafamqp.la + +export top_srcdir + +if ENABLE_AMQP + +libafamqp_la_CFLAGS = $(LIBRABBITMQ_CFLAGS) +libafamqp_la_SOURCES = afamqp-grammar.y afamqp.c afamqp.h afamqp-parser.c afamqp-parser.h +libafamqp_la_LIBADD = $(MODULE_DEPS_LIBS) $(LIBRABBITMQ_LIBS) +libafamqp_la_LDFLAGS = $(MODULE_LDFLAGS) + +endif + +BUILT_SOURCES = afamqp-grammar.y afamqp-grammar.c afamqp-grammar.h +EXTRA_DIST = $(BUILT_SOURCES) afamqp-grammar.ym + +include $(top_srcdir)/build/lex-rules.am + +# divert install/uninstall targets to avoid recursing into $(SUBDIRS) + +install: + $(MAKE) $(AM_MAKEFLAGS) all + $(MAKE) $(AM_MAKEFLAGS) install-am + +uninstall: + $(MAKE) $(AM_MAKEFLAGS) uninstall-am + +check: + echo "Make check disabled, since it requires a newer glib" diff --git a/modules/afamqp/afamqp-grammar.ym b/modules/afamqp/afamqp-grammar.ym new file mode 100644 index 0000000..b325b3d --- /dev/null +++ b/modules/afamqp/afamqp-grammar.ym @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +%code requires { + +#include "afamqp-parser.h" + +} + +%code { + +#include "cfg-parser.h" +#include "afamqp-grammar.h" +#include "plugin.h" +#include "vptransform.h" + +extern LogDriver *last_driver; +extern ValuePairs *last_value_pairs; +extern ValuePairsTransformSet *last_vp_transset; +} + +%name-prefix "afamqp_" +%lex-param {CfgLexer *lexer} +%parse-param {CfgLexer *lexer} +%parse-param {LogDriver **instance} +%parse-param {gpointer arg} + + +/* INCLUDE_DECLS */ + +%token KW_AMQP +%token KW_EXCHANGE +%token KW_EXCHANGE_TYPE +%token KW_PERSISTENT +%token KW_VHOST +%token KW_ROUTING_KEY +%token KW_BODY + +%% + +start + : LL_CONTEXT_DESTINATION KW_AMQP + { + last_driver = *instance = afamqp_dd_new(); + } + '(' afamqp_options ')' { YYACCEPT; } + ; + +afamqp_options + : afamqp_option afamqp_options + | + ; + +afamqp_option + : KW_HOST '(' string ')' { afamqp_dd_set_host(last_driver, $3); free($3); } + | KW_PORT '(' LL_NUMBER ')' { afamqp_dd_set_port(last_driver, $3); } + | KW_VHOST '(' string ')' { afamqp_dd_set_vhost(last_driver, $3); free($3); } + | KW_EXCHANGE '(' string ')' { afamqp_dd_set_exchange(last_driver, $3); free($3); } + | KW_EXCHANGE_TYPE '(' string ')' { afamqp_dd_set_exchange_type(last_driver, $3); free($3); } + | KW_ROUTING_KEY '(' string ')' { afamqp_dd_set_routing_key(last_driver, $3); free($3); } + | KW_BODY '(' string ')' { afamqp_dd_set_body(last_driver, $3); free($3); } + | KW_PERSISTENT '(' yesno ')' { afamqp_dd_set_persistent(last_driver, $3); } + | KW_USERNAME '(' string ')' { afamqp_dd_set_user(last_driver, $3); free($3); } + | KW_PASSWORD '(' string ')' { afamqp_dd_set_password(last_driver, $3); free($3); } + | value_pair_option { afamqp_dd_set_value_pairs(last_driver, $1); } + | dest_driver_option + ; + +/* INCLUDE_RULES */ + +%% diff --git a/modules/afamqp/afamqp-parser.c b/modules/afamqp/afamqp-parser.c new file mode 100644 index 0000000..f082f32 --- /dev/null +++ b/modules/afamqp/afamqp-parser.c @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include "afamqp.h" +#include "cfg-parser.h" +#include "afamqp-grammar.h" + +extern int afamqp_debug; +int afamqp_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg); + +static CfgLexerKeyword afamqp_keywords[] = { + { "amqp", KW_AMQP }, + { "vhost", KW_VHOST }, + { "host", KW_HOST }, + { "port", KW_PORT }, + { "exchange", KW_EXCHANGE }, + { "exchange_type", KW_EXCHANGE_TYPE }, + { "routing_key", KW_ROUTING_KEY }, + { "persistent", KW_PERSISTENT }, + { "username", KW_USERNAME }, + { "password", KW_PASSWORD }, + { "log_fifo_size", KW_LOG_FIFO_SIZE }, + { "body", KW_BODY }, + { NULL } +}; + +CfgParser afamqp_parser = +{ +#if ENABLE_DEBUG + .debug_flag = &afamqp_debug, +#endif + .name = "afamqp", + .keywords = afamqp_keywords, + .parse = (int (*)(CfgLexer *lexer, gpointer *instance, gpointer)) afamqp_parse, + .cleanup = (void (*)(gpointer)) log_pipe_unref, +}; + +CFG_PARSER_IMPLEMENT_LEXER_BINDING(afamqp_, LogDriver **) diff --git a/modules/afamqp/afamqp-parser.h b/modules/afamqp/afamqp-parser.h new file mode 100644 index 0000000..d75de63 --- /dev/null +++ b/modules/afamqp/afamqp-parser.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef AFAMQP_PARSER_H_INCLUDED +#define AFAMQP_PARSER_H_INCLUDED + +#include "cfg-parser.h" +#include "cfg-lexer.h" +#include "afamqp.h" + +extern CfgParser afamqp_parser; + +CFG_PARSER_DECLARE_LEXER_BINDING(afamqp_, LogDriver **) + +#endif diff --git a/modules/afamqp/afamqp.c b/modules/afamqp/afamqp.c new file mode 100644 index 0000000..a9d65d4 --- /dev/null +++ b/modules/afamqp/afamqp.c @@ -0,0 +1,738 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include "afamqp.h" +#include "afamqp-parser.h" +#include "plugin.h" +#include "messages.h" +#include "misc.h" +#include "stats.h" +#include "nvtable.h" +#include "logqueue.h" +#include "scratch-buffers.h" + +#include <amqp.h> +#include <amqp_framing.h> + +typedef struct +{ + LogDestDriver super; + + /* Shared between main/writer; only read by the writer, never written */ + gchar *exchange; + gchar *exchange_type; + LogTemplate *routing_key_template; + LogTemplate *body_template; + + gint persistent; + + gchar *vhost; + gchar *host; + gint port; + + gchar *user; + gchar *password; + + time_t time_reopen; + + StatsCounterItem *dropped_messages; + StatsCounterItem *stored_messages; + + ValuePairs *vp; + + /* Thread related stuff; shared */ + GThread *writer_thread; + GMutex *queue_mutex; + GMutex *suspend_mutex; + GCond *writer_thread_wakeup_cond; + + gboolean writer_thread_terminate; + gboolean writer_thread_suspended; + GTimeVal writer_thread_suspend_target; + + LogQueue *queue; + + /* Writer-only stuff */ + amqp_connection_state_t conn; + amqp_table_entry_t *entries; + gint32 max_entries; + gint32 seq_num; +} AMQPDestDriver; + +/* + * Configuration + */ + +void +afamqp_dd_set_user(LogDriver *d, const gchar *user) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->user); + self->user = g_strdup(user); +} + +void +afamqp_dd_set_password(LogDriver *d, const gchar *password) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->password); + self->password = g_strdup(password); +} + +void +afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->vhost); + self->vhost = g_strdup(vhost); +} + +void +afamqp_dd_set_host(LogDriver *d, const gchar *host) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->host); + self->host = g_strdup(host); +} + +void +afamqp_dd_set_port(LogDriver *d, gint port) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + self->port = (int) port; +} + +void +afamqp_dd_set_exchange(LogDriver *d, const gchar *exchange) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->exchange); + self->exchange = g_strdup(exchange); +} + +void +afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->exchange_type); + self->exchange_type = g_strdup(exchange_type); +} + +void +afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + log_template_compile(self->routing_key_template, routing_key, NULL); +} + +void +afamqp_dd_set_body(LogDriver *d, const gchar *body) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + if (!self->body_template) + self->body_template = log_template_new(configuration, NULL); + log_template_compile(self->body_template, body, NULL); +} + +void +afamqp_dd_set_persistent(LogDriver *s, gboolean persistent) +{ + AMQPDestDriver *self = (AMQPDestDriver *) s; + + if (persistent) + self->persistent = 2; + else + self->persistent = 1; +} + +void +afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + if (self->vp) + value_pairs_free(self->vp); + self->vp = vp; +} + +/* + * Utilities + */ + +static gchar * +afamqp_dd_format_stats_instance(AMQPDestDriver *self) +{ + static gchar persist_name[1024]; + + g_snprintf(persist_name, sizeof(persist_name), "amqp,%s,%s,%u,%s,%s", + self->vhost, self->host, self->port, self->exchange, + self->exchange_type); + return persist_name; +} + +static gchar * +afamqp_dd_format_persist_name(AMQPDestDriver *self) +{ + static gchar persist_name[1024]; + + g_snprintf(persist_name, sizeof(persist_name), "afamqp(%s,%s,%u,%s,%s)", + self->vhost, self->host, self->port, self->exchange, + self->exchange_type); + return persist_name; +} + +static void +afamqp_dd_suspend(AMQPDestDriver *self) +{ + self->writer_thread_suspended = TRUE; + g_get_current_time(&self->writer_thread_suspend_target); + g_time_val_add(&self->writer_thread_suspend_target, + self->time_reopen * 1000000); +} + +static void +afamqp_dd_disconnect(AMQPDestDriver *self) +{ + amqp_channel_close(self->conn, 1, AMQP_REPLY_SUCCESS); + amqp_connection_close(self->conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(self->conn); + self->conn = NULL; +} + +static gboolean +afamqp_is_ok(AMQPDestDriver *self, gchar *context, amqp_rpc_reply_t ret) +{ + switch (ret.reply_type) + { + case AMQP_RESPONSE_NORMAL: + break; + + case AMQP_RESPONSE_NONE: + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "missing RPC reply type"), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + afamqp_dd_suspend(self); + return FALSE; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + { + gchar *errstr = amqp_error_string(ret.library_error); + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", errstr), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + g_free (errstr); + afamqp_dd_suspend(self); + return FALSE; + } + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (ret.reply.id) + { + case AMQP_CONNECTION_CLOSE_METHOD: + { + amqp_connection_close_t *m = + (amqp_connection_close_t *) ret.reply.decoded; + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "server connection error"), + evt_tag_int("code", m->reply_code), + evt_tag_str("text", m->reply_text.bytes), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + afamqp_dd_suspend(self); + return FALSE; + } + case AMQP_CHANNEL_CLOSE_METHOD: + { + amqp_channel_close_t *m = + (amqp_channel_close_t *) ret.reply.decoded; + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "server channel error"), + evt_tag_int("code", m->reply_code), + evt_tag_str("text", m->reply_text.bytes), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + afamqp_dd_suspend(self); + return FALSE; + } + default: + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "unknown server error"), + evt_tag_printf("method id", "0x%08X", ret.reply.id), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + afamqp_dd_suspend(self); + return FALSE; + } + return FALSE; + } + return TRUE; +} + +static gboolean +afamqp_dd_connect(AMQPDestDriver *self, gboolean reconnect) +{ + int sockfd; + amqp_rpc_reply_t ret; + + if (reconnect && self->conn) + { + ret = amqp_get_rpc_reply(self->conn); + if (ret.reply_type == AMQP_RESPONSE_NORMAL) + return TRUE; + } + + self->conn = amqp_new_connection(); + sockfd = amqp_open_socket(self->host, self->port); + if (sockfd < 0) + { + gchar *errstr = amqp_error_string(-sockfd); + msg_error("Error connecting to AMQP server", + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", errstr), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + g_free(errstr); + return FALSE; + } + amqp_set_sockfd(self->conn, sockfd); + + ret = amqp_login(self->conn, self->vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, self->user, self->password); + if (!afamqp_is_ok(self, "Error during AMQP login", ret)) + return FALSE; + + amqp_channel_open(self->conn, 1); + ret = amqp_get_rpc_reply(self->conn); + if (!afamqp_is_ok(self, "Error during AMQP channel open", ret)) + return FALSE; + + amqp_exchange_declare(self->conn, 1, amqp_cstring_bytes(self->exchange), + amqp_cstring_bytes(self->exchange_type), 0, 0, amqp_empty_table); + ret = amqp_get_rpc_reply(self->conn); + if (!afamqp_is_ok(self, "Error during AMQP exchange declaration", ret)) + return FALSE; + + msg_debug ("Connecting to AMQP succeeded", + evt_tag_str("driver", self->super.super.id), + NULL); + + return TRUE; +} + +/* + * Worker thread + */ + +static gboolean +afamqp_vp_foreach(const gchar *name, const gchar *value, + gpointer user_data) +{ + amqp_table_entry_t **entries = (amqp_table_entry_t **) ((gpointer *)user_data)[0]; + gint *pos = (gint *) ((gpointer *)user_data)[1]; + gint32 *max_size = (gint32 *) ((gpointer *)user_data)[2]; + + if (*pos == *max_size) + { + *max_size *= 2; + *entries = g_renew(amqp_table_entry_t, *entries, *max_size); + } + + (*entries)[*pos].key = amqp_cstring_bytes(strdup(name)); + (*entries)[*pos].value.kind = AMQP_FIELD_KIND_UTF8; + (*entries)[*pos].value.value.bytes = amqp_cstring_bytes(strdup(value)); + + (*pos)++; + + return FALSE; +} + +static gboolean +afamqp_worker_publish(AMQPDestDriver *self, LogMessage *msg) +{ + gint pos = 0, ret; + amqp_table_t table; + amqp_basic_properties_t props; + gboolean success = TRUE; + ScratchBuffer *routing_key = scratch_buffer_acquire(); + ScratchBuffer *body = scratch_buffer_acquire(); + amqp_bytes_t body_bytes = amqp_cstring_bytes(""); + + gpointer user_data[] = { &self->entries, &pos, &self->max_entries }; + + value_pairs_foreach(self->vp, afamqp_vp_foreach, msg, self->seq_num, + user_data); + + table.num_entries = pos; + table.entries = self->entries; + + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG + | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = self->persistent; + props.headers = table; + + log_template_format(self->routing_key_template, msg, NULL, LTZ_LOCAL, + self->seq_num, NULL, sb_string(routing_key)); + + if (self->body_template) + { + log_template_format(self->body_template, msg, NULL, LTZ_LOCAL, + self->seq_num, NULL, sb_string(body)); + body_bytes = amqp_cstring_bytes(sb_string(body)->str); + } + + ret = amqp_basic_publish(self->conn, 1, amqp_cstring_bytes(self->exchange), + amqp_cstring_bytes(sb_string(routing_key)->str), + 0, 0, &props, body_bytes); + + scratch_buffer_release(routing_key); + scratch_buffer_release(body); + + if (ret < 0) + { + msg_error("Network error while inserting into AMQP server", + evt_tag_int("time_reopen", self->time_reopen), NULL); + success = FALSE; + } + + while (--pos >= 0) + { + amqp_bytes_free(self->entries[pos].key); + amqp_bytes_free(self->entries[pos].value.value.bytes); + } + + return success; +} + +static gboolean +afamqp_worker_insert(AMQPDestDriver *self) +{ + gboolean success; + LogMessage *msg; + LogPathOptions path_options = LOG_PATH_OPTIONS_INIT; + + afamqp_dd_connect(self, TRUE); + + g_mutex_lock(self->queue_mutex); + log_queue_reset_parallel_push(self->queue); + success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, FALSE); + g_mutex_unlock(self->queue_mutex); + if (!success) + return TRUE; + + msg_set_context(msg); + success = afamqp_worker_publish (self, msg); + msg_set_context(NULL); + + if (success) + { + stats_counter_inc(self->stored_messages); + step_sequence_number(&self->seq_num); + log_msg_ack(msg, &path_options); + log_msg_unref(msg); + } + else + { + g_mutex_lock(self->queue_mutex); + log_queue_push_head(self->queue, msg, &path_options); + g_mutex_unlock(self->queue_mutex); + } + + return success; +} + +static gpointer +afamqp_worker_thread(gpointer arg) +{ + AMQPDestDriver *self = (AMQPDestDriver *) arg; + gboolean success; + + msg_debug("Worker thread started", + evt_tag_str("driver", self->super.super.id), NULL); + + success = afamqp_dd_connect(self, FALSE); + + while (!self->writer_thread_terminate) + { + g_mutex_lock(self->suspend_mutex); + if (self->writer_thread_suspended) + { + g_cond_timed_wait(self->writer_thread_wakeup_cond, + self->suspend_mutex, &self->writer_thread_suspend_target); + self->writer_thread_suspended = FALSE; + g_mutex_unlock(self->suspend_mutex); + } + else + { + g_mutex_unlock(self->suspend_mutex); + + g_mutex_lock(self->queue_mutex); + if (log_queue_get_length(self->queue) == 0) + { + g_cond_wait(self->writer_thread_wakeup_cond, self->queue_mutex); + } + g_mutex_unlock(self->queue_mutex); + } + + if (self->writer_thread_terminate) + break; + + if (!afamqp_worker_insert(self)) + { + afamqp_dd_disconnect(self); + afamqp_dd_suspend(self); + } + } + + afamqp_dd_disconnect(self); + + msg_debug("Worker thread finished", + evt_tag_str("driver", self->super.super.id), NULL); + + return NULL; +} + +/* + * Main thread + */ + +static void +afamqp_dd_start_thread(AMQPDestDriver *self) +{ + self->writer_thread = create_worker_thread(afamqp_worker_thread, self, + TRUE, NULL); +} + +static void +afamqp_dd_stop_thread(AMQPDestDriver *self) +{ + self->writer_thread_terminate = TRUE; + g_mutex_lock(self->queue_mutex); + g_cond_signal(self->writer_thread_wakeup_cond); + g_mutex_unlock(self->queue_mutex); + g_thread_join(self->writer_thread); +} + +static gboolean +afamqp_dd_init(LogPipe *s) +{ + AMQPDestDriver *self = (AMQPDestDriver *) s; + GlobalConfig *cfg = log_pipe_get_config(s); + + if (!log_dest_driver_init_method(s)) + return FALSE; + + if (cfg) + self->time_reopen = cfg->time_reopen; + + if (!self->vp) + { + self->vp = value_pairs_new(); + value_pairs_add_scope(self->vp, "selected-macros"); + value_pairs_add_scope(self->vp, "nv-pairs"); + value_pairs_add_scope(self->vp, "sdata"); + } + + msg_verbose("Initializing AMQP destination", + evt_tag_str("vhost", self->vhost), + evt_tag_str("host", self->host), + evt_tag_int("port", self->port), + evt_tag_str("exchange", self->exchange), + evt_tag_str("exchange_type", self->exchange_type), + NULL); + + self->queue = log_dest_driver_acquire_queue(&self->super, afamqp_dd_format_persist_name(self)); + + stats_lock(); + stats_register_counter(0, SCS_AMQP | SCS_DESTINATION, + self->super.super.id, afamqp_dd_format_stats_instance(self), + SC_TYPE_STORED, &self->stored_messages); + stats_register_counter(0, SCS_AMQP | SCS_DESTINATION, + self->super.super.id, afamqp_dd_format_stats_instance(self), + SC_TYPE_DROPPED, &self->dropped_messages); + stats_unlock(); + + log_queue_set_counters(self->queue, self->stored_messages, + self->dropped_messages); + afamqp_dd_start_thread(self); + + return TRUE; +} + +static gboolean +afamqp_dd_deinit(LogPipe *s) +{ + AMQPDestDriver *self = (AMQPDestDriver *) s; + + afamqp_dd_stop_thread(self); + + log_queue_set_counters(self->queue, NULL, NULL); + stats_lock(); + stats_unregister_counter(SCS_AMQP | SCS_DESTINATION, + self->super.super.id, afamqp_dd_format_stats_instance(self), + SC_TYPE_STORED, &self->stored_messages); + stats_unregister_counter(SCS_AMQP | SCS_DESTINATION, + self->super.super.id, afamqp_dd_format_stats_instance(self), + SC_TYPE_DROPPED, &self->dropped_messages); + stats_unlock(); + if (!log_dest_driver_deinit_method(s)) + return FALSE; + + return TRUE; +} + +static void +afamqp_dd_free(LogPipe *d) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_mutex_free(self->suspend_mutex); + g_mutex_free(self->queue_mutex); + g_cond_free(self->writer_thread_wakeup_cond); + + if (self->queue) + log_queue_unref(self->queue); + + g_free(self->exchange); + g_free(self->exchange_type); + log_template_unref(self->routing_key_template); + log_template_unref(self->body_template); + g_free(self->user); + g_free(self->password); + g_free(self->host); + g_free(self->vhost); + g_free(self->entries); + if (self->vp) + value_pairs_free(self->vp); + log_dest_driver_free(d); +} + +static void +afamqp_dd_queue_notify(gpointer user_data) +{ + AMQPDestDriver *self = (AMQPDestDriver *) user_data; + + g_mutex_lock(self->queue_mutex); + g_cond_signal(self->writer_thread_wakeup_cond); + log_queue_reset_parallel_push(self->queue); + g_mutex_unlock(self->queue_mutex); +} + +static void +afamqp_dd_queue(LogPipe *s, LogMessage *msg, + const LogPathOptions *path_options, gpointer user_data) +{ + AMQPDestDriver *self = (AMQPDestDriver *) s; + LogPathOptions local_options; + + if (!path_options->flow_control_requested) + path_options = log_msg_break_ack(msg, path_options, &local_options); + + g_mutex_lock(self->suspend_mutex); + g_mutex_lock(self->queue_mutex); + if (!self->writer_thread_suspended) + log_queue_set_parallel_push(self->queue, 1, afamqp_dd_queue_notify, + self, NULL); + g_mutex_unlock(self->queue_mutex); + g_mutex_unlock(self->suspend_mutex); + log_queue_push_tail(self->queue, msg, path_options); +} + +/* + * Plugin glue. + */ + +LogDriver * +afamqp_dd_new(void) +{ + AMQPDestDriver *self = g_new0(AMQPDestDriver, 1); + + log_dest_driver_init_instance(&self->super); + self->super.super.super.init = afamqp_dd_init; + self->super.super.super.deinit = afamqp_dd_deinit; + self->super.super.super.queue = afamqp_dd_queue; + self->super.super.super.free_fn = afamqp_dd_free; + + self->routing_key_template = log_template_new(configuration, NULL); + + afamqp_dd_set_vhost((LogDriver *) self, "/"); + afamqp_dd_set_host((LogDriver *) self, "127.0.0.1"); + afamqp_dd_set_port((LogDriver *) self, 5672); + afamqp_dd_set_exchange((LogDriver *) self, "syslog"); + afamqp_dd_set_exchange_type((LogDriver *) self, "fanout"); + afamqp_dd_set_routing_key((LogDriver *) self, ""); + afamqp_dd_set_persistent((LogDriver *) self, TRUE); + + init_sequence_number(&self->seq_num); + + self->writer_thread_wakeup_cond = g_cond_new(); + self->suspend_mutex = g_mutex_new(); + self->queue_mutex = g_mutex_new(); + + self->max_entries = 256; + self->entries = g_new(amqp_table_entry_t, self->max_entries); + + return (LogDriver *) self; +} + +extern CfgParser afamqp_dd_parser; + +static Plugin afamqp_plugin = +{ + .type = LL_CONTEXT_DESTINATION, + .name = "amqp", + .parser = &afamqp_parser +}; + +gboolean +afamqp_module_init(GlobalConfig *cfg, CfgArgs *args) +{ + plugin_register(cfg, &afamqp_plugin, 1); + return TRUE; +} + +const ModuleInfo module_info = +{ + .canonical_name = "afamqp", + .version = VERSION, + .description = "The afamqp module provides AMQP destination support for syslog-ng.", + .core_revision = SOURCE_REVISION, .plugins = &afamqp_plugin, + .plugins_len = 1, +}; diff --git a/modules/afamqp/afamqp.h b/modules/afamqp/afamqp.h new file mode 100644 index 0000000..8e7d7e9 --- /dev/null +++ b/modules/afamqp/afamqp.h @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef AFAMQP_H_INCLUDED +#define AFAMQP_H_INCLUDED + +#include "driver.h" +#include "value-pairs.h" + +LogDriver *afamqp_dd_new(void); + +void afamqp_dd_set_host(LogDriver *d, const gchar *host); +void afamqp_dd_set_port(LogDriver *d, gint port); +void afamqp_dd_set_exchange(LogDriver *d, const gchar *database); +void afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type); +void afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost); +void afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key); +void afamqp_dd_set_body(LogDriver *d, const gchar *body); +void afamqp_dd_set_persistent(LogDriver *d, gboolean persistent); +void afamqp_dd_set_user(LogDriver *d, const gchar *user); +void afamqp_dd_set_password(LogDriver *d, const gchar *password); +void afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp); + +#endif diff --git a/modules/afamqp/rabbitmq-c b/modules/afamqp/rabbitmq-c new file mode 160000 index 0000000..afdfc05 --- /dev/null +++ b/modules/afamqp/rabbitmq-c @@ -0,0 +1 @@ +Subproject commit afdfc05b0e974b470130e20b4be6c5728618afd4 -- 1.7.10.4