[PATCH 0/2]: afamqp: AMQP destination driver
The two patches that follow implement an AMQP destination driver for syslog-ng, contributed by Attila Nagy <bra@fsn.hu>, with a few minor enhancements and a port to syslog-ng 3.4 by me.

The patch is also available on the feature/3.4/amqp[1] and feature/3.3/amqp[2] branches of my git repository, for syslog-ng 3.4 and 3.3 respectively. Functionally, they're exactly the same, but all further improvements and fixes will go to the 3.4 branch only. The 3.3 branch is there for testing and historical purposes, and will be removed sometime after the driver gets merged into 3.4.

Now, I won't explain in detail what AMQP is; suffice it to say that it is a messaging protocol, and RabbitMQ[3] is one implementation of it. Interested people can read the available documentation, there's plenty! For now, let's concentrate on how to take the driver for a test drive!

Once RabbitMQ is installed (on recent versions of Debian and its derivatives, it is as simple as apt-get install rabbitmq-server), all one needs to do is configure syslog-ng to send messages, and grab a client that can look at the queue. For testing purposes, there's a simple one at https://gist.github.com/3859756 (you'll need a couple of Python modules for it). It sets up a queue named 'test', binds it to the 'syslog' exchange (the default used by the AMQP driver), and starts consuming messages. It prints both the headers and the payload (if any; by default, there is no payload).

What one needs to know about the driver is that it does not declare any queues: it's the responsibility of the admin to set queues up, and route messages as they see fit. This can be done on the RabbitMQ web admin interface (again, see the docs about that!), among other ways, or programmatically, like the script above does.

Hopefully, this is enough to get one started! If not, I plan to write a more detailed blog post about the topic in the not too distant future.
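For those who would rather see the queue setup spelled out, here is a minimal sketch of a consumer along the lines described above. It is not the script from the gist: it assumes the third-party pika library (1.x API), a broker on 127.0.0.1 with default credentials, and the driver's default 'syslog' fanout exchange. The names describe() and main() are made up for the example.

```python
def describe(headers, body):
    """Render one delivery as text: sorted headers, then the payload if any."""
    lines = ["%s=%s" % (k, v) for k, v in sorted((headers or {}).items())]
    if body:
        lines.append("payload: " + body.decode("utf-8", "replace"))
    return "\n".join(lines)


def main():
    # Assumption: pika 1.x is installed; imported here so that describe()
    # can be used (and tested) without it.
    import pika

    conn = pika.BlockingConnection(pika.ConnectionParameters("127.0.0.1"))
    ch = conn.channel()

    # 'syslog' and 'fanout' are the driver's defaults; the declaration here
    # has to be compatible with the one the driver makes.
    ch.exchange_declare(exchange="syslog", exchange_type="fanout")

    # The driver never declares queues, so the consumer (or the admin) does.
    ch.queue_declare(queue="test")
    ch.queue_bind(queue="test", exchange="syslog")

    def on_message(channel, method, properties, body):
        print(describe(properties.headers, body))
        print("---")

    ch.basic_consume(queue="test", on_message_callback=on_message,
                     auto_ack=True)
    ch.start_consuming()
```

Calling main() blocks and prints every message that syslog-ng publishes. With the default configuration only headers show up, since body() is empty.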
There are plans to write an accompanying source driver too, but work has not started on that front yet. Rest assured though, it will come.

I'd like to thank Attila again for contributing the driver, and for answering my silly questions during review - both are much appreciated!

 [1]: https://github.com/algernon/syslog-ng/tree/feature/3.4/amqp
 [2]: https://github.com/algernon/syslog-ng/tree/feature/3.3/amqp
 [3]: http://www.rabbitmq.com/
This driver implements an AMQP destination (based on the rabbitmq-c library). It supports persistence and all the exchange types, and uses a creative way to get messages across: all the name-value pairs selected with the value-pairs() syntax are sent as headers, while the message payload can be set with the body() option (empty by default).

Most settings have sensible defaults, except for a few, noted below:

 @module afamqp

 destination d_amqp {
   amqp(
     vhost("/")
     host("127.0.0.1")
     port(5672)
     username("guest") # mandatory, no default
     password("guest") # mandatory, no default
     exchange("syslog")
     exchange_type("fanout")
     #routing_key("")
     #body("")
     persistent(yes)
     value-pairs(
       scope("selected-macros" "nv-pairs" "sdata")
     )
   );
 };

Publishing the name-value pairs as headers makes it possible to use the headers exchange type and subscribe only to interesting log streams, in a much more flexible way than using the routing_key() option.

The routing_key() and body() options can contain any template; they will be expanded before publication.
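To make the headers-based subscription mentioned above a bit more concrete, here is a small Python sketch of how a headers exchange decides whether a binding matches a message. It is a simplified model of the broker-side rule (an "x-match" argument of "all" or "any", with arguments starting with "x-" left out of the comparison), not code from this driver or from RabbitMQ. The function name headers_match() and the sample header values are made up for the example.

```python
def headers_match(binding_args, headers):
    """Simplified model of headers-exchange matching.

    binding_args: the arguments given when binding a queue; "x-match" may be
                  "all" (the default here) or "any".
    headers:      the message headers, e.g. the name-value pairs that the
                  driver publishes.
    """
    mode = binding_args.get("x-match", "all")
    # Arguments starting with "x-" steer the match and are not compared.
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [headers.get(k) == v for k, v in wanted.items()]
    return any(hits) if mode == "any" else all(hits)


# Sample headers, as they might arrive with the "selected-macros" scope.
msg = {"HOST": "web1", "PROGRAM": "sshd", "PRIORITY": "info"}

# Only sshd messages from web1:
print(headers_match({"x-match": "all", "PROGRAM": "sshd", "HOST": "web1"}, msg))  # True
# sshd messages, or anything from db1:
print(headers_match({"x-match": "any", "PROGRAM": "sshd", "HOST": "db1"}, msg))   # True
# Only cron messages:
print(headers_match({"PROGRAM": "cron"}, msg))                                    # False
```

With a routing key, the same selection would require encoding every field of interest into a single string on the sending side. With headers, each consumer picks the fields it cares about when binding its queue.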
Signed-off-by: Attila Nagy <bra@fsn.hu>
Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 .gitmodules                      |    3 +
 autogen.sh                       |    2 +-
 configure.in                     |   52 +++
 lib/stats.c                      |    2 +
 lib/stats.h                      |    1 +
 modules/Makefile.am              |    2 +-
 modules/afamqp/Makefile.am       |   35 ++
 modules/afamqp/afamqp-grammar.ym |   92 +++++
 modules/afamqp/afamqp-parser.c   |   59 +++
 modules/afamqp/afamqp-parser.h   |   36 ++
 modules/afamqp/afamqp.c          |  738 ++++++++++++++++++++++++++++++++++++++
 modules/afamqp/afamqp.h          |   45 +++
 modules/afamqp/rabbitmq-c        |    1 +
 13 files changed, 1066 insertions(+), 2 deletions(-)
 create mode 100644 modules/afamqp/Makefile.am
 create mode 100644 modules/afamqp/afamqp-grammar.ym
 create mode 100644 modules/afamqp/afamqp-parser.c
 create mode 100644 modules/afamqp/afamqp-parser.h
 create mode 100644 modules/afamqp/afamqp.c
 create mode 100644 modules/afamqp/afamqp.h
 create mode 160000 modules/afamqp/rabbitmq-c

diff --git a/.gitmodules b/.gitmodules
index d000782..be86ee5 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -4,3 +4,6 @@
 [submodule "lib/ivykis"]
 	path = lib/ivykis
 	url = https://github.com/buytenh/ivykis.git
+[submodule "modules/afamqp/rabbitmq-c"]
+	path = modules/afamqp/rabbitmq-c
+	url = https://github.com/alanxz/rabbitmq-c.git
diff --git a/autogen.sh b/autogen.sh
index fb4151e..5027602 100755
--- a/autogen.sh
+++ b/autogen.sh
@@ -3,7 +3,7 @@
 # This script is needed to setup build environment from checked out
 # source tree.
 #
-SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client"
+SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client modules/afamqp/rabbitmq-c"
 GIT=`which git`
 
 autogen_submodules()
diff --git a/configure.in b/configure.in
index a23c6e8..2edec69 100644
--- a/configure.in
+++ b/configure.in
@@ -28,6 +28,7 @@ IVYKIS_MIN_VERSION="0.30.1"
 JSON_C_MIN_VERSION="0.9"
 PCRE_MIN_VERSION="6.1"
 LMC_MIN_VERSION="0.1.6"
+LRMQ_MIN_VERSION="0.0.1"
 
 dnl ***************************************************************************
 dnl Initial setup
@@ -163,6 +164,15 @@ AC_ARG_WITH(libmongo-client,
                Link against the system supplied or the built-in libmongo-client library.]
               ,,with_libmongo_client="internal")
 
+AC_ARG_ENABLE(amqp,
+              [  --enable-amqp           Enable amqp destination (default: auto)]
+              ,,enable_amqp="auto")
+
+AC_ARG_WITH(librabbitmq-client,
+              [  --with-librabbitmq-client=[system/internal]
+               Link against the system supplied or the built-in librabbitmq library.]
+              ,,with_librabbitmq_client="internal")
+
 AC_ARG_WITH(ivykis,
               [  --with-ivykis=[system/internal]
                Link against the system supplied or the built-in ivykis library.]
@@ -827,6 +837,31 @@ if test "x$enable_smtp" != "xno" && test "x$with_libesmtp" != "no"; then
 fi
 
 dnl ***************************************************************************
+dnl rabbitmq-c headers/libraries
+dnl ***************************************************************************
+
+if test "x$with_librabbitmq_client" = "xinternal"; then
+  if test -f "$srcdir/modules/afamqp/rabbitmq-c/librabbitmq/amqp.h"; then
+    AC_CONFIG_SUBDIRS([modules/afamqp/rabbitmq-c])
+    # these can only be used in modules/amqp as it assumes
+    # the current directory just one below rabbitmq-c
+
+    LIBRABBITMQ_LIBS="-L\$(builddir)/rabbitmq-c/librabbitmq -lrabbitmq"
+    LIBRABBITMQ_CFLAGS="-I\$(srcdir)/rabbitmq-c/librabbitmq -I\$(builddir)/rabbitmq-c/librabbitmq"
+    LIBRABBITMQ_SUBDIRS="rabbitmq-c"
+  else
+    AC_MSG_WARN([Internal librabbitmq-client sources not found in modules/afamqp/rabbitmq-c])
+    with_librabbitmq_client="no"
+  fi
+elif test "x$with_librabbitmq_client" = "xsystem"; then
+  PKG_CHECK_MODULES(LIBRABBITMQ, librabbitmq >= $LRMQ_MIN_VERSION,with_librabbitmq_client="yes",with_librabbitmq_client="no")
+fi
+
+if test "x$with_librabbitmq_client" = "xno"; then
+  enable_amqp="no"
+fi
+
+dnl ***************************************************************************
 dnl misc features to be enabled
 dnl ***************************************************************************
 
@@ -889,6 +924,16 @@ if test "x$enable_mongodb" = "xauto"; then
         AC_MSG_RESULT([$enable_mongodb])
 fi
 
+if test "x$enable_amqp" = "xauto"; then
+        AC_MSG_CHECKING(whether to enable amqp destination support)
+        if test "x$with_librabbitmq_client" != "xno"; then
+                enable_amqp="yes"
+        else
+                enable_amqp="no"
+        fi
+        AC_MSG_RESULT([$enable_amqp])
+fi
+
 if test "x$enable_json" != "xno"; then
         JSON_LIBS=$JSON_C_LIBS
         JSON_CFLAGS=$JSON_C_CFLAGS
@@ -1068,6 +1113,7 @@ AM_CONDITIONAL(ENABLE_SUN_STREAMS, [test "$enable_sun_streams" = "yes"])
 AM_CONDITIONAL(ENABLE_PACCT, [test "$enable_pacct" = "yes"])
 AM_CONDITIONAL(ENABLE_MONGODB,
[test "$enable_mongodb" = "yes"]) AM_CONDITIONAL(ENABLE_SMTP, [test "$enable_smtp" = "yes"]) +AM_CONDITIONAL(ENABLE_AMQP, [test "$enable_amqp" = "yes"]) AM_CONDITIONAL(ENABLE_JSON, [test "$enable_json" = "yes"]) AM_CONDITIONAL(WITH_LIBSYSTEMD, [test "$with_libsystemd" = "yes"]) @@ -1102,6 +1148,9 @@ AC_SUBST(LIBMONGO_CFLAGS) AC_SUBST(LIBMONGO_SUBDIRS) AC_SUBST(LIBESMTP_CFLAGS) AC_SUBST(LIBESMTP_LIBS) +AC_SUBST(LIBRABBITMQ_LIBS) +AC_SUBST(LIBRABBITMQ_CFLAGS) +AC_SUBST(LIBRABBITMQ_SUBDIRS) AC_SUBST(JSON_LIBS) AC_SUBST(JSON_CFLAGS) AC_SUBST(IVYKIS_SUBDIRS) @@ -1132,6 +1181,7 @@ AC_OUTPUT(dist.conf modules/afuser/Makefile modules/afmongodb/Makefile modules/afsmtp/Makefile + modules/afamqp/Makefile modules/dbparser/Makefile modules/dbparser/tests/Makefile modules/csvparser/Makefile @@ -1171,6 +1221,7 @@ echo " __thread keyword : ${ac_cv_have_tls:=no}" echo " Submodules:" echo " ivykis : $with_ivykis" echo " libmongo-client : $with_libmongo_client" +echo " librabbitmq : $with_librabbitmq_client" echo " Features:" echo " Debug symbols : ${enable_debug:=no}" echo " GCC profiling : ${enable_gprof:=no}" @@ -1191,4 +1242,5 @@ echo " PACCT module (EXPERIMENTAL) : ${enable_pacct:=no}" echo " MongoDB destination (module): ${enable_mongodb:=no}" echo " JSON support (module) : ${enable_json:=no}" echo " SMTP support (module) : ${enable_smtp:=no}" +echo " AMQP destination (module) : ${enable_amqp:=no}" diff --git a/lib/stats.c b/lib/stats.c index 0ccc283..f757597 100644 --- a/lib/stats.c +++ b/lib/stats.c @@ -383,6 +383,8 @@ const gchar *source_names[SCS_MAX] = "severity", "facility", "sender", + "smtp", + "amqp", }; diff --git a/lib/stats.h b/lib/stats.h index 9174408..2550724 100644 --- a/lib/stats.h +++ b/lib/stats.h @@ -71,6 +71,7 @@ enum SCS_FACILITY = 25, SCS_SENDER = 26, SCS_SMTP = 27, + SCS_AMQP = 28, SCS_MAX, SCS_SOURCE_MASK = 0xff }; diff --git a/modules/Makefile.am b/modules/Makefile.am index 8f9d63b..1fce2d2 100644 --- a/modules/Makefile.am +++ b/modules/Makefile.am @@ 
-1 +1 @@ -SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json +SUBDIRS = afsocket afsql afstreams affile afprog afuser afamqp afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json diff --git a/modules/afamqp/Makefile.am b/modules/afamqp/Makefile.am new file mode 100644 index 0000000..ccf20df --- /dev/null +++ b/modules/afamqp/Makefile.am @@ -0,0 +1,35 @@ + +SUBDIRS = @LIBRABBITMQ_SUBDIRS@ +DIST_SUBDIRS = rabbitmq-c + +moduledir = @moduledir@ +AM_CPPFLAGS = -I$(top_srcdir)/lib -I../../lib +module_LTLIBRARIES = libafamqp.la + +export top_srcdir + +if ENABLE_AMQP + +libafamqp_la_CFLAGS = $(LIBRABBITMQ_CFLAGS) +libafamqp_la_SOURCES = afamqp-grammar.y afamqp.c afamqp.h afamqp-parser.c afamqp-parser.h +libafamqp_la_LIBADD = $(MODULE_DEPS_LIBS) $(LIBRABBITMQ_LIBS) +libafamqp_la_LDFLAGS = $(MODULE_LDFLAGS) + +endif + +BUILT_SOURCES = afamqp-grammar.y afamqp-grammar.c afamqp-grammar.h +EXTRA_DIST = $(BUILT_SOURCES) afamqp-grammar.ym + +include $(top_srcdir)/build/lex-rules.am + +# divert install/uninstall targets to avoid recursing into $(SUBDIRS) + +install: + $(MAKE) $(AM_MAKEFLAGS) all + $(MAKE) $(AM_MAKEFLAGS) install-am + +uninstall: + $(MAKE) $(AM_MAKEFLAGS) uninstall-am + +check: + echo "Make check disabled, since it requires a newer glib" diff --git a/modules/afamqp/afamqp-grammar.ym b/modules/afamqp/afamqp-grammar.ym new file mode 100644 index 0000000..b325b3d --- /dev/null +++ b/modules/afamqp/afamqp-grammar.ym @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your 
option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +%code requires { + +#include "afamqp-parser.h" + +} + +%code { + +#include "cfg-parser.h" +#include "afamqp-grammar.h" +#include "plugin.h" +#include "vptransform.h" + +extern LogDriver *last_driver; +extern ValuePairs *last_value_pairs; +extern ValuePairsTransformSet *last_vp_transset; +} + +%name-prefix "afamqp_" +%lex-param {CfgLexer *lexer} +%parse-param {CfgLexer *lexer} +%parse-param {LogDriver **instance} +%parse-param {gpointer arg} + + +/* INCLUDE_DECLS */ + +%token KW_AMQP +%token KW_EXCHANGE +%token KW_EXCHANGE_TYPE +%token KW_PERSISTENT +%token KW_VHOST +%token KW_ROUTING_KEY +%token KW_BODY + +%% + +start + : LL_CONTEXT_DESTINATION KW_AMQP + { + last_driver = *instance = afamqp_dd_new(); + } + '(' afamqp_options ')' { YYACCEPT; } + ; + +afamqp_options + : afamqp_option afamqp_options + | + ; + +afamqp_option + : KW_HOST '(' string ')' { afamqp_dd_set_host(last_driver, $3); free($3); } + | KW_PORT '(' LL_NUMBER ')' { afamqp_dd_set_port(last_driver, $3); } + | KW_VHOST '(' string ')' { afamqp_dd_set_vhost(last_driver, $3); free($3); } + | KW_EXCHANGE '(' string ')' { afamqp_dd_set_exchange(last_driver, $3); free($3); } + | KW_EXCHANGE_TYPE '(' string ')' { afamqp_dd_set_exchange_type(last_driver, $3); free($3); } + | KW_ROUTING_KEY '(' string ')' { 
afamqp_dd_set_routing_key(last_driver, $3); free($3); } + | KW_BODY '(' string ')' { afamqp_dd_set_body(last_driver, $3); free($3); } + | KW_PERSISTENT '(' yesno ')' { afamqp_dd_set_persistent(last_driver, $3); } + | KW_USERNAME '(' string ')' { afamqp_dd_set_user(last_driver, $3); free($3); } + | KW_PASSWORD '(' string ')' { afamqp_dd_set_password(last_driver, $3); free($3); } + | value_pair_option { afamqp_dd_set_value_pairs(last_driver, $1); } + | dest_driver_option + ; + +/* INCLUDE_RULES */ + +%% diff --git a/modules/afamqp/afamqp-parser.c b/modules/afamqp/afamqp-parser.c new file mode 100644 index 0000000..f082f32 --- /dev/null +++ b/modules/afamqp/afamqp-parser.c @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. 
+ * + */ + +#include "afamqp.h" +#include "cfg-parser.h" +#include "afamqp-grammar.h" + +extern int afamqp_debug; +int afamqp_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg); + +static CfgLexerKeyword afamqp_keywords[] = { + { "amqp", KW_AMQP }, + { "vhost", KW_VHOST }, + { "host", KW_HOST }, + { "port", KW_PORT }, + { "exchange", KW_EXCHANGE }, + { "exchange_type", KW_EXCHANGE_TYPE }, + { "routing_key", KW_ROUTING_KEY }, + { "persistent", KW_PERSISTENT }, + { "username", KW_USERNAME }, + { "password", KW_PASSWORD }, + { "log_fifo_size", KW_LOG_FIFO_SIZE }, + { "body", KW_BODY }, + { NULL } +}; + +CfgParser afamqp_parser = +{ +#if ENABLE_DEBUG + .debug_flag = &afamqp_debug, +#endif + .name = "afamqp", + .keywords = afamqp_keywords, + .parse = (int (*)(CfgLexer *lexer, gpointer *instance, gpointer)) afamqp_parse, + .cleanup = (void (*)(gpointer)) log_pipe_unref, +}; + +CFG_PARSER_IMPLEMENT_LEXER_BINDING(afamqp_, LogDriver **) diff --git a/modules/afamqp/afamqp-parser.h b/modules/afamqp/afamqp-parser.h new file mode 100644 index 0000000..d75de63 --- /dev/null +++ b/modules/afamqp/afamqp-parser.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. 
+ * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#ifndef AFAMQP_PARSER_H_INCLUDED +#define AFAMQP_PARSER_H_INCLUDED + +#include "cfg-parser.h" +#include "cfg-lexer.h" +#include "afamqp.h" + +extern CfgParser afamqp_parser; + +CFG_PARSER_DECLARE_LEXER_BINDING(afamqp_, LogDriver **) + +#endif diff --git a/modules/afamqp/afamqp.c b/modules/afamqp/afamqp.c new file mode 100644 index 0000000..a9d65d4 --- /dev/null +++ b/modules/afamqp/afamqp.c @@ -0,0 +1,738 @@ +/* + * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu> + * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary + * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. 
+ * + */ + +#include "afamqp.h" +#include "afamqp-parser.h" +#include "plugin.h" +#include "messages.h" +#include "misc.h" +#include "stats.h" +#include "nvtable.h" +#include "logqueue.h" +#include "scratch-buffers.h" + +#include <amqp.h> +#include <amqp_framing.h> + +typedef struct +{ + LogDestDriver super; + + /* Shared between main/writer; only read by the writer, never written */ + gchar *exchange; + gchar *exchange_type; + LogTemplate *routing_key_template; + LogTemplate *body_template; + + gint persistent; + + gchar *vhost; + gchar *host; + gint port; + + gchar *user; + gchar *password; + + time_t time_reopen; + + StatsCounterItem *dropped_messages; + StatsCounterItem *stored_messages; + + ValuePairs *vp; + + /* Thread related stuff; shared */ + GThread *writer_thread; + GMutex *queue_mutex; + GMutex *suspend_mutex; + GCond *writer_thread_wakeup_cond; + + gboolean writer_thread_terminate; + gboolean writer_thread_suspended; + GTimeVal writer_thread_suspend_target; + + LogQueue *queue; + + /* Writer-only stuff */ + amqp_connection_state_t conn; + amqp_table_entry_t *entries; + gint32 max_entries; + gint32 seq_num; +} AMQPDestDriver; + +/* + * Configuration + */ + +void +afamqp_dd_set_user(LogDriver *d, const gchar *user) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->user); + self->user = g_strdup(user); +} + +void +afamqp_dd_set_password(LogDriver *d, const gchar *password) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->password); + self->password = g_strdup(password); +} + +void +afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->vhost); + self->vhost = g_strdup(vhost); +} + +void +afamqp_dd_set_host(LogDriver *d, const gchar *host) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->host); + self->host = g_strdup(host); +} + +void +afamqp_dd_set_port(LogDriver *d, gint port) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + 
self->port = (int) port; +} + +void +afamqp_dd_set_exchange(LogDriver *d, const gchar *exchange) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->exchange); + self->exchange = g_strdup(exchange); +} + +void +afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + g_free(self->exchange_type); + self->exchange_type = g_strdup(exchange_type); +} + +void +afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + log_template_compile(self->routing_key_template, routing_key, NULL); +} + +void +afamqp_dd_set_body(LogDriver *d, const gchar *body) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + if (!self->body_template) + self->body_template = log_template_new(configuration, NULL); + log_template_compile(self->body_template, body, NULL); +} + +void +afamqp_dd_set_persistent(LogDriver *s, gboolean persistent) +{ + AMQPDestDriver *self = (AMQPDestDriver *) s; + + if (persistent) + self->persistent = 2; + else + self->persistent = 1; +} + +void +afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp) +{ + AMQPDestDriver *self = (AMQPDestDriver *) d; + + if (self->vp) + value_pairs_free(self->vp); + self->vp = vp; +} + +/* + * Utilities + */ + +static gchar * +afamqp_dd_format_stats_instance(AMQPDestDriver *self) +{ + static gchar persist_name[1024]; + + g_snprintf(persist_name, sizeof(persist_name), "amqp,%s,%s,%u,%s,%s", + self->vhost, self->host, self->port, self->exchange, + self->exchange_type); + return persist_name; +} + +static gchar * +afamqp_dd_format_persist_name(AMQPDestDriver *self) +{ + static gchar persist_name[1024]; + + g_snprintf(persist_name, sizeof(persist_name), "afamqp(%s,%s,%u,%s,%s)", + self->vhost, self->host, self->port, self->exchange, + self->exchange_type); + return persist_name; +} + +static void +afamqp_dd_suspend(AMQPDestDriver *self) +{ + self->writer_thread_suspended = TRUE; + 
g_get_current_time(&self->writer_thread_suspend_target); + g_time_val_add(&self->writer_thread_suspend_target, + self->time_reopen * 1000000); +} + +static void +afamqp_dd_disconnect(AMQPDestDriver *self) +{ + amqp_channel_close(self->conn, 1, AMQP_REPLY_SUCCESS); + amqp_connection_close(self->conn, AMQP_REPLY_SUCCESS); + amqp_destroy_connection(self->conn); + self->conn = NULL; +} + +static gboolean +afamqp_is_ok(AMQPDestDriver *self, gchar *context, amqp_rpc_reply_t ret) +{ + switch (ret.reply_type) + { + case AMQP_RESPONSE_NORMAL: + break; + + case AMQP_RESPONSE_NONE: + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "missing RPC reply type"), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + afamqp_dd_suspend(self); + return FALSE; + + case AMQP_RESPONSE_LIBRARY_EXCEPTION: + { + gchar *errstr = amqp_error_string(ret.library_error); + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", errstr), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + g_free (errstr); + afamqp_dd_suspend(self); + return FALSE; + } + + case AMQP_RESPONSE_SERVER_EXCEPTION: + switch (ret.reply.id) + { + case AMQP_CONNECTION_CLOSE_METHOD: + { + amqp_connection_close_t *m = + (amqp_connection_close_t *) ret.reply.decoded; + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "server connection error"), + evt_tag_int("code", m->reply_code), + evt_tag_str("text", m->reply_text.bytes), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + afamqp_dd_suspend(self); + return FALSE; + } + case AMQP_CHANNEL_CLOSE_METHOD: + { + amqp_channel_close_t *m = + (amqp_channel_close_t *) ret.reply.decoded; + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "server channel error"), + evt_tag_int("code", m->reply_code), + evt_tag_str("text", m->reply_text.bytes), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + 
afamqp_dd_suspend(self); + return FALSE; + } + default: + msg_error(context, + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", "unknown server error"), + evt_tag_printf("method id", "0x%08X", ret.reply.id), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + afamqp_dd_suspend(self); + return FALSE; + } + return FALSE; + } + return TRUE; +} + +static gboolean +afamqp_dd_connect(AMQPDestDriver *self, gboolean reconnect) +{ + int sockfd; + amqp_rpc_reply_t ret; + + if (reconnect && self->conn) + { + ret = amqp_get_rpc_reply(self->conn); + if (ret.reply_type == AMQP_RESPONSE_NORMAL) + return TRUE; + } + + self->conn = amqp_new_connection(); + sockfd = amqp_open_socket(self->host, self->port); + if (sockfd < 0) + { + gchar *errstr = amqp_error_string(-sockfd); + msg_error("Error connecting to AMQP server", + evt_tag_str("driver", self->super.super.id), + evt_tag_str("error", errstr), + evt_tag_int("time_reopen", self->time_reopen), + NULL); + g_free(errstr); + return FALSE; + } + amqp_set_sockfd(self->conn, sockfd); + + ret = amqp_login(self->conn, self->vhost, 0, 131072, 0, + AMQP_SASL_METHOD_PLAIN, self->user, self->password); + if (!afamqp_is_ok(self, "Error during AMQP login", ret)) + return FALSE; + + amqp_channel_open(self->conn, 1); + ret = amqp_get_rpc_reply(self->conn); + if (!afamqp_is_ok(self, "Error during AMQP channel open", ret)) + return FALSE; + + amqp_exchange_declare(self->conn, 1, amqp_cstring_bytes(self->exchange), + amqp_cstring_bytes(self->exchange_type), 0, 0, amqp_empty_table); + ret = amqp_get_rpc_reply(self->conn); + if (!afamqp_is_ok(self, "Error during AMQP exchange declaration", ret)) + return FALSE; + + msg_debug ("Connecting to AMQP succeeded", + evt_tag_str("driver", self->super.super.id), + NULL); + + return TRUE; +} + +/* + * Worker thread + */ + +static gboolean +afamqp_vp_foreach(const gchar *name, const gchar *value, + gpointer user_data) +{ + amqp_table_entry_t **entries = (amqp_table_entry_t **) 
((gpointer *)user_data)[0]; + gint *pos = (gint *) ((gpointer *)user_data)[1]; + gint32 *max_size = (gint32 *) ((gpointer *)user_data)[2]; + + if (*pos == *max_size) + { + *max_size *= 2; + *entries = g_renew(amqp_table_entry_t, *entries, *max_size); + } + + (*entries)[*pos].key = amqp_cstring_bytes(strdup(name)); + (*entries)[*pos].value.kind = AMQP_FIELD_KIND_UTF8; + (*entries)[*pos].value.value.bytes = amqp_cstring_bytes(strdup(value)); + + (*pos)++; + + return FALSE; +} + +static gboolean +afamqp_worker_publish(AMQPDestDriver *self, LogMessage *msg) +{ + gint pos = 0, ret; + amqp_table_t table; + amqp_basic_properties_t props; + gboolean success = TRUE; + ScratchBuffer *routing_key = scratch_buffer_acquire(); + ScratchBuffer *body = scratch_buffer_acquire(); + amqp_bytes_t body_bytes = amqp_cstring_bytes(""); + + gpointer user_data[] = { &self->entries, &pos, &self->max_entries }; + + value_pairs_foreach(self->vp, afamqp_vp_foreach, msg, self->seq_num, + user_data); + + table.num_entries = pos; + table.entries = self->entries; + + props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG + | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_HEADERS_FLAG; + props.content_type = amqp_cstring_bytes("text/plain"); + props.delivery_mode = self->persistent; + props.headers = table; + + log_template_format(self->routing_key_template, msg, NULL, LTZ_LOCAL, + self->seq_num, NULL, sb_string(routing_key)); + + if (self->body_template) + { + log_template_format(self->body_template, msg, NULL, LTZ_LOCAL, + self->seq_num, NULL, sb_string(body)); + body_bytes = amqp_cstring_bytes(sb_string(body)->str); + } + + ret = amqp_basic_publish(self->conn, 1, amqp_cstring_bytes(self->exchange), + amqp_cstring_bytes(sb_string(routing_key)->str), + 0, 0, &props, body_bytes); + + scratch_buffer_release(routing_key); + scratch_buffer_release(body); + + if (ret < 0) + { + msg_error("Network error while inserting into AMQP server", + evt_tag_int("time_reopen", self->time_reopen), NULL); + success = FALSE; + } + 
+  while (--pos >= 0)
+    {
+      amqp_bytes_free(self->entries[pos].key);
+      amqp_bytes_free(self->entries[pos].value.value.bytes);
+    }
+
+  return success;
+}
+
+static gboolean
+afamqp_worker_insert(AMQPDestDriver *self)
+{
+  gboolean success;
+  LogMessage *msg;
+  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
+
+  afamqp_dd_connect(self, TRUE);
+
+  g_mutex_lock(self->queue_mutex);
+  log_queue_reset_parallel_push(self->queue);
+  success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, FALSE);
+  g_mutex_unlock(self->queue_mutex);
+  if (!success)
+    return TRUE;
+
+  msg_set_context(msg);
+  success = afamqp_worker_publish (self, msg);
+  msg_set_context(NULL);
+
+  if (success)
+    {
+      stats_counter_inc(self->stored_messages);
+      step_sequence_number(&self->seq_num);
+      log_msg_ack(msg, &path_options);
+      log_msg_unref(msg);
+    }
+  else
+    {
+      g_mutex_lock(self->queue_mutex);
+      log_queue_push_head(self->queue, msg, &path_options);
+      g_mutex_unlock(self->queue_mutex);
+    }
+
+  return success;
+}
+
+static gpointer
+afamqp_worker_thread(gpointer arg)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) arg;
+  gboolean success;
+
+  msg_debug("Worker thread started",
+            evt_tag_str("driver", self->super.super.id), NULL);
+
+  success = afamqp_dd_connect(self, FALSE);
+
+  while (!self->writer_thread_terminate)
+    {
+      g_mutex_lock(self->suspend_mutex);
+      if (self->writer_thread_suspended)
+        {
+          g_cond_timed_wait(self->writer_thread_wakeup_cond,
+                            self->suspend_mutex, &self->writer_thread_suspend_target);
+          self->writer_thread_suspended = FALSE;
+          g_mutex_unlock(self->suspend_mutex);
+        }
+      else
+        {
+          g_mutex_unlock(self->suspend_mutex);
+
+          g_mutex_lock(self->queue_mutex);
+          if (log_queue_get_length(self->queue) == 0)
+            {
+              g_cond_wait(self->writer_thread_wakeup_cond, self->queue_mutex);
+            }
+          g_mutex_unlock(self->queue_mutex);
+        }
+
+      if (self->writer_thread_terminate)
+        break;
+
+      if (!afamqp_worker_insert(self))
+        {
+          afamqp_dd_disconnect(self);
+          afamqp_dd_suspend(self);
+        }
+    }
+
+  afamqp_dd_disconnect(self);
+
+  msg_debug("Worker thread finished",
+            evt_tag_str("driver", self->super.super.id), NULL);
+
+  return NULL;
+}
+
+/*
+ * Main thread
+ */
+
+static void
+afamqp_dd_start_thread(AMQPDestDriver *self)
+{
+  self->writer_thread = create_worker_thread(afamqp_worker_thread, self,
+                                             TRUE, NULL);
+}
+
+static void
+afamqp_dd_stop_thread(AMQPDestDriver *self)
+{
+  self->writer_thread_terminate = TRUE;
+  g_mutex_lock(self->queue_mutex);
+  g_cond_signal(self->writer_thread_wakeup_cond);
+  g_mutex_unlock(self->queue_mutex);
+  g_thread_join(self->writer_thread);
+}
+
+static gboolean
+afamqp_dd_init(LogPipe *s)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) s;
+  GlobalConfig *cfg = log_pipe_get_config(s);
+
+  if (!log_dest_driver_init_method(s))
+    return FALSE;
+
+  if (cfg)
+    self->time_reopen = cfg->time_reopen;
+
+  if (!self->vp)
+    {
+      self->vp = value_pairs_new();
+      value_pairs_add_scope(self->vp, "selected-macros");
+      value_pairs_add_scope(self->vp, "nv-pairs");
+      value_pairs_add_scope(self->vp, "sdata");
+    }
+
+  msg_verbose("Initializing AMQP destination",
+              evt_tag_str("vhost", self->vhost),
+              evt_tag_str("host", self->host),
+              evt_tag_int("port", self->port),
+              evt_tag_str("exchange", self->exchange),
+              evt_tag_str("exchange_type", self->exchange_type),
+              NULL);
+
+  self->queue = log_dest_driver_acquire_queue(&self->super, afamqp_dd_format_persist_name(self));
+
+  stats_lock();
+  stats_register_counter(0, SCS_AMQP | SCS_DESTINATION,
+                         self->super.super.id, afamqp_dd_format_stats_instance(self),
+                         SC_TYPE_STORED, &self->stored_messages);
+  stats_register_counter(0, SCS_AMQP | SCS_DESTINATION,
+                         self->super.super.id, afamqp_dd_format_stats_instance(self),
+                         SC_TYPE_DROPPED, &self->dropped_messages);
+  stats_unlock();
+
+  log_queue_set_counters(self->queue, self->stored_messages,
+                         self->dropped_messages);
+  afamqp_dd_start_thread(self);
+
+  return TRUE;
+}
+
+static gboolean
+afamqp_dd_deinit(LogPipe *s)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) s;
+
+  afamqp_dd_stop_thread(self);
+
+  log_queue_set_counters(self->queue, NULL, NULL);
+  stats_lock();
+  stats_unregister_counter(SCS_AMQP | SCS_DESTINATION,
+                           self->super.super.id, afamqp_dd_format_stats_instance(self),
+                           SC_TYPE_STORED, &self->stored_messages);
+  stats_unregister_counter(SCS_AMQP | SCS_DESTINATION,
+                           self->super.super.id, afamqp_dd_format_stats_instance(self),
+                           SC_TYPE_DROPPED, &self->dropped_messages);
+  stats_unlock();
+  if (!log_dest_driver_deinit_method(s))
+    return FALSE;
+
+  return TRUE;
+}
+
+static void
+afamqp_dd_free(LogPipe *d)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) d;
+
+  g_mutex_free(self->suspend_mutex);
+  g_mutex_free(self->queue_mutex);
+  g_cond_free(self->writer_thread_wakeup_cond);
+
+  if (self->queue)
+    log_queue_unref(self->queue);
+
+  g_free(self->exchange);
+  g_free(self->exchange_type);
+  log_template_unref(self->routing_key_template);
+  log_template_unref(self->body_template);
+  g_free(self->user);
+  g_free(self->password);
+  g_free(self->host);
+  g_free(self->vhost);
+  g_free(self->entries);
+  if (self->vp)
+    value_pairs_free(self->vp);
+  log_dest_driver_free(d);
+}
+
+static void
+afamqp_dd_queue_notify(gpointer user_data)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) user_data;
+
+  g_mutex_lock(self->queue_mutex);
+  g_cond_signal(self->writer_thread_wakeup_cond);
+  log_queue_reset_parallel_push(self->queue);
+  g_mutex_unlock(self->queue_mutex);
+}
+
+static void
+afamqp_dd_queue(LogPipe *s, LogMessage *msg,
+                const LogPathOptions *path_options, gpointer user_data)
+{
+  AMQPDestDriver *self = (AMQPDestDriver *) s;
+  LogPathOptions local_options;
+
+  if (!path_options->flow_control_requested)
+    path_options = log_msg_break_ack(msg, path_options, &local_options);
+
+  g_mutex_lock(self->suspend_mutex);
+  g_mutex_lock(self->queue_mutex);
+  if (!self->writer_thread_suspended)
+    log_queue_set_parallel_push(self->queue, 1, afamqp_dd_queue_notify,
+                                self, NULL);
+  g_mutex_unlock(self->queue_mutex);
+  g_mutex_unlock(self->suspend_mutex);
+  log_queue_push_tail(self->queue, msg, path_options);
+}
+
+/*
+ * Plugin glue.
+ */
+
+LogDriver *
+afamqp_dd_new(void)
+{
+  AMQPDestDriver *self = g_new0(AMQPDestDriver, 1);
+
+  log_dest_driver_init_instance(&self->super);
+  self->super.super.super.init = afamqp_dd_init;
+  self->super.super.super.deinit = afamqp_dd_deinit;
+  self->super.super.super.queue = afamqp_dd_queue;
+  self->super.super.super.free_fn = afamqp_dd_free;
+
+  self->routing_key_template = log_template_new(configuration, NULL);
+
+  afamqp_dd_set_vhost((LogDriver *) self, "/");
+  afamqp_dd_set_host((LogDriver *) self, "127.0.0.1");
+  afamqp_dd_set_port((LogDriver *) self, 5672);
+  afamqp_dd_set_exchange((LogDriver *) self, "syslog");
+  afamqp_dd_set_exchange_type((LogDriver *) self, "fanout");
+  afamqp_dd_set_routing_key((LogDriver *) self, "");
+  afamqp_dd_set_persistent((LogDriver *) self, TRUE);
+
+  init_sequence_number(&self->seq_num);
+
+  self->writer_thread_wakeup_cond = g_cond_new();
+  self->suspend_mutex = g_mutex_new();
+  self->queue_mutex = g_mutex_new();
+
+  self->max_entries = 256;
+  self->entries = g_new(amqp_table_entry_t, self->max_entries);
+
+  return (LogDriver *) self;
+}
+
+extern CfgParser afamqp_dd_parser;
+
+static Plugin afamqp_plugin =
+{
+  .type = LL_CONTEXT_DESTINATION,
+  .name = "amqp",
+  .parser = &afamqp_parser
+};
+
+gboolean
+afamqp_module_init(GlobalConfig *cfg, CfgArgs *args)
+{
+  plugin_register(cfg, &afamqp_plugin, 1);
+  return TRUE;
+}
+
+const ModuleInfo module_info =
+{
+  .canonical_name = "afamqp",
+  .version = VERSION,
+  .description = "The afamqp module provides AMQP destination support for syslog-ng.",
+  .core_revision = SOURCE_REVISION,
+  .plugins = &afamqp_plugin,
+  .plugins_len = 1,
+};
diff --git a/modules/afamqp/afamqp.h b/modules/afamqp/afamqp.h
new file mode 100644
index 0000000..8e7d7e9
--- /dev/null
+++ b/modules/afamqp/afamqp.h
@@ -0,0 +1,45 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFAMQP_H_INCLUDED
+#define AFAMQP_H_INCLUDED
+
+#include "driver.h"
+#include "value-pairs.h"
+
+LogDriver *afamqp_dd_new(void);
+
+void afamqp_dd_set_host(LogDriver *d, const gchar *host);
+void afamqp_dd_set_port(LogDriver *d, gint port);
+void afamqp_dd_set_exchange(LogDriver *d, const gchar *database);
+void afamqp_dd_set_exchange_type(LogDriver *d, const gchar *exchange_type);
+void afamqp_dd_set_vhost(LogDriver *d, const gchar *vhost);
+void afamqp_dd_set_routing_key(LogDriver *d, const gchar *routing_key);
+void afamqp_dd_set_body(LogDriver *d, const gchar *body);
+void afamqp_dd_set_persistent(LogDriver *d, gboolean persistent);
+void afamqp_dd_set_user(LogDriver *d, const gchar *user);
+void afamqp_dd_set_password(LogDriver *d, const gchar *password);
+void afamqp_dd_set_value_pairs(LogDriver *d, ValuePairs *vp);
+
+#endif
diff --git a/modules/afamqp/rabbitmq-c b/modules/afamqp/rabbitmq-c
new file mode 160000
index 0000000..afdfc05
--- /dev/null
+++ b/modules/afamqp/rabbitmq-c
@@ -0,0 +1 @@
+Subproject commit afdfc05b0e974b470130e20b4be6c5728618afd4
-- 
1.7.10.4
There is no rabbitmq-c release that we can support yet, so for now,
error out if the system version was requested.

Signed-off-by: Gergely Nagy <algernon@balabit.hu>
---
 configure.in | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/configure.in b/configure.in
index 2edec69..84004ad 100644
--- a/configure.in
+++ b/configure.in
@@ -854,7 +854,7 @@ if test "x$with_librabbitmq_client" = "xinternal"; then
     with_librabbitmq_client="no"
   fi
 elif test "x$with_librabbitmq_client" = "xsystem"; then
-  PKG_CHECK_MODULES(LIBRABBITMQ, librabbitmq >= $LRMQ_MIN_VERSION,with_librabbitmq_client="yes",with_librabbitmq_client="no")
+  AC_MSG_ERROR([Building with system librabbitmq is not supported yet.])
 fi

 if test "x$with_librabbitmq_client" = "xno"; then
-- 
1.7.10.4
Hi guys,

I just wanted to grab the opportunity to say a big thanks for this driver. I really appreciate the effort it has taken to both write this initially and prepare it for integration. These are what bring syslog-ng forward and a complete driver has not been submitted before.

These patches are now integrated to 3.4 master. Thanks again.

On Tue, 2012-10-09 at 18:13 +0200, Gergely Nagy wrote:
The two patches that will follow implement an AMQP destination driver for syslog-ng, contributed by Attila Nagy <bra@fsn.hu>, with a few minor enhancements and a port to syslog-ng 3.4 by me.
The patches are also available on the feature/3.4/amqp[1] and feature/3.3/amqp[2] branches of my git repository, for syslog-ng 3.4 and 3.3 respectively. Functionally, they're exactly the same, but all further improvements and fixes will go to the 3.4 branch only; the 3.3 branch is there for testing and historical purposes, and will be removed sometime after the driver gets merged to 3.4.
Now, I won't explain in detail what AMQP is; suffice it to say that it is a messaging protocol, of which RabbitMQ[3] is an implementation. Interested people can read the available documentation, there's plenty!
For now, let's concentrate on how to take the driver for a test drive!
Once RabbitMQ is installed (on recent versions of Debian and its derivatives, it is as simple as apt-get install rabbitmq-server), all one needs to do is configure syslog-ng to send messages, and grab a client that can look at the queue. For testing purposes, there's a simple one at https://gist.github.com/3859756 - you'll need a couple of Python modules for that.
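A minimal sketch of the syslog-ng side could look like the fragment below. It assumes that the option names in the driver's grammar mirror the setter names declared in afamqp.h (host, port, vhost, exchange, exchange-type); the grammar itself is not part of this excerpt, so check it before relying on these names. The values are the defaults set in afamqp_dd_new(), and s_local and d_amqp are placeholder names.

```
# Placeholder source; any existing source will do.
source s_local { internal(); };

# Assumed option names, filled with the driver's built-in defaults.
destination d_amqp {
    amqp(
        host("127.0.0.1")
        port(5672)
        vhost("/")
        exchange("syslog")
        exchange-type("fanout")
    );
};

log { source(s_local); destination(d_amqp); };
```

Since every value shown is a default, an empty amqp() should behave the same way; spelling them out only makes it obvious what to change for a remote broker.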
It sets up a queue named 'test', binds it to the 'syslog' exchange (the default used by the AMQP driver), and starts consuming messages. It prints both the headers and the payload, if any (by default, there is no payload).
What one needs to know about the driver is that it does not declare any queues: it's the responsibility of the admin to set queues up and route messages as they see fit. This can be done on the RabbitMQ web admin interface (again, see the docs about that!), among other ways, or programmatically, like the script above does.
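For illustration, the programmatic route can be sketched roughly as follows. This is not the script from the gist, only a stand-in written against the pika client library (1.x call signatures assumed). The names format_delivery and consume are made up for this example, and the sketch assumes the 'syslog' exchange already exists on the broker by the time the queue is bound.

```python
def format_delivery(headers, body):
    """Render one delivery as text: sorted headers first, then the payload if any."""
    lines = ["%s = %s" % (key, headers[key]) for key in sorted(headers or {})]
    if body:
        lines.append("payload: " + body.decode("utf-8", "replace"))
    return "\n".join(lines)


def consume(host="127.0.0.1", exchange="syslog", queue="test"):
    """Declare a queue, bind it to the exchange, and print whatever arrives."""
    # Third-party dependency (assumed: pika 1.x); imported lazily so that
    # format_delivery() stays usable without it.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()

    # The driver declares no queues, so the consumer has to do it.
    channel.queue_declare(queue=queue)
    # Assumption: the exchange exists already; binding fails otherwise.
    channel.queue_bind(queue=queue, exchange=exchange)

    def on_message(ch, method, properties, body):
        # properties.headers may be None when a message carries no headers.
        print(format_delivery(properties.headers, body))
        print("-" * 40)

    channel.basic_consume(queue=queue, on_message_callback=on_message,
                          auto_ack=True)
    channel.start_consuming()
```

Calling consume() with no arguments matches the driver's defaults (broker on 127.0.0.1, exchange 'syslog'). With the default empty body, only the header lines are printed for each message.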
Hopefully, this is enough to get one started! If not, I plan to write a more detailed blog post about the topic in the not too distant future.
There are plans to write an accompanying source driver too, but work has not started on that front yet. Rest assured though, it will come.
I'd like to thank Attila again for contributing the driver, and answering my silly questions during review - both are much appreciated!
[1]: https://github.com/algernon/syslog-ng/tree/feature/3.4/amqp
[2]: https://github.com/algernon/syslog-ng/tree/feature/3.3/amqp
[3]: http://www.rabbitmq.com/
-- Bazsi