home: hub: mkinitfs

Download patch

ref: 0e2672eebb4efd725f0e906f07b3c1bae81dbde7
parent: 5f6672c1f9c71909f9da5717664b713ac53cb632
author: Timo Teräs <timo.teras@iki.fi>
date: Sat Oct 24 16:35:15 CDT 2015

nlplug-findfs: asynchronously fork child processes

Instead of waiting for each child to run to completion before continuing,
fork or queue each command, allowing up to CPU-count concurrent children.
This enables full use of SMP cores and allows modules to be loaded after
a blocking command has started; e.g. the keyboard driver now loads even
while the crypto disk command is waiting for keyboard input.

--- a/nlplug-findfs.c
+++ b/nlplug-findfs.c
@@ -23,6 +23,7 @@
 #include <unistd.h>
 
 #include <sys/eventfd.h>
+#include <sys/signalfd.h>
 #include <sys/mount.h>
 #include <sys/socket.h>
 #include <sys/stat.h>
@@ -68,6 +69,105 @@
 
 #define envcmp(env, key) (strncmp(env, key "=", strlen(key "=")) == 0)
 
+
+/*
+ * clone_array - deep-copy a NULL-terminated array of strings into a
+ * single heap allocation.
+ *
+ * The pointer table (n entries plus the terminating NULL) is followed
+ * directly by the string bytes, so the whole copy is released with one
+ * free() of the returned pointer.
+ *
+ * Returns NULL when @a is NULL or when the allocation fails.
+ */
+static char **clone_array(char *const *const a)
+{
+	size_t i, n, total;
+	char **c, *p;
+
+	if (!a)
+		return NULL;
+
+	/* the NULL terminator slot, plus a pointer and the bytes+NUL per entry */
+	total = sizeof(char *);
+	for (n = 0; a[n]; n++)
+		total += sizeof(char *) + strlen(a[n]) + 1;
+
+	c = malloc(total);
+	if (!c)
+		return NULL;
+
+	/* string storage begins right after the n + 1 pointer slots */
+	p = (char *)(c + n + 1);
+	for (i = 0; i < n; i++) {
+		size_t len = strlen(a[i]) + 1;
+
+		c[i] = memcpy(p, a[i], len);
+		p += len;
+	}
+	c[n] = NULL;
+	return c;
+}
+
+/* One deferred command, holding deep copies of its argv/envp. */
+struct spawn_task {
+	struct spawn_task *next;	/* FIFO link, NULL at the tail */
+	char **argv;
+	char **envp;			/* NULL means "use default_envp" at exec */
+};
+
+/*
+ * Bookkeeping for asynchronously spawned helpers: the number of live
+ * children, the concurrency cap, and the FIFO of commands waiting for
+ * a free slot.
+ */
+struct spawn_manager {
+	int num_running;		/* forked and not yet reaped */
+	int max_running;		/* 0 until first spawn_command() */
+	struct spawn_task *first;	/* queue head: next task to run */
+	struct spawn_task *last;	/* queue tail: append point */
+};
+
+/* Zero-initialised manager shared by every spawn_command() caller. */
+static struct spawn_manager spawnmgr;
+
+/*
+ * spawn_execute - fork and exec one helper without waiting for it.
+ *
+ * @argv: NULL-terminated argument vector; argv[0] is passed straight to
+ *        execve(), so it must be a full path (no PATH lookup is done).
+ * @envp: environment for the child, or NULL to use default_envp.
+ *
+ * The child is reaped later by the main loop (SIGCHLD read from a
+ * signalfd), which calls spawn_reap().  Exits the program if fork() fails.
+ */
+static void spawn_execute(struct spawn_manager *mgr, char **argv, char **envp)
+{
+	pid_t pid;
+
+	dbg("[%d/%d] running %s", mgr->num_running+1, mgr->max_running, argv[0]);
+	pid = fork();
+	if (pid == 0) {
+		sigset_t none;
+
+		/*
+		 * main() blocks SIGCHLD so it can be read from a signalfd, and
+		 * ignores SIGPIPE.  Both the signal mask and ignored dispositions
+		 * survive execve(), so restore defaults for the helper program.
+		 */
+		sigemptyset(&none);
+		sigprocmask(SIG_SETMASK, &none, NULL);
+		signal(SIGPIPE, SIG_DFL);
+
+		execve(argv[0], argv, envp ? envp : default_envp);
+		/* execve() only returns on failure; argv[0] is data, not a format */
+		err(1, "%s", argv[0]);
+	}
+	if (pid < 0)
+		err(1, "fork");
+
+	mgr->num_running++;
+}
+
+/*
+ * spawn_queue - defer a command until a running child exits.
+ *
+ * argv/envp are deep-copied with clone_array() because callers pass
+ * arrays that presumably do not outlive the call (e.g. the local
+ * mdadm_argv), while the task may only run later from spawn_reap().
+ * On allocation failure the command is dropped with a warning.
+ */
+static void spawn_queue(struct spawn_manager *mgr, char **argv, char **envp)
+{
+	struct spawn_task *task;
+
+	task = malloc(sizeof *task);
+	if (!task)
+		goto fail;
+	*task = (struct spawn_task) {
+		.next = NULL,
+		.argv = clone_array(argv),
+		.envp = clone_array(envp),
+	};
+	/*
+	 * A task without argv would crash spawn_execute(), and one whose envp
+	 * copy failed would silently run with default_envp instead.
+	 */
+	if (!task->argv || (envp && !task->envp)) {
+		free(task->argv);
+		free(task->envp);
+		free(task);
+		goto fail;
+	}
+
+	if (mgr->last)
+		mgr->last->next = task;
+	else
+		mgr->first = task;
+	mgr->last = task;
+	return;
+
+fail:
+	warnx("out of memory, not running %s", argv[0]);
+}
+
+/*
+ * spawn_command - run a helper now if a slot is free, otherwise queue it.
+ *
+ * Concurrency is capped at the number of online CPUs.  sysconf() returns
+ * -1 on failure; fall back to a single slot so commands still run instead
+ * of being queued forever (spawn_active() would keep poll() blocked).
+ */
+static void spawn_command(struct spawn_manager *mgr, char **argv, char **envp)
+{
+	if (mgr->max_running <= 0) {
+		long ncpu = sysconf(_SC_NPROCESSORS_ONLN);
+
+		mgr->max_running = ncpu > 0 ? (int)ncpu : 1;
+	}
+	if (mgr->num_running < mgr->max_running)
+		spawn_execute(mgr, argv, envp);
+	else
+		spawn_queue(mgr, argv, envp);
+}
+
+/*
+ * spawn_reap - account for one exited child and start queued tasks.
+ *
+ * @pid: the reaped child; currently unused, since every reaped pid counts.
+ *
+ * Called by the main loop for each pid returned by waitpid(-1, ...).
+ * The counter is clamped at zero: reaping a child this manager did not
+ * start must not make it negative, or spawn_active() would stay true
+ * forever and poll() would never time out.
+ */
+static void spawn_reap(struct spawn_manager *mgr, pid_t pid)
+{
+	struct spawn_task *task;
+
+	(void)pid;
+	if (mgr->num_running > 0)
+		mgr->num_running--;
+
+	/* fill every free slot from the head of the FIFO */
+	while (mgr->first && mgr->num_running < mgr->max_running) {
+		task = mgr->first;
+		mgr->first = task->next;
+		if (!mgr->first)
+			mgr->last = NULL;
+		/* after fork() the child owns its own copy; safe to free here */
+		spawn_execute(mgr, task->argv, task->envp);
+		free(task->argv);
+		free(task->envp);
+		free(task);
+	}
+}
+
+/*
+ * spawn_active - 1 while any child is still running or a command is still
+ * queued, else 0; the main loop then polls without a timeout.
+ */
+static int spawn_active(struct spawn_manager *mgr)
+{
+	if (mgr->first != NULL)
+		return 1;
+	return mgr->num_running != 0;
+}
+
 struct uevent {
 	char *buf;
 	size_t bufsize;
@@ -99,7 +199,7 @@
 
 static void sighandler(int sig)
 {
-	switch(sig) {
+	switch (sig) {
 	case SIGHUP:
 	case SIGINT:
 	case SIGQUIT:
@@ -106,7 +206,6 @@
 	case SIGABRT:
 	case SIGTERM:
 		exit(0);
-		break;
 	default:
 		break;
 	}
@@ -119,6 +218,7 @@
 	signal(SIGQUIT, sighandler);
 	signal(SIGABRT, sighandler);
 	signal(SIGTERM, sighandler);
+	signal(SIGCHLD, sighandler);
 	signal(SIGPIPE, SIG_IGN);
 }
 
@@ -156,23 +256,6 @@
 	return fd;
 }
 
-static void run_child(char **argv, char **envp)
-{
-	pid_t pid;
-
-	if (!(pid = fork())) {
-		dbg("running %s", argv[0]);
-		if (execve(argv[0], argv, envp) < 0)
-			err(1, argv[0]);
-		exit(0);
-	}
-	if (pid < 0)
-		err(1,"fork");
-
-	waitpid(pid, NULL, 0);
-}
-
-
 static int load_kmod(const char *modalias)
 {
 	static struct kmod_ctx *ctx = NULL;
@@ -225,7 +308,7 @@
 		devnode,
 		NULL
 	};
-	run_child(mdadm_argv, default_envp);
+	spawn_command(&spawnmgr, mdadm_argv, 0);
 }
 
 static void start_lvm2(char *devnode)
@@ -235,7 +318,7 @@
 		"--activate" , "ay", "--noudevsync", "--sysinit",
 		NULL
 	};
-	run_child(lvm2_argv, default_envp);
+	spawn_command(&spawnmgr, lvm2_argv, 0);
 }
 
 static void start_cryptsetup(char *devnode, char *cryptdm)
@@ -245,7 +328,7 @@
 		devnode, cryptdm ? cryptdm : "crypdm", NULL
 	};
 	load_kmod("dm-crypt");
-	run_child(cryptsetup_argv, default_envp);
+	spawn_command(&spawnmgr, cryptsetup_argv, 0);
 }
 
 static int is_mounted(const char *devnode) {
@@ -511,7 +594,7 @@
 
 	} else if (ev->devname != NULL) {
 		if (conf->program_argv[0] != NULL) {
-			run_child(conf->program_argv, ev->envp);
+			spawn_command(&spawnmgr, conf->program_argv, ev->envp);
 			conf->fork_count++;
 		}
 
@@ -634,15 +717,16 @@
 
 int main(int argc, char *argv[])
 {
-	struct pollfd fds[2];
-	int numfds = 2;
+	struct pollfd fds[3];
+	int numfds = 3;
 	int r;
 	struct ueventconf conf;
 	int event_count = 0;
-	size_t total_bytes;
+	size_t total_bytes = 0;
 	int found = 0, trigger_running = 0;
 	char *program_argv[2] = {0,0};
 	pthread_t tid;
+	sigset_t sigchldmask;
 
 	for (r = 0; environ[r]; r++) {
 		if (envcmp(environ[r], "PATH"))
@@ -692,87 +776,106 @@
 		conf.search_device = argv[0];
 
 	initsignals();
+	sigemptyset(&sigchldmask);
+	sigaddset(&sigchldmask, SIGCHLD);
+	sigprocmask(SIG_BLOCK, &sigchldmask, NULL);
 
 	fds[0].fd = init_netlink_socket();
 	fds[0].events = POLLIN;
 
-	fds[1].fd = eventfd(0, EFD_CLOEXEC);
+	fds[1].fd = signalfd(-1, &sigchldmask, SFD_NONBLOCK|SFD_CLOEXEC);
 	fds[1].events = POLLIN;
 
-	pthread_create(&tid, NULL, trigger_thread, &fds[1].fd);
+	fds[2].fd = eventfd(0, EFD_CLOEXEC);
+	fds[2].events = POLLIN;
+	pthread_create(&tid, NULL, trigger_thread, &fds[2].fd);
 	trigger_running = 1;
 
 	while (1) {
-		size_t len;
-		struct iovec iov;
-		char cbuf[CMSG_SPACE(sizeof(struct ucred))];
-		char buf[16384];
-		struct cmsghdr *chdr;
-		struct ucred *cred;
-		struct msghdr hdr;
-		struct sockaddr_nl cnls;
-
-		r = poll(fds, numfds, trigger_running ? -1 : conf.timeout);
-		if (r == -1)
+		r = poll(fds, numfds, (spawn_active(&spawnmgr) || trigger_running) ? -1 : conf.timeout);
+		if (r == -1) {
+			if (errno == EINTR || errno == ERESTART)
+				continue;
 			err(1, "poll");
-
+		}
 		if (r == 0) {
 			dbg("exit due to timeout");
 			break;
 		}
 
-		if (numfds > 1 && fds[1].revents & POLLIN) {
-			close(fds[1].fd);
-			fds[1].fd = -1;
-			numfds--;
-			trigger_running = 0;
-			pthread_join(tid, NULL);
-		}
+		if (fds[0].revents & POLLIN) {
+			size_t len;
+			struct iovec iov;
+			char cbuf[CMSG_SPACE(sizeof(struct ucred))];
+			char buf[16384];
+			struct cmsghdr *chdr;
+			struct ucred *cred;
+			struct msghdr hdr;
+			struct sockaddr_nl cnls;
 
-		if (!(fds[0].revents & POLLIN))
-			continue;
+			iov.iov_base = &buf;
+			iov.iov_len = sizeof(buf);
+			memset(&hdr, 0, sizeof(hdr));
+			hdr.msg_iov = &iov;
+			hdr.msg_iovlen = 1;
+			hdr.msg_control = cbuf;
+			hdr.msg_controllen = sizeof(cbuf);
+			hdr.msg_name = &cnls;
+			hdr.msg_namelen = sizeof(cnls);
 
-		iov.iov_base = &buf;
-		iov.iov_len = sizeof(buf);
-		memset(&hdr, 0, sizeof(hdr));
-		hdr.msg_iov = &iov;
-		hdr.msg_iovlen = 1;
-		hdr.msg_control = cbuf;
-		hdr.msg_controllen = sizeof(cbuf);
-		hdr.msg_name = &cnls;
-		hdr.msg_namelen = sizeof(cnls);
+			len = recvmsg(fds[0].fd, &hdr, 0);
+			if (len < 0) {
+				if (errno == EINTR)
+					continue;
+				err(1, "recvmsg");
+			}
+			if (len < 32 || len >= sizeof(buf))
+				continue;
 
-		len = recvmsg(fds[0].fd, &hdr, 0);
-		if (len < 0) {
-			if (errno == EINTR)
+			total_bytes += len;
+			chdr = CMSG_FIRSTHDR(&hdr);
+			if (chdr == NULL || chdr->cmsg_type != SCM_CREDENTIALS)
 				continue;
-			err(1, "recvmsg");
-		}
-		if (len < 32 || len >= sizeof(buf))
-			continue;
 
-		total_bytes += len;
-		chdr = CMSG_FIRSTHDR(&hdr);
-		if (chdr == NULL || chdr->cmsg_type != SCM_CREDENTIALS)
-			continue;
+			/* filter out messages that are not from root or kernel */
+			cred = (struct ucred *)CMSG_DATA(chdr);
+			if (cred->uid != 0 || cnls.nl_pid > 0)
+				continue;
 
-		/* filter out messages that are not from root or kernel */
-		cred = (struct ucred *)CMSG_DATA(chdr);
-		if (cred->uid != 0 || cnls.nl_pid > 0)
-			continue;
+			event_count++;
+			found |= process_uevent(buf, len, &conf);
 
-		event_count++;
-		found |= process_uevent(buf, len, &conf);
-
-		if ((found & FOUND_DEVICE)
-		    || ((found & FOUND_BOOTREPO) && (found & FOUND_APKOVL))) {
-			dbg("setting timeout to 0");
-			conf.timeout = 0;
+			if ((found & FOUND_DEVICE)
+			    || ((found & FOUND_BOOTREPO) &&
+				(found & FOUND_APKOVL))) {
+				dbg("setting timeout to 0");
+				conf.timeout = 0;
+			}
 		}
 
 		if (fds[0].revents & POLLHUP) {
 			dbg("parent hung up\n");
 			break;
+		}
+
+		if (fds[1].revents & POLLIN) {
+			struct signalfd_siginfo fdsi;
+			pid_t pid;
+			int status;
+
+			while (read(fds[1].fd, &fdsi, sizeof fdsi) > 0)
+				;
+			while ((pid = waitpid(-1, &status, WNOHANG)) > 0)
+				spawn_reap(&spawnmgr, pid);
+		}
+
+		if (fds[2].revents & POLLIN) {
+			close(fds[2].fd);
+			fds[2].fd = -1;
+			fds[2].revents = 0;
+			numfds--;
+			trigger_running = 0;
+			pthread_join(tid, NULL);
 		}
 	}