[28827] in Source-Commits
moira commit [debian]: Add incremental pipe child support. If a service is listed in the incremental_pipe table, its process will be run once with no arguments. Each incremental update will be sent to it on its stdin as a four-byte length followed by a sequence of zero-terminated arguments.
daemon@ATHENA.MIT.EDU (Anders Kaseorg)
Wed Apr 25 23:50:00 2018
Date: Wed, 25 Apr 2018 23:49:54 -0400
From: Anders Kaseorg <andersk@mit.edu>
Message-Id: <201804260349.w3Q3nsTD024814@drugstore.mit.edu>
To: source-commits@mit.edu
https://github.com/mit-athena/moira/commit/4bfb4c9867260637089021a29b5706deed22f886
commit 4bfb4c9867260637089021a29b5706deed22f886
Author: Greg Hudson <ghudson@mit.edu>
Date: Mon Oct 24 11:42:20 2016 -0400
Add incremental pipe child support. If a service is listed in the
incremental_pipe table, its process will be run once with no
arguments. Each incremental update will be sent to it on its stdin as
a four-byte length followed by a sequence of zero-terminated
arguments.
Also add scripts stub.py and example.py. The stub script can be used
on the test server in place of a real incremental script; the example
script can be used as a basis for a real incremental. Both scripts
work in either normal mode or pipe mode.
moira/db/schema.sql | 4 +
moira/db/unschema.sql | 1 +
moira/incremental/incr-runner/example.py | 68 ++++++++
moira/incremental/incr-runner/incr-runner.pc | 219 ++++++++++++++++++++------
moira/incremental/incr-runner/stub.py | 20 +++
5 files changed, 262 insertions(+), 50 deletions(-)
diff --git a/moira/db/schema.sql b/moira/db/schema.sql
index a8d61f8..352da0e 100644
--- a/moira/db/schema.sql
+++ b/moira/db/schema.sql
@@ -514,3 +514,7 @@ create table incremental_queue
incremental_id INTEGER DEFAULT 0 NOT NULL
);
+create table incremental_pipe
+(
+ service VARCHAR(16) DEFAULT CHR (0) NOT NULL
+);
diff --git a/moira/db/unschema.sql b/moira/db/unschema.sql
index f17c947..9ab8561 100644
--- a/moira/db/unschema.sql
+++ b/moira/db/unschema.sql
@@ -30,3 +30,4 @@ drop table containers;
drop table mcntmap;
drop table accountnumbers;
drop table incremental_queue;
+drop table incremental_pipe;
diff --git a/moira/incremental/incr-runner/example.py b/moira/incremental/incr-runner/example.py
new file mode 100644
index 0000000..d19ff88
--- /dev/null
+++ b/moira/incremental/incr-runner/example.py
@@ -0,0 +1,68 @@
+#!/moira/bin/python
+
+import os
+import struct
+import sys
+from subprocess import call
+from time import ctime, sleep
+
+# Replace this with the stop file path for this incremental.
+stop_file = '/moira/myincremental/nomyincremental'
+
+# Display an informational message to incr-runner.log.
+def log(msg):
+ print progname + ': ' + msg
+
+
+# Write a message to critical.log and send a zephyr message to -c moira.
+def critical_log(msg):
+ log('critical error: ' + msg)
+ timestr = ctime()[4:-5]
+ with open('/moira/critical.log', 'a') as f:
+ f.write('%s <%d>%s: %s\n' % (timestr, os.getpid(), progname, msg))
+ zwrite = '/usr/local/bin/zwrite'
+ if os.path.exists(zwrite):
+ call([zwrite, '-q', '-d', '-n', '-c', 'moira', '-i', 'incremental',
+ '-m', progname + ': ' + msg])
+
+
+def do_incremental(args):
+ print progname + ' ' + ' '.join(args)
+
+ # Pause while the stop file exists. Complain every half hour if
+ # it exists for a long time.
+ stop_check_count = 0
+ while os.path.exists(stop_file):
+ sleep(60)
+ if stop_check_count % 30 == 1:
+ critical_log('%s exists (half-hour warning)' % stop_file)
+ stop_check_count += 1
+
+ table = args[0]
+ beforec = int(args[1])
+ afterc = int(args[2])
+ before = sys.argv[3:3+beforec]
+ after = sys.argv[3+beforec:3+beforec+afterc]
+ # Replace this with a call to process the incremental for real.
+ # real_incremental_function_here(table, before, after)
+
+
+progname = os.path.basename(sys.argv[0])
+if len(sys.argv) > 1:
+ # We were run as a per-incremental child process, or run by hand.
+ do_incremental(sys.argv[1:])
+else:
+ # We were run as a pipe child process.
+ print progname + ' pipe child starting'
+ sys.stdout.flush()
+ while True:
+ # Each update is a four-byte length followed by the arguments,
+ # with a null byte after each argument.
+ lenbytes = sys.stdin.read(4)
+ if len(lenbytes) == 0:
+ print progname + ' pipe child exiting'
+ break
+ l, = struct.unpack('>L', lenbytes)
+ msg = sys.stdin.read(l)
+ args = msg.split('\0')[:-1]
+ do_incremental(args)
diff --git a/moira/incremental/incr-runner/incr-runner.pc b/moira/incremental/incr-runner/incr-runner.pc
index 7f2894b..87e4a3c 100644
--- a/moira/incremental/incr-runner/incr-runner.pc
+++ b/moira/incremental/incr-runner/incr-runner.pc
@@ -37,6 +37,9 @@ void do_service(char *name);
void free_argv(char **argv, int argc);
void reapchild(int x);
void nothing(int x);
+int start_pipe_child(char *prog), monitor_pipe_child(int fd, char *prog);
+void run_incr_prog(char *prog, char **argv);
+void send_to_pipe(int fd, char **argv);
RCSID("$HeadURL$ $Id$");
@@ -146,39 +149,50 @@ int main(int argc, char **argv)
void do_service(char *name)
{
char *argv[MAXARGC * 2 + 4], prog[MAXPATHLEN], cbefore[3], cafter[3];
- int i, length = 0;
+ int i, length = 0, use_pipe = 0, fd = -1;
EXEC SQL BEGIN DECLARE SECTION;
- int beforec, afterc, incremental_id, status;
+ int beforec, afterc, incremental_id;
+ char service[INCREMENTAL_PIPE_SERVICE_SIZE];
char table[INCREMENTAL_QUEUE_TABLE_NAME_SIZE];
char encoded_before[INCREMENTAL_QUEUE_BEFORE_SIZE];
char encoded_after[INCREMENTAL_QUEUE_AFTER_SIZE];
EXEC SQL END DECLARE SECTION;
void *flattened_before = NULL, *flattened_after = NULL;
char **before_argv = NULL, **after_argv = NULL;
- sigset_t sigs;
struct sigaction action;
- pid_t inc_pid, pid;
EXEC SQL CONNECT :db IDENTIFIED BY :db;
- action.sa_flags = 0;
- sigemptyset(&action.sa_mask);
+ /* See if this service uses a pipe. */
+ EXEC SQL SELECT service INTO :service FROM incremental_pipe WHERE service = :name;
+ if (sqlca.sqlerrd[2] != 0)
+ use_pipe = 1;
- /* Allow SIGALRM to interrupt waitpid(), but do nothing when it fires. */
- action.sa_handler = nothing;
- if (sigaction(SIGALRM, &action, NULL) < 0)
- {
- com_err(whoami, errno, "Unable to establish signal handlers.");
- exit(1);
- }
+ sprintf(prog, "%s/%s.incr", BIN_DIR, name);
- /* Set up a handler to reap timed-out incremental processes. */
- action.sa_handler = reapchild;
- sigaddset(&action.sa_mask, SIGCHLD);
- if (sigaction(SIGCHLD, &action, NULL) < 0)
+ if (use_pipe)
+ fd = start_pipe_child(prog);
+ else
{
- com_err(whoami, errno, "Unable to establish signal handlers.");
- exit(1);
+ action.sa_flags = 0;
+ sigemptyset(&action.sa_mask);
+
+ /* Allow SIGALRM to interrupt waitpid(), but do nothing when it fires. */
+ action.sa_handler = nothing;
+ if (sigaction(SIGALRM, &action, NULL) < 0)
+ {
+ com_err(whoami, errno, "Unable to establish signal handlers.");
+ exit(1);
+ }
+
+ /* Set up a handler to reap timed-out incremental processes. */
+ action.sa_handler = reapchild;
+ sigaddset(&action.sa_mask, SIGCHLD);
+ if (sigaction(SIGCHLD, &action, NULL) < 0)
+ {
+ com_err(whoami, errno, "Unable to establish signal handlers.");
+ exit(1);
+ }
}
while (1)
@@ -190,6 +204,9 @@ void do_service(char *name)
exit(1);
}
+ if (use_pipe)
+ fd = monitor_pipe_child(fd, prog);
+
EXEC SQL SELECT table_name, beforec, afterc, before, after, incremental_id INTO
:table, :beforec, :afterc, :encoded_before, :encoded_after, :incremental_id
FROM incremental_queue WHERE service = :name AND
@@ -202,7 +219,6 @@ void do_service(char *name)
continue;
}
- sprintf(prog, "%s/%s.incr", BIN_DIR, name);
argv[0] = prog;
argv[1] = strtrim(table);
sprintf(cbefore, "%d", beforec);
@@ -248,37 +264,11 @@ void do_service(char *name)
EXEC SQL DELETE FROM incremental_queue WHERE incremental_id = :incremental_id;
EXEC SQL COMMIT WORK;
-
- sigemptyset(&sigs);
- sigaddset(&sigs, SIGCHLD);
- sigprocmask(SIG_BLOCK, &sigs, NULL);
- inc_pid = vfork();
- switch (inc_pid)
- {
- case 0:
- execv(prog, argv);
- _exit(1);
- case -1:
- critical_alert(whoami, "incr-runner", "Failed to start incremental update %s", prog);
- break;
- default:
- break;
- }
-
- alarm(INC_TIMEOUT);
- pid = waitpid(inc_pid, &status, 0);
- alarm(0);
- if (pid == -1 && errno == EINTR)
- critical_alert(whoami, "incr-runner", "incremental timeout on pid %d", (int) inc_pid);
- else if (pid == -1)
- critical_alert(whoami, "incr-runner", "error in waitpid");
- else if (WTERMSIG(status) != 0 || WEXITSTATUS(status) != 0)
- {
- critical_alert(whoami, "incr-runner", "%d: child exits with signal %d status %d",
- (int) inc_pid, WTERMSIG(status), WEXITSTATUS(status));
- }
- sigprocmask(SIG_UNBLOCK, &sigs, NULL);
+ if (use_pipe)
+ send_to_pipe(fd, argv + 1);
+ else
+ run_incr_prog(prog, argv);
if (flattened_before)
{
@@ -349,3 +339,132 @@ void reapchild(int x)
void nothing(int x)
{
}
+
+/* Create a child process running prog (with no arguments) and return a file
+ * descriptor for its standard input.  Exits on pipe or fork failure. */
+int start_pipe_child(char *prog)
+{
+  int pipefd[2];
+  pid_t pid;
+
+  if (pipe(pipefd) == -1)
+    {
+      critical_alert(whoami, "incr-runner", "Failed to create pipe for %s: %s",
+                     prog, strerror(errno));
+      exit(1);
+    }
+  pid = fork(); /* not vfork(): the child runs close()/dup2() before exec */
+  if (pid == -1)
+    {
+      critical_alert(whoami, "incr-runner", "Failed to start child %s", prog);
+      exit(1);
+    }
+  else if (pid == 0)
+    {
+      close(pipefd[1]); /* child keeps only the read end, as its stdin */
+      if (dup2(pipefd[0], STDIN_FILENO) == -1)
+        _exit(1);
+      close(pipefd[0]);
+      execl(prog, prog, (char *)NULL);
+      _exit(1);
+    }
+  close(pipefd[0]);
+  return pipefd[1];
+}
+
+/* Poll, without blocking, for an exited child.  If one has exited, alert, close fd, restart prog
+   and return the new pipe descriptor.  Otherwise return fd. */
+int monitor_pipe_child(int fd, char *prog)
+{
+  int wstatus;
+  pid_t reaped = waitpid(-1, &wstatus, WNOHANG);
+
+  if (reaped > 0)
+    {
+      critical_alert(whoami, "incr-runner", "%d: pipe child exits with signal %d status %d",
+                     (int) reaped, WTERMSIG(wstatus), WEXITSTATUS(wstatus));
+      close(fd);
+      fd = start_pipe_child(prog);
+    }
+  return fd;
+}
+
+/* Flatten argv into a message and send it to fd. */
+void send_to_pipe(int fd, char **argv)
+{
+ size_t len, pos;
+ ssize_t r;
+ int i;
+ char *buf;
+
+ /* Flatten the argv array into a buffer with four-byte big-endian length. */
+ len = 0;
+ for (i = 0; argv[i] != NULL; i++)
+ len += strlen(argv[i]) + 1;
+ buf = xmalloc(4 + len);
+ buf[0] = (len >> 24) & 0xFF;
+ buf[1] = (len >> 16) & 0xFF;
+ buf[2] = (len >> 8) & 0xFF;
+ buf[3] = len & 0xFF;
+ pos = 4;
+ for (i = 0; argv[i]; i++)
+ {
+ memcpy(buf + pos, argv[i], strlen(argv[i]) + 1);
+ pos += strlen(argv[i]) + 1;
+ }
+ len += 4;
+
+ /* Send the message to fd. */
+ pos = 0;
+ while (pos < len)
+ {
+ r = write(fd, buf + pos, len - pos);
+ if (r < 0)
+ {
+ critical_alert(whoami, "incr-runner", "error writing to pipe: %s",
+ strerror(errno));
+ exit(1);
+ }
+ pos += r;
+ }
+
+ free(buf);
+}
+
+/* Run prog with argv, wait for it (alarm set to INC_TIMEOUT), and alert on any failure. */
+void run_incr_prog(char *prog, char **argv)
+{
+  sigset_t sigs;
+  pid_t inc_pid, pid;
+  int status;
+
+  sigemptyset(&sigs);
+  sigaddset(&sigs, SIGCHLD);
+  sigprocmask(SIG_BLOCK, &sigs, NULL);
+  inc_pid = vfork();
+  switch (inc_pid)
+    {
+    case 0:
+      execv(prog, argv);
+      _exit(1);
+    case -1:
+      critical_alert(whoami, "incr-runner", "Failed to start incremental update %s", prog);
+      sigprocmask(SIG_UNBLOCK, &sigs, NULL);
+      return; /* nothing to wait for; waitpid(-1, ...) would wait on any child */
+    }
+
+  alarm(INC_TIMEOUT);
+  pid = waitpid(inc_pid, &status, 0);
+  alarm(0);
+  if (pid == -1 && errno == EINTR)
+    critical_alert(whoami, "incr-runner", "incremental timeout on pid %d", (int) inc_pid);
+  else if (pid == -1)
+    critical_alert(whoami, "incr-runner", "error in waitpid");
+  else if (WTERMSIG(status) != 0 || WEXITSTATUS(status) != 0)
+    {
+      critical_alert(whoami, "incr-runner", "%d: child exits with signal %d status %d",
+                     (int) inc_pid, WTERMSIG(status), WEXITSTATUS(status));
+    }
+
+  sigprocmask(SIG_UNBLOCK, &sigs, NULL);
+}
diff --git a/moira/incremental/incr-runner/stub.py b/moira/incremental/incr-runner/stub.py
new file mode 100644
index 0000000..049b35f
--- /dev/null
+++ b/moira/incremental/incr-runner/stub.py
@@ -0,0 +1,20 @@
+#!/moira/bin/python
+
+import os
+import struct
+import sys
+
+progname = os.path.basename(sys.argv[0])
+if len(sys.argv) > 1:
+ print progname + ' ' + ' '.join(sys.argv[1:])
+else:
+ print progname + ' pipe child starting'
+ sys.stdout.flush()
+ while True:
+ lenbytes = sys.stdin.read(4)
+ if len(lenbytes) == 0:
+ print progname + ' pipe child exiting'
+ break
+ l, = struct.unpack('>L', lenbytes)
+ print progname + ' ' + ' '.join(sys.stdin.read(l).split('\0')[:-1])
+ sys.stdout.flush()