Commit de7b3862 authored by Marek Vavrusa's avatar Marek Vavrusa
Browse files

daemon: workers are interconnected with IPC pipes

Forks are connected to the process group leader
with IPC pipes and can execute expressions on its
behalf, so running a command over all workers
is easy now:

> hostname() -- single
localhost

> map 'hostname()' -- all
localhost
localhost
localhost
parent dedf8c57
...@@ -69,6 +69,7 @@ static int l_help(lua_State *L) ...@@ -69,6 +69,7 @@ static int l_help(lua_State *L)
"resolve(name, type[, class, flags, callback])\n resolve query, callback when it's finished\n" "resolve(name, type[, class, flags, callback])\n resolve query, callback when it's finished\n"
"todname(name)\n convert name to wire format\n" "todname(name)\n convert name to wire format\n"
"tojson(val)\n convert value to JSON\n" "tojson(val)\n convert value to JSON\n"
"map(expr)\n run expression on all workers\n"
"net\n network configuration\n" "net\n network configuration\n"
"cache\n network configuration\n" "cache\n network configuration\n"
"modules\n modules configuration\n" "modules\n modules configuration\n"
...@@ -343,6 +344,56 @@ static int l_tojson(lua_State *L) ...@@ -343,6 +344,56 @@ static int l_tojson(lua_State *L)
return 1; return 1;
} }
/** @internal Write exactly `len` bytes to `fd`, retrying on short writes and EINTR. */
static bool map_write_all(int fd, const void *src, size_t len)
{
	const char *pos = src;
	while (len > 0) {
		ssize_t wb = write(fd, pos, len);
		if (wb > 0) {
			pos += wb;
			len -= (size_t)wb;
		} else if (wb == 0 || errno != EINTR) {
			return false; /* No progress or hard error */
		}
	}
	return true;
}

/** @internal Read exactly `len` bytes from `fd`, retrying on short reads and EINTR. */
static bool map_read_all(int fd, void *dst, size_t len)
{
	char *pos = dst;
	while (len > 0) {
		ssize_t rb = read(fd, pos, len);
		if (rb > 0) {
			pos += rb;
			len -= (size_t)rb;
		} else if (rb == 0 || errno != EINTR) {
			return false; /* Peer closed the pipe or hard error */
		}
	}
	return true;
}

/** @internal If expr is false, append `false` to the result table (expected at
 *  stack index -1) and skip to the next worker. Must be used directly inside
 *  the worker loop: it expands to a bare `if` containing `continue`, so it
 *  cannot be wrapped in do { } while (0). */
#define expr_checked(expr) \
	if (!(expr)) { lua_pushboolean(L, false); lua_rawseti(L, -2, lua_rawlen(L, -2) + 1); continue; }

/**
 * map(expr): evaluate an expression on the leader and on every worker reachable
 * through engine->ipc_set, and return a table with one entry per instance.
 *
 * Entry 1 is the first return value of the leader's own evaluation. Each
 * following entry is the worker's reply unpacked from JSON (or the raw reply
 * string if it is not valid JSON), or `false` if the exchange with that worker
 * failed.
 *
 * Wire format per worker: uint32_t length followed by that many bytes, in both
 * directions.
 */
static int l_map(lua_State *L)
{
	struct engine *engine = engine_luaget(L);
	const char *cmd = lua_tostring(L, 1);
	if (cmd == NULL) {
		/* lua_tostring() yields NULL for anything but strings/numbers. */
		lua_pushliteral(L, "map(expr) expects an expression string");
		return lua_error(L);
	}
	uint32_t len = strlen(cmd);
	lua_newtable(L);

	/* Execute on leader instance */
	int ntop = lua_gettop(L);
	engine_cmd(L, cmd, true);
	lua_settop(L, ntop + 1); /* Push only one return value to table */
	lua_rawseti(L, -2, 1);

	for (size_t i = 0; i < engine->ipc_set.len; ++i) {
		int fd = engine->ipc_set.at[i];
		/* Send command */
		expr_checked(map_write_all(fd, &len, sizeof(len)));
		expr_checked(map_write_all(fd, cmd, len));
		/* Read response; a missing header means the worker didn't respond */
		uint32_t rlen = 0;
		expr_checked(map_read_all(fd, &rlen, sizeof(rlen)));
		/* Widen before adding 1 so that rlen == UINT32_MAX cannot wrap to 0. */
		auto_free char *rbuf = malloc((size_t)rlen + 1);
		expr_checked(rbuf != NULL);
		expr_checked(map_read_all(fd, rbuf, rlen));
		rbuf[rlen] = '\0';
		/* Unpack from JSON, fall back to the raw string */
		JsonNode *root_node = json_decode(rbuf);
		if (root_node) {
			l_unpack_json(L, root_node);
			json_delete(root_node);
		} else {
			lua_pushlstring(L, rbuf, rlen);
		}
		lua_rawseti(L, -2, lua_rawlen(L, -2) + 1);
	}
	return 1;
}
#undef expr_checked
/** Trampoline function for module properties. */ /** Trampoline function for module properties. */
static int l_trampoline(lua_State *L) static int l_trampoline(lua_State *L)
{ {
...@@ -457,6 +508,8 @@ static int init_state(struct engine *engine) ...@@ -457,6 +508,8 @@ static int init_state(struct engine *engine)
lua_setglobal(engine->L, "libpath"); lua_setglobal(engine->L, "libpath");
lua_pushcfunction(engine->L, l_tojson); lua_pushcfunction(engine->L, l_tojson);
lua_setglobal(engine->L, "tojson"); lua_setglobal(engine->L, "tojson");
lua_pushcfunction(engine->L, l_map);
lua_setglobal(engine->L, "map");
lua_pushliteral(engine->L, MODULEDIR); lua_pushliteral(engine->L, MODULEDIR);
lua_setglobal(engine->L, "moduledir"); lua_setglobal(engine->L, "moduledir");
lua_pushliteral(engine->L, ETCDIR); lua_pushliteral(engine->L, ETCDIR);
...@@ -535,6 +588,11 @@ void engine_deinit(struct engine *engine) ...@@ -535,6 +588,11 @@ void engine_deinit(struct engine *engine)
lru_deinit(engine->resolver.cache_rtt); lru_deinit(engine->resolver.cache_rtt);
lru_deinit(engine->resolver.cache_rep); lru_deinit(engine->resolver.cache_rep);
/* Clear IPC pipes */
for (size_t i = 0; i < engine->ipc_set.len; ++i) {
close(engine->ipc_set.at[i]);
}
/* Unload modules and engine. */ /* Unload modules and engine. */
for (size_t i = 0; i < engine->modules.len; ++i) { for (size_t i = 0; i < engine->modules.len; ++i) {
engine_unload(engine, engine->modules.at[i]); engine_unload(engine, engine->modules.at[i]);
...@@ -546,6 +604,7 @@ void engine_deinit(struct engine *engine) ...@@ -546,6 +604,7 @@ void engine_deinit(struct engine *engine)
/* Free data structures */ /* Free data structures */
array_clear(engine->modules); array_clear(engine->modules);
array_clear(engine->backends); array_clear(engine->backends);
array_clear(engine->ipc_set);
kr_ta_clear(&engine->resolver.trust_anchors); kr_ta_clear(&engine->resolver.trust_anchors);
kr_ta_clear(&engine->resolver.negative_anchors); kr_ta_clear(&engine->resolver.negative_anchors);
} }
...@@ -559,18 +618,35 @@ int engine_pcall(lua_State *L, int argc) ...@@ -559,18 +618,35 @@ int engine_pcall(lua_State *L, int argc)
return lua_pcall(L, argc, LUA_MULTRET, 0); return lua_pcall(L, argc, LUA_MULTRET, 0);
} }
int engine_cmd(struct engine *engine, const char *str) int engine_cmd(lua_State *L, const char *str, bool raw)
{ {
if (engine == NULL || engine->L == NULL) { if (L == NULL) {
return kr_error(ENOEXEC); return kr_error(ENOEXEC);
} }
/* Evaluate results */ /* Evaluate results */
lua_getglobal(engine->L, "eval_cmd"); lua_getglobal(L, "eval_cmd");
lua_pushstring(engine->L, str); lua_pushstring(L, str);
lua_pushboolean(L, raw);
/* Check result. */ /* Check result. */
return engine_pcall(engine->L, 1); return engine_pcall(L, 2);
}
int engine_ipc(struct engine *engine, const char *expr)
{
if (engine == NULL || engine->L == NULL) {
return kr_error(ENOEXEC);
}
/* Run expression and serialize response. */
engine_cmd(engine->L, expr, true);
if (lua_gettop(engine->L) > 0) {
l_tojson(engine->L);
return 1;
} else {
return 0;
}
} }
/* Execute byte code */ /* Execute byte code */
......
...@@ -45,11 +45,15 @@ struct lua_State; ...@@ -45,11 +45,15 @@ struct lua_State;
#include "lib/resolve.h" #include "lib/resolve.h"
#include "daemon/network.h" #include "daemon/network.h"
/* @internal Array of file descriptors shorthand. */
typedef array_t(int) fd_array_t;
struct engine { struct engine {
struct kr_context resolver; struct kr_context resolver;
struct network net; struct network net;
module_array_t modules; module_array_t modules;
array_t(const struct kr_cdb_api *) backends; array_t(const struct kr_cdb_api *) backends;
fd_array_t ipc_set;
knot_mm_t *pool; knot_mm_t *pool;
uv_timer_t *updater; uv_timer_t *updater;
struct lua_State *L; struct lua_State *L;
...@@ -58,7 +62,8 @@ struct engine { ...@@ -58,7 +62,8 @@ struct engine {
int engine_init(struct engine *engine, knot_mm_t *pool); int engine_init(struct engine *engine, knot_mm_t *pool);
void engine_deinit(struct engine *engine); void engine_deinit(struct engine *engine);
/** @warning This function leaves 1 string result on stack. */ /** @warning This function leaves 1 string result on stack. */
int engine_cmd(struct engine *engine, const char *str); int engine_cmd(struct lua_State *L, const char *str, bool raw);
int engine_ipc(struct engine *engine, const char *expr);
int engine_start(struct engine *engine, const char *config_path); int engine_start(struct engine *engine, const char *config_path);
void engine_stop(struct engine *engine); void engine_stop(struct engine *engine);
int engine_register(struct engine *engine, const char *module, const char *precedence, const char* ref); int engine_register(struct engine *engine, const char *module, const char *precedence, const char* ref);
......
...@@ -142,6 +142,15 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, ...@@ -142,6 +142,15 @@ void udp_recv(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf,
mp_flush(worker->pkt_pool.ctx); mp_flush(worker->pkt_pool.ctx);
} }
/**
 * @internal Finish setting up an already opened/bound UDP handle: check buffer
 * sizes, attach a fresh session as the handle context and start reading.
 *
 * @return result of io_start_read(), or kr_error(ENOMEM) if no session
 *         could be allocated.
 */
static int udp_bind_finalize(uv_handle_t *handle)
{
	check_bufsize(handle);
	/* Handle is already created, just create context. */
	handle->data = session_new();
	if (!handle->data) {
		/* Report the failure; an assert() would vanish under NDEBUG. */
		return kr_error(ENOMEM);
	}
	return io_start_read(handle);
}
int udp_bind(uv_udp_t *handle, struct sockaddr *addr) int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
{ {
unsigned flags = UV_UDP_REUSEADDR; unsigned flags = UV_UDP_REUSEADDR;
...@@ -152,11 +161,20 @@ int udp_bind(uv_udp_t *handle, struct sockaddr *addr) ...@@ -152,11 +161,20 @@ int udp_bind(uv_udp_t *handle, struct sockaddr *addr)
if (ret != 0) { if (ret != 0) {
return ret; return ret;
} }
check_bufsize((uv_handle_t *)handle); return udp_bind_finalize((uv_handle_t *)handle);
/* Handle is already created, just create context. */ }
handle->data = session_new();
assert(handle->data); int udp_bindfd(uv_udp_t *handle, int fd)
return io_start_read((uv_handle_t *)handle); {
if (!handle) {
return kr_error(EINVAL);
}
int ret = uv_udp_open(handle, (uv_os_sock_t) fd);
if (ret != 0) {
return ret;
}
return udp_bind_finalize((uv_handle_t *)handle);
} }
static void tcp_timeout(uv_handle_t *timer) static void tcp_timeout(uv_handle_t *timer)
...@@ -230,15 +248,30 @@ static void tcp_accept(uv_stream_t *master, int status) ...@@ -230,15 +248,30 @@ static void tcp_accept(uv_stream_t *master, int status)
io_start_read((uv_handle_t *)client); io_start_read((uv_handle_t *)client);
} }
static int set_tcp_option(uv_tcp_t *handle, int option, int val) static int set_tcp_option(uv_handle_t *handle, int option, int val)
{ {
uv_os_fd_t fd = 0; uv_os_fd_t fd = 0;
if (uv_fileno((uv_handle_t *)handle, &fd) == 0) { if (uv_fileno(handle, &fd) == 0) {
return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val)); return setsockopt(fd, IPPROTO_TCP, option, &val, sizeof(val));
} }
return 0; /* N/A */ return 0; /* N/A */
} }
/**
 * @internal Finish setting up a listening TCP handle: request TCP_FASTOPEN
 * where the platform defines it (best effort, result ignored) and reset the
 * handle context.
 *
 * @return always 0
 */
static int tcp_bind_finalize(uv_handle_t *handle)
{
#ifdef TCP_FASTOPEN
	/* TCP_FASTOPEN enables 1 RTT connection resumptions. */
# ifdef __linux__
	const int fastopen_arg = 16; /* Accepts queue length hint */
# else
	const int fastopen_arg = 1; /* Accepts on/off */
# endif
	(void) set_tcp_option(handle, TCP_FASTOPEN, fastopen_arg);
#endif

	handle->data = NULL;
	return 0;
}
static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection) static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb connection)
{ {
unsigned flags = 0; unsigned flags = 0;
...@@ -253,7 +286,7 @@ static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb c ...@@ -253,7 +286,7 @@ static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb c
/* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */ /* TCP_DEFER_ACCEPT delays accepting connections until there is readable data. */
#ifdef TCP_DEFER_ACCEPT #ifdef TCP_DEFER_ACCEPT
if (set_tcp_option(handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) { if (set_tcp_option((uv_handle_t *)handle, TCP_DEFER_ACCEPT, KR_CONN_RTT_MAX/1000) != 0) {
kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno)); kr_log_info("[ io ] tcp_bind (defer_accept): %s\n", strerror(errno));
} }
#endif #endif
...@@ -263,17 +296,7 @@ static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb c ...@@ -263,17 +296,7 @@ static int _tcp_bind(uv_tcp_t *handle, struct sockaddr *addr, uv_connection_cb c
return ret; return ret;
} }
/* TCP_FASTOPEN enables 1 RTT connection resumptions. */ return tcp_bind_finalize((uv_handle_t *)handle);
#ifdef TCP_FASTOPEN
# ifdef __linux__
(void) set_tcp_option(handle, TCP_FASTOPEN, 16); /* Accepts queue length hint */
# else
(void) set_tcp_option(handle, TCP_FASTOPEN, 1); /* Accepts on/off */
# endif
#endif
handle->data = NULL;
return 0;
} }
int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr) int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
...@@ -281,6 +304,24 @@ int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr) ...@@ -281,6 +304,24 @@ int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr)
return _tcp_bind(handle, addr, tcp_accept); return _tcp_bind(handle, addr, tcp_accept);
} }
/**
 * Adopt an existing socket descriptor as a listening TCP handle.
 *
 * Opens `fd` on the handle, starts listening with tcp_accept as the
 * connection callback and then applies the common TCP finalization.
 *
 * @return 0 on success, kr_error(EINVAL) for a NULL handle, otherwise the
 *         error returned by uv_tcp_open() or uv_listen().
 */
int tcp_bindfd(uv_tcp_t *handle, int fd)
{
	if (handle == NULL) {
		return kr_error(EINVAL);
	}

	int err = uv_tcp_open(handle, (uv_os_sock_t) fd);
	if (err == 0) {
		err = uv_listen((uv_stream_t *)handle, 16, tcp_accept);
	}
	if (err != 0) {
		return err;
	}

	return tcp_bind_finalize((uv_handle_t *)handle);
}
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type) void io_create(uv_loop_t *loop, uv_handle_t *handle, int type)
{ {
if (type == SOCK_DGRAM) { if (type == SOCK_DGRAM) {
......
...@@ -37,7 +37,9 @@ void session_free(struct session *s); ...@@ -37,7 +37,9 @@ void session_free(struct session *s);
struct session *session_new(void); struct session *session_new(void);
int udp_bind(uv_udp_t *handle, struct sockaddr *addr); int udp_bind(uv_udp_t *handle, struct sockaddr *addr);
int udp_bindfd(uv_udp_t *handle, int fd);
int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr); int tcp_bind(uv_tcp_t *handle, struct sockaddr *addr);
int tcp_bindfd(uv_tcp_t *handle, int fd);
void io_create(uv_loop_t *loop, uv_handle_t *handle, int type); void io_create(uv_loop_t *loop, uv_handle_t *handle, int type);
void io_deinit(uv_handle_t *handle); void io_deinit(uv_handle_t *handle);
......
...@@ -171,7 +171,7 @@ else -- Lua 5.2+ ...@@ -171,7 +171,7 @@ else -- Lua 5.2+
end end
-- Interactive command evaluation -- Interactive command evaluation
function eval_cmd(line) function eval_cmd(line, raw)
-- Compatibility sandbox code loading -- Compatibility sandbox code loading
local function load_code(code) local function load_code(code)
if getfenv then -- Lua 5.1 if getfenv then -- Lua 5.1
...@@ -181,7 +181,7 @@ function eval_cmd(line) ...@@ -181,7 +181,7 @@ function eval_cmd(line)
end end
end end
local status, err, chunk local status, err, chunk
chunk, err = load_code('return table_print('..line..')') chunk, err = load_code(raw and 'return '..line or 'return table_print('..line..')')
if err then if err then
chunk, err = load_code(line) chunk, err = load_code(line)
end end
......
...@@ -35,6 +35,11 @@ ...@@ -35,6 +35,11 @@
#include "daemon/engine.h" #include "daemon/engine.h"
#include "daemon/bindings.h" #include "daemon/bindings.h"
/* We can fork early on Linux 3.9+ and do SO_REUSEPORT for better performance. */
#if defined(UV_VERSION_HEX) && defined(SO_REUSEPORT) && defined(__linux__)
#define CAN_FORK_EARLY 1
#endif
/* /*
* Globals * Globals
*/ */
...@@ -69,7 +74,7 @@ static void tty_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf) ...@@ -69,7 +74,7 @@ static void tty_read(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
} }
struct engine *engine = stream->data; struct engine *engine = stream->data;
lua_State *L = engine->L; lua_State *L = engine->L;
int ret = engine_cmd(engine, cmd); int ret = engine_cmd(L, cmd, false);
const char *message = ""; const char *message = "";
if (lua_gettop(L) > 0) { if (lua_gettop(L) > 0) {
message = lua_tostring(L, -1); message = lua_tostring(L, -1);
...@@ -126,6 +131,92 @@ static void tty_accept(uv_stream_t *master, int status) ...@@ -126,6 +131,92 @@ static void tty_accept(uv_stream_t *master, int status)
} }
} }
/* @internal Release the memory of an IPC handle.
 * The signature matches a libuv close callback; the handle is expected to be
 * heap-allocated (ipc_watch() allocates pollers with malloc). */
static void ipc_close(uv_handle_t *handle)
{
free(handle);
}
/* @internal AF_LOCAL reads may still be interrupted, loop it.
 *
 * Reads exactly `len` bytes from `fd` into `dst`, retrying on short reads,
 * EINTR and EAGAIN.
 *
 * @return true when all bytes were read (trivially true for len == 0),
 *         false on end-of-file or on any other read error.
 */
static bool ipc_readall(int fd, char *dst, size_t len)
{
	while (len > 0) {
		ssize_t rb = read(fd, dst, len);
		if (rb > 0) {
			dst += rb;
			len -= (size_t)rb;
		} else if (rb == 0) {
			/* Peer closed the pipe. read() does not set errno here, so
			 * checking it would act on a stale value and could spin forever. */
			return false;
		} else if (errno != EAGAIN && errno != EINTR) {
			return false;
		}
	}
	return true;
}
/**
 * @internal Poll callback for the IPC pipe of a forked worker.
 *
 * Reads one length-prefixed expression (uint32_t length + bytes), evaluates it
 * with engine_ipc() and writes the result back in the same framing. An empty
 * string is sent when the evaluation produced no result.
 *
 * On a poll error or a failed read the pipe is considered unusable and the
 * poller is closed through uv_close(); the handle memory is released by
 * ipc_close() once libuv is done with it.
 */
static void ipc_activity(uv_poll_t *handle, int status, int events)
{
	(void) events;
	struct engine *engine = handle->data;
	if (status != 0) {
		kr_log_error("[system] ipc: %s\n", uv_strerror(status));
		/* The handle is still registered in the loop, it must not be freed directly. */
		uv_close((uv_handle_t *)handle, ipc_close);
		return;
	}
	/* Get file descriptor from handle */
	uv_os_fd_t fd = 0;
	(void) uv_fileno((uv_handle_t *)(handle), &fd);
	/* Read expression from IPC pipe */
	uint32_t len = 0;
	if (!ipc_readall(fd, (char *)&len, sizeof(len))) {
		kr_log_error("[system] ipc: %s\n", strerror(errno));
		/* Stop watching, a dead pipe would stay readable and spin the loop. */
		uv_close((uv_handle_t *)handle, ipc_close);
		return;
	}
	/* Widen before adding 1 so that len == UINT32_MAX cannot wrap to 0. */
	auto_free char *rbuf = malloc((size_t)len + 1);
	if (!rbuf) {
		kr_log_error("[system] ipc: %s\n", strerror(errno));
		engine_stop(engine); /* Panic and stop this fork. */
		return;
	}
	if (!ipc_readall(fd, rbuf, len)) {
		kr_log_error("[system] ipc: %s\n", strerror(errno));
		uv_close((uv_handle_t *)handle, ipc_close);
		return;
	}
	rbuf[len] = '\0';
	/* Run expression */
	const char *message = "";
	if (engine_ipc(engine, rbuf) > 0) {
		/* lua_tostring() yields NULL for non-string values, keep "" then. */
		const char *result = lua_tostring(engine->L, -1);
		if (result != NULL) {
			message = result;
		}
	}
	/* Send response back */
	len = strlen(message);
	if (write(fd, &len, sizeof(len)) != (ssize_t)sizeof(len) ||
	    write(fd, message, len) != (ssize_t)len) {
		kr_log_error("[system] ipc: %s\n", strerror(errno));
	}
	/* Clear the Lua stack */
	lua_settop(engine->L, 0);
}
/**
 * @internal Start watching an IPC descriptor for incoming expressions.
 *
 * Allocates a poll handle for `fd`, stores `engine` as its context and
 * dispatches readable events to ipc_activity().
 *
 * @return true when the poller is running, false on allocation or libuv failure.
 */
static bool ipc_watch(uv_loop_t *loop, struct engine *engine, int fd)
{
	uv_poll_t *poller = malloc(sizeof(*poller));
	if (!poller) {
		return false;
	}
	int ret = uv_poll_init(loop, poller, fd);
	if (ret != 0) {
		free(poller);
		return false;
	}
	poller->data = engine;
	ret = uv_poll_start(poller, UV_READABLE, ipc_activity);
	if (ret != 0) {
		/* The handle is initialized and known to the loop by now,
		 * so it has to be released through uv_close(). */
		uv_close((uv_handle_t *)poller, ipc_close);
		return false;
	}
	/* libuv sets O_NONBLOCK whether we want it or not.
	 * O_NONBLOCK is a file status flag, so it is changed with F_SETFL
	 * (F_SETFD only manipulates descriptor flags such as FD_CLOEXEC). */
	int flags = fcntl(fd, F_GETFL);
	if (flags != -1) {
		(void) fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
	}
	return true;
}
static void signal_handler(uv_signal_t *handle, int signum) static void signal_handler(uv_signal_t *handle, int signum)
{ {
uv_stop(uv_default_loop()); uv_stop(uv_default_loop());
...@@ -147,6 +238,39 @@ static const char *set_addr(char *addr, int *port) ...@@ -147,6 +238,39 @@ static const char *set_addr(char *addr, int *port)
* Server operation. * Server operation.
*/ */
/**
 * @internal Fork `forks - 1` worker processes, each connected to the calling
 * process (the leader) by an AF_LOCAL socket pair.
 *
 * In the leader, `ipc_set` collects the leader-side end of every pair.
 * In a forked process, `ipc_set` is reset to hold only its own end.
 *
 * @return 0 in the leader, the remaining fork counter (> 0) in a forked
 *         process, or kr_error(errno) if socketpair() or fork() failed.
 */
static int fork_workers(fd_array_t *ipc_set, int forks)
{
	/* Fork subprocesses if requested */
	while (--forks > 0) {
		int sv[2] = {-1, -1};
		if (socketpair(AF_LOCAL, SOCK_STREAM, 0, sv) < 0) {
			const int err = errno; /* perror() may clobber errno */
			perror("[system] socketpair");
			return kr_error(err);
		}
		pid_t pid = fork();
		if (pid < 0) {
			const int err = errno; /* perror()/close() may clobber errno */
			perror("[system] fork");
			close(sv[0]);
			close(sv[1]);
			return kr_error(err);
		}
		/* Forked process */
		if (pid == 0) {
			/* Leader-side ends of earlier forks were inherited across fork(),
			 * close them so that only the leader holds them. */
			for (size_t i = 0; i < ipc_set->len; ++i) {
				close(ipc_set->at[i]);
			}
			array_clear(*ipc_set);
			array_push(*ipc_set, sv[0]);
			close(sv[1]);
			kr_crypto_reinit();
			return forks;
		}
		/* Parent process */
		array_push(*ipc_set, sv[1]);
		/* Keep the leader-side end out of exec'd programs. This does not
		 * affect fork(); forked workers close inherited ends themselves. */
		(void) fcntl(sv[1], F_SETFD, FD_CLOEXEC);
		close(sv[0]);
	}
	return 0;
}
static void help(int argc, char *argv[]) static void help(int argc, char *argv[])
{ {
printf("Usage: %s [parameters] [rundir]\n", argv[0]); printf("Usage: %s [parameters] [rundir]\n", argv[0]);
...@@ -164,7 +288,7 @@ static void help(int argc, char *argv[]) ...@@ -164,7 +288,7 @@ static void help(int argc, char *argv[])
" [rundir] Path to the working directory (default: .)\n"); " [rundir] Path to the working directory (default: .)\n");
} }
static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, knot_mm_t *pool, int worker_id, int worker_count) static struct worker_ctx *init_worker(struct engine *engine, knot_mm_t *pool, int worker_id, int worker_count)
{ {
/* Load bindings */ /* Load bindings */
engine_lualib(engine, "modules", lib_modules); engine_lualib(engine, "modules", lib_modules);
...@@ -181,9 +305,7 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, kn ...@@ -181,9 +305,7 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, kn
memset(worker, 0, sizeof(*worker)); memset(worker, 0, sizeof(*worker));
worker->id = worker_id; worker->id = worker_id;
worker->count = worker_count; worker->count = worker_count;
worker->engine = engine, worker->engine = engine;
worker->loop = loop;
loop->data = worker;
worker_reserve(worker, MP_FREELIST_SIZE); worker_reserve(worker, MP_FREELIST_SIZE);
/* Register worker in Lua thread */ /* Register worker in Lua thread */
lua_pushlightuserdata(engine->L, worker); lua_pushlightuserdata(engine->L, worker);
...@@ -191,13 +313,15 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, kn ...@@ -191,13 +313,15 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, kn
lua_getglobal(engine->L, "worker"); lua_getglobal(engine->L, "worker");
lua_pushnumber(engine->L, worker_id); lua_pushnumber(engine->L, worker_id);
lua_setfield(engine->L, -2, "id"); lua_setfield(engine->L, -2, "id");
lua_pushnumber(engine->L, getpid());
lua_setfield(engine->L, -2, "pid");
lua_pushnumber(engine->L, worker_count); lua_pushnumber(engine->L, worker_count);
lua_setfield(engine->L, -2, "count"); lua_setfield(engine->L, -2, "count");
lua_pop(engine->L, 1); lua_pop(engine->L, 1);
return worker;