diff --git a/daemon/io.c b/daemon/io.c index 96e37e8a6ae4a799d626cb2fa572690efd232095..a1af8709ed0900451d2df1ed9982f093a42b9872 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -13,6 +13,7 @@ #include "daemon/worker.h" #include "daemon/tls.h" #include "daemon/session.h" +#include "lib/utils.h" #define negotiate_bufsize(func, handle, bufsize_want) do { \ int bufsize = 0; (func)((handle), &bufsize); \ @@ -448,47 +449,47 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu /* Set output streams */ FILE *out = stdout; - uv_os_fd_t stream_fd = 0; + uv_os_fd_t stream_fd = -1; struct args *args = the_args; - if (uv_fileno((uv_handle_t *)stream, &stream_fd)) { + struct io_stream_data *data = (struct io_stream_data*) stream->data; + if (nread < 0 || uv_fileno((uv_handle_t *)stream, &stream_fd)) { + mp_delete(data->pool->ctx); uv_close((uv_handle_t *)stream, (uv_close_cb) free); free(commands); return; } + if (nread <= 0) { + free(commands); + return; + } if (stream_fd != STDIN_FILENO) { - if (nread < 0) { /* Close if disconnected */ - uv_close((uv_handle_t *)stream, (uv_close_cb) free); - } - if (nread <= 0) { - free(commands); - return; - } uv_os_fd_t dup_fd = dup(stream_fd); if (dup_fd >= 0) { out = fdopen(dup_fd, "w"); } } - char *cmd = NULL; + char *cmd, *cmd_next = NULL; + bool incomplete_cmd = false; + /* Execute */ if (stream && commands && nread > 0) { + if (commands[nread - 1] != '\n') { + incomplete_cmd = true; + } /* Ensure commands is 0-terminated */ - if (commands[nread - 1] == '\n') { - commands[nread - 1] = '\0'; - } else { - if (nread >= buf->len) { /* only equality should be possible */ - char *newbuf = realloc(commands, nread + 1); - if (!newbuf) - goto finish; - commands = newbuf; - } - commands[nread] = '\0'; + if (nread >= buf->len) { /* only equality should be possible */ + char *newbuf = realloc(commands, nread + 1); + if (!newbuf) + goto finish; + commands = newbuf; } + commands[nread] = '\0'; const char *delim = args->quiet ? "" : "> "; /* No command, just new line */ - if (nread == 1 && args->tty_binary_output == false && commands[nread-1] == '\0') { + if (nread == 1 && (data->mode == io_mode_text) == false && commands[nread-1] == '\0' && data->blen == 0) { if (stream_fd != STDIN_FILENO) { fprintf(out, "%s", delim); } @@ -497,12 +498,52 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu } } + char *boundary = "\n\0"; cmd = strtok(commands, "\n"); + /* strtok skip '\n' but we need process alone '\n' too */ + if (commands[0] == '\n') { + cmd_next = cmd; + cmd = boundary; + } else { + cmd_next = strtok(NULL, "\n"); + } + + char *pbuf = data->buf + data->blen; while (cmd != NULL) { + /* Last command is incomplete - save it and execute later */ + if (incomplete_cmd && cmd_next == NULL) { + pbuf = mp_append_string(data->pool->ctx, pbuf, cmd); + mp_append_char(data->pool->ctx, pbuf, '\0'); + data->buf = mp_ptr(data->pool->ctx); + data->blen = data->blen + strlen(cmd); + + cmd = cmd_next; + /* There is new incomplete command */ + if (commands[nread - 1] == '\n') + incomplete_cmd = false; + cmd_next = strtok(NULL, "\n"); + continue; + } + + /* Process incomplete command from previously call */ + if (data->blen > 0) { + if (commands[0] != '\n' && commands[0] != '\0') { + pbuf = mp_append_string(data->pool->ctx, pbuf, cmd); + mp_append_char(data->pool->ctx, pbuf, '\0'); + data->buf = mp_ptr(data->pool->ctx); + cmd = data->buf; + } else { + cmd = data->buf; + } + data->blen = 0; + pbuf = data->buf; + } + /* Pseudo-command for switching to "binary output"; */ if (strcmp(cmd, "__binary") == 0) { - stream->data = (void *)io_mode_binary; - cmd = strtok(NULL, "\n"); + data->mode = io_mode_binary; + cmd = cmd_next; + cmd_next = strtok(NULL, "\n"); continue; } @@ -514,17 +555,19 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu } /* Simpler output in binary mode */ - if (stream->data == (void *)io_mode_binary) { + if (data->mode == io_mode_binary) { size_t len_s = strlen(message); if (len_s > UINT32_MAX) { - cmd = strtok(NULL, "\n"); + cmd = cmd_next; + cmd_next = strtok(NULL, "\n"); continue; } uint32_t len_n = htonl(len_s); fwrite(&len_n, sizeof(len_n), 1, out); fwrite(message, len_s, 1, out); lua_settop(L, 0); - cmd = strtok(NULL, "\n"); + cmd = cmd_next; + cmd_next = strtok(NULL, "\n"); continue; } /* Log to remote socket if connected */ @@ -547,7 +590,8 @@ void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *bu fprintf(fp_out, "%s", delim); } lua_settop(L, 0); - cmd = strtok(NULL, "\n"); + cmd = cmd_next; + cmd_next = strtok(NULL, "\n"); } } finish: @@ -564,17 +608,41 @@ void io_tty_alloc(uv_handle_t *handle, size_t suggested, uv_buf_t *buf) buf->base = malloc(suggested); } +struct io_stream_data *io_tty_alloc_data() { + knot_mm_t _pool = { + .ctx = mp_new(4096), + .alloc = (knot_mm_alloc_t) mp_alloc, + }; + knot_mm_t *pool = mm_alloc(&_pool, sizeof(*pool)); + if (!pool) { + return NULL; + } + memcpy(pool, &_pool, sizeof(*pool)); + + struct io_stream_data *data = mm_alloc(pool, sizeof(struct io_stream_data)); + + data->buf = mp_start(pool->ctx, 512); + data->mode = io_mode_text; + data->blen = 0; + data->pool = pool; + + return data; +} + void io_tty_accept(uv_stream_t *master, int status) { + struct io_stream_data *data = io_tty_alloc_data(); + /* We can't use any allocations after mp_start() and it's easier anyway. */ uv_tcp_t *client = malloc(sizeof(*client)); + client->data = data; + struct args *args = the_args; - if (client) { + if (client && client->data) { uv_tcp_init(master->loop, client); if (uv_accept(master, (uv_stream_t *)client) != 0) { - free(client); + mp_delete(data->pool->ctx); return; } - client->data = (void *) io_mode_text; uv_read_start((uv_stream_t *)client, io_tty_alloc, io_tty_process_input); /* Write command line */ if (!args->quiet) { diff --git a/daemon/io.h b/daemon/io.h index 10cde83d7aad6c849e0c32ba550315b177bfb75b..2dac70f2dadc9b576b2b2ffe59593093d7070e10 100644 --- a/daemon/io.h +++ b/daemon/io.h @@ -20,6 +20,13 @@ enum io_stream_mode { io_mode_binary = 1, }; +struct io_stream_data { + enum io_stream_mode mode; + size_t blen; + char *buf; + knot_mm_t *pool; +}; + /** Bind address into a file-descriptor (only, no libuv). type is e.g. SOCK_DGRAM */ int io_bind(const struct sockaddr *addr, int type, const endpoint_flags_t *flags); /** Initialize a UDP handle and start listening. */ @@ -33,6 +40,7 @@ int io_listen_pipe(uv_loop_t *loop, uv_pipe_t *handle, int fd); void io_tty_process_input(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf); void io_tty_alloc(uv_handle_t *handle, size_t suggested, uv_buf_t *buf); void io_tty_accept(uv_stream_t *master, int status); +struct io_stream_data *io_tty_alloc_data(void); void tcp_timeout_trigger(uv_timer_t *timer); diff --git a/daemon/main.c b/daemon/main.c index 7fbae458acf48aff83f7f9d838fbc194ea5e71e2..535691d181a05ad6de2e4783f40e2aad8c367e5a 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -254,16 +254,16 @@ static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_se } /* Control sockets or TTY */ - uv_pipe_t pipe; - uv_pipe_init(loop, &pipe, 0); - pipe.data = args; + uv_pipe_t *pipe = malloc(sizeof(*pipe)); + uv_pipe_init(loop, pipe, 0); if (args->interactive) { if (!args->quiet) printf("[system] interactive mode\n> "); - uv_pipe_open(&pipe, 0); - uv_read_start((uv_stream_t*) &pipe, io_tty_alloc, io_tty_process_input); - } else if (args->control_fd != -1 && uv_pipe_open(&pipe, args->control_fd) == 0) { - uv_listen((uv_stream_t *) &pipe, 16, io_tty_accept); + pipe->data = io_tty_alloc_data(); + uv_pipe_open(pipe, 0); + uv_read_start((uv_stream_t*)pipe, io_tty_alloc, io_tty_process_input); + } else if (args->control_fd != -1 && uv_pipe_open(pipe, args->control_fd) == 0) { + uv_listen((uv_stream_t *)pipe, 16, io_tty_accept); } /* Watch IPC pipes (or just assign them if leading the pgroup). */ if (!leader) { @@ -282,7 +282,12 @@ static int run_worker(uv_loop_t *loop, struct engine *engine, fd_array_t *ipc_se #endif /* Run event loop */ uv_run(loop, UV_RUN_DEFAULT); - uv_close((uv_handle_t *)&pipe, NULL); /* Seems OK even on the stopped loop. */ + /* Free pipe's data. Seems OK even on the stopped loop. + * In interactive case it may have been done in callbacks already (single leak). */ + if (!args->interactive) { + uv_close((uv_handle_t *)pipe, NULL); + free(pipe); + } return EXIT_SUCCESS; }