Skip to content
Snippets Groups Projects
Commit 059c0d87 authored by Jan Včelák
Browse files

backport: TCP short writes handling

Original commits:

cacac425  TCP: short writes handling
5d4fa08f  review, skip empty vectors on partial writev
fd4cfa09  tests: TCP short write test with background receiver
aae00f91  fix missing types in declaration
e2604331  tests: larger buffer for TCP short write test
b9f1bf06  remote: use correct timeout values for sent replies
46371cf3  update use of tcp_send_msg()
05e90e75  TCP: no fixed timeout for tcp_send_msg
1bc348ab  TCP: short-write code cleanups
1921eecd  TCP: implement short-write support with fixed timeout
2602dd23  tests: fix setting of nonblocking flag
a1b83687  tests: unit for TCP partial-write
3f37f999  server, define minimal send/receive buffer sizes
bf128651  TCP: increase send buffer size to fit maximum-sized message
parent 0c28fada
No related branches found
No related tags found
No related merge requests found
......@@ -254,7 +254,7 @@ static int cmd_remote(const char *cmd, uint16_t rrt, int argc, char *argv[])
}
/* Send and free packet. */
int ret = tcp_send_msg(s, pkt->wire, pkt->size);
int ret = tcp_send_msg(s, pkt->wire, pkt->size, NULL);
knot_pkt_free(&pkt);
/* Evaluate and wait for reply. */
......
......@@ -454,7 +454,8 @@ static int remote_senderr(int c, uint8_t *qbuf, size_t buflen)
{
knot_wire_set_qr(qbuf);
knot_wire_set_rcode(qbuf, KNOT_RCODE_REFUSED);
return tcp_send_msg(c, qbuf, buflen);
struct timeval timeout = { conf()->max_conn_reply, 0 };
return tcp_send_msg(c, qbuf, buflen, &timeout);
}
/* Public APIs. */
......@@ -584,7 +585,8 @@ static int remote_send_chunk(int c, knot_pkt_t *query, const char* d, uint16_t l
goto failed;
}
ret = tcp_send_msg(c, resp->wire, resp->size);
struct timeval timeout = { conf()->max_conn_reply, 0 };
ret = tcp_send_msg(c, resp->wire, resp->size, &timeout);
failed:
......
......@@ -103,7 +103,7 @@ static int request_send(struct request *request, const struct timeval *timeout)
/* Send query. */
knot_pkt_t *query = request->data.query;
ret = tcp_send_msg(request->data.fd, query->wire, query->size);
ret = tcp_send_msg(request->data.fd, query->wire, query->size, &tv);
if (ret != query->size) {
return KNOT_ECONN;
}
......
......@@ -437,7 +437,9 @@ static void send_update_response(const zone_t *zone, struct request_data *req)
}
if (net_is_connected(req->fd)) {
tcp_send_msg(req->fd, req->resp->wire, req->resp->size);
struct timeval timeout = { conf()->max_conn_reply, 0 };
tcp_send_msg(req->fd, req->resp->wire, req->resp->size,
&timeout);
} else {
udp_send_msg(req->fd, req->resp->wire, req->resp->size,
(struct sockaddr *)&req->remote);
......
......@@ -34,6 +34,14 @@
#include "libknot/dnssec/crypto.h"
#include "libknot/dnssec/random.h"
/*!
 * \brief Lower bounds requested for socket send/receive buffers.
 *
 * These values are handed to enlarge_net_buffers() when the UDP and TCP
 * listening sockets of an interface are created.
 */
enum {
	/* UDP sockets. */
	UDP_MIN_RCVSIZE = 4096,
	UDP_MIN_SNDSIZE = 4096,
	/* TCP sockets; the send side is sized as a 16-bit size prefix
	 * plus the largest value that prefix can express. */
	TCP_MIN_RCVSIZE = 4096,
	TCP_MIN_SNDSIZE = UINT16_MAX + sizeof(uint16_t)
};
/*! \brief Event scheduler loop. */
static int evsched_run(dthread_t *thread)
{
......@@ -90,6 +98,33 @@ static void server_remove_iface(iface_t *iface)
free(iface);
}
/*!
 * \brief Make sure an integer SOL_SOCKET option is at least \a min.
 *
 * The current value is queried first; the option is only written when the
 * current value is below the requested minimum.
 *
 * \param sock    Socket descriptor.
 * \param option  SOL_SOCKET level option name (e.g. SO_RCVBUF, SO_SNDBUF).
 * \param min     Requested lower bound for the option value.
 *
 * \return true if the option already satisfied the bound or was set
 *         successfully, false if reading or writing the option failed.
 */
static bool setsockopt_min(int sock, int option, int min)
{
	int current = 0;
	socklen_t current_len = sizeof(current);

	int rc = getsockopt(sock, SOL_SOCKET, option, &current, &current_len);
	if (rc != 0) {
		return false;
	}

	assert(current_len == sizeof(current));

	/* Only touch the option when it is below the requested bound. */
	if (current < min) {
		rc = setsockopt(sock, SOL_SOCKET, option, &min, sizeof(min));
		return rc == 0;
	}

	return true;
}
/*!
 * \brief Raise socket receive and send buffers to the given minimal sizes.
 *
 * The receive buffer is handled first; if that fails, the send buffer is
 * not attempted.
 *
 * \param sock          Socket descriptor.
 * \param min_recvsize  Lower bound for SO_RCVBUF.
 * \param min_sndsize   Lower bound for SO_SNDBUF.
 *
 * \return true if both options satisfy their bounds, false otherwise.
 */
static bool enlarge_net_buffers(int sock, int min_recvsize, int min_sndsize)
{
	if (!setsockopt_min(sock, SO_RCVBUF, min_recvsize)) {
		return false;
	}

	return setsockopt_min(sock, SO_SNDBUF, min_sndsize);
}
/*!
* \brief Initialize new interface from config value.
*
......@@ -118,6 +153,10 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
return sock;
}
if (!enlarge_net_buffers(sock, UDP_MIN_RCVSIZE, UDP_MIN_SNDSIZE)) {
log_warning("failed to set network buffer sizes for UDP");
}
/* Set UDP as non-blocking. */
fcntl(sock, F_SETFL, O_NONBLOCK);
......@@ -130,6 +169,10 @@ static int server_init_iface(iface_t *new_if, conf_iface_t *cfg_if)
return sock;
}
if (!enlarge_net_buffers(sock, TCP_MIN_RCVSIZE, TCP_MIN_SNDSIZE)) {
log_warning("failed to set network buffer sizes for TCP");
}
new_if->fd[IO_TCP] = sock;
/* Listen for incoming connections. */
......
......@@ -114,7 +114,8 @@ static int tcp_handle(tcp_context_t *tcp, int fd,
rcu_read_unlock();
/* Receive data. */
int ret = tcp_recv_msg(fd, rx->iov_base, rx->iov_len, &tmout);
struct timeval recv_tmout = tmout;
int ret = tcp_recv_msg(fd, rx->iov_base, rx->iov_len, &recv_tmout);
if (ret <= 0) {
dbg_net("tcp: client on fd=%d disconnected\n", fd);
if (ret == KNOT_EAGAIN) {
......@@ -144,7 +145,8 @@ static int tcp_handle(tcp_context_t *tcp, int fd,
/* If it has response, send it. */
if (tx_len > 0) {
if (tcp_send_msg(fd, tx->iov_base, tx_len) != tx_len) {
struct timeval send_tmout = tmout;
if (tcp_send_msg(fd, tx->iov_base, tx_len, &send_tmout) != tx_len) {
ret = KNOT_ECONNREFUSED;
break;
}
......@@ -188,9 +190,7 @@ int tcp_accept(int fd)
return incoming;
}
/*! \brief Wait for data and return true if data arrived. */
static int tcp_wait_for_data(int fd, struct timeval *timeout)
static int select_read(int fd, struct timeval *timeout)
{
fd_set set;
FD_ZERO(&set);
......@@ -198,6 +198,15 @@ static int tcp_wait_for_data(int fd, struct timeval *timeout)
return select(fd + 1, &set, NULL, NULL, timeout);
}
/*!
 * \brief Wait until a descriptor becomes writable.
 *
 * \param fd       Descriptor to watch.
 * \param timeout  Timeout passed unchanged to select().
 *
 * \return Return value of select().
 */
static int select_write(int fd, struct timeval *timeout)
{
	fd_set writable;
	FD_ZERO(&writable);
	FD_SET(fd, &writable);

	/* Only the write set is watched. */
	int ready = select(fd + 1, NULL, &writable, NULL, timeout);

	return ready;
}
int tcp_recv_data(int fd, uint8_t *buf, int len, struct timeval *timeout)
{
int ret = 0;
......@@ -223,7 +232,7 @@ int tcp_recv_data(int fd, uint8_t *buf, int len, struct timeval *timeout)
/* Check for no data available. */
if (errno == EAGAIN || errno == EINTR) {
/* Continue only if timeout didn't expire. */
ret = tcp_wait_for_data(fd, timeout);
ret = select_read(fd, timeout);
if (ret > 0) {
continue;
} else {
......@@ -237,7 +246,74 @@ int tcp_recv_data(int fd, uint8_t *buf, int len, struct timeval *timeout)
return rcvd;
}
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen)
/*!
 * \brief Shift processed data out of iovec structure.
 *
 * Vectors that were consumed completely are skipped by advancing
 * \a *iov_ptr and decrementing \a *iovcnt_ptr. A vector that was consumed
 * only partially is adjusted in place so that it describes the remaining
 * tail. The underlying array is therefore modified.
 *
 * \param iov_ptr     In/out pointer to the first unprocessed vector.
 * \param iovcnt_ptr  In/out number of unprocessed vectors.
 * \param done        Number of bytes already processed; must not exceed
 *                    the total length of the vectors.
 */
static void iovec_shift(struct iovec **iov_ptr, int *iovcnt_ptr, size_t done)
{
	struct iovec *iov = *iov_ptr;
	int iovcnt = *iovcnt_ptr;

	for (int i = 0; i < iovcnt && done > 0; i++) {
		if (iov[i].iov_len > done) {
			/* Partially processed vector: keep the tail. Arithmetic on
			 * 'void *' is a compiler extension, so use a byte pointer. */
			iov[i].iov_base = (char *)iov[i].iov_base + done;
			iov[i].iov_len -= done;
			done = 0;
		} else {
			/* Fully processed vector: drop it from the front. */
			done -= iov[i].iov_len;
			*iov_ptr += 1;
			*iovcnt_ptr -= 1;
		}
	}

	assert(done == 0);
}
/*!
 * \brief Send out TCP data with timeout in case the output buffer is full.
 *
 * The data is written with writev(). After a short write the already sent
 * part is shifted out of \a iov and the write is retried. When writev()
 * fails with EAGAIN or EINTR, the function waits for the socket to become
 * writable again.
 *
 * \note The entries of \a iov are modified while handling short writes.
 *
 * \param fd       Socket descriptor.
 * \param iov      Vectors to send.
 * \param iovcnt   Number of vectors in \a iov.
 * \param timeout  Timeout passed unchanged to select() on each wait.
 *
 * \return Total number of bytes sent on success, KNOT_ETIMEOUT if waiting
 *         for the socket timed out, KNOT_ECONN on any other failure.
 */
static int send_data(int fd, struct iovec iov[], int iovcnt, struct timeval *timeout)
{
	size_t total = 0;
	for (int i = 0; i < iovcnt; i++) {
		total += iov[i].iov_len;
	}

	for (size_t avail = total; avail > 0; /* nop */) {
		ssize_t sent = writev(fd, iov, iovcnt);

		if (sent > 0) {
			/* Everything written. */
			if ((size_t)sent >= avail) {
				break;
			}

			/* Short write. */
			avail -= (size_t)sent;
			iovec_shift(&iov, &iovcnt, (size_t)sent);
			continue;
		}

		/* Output buffer full or interrupted: wait and retry. */
		if (sent == -1 && (errno == EAGAIN || errno == EINTR)) {
			int ret = select_write(fd, timeout);
			if (ret > 0) {
				continue;
			} else if (ret == 0) {
				return KNOT_ETIMEOUT;
			}
		}

		/* Hard error, failed wait, or a write that made no progress.
		 * Returning here (instead of relying on an assertion) keeps the
		 * loop from spinning when assertions are compiled out. */
		return KNOT_ECONN;
	}

	return (int)total;
}
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen, struct timeval *timeout)
{
/* Create iovec for gathered write. */
struct iovec iov[2];
......@@ -248,10 +324,9 @@ int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen)
iov[1].iov_len = msglen;
/* Send. */
int total_len = iov[0].iov_len + iov[1].iov_len;
int sent = writev(fd, iov, 2);
if (sent != total_len) {
return KNOT_ECONN;
ssize_t ret = send_data(fd, iov, 2, timeout);
if (ret < 0) {
return ret;
}
return msglen; /* Do not count the size prefix. */
......
......@@ -66,11 +66,12 @@ int tcp_recv_data(int fd, uint8_t *buf, int len, struct timeval *timeout);
* \param fd Associated socket.
* \param msg Buffer for a query wireformat.
* \param msglen Length of the message in bytes.
* \param timeout Message send timeout.
*
* \retval Number of sent message bytes (size prefix not counted) on success.
* \retval KNOT_ETIMEOUT if the send timed out, KNOT_ECONN on other send errors.
*/
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen);
int tcp_send_msg(int fd, const uint8_t *msg, size_t msglen, struct timeval *timeout);
/*!
* \brief Receive a TCP message.
......
......@@ -22,6 +22,7 @@ hattrie
hhash
journal
namedb
net_shortwrite
node
pkt
process_answer
......
......@@ -27,6 +27,7 @@ check_PROGRAMS = \
hhash \
journal \
namedb \
net_shortwrite \
node \
pkt \
process_answer \
......
/* Copyright (C) 2015 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tap/basic.h>
#include <fcntl.h>
#include <stdint.h>
#include <string.h>
#include <pthread.h>
#include "knot/server/net.h"
#include "knot/server/tcp-handler.h"
/* One-second timeout used as the initial value for both the sender's and
 * the receiver's timeout in this test. */
const struct timeval TIMEOUT = { .tv_sec = 1 };
/* Arguments and result slot shared with the background receiver thread. */
struct data {
/* Socket the receiver reads from. */
int fd;
/* Timeout handed to tcp_recv_msg(). */
struct timeval timeout;
/* Destination buffer for the received message. */
uint8_t *buffer;
/* Size of the destination buffer. */
size_t size;
/* Return value of tcp_recv_msg(), filled in by the receiver thread. */
int result;
};
/*
 * Receiver thread entry point.
 *
 * Receives one TCP message using the parameters in the passed 'struct data'
 * and stores the return value of tcp_recv_msg() in its 'result' field.
 * Always returns NULL.
 */
static void *thr_receive(void *data)
{
	struct data *ctx = data;

	int received = tcp_recv_msg(ctx->fd, ctx->buffer, ctx->size,
	                            &ctx->timeout);
	ctx->result = received;

	return NULL;
}
/*
 * Test of tcp_send_msg() short-write handling.
 *
 * A local TCP server and a non-blocking client with a small send buffer are
 * created. A background thread receives on the accepted connection while the
 * main thread sends a message larger than the configured send buffer. The
 * test passes when the complete message is sent and received unchanged.
 */
int main(int argc, char *argv[])
{
	plan_lazy();

	int r;

	// create TCP server

	struct sockaddr_storage addr = { 0 };
	addr.ss_family = AF_INET;
	int server = net_bound_socket(SOCK_STREAM, &addr);
	ok(server >= 0, "server: bind socket");

	r = listen(server, 0);
	ok(r == 0, "server: start listening");

	// read back the bound address; it is used below to connect the client
	struct sockaddr *sa = (struct sockaddr *)&addr;
	socklen_t salen = sockaddr_len(&addr);
	r = getsockname(server, sa, &salen);
	ok(r == 0, "server: get bound address");

	r = fcntl(server, F_SETFL, O_NONBLOCK);
	ok(r == 0, "server: set non-blocking mode");

	// create TCP client

	int client = net_connected_socket(SOCK_STREAM, &addr, NULL, 0);
	ok(client >= 0, "client: connect to server");

	r = fcntl(client, F_SETFL, O_NONBLOCK);
	ok(r == 0, "client: set non-blocking mode");

	// request a send buffer smaller than the message sent below
	int optval = 8192;
	socklen_t optlen = sizeof(optval);
	r = setsockopt(client, SOL_SOCKET, SO_SNDBUF, &optval, optlen);
	ok(r == 0, "client: configure small send buffer");

	// accept TCP connection

	int accepted = accept(server, NULL, NULL);
	ok(accepted >= 0, "server: accepted connection");

	uint8_t recvbuf[UINT16_MAX] = { 0 };
	struct data recv_data = {
		.fd = accepted,
		.timeout = TIMEOUT,
		.buffer = recvbuf,
		.size = sizeof(recvbuf)
	};

	pthread_t thr;
	r = pthread_create(&thr, NULL, thr_receive, &recv_data);
	ok(r == 0, "server: start receiver thread");
	// remember whether 'thr' is valid; joining an uninitialized
	// pthread_t is undefined behaviour
	int thr_started = (r == 0);

	// send message (should handle partial-write correctly)

	uint8_t sndbuf[UINT16_MAX];
	for (size_t i = 0; i < sizeof(sndbuf); i++) {
		sndbuf[i] = i;
	}
	struct timeval timeout = TIMEOUT;
	r = tcp_send_msg(client, sndbuf, sizeof(sndbuf), &timeout);
	ok(r == sizeof(sndbuf), "client: tcp_send_msg() with short-write");

	// receive message

	if (thr_started) {
		r = pthread_join(thr, NULL);
	} else {
		r = -1;
	}
	ok(r == 0, "server: wait for reciever thread to terminate");

	ok(recv_data.result == sizeof(recvbuf) &&
	   memcmp(sndbuf, recvbuf, sizeof(recvbuf)) == 0,
	   "server: tcp_recv_msg() complete and valid data");

	// clean up

	if (accepted >= 0) {
		close(accepted);
	}

	if (server >= 0) {
		close(server);
	}

	if (client >= 0) {
		close(client);
	}

	return 0;
}
......@@ -51,7 +51,7 @@ static void* responder_thread(void *arg)
break;
}
knot_wire_set_qr(buf);
tcp_send_msg(client, buf, len);
tcp_send_msg(client, buf, len, NULL);
close(client);
}
return NULL;
......@@ -154,7 +154,7 @@ int main(int argc, char *argv[])
/* Terminate responder. */
int responder = net_connected_socket(SOCK_STREAM, &remote.addr, NULL, 0);
assert(responder > 0);
tcp_send_msg(responder, (const uint8_t *)"", 1);
tcp_send_msg(responder, (const uint8_t *)"", 1, NULL);
(void) pthread_join(thread, 0);
close(responder);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment