From 85b4355c6dd6e645ee89d898aeec4df977dd3679 Mon Sep 17 00:00:00 2001
From: Marek Vavrusa <marek@vavrusa.com>
Date: Tue, 5 Jul 2016 00:35:15 -0700
Subject: [PATCH] modules: http, graphite, policy, daf support map()

All relevant modules now support running in forked mode and polling
workers for information. For example, the graphite module can poll stats
from all workers and aggregate them before sending, and the HTTP module
can run on the process-group leader only and poll the workers for
information.
---
 modules/daf/daf.js            |   6 +-
 modules/daf/daf.lua           |  88 ++++++++++++++++++++---------
 modules/graphite/graphite.lua |  81 ++++++++++++++-------------
 modules/http/http.lua         |  70 ++++++++++++-----------
 modules/http/prometheus.lua   |  29 ++++++++--
 modules/http/static/kresd.css |   8 +++
 modules/http/static/kresd.js  | 101 ++++++++++++++++++++++++++++++++--
 modules/http/static/main.tpl  |  21 +++++--
 modules/policy/policy.lua     |   8 +--
 9 files changed, 292 insertions(+), 120 deletions(-)

diff --git a/modules/daf/daf.js b/modules/daf/daf.js
index d66026093..a61411892 100644
--- a/modules/daf/daf.js
+++ b/modules/daf/daf.js
@@ -271,12 +271,12 @@ $(function() {
 	}
 	/* Rule builder submit */
 	$('#daf-add').click(function () {
-		const form = $('#daf-builder-form');
+		const form = $('#daf-builder-form').parent();
 		if (dafBuilder.items.length == 0 || form.hasClass('has-error')) {
 			return;
 		}
 		/* Clear previous errors and resubmit. */
-		form.find('.alert').remove();
+		form.parent().find('.alert').remove();
 		$.post('daf', dafBuilder.items.join(' '))
 			.done(function (data) {
 				dafBuilder.clear();
@@ -284,7 +284,7 @@ $(function() {
 			})
 			.fail(function (data) {
 				const reason = data.responseText.length > 0 ? data.responseText : 'internal error';
-				form.append(
+				form.after(
 					'<div class="alert alert-danger" role="alert">'+
 					'Couldn\'t add rule (code: '+data.status+', reason: '+reason+').'+
 					'</div>'
diff --git a/modules/daf/daf.lua b/modules/daf/daf.lua
index a03d9b861..ebadda7f0 100644
--- a/modules/daf/daf.lua
+++ b/modules/daf/daf.lua
@@ -1,5 +1,3 @@
-local cqueues = require('cqueues')
-
 -- Load dependent modules
 if not view then modules.load('view') end
 if not policy then modules.load('policy') end
@@ -60,7 +58,8 @@ local filters = {
 	end,
 }
 
-local function parse_filter(tok, g)
+local function parse_filter(tok, g, prev)
+	if not tok then error(string.format('expected filter after "%s"', prev)) end
 	local filter = filters[tok:lower()]
 	if not filter then error(string.format('invalid filter "%s"', tok)) end
 	return filter(g)
@@ -77,11 +76,11 @@ local function parse_rule(g)
 	-- or terminate filter chain and return
 	tok = g()
 	while tok do
-		if tok == 'AND' then
-			local fa, fb = f, parse_filter(g(), g)
+		if tok:lower() == 'and' then
+			local fa, fb = f, parse_filter(g(), g, tok)
 			f = function (req, qry) return fa(req, qry) and fb(req, qry) end
-		elseif tok == 'OR' then
-			local fa, fb = f, parse_filter(g(), g)
+		elseif tok:lower() == 'or' then
+			local fa, fb = f, parse_filter(g(), g, tok)
 			f = function (req, qry) return fa(req, qry) or fb(req, qry) end
 		else
 			break
@@ -131,7 +130,7 @@ local M = {
 
 -- @function Cleanup module
 function M.deinit()
-	if http then
+	if http and http.endpoints then
 		http.endpoints['/daf'] = nil
 		http.endpoints['/daf.js'] = nil
 		http.snippets['/daf'] = nil
@@ -140,6 +139,10 @@ end
 
 -- @function Add rule
 function M.add(rule)
+	-- Ignore duplicates
+	for _, r in ipairs(M.rules) do
+		if r.info == rule then return r end
+	end
 	local id, action, filter = compile(rule)
 	if not id then error(action) end
 	-- Combine filter and action into policy
@@ -202,6 +205,15 @@ function M.enable(id, val)
 	return M.toggle(id, true)
 end
 
+local function consensus(op, ...)
+	local ret = true
+	local results = map(string.format(op, ...))
+	for _, r in ipairs(results) do
+		ret = ret and r
+	end
+	return ret
+end
+
 -- @function Public-facing API
 local function api(h, stream)
 	local m = h:get(':method')
@@ -227,7 +239,7 @@ local function api(h, stream)
 		local path = h:get(':path')
 		local id = tonumber(path:match '/([^/]*)$')
 		if id then
-			if M.del(id) then
+			if consensus('daf.del "%s"', id) then
 				return tojson(true)
 			end
 			return 404, '"No such rule"' -- Not found
@@ -237,8 +249,10 @@ local function api(h, stream)
 	elseif m == 'POST' then
 		local query = stream:get_body_as_string()
 		if query then
-			local ok, r, err = pcall(M.add, query)
-			if not ok then return 500, string.format('"%s"', r) end
+			local ok, r = pcall(M.add, query)
+			if not ok then return 500, string.format('"%s"', r:match('/([^/]+)$')) end
+			-- Dispatch to all other workers
+			consensus('daf.add "%s"', query)
 			return rule_info(r)
 		end
 		return 400
@@ -252,7 +266,7 @@ local function api(h, stream)
 		end
 		-- We do not support more actions
 		if action == 'active' then
-			if M.toggle(id, val == 'true') then
+			if consensus('daf.toggle(%d, %s)', id, val == 'true' or 'false') then
 				return tojson(true)
 			else
 				return 404, '"No such rule"'
@@ -263,23 +277,39 @@ local function api(h, stream)
 	end
 end
 
+local function getmatches()
+	local update = {}
+	for _, rules in ipairs(map 'daf.rules') do
+		for _, r in ipairs(rules) do
+			local id = tostring(r.rule.id)
+			-- Must have string keys for JSON object and not an array
+			update[id] = (update[id] or 0) + r.rule.count
+		end
+	end
+	return update
+end
+
 -- @function Publish DAF statistics
 local function publish(h, ws)
-	local ok, counters = true, {}
+	local cqueues = require('cqueues')
+	local ok, last = true, nil
 	while ok do
 		-- Check if we have new rule matches
-		local update = {}
-		for _, r in ipairs(M.rules) do
-			local id = r.rule.id
-			if counters[id] ~= r.rule.count then
-				-- Must have string keys for JSON object and not an array
-				update[tostring(id)] = r.rule.count
-				counters[id] = r.rule.count
+		local diff = {}
+		local has_update, update = pcall(getmatches)
+		if has_update then
+			if last then
+				for id, count in pairs(update) do
+					if not last[id] or last[id] < count then
+						diff[id] = count
+					end
+				end
 			end
+			last = update
 		end
 		-- Update counters when there is a new data
-		if next(update) ~= nil then
-			ok = ws:send(tojson(update))
+		if next(diff) ~= nil then
+			ok = ws:send(tojson(diff))
 		else
 			ok = ws:send_ping()
 		end
@@ -289,7 +319,7 @@ end
 
 -- @function Configure module
 function M.config(conf)
-	if not http then error('"http" module is not loaded, cannot load DAF') end
+	if not http or not http.endpoints then return end
 	-- Export API and data publisher
 	http.endpoints['/daf.js'] = http.page('daf.js', 'daf')
 	http.endpoints['/daf'] = {'application/json', api, publish}
@@ -301,13 +331,17 @@ function M.config(conf)
 		<div class="col-md-11">
 			<input type="text" id="daf-builder" class="form-control" aria-label="..." />
 		</div>
-		<button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
+		<div class="col-md-1">
+			<button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
+		</div>
 	</form>
 	</div>
 	<div class="row">
-	<table id="daf-rules" class="table table-striped table-responsive">
-		<th><td>No rules here yet.</td></th>
-	</table>
+	<div class="col-md-12">
+		<table id="daf-rules" class="table table-striped table-responsive">
+			<th><td>No rules here yet.</td></th>
+		</table>
+	</div>
 	</div>
 	]]}
 end
diff --git a/modules/graphite/graphite.lua b/modules/graphite/graphite.lua
index c04357653..aa4354e04 100644
--- a/modules/graphite/graphite.lua
+++ b/modules/graphite/graphite.lua
@@ -1,5 +1,9 @@
--- @module graphite
-local graphite = {}
+-- Load dependent modules
+if not stats then modules.load('stats') end
+
+-- This is a leader-only module
+if worker.id > 0 then return {} end
+local M = {}
 local socket = require('socket')
 
 -- Create connected UDP socket
@@ -38,6 +42,16 @@ local function make_tcp(host, port)
 	return s
 end
 
+local function merge(results)
+	local t = {}
+	for _, result in ipairs(results) do
+		for k, v in pairs(result) do
+			t[k] = (t[k] or 0) + v
+		end
+	end
+	return t
+end
+
 -- Send the metrics in a table to multiple Graphite consumers
 local function publish_table(metrics, prefix, now)
 	for key,val in pairs(metrics) do
@@ -45,16 +59,16 @@ local function publish_table(metrics, prefix, now)
 		if prefix then
 			msg = prefix..'.'..msg
 		end
-		for i in ipairs(graphite.cli) do
-			local ok, err = graphite.cli[i]:send(msg)
+		for i in ipairs(M.cli) do
+			local ok, err = M.cli[i]:send(msg)
 			if not ok then
 				-- Best-effort reconnect once per two tries
-				local tcp = graphite.cli[i]['connect'] ~= nil
-				local host = graphite.info[i]
-				if tcp and host.seen + 2 * graphite.interval / 1000 <= now then
+				local tcp = M.cli[i]['connect'] ~= nil
+				local host = M.info[i]
+				if tcp and host.seen + 2 * M.interval / 1000 <= now then
 					print(string.format('[graphite] reconnecting: %s#%d reason: %s',
 						host.addr, host.port, err))
-					graphite.cli[i] = make_tcp(host.addr, host.port)
+					M.cli[i] = make_tcp(host.addr, host.port)
 					host.seen = now
 				end
 			end
@@ -62,56 +76,49 @@ local function publish_table(metrics, prefix, now)
 	end
 end
 
-function graphite.init(module)
-	graphite.ev = nil
-	graphite.cli = {}
-	graphite.info = {}
-	graphite.interval = 5 * sec
-	graphite.prefix = 'kresd.' .. hostname()
+function M.init(module)
+	M.ev = nil
+	M.cli = {}
+	M.info = {}
+	M.interval = 5 * sec
+	M.prefix = 'kresd.' .. hostname()
 	return 0
 end
 
-function graphite.deinit(module)
-	if graphite.ev then event.cancel(graphite.ev) end
+function M.deinit(module)
+	if M.ev then event.cancel(M.ev) end
 	return 0
 end
 
 -- @function Publish results to the Graphite server(s)
-function graphite.publish()
+function M.publish()
 	local now = os.time()
 	-- Publish built-in statistics
-	if not graphite.cli then error("no graphite server configured") end
-	publish_table(cache.stats(), graphite.prefix..'.cache', now)
-	publish_table(worker.stats(), graphite.prefix..'.worker', now)
+	if not M.cli then error("no graphite server configured") end
+	publish_table(merge(map 'cache.stats()'), M.prefix..'.cache', now)
+	publish_table(merge(map 'worker.stats()'), M.prefix..'.worker', now)
 	-- Publish extended statistics if available
-	if not stats then
-		return 0
-	end
-	local now_metrics = stats.list()
-	if type(now_metrics) ~= 'table' then
-		return 0 -- No metrics to watch
-	end
-	publish_table(now_metrics, graphite.prefix, now)
+	publish_table(merge(map 'stats.list()'), M.prefix, now)
 	return 0
 end
 
 -- @function Make connection to Graphite server.
-function graphite.add_server(graphite, host, port, tcp)
+function M.add_server(graphite, host, port, tcp)
 	local s, err = tcp and make_tcp(host, port) or make_udp(host, port)
 	if not s then
 		error(err)
 	end
-	table.insert(graphite.cli, s)
-	table.insert(graphite.info, {addr = host, port = port, seen = 0})
+	table.insert(M.cli, s)
+	table.insert(M.info, {addr = host, port = port, seen = 0})
 	return 0
 end
 
-function graphite.config(conf)
+function M.config(conf)
 	-- config defaults
 	if not conf then return 0 end
 	if not conf.port then conf.port = 2003 end
-	if conf.interval then graphite.interval = conf.interval end
-	if conf.prefix then graphite.prefix = conf.prefix end
+	if conf.interval then M.interval = conf.interval end
+	if conf.prefix then M.prefix = conf.prefix end
 	-- connect to host(s)
 	if type(conf.host) == 'table' then
 		for key, val in pairs(conf.host) do
@@ -121,9 +128,9 @@ function graphite.config(conf)
 		graphite:add_server(conf.host, conf.port, conf.tcp)
 	end
 	-- start publishing stats
-	if graphite.ev then event.cancel(graphite.ev) end
-	graphite.ev = event.recurrent(graphite.interval, graphite.publish)
+	if M.ev then event.cancel(M.ev) end
+	M.ev = event.recurrent(M.interval, M.publish)
 	return 0
 end
 
-return graphite
+return M
diff --git a/modules/http/http.lua b/modules/http/http.lua
index c78ce7949..b6b5dd07d 100644
--- a/modules/http/http.lua
+++ b/modules/http/http.lua
@@ -1,3 +1,9 @@
+-- Load dependent modules
+if not stats then modules.load('stats') end
+
+-- This is a leader-only module
+if worker.id > 0 then return {} end
+
 -- This is a module that does the heavy lifting to provide an HTTP/2 enabled
 -- server that supports TLS by default and provides endpoint for other modules
 -- in order to enable them to export restful APIs and websocket streams.
@@ -318,40 +324,40 @@ end
 
 -- @function Configure module
 function M.config(conf)
-	conf = conf or {}
-	assert(type(conf) == 'table', 'config { host = "...", port = 443, cert = "...", key = "..." }')
-	-- Configure web interface for resolver
-	if not conf.port then conf.port = 8053 end
-	if not conf.host then conf.host = 'localhost' end
-	if conf.geoip and has_mmdb then M.geoip = mmdb.open(conf.geoip) end
-	M.interface(conf.host, conf.port, M.endpoints, conf.cert, conf.key)
-	-- TODO: configure DNS/HTTP(s) interface
-	if M.ev then return end
-	-- Schedule both I/O activity notification and timeouts
-	local poll_step
-	poll_step = function ()
-		local ok, err, _, co = cq:step(0)
-		if not ok then warn('[http] error: %s %s', err, debug.traceback(co)) end
-		-- Reschedule timeout or create new one
-		local timeout = cq:timeout()
-		if timeout then
-			-- Throttle web requests
-			if timeout == 0 then timeout = 0.001 end
-			-- Convert from seconds to duration
-			timeout = timeout * sec
-			if not M.timeout then
-				M.timeout = event.after(timeout, poll_step)
-			else
-				event.reschedule(M.timeout, timeout)
-			end
-		else -- Cancel running timeout when there is no next deadline
-			if M.timeout then
-				event.cancel(M.timeout)
-				M.timeout = nil
+	if conf == true then conf = {} end
+	assert(type(conf) == 'table', 'config { host = "...", port = 443, cert = "...", key = "..." }')
+	-- Configure web interface for resolver
+	if not conf.port then conf.port = 8053 end
+	if not conf.host then conf.host = 'localhost' end
+	if conf.geoip and has_mmdb then M.geoip = mmdb.open(conf.geoip) end
+	M.interface(conf.host, conf.port, M.endpoints, conf.cert, conf.key)
+	-- TODO: configure DNS/HTTP(s) interface
+	if M.ev then return end
+	-- Schedule both I/O activity notification and timeouts
+	local poll_step
+	poll_step = function ()
+		local ok, err, _, co = cq:step(0)
+		if not ok then warn('[http] error: %s %s', err, debug.traceback(co)) end
+		-- Reschedule timeout or create new one
+		local timeout = cq:timeout()
+		if timeout then
+			-- Throttle web requests
+			if timeout == 0 then timeout = 0.001 end
+			-- Convert from seconds to duration
+			timeout = timeout * sec
+			if not M.timeout then
+				M.timeout = event.after(timeout, poll_step)
+			else
+				event.reschedule(M.timeout, timeout)
+			end
+		else -- Cancel running timeout when there is no next deadline
+			if M.timeout then
+				event.cancel(M.timeout)
+				M.timeout = nil
 			end
 		end
-		M.ev = event.socket(cq:pollfd(), poll_step)
+	end
+	M.ev = event.socket(cq:pollfd(), poll_step)
 end
 
 return M
diff --git a/modules/http/prometheus.lua b/modules/http/prometheus.lua
index c7378fed8..a8d3300aa 100644
--- a/modules/http/prometheus.lua
+++ b/modules/http/prometheus.lua
@@ -4,15 +4,25 @@ local snapshots, snapshots_count = {}, 120
 
 -- Gauge metrics
 local gauges = {
 	['worker.concurrent'] = true,
+	['worker.rss'] = true,
 }
 
--- Load dependent modules
-if not stats then modules.load('stats') end
+local function merge(t, results, prefix)
+	for x, result in pairs(results) do
+		if type(result) == 'table' then
+			for k, v in pairs(result) do
+				local val = t[prefix..k]
+				t[prefix..k] = (val or 0) + v
+			end
+		end
+	end
+end
 
 local function getstats()
-	local t = stats.list()
-	for k,v in pairs(cache.stats()) do t['cache.'..k] = v end
-	for k,v in pairs(worker.stats()) do t['worker.'..k] = v end
+	local t = {}
+	merge(t, map 'stats.list()', '')
+	merge(t, map 'cache.stats()', 'cache.')
+	merge(t, map 'worker.stats()', 'worker.')
 	return t
 end
 
@@ -52,9 +62,16 @@ local function snapshot_start(h, ws)
 			end
 		end
 	end
+	-- Aggregate per-worker metrics
+	local wdata = {}
+	for i, info in pairs(map 'worker.info()') do
+		if type(info) == 'table' then
+			wdata[tostring(info.pid)] = {rss=info.rss, usertime=info.usertime, systime=info.systime, pagefaults=info.pagefaults, queries=info.queries}
+		end
+	end
 	-- Publish stats updates periodically
 	if not is_empty then
-		local update = {time=os.time(), stats=stats_dt, upstreams=upstreams or {}}
+		local update = {time=os.time(), stats=stats_dt, upstreams=upstreams, workers=wdata}
 		table.insert(snapshots, update)
 		if #snapshots > snapshots_count then
 			table.remove(snapshots, 1)
diff --git a/modules/http/static/kresd.css b/modules/http/static/kresd.css
index 7b5444356..98035d22b 100644
--- a/modules/http/static/kresd.css
+++ b/modules/http/static/kresd.css
@@ -29,3 +29,11 @@ body {
 	border-color: #4cae4c !important;
 	color: #fff !important;
 }
+
+.spark {
+	display: inline-block;
+}
+
+.spark-legend {
+	display: inline-block;
+}
\ No newline at end of file
diff --git a/modules/http/static/kresd.js b/modules/http/static/kresd.js
index 00fc1f971..abfa670fc 100644
--- a/modules/http/static/kresd.js
+++ b/modules/http/static/kresd.js
@@ -44,7 +44,7 @@ $(function() {
 
 	/* Render other interesting metrics as lines (hidden by default) */
 	var data = [];
-	var last_metric = 15;
+	var last_metric = 17;
 	var metrics = {
 		'answer.noerror': [0, 'NOERROR', null, 'By RCODE'],
 		'answer.nodata': [1, 'NODATA', null, 'By RCODE'],
@@ -62,6 +62,8 @@
 		'worker.concurrent': [13, 'Queries outstanding'],
 		'worker.queries': [14, 'Queries received/s'],
 		'worker.dropped': [15, 'Queries dropped'],
+		'worker.usertime': [16, 'CPU (user)', null, 'Workers'],
+		'worker.systime': [17, 'CPU (sys)', null, 'Workers'],
 	};
 
 	/* Render latency metrics as sort of a heatmap */
@@ -182,7 +184,10 @@ $(function() {
 	var bubblemap = {};
 	function pushUpstreams(resp) {
 		if (resp == null) {
+			$('#map-container').hide();
 			return;
+		} else {
+			$('#map-container').show();
 		}
 		/* Get current maximum number of queries for bubble diameter adjustment */
 		var maxQueries = 1;
@@ -235,6 +240,90 @@ $(function() {
 		age = age + 1;
 	}
 
+	/* Per-worker information */
+	function updateRate(x, y, dt) {
+		return (100.0 * ((x - y) / dt)).toFixed(1);
+	}
+	function updateWorker(row, next, data, timestamp, buffer) {
+		const dt = timestamp - data.timestamp;
+		const cell = row.find('td');
+		/* Update spark lines and CPU times first */
+		if (dt > 0.0) {
+			const utimeRate = updateRate(next.usertime, data.last.usertime, dt);
+			const stimeRate = updateRate(next.systime, data.last.systime, dt);
+			cell.eq(1).find('span').text(utimeRate + '% / ' + stimeRate + '%');
+			/* Update sparkline graph */
+			data.data.push([new Date(timestamp * 1000), utimeRate, stimeRate]);
+			if (data.data.length > 60) {
+				data.data.shift();
+			}
+			if (!buffer) {
+				data.graph.updateOptions( { 'file': data.data } );
+			}
+		}
+		/* Update other fields */
+		if (!buffer) {
+			cell.eq(2).text(formatNumber(next.rss) + 'B');
+			cell.eq(3).text(next.pagefaults);
+			cell.eq(4).text('Healthy').addClass('text-success');
+		}
+	}
+
+	var workerData = {};
+	function pushWorkers(resp, timestamp, buffer) {
+		if (resp == null) {
+			return;
+		}
+		const workerTable = $('#workers');
+		for (var pid in resp) {
+			var row = workerTable.find('tr[data-pid='+pid+']');
+			if (row.length == 0) {
+				row = workerTable.append(
+					'<tr data-pid='+pid+'><td>'+pid+'</td>'+
+					'<td><div class="spark" id="spark-'+pid+'" /><span /></td><td></td><td></td><td></td>'+
+					'</tr>');
+				/* Create sparkline visualisation */
+				const spark = row.find('#spark-'+pid);
+				spark.css({'margin-right': '1em', width: '80px', height: '1.4em'});
+				workerData[pid] = {timestamp: timestamp, data: [[new Date(timestamp * 1000),0,0]], last: resp[pid]};
+				const workerGraph = new Dygraph(spark[0],
+					workerData[pid].data, {
+						valueRange: [0, 100],
+						legend: 'never',
+						axes : {
+							x : {
+								drawGrid: false,
+								drawAxis : false,
+							},
+							y : {
+								drawGrid: false,
+								drawAxis : false,
+							}
+						},
+						labels: ['x', '%user', '%sys'],
+						labelsDiv: '',
+					}
+				);
+				workerData[pid].graph = workerGraph;
+			}
+			updateWorker(row, resp[pid], workerData[pid], timestamp, buffer);
+			/* Track last datapoint */
+			workerData[pid].last = resp[pid];
+			workerData[pid].timestamp = timestamp;
+		}
+		/* Prune unhealthy PIDs */
+		if (!buffer) {
+			workerTable.find('tr').each(function () {
+				const e = $(this);
+				if (!(e.data('pid') in resp)) {
+					const healthCell = e.find('td').last();
+					healthCell.removeClass('text-success')
+					healthCell.text('Dead').addClass('text-danger');
+				}
+			});
+		}
+	}
+
 	/* WebSocket endpoints */
 	var wsStats = (secure ? 'wss://' : 'ws://') + location.host + '/stats';
 	var ws = new Socket(wsStats);
@@ -244,14 +333,16 @@ $(function() {
 		if (data.length > 0) {
 			pushUpstreams(data[data.length - 1].upstreams);
 		}
+		/* Buffer datapoints and redraw last */
 		for (var i in data) {
-			pushMetrics(data[i].stats, data[i].time, true);
+			const is_last = (i == data.length - 1);
+			pushWorkers(data[i].workers, data[i].time, !is_last);
+			pushMetrics(data[i].stats, data[i].time, !is_last);
 		}
-		graph.updateOptions( { 'file': data } );
 	} else {
-		pushMetrics(data.stats, data.time);
 		pushUpstreams(data.upstreams);
+		pushWorkers(data.workers, data.time);
+		pushMetrics(data.stats, data.time);
 	}
-
 	};
 });
\ No newline at end of file
diff --git a/modules/http/static/main.tpl b/modules/http/static/main.tpl
index e55075de5..558e02909 100644
--- a/modules/http/static/main.tpl
+++ b/modules/http/static/main.tpl
@@ -62,11 +62,24 @@
 					</div>
 				</div>
 			</div>
+			<div class="row">
+				<h3>Running workers</h3>
+				<div class="col-md-12">
+					<table id="workers" class="table table-responsive">
+						<tr>
+							<th>PID</th><th>CPU per-worker (user/sys)</th>
+							<th>RSS</th><th>Page faults</th><th>Status</th>
+						</tr>
+					</table>
+				</div>
+			</div>
 		</div>
-		<a name="worldmap"></a>
-		<h2 class="sub-header">Where do the queries go?</h2>
-		<div class="col-md-12">
-			<div id="map" style="position: relative;"></div>
+		<div class="row" id="map-container">
+			<a name="worldmap"></a>
+			<h2 class="sub-header">Where do the queries go?</h2>
+			<div class="col-md-12">
+				<div id="map" style="position: relative;"></div>
+			</div>
 		</div>
 		<div class="col-md-12">
 			{{ snippets }}
diff --git a/modules/policy/policy.lua b/modules/policy/policy.lua
index b7207ac95..7f71f0eb4 100644
--- a/modules/policy/policy.lua
+++ b/modules/policy/policy.lua
@@ -175,12 +175,8 @@ local function rpz_zonefile(action, path)
 end
 
 -- RPZ policy set
-function policy.rpz(action, path, format)
-	if format == 'lmdb' then
-		error('lmdb zone format is NYI')
-	else
-		return rpz_zonefile(action, path)
-	end
+function policy.rpz(action, path)
+	return rpz_zonefile(action, path)
 end
 
 -- Evaluate packet in given rules to determine policy action
-- 
GitLab
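
Editor's note (illustration, not part of the patch): the pattern this commit uses throughout is aggregation of per-worker results returned by map(). The standalone Lua sketch below shows the map()-and-merge idiom from graphite.lua and prometheus.lua; the map() stub here is hypothetical and merely simulates three workers, whereas inside kresd map() evaluates the given expression in every worker process and returns the list of their results.

-- Stand-in for kresd's map(); pretends three workers answered.
local function map(expr)
	return {
		{ ['answer.total'] = 10, ['cache.hit'] = 4 },
		{ ['answer.total'] = 7,  ['cache.hit'] = 2 },
		{ ['answer.total'] = 5,  ['cache.hit'] = 1 },
	}
end

-- Sum the per-worker metric tables key by key, as merge() does in graphite.lua.
local function merge(results)
	local t = {}
	for _, result in ipairs(results) do
		for k, v in pairs(result) do
			t[k] = (t[k] or 0) + v
		end
	end
	return t
end

local total = merge(map 'stats.list()')
print(total['answer.total'])  -- 22, summed over all simulated workers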
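Editor's note (illustration, not part of the patch): for mutating operations the DAF module instead treats map() as a broadcast and ANDs the per-worker results, so an operation only reports success when every worker applied it; see consensus() in daf.lua. A runnable sketch under the same stubbed-map() assumption:

-- Stand-in for kresd's map(); pretend one worker failed to apply the rule.
local function map(expr)
	return { true, true, false }
end

-- Broadcast a formatted command to all workers and AND the results,
-- mirroring consensus() in daf.lua.
local function consensus(op, ...)
	local ret = true
	for _, r in ipairs(map(string.format(op, ...))) do
		ret = ret and r
	end
	return ret
end

print(consensus('daf.toggle(%d, %s)', 1, 'true'))  -- false: not unanimous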