[syslog-ng] Unix-stream destination as listen socket
Peter Nahas
pnahas at mrv.com
Wed Oct 19 20:33:11 CEST 2005
Per Balazs' suggestion, I have implemented a "call home" destination
driver, AFSocketCallHomeDestDriver, and a unix_call_home_stream
destination that uses it. A patch against syslog-ng-1.9.6 is attached
to this email. The implementation currently supports only one incoming
connection and only adds a Unix stream version; however, it would be
easy to extend it to support TCP and/or more connections. If the patch
is not accepted into the repository, I would like to know what
requirements it would have to meet to be accepted.
Thanks,
Peter Nahas
Software Engineer
MRV Communications - InReach Division
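
With a call-home destination, the receiving application connects to the
socket that syslog-ng listens on instead of creating the socket itself.
The following is a minimal sketch of that receiver side in plain POSIX C.
It is not part of the patch: the function names call_home_connect and
call_home_drain are hypothetical, and the socket path is whatever was
given to the unix_call_home_stream destination in the configuration.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/* Connect to a Unix stream socket that the log daemon is listening on.
 * Returns the connected fd, or -1 on error. (Hypothetical helper.) */
int
call_home_connect(const char *path)
{
  struct sockaddr_un addr;
  int fd;

  assert(path != NULL);
  if (strlen(path) >= sizeof(addr.sun_path))
    return -1;                  /* path does not fit into sockaddr_un */

  fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (fd < 0)
    return -1;

  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  strcpy(addr.sun_path, path);

  if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0)
    {
      close(fd);
      return -1;
    }
  return fd;
}

/* Copy everything received on fd to stdout until the peer closes.
 * Returns 0 on clean end-of-stream, -1 on a read or write error. */
int
call_home_drain(int fd)
{
  char buf[4096];
  ssize_t n;

  while ((n = read(fd, buf, sizeof(buf))) > 0)
    {
      if (write(STDOUT_FILENO, buf, (size_t) n) < 0)
        return -1;
    }
  return n < 0 ? -1 : 0;
}
```

A receiver would call call_home_connect() once at startup and then hand
the returned fd to call_home_drain() or to its own read loop.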
Balazs Scheidler wrote:
>On Tue, 2005-10-18 at 10:46 -0400, Peter Nahas wrote:
>
>
>>Hello everyone,
>>
>>I am currently working on a project using syslog-ng-1.9.5 (soon to
>>upgrade to 1.9.6) which requires a syslog-ng destination to be a Unix
>>socket. The problem is that the syslog-ng daemon will always be started
>>before the destination application. So to prevent the loss of messages
>>due to the reconnect timer, the syslog-ng destination Unix socket will
>>have to listen()/accept() for a connection rather than connect() to an
>>external socket. If this enhancement would be used by others, I would
>>like to write this properly. Therefore I have the following questions:
>>
>>
>
>I'd say there are different ways to implement the same:
>* persistent buffering: in addition to using a RAM based buffer, use a
>circular buffer stored on disk; this way your application can remain a
>UNIX domain server, and syslog-ng would retry connections until it comes
>up
>* use program() destination, which sends the messages to you via your
>standard input
>* use some kind of external relaying application, that integrates
>syslog-ng via either program/unix-stream and would relay messages to
>your application once it started up
>
>
>
>>A) If this enhancement were written, would it be accepted into the
>>repository? If not, I will simply hack up the current
>>AFUnixDestinationDriver to do what I want.
>>
>>
>
>Without seeing the actual implementation it's hard to say, however it
>would be nice to keep the syslog-ng model.
>
>
>
>>B) Is there any specification document which would assist me in making
>>this enhancement?
>>
>>
>
>Apart from the source? Well, no there's none. The actual design of
>syslog-ng is quite simple however: everything is a LogPipe, and LogPipes
>attached together perform the task at hand.
>
>source driver
> -> source group
> -> log center
> -> destination group
> -> dest driver
>
>Some drivers use some additional LogPipes for things like TCP connections.
>
>A LogPipe is a half-duplex pipe that processes log messages. New messages
>are injected into the next pipe element using log_pipe_queue(). The basic
>interface implemented by all elements is here:
>
>typedef struct _LogPipe
>{
> guint ref_cnt;
> struct _LogPipe *pipe_next;
> gboolean (*init)(struct _LogPipe *self, GlobalConfig *cfg, PersistentConfig *persist);
> gboolean (*deinit)(struct _LogPipe *self, GlobalConfig *cfg, PersistentConfig *persist);
> void (*queue)(struct _LogPipe *self, LogMessage *msg, gint path_flags);
> void (*free_fn)(struct _LogPipe *self);
> void (*notify)(struct _LogPipe *self, struct _LogPipe *sender, gint notify_code, gpointer user_data);
>} LogPipe;
>
>
>
>
>
>>C) Would this best be done by adding an additional option to unix-stream
>>for destinations or by creating a separate destination driver?
>>D) It appears as if the AFSocketDestinationDriver is not designed to
>>perform listen()/accept(), but the AFSocketSourceDriver is. Would it be
>>possible to set up the AFUnixDestinationDriver to use the
>>AFSocketSourceDriver as a parent, but attach it to a log writer?
>>
>>
>
>I'd recommend rethinking the basic idea again. There should be a lot of
>different possibilities to integrate your application that are already
>implemented.
>
>That said, the functionality you want to implement is really orthogonal
>to both unix-stream sources and destinations. This is somewhat similar
>to the 'call-home' way of retrieving log information in which case the
>receiver connects 'out', then receives messages.
>
>In the implementation it would be something like AFSocketCHSourceDriver
>and AFSocketCHDestinationDriver (CH is for call-home), which'd probably
>belong to afsocket.c. Then both the INET and the UNIX socket based
>source/destination drivers would benefit.
>
>
>
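
The LogPipe chaining that Balazs describes above can be illustrated with
a simplified standalone model. This is only a sketch and not syslog-ng
code: the Pipe type and the pipe_* helpers are hypothetical, and they
mirror just the pipe_next pointer and the queue() callback from the
LogPipe struct quoted above.

```c
#include <assert.h>
#include <stddef.h>

/* Simplified stand-in for a LogPipe: a next pointer and a queue()
 * callback. The real struct carries more (ref counting, init/deinit,
 * notify). */
typedef struct Pipe
{
  struct Pipe *pipe_next;
  void (*queue)(struct Pipe *self, const char *msg);
  int seen;                     /* messages that passed through here */
} Pipe;

/* Intermediate element: count the message and hand it to the next
 * element, if there is one. */
void
pipe_forward(Pipe *self, const char *msg)
{
  assert(self != NULL);
  self->seen++;
  if (self->pipe_next)
    self->pipe_next->queue(self->pipe_next, msg);
}

/* Terminal element: count the message, do not forward it. */
void
pipe_sink(Pipe *self, const char *msg)
{
  (void) msg;
  assert(self != NULL);
  self->seen++;
}

/* Attach `next` behind `self`; returns `next` so calls can be chained. */
Pipe *
pipe_append(Pipe *self, Pipe *next)
{
  assert(self != NULL);
  self->pipe_next = next;
  return next;
}
```

Five such elements chained together correspond to the source driver,
source group, log center, destination group and destination driver in
the diagram above; a message queued into the first element reaches the
last one through four forwarding calls.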
-------------- next part --------------
diff -urP 1.9.6/afsocket.c mine/afsocket.c
--- 1.9.6/afsocket.c 2005-10-19 14:10:46.000000000 -0400
+++ mine/afsocket.c 2005-10-19 14:17:19.000000000 -0400
@@ -163,7 +163,7 @@
sock = socket(bind_addr->sa.sa_family, SOCK_STREAM, 0);
else
sock = socket(bind_addr->sa.sa_family, SOCK_DGRAM, 0);
-
+
g_fd_set_nonblock(sock, TRUE);
if (sock != -1)
{
@@ -645,6 +645,7 @@
return FALSE;
}
+
rc = g_connect(sock, self->dest_addr);
if (rc == G_IO_STATUS_NORMAL)
{
@@ -655,7 +656,6 @@
GSource *source;
/* we must wait until connect succeeds */
-
self->fd = sock;
source = g_connect_source_new(sock);
@@ -764,3 +764,150 @@
self->super.super.notify = afsocket_dd_notify;
self->flags = flags;
}
+
+/*******************************************************************************
+ *
+ * PCN - added "call home driver"
+ * The purpose of the CallHomeDestinationDriver is to provide a system by which
+ * a destination may connect to syslog-ng to receive data rather than the
+ * other way around.
+ *
+ ******************************************************************************/
+gboolean
+afsocket_chdd_process_connection(AFSocketCallHomeDestDriver *self, GSockAddr *peer_addr, gint fd)
+{
+ if(self->connectedFd > 0)
+ {
+ return FALSE;
+ }
+
+ self->connectedFd = fd;
+ log_writer_reopen(self->writer, fd_write_new(self->connectedFd));
+ if (!log_pipe_init(self->writer, NULL, NULL))
+ return FALSE;
+ return TRUE;
+}
+
+static gboolean
+afsocket_chdd_accept(gpointer s)
+{
+ AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+ GSockAddr *peer_addr;
+ gchar buf1[256], buf2[256];
+ gint new_fd;
+ gboolean res;
+
+ if (g_accept(self->listenFd, &new_fd, &peer_addr) != G_IO_STATUS_NORMAL)
+ {
+ msg_error("Error call home accepting new connection",
+ evt_tag_errno(EVT_TAG_OSERROR, errno),
+ NULL);
+ return TRUE;
+ }
+ g_fd_set_nonblock(new_fd, TRUE);
+
+ msg_verbose("Call home connection accepted",
+ evt_tag_str("from", g_sockaddr_format(peer_addr, buf1, sizeof(buf1))),
+ evt_tag_str("to", g_sockaddr_format(self->bind_addr, buf2, sizeof(buf2))),
+ NULL);
+
+ res = afsocket_chdd_process_connection(self, peer_addr, new_fd);
+ g_sockaddr_unref(peer_addr);
+ return res;
+
+}
+gboolean
+afsocket_chdd_init(LogPipe *s, GlobalConfig *cfg, PersistentConfig *persist)
+{
+ AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+ int sock;
+ GSource* source;
+
+ log_writer_options_init(&self->writer_options, cfg, !!(self->flags & AFSOCKET_PROTO_RFC3164));
+ /* NOTE: we open our writer with no fd, so we can send messages down there
+ * even while the connection is not established */
+
+ self->writer = log_writer_new(LW_FORMAT_PROTO | LW_DETECT_EOF, &self->super.super, &self->writer_options);
+ log_pipe_append(&self->super.super, self->writer);
+
+ /* Create and listen on the socket */
+ if (!afsocket_open_socket(self->bind_addr, !!(self->flags & AFSOCKET_STREAM), &sock))
+ {
+ return FALSE;
+ }
+ if(listen(sock, self->listen_backlog) < 0)
+ {
+ msg_error("Error during call home listen()",
+ evt_tag_errno(EVT_TAG_OSERROR, errno),
+ NULL);
+ close(sock);
+ return FALSE;
+ }
+ self->listenFd = sock;
+ source = g_listen_source_new(self->listenFd);
+ /* the listen_source references us, which is freed when the source is deleted */
+ log_pipe_ref(s);
+ g_source_set_callback(source, afsocket_chdd_accept, self, (GDestroyNotify) log_pipe_unref);
+ self->source_id = g_source_attach(source, NULL);
+ g_source_unref(source);
+
+ return TRUE;
+}
+
+gboolean
+afsocket_chdd_deinit(LogPipe *s, GlobalConfig *cfg, PersistentConfig *persist)
+{
+ AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver*) s;
+
+ if (self->source_id && g_source_remove(self->source_id))
+ {
+ msg_verbose("Closing listen fd",
+ evt_tag_int("fd", self->listenFd),
+ NULL);
+ close(self->listenFd);
+ }
+ if (self->connectedFd >= 0)
+ {
+ close(self->connectedFd);
+ }
+ return TRUE;
+}
+
+void
+afsocket_chdd_free(LogPipe *s)
+{
+ AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+
+ g_sockaddr_unref(self->bind_addr);
+ log_drv_free_instance(&self->super);
+ g_free(s);
+}
+
+static void
+afsocket_chdd_notify(LogPipe *s, LogPipe *sender, gint notify_code, gpointer user_data)
+{
+ AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+ switch (notify_code)
+ {
+ case NC_CLOSE:
+ case NC_WRITE_ERROR:
+ self->connectedFd = -1;
+ break;
+ }
+}
+
+void
+afsocket_chdd_init_instance(AFSocketCallHomeDestDriver *self, guint32 flags)
+{
+ log_drv_init_instance(&self->super);
+
+ log_writer_options_defaults(&self->writer_options);
+ self->super.super.init = afsocket_chdd_init;
+ self->super.super.deinit = afsocket_chdd_deinit;
+ self->super.super.queue = log_pipe_forward_msg;
+ self->super.super.free_fn = afsocket_chdd_free;
+ self->super.super.notify = afsocket_chdd_notify;
+ self->listen_backlog = 255;
+ self->flags = flags;
+}
+
diff -urP 1.9.6/afsocket.h mine/afsocket.h
--- 1.9.6/afsocket.h 2005-10-19 14:10:46.000000000 -0400
+++ mine/afsocket.h 2005-10-19 14:08:19.000000000 -0400
@@ -82,4 +82,20 @@
void afsocket_dd_init_instance(AFSocketDestDriver *self, guint32 flags);
+typedef struct _AFSocketCallHomeDestDriver
+{
+ LogDriver super;
+ guint32 flags;
+ gint listenFd;
+ gint connectedFd;
+ guint source_id;
+ LogPipe *writer;
+ LogWriterOptions writer_options;
+
+ GSockAddr *bind_addr;
+ gint listen_backlog;
+} AFSocketCallHomeDestDriver;
+
+void afsocket_chdd_init_instance(AFSocketCallHomeDestDriver *self, guint32 flags);
+
#endif
diff -urP 1.9.6/afunix.c mine/afunix.c
--- 1.9.6/afunix.c 2005-10-19 14:10:46.000000000 -0400
+++ mine/afunix.c 2005-10-19 14:08:19.000000000 -0400
@@ -113,3 +113,14 @@
self->super.dest_addr = g_sockaddr_unix_new(filename);
return &self->super.super;
}
+
+LogDriver *
+afunix_chdd_new(gchar *filename, guint flags)
+{
+ AFUnixCallHomeDestDriver *self = g_new0(AFUnixCallHomeDestDriver, 1);
+
+ afsocket_chdd_init_instance(&self->super, flags);
+ self->super.bind_addr = g_sockaddr_unix_new(filename);
+ return &self->super.super;
+}
+
diff -urP 1.9.6/afunix.h mine/afunix.h
--- 1.9.6/afunix.h 2005-10-19 14:10:46.000000000 -0400
+++ mine/afunix.h 2005-10-19 14:08:19.000000000 -0400
@@ -49,5 +49,12 @@
LogDriver *afunix_dd_new(gchar *filename, guint flags);
+typedef struct _AFUnixCallHomeDestDriver
+{
+ AFSocketCallHomeDestDriver super;
+} AFUnixCallHomeDestDriver;
+
+LogDriver *afunix_chdd_new(gchar *filename, guint flags);
+
#endif
diff -urP 1.9.6/cfg-grammar.y mine/cfg-grammar.y
--- 1.9.6/cfg-grammar.y 2005-10-19 14:10:46.000000000 -0400
+++ mine/cfg-grammar.y 2005-10-19 14:13:33.000000000 -0400
@@ -48,7 +48,7 @@
%token KW_SOURCE KW_DESTINATION KW_LOG KW_OPTIONS KW_FILTER
/* source & destination items */
-%token KW_INTERNAL KW_FILE KW_PIPE KW_UNIX_STREAM KW_UNIX_DGRAM KW_UDP
+%token KW_INTERNAL KW_FILE KW_PIPE KW_UNIX_STREAM KW_UNIX_CHSTREAM KW_UNIX_DGRAM KW_UDP
%token KW_TCP KW_USER
%token KW_DOOR KW_SUN_STREAMS KW_PROGRAM
@@ -142,6 +142,7 @@
%type <ptr> dest_afsocket
%type <ptr> dest_afunix_dgram_params
%type <ptr> dest_afunix_stream_params
+%type <ptr> dest_afunix_ch_stream_params /* PCN added for call home driver */
%type <ptr> dest_afinet_udp_params
%type <ptr> dest_afinet_tcp_params
%type <ptr> dest_afuser
@@ -501,6 +502,7 @@
dest_afsocket
: KW_UNIX_DGRAM '(' dest_afunix_dgram_params ')' { $$ = $3; }
| KW_UNIX_STREAM '(' dest_afunix_stream_params ')' { $$ = $3; }
+ | KW_UNIX_CHSTREAM '(' dest_afunix_ch_stream_params ')' { $$ = $3; }
| KW_UDP '(' dest_afinet_udp_params ')' { $$ = $3; }
| KW_TCP '(' dest_afinet_tcp_params ')' { $$ = $3; }
;
@@ -525,6 +527,16 @@
dest_writer_options { $$ = last_driver; }
;
+// PCN added for call home driver
+dest_afunix_ch_stream_params
+ : string
+ {
+ last_driver = afunix_chdd_new($1, AFSOCKET_STREAM);
+ free($1);
+ last_writer_options = &((AFSocketCallHomeDestDriver *) last_driver)->writer_options;
+ }
+ dest_writer_options { $$ = last_driver; }
+ ;
dest_afinet_udp_params
: string
diff -urP 1.9.6/cfg-lex.l mine/cfg-lex.l
--- 1.9.6/cfg-lex.l 2005-10-19 14:10:46.000000000 -0400
+++ mine/cfg-lex.l 2005-10-19 14:15:27.000000000 -0400
@@ -19,7 +19,7 @@
*
* Inspired by nsyslog, originally written by Darren Reed.
*
- * $Id: cfg-lex.l,v 1.8 2003/01/22 11:11:18 bazsi Exp $
+ * $Id$
*
***************************************************************************/
%{
@@ -52,6 +52,7 @@
{ "internal", KW_INTERNAL },
{ "unix_dgram", KW_UNIX_DGRAM },
{ "unix_stream", KW_UNIX_STREAM },
+ { "unix_call_home_stream", KW_UNIX_CHSTREAM },
{ "udp", KW_UDP },
{ "tcp", KW_TCP },
{ "usertty", KW_USER },
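
The single-connection behaviour of the driver in the patch above can be
illustrated outside syslog-ng with plain POSIX calls. The sketch below is
not the patch itself: CallHomeState and the call_home_* functions are
hypothetical names, and it uses a blocking accept() where the driver uses
g_accept() from a GLib main loop source. Two choices are made here for
the illustration only: the connected fd starts at -1 to mean "no
receiver", and a connection that arrives while a receiver is active is
closed immediately.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

typedef struct
{
  int listen_fd;                /* socket that was passed to listen() */
  int connected_fd;             /* -1 while no receiver is connected */
} CallHomeState;

/* Create a listening Unix stream socket at `path`; -1 on error. */
int
call_home_listen(const char *path, int backlog)
{
  struct sockaddr_un addr;
  int fd;

  assert(path != NULL);
  if (strlen(path) >= sizeof(addr.sun_path))
    return -1;
  fd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (fd < 0)
    return -1;

  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  strcpy(addr.sun_path, path);
  unlink(path);                 /* drop a stale socket file, if any */

  if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0 ||
      listen(fd, backlog) < 0)
    {
      close(fd);
      return -1;
    }
  return fd;
}

void
call_home_state_init(CallHomeState *st, int listen_fd)
{
  st->listen_fd = listen_fd;
  st->connected_fd = -1;
}

/* Accept one pending connection.
 * Returns 1 if it became the active receiver, 0 if it was closed
 * because a receiver is already active, -1 if accept() failed. */
int
call_home_accept_one(CallHomeState *st)
{
  int fd;

  assert(st != NULL);
  fd = accept(st->listen_fd, NULL, NULL);
  if (fd < 0)
    return -1;
  if (st->connected_fd >= 0)
    {
      close(fd);                /* only one receiver at a time */
      return 0;
    }
  st->connected_fd = fd;
  return 1;
}

/* Called when the receiver goes away, so a new one can be accepted. */
void
call_home_connection_closed(CallHomeState *st)
{
  if (st->connected_fd >= 0)
    close(st->connected_fd);
  st->connected_fd = -1;
}
```

Supporting more than one receiver would mean replacing the single
connected_fd with a list of descriptors, which is the extension
mentioned in the email.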