[syslog-ng] [PATCH] afsmtp: New destination driver, to send mails via SMTP.

Gergely Nagy algernon at balabit.hu
Fri Feb 3 15:03:05 CET 2012


This introduces the afsmtp destination driver, updated to the latest
3.4 conventions, and with all the threading bugs that were also
present in afmongodb too, fixed.

The driver itself uses libESMTP to send mail, and has the following
config syntax:

destination d_smtp {
 smtp(
  host("localhost") port(25)
  subject("ALERT! IMPORTANT EVENT FROM ${HOST}")
  header("X-Severity", "${SEVERITY}")
  from("no-reply@${HOST}.dummy")
  to("syslog-alerts at my.example.domain")
  bcc("big-brother at my.example.domain")
  cc("it at my.example.domain")
  reply-to("admin at my.example.domain")
 );
};

Signed-off-by: Gergely Nagy <algernon at balabit.hu>
---
 configure.in                     |   41 ++-
 modules/Makefile.am              |    2 +-
 modules/afsmtp/Makefile.am       |   19 +
 modules/afsmtp/afsmtp-grammar.ym |  101 ++++++
 modules/afsmtp/afsmtp-parser.c   |   58 +++
 modules/afsmtp/afsmtp-parser.h   |   35 ++
 modules/afsmtp/afsmtp.c          |  728 ++++++++++++++++++++++++++++++++++++++
 modules/afsmtp/afsmtp.h          |   52 +++
 8 files changed, 1034 insertions(+), 2 deletions(-)
 create mode 100644 modules/afsmtp/Makefile.am
 create mode 100644 modules/afsmtp/afsmtp-grammar.ym
 create mode 100644 modules/afsmtp/afsmtp-parser.c
 create mode 100644 modules/afsmtp/afsmtp-parser.h
 create mode 100644 modules/afsmtp/afsmtp.c
 create mode 100644 modules/afsmtp/afsmtp.h

diff --git a/configure.in b/configure.in
index 0a3d730..2fab7ac 100644
--- a/configure.in
+++ b/configure.in
@@ -174,6 +174,10 @@ AC_ARG_WITH(json,
                                          Use the JSON implementation specified]
               ,,with_json="auto")
 
+AC_ARG_WITH(libesmtp,
+             AC_HELP_STRING([--with-libesmtp=DIR],
+                            [use libesmtp library from (prefix) directory DIR]),,)
+
 AC_ARG_ENABLE(systemd,
               [  --enable-systemd        Enable systemd support (default: auto)]
               ,,enable_systemd="auto")
@@ -793,6 +797,37 @@ if test "x$with_libmongo_client" = "xno"; then
 fi
 
 dnl ***************************************************************************
+dnl libesmtp headers/libraries
+dnl ***************************************************************************
+if test "x$enable_smtp" != "xno" && test "x$with_libesmtp" != "no"; then
+	libesmtp="yes"
+	if test "x$with_libesmtp" != "yes" && test "x$with_libesmtp" != "x"; then
+		CPPFLAGS_SAVE="$CPPFLAGS"
+		LDFLAGS_SAVE="$LDFLAGS"
+		CPPFLAGS="$CPPFLAGS -I$with_libesmtp/include"
+		LDFLAGS="$LDFLAGS -L$with_libesmtp/lib"
+		AC_CHECK_HEADER(libesmtp.h, [LIBESMTP_CFLAGS="-I$with_libesmtp/include"
+					     LIBESMTP_LIBS="-L$with_libesmtp/lib"], [libesmtp=no])
+		CPPFLAGS="$CPPFLAGS_SAVE"
+		LDFLAGS="$LDFLAGS_SAVE"
+	else
+		AC_MSG_CHECKING(for libESMTP)
+		if libesmtp-config --version >/dev/null 2>&1; then
+			AC_MSG_RESULT(yes)
+			LIBESMTP_CFLAGS="`libesmtp-config --cflags`"
+			LIBESMTP_LIBS="`libesmtp-config --libs`"
+		else
+			AC_MSG_RESULT(no)
+			libesmtp=no
+		fi
+	fi
+	if test "x$enable_smtp" = "xyes" && test "x$libesmtp" = "xno"; then
+		AC_MSG_ERROR(libESMTP not found)
+	fi
+	enable_smtp=$libesmtp
+fi
+
+dnl ***************************************************************************
 dnl misc features to be enabled
 dnl ***************************************************************************
 
@@ -1065,6 +1100,7 @@ AM_CONDITIONAL(ENABLE_SQL, [test "$enable_sql" = "yes"])
 AM_CONDITIONAL(ENABLE_SUN_STREAMS, [test "$enable_sun_streams" = "yes"])
 AM_CONDITIONAL(ENABLE_PACCT, [test "$enable_pacct" = "yes"])
 AM_CONDITIONAL(ENABLE_MONGODB, [test "$enable_mongodb" = "yes"])
+AM_CONDITIONAL(ENABLE_SMTP, [test "$enable_smtp" = "yes"])
 AM_CONDITIONAL(ENABLE_JSON_FORMAT, [test "$enable_json_format" = "yes"])
 AM_CONDITIONAL(ENABLE_JSON_PARSE, [test "$enable_json_parse" = "yes"])
 AM_CONDITIONAL(WITH_LIBSYSTEMD, [test "$with_libsystemd" = "yes"])
@@ -1098,6 +1134,8 @@ AC_SUBST(LIBDBI_CFLAGS)
 AC_SUBST(LIBMONGO_LIBS)
 AC_SUBST(LIBMONGO_CFLAGS)
 AC_SUBST(LIBMONGO_SUBDIRS)
+AC_SUBST(LIBESMTP_CFLAGS)
+AC_SUBST(LIBESMTP_LIBS)
 AC_SUBST(JSON_LIBS)
 AC_SUBST(JSON_CFLAGS)
 AC_SUBST(IVYKIS_SUBDIRS)
@@ -1125,6 +1163,7 @@ AC_OUTPUT(dist.conf
           modules/afprog/Makefile
           modules/afuser/Makefile
 	  modules/afmongodb/Makefile
+          modules/afsmtp/Makefile
           modules/dbparser/Makefile
           modules/dbparser/tests/Makefile
           modules/csvparser/Makefile
@@ -1183,5 +1222,5 @@ echo "  SQL support (module)        : ${enable_sql:=no}"
 echo "  PACCT module (EXPERIMENTAL) : ${enable_pacct:=no}"
 echo "  MongoDB destination (module): ${enable_mongodb:=no}"
 echo "  JSON support (module)       : parser=${enable_json_parse:=no}, formatter=${enable_json_format:=no} (using ${with_json})"
-
+echo "  SMTP support (module)       : ${enable_smtp:=no}"
 
diff --git a/modules/Makefile.am b/modules/Makefile.am
index 0d0594b..340df40 100644
--- a/modules/Makefile.am
+++ b/modules/Makefile.am
@@ -1 +1 @@
-SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb csvparser confgen syslogformat pacctformat basicfuncs dbparser tfjson jsonparser dummy
+SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb afsmtp csvparser confgen syslogformat pacctformat basicfuncs dbparser tfjson jsonparser dummy
diff --git a/modules/afsmtp/Makefile.am b/modules/afsmtp/Makefile.am
new file mode 100644
index 0000000..790aad7
--- /dev/null
+++ b/modules/afsmtp/Makefile.am
@@ -0,0 +1,19 @@
+moduledir = @moduledir@
+AM_CPPFLAGS = -I$(top_srcdir)/lib -I../../lib
+module_LTLIBRARIES = libafsmtp.la
+
+export top_srcdir
+
+if ENABLE_SMTP
+
+libafsmtp_la_CFLAGS = $(LIBESMTP_CFLAGS) $(IVYKIS_CFLAGS) $(EVENTLOG_CFLAGS) $(SYSLOG_NG_CFLAGS)
+libafsmtp_la_SOURCES = afsmtp-grammar.y afsmtp.c afsmtp.h afsmtp-parser.c afsmtp-parser.h
+libafsmtp_la_LIBADD = $(LIBESMTP_LIBS) $(IVYKIS_LIBS) $(EVENTLOG_LIBS) $(SYSLOG_NG_LIBS)
+libafsmtp_la_LDFLAGS = -avoid-version
+
+endif
+
+BUILT_SOURCES = afsmtp-grammar.y afsmtp-grammar.c afsmtp-grammar.h
+EXTRA_DIST = $(BUILT_SOURCES) afsmtp-grammar.ym
+
+include $(top_srcdir)/build/lex-rules.am
diff --git a/modules/afsmtp/afsmtp-grammar.ym b/modules/afsmtp/afsmtp-grammar.ym
new file mode 100644
index 0000000..eb6eecf
--- /dev/null
+++ b/modules/afsmtp/afsmtp-grammar.ym
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2011-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2011-2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+%code requires {
+
+#include "afsmtp-parser.h"
+
+}
+
+%code {
+
+#include "cfg-parser.h"
+#include "afsmtp-grammar.h"
+#include "plugin.h"
+
+extern LogDriver *last_driver;
+
+}
+
+%name-prefix "afsmtp_"
+%lex-param {CfgLexer *lexer}
+%parse-param {CfgLexer *lexer}
+%parse-param {LogDriver **instance}
+%parse-param {gpointer arg}
+
+/* INCLUDE_DECLS */
+
+%token KW_SMTP
+%token KW_SUBJECT
+%token KW_FROM
+%token KW_TO
+%token KW_BODY
+%token KW_HEADER
+%token KW_CC
+%token KW_BCC
+%token KW_SENDER
+%token KW_REPLY_TO
+
+%%
+
+start
+        : LL_CONTEXT_DESTINATION KW_SMTP
+          {
+            last_driver = *instance = afsmtp_dd_new();
+          }
+          '(' afsmtp_options ')'         { YYACCEPT; }
+	;
+
+afsmtp_options
+        : afsmtp_option afsmtp_options
+	|
+	;
+
+afsmtp_option
+        : KW_HOST '(' string ')'		{ afsmtp_dd_set_host(last_driver, $3); free($3); }
+        | KW_PORT '(' LL_NUMBER ')'		{ afsmtp_dd_set_port(last_driver, $3); }
+	| KW_SUBJECT '(' string ')'	 	{ afsmtp_dd_set_subject(last_driver, $3); free($3); }
+	| KW_BODY '(' string ')'		{ afsmtp_dd_set_body(last_driver, $3); free($3); }
+	| KW_HEADER '(' string string ')'	{
+		afsmtp_dd_add_header(last_driver, $3, $4);
+		free($3); free($4);
+	}
+
+	| KW_FROM '(' string ')'		{ afsmtp_dd_set_from(last_driver, $3, $3); free($3); }
+	| KW_FROM '(' string string ')'		{ afsmtp_dd_set_from(last_driver, $3, $4); free($3); free($4); }
+	| KW_SENDER '(' string ')'		{ afsmtp_dd_set_from(last_driver, $3, $3); free($3); }
+	| KW_SENDER '(' string string ')'	{ afsmtp_dd_set_from(last_driver, $3, $4); free($3); free($4); }
+	| KW_TO '(' string ')'			{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_TO, $3, $3); free($3); }
+	| KW_TO '(' string string ')'		{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_TO, $3, $4); free($3); free($4); }
+	| KW_CC '(' string ')'			{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_CC, $3, $3); free($3); }
+	| KW_CC '(' string string ')'		{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_CC, $3, $4); free($3); free($4); }
+	| KW_BCC '(' string ')'			{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_BCC, $3, $3); free($3); }
+	| KW_BCC '(' string string ')'		{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_BCC, $3, $4); free($3); free($4); }
+	| KW_REPLY_TO '(' string ')'		{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_REPLY_TO, $3, $3); free($3); }
+	| KW_REPLY_TO '(' string string ')'	{ afsmtp_dd_add_rcpt(last_driver, AFSMTP_RCPT_TYPE_REPLY_TO, $3, $4); free($3); free($4); }
+        | dest_driver_option
+        ;
+
+/* INCLUDE_RULES */
+
+%%
diff --git a/modules/afsmtp/afsmtp-parser.c b/modules/afsmtp/afsmtp-parser.c
new file mode 100644
index 0000000..f190e2f
--- /dev/null
+++ b/modules/afsmtp/afsmtp-parser.c
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2011-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2011-2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#include "afsmtp.h"
+#include "cfg-parser.h"
+#include "afsmtp-grammar.h"
+
+extern int afsmtp_debug;
+int afsmtp_parse(CfgLexer *lexer, LogDriver **instance);
+
+static CfgLexerKeyword afsmtp_keywords[] = {
+  { "smtp",			KW_SMTP },
+  { "host",			KW_HOST },
+  { "port",			KW_PORT },
+  { "subject",			KW_SUBJECT },
+  { "from",			KW_FROM },
+  { "to",			KW_TO },
+  { "cc",			KW_CC },
+  { "bcc",			KW_BCC },
+  { "reply_to",			KW_REPLY_TO },
+  { "sender",			KW_SENDER },
+  { "body",			KW_BODY },
+  { "header",			KW_HEADER },
+  { NULL }
+};
+
+CfgParser afsmtp_parser =
+{
+#if ENABLE_DEBUG
+  .debug_flag = &afsmtp_debug,
+#endif
+  .name = "afsmtp",
+  .keywords = afsmtp_keywords,
+  .parse = (int (*)(CfgLexer *lexer, gpointer *instance, gpointer)) afsmtp_parse,
+  .cleanup = (void (*)(gpointer)) log_pipe_unref,
+};
+
+CFG_PARSER_IMPLEMENT_LEXER_BINDING(afsmtp_, LogDriver **)
diff --git a/modules/afsmtp/afsmtp-parser.h b/modules/afsmtp/afsmtp-parser.h
new file mode 100644
index 0000000..c178db0
--- /dev/null
+++ b/modules/afsmtp/afsmtp-parser.h
@@ -0,0 +1,35 @@
+/*
+ * Copyright (c) 2011-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2011-2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFSMTP_PARSER_H_INCLUDED
+#define AFSMTP_PARSER_H_INCLUDED
+
+#include "cfg-parser.h"
+#include "cfg-lexer.h"
+#include "afsmtp.h"
+
+extern CfgParser afsmtp_parser;
+
+CFG_PARSER_DECLARE_LEXER_BINDING(afsmtp_, LogDriver **)
+
+#endif
diff --git a/modules/afsmtp/afsmtp.c b/modules/afsmtp/afsmtp.c
new file mode 100644
index 0000000..bd8adfe
--- /dev/null
+++ b/modules/afsmtp/afsmtp.c
@@ -0,0 +1,728 @@
+/*
+ * Copyright (c) 2011-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2011-2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#include <signal.h>
+
+#include "afsmtp.h"
+#include "afsmtp-parser.h"
+#include "plugin.h"
+#include "messages.h"
+#include "misc.h"
+#include "stats.h"
+#include "logqueue.h"
+
+#include <libesmtp.h>
+
+#ifndef SCS_SMTP
+#define SCS_SMTP 0
+#endif
+
+typedef struct
+{
+  gchar *name;
+  gchar *template;
+  LogTemplate *value;
+} AFSMTPHeader;
+
+typedef struct
+{
+  gchar *phrase;
+  gchar *address;
+  afsmtp_rcpt_type_t type;
+} AFSMTPRecipient;
+
+typedef struct
+{
+  LogDestDriver super;
+
+  /* Shared between main/writer; only read by the writer, never
+     written */
+  gchar *host;
+  gint port;
+
+  gchar *subject;
+  AFSMTPRecipient *mail_from;
+  GList *rcpt_tos;
+  GList *headers;
+  gchar *body;
+
+  time_t time_reopen;
+
+  StatsCounterItem *dropped_messages;
+  StatsCounterItem *stored_messages;
+
+  LogTemplate *subject_tmpl;
+  LogTemplate *body_tmpl;
+
+  /* Thread related stuff; shared */
+  GThread *writer_thread;
+  GMutex *queue_mutex;
+  GMutex *suspend_mutex;
+  GCond *writer_thread_wakeup_cond;
+
+  gboolean writer_thread_terminate;
+  gboolean writer_thread_suspended;
+  GTimeVal writer_thread_suspend_target;
+
+  LogQueue *queue;
+
+  /* Writer-only stuff */
+  gint32 seq_num;
+  GString *str;
+} AFSMTPDriver;
+
+static gchar *
+afsmtp_wash_string (gchar *str)
+{
+  gint i;
+
+  for (i = 0; i < strlen (str); i++)
+    if (str[i] == '\n' ||
+        str[i] == '\r')
+      str[i] = ' ';
+
+  return str;
+}
+
+/*
+ * Configuration
+ */
+
+void
+afsmtp_dd_set_host(LogDriver *d, const gchar *host)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+
+  g_free(self->host);
+  self->host = g_strdup (host);
+}
+
+void
+afsmtp_dd_set_port(LogDriver *d, gint port)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+
+  self->port = (int)port;
+}
+
+void
+afsmtp_dd_set_subject(LogDriver *d, const gchar *subject)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+
+  g_free(self->subject);
+  self->subject = g_strdup(subject);
+}
+
+void
+afsmtp_dd_set_from(LogDriver *d, const gchar *phrase, const gchar *mbox)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+
+  g_free(self->mail_from->phrase);
+  g_free(self->mail_from->address);
+  self->mail_from->phrase = afsmtp_wash_string(g_strdup(phrase));
+  self->mail_from->address = afsmtp_wash_string(g_strdup(mbox));
+}
+
+void
+afsmtp_dd_add_rcpt(LogDriver *d, afsmtp_rcpt_type_t type, const gchar *phrase,
+                   const gchar *mbox)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+  AFSMTPRecipient *rcpt;
+
+  rcpt = g_new0(AFSMTPRecipient, 1);
+  rcpt->phrase = afsmtp_wash_string(g_strdup(phrase));
+  rcpt->address = afsmtp_wash_string(g_strdup(mbox));
+  rcpt->type = type;
+
+  self->rcpt_tos = g_list_append(self->rcpt_tos, rcpt);
+}
+
+void
+afsmtp_dd_set_body(LogDriver *d, const gchar *body)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+
+  g_free(self->body);
+  self->body = g_strdup(body);
+}
+
+gboolean
+afsmtp_dd_add_header(LogDriver *d, const gchar *header, const gchar *value)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+  AFSMTPHeader *h;
+
+  if (!g_ascii_strcasecmp(header, "to") ||
+      !g_ascii_strcasecmp(header, "cc") ||
+      !g_ascii_strcasecmp(header, "bcc") ||
+      !g_ascii_strcasecmp(header, "from") ||
+      !g_ascii_strcasecmp(header, "sender") ||
+      !g_ascii_strcasecmp(header, "reply-to") ||
+      !g_ascii_strcasecmp(header, "date"))
+    return FALSE;
+
+  h = g_new0(AFSMTPHeader, 1);
+  h->name = g_strdup(header);
+  h->template = g_strdup(value);
+
+  self->headers = g_list_append(self->headers, h);
+
+  return TRUE;
+}
+
+/*
+ * Utilities
+ */
+void
+ignore_sigpipe (void)
+{
+  struct sigaction sa;
+
+  sa.sa_handler = SIG_IGN;
+  sigemptyset(&sa.sa_mask);
+  sa.sa_flags = 0;
+  sigaction(SIGPIPE, &sa, NULL);
+}
+
+static gchar *
+afsmtp_dd_format_stats_instance(AFSMTPDriver *self)
+{
+  static gchar persist_name[1024];
+
+  g_snprintf(persist_name, sizeof(persist_name),
+             "smtp,%s,%u", self->host, self->port);
+  return persist_name;
+}
+
+static void
+afsmtp_dd_suspend(AFSMTPDriver *self)
+{
+  self->writer_thread_suspended = TRUE;
+  g_get_current_time(&self->writer_thread_suspend_target);
+  g_time_val_add(&self->writer_thread_suspend_target,
+                 self->time_reopen * 1000000);
+}
+
+/*
+ * Worker thread
+ */
+static void
+afsmtp_dd_msg_add_recipient(AFSMTPRecipient *rcpt, smtp_message_t message)
+{
+  gchar *hdr;
+
+  smtp_add_recipient(message, rcpt->address);
+
+  switch (rcpt->type)
+    {
+    case AFSMTP_RCPT_TYPE_TO:
+      hdr = "To";
+      break;
+    case AFSMTP_RCPT_TYPE_CC:
+      hdr = "Cc";
+      break;
+    case AFSMTP_RCPT_TYPE_REPLY_TO:
+      hdr = "Reply-To";
+      break;
+    default:
+      return;
+    }
+  smtp_set_header(message, hdr, rcpt->phrase, rcpt->address);
+  smtp_set_header_option(message, hdr, Hdr_OVERRIDE, 1);
+}
+
+static void
+afsmtp_dd_msg_add_header(AFSMTPHeader *hdr, gpointer user_data)
+{
+  AFSMTPDriver *self = ((gpointer *)user_data)[0];
+  LogMessage *msg = ((gpointer *)user_data)[1];
+  smtp_message_t message = ((gpointer *)user_data)[2];
+
+  log_template_format(hdr->value, msg, NULL, LTZ_SEND, self->seq_num, NULL, self->str);
+
+  smtp_set_header(message, hdr->name, afsmtp_wash_string (self->str->str), NULL);
+  smtp_set_header_option(message, hdr->name, Hdr_OVERRIDE, 1);
+}
+
+static void
+afsmtp_dd_log_rcpt_status(smtp_recipient_t rcpt, const char *mailbox,
+                          gpointer user_data)
+{
+  const smtp_status_t *status;
+
+  status = smtp_recipient_status(rcpt);
+  msg_debug("SMTP recipient result",
+            evt_tag_str("recipient", mailbox),
+            evt_tag_int("code", status->code),
+            evt_tag_str("text", status->text),
+            NULL);
+}
+
+static void
+afsmtp_dd_cb_event(smtp_session_t session, int event, AFSMTPDriver *self)
+{
+  switch (event)
+    {
+    case SMTP_EV_CONNECT:
+      msg_verbose("Connected to SMTP server",
+                  evt_tag_str("host", self->host),
+                  evt_tag_int("port", self->port),
+                  NULL);
+      break;
+    case SMTP_EV_MAILSTATUS:
+    case SMTP_EV_RCPTSTATUS:
+    case SMTP_EV_MESSAGEDATA:
+    case SMTP_EV_MESSAGESENT:
+      /* Ignore */
+      break;
+    case SMTP_EV_DISCONNECT:
+      msg_verbose("Disconnected from SMTP server",
+                  evt_tag_str("host", self->host),
+                  evt_tag_int("port", self->port),
+                  NULL);
+      break;
+    default:
+      msg_verbose("Unknown SMTP event",
+                  evt_tag_int("event_id", event),
+                  NULL);
+      break;
+    }
+}
+
+static void
+afsmtp_dd_cb_monitor(const gchar *buf, gint buflen, gint writing,
+                     AFSMTPDriver *self)
+{
+  gchar fmt[32];
+
+  g_snprintf(fmt, sizeof(fmt), "%%.%us", buflen);
+
+  switch (writing)
+    {
+    case SMTP_CB_READING:
+      msg_debug ("SMTP Session: SERVER",
+                 evt_tag_printf("message", fmt, buf),
+                 NULL);
+      break;
+    case SMTP_CB_WRITING:
+      msg_debug("SMTP Session: CLIENT",
+                evt_tag_printf("message", fmt, buf),
+                NULL);
+      break;
+    case SMTP_CB_HEADERS:
+      msg_debug("SMTP Session: HEADERS",
+                evt_tag_printf("data", fmt, buf),
+                NULL);
+      break;
+    }
+}
+
+static gboolean
+afsmtp_worker_insert(AFSMTPDriver *self)
+{
+  gboolean success;
+  LogMessage *msg;
+  LogPathOptions path_options = LOG_PATH_OPTIONS_INIT;
+  smtp_session_t session;
+  smtp_message_t message;
+  gpointer args[] = { self, NULL, NULL };
+
+  g_mutex_lock(self->queue_mutex);
+  log_queue_reset_parallel_push(self->queue);
+  success = log_queue_pop_head(self->queue, &msg, &path_options, FALSE, FALSE);
+  g_mutex_unlock(self->queue_mutex);
+  if (!success)
+    return TRUE;
+
+  msg_set_context(msg);
+
+  session = smtp_create_session();
+  message = smtp_add_message(session);
+
+  g_string_printf(self->str, "%s:%d", self->host, self->port);
+  smtp_set_server(session, self->str->str);
+
+  smtp_set_eventcb(session, (smtp_eventcb_t)afsmtp_dd_cb_event, (void *)self);
+  smtp_set_monitorcb(session, (smtp_monitorcb_t)afsmtp_dd_cb_monitor,
+                     (void *)self, 1);
+
+  smtp_set_reverse_path(message, self->mail_from->address);
+
+  /* Defaults */
+  smtp_set_header(message, "To", NULL, NULL);
+  smtp_set_header(message, "From", NULL, NULL);
+
+  log_template_format(self->subject_tmpl, msg, NULL, LTZ_SEND,
+                      self->seq_num, NULL, self->str);
+  smtp_set_header(message, "Subject", afsmtp_wash_string(self->str->str));
+  smtp_set_header_option(message, "Subject", Hdr_OVERRIDE, 1);
+
+  /* Add recipients */
+  g_list_foreach(self->rcpt_tos, (GFunc)afsmtp_dd_msg_add_recipient, message);
+
+  /* Add custom header (overrides anything set before, or in the
+     body). */
+  args[1] = msg;
+  args[2] = message;
+  g_list_foreach(self->headers, (GFunc)afsmtp_dd_msg_add_header, args);
+
+  /* Set the body.
+   *
+   * We add a header to the body, otherwise libesmtp will not
+   * recognise headers, and will append them to the end of the body.
+   */
+  g_string_assign(self->str, "X-Mailer: syslog-ng " VERSION "\r\n\r\n");
+  log_template_append_format(self->body_tmpl, msg, NULL, LTZ_SEND,
+                             self->seq_num, NULL, self->str);
+  smtp_set_message_str(message, self->str->str);
+
+  if (!smtp_start_session(session))
+    {
+      gchar error[1024];
+      smtp_strerror(smtp_errno(), error, sizeof (error) - 1);
+
+      msg_error("SMTP server error, suspending",
+                evt_tag_str("error", error),
+                evt_tag_int("time_reopen", self->time_reopen),
+                NULL);
+      success = FALSE;
+    }
+  else
+    {
+      const smtp_status_t *status = smtp_message_transfer_status(message);
+      msg_debug("SMTP result",
+                evt_tag_int("code", status->code),
+                evt_tag_str("text", status->text),
+                NULL);
+      smtp_enumerate_recipients(message, afsmtp_dd_log_rcpt_status, NULL);
+    }
+  smtp_destroy_session(session);
+
+  msg_set_context(NULL);
+
+  if (success)
+    {
+      stats_counter_inc(self->stored_messages);
+      step_sequence_number(&self->seq_num);
+      log_msg_ack(msg, &path_options);
+      log_msg_unref(msg);
+    }
+  else
+    {
+      g_mutex_lock(self->queue_mutex);
+      log_queue_push_head(self->queue, msg, &path_options);
+      g_mutex_unlock(self->queue_mutex);
+    }
+
+  return success;
+}
+
+static gpointer
+afsmtp_worker_thread(gpointer arg)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)arg;
+
+  msg_debug("Worker thread started",
+            evt_tag_str("driver", self->super.super.id),
+            NULL);
+
+  self->str = g_string_sized_new(1024);
+
+  ignore_sigpipe();
+
+  while (!self->writer_thread_terminate)
+    {
+      g_mutex_lock(self->suspend_mutex);
+      if (self->writer_thread_suspended)
+        {
+          g_cond_timed_wait(self->writer_thread_wakeup_cond,
+                            self->suspend_mutex,
+                            &self->writer_thread_suspend_target);
+          self->writer_thread_suspended = FALSE;
+          g_mutex_unlock(self->suspend_mutex);
+        }
+      else
+        {
+          g_mutex_unlock(self->suspend_mutex);
+
+          g_mutex_lock(self->queue_mutex);
+          if (log_queue_get_length(self->queue) == 0)
+            {
+              g_cond_wait(self->writer_thread_wakeup_cond, self->queue_mutex);
+            }
+          g_mutex_unlock(self->queue_mutex);
+        }
+
+      if (self->writer_thread_terminate)
+        break;
+
+      if (!afsmtp_worker_insert (self))
+        {
+          afsmtp_dd_suspend(self);
+        }
+    }
+
+  g_string_free(self->str, TRUE);
+
+  msg_debug("Worker thread finished",
+            evt_tag_str("driver", self->super.super.id),
+            NULL);
+
+  return NULL;
+}
+
+/*
+ * Main thread
+ */
+
+static void
+afsmtp_dd_start_thread(AFSMTPDriver *self)
+{
+  self->writer_thread = create_worker_thread(afsmtp_worker_thread, self, TRUE, NULL);
+}
+
+static void
+afsmtp_dd_stop_thread(AFSMTPDriver *self)
+{
+  self->writer_thread_terminate = TRUE;
+  g_mutex_lock(self->queue_mutex);
+  g_cond_signal(self->writer_thread_wakeup_cond);
+  g_mutex_unlock(self->queue_mutex);
+  g_thread_join(self->writer_thread);
+}
+
+static void
+afsmtp_dd_init_header(AFSMTPHeader *hdr, GlobalConfig *cfg)
+{
+  if (!hdr->value)
+    {
+      hdr->value = log_template_new(cfg, hdr->name);
+      log_template_compile(hdr->value, hdr->template, NULL);
+    }
+}
+
+static gboolean
+afsmtp_dd_init(LogPipe *s)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)s;
+  GlobalConfig *cfg = log_pipe_get_config(s);
+
+  if (cfg)
+    self->time_reopen = cfg->time_reopen;
+
+  msg_verbose("Initializing SMTP destination",
+              evt_tag_str("host", self->host),
+              evt_tag_int("port", self->port),
+              NULL);
+
+  self->queue = log_dest_driver_acquire_queue(&self->super, afsmtp_dd_format_stats_instance(self));
+
+  g_list_foreach(self->headers, (GFunc)afsmtp_dd_init_header, cfg);
+  if (!self->subject_tmpl)
+    {
+      self->subject_tmpl = log_template_new(cfg, "subject");
+      log_template_compile(self->subject_tmpl, self->subject, NULL);
+    }
+  if (!self->body_tmpl)
+    {
+      self->body_tmpl = log_template_new(cfg, "body");
+      log_template_compile(self->body_tmpl, self->body, NULL);
+    }
+
+  stats_lock();
+  stats_register_counter(0, SCS_SMTP | SCS_DESTINATION, self->super.super.id,
+                         afsmtp_dd_format_stats_instance(self),
+                         SC_TYPE_STORED, &self->stored_messages);
+  stats_register_counter(0, SCS_SMTP | SCS_DESTINATION, self->super.super.id,
+                         afsmtp_dd_format_stats_instance(self),
+                         SC_TYPE_DROPPED, &self->dropped_messages);
+  stats_unlock();
+
+  afsmtp_dd_start_thread(self);
+
+  return TRUE;
+}
+
+static gboolean
+afsmtp_dd_deinit(LogPipe *s)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)s;
+
+  afsmtp_dd_stop_thread(self);
+
+  stats_lock();
+  stats_unregister_counter(SCS_SMTP | SCS_DESTINATION, self->super.super.id,
+                           afsmtp_dd_format_stats_instance(self),
+                           SC_TYPE_STORED, &self->stored_messages);
+  stats_unregister_counter(SCS_SMTP | SCS_DESTINATION, self->super.super.id,
+                           afsmtp_dd_format_stats_instance(self),
+                           SC_TYPE_DROPPED, &self->dropped_messages);
+  stats_unlock();
+
+  return TRUE;
+}
+
+static void
+afsmtp_dd_free(LogPipe *d)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)d;
+  GList *l;
+
+  g_mutex_free(self->suspend_mutex);
+  g_mutex_free(self->queue_mutex);
+  g_cond_free(self->writer_thread_wakeup_cond);
+
+  if (self->queue)
+    log_queue_unref(self->queue);
+
+  g_free(self->host);
+  g_free(self->mail_from->phrase);
+  g_free(self->mail_from->address);
+  g_free(self->mail_from);
+  log_template_unref(self->subject_tmpl);
+  log_template_unref(self->body_tmpl);
+  g_free(self->body);
+  g_free(self->subject);
+  g_string_free(self->str, TRUE);
+
+  l = self->rcpt_tos;
+  while (l)
+    {
+      AFSMTPRecipient *rcpt = (AFSMTPRecipient *)l->data;
+      g_free(rcpt->address);
+      g_free(rcpt->phrase);
+      g_free(rcpt);
+      l = g_list_delete_link(l, l);
+    }
+
+  l = self->headers;
+  while (l)
+    {
+      AFSMTPHeader *hdr = (AFSMTPHeader *)l->data;
+      g_free(hdr->name);
+      g_free(hdr->template);
+      log_template_unref(hdr->value);
+      g_free(hdr);
+      l = g_list_delete_link(l, l);
+    }
+
+  log_dest_driver_free(d);
+}
+
+static void
+afsmtp_dd_queue_notify(gpointer s)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)s;
+
+  g_mutex_lock(self->queue_mutex);
+  g_cond_signal(self->writer_thread_wakeup_cond);
+  log_queue_reset_parallel_push(self->queue);
+  g_mutex_unlock(self->queue_mutex);
+}
+
+static void
+afsmtp_dd_queue(LogPipe *s, LogMessage *msg,
+                const LogPathOptions *path_options, gpointer user_data)
+{
+  AFSMTPDriver *self = (AFSMTPDriver *)s;
+  gboolean queue_was_empty;
+  LogPathOptions local_options;
+
+  if (!path_options->flow_control_requested)
+    path_options = log_msg_break_ack(msg, path_options, &local_options);
+
+  g_mutex_lock(self->queue_mutex);
+  queue_was_empty = log_queue_get_length(self->queue) == 0;
+  g_mutex_unlock(self->queue_mutex);
+
+  log_msg_add_ack(msg, path_options);
+  log_queue_push_tail(self->queue, log_msg_ref(msg), path_options);
+
+  g_mutex_lock(self->suspend_mutex);
+  if (queue_was_empty && !self->writer_thread_suspended)
+    {
+      g_mutex_lock(self->queue_mutex);
+      log_queue_set_parallel_push(self->queue, 1, afsmtp_dd_queue_notify, self, NULL);
+      g_mutex_unlock(self->queue_mutex);
+    }
+  g_mutex_unlock(self->suspend_mutex);
+  log_dest_driver_queue_method(s, msg, path_options, user_data);
+}
+
+/*
+ * Plugin glue.
+ */
+
+LogDriver *
+afsmtp_dd_new(void)
+{
+  AFSMTPDriver *self = g_new0(AFSMTPDriver, 1);
+
+  log_dest_driver_init_instance(&self->super);
+  self->super.super.super.init = afsmtp_dd_init;
+  self->super.super.super.deinit = afsmtp_dd_deinit;
+  self->super.super.super.queue = afsmtp_dd_queue;
+  self->super.super.super.free_fn = afsmtp_dd_free;
+
+  afsmtp_dd_set_host((LogDriver *)self, "127.0.0.1");
+  afsmtp_dd_set_port((LogDriver *)self, 25);
+
+  self->mail_from = g_new0(AFSMTPRecipient, 1);
+
+  init_sequence_number(&self->seq_num);
+
+  self->writer_thread_wakeup_cond = g_cond_new();
+  self->suspend_mutex = g_mutex_new();
+  self->queue_mutex = g_mutex_new();
+
+  return (LogDriver *)self;
+}
+
+extern CfgParser afsmtp_dd_parser;
+
+static Plugin afsmtp_plugin =
+{
+  .type = LL_CONTEXT_DESTINATION,
+  .name = "smtp",
+  .parser = &afsmtp_parser,
+};
+
+gboolean
+afsmtp_module_init(GlobalConfig *cfg, CfgArgs *args)
+{
+  plugin_register(cfg, &afsmtp_plugin, 1);
+  return TRUE;
+}
+
+const ModuleInfo module_info =
+{
+  .canonical_name = "afsmtp",
+  .version = VERSION,
+  .description = "The afsmtp module provides SMTP destination support for syslog-ng.",
+  .core_revision = SOURCE_REVISION,
+  .plugins = &afsmtp_plugin,
+  .plugins_len = 1,
+};
diff --git a/modules/afsmtp/afsmtp.h b/modules/afsmtp/afsmtp.h
new file mode 100644
index 0000000..75c70c5
--- /dev/null
+++ b/modules/afsmtp/afsmtp.h
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2011-2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2011-2012 Gergely Nagy <algernon at balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFSMTP_H_INCLUDED
+#define AFSMTP_H_INCLUDED
+
+#include "driver.h"
+
+typedef enum
+  {
+    AFSMTP_RCPT_TYPE_NONE,
+    AFSMTP_RCPT_TYPE_TO,
+    AFSMTP_RCPT_TYPE_CC,
+    AFSMTP_RCPT_TYPE_BCC,
+    AFSMTP_RCPT_TYPE_REPLY_TO,
+    AFSMTP_RCPT_TYPE_SENDER,
+  } afsmtp_rcpt_type_t;
+
+LogDriver *afsmtp_dd_new(void);
+
+void afsmtp_dd_set_host(LogDriver *d, const gchar *host);
+void afsmtp_dd_set_port(LogDriver *d, gint port);
+
+void afsmtp_dd_set_subject(LogDriver *d, const gchar *subject);
+void afsmtp_dd_set_from(LogDriver *d, const gchar *phrase, const gchar *mbox);
+void afsmtp_dd_add_rcpt(LogDriver *d, afsmtp_rcpt_type_t type,
+                        const gchar *phrase, const gchar *mbox);
+void afsmtp_dd_set_body(LogDriver *d, const gchar *body);
+gboolean afsmtp_dd_add_header(LogDriver *d, const gchar *header,
+                              const gchar *value);
+
+#endif
-- 
1.7.8.3




More information about the syslog-ng mailing list