[syslog-ng] Unix-stream destination as listen socket

Peter Nahas pnahas at mrv.com
Wed Oct 19 20:33:11 CEST 2005


Per Balazs' suggestion, I have implemented an AFSocketCallHomeDestDriver 
"call home" destination driver and a unix_call_home_stream destination 
to implement the feature and have attached a patch against 
syslog-ng-1.9.6 along with this email.  My implementation currently only 
supports one incoming connection and currently only adds a unix stream 
version, however it would be easy to enhance the implementation to 
support TCP and/or support more connections.  If the patch is not 
accepted into the repository, I would like to know what requirements 
must be met to do so. 

Thanks,

Peter Nahas
Software Engineer
MRV Communications - InReach Division

Balazs Scheidler wrote:

>On Tue, 2005-10-18 at 10:46 -0400, Peter Nahas wrote:
>  
>
>>Hello everyone,
>>
>>I am currently working on a project using syslog-ng-1.9.5 (soon to 
>>upgrade to 1.9.6) which requires a syslog-ng destination to be a Unix 
>>socket.  The problem is that the syslog-ng daemon will always be started 
>>before the destination application.  So to prevent the loss of messages 
>>due to the reconnect timer, the syslog-ng destination Unix socket  will 
>>have to  listen()/accept() for a connection rather than connect() to an 
>>external socket.  If this enhancement would be used by others, I would 
>>like to write this properly.  Therefore I have the following questions:
>>    
>>
>
>I'd say there different ways to implement the same:
>* persistent buffering: in addition to using a RAM based buffer, use a
>circular buffer stored on disk; this way your application can remain a
>UNIX domain server, and syslog-ng would retry connections until it comes
>up
>* use program() destination, which sends the messages to you via your
>standard input
>* use some kind of external relaying application, that integrates
>syslog-ng via either program/unix-stream and would relay messages to
>your application once it started up
>
>  
>
>>A) If this enhancement were written, would it be accepted into the 
>>repository?  If not, I will simply hack up the current 
>>AFUnixDestinationDriver to do what I want. 
>>    
>>
>
>Without seeing the actual implementation it's hard to say, however it
>would be nice to keep the syslog-ng model
>
>  
>
>>B) Is there any specification document which would assist me in making 
>>this enhancement? 
>>    
>>
>
>Apart from the source? Well, no there's none. The actual design of
>syslog-ng is quite simple however: everything is a LogPipe, and LogPipes
>attached together perform the task at hand. 
>
>source driver 
>  -> source group 
>    -> log center 
>      -> destination group 
>        -> dest driver
>
>Some drivers use some additional LogPipes for things like TCP connections.
>
>A LogPipe is a half-duplex pipe processing log messages, new messages are 
>injected to the next pipe element using log_pipe_queue(). The basic interface
>implemented by all elements is here:
>
>typedef struct _LogPipe
>{
>  guint ref_cnt;
>  struct _LogPipe *pipe_next;
>  gboolean (*init)(struct _LogPipe *self, GlobalConfig *cfg, PersistentConfig *persist);
>  gboolean (*deinit)(struct _LogPipe *self, GlobalConfig *cfg, PersistentConfig *persist);
>  void (*queue)(struct _LogPipe *self, LogMessage *msg, gint path_flags);
>  void (*free_fn)(struct _LogPipe *self);
>  void (*notify)(struct _LogPipe *self, struct _LogPipe *sender, gint notify_code, gpointer user_data);
>} LogPipe;
>
>
>
>  
>
>>C) Would this best be done by adding an additional option to unix-stream 
>>for destinations or by creating a separate destination driver? 
>>D) It appears as if the AFSocketDestinationDriver is not designed to 
>>perform listen()/accept(), but the AFSocketSourceDriver is.  Would it be 
>>possible to set up the AFUnixDestinationDriver to use the 
>>AFSocketSourceDriver as a parent, but attach it to a log writer?
>>    
>>
>
>I'd recommend rethinking the basic idea again. There should be a lot of
>different possibilities to integrate your application that are already
>implemented.
>
>That said, the functionality you want to implement is really orthogonal
>to both unix-stream sources and destinations. This is somewhat similar
>to the 'call-home' way of retrieving log information in which case the
>receiver connects 'out', then receives messages.
>
>In the implementation it would be something like AFSocketCHSourceDriver
>and AFSocketCHDestinationDriver (CH is for call-home), which'd probably
>belong to afsocket.c. Then both the INET and the UNIX socket based
>source/destination drivers would benefit.
>
>  
>

-------------- next part --------------
diff -urP 1.9.6/afsocket.c mine/afsocket.c
--- 1.9.6/afsocket.c	2005-10-19 14:10:46.000000000 -0400
+++ mine/afsocket.c	2005-10-19 14:17:19.000000000 -0400
@@ -163,7 +163,7 @@
     sock = socket(bind_addr->sa.sa_family, SOCK_STREAM, 0);
   else
     sock = socket(bind_addr->sa.sa_family, SOCK_DGRAM, 0);
-    
+
   g_fd_set_nonblock(sock, TRUE);
   if (sock != -1)
     {
@@ -645,6 +645,7 @@
       return FALSE;
     }
   
+
   rc = g_connect(sock, self->dest_addr);
   if (rc == G_IO_STATUS_NORMAL)
     {
@@ -655,7 +656,6 @@
       GSource *source;
 
       /* we must wait until connect succeeds */
-
       self->fd = sock;
       source = g_connect_source_new(sock);
       
@@ -764,3 +764,150 @@
   self->super.super.notify = afsocket_dd_notify;
   self->flags = flags;
 }
+
+/*******************************************************************************
+ * 
+ * PCN - added "call home driver" 
+ * The purpose of the CallHomeDestinationDriver is to provide a system by which
+ * a destination may connect to syslog-ng to received data rather than the
+ * other way around.
+ *
+ ******************************************************************************/
+gboolean 
+afsocket_chdd_process_connection(AFSocketCallHomeDestDriver *self, GSockAddr *peer_addr, gint fd)
+{
+  if(self->connectedFd > 0) 
+    {
+      return FALSE;
+    }
+  
+  self->connectedFd = fd;
+  log_writer_reopen(self->writer, fd_write_new(self->connectedFd));
+  if (!log_pipe_init(self->writer, NULL, NULL))
+    return FALSE;
+  return TRUE;
+}
+
+static gboolean
+afsocket_chdd_accept(gpointer s)
+{
+  AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+  GSockAddr *peer_addr;
+  gchar buf1[256], buf2[256];
+  gint new_fd;
+  gboolean res;
+  
+  if (g_accept(self->listenFd, &new_fd, &peer_addr) != G_IO_STATUS_NORMAL)
+    {
+      msg_error("Error call home accepting new connection",
+                evt_tag_errno(EVT_TAG_OSERROR, errno),
+                NULL);
+      return TRUE;
+    }
+  g_fd_set_nonblock(new_fd, TRUE);
+    
+  msg_verbose("Call home connection accepted",
+              evt_tag_str("from", g_sockaddr_format(peer_addr, buf1, sizeof(buf1))),
+              evt_tag_str("to", g_sockaddr_format(self->bind_addr, buf2, sizeof(buf2))),
+              NULL);
+
+  res = afsocket_chdd_process_connection(self, peer_addr, new_fd);
+  g_sockaddr_unref(peer_addr);
+  return res;
+
+}
+gboolean
+afsocket_chdd_init(LogPipe *s, GlobalConfig *cfg, PersistentConfig *persist)
+{
+  AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+  int sock;
+  GSource* source;
+  
+  log_writer_options_init(&self->writer_options, cfg, !!(self->flags & AFSOCKET_PROTO_RFC3164));
+  /* NOTE: we open our writer with no fd, so we can send messages down there
+   * even while the connection is not established */
+         
+  self->writer = log_writer_new(LW_FORMAT_PROTO | LW_DETECT_EOF, &self->super.super, &self->writer_options);
+  log_pipe_append(&self->super.super, self->writer);
+
+  /* Create and listen on the socket */
+  if (!afsocket_open_socket(self->bind_addr, !!(self->flags & AFSOCKET_STREAM), &sock))
+    {
+      return FALSE;
+    }
+  if(listen(sock, self->listen_backlog) < 0)
+    {
+      msg_error("Error during call home listen()",
+                evt_tag_errno(EVT_TAG_OSERROR, errno),
+                NULL);
+      close(sock);
+      return FALSE;
+    }
+  self->listenFd = sock;
+  source = g_listen_source_new(self->listenFd);
+  /* the listen_source references us, which is freed when the source is deleted */
+  log_pipe_ref(s); 
+  g_source_set_callback(source, afsocket_chdd_accept, self, (GDestroyNotify) log_pipe_unref);
+  self->source_id = g_source_attach(source, NULL);
+  g_source_unref(source);
+  
+  return TRUE;
+}
+
+gboolean
+afsocket_chdd_deinit(LogPipe *s, GlobalConfig *cfg, PersistentConfig *persist)
+{
+  AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver*) s;
+
+  if (self->source_id && g_source_remove(self->source_id))
+    {
+      msg_verbose("Closing listen fd",
+                  evt_tag_int("fd", self->listenFd),
+                  NULL);
+      close(self->listenFd);
+    }
+  if (self->connectedFd >= 0) 
+    {
+      close(self->connectedFd);
+    }
+  return TRUE;
+}
+
+void
+afsocket_chdd_free(LogPipe *s)
+{
+  AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+
+  g_sockaddr_unref(self->bind_addr);
+  log_drv_free_instance(&self->super);
+  g_free(s);
+}
+
+static void
+afsocket_chdd_notify(LogPipe *s, LogPipe *sender, gint notify_code, gpointer user_data)
+{
+  AFSocketCallHomeDestDriver *self = (AFSocketCallHomeDestDriver *) s;
+  switch (notify_code)
+    {
+    case NC_CLOSE:
+    case NC_WRITE_ERROR:
+      self->connectedFd = -1;
+      break;
+    }
+}
+
+void
+afsocket_chdd_init_instance(AFSocketCallHomeDestDriver *self, guint32 flags)
+{
+  log_drv_init_instance(&self->super);
+
+  log_writer_options_defaults(&self->writer_options);
+  self->super.super.init = afsocket_chdd_init;
+  self->super.super.deinit = afsocket_chdd_deinit;
+  self->super.super.queue = log_pipe_forward_msg;
+  self->super.super.free_fn = afsocket_chdd_free;
+  self->super.super.notify = afsocket_chdd_notify;
+  self->listen_backlog = 255;
+  self->flags = flags;
+}
+
diff -urP 1.9.6/afsocket.h mine/afsocket.h
--- 1.9.6/afsocket.h	2005-10-19 14:10:46.000000000 -0400
+++ mine/afsocket.h	2005-10-19 14:08:19.000000000 -0400
@@ -82,4 +82,20 @@
 void afsocket_dd_init_instance(AFSocketDestDriver *self, guint32 flags);
 
 
+typedef struct _AFSocketCallHomeDestDriver
+{
+  LogDriver super;
+  guint32 flags;
+  gint listenFd;
+  gint connectedFd;
+  guint source_id;
+  LogPipe *writer;
+  LogWriterOptions writer_options;
+
+  GSockAddr *bind_addr;
+  gint listen_backlog;
+} AFSocketCallHomeDestDriver;
+
+void afsocket_chdd_init_instance(AFSocketCallHomeDestDriver *self, guint32 flags);
+
 #endif
diff -urP 1.9.6/afunix.c mine/afunix.c
--- 1.9.6/afunix.c	2005-10-19 14:10:46.000000000 -0400
+++ mine/afunix.c	2005-10-19 14:08:19.000000000 -0400
@@ -113,3 +113,14 @@
   self->super.dest_addr = g_sockaddr_unix_new(filename);
   return &self->super.super;
 }
+
+LogDriver *
+afunix_chdd_new(gchar *filename, guint flags)
+{
+  AFUnixCallHomeDestDriver *self = g_new0(AFUnixCallHomeDestDriver, 1);
+  
+  afsocket_chdd_init_instance(&self->super, flags);
+  self->super.bind_addr = g_sockaddr_unix_new(filename);
+  return &self->super.super;
+}
+
diff -urP 1.9.6/afunix.h mine/afunix.h
--- 1.9.6/afunix.h	2005-10-19 14:10:46.000000000 -0400
+++ mine/afunix.h	2005-10-19 14:08:19.000000000 -0400
@@ -49,5 +49,12 @@
 
 LogDriver *afunix_dd_new(gchar *filename, guint flags);
 
+typedef struct _AFUnixCallHomeDestDriver
+{
+  AFSocketCallHomeDestDriver super;
+} AFUnixCallHomeDestDriver;
+
+LogDriver *afunix_chdd_new(gchar *filename, guint flags);
+
 #endif
 
diff -urP 1.9.6/cfg-grammar.y mine/cfg-grammar.y
--- 1.9.6/cfg-grammar.y	2005-10-19 14:10:46.000000000 -0400
+++ mine/cfg-grammar.y	2005-10-19 14:13:33.000000000 -0400
@@ -48,7 +48,7 @@
 %token	KW_SOURCE KW_DESTINATION KW_LOG KW_OPTIONS KW_FILTER
 
 /* source & destination items */
-%token	KW_INTERNAL KW_FILE KW_PIPE KW_UNIX_STREAM KW_UNIX_DGRAM KW_UDP 
+%token	KW_INTERNAL KW_FILE KW_PIPE KW_UNIX_STREAM KW_UNIX_CHSTREAM KW_UNIX_DGRAM KW_UDP 
 %token  KW_TCP KW_USER
 %token  KW_DOOR KW_SUN_STREAMS KW_PROGRAM
 
@@ -142,6 +142,7 @@
 %type	<ptr> dest_afsocket
 %type	<ptr> dest_afunix_dgram_params
 %type	<ptr> dest_afunix_stream_params
+%type	<ptr> dest_afunix_ch_stream_params /* PCN added for call home driver */
 %type	<ptr> dest_afinet_udp_params
 %type	<ptr> dest_afinet_tcp_params
 %type   <ptr> dest_afuser
@@ -501,6 +502,7 @@
 dest_afsocket
 	: KW_UNIX_DGRAM '(' dest_afunix_dgram_params ')'	{ $$ = $3; }
 	| KW_UNIX_STREAM '(' dest_afunix_stream_params ')'	{ $$ = $3; }
+	| KW_UNIX_CHSTREAM '(' dest_afunix_ch_stream_params ')'	{ $$ = $3; }
 	| KW_UDP '(' dest_afinet_udp_params ')'			{ $$ = $3; }
 	| KW_TCP '(' dest_afinet_tcp_params ')'			{ $$ = $3; } 
 	;
@@ -525,6 +527,16 @@
 	  dest_writer_options			{ $$ = last_driver; }
 	;
 
+// PCN added for call home driver
+dest_afunix_ch_stream_params
+	: string
+	  {
+	    last_driver = afunix_chdd_new($1, AFSOCKET_STREAM);
+	    free($1);
+	    last_writer_options = &((AFSocketCallHomeDestDriver *) last_driver)->writer_options;
+	  }
+	  dest_writer_options { $$ = last_driver; }
+	;
 
 dest_afinet_udp_params
 	: string 	
diff -urP 1.9.6/cfg-lex.l mine/cfg-lex.l
--- 1.9.6/cfg-lex.l	2005-10-19 14:10:46.000000000 -0400
+++ mine/cfg-lex.l	2005-10-19 14:15:27.000000000 -0400
@@ -19,7 +19,7 @@
  *
  * Inspired by nsyslog, originally written by Darren Reed.
  *
- * $Id: cfg-lex.l,v 1.8 2003/01/22 11:11:18 bazsi Exp $
+ * $Id$
  *
  ***************************************************************************/
 %{
@@ -52,6 +52,7 @@
         { "internal",           KW_INTERNAL },
 	{ "unix_dgram",		KW_UNIX_DGRAM },
 	{ "unix_stream",	KW_UNIX_STREAM },
+	{ "unix_call_home_stream",	KW_UNIX_CHSTREAM },
         { "udp",                KW_UDP },
         { "tcp",                KW_TCP },
         { "usertty", 		KW_USER },


More information about the syslog-ng mailing list