Skip to content
Snippets Groups Projects
Commit 8202c5ff authored by Tomas Krizek's avatar Tomas Krizek
Browse files

Merge branch...

Merge branch '585-graphite-prevents-kresd-to-start-if-graphite-server-is-not-available' into 'master'

graphite: Reconnect to the graphite server when it was unavailable

Closes #585

See merge request !1014
parents a85d5897 166e33a5
No related branches found
No related tags found
1 merge request!1014graphite: Reconnect to the graphite server when it was unavailable
Pipeline #65390 failed
......@@ -8,6 +8,7 @@ Bugfixes
- cache: fix interaction between LMDB locks and preallocation (!1013)
- cache garbage collector: fix flushing of messages to logs (!1009)
- cache garbage collector: fix insufficient GC on 32-bit systems (!1009)
- graphite module: do not block resolver on TCP failures (!1014)
Knot Resolver 5.1.1 (2020-05-19)
......
......@@ -6,15 +6,29 @@ if not stats then modules.load('stats') end
if worker.id > 0 then return {} end
local M = {}
local socket = require("cqueues.socket")
local proto_txt = {
[socket.SOCK_DGRAM] = 'udp',
[socket.SOCK_STREAM] = 'tcp'
}
local function make_socket(host, port, stype)
local s, err, status
-- timeout before next interval begins (roughly)
local timeout_sec = (M.interval - 10) / sec
s = socket.connect({ host = host, port = port, type = stype })
s:setmode('bn', 'bn')
status, err = pcall(s.connect, s)
s:settimeout(timeout_sec)
status, err = pcall(s.connect, s, timeout_sec)
if status == true and err == nil then
err = 'connect timeout'
s:close()
status = false
end
if not status then
log('[graphite] connecting: %s@%d %s reason: %s',
host, port, proto_txt[stype], err)
return status, err
end
return s
......@@ -42,26 +56,49 @@ end
-- Send the metrics in a table to multiple Graphite consumers
local function publish_table(metrics, prefix, now)
for key,val in pairs(metrics) do
local msg = key..' '..val..' '..now..'\n'
if prefix then
msg = prefix..'.'..msg
local s
for i in ipairs(M.cli) do
local host = M.info[i]
if M.cli[i] == -1 then
if host.tcp then
s = make_tcp(host.addr, host.port)
else
s = make_udp(host.addr, host.port)
end
if s then
M.cli[i] = s
end
end
for i in ipairs(M.cli) do
local ok, err = M.cli[i]:write(msg)
if not ok then
-- Best-effort reconnect once per two tries
local tcp = M.cli[i]['connect'] ~= nil
local host = M.info[i]
if tcp and host.seen + 2 * M.interval / 1000 <= now then
print(string.format('[graphite] reconnecting: %s@%d reason: %s',
host.addr, host.port, err))
M.cli[i] = make_tcp(host.addr, host.port)
host.seen = now
if M.cli[i] ~= -1 then
for key,val in pairs(metrics) do
local msg = key..' '..val..' '..now..'\n'
if prefix then
msg = prefix..'.'..msg
end
end
local ok, err = pcall(M.cli[i].write, M.cli[i], msg)
if not ok then
local tcp = M.cli[i]['connect'] ~= nil
if tcp and host.seen + 2 * M.interval / 1000 <= now then
local sock_type = (host.tcp and socket.SOCK_STREAM)
or socket.SOCK_DGRAM
log('[graphite] reconnecting: %s@%d %s reason: %s',
host.addr, host.port, proto_txt[sock_type], err)
s = make_tcp(host.addr, host.port)
if s then
M.cli[i] = s
host.seen = now
else
M.cli[i] = -1
break
end
end
end
end -- loop metrics
end
end
end -- loop M.cli
end
function M.init()
......@@ -92,17 +129,8 @@ end
-- @function Make connection to Graphite server.
function M.add_server(_, host, port, tcp)
local s, err
if tcp then
s, err = make_tcp(host, port)
else
s, err = make_udp(host, port)
end
if not s then
error(err)
end
table.insert(M.cli, s)
table.insert(M.info, {addr = host, port = port, seen = 0})
table.insert(M.cli, -1)
table.insert(M.info, {addr = host, port = port, tcp = tcp, seen = 0})
return 0
end
......@@ -112,7 +140,6 @@ function M.config(conf)
if not conf.port then conf.port = 2003 end
if conf.interval then M.interval = conf.interval end
if conf.prefix then M.prefix = conf.prefix end
-- connect to host(s)
if type(conf.host) == 'table' then
for _, val in pairs(conf.host) do
M:add_server(val, conf.port, conf.tcp)
......@@ -122,7 +149,7 @@ function M.config(conf)
end
-- start publishing stats
if M.ev then event.cancel(M.ev) end
M.ev = event.recurrent(M.interval, M.publish)
M.ev = event.recurrent(M.interval, function() worker.coroutine(M.publish) end)
return 0
end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment