[28827] in Source-Commits

home help back first fref pref prev next nref lref last post

moira commit [debian]: Add incremental pipe child support. If a service is listed in the incremental_pipe table, its process will be run once with no arguments. Each incremental update will be sent to it on its stdin as a four-byte length followed by a sequence of zero-terminated arguments.

daemon@ATHENA.MIT.EDU (Anders Kaseorg)
Wed Apr 25 23:50:00 2018

Date: Wed, 25 Apr 2018 23:49:54 -0400
From: Anders Kaseorg <andersk@mit.edu>
Message-Id: <201804260349.w3Q3nsTD024814@drugstore.mit.edu>
To: source-commits@mit.edu

https://github.com/mit-athena/moira/commit/4bfb4c9867260637089021a29b5706deed22f886
commit 4bfb4c9867260637089021a29b5706deed22f886
Author: Greg Hudson <ghudson@mit.edu>
Date:   Mon Oct 24 11:42:20 2016 -0400

    Add incremental pipe child support.  If a service is listed in the
    incremental_pipe table, its process will be run once with no
    arguments.  Each incremental update will be sent to it on its stdin as
    a four-byte length followed by a sequence of zero-terminated
    arguments.
    
    Also add scripts stub.py and example.py.  The stub script can be used
    on the test server in place of a real incremental script; the example
    script can be used as a basis for a real incremental.  Both scripts
    work in either normal mode or pipe mode.

 moira/db/schema.sql                          |    4 +
 moira/db/unschema.sql                        |    1 +
 moira/incremental/incr-runner/example.py     |   68 ++++++++
 moira/incremental/incr-runner/incr-runner.pc |  219 ++++++++++++++++++++------
 moira/incremental/incr-runner/stub.py        |   20 +++
 5 files changed, 262 insertions(+), 50 deletions(-)

diff --git a/moira/db/schema.sql b/moira/db/schema.sql
index a8d61f8..352da0e 100644
--- a/moira/db/schema.sql
+++ b/moira/db/schema.sql
@@ -514,3 +514,7 @@ create table incremental_queue
 	incremental_id	INTEGER		DEFAULT 0	NOT NULL
 );
 	
+create table incremental_pipe
+(
+	service		VARCHAR(16)	DEFAULT CHR (0)	NOT NULL
+);
diff --git a/moira/db/unschema.sql b/moira/db/unschema.sql
index f17c947..9ab8561 100644
--- a/moira/db/unschema.sql
+++ b/moira/db/unschema.sql
@@ -30,3 +30,4 @@ drop table containers;
 drop table mcntmap;
 drop table accountnumbers;
 drop table incremental_queue;
+drop table incremental_pipe;
diff --git a/moira/incremental/incr-runner/example.py b/moira/incremental/incr-runner/example.py
new file mode 100644
index 0000000..d19ff88
--- /dev/null
+++ b/moira/incremental/incr-runner/example.py
@@ -0,0 +1,68 @@
+#!/moira/bin/python
+
+import os
+import struct
+import sys
+from subprocess import call
+from time import ctime, sleep
+
+# Replace this with the stop file path for this incremental.
+stop_file = '/moira/myincremental/nomyincremental'
+
+# Display an informational message to incr-runner.log.
+def log(msg):
+    print progname + ': ' + msg
+
+
+# Write a message to critical.log and send a zephyr message to -c moira.
+def critical_log(msg):
+    log('critical error: ' + msg)
+    timestr = ctime()[4:-5]
+    with open('/moira/critical.log', 'a') as f:
+        f.write('%s <%d>%s: %s\n' % (timestr, os.getpid(), progname, msg))
+    zwrite = '/usr/local/bin/zwrite'
+    if os.path.exists(zwrite):
+        call([zwrite, '-q', '-d', '-n', '-c', 'moira', '-i', 'incremental',
+              '-m', progname + ': ' + msg])
+
+
+def do_incremental(args):
+    print progname + ' ' + ' '.join(args)
+
+    # Pause while the stop file exists.  Complain every half hour if
+    # it exists for a long time.
+    stop_check_count = 0
+    while os.path.exists(stop_file):
+        sleep(60)
+        if stop_check_count % 30 == 1:
+            critical_log('%s exists (half-hour warning)' % stop_file)
+        stop_check_count += 1
+
+    table = args[0]
+    beforec = int(args[1])
+    afterc = int(args[2])
+    before = sys.argv[3:3+beforec]
+    after = sys.argv[3+beforec:3+beforec+afterc]
+    # Replace this with a call to process the incremental for real.
+    # real_incremental_function_here(table, before, after)
+
+
+progname = os.path.basename(sys.argv[0])
+if len(sys.argv) > 1:
+    # We were run as a per-incremental child process, or run by hand.
+    do_incremental(sys.argv[1:])
+else:
+    # We were run as a pipe child process.
+    print progname + ' pipe child starting'
+    sys.stdout.flush()
+    while True:
+        # Each update is a four-byte length followed by the arguments,
+        # with a null byte after each argument.
+        lenbytes = sys.stdin.read(4)
+        if len(lenbytes) == 0:
+            print progname + ' pipe child exiting'
+            break
+        l, = struct.unpack('>L', lenbytes)
+        msg = sys.stdin.read(l)
+        args = msg.split('\0')[:-1]
+        do_incremental(args)
diff --git a/moira/incremental/incr-runner/incr-runner.pc b/moira/incremental/incr-runner/incr-runner.pc
index 7f2894b..87e4a3c 100644
--- a/moira/incremental/incr-runner/incr-runner.pc
+++ b/moira/incremental/incr-runner/incr-runner.pc
@@ -37,6 +37,9 @@ void do_service(char *name);
 void free_argv(char **argv, int argc);
 void reapchild(int x);
 void nothing(int x);
+int start_pipe_child(char *prog);
+void run_incr_prog(char *prog, char **arg);
+void send_to_pipe(int fd, char **argv);
 
 RCSID("$HeadURL$ $Id$");
 
@@ -146,39 +149,50 @@ int main(int argc, char **argv)
 void do_service(char *name)
 {
   char *argv[MAXARGC * 2 + 4], prog[MAXPATHLEN], cbefore[3], cafter[3];
-  int i, length = 0;
+  int i, length = 0, use_pipe = 0, fd = -1;
   EXEC SQL BEGIN DECLARE SECTION;
-  int beforec, afterc, incremental_id, status;
+  int beforec, afterc, incremental_id;
+  char service[INCREMENTAL_PIPE_SERVICE_SIZE];
   char table[INCREMENTAL_QUEUE_TABLE_NAME_SIZE];
   char encoded_before[INCREMENTAL_QUEUE_BEFORE_SIZE];
   char encoded_after[INCREMENTAL_QUEUE_AFTER_SIZE];
   EXEC SQL END DECLARE SECTION;
   void *flattened_before = NULL, *flattened_after = NULL;
   char **before_argv = NULL, **after_argv = NULL;
-  sigset_t sigs;
   struct sigaction action;
-  pid_t inc_pid, pid;
 
   EXEC SQL CONNECT :db IDENTIFIED BY :db;
 
-  action.sa_flags = 0;
-  sigemptyset(&action.sa_mask);
+  /* See if this service uses a pipe. */
+  EXEC SQL SELECT service INTO :service FROM incremental_pipe WHERE service = :name;
+  if (sqlca.sqlerrd[2] != 0)
+    use_pipe = 1;
 
-  /* Allow SIGALRM to interrupt waitpid(), but do nothing when it fires. */
-  action.sa_handler = nothing;
-  if (sigaction(SIGALRM, &action, NULL) < 0)
-    {
-      com_err(whoami, errno, "Unable to establish signal handlers.");
-      exit(1);
-    }
+  sprintf(prog, "%s/%s.incr", BIN_DIR, name);
 
-  /* Set up a handler to reap timed-out incremental processes. */
-  action.sa_handler = reapchild;
-  sigaddset(&action.sa_mask, SIGCHLD);
-  if (sigaction(SIGCHLD, &action, NULL) < 0)
+  if (use_pipe)
+    fd = start_pipe_child(prog);
+  else
     {
-      com_err(whoami, errno, "Unable to establish signal handlers.");
-      exit(1);
+      action.sa_flags = 0;
+      sigemptyset(&action.sa_mask);
+
+      /* Allow SIGALRM to interrupt waitpid(), but do nothing when it fires. */
+      action.sa_handler = nothing;
+      if (sigaction(SIGALRM, &action, NULL) < 0)
+	{
+	  com_err(whoami, errno, "Unable to establish signal handlers.");
+	  exit(1);
+	}
+
+      /* Set up a handler to reap timed-out incremental processes. */
+      action.sa_handler = reapchild;
+      sigaddset(&action.sa_mask, SIGCHLD);
+      if (sigaction(SIGCHLD, &action, NULL) < 0)
+	{
+	  com_err(whoami, errno, "Unable to establish signal handlers.");
+	  exit(1);
+	}
     }
 
   while (1)
@@ -190,6 +204,9 @@ void do_service(char *name)
 	  exit(1);
 	}
 
+      if (use_pipe)
+	fd = monitor_pipe_child(fd, prog);
+
       EXEC SQL SELECT table_name, beforec, afterc, before, after, incremental_id INTO
 	:table, :beforec, :afterc, :encoded_before, :encoded_after, :incremental_id
 	FROM incremental_queue WHERE service = :name AND
@@ -202,7 +219,6 @@ void do_service(char *name)
 	  continue;
 	}
       
-      sprintf(prog, "%s/%s.incr", BIN_DIR, name);
       argv[0] = prog;
       argv[1] = strtrim(table);
       sprintf(cbefore, "%d", beforec);
@@ -248,37 +264,11 @@ void do_service(char *name)
 
       EXEC SQL DELETE FROM incremental_queue WHERE incremental_id = :incremental_id;
       EXEC SQL COMMIT WORK;
-      
-      sigemptyset(&sigs);
-      sigaddset(&sigs, SIGCHLD);
-      sigprocmask(SIG_BLOCK, &sigs, NULL);
-      inc_pid = vfork();
-      switch (inc_pid)
-	{
-	case 0:
-	  execv(prog, argv);
-	  _exit(1);
-	case -1:
-	  critical_alert(whoami, "incr-runner", "Failed to start incremental update %s", prog);
-	  break;
-	default:
-	  break;
-	}
-
-      alarm(INC_TIMEOUT);
-      pid = waitpid(inc_pid, &status, 0);
-      alarm(0);
-      if (pid == -1 && errno == EINTR)
-	critical_alert(whoami, "incr-runner", "incremental timeout on pid %d", (int) inc_pid);
-      else if (pid == -1)
-	critical_alert(whoami, "incr-runner", "error in waitpid");
-      else if (WTERMSIG(status) != 0 || WEXITSTATUS(status) != 0)
-	{
-          critical_alert(whoami, "incr-runner", "%d: child exits with signal %d status %d",
-                         (int) inc_pid, WTERMSIG(status), WEXITSTATUS(status));
-	}
 
-      sigprocmask(SIG_UNBLOCK, &sigs, NULL);
+      if (use_pipe)
+	send_to_pipe(fd, argv + 1);
+      else
+	run_incr_prog(prog, argv);
 
       if (flattened_before)
 	{
@@ -349,3 +339,132 @@ void reapchild(int x)
 void nothing(int x)
 {
 }
+
+/* Create a child process running prog (with no arguments) and return a file
+ * descriptor for its standard input. */
+int start_pipe_child(char *prog)
+{
+  int pipefd[2];
+  pid_t pid;
+
+  if (pipe(pipefd) == -1)
+    {
+      critical_alert(whoami, "incr-runner", "Failed to create pipe for %s: %s",
+		     prog, strerror(errno));
+      exit(1);
+    }
+  pid = vfork();
+  if (pid == -1)
+    {
+      critical_alert(whoami, "incr-runner", "Failed to start child %s", prog);
+      exit(1);
+    }
+  else if (pid == 0)
+    {
+      close(pipefd[1]);
+      close(STDIN_FILENO);
+      dup2(pipefd[0], STDIN_FILENO);
+      close(pipefd[0]);
+      execl(prog, prog, (char *)NULL);
+      _exit(1);
+    }
+  close(pipefd[0]);
+  return pipefd[1];
+}
+
+/* If the pipe child process has exited, restart it and return a new
+   pipe.  Otherwise return fd. */
+int monitor_pipe_child(int fd, char *prog)
+{
+  pid_t pid;
+  int status;
+
+  pid = waitpid(-1, &status, WNOHANG);
+  if (pid <= 0)
+    return fd;
+
+  critical_alert(whoami, "incr-runner", "%d: pipe child exits with signal %d status %d",
+		 (int) pid, WTERMSIG(status), WEXITSTATUS(status));
+  close(fd);
+  return start_pipe_child(prog);
+}
+
+/* Flatten argv into a message and send it to fd. */
+void send_to_pipe(int fd, char **argv)
+{
+  size_t len, pos;
+  ssize_t r;
+  int i;
+  char *buf;
+
+  /* Flatten the argv array into a buffer with four-byte big-endian length. */
+  len = 0;
+  for (i = 0; argv[i] != NULL; i++)
+    len += strlen(argv[i]) + 1;
+  buf = xmalloc(4 + len);
+  buf[0] = (len >> 24) & 0xFF;
+  buf[1] = (len >> 16) & 0xFF;
+  buf[2] = (len >> 8) & 0xFF;
+  buf[3] = len & 0xFF;
+  pos = 4;
+  for (i = 0; argv[i]; i++)
+    {
+      memcpy(buf + pos, argv[i], strlen(argv[i]) + 1);
+      pos += strlen(argv[i]) + 1;
+    }
+  len += 4;
+
+  /* Send the message to fd. */
+  pos = 0;
+  while (pos < len)
+    {
+      r = write(fd, buf + pos, len - pos);
+      if (r < 0)
+	{
+	  critical_alert(whoami, "incr-runner", "error writing to pipe: %s",
+			 strerror(errno));
+	  exit(1);
+	}
+      pos += r;
+    }
+
+  free(buf);
+}
+
+void run_incr_prog(char *prog, char **argv)
+{
+  sigset_t sigs;
+  pid_t inc_pid, pid;
+  int status;
+
+  sigemptyset(&sigs);
+  sigaddset(&sigs, SIGCHLD);
+  sigprocmask(SIG_BLOCK, &sigs, NULL);
+  inc_pid = vfork();
+  switch (inc_pid)
+    {
+    case 0:
+      execv(prog, argv);
+      _exit(1);
+    case -1:
+      critical_alert(whoami, "incr-runner", "Failed to start incremental update %s", prog);
+      break;
+    default:
+      break;
+    }
+
+  alarm(INC_TIMEOUT);
+  pid = waitpid(inc_pid, &status, 0);
+  alarm(0);
+  if (pid == -1 && errno == EINTR)
+    critical_alert(whoami, "incr-runner", "incremental timeout on pid %d", (int) inc_pid);
+  else if (pid == -1)
+    critical_alert(whoami, "incr-runner", "error in waitpid");
+  else if (WTERMSIG(status) != 0 || WEXITSTATUS(status) != 0)
+    {
+      critical_alert(whoami, "incr-runner", "%d: child exits with signal %d status %d",
+		     (int) inc_pid, WTERMSIG(status), WEXITSTATUS(status));
+    }
+
+  sigprocmask(SIG_UNBLOCK, &sigs, NULL);
+}
diff --git a/moira/incremental/incr-runner/stub.py b/moira/incremental/incr-runner/stub.py
new file mode 100644
index 0000000..049b35f
--- /dev/null
+++ b/moira/incremental/incr-runner/stub.py
@@ -0,0 +1,20 @@
+#!/moira/bin/python
+
+import os
+import struct
+import sys
+
+progname = os.path.basename(sys.argv[0])
+if len(sys.argv) > 1:
+    print progname + ' ' + ' '.join(sys.argv[1:])
+else:
+    print progname + ' pipe child starting'
+    sys.stdout.flush()
+    while True:
+        lenbytes = sys.stdin.read(4)
+        if len(lenbytes) == 0:
+            print progname + ' pipe child exiting'
+            break
+        l, = struct.unpack('>L', lenbytes)
+        print progname + ' ' + ' '.join(sys.stdin.read(l).split('\0')[:-1])
+        sys.stdout.flush()

home help back first fref pref prev next nref lref last post