-
Michal 'vorner' Vaner authored
If ipset terminates with an error, wait for EOF before closing the output. Closing it right away may cause error messages to be lost.
Michal 'vorner' Vaner authored: If ipset terminates with an error, wait for EOF before closing the output. Closing it right away may cause error messages to be lost.
queue.c 9.07 KiB
/*
Ucollect - small utility for real-time analysis of network data
Copyright (C) 2015 CZ.NIC, z.s.p.o. (http://www.nic.cz/)
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "queue.h"
#include "../../core/context.h"
#include "../../core/mem_pool.h"
#include "../../core/util.h"
#include "../../core/loop.h"
#include <stdbool.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
/*
 * Delay after the first command of a batch is enqueued before the batch is
 * flushed (the ipset input gets closed). Passed to loop_timeout_add();
 * presumably milliseconds — TODO confirm against loop.h.
 */
#define QUEUE_FLUSH_TIME 5000
/*
 * Delay after an ipset failure before the ipsets are re-filled through the
 * reload callback. Same unit as QUEUE_FLUSH_TIME.
 */
#define QUEUE_RETRY_TIME 60000
/*
 * Batching queue feeding commands into a single `ipset -exist restore`
 * subprocess at a time.
 */
struct queue {
	// active: an ipset child is running; ipset_pipe and pid are valid.
	// timeout_started: a flush timeout (timeout_id) is pending.
	bool active, timeout_started;
	// The last ipset run failed; commands are dropped until the retry timeout fires.
	bool broken;
	/*
	 * ID of the pending retry timeout, as returned by loop_timeout_add()
	 * (0 when none). Must be able to hold the full ID; a bool would
	 * truncate it to 0/1.
	 */
	size_t broken_timeout_id;
	// Parent's end of the socketpair connected to ipset (0 when none).
	int ipset_pipe;
	// PID of the running ipset child (0 when none).
	pid_t pid;
	// ID of the pending flush timeout; valid only while timeout_started is set.
	size_t timeout_id;
	// Called to re-fill the ipsets after a failure.
	reload_callback_t reload_callback;
};
/*
 * Allocate a fresh, idle queue from the context's permanent pool.
 * Every field except the reload callback starts zeroed: no ipset child,
 * no pending timeouts, not broken.
 */
struct queue *queue_alloc(struct context *context, reload_callback_t reload_callback) {
	const struct queue blank = { .reload_callback = reload_callback };
	struct queue *fresh = mem_pool_alloc(context->permanent_pool, sizeof(struct queue));
	*fresh = blank;
	return fresh;
}
/*
 * Spawn `/usr/sbin/ipset -exist restore`. Both its input and its output are
 * connected to a fresh AF_UNIX socketpair.
 *
 * Parent: keeps conn[1] and registers it with the loop (userdata = queue;
 * presumably dispatched to queue_fd_data()). It stores conn[1] and the
 * child's pid in the queue and marks the queue active.
 * Child: conn[0] becomes stdin, stdout and stderr, then ipset is exec'd.
 * Every failure is fatal (sanity() in the parent, die() in the child).
 */
static void start(struct context *context, struct queue *queue) {
	ulog(LLOG_DEBUG, "Starting ipset subcommand\n");
	sanity(!queue->active, "Trying to start already active queue\n");
	int conn[2];
	sanity(socketpair(AF_UNIX, SOCK_STREAM, 0, conn) != -1, "Couldn't create FWUp socketpair: %s\n", strerror(errno));
	struct loop *loop = context->loop;
	/*
	 * Register the local end. This one will be in the parent process,
	 * therefore it needs to be watched and killed in case the plugin
	 * dies. It will also be auto-closed in the child by loop_fork(),
	 * saving us the bother to close it manually there.
	 */
	loop_plugin_register_fd(context, conn[1], queue);
	pid_t pid = loop_fork(loop);
	if (pid) {
		// The parent doesn't need the remote end, no matter if the fork worked or not.
		sanity(close(conn[0]) != -1, "Couldn't close the read end of FWUp pipe: %s\n", strerror(errno));
	}
	sanity(pid != -1, "Couldn't fork the ipset command: %s\n", strerror(errno));
	if (pid) {
		// The parent. Update the queue and be done with it.
		queue->active = true;
		queue->ipset_pipe = conn[1];
		queue->pid = pid;
		return;
	}
	// The child. Attach the socket to our stdin, stdout and stderr and exec the ipset command.
	if (dup2(conn[0], STDIN_FILENO) == -1)
		die("Couldn't attach the socketpair to ipset input: %s\n", strerror(errno));
	if (dup2(conn[0], STDOUT_FILENO) == -1)
		die("Couldn't attach the socketpair to ipset stdout: %s\n", strerror(errno));
	if (dup2(conn[0], STDERR_FILENO) == -1)
		die("Couldn't attach the socketpair to ipset stderr: %s\n", strerror(errno));
	/*
	 * Get rid of the original, but only if it is not itself one of the
	 * standard descriptors. socketpair() may return 0-2 when those were
	 * closed; dup2() onto the same fd is then a no-op, and closing it would
	 * disconnect ipset from that stream.
	 */
	if (conn[0] > STDERR_FILENO)
		close(conn[0]);
	execl("/usr/sbin/ipset", "ipset", "-exist", "restore", (char *)NULL);
	// Still here? The above must have failed :-(
	die("Couldn't exec ipset: %s\n", strerror(errno));
}
/*
 * Loop timeout callback armed by lost() after an ipset failure.
 * Leaves the broken state and asks the owner, via the reload callback, to
 * re-fill the ipsets from scratch.
 */
static void retry_timeout(struct context *context, void *data, size_t id __attribute__((unused))) {
	struct queue *const q = data;
	sanity(!q->timeout_started, "Timeout started and retry timeout fired\n");
	ulog(LLOG_WARN, "Trying to re-fill IPsets now\n");
	// This timeout has fired, so there is nothing left to track.
	q->broken_timeout_id = 0;
	q->broken = false;
	sanity(q->reload_callback, "The reload callback is NULL\n");
	q->reload_callback(context);
}
/*
 * Tear down the current ipset command and/or mark the queue broken.
 *
 * error == false: normal end of a batch. If a command is active, its pipe is
 * unregistered and closed, and the queue becomes inactive.
 * error == true: the command failed. The queue becomes inactive, but the pipe
 * is left open (and registered) so remaining error messages can still be
 * read; queue_fd_data() closes it on EOF. The queue is marked broken and a
 * retry timeout (QUEUE_RETRY_TIME) is armed.
 * In both cases a pending flush timeout is cancelled. Does nothing if the
 * queue is already broken.
 */
static void lost(struct context *context, struct queue *queue, bool error) {
if (queue->broken)
// Already lost, don't do it again.
return;
/*
* In case we got EOF before and erroneous termination of the command later,
* we need not to deactivate, close the pipe and such. But we still
* want to mark it as broken, start the retry timeout and re-synchronize.
*
* If the termination comes sooner than EOF (which is likely, but probably not
* guaranteed), then we mark it broken & inactive in one go and will not
* enter the routine once again.
*/
if (queue->active) {
// Deactivate
if (error) {
ulog(LLOG_WARN, "The ipset command %d died, data may be out of sync\n", queue->pid);
// Don't close the pipe yet. It may contain error messages we would like to pass to the log
} else {
ulog(LLOG_DEBUG, "Closing ipset subcommand\n");
loop_plugin_unregister_fd(context, queue->ipset_pipe);
sanity(close(queue->ipset_pipe) == 0, "Error closing the ipset pipe: %s\n", strerror(errno));
}
// Forget the descriptor even when it was left open; in that case the fd
// stays registered with the loop and is closed by queue_fd_data() on EOF.
queue->ipset_pipe = 0;
queue->active = false;
queue->pid = 0;
// The batch is over; a pending flush would refer to a command that is gone.
if (queue->timeout_started) {
queue->timeout_started = false;
loop_timeout_cancel(context->loop, queue->timeout_id);
}
} else if (error)
/*
 * NOTE(review): none of the callers in this file appear to call
 * lost(..., true) on an inactive queue. queue_child_died() returns early
 * when !queue->active, and pid has been reset to 0 above. So this branch
 * looks unreachable, and an ipset failure after a normal flush may go
 * unnoticed — confirm.
 */
ulog(LLOG_WARN, "IPset command considered broken post-morten\n");
if (error) {
// Stop accepting commands until retry_timeout() triggers a full re-fill.
queue->broken = true;
queue->broken_timeout_id = loop_timeout_add(context->loop, QUEUE_RETRY_TIME, context, queue, retry_timeout);
}
}
/*
 * Loop timeout callback armed by enqueue() for the first command of a batch.
 * The flush delay has elapsed, so the batch is flushed.
 */
static void flush_timeout(struct context *context, void *data, size_t id __attribute__((unused))) {
	struct queue *const pending = data;
	// The timeout already fired; make sure lost() does not try to cancel it.
	pending->timeout_started = false;
	queue_flush(context, pending);
}
/*
 * Send one newline-terminated ipset restore command to the ipset subprocess.
 *
 * The subprocess is started on demand. If the queue is broken, the command is
 * dropped. If the peer is gone (ECONNRESET/EPIPE), the queue is declared lost
 * with an error. The first command of a batch arms a flush timeout
 * (QUEUE_FLUSH_TIME).
 */
void enqueue(struct context *context, struct queue *queue, const char *command) {
	if (queue->broken) {
		ulog(LLOG_DEBUG_VERBOSE, "Not queueing command '%s', the queue is currently broken\n", command);
		return;
	}
	if (!queue->active)
		start(context, queue);
	sanity(queue->active, "Failed to start the queue\n");
	sanity(queue->ipset_pipe > 0, "Strange pipe FD to the ip set command: %i\n", queue->ipset_pipe);
	size_t remaining = strlen(command);
	sanity(remaining, "Empty ipset command\n");
	sanity(command[remaining - 1] == '\n', "IPset command '%s' not terminated by a newline\n", command);
	ulog(LLOG_DEBUG_VERBOSE, "IPset command %s", command); // No newline in the format; the command carries its own
	const char *cursor = command;
	while (remaining > 0) {
		ssize_t sent = send(queue->ipset_pipe, cursor, remaining, MSG_NOSIGNAL);
		if (sent == -1) {
			if (errno == EINTR) {
				ulog(LLOG_WARN, "Interrupted while writing data to ipset, retrying\n");
				continue;
			}
			if (errno == ECONNRESET || errno == EPIPE) {
				// ipset went away; the data may be out of sync now.
				lost(context, queue, true);
				return;
			}
			sanity(false, "Error writing to ipset: %s\n", strerror(errno));
		}
		// Partial writes are possible; advance past whatever made it out.
		cursor += sent;
		remaining -= sent;
	}
	if (queue->timeout_started)
		return;
	queue->timeout_started = true;
	queue->timeout_id = loop_timeout_add(context->loop, QUEUE_FLUSH_TIME, context, queue, flush_timeout);
}
/*
 * End the current batch: close the input of the running ipset command, if
 * any, so it sees EOF. This is lost() without an error. It does nothing when
 * no command is active or when the queue is broken.
 */
void queue_flush(struct context *context, struct queue *queue) {
	const bool failure = false;
	lost(context, queue, failure);
}
/*
 * Handle readability of an ipset socket registered by start().
 *
 * Anything ipset writes (stdout/stderr) is logged as a warning. A trailing
 * newline is dropped and the remaining newlines are turned into backslashes,
 * so the output fits on one log line.
 *
 * On EOF (or ECONNRESET) the descriptor is closed:
 * - through lost() when it is the pipe of the currently active command;
 * - directly otherwise. That is the pipe of a command that already
 *   terminated with an error; it was kept open to drain its messages.
 *   Only `fd` itself is closed here, never q->ipset_pipe, which may by now
 *   belong to a newer command.
 */
void queue_fd_data(struct context *context, int fd, void *userdata) {
	struct queue *q = userdata;
	const size_t buf_size = 512;
	char *err_msg = mem_pool_alloc(context->temp_pool, buf_size + 1 /* Extra one for \0 at the end */);
	ssize_t result = recv(fd, err_msg, buf_size, MSG_DONTWAIT);
	if (result == -1) {
		switch (errno) {
			case EAGAIN:
#if EAGAIN != EWOULDBLOCK
			case EWOULDBLOCK:
#endif
			case EINTR:
				// It might work next time
				return;
			case ECONNRESET:
				// The other end is gone; handle it exactly like EOF below.
				break;
			default:
				insane("Error reading from IPSet stderr: %s\n", strerror(errno));
				break;
		}
		result = 0;
	}
	if (result == 0) {
		ulog(LLOG_WARN, "IPSet closed by the other end\n");
		if (q->active && fd == q->ipset_pipe) {
			lost(context, q, false);
		} else {
			// The command terminated, but its descriptor was still alive for a while. Close it.
			loop_plugin_unregister_fd(context, fd);
			sanity(close(fd) == 0, "Error closing the ipset pipe: %s\n", strerror(errno));
		}
		return;
	}
	err_msg[result] = '\0';
	if (err_msg[result - 1] == '\n')
		err_msg[result - 1] = '\0';
	// Keep multi-line output on a single log line.
	for (char *pos = strchr(err_msg, '\n'); pos; pos = strchr(pos + 1, '\n'))
		*pos = '\\';
	ulog(LLOG_WARN, "IPSet output: %s\n", err_msg);
}
/*
 * Notification that a child process terminated (`state` as filled in by
 * waitpid()). Ignored unless it is the ipset command of the currently active
 * queue. Otherwise the termination is logged, and lost() is called. The error
 * flag is set unless ipset exited with status 0.
 *
 * NOTE(review): lost() resets `active` and `pid`, so a termination arriving
 * after a normal flush is ignored here — confirm that is intended.
 */
void queue_child_died(struct context *context, int state, pid_t child, struct queue *queue) {
	// No active queue means no child of ours is running; a different pid is someone else's child.
	if (!queue->active || queue->pid != child)
		return;
	bool clean_exit = false;
	if (WIFEXITED(state)) {
		const int status = WEXITSTATUS(state);
		if (status == 0) {
			ulog(LLOG_DEBUG, "The ipset command %d terminated successfully\n", (int)child);
			clean_exit = true;
		} else {
			ulog(LLOG_ERROR, "The ipset command %d terminated with status %d\n", (int)child, status);
		}
	} else if (WIFSIGNALED(state)) {
		ulog(LLOG_ERROR, "The ipset command %d terminated with signal %d\n", (int)child, WTERMSIG(state));
	} else {
		ulog(LLOG_ERROR, "The ipset command %d died for unknown reason, call the police to investigate\n", (int)child);
	}
	lost(context, queue, !clean_exit);
}