Commit cacac425 authored by Jan Včelák's avatar Jan Včelák 🚀

TCP: short writes handling

This changeset enables handling of short writes on TCP connections.
Also, the default send/receive buffer sizes for server TCP sockets is
increased to make fast-path processing more likely.

Formerly, Knot DNS running on FreeBSD 10 failed to answer large AXFR
queries. The default send buffer is too small (32768 bytes) to fit
the whole message. Since the sockets are non-blocking, a short write
happened and the TCP handler code terminated the connection.

See merge request !353
parents c8f81406 5d4fa08f
......@@ -260,7 +260,7 @@ static int cmd_remote(const char *cmd, uint16_t rrt, int argc, char *argv[])
}
/* Send and free packet. */
int ret = tcp_send_msg(s, pkt->wire, pkt->size);
int ret = tcp_send_msg(s, pkt->wire, pkt->size, NULL);
knot_pkt_free(&pkt);
/* Evaluate and wait for reply. */
......
......@@ -490,7 +490,8 @@ static int remote_senderr(int c, uint8_t *qbuf, size_t buflen)
{
knot_wire_set_qr(qbuf);
knot_wire_set_rcode(qbuf, KNOT_RCODE_REFUSED);
return tcp_send_msg(c, qbuf, buflen);
struct timeval timeout = { conf()->max_conn_reply, 0 };
return tcp_send_msg(c, qbuf, buflen, &timeout);
}
/* Public APIs. */
......@@ -629,7 +630,8 @@ static int remote_send_chunk(int c, knot_pkt_t *query, const char* d, uint16_t l
goto failed;
}
ret = tcp_send_msg(c, resp->wire, resp->size);
struct timeval timeout = { conf()->max_conn_reply, 0 };
ret = tcp_send_msg(c, resp->wire, resp->size, &timeout);
failed:
......
......@@ -435,7 +435,9 @@ static void send_update_response(const zone_t *zone, struct knot_request *req)
}
if (net_is_connected(req->fd)) {
tcp_send_msg(req->fd, req->resp->wire, req->resp->size);
struct timeval timeout = { conf()->max_conn_reply, 0 };
tcp_send_msg(req->fd, req->resp->wire, req->resp->size,
&timeout);
} else {
udp_send_msg(req->fd, req->resp->wire, req->resp->size,
(struct sockaddr *)&req->remote);
......
......@@ -21,8 +21,7 @@
#include <errno.h>
#include <assert.h>
#include "dnssec/random.h"
#include "libknot/libknot.h"
#include "libknot/errcode.h"
#include "knot/common/debug.h"
#include "knot/common/trim.h"
#include "knot/server/server.h"
......@@ -33,6 +32,14 @@
#include "knot/zone/timers.h"
#include "knot/zone/zonedb-load.h"
/*! \brief Minimal send/receive buffer sizes. */
enum {
UDP_MIN_RCVSIZE = 4096,
UDP_MIN_SNDSIZE = 4096,
TCP_MIN_RCVSIZE = 4096,
TCP_MIN_SNDSIZE = sizeof(uint16_t) + UINT16_MAX
};
/*! \brief Event scheduler loop. */
static int evsched_run(dthread_t *thread)
{
......@@ -82,6 +89,33 @@ static void server_remove_iface(iface_t *iface)
free(iface);
}
/*! \brief Set lower bound for socket option. */
static bool setsockopt_min(int sock, int option, int min)
{
int value = 0;
socklen_t len = sizeof(value);
if (getsockopt(sock, SOL_SOCKET, option, &value, &len) != 0) {
return false;
}
assert(len == sizeof(value));
if (value >= min) {
return true;
}
return setsockopt(sock, SOL_SOCKET, option, &min, sizeof(min)) == 0;
}
/*!
* \brief Enlarge send/receive buffers.
*/
static bool enlarge_net_buffers(int sock, int min_recvsize, int min_sndsize)
{
return setsockopt_min(sock, SO_RCVBUF, min_recvsize) &&
setsockopt_min(sock, SO_SNDBUF, min_sndsize);
}
/*!
* \brief Initialize new interface from config value.
*
......@@ -120,6 +154,10 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
return sock;
}
if (!enlarge_net_buffers(sock, UDP_MIN_RCVSIZE, UDP_MIN_SNDSIZE)) {
log_warning("failed to set network buffer sizes for UDP");
}
/* Set UDP as non-blocking. */
fcntl(sock, F_SETFL, O_NONBLOCK);
......@@ -132,6 +170,10 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
return sock;
}
if (!enlarge_net_buffers(sock, TCP_MIN_RCVSIZE, TCP_MIN_SNDSIZE)) {
log_warning("failed to set network buffer sizes for TCP");
}
new_if->fd[IO_TCP] = sock;
/* Listen for incoming connections. */
......
......@@ -116,7 +116,8 @@ static int tcp_handle(tcp_context_t *tcp, int fd,
rcu_read_unlock();
/* Receive data. */
int ret = tcp_recv_msg(fd, rx->iov_base, rx->iov_len, &tmout);
struct timeval recv_tmout = tmout;
int ret = tcp_recv_msg(fd, rx->iov_base, rx->iov_len, &recv_tmout);
if (ret <= 0) {
dbg_net("tcp: client on fd=%d disconnected\n", fd);
if (ret == KNOT_EAGAIN) {
......@@ -151,7 +152,8 @@ static int tcp_handle(tcp_context_t *tcp, int fd,
/* Send, if response generation passed and wasn't ignored. */
if (ans->size > 0 && !(state & (KNOT_STATE_FAIL|KNOT_STATE_NOOP))) {
if (tcp_send_msg(fd, ans->wire, ans->size) != ans->size) {
struct timeval send_tmout = tmout;
if (tcp_send_msg(fd, ans->wire, ans->size, &send_tmout) != ans->size) {
ret = KNOT_ECONNREFUSED;
break;
}
......
......@@ -190,8 +190,7 @@ int net_is_connected(int fd)
return getpeername(fd, (struct sockaddr *)&ss, &len) == 0;
}
/*! \brief Wait for data and return true if data arrived. */
static int wait_for_data(int fd, struct timeval *timeout)
static int select_read(int fd, struct timeval *timeout)
{
fd_set set;
FD_ZERO(&set);
......@@ -199,6 +198,15 @@ static int wait_for_data(int fd, struct timeval *timeout)
return select(fd + 1, &set, NULL, NULL, timeout);
}
static int select_write(int fd, struct timeval *timeout)
{
fd_set set;
FD_ZERO(&set);
FD_SET(fd, &set);
return select(fd + 1, NULL, &set, NULL, timeout);
}
/* \brief Receive a block of data from TCP socket with wait. */
static int recv_data(int fd, uint8_t *buf, int len, bool oneshot, struct timeval *timeout)
{
......@@ -230,7 +238,7 @@ static int recv_data(int fd, uint8_t *buf, int len, bool oneshot, struct timeval
/* Check for no data available. */
if (errno == EAGAIN || errno == EINTR) {
/* Continue only if timeout didn't expire. */
ret = wait_for_data(fd, timeout);
ret = select_read(fd, timeout);
if (ret > 0) {
continue;
} else {
......@@ -260,7 +268,74 @@ int udp_recv_msg(int fd, uint8_t *buf, size_t len, struct timeval *timeout)
return recv_data(fd, buf, len, true, timeout);
}
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen)
/*!
* \brief Shift processed data out of iovec structure.
*/
static void iovec_shift(struct iovec **iov_ptr, int *iovcnt_ptr, size_t done)
{
struct iovec *iov = *iov_ptr;
int iovcnt = *iovcnt_ptr;
for (int i = 0; i < iovcnt && done > 0; i++) {
if (iov[i].iov_len > done) {
iov[i].iov_base += done;
iov[i].iov_len -= done;
done = 0;
} else {
done -= iov[i].iov_len;
*iov_ptr += 1;
*iovcnt_ptr -= 1;
}
}
assert(done == 0);
}
/*!
* \brief Send out TCP data with timeout in case the output buffer is full.
*/
static int send_data(int fd, struct iovec iov[], int iovcnt, struct timeval *timeout)
{
size_t total = 0;
for (int i = 0; i < iovcnt; i++) {
total += iov[i].iov_len;
}
for (size_t avail = total; avail > 0; /* nop */) {
ssize_t sent = writev(fd, iov, iovcnt);
if (sent == avail) {
break;
}
/* Short write. */
if (sent > 0) {
avail -= sent;
iovec_shift(&iov, &iovcnt, sent);
continue;
}
/* Error. */
if (sent == -1) {
if (errno == EAGAIN || errno == EINTR) {
int ret = select_write(fd, timeout);
if (ret > 0) {
continue;
} else if (ret == 0) {
return KNOT_ETIMEOUT;
}
}
return KNOT_ECONN;
}
/* Unreachable. */
assert(0);
}
return total;
}
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen, struct timeval *timeout)
{
/* Create iovec for gathered write. */
struct iovec iov[2];
......@@ -271,10 +346,9 @@ int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen)
iov[1].iov_len = msglen;
/* Send. */
int total_len = iov[0].iov_len + iov[1].iov_len;
int sent = writev(fd, iov, 2);
if (sent != total_len) {
return KNOT_ECONN;
ssize_t ret = send_data(fd, iov, 2, timeout);
if (ret < 0) {
return ret;
}
return msglen; /* Do not count the size prefix. */
......@@ -288,7 +362,7 @@ int tcp_recv_msg(int fd, uint8_t *buf, size_t len, struct timeval *timeout)
/* Receive size. */
unsigned short pktsize = 0;
int ret = recv_data(fd, (uint8_t *)&pktsize, sizeof(pktsize), false, timeout);
int ret = recv_data(fd, (uint8_t *)&pktsize, sizeof(pktsize), false, timeout);
if (ret != sizeof(pktsize)) {
return ret;
}
......
......@@ -117,11 +117,12 @@ int udp_recv_msg(int fd, uint8_t *buf, size_t len, struct timeval *timeout);
* \param fd Associated socket.
* \param msg Buffer for a query wireformat.
* \param msglen Buffer maximum size.
* \param timeout Message send timeout.
*
* \retval Number of sent data on success.
* \retval KNOT_ERROR on error.
*/
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen);
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen, struct timeval *timeout);
/*!
* \brief Receive a TCP message.
......
......@@ -55,8 +55,7 @@ static int request_ensure_connected(struct knot_request *request)
return KNOT_EOK;
}
static int request_send(struct knot_request *request,
const struct timeval *timeout)
static int request_send(struct knot_request *request, struct timeval *timeout)
{
/* Wait for writeability or error. */
int ret = request_ensure_connected(request);
......@@ -71,7 +70,7 @@ static int request_send(struct knot_request *request,
/* Send query. */
if (use_tcp(request)) {
ret = tcp_send_msg(request->fd, wire, wire_len);
ret = tcp_send_msg(request->fd, wire, wire_len, timeout);
} else {
ret = udp_send_msg(request->fd, wire, wire_len, NULL);
}
......
......@@ -23,6 +23,7 @@ hattrie
hhash
journal
namedb
net_shortwrite
node
overlay
pkt
......
......@@ -30,6 +30,7 @@ check_PROGRAMS = \
hhash \
journal \
namedb \
net_shortwrite \
node \
overlay \
pkt \
......
/* Copyright (C) 2015 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tap/basic.h>
#include <fcntl.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include "libknot/internal/net.h"
const struct timeval TIMEOUT = { .tv_sec = 1 };
struct data {
int fd;
struct timeval timeout;
uint8_t *buffer;
size_t size;
int result;
};
static void *thr_receive(void *data)
{
struct data *d = data;
d->result = tcp_recv_msg(d->fd, d->buffer, d->size, &d->timeout);
return NULL;
}
int main(int argc, char *argv[])
{
plan_lazy();
int r;
// create TCP server
struct sockaddr_storage addr = { 0 };
addr.ss_family = AF_INET;
int server = net_bound_socket(SOCK_STREAM, &addr, 0);
ok(server >= 0, "server: bind socket");
r = listen(server, 0);
ok(r == 0, "server: start listening");
struct sockaddr *sa = (struct sockaddr *)&addr;
socklen_t salen = sockaddr_len(sa);
r = getsockname(server, sa, &salen);
ok(r == 0, "server: get bound address");
r = fcntl(server, F_SETFL, O_NONBLOCK);
ok(r == 0, "server: set non-blocking mode");
// create TCP client
int client = net_connected_socket(SOCK_STREAM, &addr, NULL, 0);
ok(client >= 0, "client: connect to server");
r = fcntl(client, F_SETFL, O_NONBLOCK);
ok(r == 0, "client: set non-blocking mode");
int optval = 8192;
socklen_t optlen = sizeof(optval);
r = setsockopt(client, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
ok(r == 0, "client: configure small send buffer");
// accept TCP connection
int accepted = accept(server, NULL, NULL);
ok(accepted >= 0, "server: accepted connection");
uint8_t recvbuf[UINT16_MAX] = { 0 };
struct data recv_data = {
.fd = accepted,
.timeout = TIMEOUT,
.buffer = recvbuf,
.size = sizeof(recvbuf)
};
pthread_t thr;
r = pthread_create(&thr, NULL, thr_receive, &recv_data);
ok(r == 0, "server: start receiver thread");
// send message (should handle partial-write correctly)
uint8_t sndbuf[UINT16_MAX];
for (size_t i = 0; i < sizeof(sndbuf); i++) {
sndbuf[i] = i;
}
struct timeval timeout = TIMEOUT;
r = tcp_send_msg(client, sndbuf, sizeof(sndbuf), &timeout);
ok(r == sizeof(sndbuf), "client: tcp_send_msg() with short-write");
// receive message
r = pthread_join(thr, NULL);
ok(r == 0, "server: wait for reciever thread to terminate");
ok(recv_data.result == sizeof(recvbuf) &&
memcmp(sndbuf, recvbuf, sizeof(recvbuf)) == 0,
"server: tcp_recv_msg() complete and valid data");
// clean up
if (accepted >= 0) {
close(accepted);
}
if (server >= 0) {
close(server);
}
if (client >= 0) {
close(client);
}
return 0;
}
......@@ -52,7 +52,7 @@ static void* responder_thread(void *arg)
break;
}
knot_wire_set_qr(buf);
tcp_send_msg(client, buf, len);
tcp_send_msg(client, buf, len, NULL);
close(client);
}
return NULL;
......@@ -159,7 +159,7 @@ int main(int argc, char *argv[])
/* Terminate responder. */
int responder = net_connected_socket(SOCK_STREAM, &remote.addr, NULL, 0);
assert(responder > 0);
tcp_send_msg(responder, (const uint8_t *)"", 1);
tcp_send_msg(responder, (const uint8_t *)"", 1, NULL);
(void) pthread_join(thread, 0);
close(responder);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment