From 4d6bf767a273f985f24e86adaf04680e6ce2cdf2 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Luk=C3=A1=C5=A1=20Je=C5=BEek?= <lukas.jezek@nic.cz>
Date: Mon, 27 Apr 2020 14:00:17 +0200
Subject: [PATCH] map: use control sockets instead of pipe from parent process

This change allows map() to work with systemd integration.

As a bonus, the new client implementation is based on Lua cqueues, which
allows the caller to wrap map() in worker.coroutine() and get
asynchronous execution, i.e. avoid blocking the main loop.

Currently socket communication does not employ timeouts, so a hung
instance will cause the map() call to hang as well. This does not affect
query processing _if_ map() is being run in worker.coroutine().

Fixes: #554
Fixes: #620
---
 .luacheckrc                        |   1 +
 NEWS                               |   1 +
 daemon/engine.c                    |  79 +-----------------
 daemon/engine.h                    |   7 --
 daemon/lua/distro-preconfig.lua.in |   3 +-
 daemon/lua/kluautil.lua            |  40 ++++++++-
 daemon/lua/postconfig.lua          |   5 +-
 daemon/lua/sandbox.lua.in          |  75 +++++++++++++++++
 daemon/main.c                      | 126 +----------------------------
 9 files changed, 127 insertions(+), 210 deletions(-)

diff --git a/.luacheckrc b/.luacheckrc
index 7ff622489..0cf0b884a 100644
--- a/.luacheckrc
+++ b/.luacheckrc
@@ -2,6 +2,7 @@
 std = 'luajit'
 new_read_globals = {
 	'cache',
+	'eval_cmd',
 	'event',
 	'help',
 	'_hint_root_file',
diff --git a/NEWS b/NEWS
index b5dc884e2..871a4b40b 100644
--- a/NEWS
+++ b/NEWS
@@ -19,6 +19,7 @@ Bugfixes
 --------
 - avoid an assert() error in stash_rrset() (!1072)
 - fix emergency cache locking bug introduced in 5.1.3 (!1078)
+- migrate map() command to control sockets; fix systemd integration (!1000)
 
 Incompatible changes
 --------------------
diff --git a/daemon/engine.c b/daemon/engine.c
index 0023696d5..59a6cf189 100644
--- a/daemon/engine.c
+++ b/daemon/engine.c
@@ -12,6 +12,7 @@
 #include <pwd.h>
 #include <sys/param.h>
 #include <libzscanner/scanner.h>
+#include <sys/un.h>
 
 #include <lua.h>
 #include <lualib.h>
@@ -371,62 +372,6 @@ static int l_fromjson(lua_State *L)
 	return 1;
 }
 
-/** @internal Throw Lua error if expr is false */
-#define expr_checked(expr) \
-	if (!(expr)) { lua_pushboolean(L, false); lua_rawseti(L, -2, lua_objlen(L, -2) + 1); continue; }
-
-static int l_map(lua_State *L)
-{
-	/* We don't kr_log_deprecate() here for now.  Plan: after --forks gets *removed*,
-	 * kill internal uses of map() (e.g. from daf module) and add deprecation here.
-	 * Alternatively we might (attempt to) implement map() in another way. */
-	if (lua_gettop(L) != 1 || !lua_isstring(L, 1))
-		lua_error_p(L, "map('string with a lua expression')");
-
-	const char *cmd = lua_tostring(L, 1);
-	uint32_t len = strlen(cmd);
-	lua_newtable(L);
-
-	/* Execute on leader instance */
-	int ntop = lua_gettop(L);
-	engine_cmd(L, cmd, true);
-	lua_settop(L, ntop + 1); /* Push only one return value to table */
-	lua_rawseti(L, -2, 1);
-
-	for (size_t i = 0; i < the_worker->engine->ipc_set.len; ++i) {
-		int fd = the_worker->engine->ipc_set.at[i];
-		/* Send command */
-		expr_checked(write(fd, &len, sizeof(len)) == sizeof(len));
-		expr_checked(write(fd, cmd, len) == len);
-		/* Read response */
-		uint32_t rlen = 0;
-		if (read(fd, &rlen, sizeof(rlen)) == sizeof(rlen)) {
-			expr_checked(rlen < UINT32_MAX);
-			auto_free char *rbuf = malloc(rlen + 1);
-			expr_checked(rbuf != NULL);
-			expr_checked(read(fd, rbuf, rlen) == rlen);
-			rbuf[rlen] = '\0';
-			/* Unpack from JSON */
-			JsonNode *root_node = json_decode(rbuf);
-			if (root_node) {
-				l_unpack_json(L, root_node);
-			} else {
-				lua_pushlstring(L, rbuf, rlen);
-			}
-			json_delete(root_node);
-			lua_rawseti(L, -2, lua_objlen(L, -2) + 1);
-			continue;
-		}
-		/* Didn't respond */
-		lua_pushboolean(L, false);
-		lua_rawseti(L, -2, lua_objlen(L, -2) + 1);
-	}
-	return 1;
-}
-
-#undef expr_checked
-
-
 /*
  * Engine API.
  */
@@ -497,8 +442,6 @@ static int init_state(struct engine *engine)
 	lua_setglobal(engine->L, "tojson");
 	lua_pushcfunction(engine->L, l_fromjson);
 	lua_setglobal(engine->L, "fromjson");
-	lua_pushcfunction(engine->L, l_map);
-	lua_setglobal(engine->L, "map");
 	/* Random number generator */
 	lua_getfield(engine->L, LUA_GLOBALSINDEX, "math");
 	lua_getfield(engine->L, -1, "randomseed");
@@ -628,9 +571,6 @@ void engine_deinit(struct engine *engine)
 	 * e.g. the endpoint kind registry to work (inside ->net),
 	 * and this registry deinitization uses the lua state. */
 	network_close_force(&engine->net);
-	for (size_t i = 0; i < engine->ipc_set.len; ++i) {
-		close(engine->ipc_set.at[i]);
-	}
 	for (size_t i = 0; i < engine->modules.len; ++i) {
 		engine_unload(engine, engine->modules.at[i]);
 	}
@@ -649,7 +589,6 @@ void engine_deinit(struct engine *engine)
 	/* Free data structures */
 	array_clear(engine->modules);
 	array_clear(engine->backends);
-	array_clear(engine->ipc_set);
 	kr_ta_clear(&engine->resolver.trust_anchors);
 	kr_ta_clear(&engine->resolver.negative_anchors);
 	free(engine->hostname);
@@ -675,22 +614,6 @@ int engine_cmd(lua_State *L, const char *str, bool raw)
 	return engine_pcall(L, 2);
 }
 
-int engine_ipc(struct engine *engine, const char *expr)
-{
-	if (engine == NULL || engine->L == NULL) {
-		return kr_error(ENOEXEC);
-	}
-
-	/* Run expression and serialize response. */
-	engine_cmd(engine->L, expr, true);
-	if (lua_gettop(engine->L) > 0) {
-		l_tojson(engine->L);
-		return 1;
-	} else {
-		return 0;
-	}
-}
-
 int engine_load_sandbox(struct engine *engine)
 {
 	/* Init environment */
diff --git a/daemon/engine.h b/daemon/engine.h
index 3de30320b..d724f9255 100644
--- a/daemon/engine.h
+++ b/daemon/engine.h
@@ -13,15 +13,11 @@ struct lua_State;
 #include "lib/resolve.h"
 #include "daemon/network.h"
 
-/* @internal Array of file descriptors shorthand. */
-typedef array_t(int) fd_array_t;
-
 struct engine {
     struct kr_context resolver;
     struct network net;
     module_array_t modules;
     array_t(const struct kr_cdb_api *) backends;
-    fd_array_t ipc_set;
     knot_mm_t *pool;
     char *hostname;
     struct lua_State *L;
@@ -40,9 +36,6 @@ int engine_cmd(struct lua_State *L, const char *str, bool raw);
 /** Execute current chunk in the sandbox */
 int engine_pcall(struct lua_State *L, int argc);
 
-int engine_ipc(struct engine *engine, const char *expr);
-
-
 int engine_load_sandbox(struct engine *engine);
 int engine_loadconf(struct engine *engine, const char *config_path);
 
diff --git a/daemon/lua/distro-preconfig.lua.in b/daemon/lua/distro-preconfig.lua.in
index e017b3556..a27974a2b 100644
--- a/daemon/lua/distro-preconfig.lua.in
+++ b/daemon/lua/distro-preconfig.lua.in
@@ -4,7 +4,8 @@ if not id then
 	warn('environment variable $SYSTEMD_INSTANCE not set')
 else
 	-- Bind to control socket in run_dir
-	local path = '@run_dir@/control/'..id
+	worker.control_path = '@run_dir@/control/'
+	local path = worker.control_path..id
 	local ok, err = pcall(net.listen, path, nil, { kind = 'control' })
 	if not ok then
 		warn('bind to '..path..' failed '..err)
diff --git a/daemon/lua/kluautil.lua b/daemon/lua/kluautil.lua
index 0d63c07fc..0b6a114fb 100644
--- a/daemon/lua/kluautil.lua
+++ b/daemon/lua/kluautil.lua
@@ -1,6 +1,7 @@
 -- SPDX-License-Identifier: GPL-3.0-or-later
 
 local cqerrno = require('cqueues.errno')
+local ffi = require('ffi')
 local kluautil = {}
 
 -- Get length of table
@@ -17,6 +18,21 @@ function kluautil.kr_table_len(t)
 end
 
 -- Fetch over HTTPS
+ffi.cdef([[
+	typedef struct __dirstream DIR;
+	struct dirent {
+		unsigned long int	d_ino;
+		long int		d_off;
+		unsigned short		d_reclen;
+		unsigned char		d_type;
+		char			d_name[256];
+	};
+	DIR *opendir(const char *name);
+	struct dirent *readdir(DIR *dirp);
+	int closedir(DIR *dirp);
+	char *strerror(int errnum);
+]])
+
 function kluautil.kr_https_fetch(url, out_file, ca_file)
 	local http_ok, http_request = pcall(require, 'http.request')
 	local httptls_ok, http_tls = pcall(require, 'http.tls')
@@ -62,9 +78,31 @@ function kluautil.kr_https_fetch(url, out_file, ca_file)
 		return nil, errmsg
 	end
 
-	out_file:seek("set", 0)
+	out_file:seek('set', 0)
 
 	return true
 end
 
+-- Return names of all entries in directory `path`, skipping '.' and '..'.
+-- An empty table is returned when the directory cannot be opened.
+function kluautil.list_dir (path)
+	local names = {}
+	local handle = ffi.C.opendir(path)
+	if handle == nil then
+		return names
+	end
+
+	while true do
+		local ent = ffi.C.readdir(handle)
+		if ent == nil then break end
+		local name = ffi.string(ent.d_name)
+		if name ~= '.' and name ~= '..' then
+			names[#names + 1] = name
+		end
+	end
+
+	ffi.C.closedir(handle)
+	return names
+end
+
 return kluautil
diff --git a/daemon/lua/postconfig.lua b/daemon/lua/postconfig.lua
index 7fff3d1e5..48ac65aa4 100644
--- a/daemon/lua/postconfig.lua
+++ b/daemon/lua/postconfig.lua
@@ -21,9 +21,12 @@ end
 
 local n_dns_socks, n_control_socks = count_sockets()
 
+-- Check and set control sockets path
+worker.control_path = worker.control_path or (worker.cwd .. '/control/')
+
 -- Bind to control socket by default
 if not C.the_args.interactive and n_control_socks == 0 and not env.KRESD_NO_LISTEN then
-	local path = worker.cwd..'/control/'..worker.pid
+	local path = worker.control_path..worker.pid
 	local ok, err = pcall(net.listen, path, nil, { kind = 'control' })
 	if not ok then
 		warn('bind to '..path..' failed '..err)
diff --git a/daemon/lua/sandbox.lua.in b/daemon/lua/sandbox.lua.in
index b20068194..a02584d22 100644
--- a/daemon/lua/sandbox.lua.in
+++ b/daemon/lua/sandbox.lua.in
@@ -642,3 +642,78 @@ else
 	worker.coroutine = disabled
 	worker.bg_worker = setmetatable({}, { __index = disabled })
 end
+
+-- Global commands for map()
+
+function map(cmd, format)
+	-- Run cmd on every instance that has a socket in worker.control_path.
+	-- format: 'luaobj' (default, replies decoded from JSON) or 'strings' (raw).
+	-- Returns an array of results; instances that fail to connect are skipped.
+	-- No timeouts are used, so a hung instance blocks the call.
+	local socket = require('cqueues.socket')
+	local kluautil = require('kluautil')
+	local local_sockets = {}
+	local results = {}
+
+	format = format or 'luaobj'
+	if format ~= 'luaobj' and format ~= 'strings' then
+		warn('warning: Unknown map format. Used "luaobj".')
+		format = 'luaobj'
+	end
+	-- Wrap only once; cmd itself must stay intact for local execution.
+	local remote_cmd = (format == 'luaobj') and ('tojson('..cmd..')') or cmd
+
+	-- Collect file names of control sockets owned by this instance.
+	for _,v in pairs(net.list()) do
+		if (v['kind'] == 'control') and (v['transport']['family'] == 'unix') then
+			table.insert(local_sockets, string.match(v['transport']['path'], '^.*/([^/]+)$'))
+		end
+	end
+
+	local filetab = kluautil.list_dir(worker.control_path)
+	if next(filetab) == nil then
+		-- No sockets found: execute on this instance only.
+		local ret = eval_cmd(cmd, true)
+		if ret ~= nil then
+			table.insert(results, ret)
+		end
+		return results
+	end
+
+	for _,file in ipairs(filetab) do
+		local local_exec = false
+		for _,lsoc in ipairs(local_sockets) do
+			if file == lsoc then
+				local_exec = true
+				break
+			end
+		end
+
+		if local_exec then
+			-- Parentheses keep exactly one return value for table.insert().
+			table.insert(results, (eval_cmd(cmd, true)))
+		else
+			local s = socket.connect({ path = worker.control_path..file })
+			s:setmode('bn', 'bn')
+			local status, err = pcall(s.connect, s)
+			if not status then
+				print(err)
+			else
+				s:write('__binary\n')
+				s:read(2) -- discard 2 bytes sent after the mode switch (presumably a prompt)
+				s:write(remote_cmd..'\n')
+				-- Reply: 4-byte big-endian length followed by the payload.
+				local b1, b2, b3, b4 = s:read(4):byte(1, 4)
+				local recv = s:read(((b1 * 256 + b2) * 256 + b3) * 256 + b4)
+				if format == 'strings' then
+					table.insert(results, recv)
+				else
+					table.insert(results, fromjson(recv))
+				end
+			end
+			s:close() -- also on connect failure, so the socket is not leaked
+		end
+	end
+
+	return results
+end
diff --git a/daemon/main.c b/daemon/main.c
index f464effbd..cd83a976d 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -42,97 +42,6 @@
 
 struct args the_args_value;  /** Static allocation for the_args singleton. */
 
-
-/* @internal AF_LOCAL reads may still be interrupted, loop it. */
-static bool ipc_readall(int fd, char *dst, size_t len)
-{
-	while (len > 0) {
-		int rb = read(fd, dst, len);
-		if (rb > 0) {
-			dst += rb;
-			len -= rb;
-		} else if (errno != EAGAIN && errno != EINTR) {
-			return false;
-		}
-	}
-	return true;
-}
-
-static void ipc_activity(uv_poll_t *handle, int status, int events)
-{
-	struct engine *engine = handle->data;
-	if (status != 0) {
-		kr_log_error("[system] ipc: %s\n", uv_strerror(status));
-		return;
-	}
-	/* Get file descriptor from handle */
-	uv_os_fd_t fd = 0;
-	(void) uv_fileno((uv_handle_t *)(handle), &fd);
-	/* Read expression from IPC pipe */
-	uint32_t len = 0;
-	auto_free char *rbuf = NULL;
-	if (!ipc_readall(fd, (char *)&len, sizeof(len))) {
-		goto failure;
-	}
-	if (len < UINT32_MAX) {
-		rbuf = malloc(len + 1);
-	} else {
-		errno = EINVAL;
-	}
-	if (!rbuf) {
-		goto failure;
-	}
-	if (!ipc_readall(fd, rbuf, len)) {
-		goto failure;
-	}
-	rbuf[len] = '\0';
-	/* Run expression */
-	const char *message = "";
-	int ret = engine_ipc(engine, rbuf);
-	if (ret > 0) {
-		message = lua_tostring(engine->L, -1);
-	}
-	/* Clear the Lua stack */
-	lua_settop(engine->L, 0);
-	/* Send response back */
-	len = strlen(message);
-	if (write(fd, &len, sizeof(len)) != sizeof(len) ||
-		write(fd, message, len) != len) {
-		goto failure;
-	}
-	return; /* success! */
-failure:
-	/* Note that if the piped command got read or written partially,
-	 * we would get out of sync and only receive rubbish now.
-	 * Therefore we prefer to stop IPC, but we try to continue with all else.
-	 */
-	kr_log_error("[system] stopping ipc because of: %s\n", strerror(errno));
-	uv_poll_stop(handle);
-	uv_close((uv_handle_t *)handle, (uv_close_cb)free);
-}
-
-static bool ipc_watch(uv_loop_t *loop, struct engine *engine, int fd)
-{
-	uv_poll_t *poller = malloc(sizeof(*poller));
-	if (!poller) {
-		return false;
-	}
-	int ret = uv_poll_init(loop, poller, fd);
-	if (ret != 0) {
-		free(poller);
-		return false;
-	}
-	poller->data = engine;
-	ret = uv_poll_start(poller, UV_READABLE, ipc_activity);
-	if (ret != 0) {
-		free(poller);
-		return false;
-	}
-	/* libuv sets O_NONBLOCK whether we want it or not */
-	(void) fcntl(fd, F_SETFD, fcntl(fd, F_GETFL) & ~O_NONBLOCK);
-	return true;
-}
-
 static void signal_handler(uv_signal_t *handle, int signum)
 {
 	uv_stop(uv_default_loop());
@@ -180,15 +89,10 @@ static void sigbus_handler(int sig, siginfo_t *siginfo, void *ptr)
  * Server operation.
  */
 
-static int fork_workers(fd_array_t *ipc_set, int forks)
+static int fork_workers(int forks)
 {
 	/* Fork subprocesses if requested */
 	while (--forks > 0) {
-		int sv[2] = {-1, -1};
-		if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) < 0) {
-			perror("[system] socketpair");
-			return kr_error(errno);
-		}
 		int pid = fork();
 		if (pid < 0) {
 			perror("[system] fork");
@@ -197,16 +101,7 @@ static int fork_workers(fd_array_t *ipc_set, int forks)
 
 		/* Forked process */
 		if (pid == 0) {
-			array_clear(*ipc_set);
-			array_push(*ipc_set, sv[0]);
-			close(sv[1]);
 			return forks;
-		/* Parent process */
-		} else {
-			array_push(*ipc_set, sv[1]);
-			/* Do not share parent-end with other forks. */
-			(void) fcntl(sv[1], F_SETFD, FD_CLOEXEC);
-			close(sv[0]);
 		}
 	}
 	return 0;
@@ -234,7 +129,7 @@ static void help(int argc, char *argv[])
 }
 
 /** \return exit code for main()  */
-static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_set, bool leader, struct args *args)
+static int run_worker(uv_loop_t *loop, struct engine *engine, bool leader, struct args *args)
 {
 	/* Only some kinds of stdin work with uv_pipe_t.
 	 * Otherwise we would abort() from libuv e.g. with </dev/null */
@@ -265,16 +160,6 @@ static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_se
 	} else if (args->control_fd != -1 && uv_pipe_open(pipe, args->control_fd) == 0) {
 		uv_listen((uv_stream_t *)pipe, 16, io_tty_accept);
 	}
-	/* Watch IPC pipes (or just assign them if leading the pgroup). */
-	if (!leader) {
-		for (size_t i = 0; i < ipc_set->len; ++i) {
-			if (!ipc_watch(loop, engine, ipc_set->at[i])) {
-				kr_log_error("[system] failed to create poller: %s\n", strerror(errno));
-				close(ipc_set->at[i]);
-			}
-		}
-	}
-	memcpy(&engine->ipc_set, ipc_set, sizeof(*ipc_set));
 
 	/* Notify supervisor. */
 #if ENABLE_LIBSYSTEMD
@@ -591,11 +476,8 @@ int main(int argc, char **argv)
 				(long)rlim.rlim_cur);
 	}
 
-	/* Connect forks with local socket */
-	fd_array_t ipc_set;
-	array_init(ipc_set);
 	/* Fork subprocesses if requested */
-	int fork_id = fork_workers(&ipc_set, the_args->forks);
+	int fork_id = fork_workers(the_args->forks);
 	if (fork_id < 0) {
 		return EXIT_FAILURE;
 	}
@@ -696,7 +578,7 @@ int main(int argc, char **argv)
 	}
 
 	/* Run the event loop */
-	ret = run_worker(loop, &engine, &ipc_set, fork_id == 0, the_args);
+	ret = run_worker(loop, &engine, fork_id == 0, the_args);
 
 cleanup:/* Cleanup. */
 	engine_deinit(&engine);
-- 
GitLab