Skip to content
Snippets Groups Projects

daemon/worker: improved handling of timed-out outgoing TCP connections

Merged Grigorii Demidov requested to merge tcp-timeouted-connection into master
+ 83
24
@@ -738,17 +738,29 @@ static int session_tls_hs_cb(struct session *session, int status)
}
}
struct session *s = worker_find_tcp_connected(worker, peer);
ret = kr_ok();
if (deletion_res == kr_ok()) {
/* peer was in the waiting list, add to the connected list. */
ret = worker_add_tcp_connected(worker, peer, session);
if (s) {
/* Something went wrong,
* peer already is in the connected list. */
ret = kr_error(EINVAL);
} else {
ret = worker_add_tcp_connected(worker, peer, session);
}
} else {
/* peer wasn't in the waiting list.
* In this case it must be successful rehandshake.
* Peer must be already in the connected list. */
const char *key = tcpsess_key(peer);
assert(key);
assert(map_contains(&worker->tcp_connected, key) != 0);
* It can be
* 1) either successful rehandshake; in this case peer
* must be already in the connected list.
* 2) or successful handshake with session, which was timeouted
* by on_tcp_connect_timeout(); after successful tcp connection;
* in this case peer isn't in the connected list.
**/
if (!s || s != session) {
ret = kr_error(EINVAL);
}
}
if (ret == kr_ok()) {
while (!session_waitinglist_is_empty(session)) {
@@ -809,35 +821,67 @@ static void on_connect(uv_connect_t *req, int status)
assert(session_flags(session)->outgoing);
if (status == UV_ECANCELED) {
if (kr_verbose_status) {
if (session_flags(session)->closing) {
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session));
return;
}
/* Check whether the connection is in the waiting list.
 * If it is not, this is most likely a timed-out connection
 * that was removed from the waiting list by the
 * on_tcp_connect_timeout() callback. */
struct session *s = worker_find_tcp_waiting(worker, peer);
if (!s || s != session) {
/* session isn't on the waiting list.
* it's timeouted session. */
if (VERBOSE_STATUS) {
const char *peer_str = kr_straddr(peer);
kr_log_verbose( "[wrkr]=> connect to '%s' cancelled\n", peer_str ? peer_str : "");
kr_log_verbose( "[wrkr]=> connected to '%s', but session "
"is already timeouted, close\n",
peer_str ? peer_str : "");
}
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session) && session_flags(session)->closing);
assert(session_tasklist_is_empty(session));
session_waitinglist_retry(session, false);
session_close(session);
return;
}
if (session_flags(session)->closing) {
worker_del_tcp_waiting(worker, peer);
assert(session_is_empty(session));
s = worker_find_tcp_connected(worker, peer);
if (s) {
/* session already in the connected list.
* Something went wrong, it can be due to races when kresd has tried
* to reconnect to upstream after unsuccessful attempt. */
if (VERBOSE_STATUS) {
const char *peer_str = kr_straddr(peer);
kr_log_verbose( "[wrkr]=> connected to '%s', but peer "
"is already connected, close\n",
peer_str ? peer_str : "");
}
assert(session_tasklist_is_empty(session));
session_waitinglist_retry(session, false);
session_close(session);
return;
}
if (status != 0) {
if (VERBOSE_STATUS) {
const char *peer_str = kr_straddr(peer);
kr_log_verbose( "[wrkr]=> connect to '%s' failed (%s), flagged as 'bad'\n",
kr_log_verbose( "[wrkr]=> connection to '%s' failed (%s), flagged as 'bad'\n",
peer_str ? peer_str : "", uv_strerror(status));
}
worker_del_tcp_waiting(worker, peer);
struct qr_task *task = session_waitinglist_get(session);
struct kr_qflags *options = &task->ctx->req.options;
unsigned score = options->FORWARD || options->STUB ? KR_NS_FWD_DEAD : KR_NS_DEAD;
kr_nsrep_update_rtt(NULL, peer, score,
worker->engine->resolver.cache_rtt,
KR_NS_UPDATE_NORESET);
if (task && status != UV_ETIMEDOUT) {
/* Penalize upstream.
* In case of UV_ETIMEDOUT upstream has been
* already penalized in on_tcp_connect_timeout() */
struct kr_qflags *options = &task->ctx->req.options;
unsigned score = options->FORWARD || options->STUB ? KR_NS_FWD_DEAD : KR_NS_DEAD;
kr_nsrep_update_rtt(NULL, peer, score,
worker->engine->resolver.cache_rtt,
KR_NS_UPDATE_NORESET);
}
assert(session_tasklist_is_empty(session));
session_waitinglist_retry(session, false);
session_close(session);
@@ -902,11 +946,19 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
worker_del_tcp_waiting(worker, peer);
struct qr_task *task = session_waitinglist_get(session);
if (!task) {
/* Normally shouldn't happen. */
const char *peer_str = kr_straddr(peer);
VERBOSE_MSG(NULL, "=> connection to '%s' failed (internal timeout), empty waitinglist\n",
peer_str ? peer_str : "");
return;
}
struct kr_query *qry = task_get_last_pending_query(task);
WITH_VERBOSE (qry) {
char peer_str[INET6_ADDRSTRLEN];
inet_ntop(peer->sa_family, kr_inaddr(peer), peer_str, sizeof(peer_str));
VERBOSE_MSG(qry, "=> connection to '%s' failed\n", peer_str);
const char *peer_str = kr_straddr(peer);
VERBOSE_MSG(qry, "=> connection to '%s' failed (internal timeout)\n",
peer_str ? peer_str : "");
}
unsigned score = qry->flags.FORWARD || qry->flags.STUB ? KR_NS_FWD_DEAD : KR_NS_DEAD;
@@ -917,7 +969,14 @@ static void on_tcp_connect_timeout(uv_timer_t *timer)
worker->stats.timeout += session_waitinglist_get_len(session);
session_waitinglist_retry(session, true);
assert (session_tasklist_is_empty(session));
session_close(session);
/* uv_cancel() doesn't support uv_connect_t requests,
 * so we can't cancel this one.
 * The connection attempt for this request may
 * therefore still succeed.
 * So the connection callback (on_connect()) must check
 * whether the connection is in the list of waiting connections.
 * If it is not, this is most likely a timed-out connection, even if
 * the connect itself was successful. */
}
/* This is called when I/O timeouts */
Loading