From fa0eda99d07e0ab8882bc7b7ff5b9290c54215a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Vavrus=CC=8Ca?= <marek.vavrusa@nic.cz> Date: Wed, 15 Jul 2015 10:21:44 +0200 Subject: [PATCH] modules/prefetch: wip --- modules/prefetch/prefetch.lua | 97 ++++++++++++++++++++++------------- modules/stats/stats.c | 4 +- 2 files changed, 63 insertions(+), 38 deletions(-) diff --git a/modules/prefetch/prefetch.lua b/modules/prefetch/prefetch.lua index b0c840557..ff3b31664 100644 --- a/modules/prefetch/prefetch.lua +++ b/modules/prefetch/prefetch.lua @@ -1,63 +1,88 @@ --- Batch soon-expiring records in a queue and fetch them periodically. --- This helps to reduce a latency for records that are often accessed. +-- Speculative prefetching for repetitive and soon-expiring records to reduce latency. -- @module prefetch -- @field queue table of scheduled records -- @field queue_max maximum length of the queue --- @field queue_len current length of the queue -- @field window length of the coalescing window local prefetch = { queue = {}, - queue_max = 1000, - queue_len = 0, - window = 30, - layer = { - -- Schedule cached entries that are expiring soon - finish = function(state, req, answer) - local qry = kres.query_resolved(req) - if not kres.query.has_flag(qry, kres.query.EXPIRING) then - return state - end - -- Refresh entries that probably expire in this time window - local qlen = prefetch.queue_len - if qlen > prefetch.queue_max then - return state - end - -- Key: {qtype [1], qname [1-255]} - local key = string.char(answer:qtype())..answer:qname() - local val = prefetch.queue[key] - if not val then - prefetch.queue[key] = 1 - prefetch.queue_len = qlen + 1 - else - prefetch.queue[key] = val + 1 - end - return state - end - } + batch = 0, + epoch = 0, + period = 4 * 24, + window = 15, + log = {} } +-- Calculate current epoch (number of quarter-hours today) +local function current_epoch() + return os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window) + 1 +end + -- Resolve queued records and flush the queue -function prefetch.batch(module) +function prefetch.dispatch(ev) -- Defer prefetching if the server is loaded if worker.stats().concurrent > 10 then + event.after(minute, prefetch.dispatch) + prefetch.batch = prefetch.batch + prefetch.batch / 2 return 0 end - local to_delete = prefetch.queue_max / 5 local deleted = 0 for key, val in pairs(prefetch.queue) do worker.resolve(string.sub(key, 2), string.byte(key)) - prefetch.queue[key] = nil + if val > 1 then + prefetch.queue[key] = val - 1 + else + prefetch.queue[key] = nil + end deleted = deleted + 1 - if deleted == to_delete then + if deleted == prefetch.batch then break end end - prefetch.queue_len = prefetch.queue_len - deleted + if deleted > 0 then + event.after(minute, prefetch.dispatch) + end return 0 end +-- Process current epoch +function prefetch.process(ev) + -- Process current learning epoch + local start = os.clock() + local recent_queries = stats.queries() + stats.queries_clear() + local current = {} + for i = 1, #recent_queries do + local entry = recent_queries[i] + local key = string.char(entry.type)..entry.name + current[key] = entry.count + -- print('.. learning', entry.name, entry.type) + end + print (string.format('[prob] learned epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start)) + prefetch.log[prefetch.epoch] = current + prefetch.epoch = prefetch.epoch % prefetch.period + 1 + -- Predict queries for the next epoch based on the usage patterns + for i = 1, prefetch.period / 2 - 1 do + current = prefetch.log[prefetch.epoch - i] + local past = prefetch.log[prefetch.epoch - 2*i] + if current and past then + for k, v in pairs(current) do + if past[k] ~= nil then + prefetch.queue[k] = v + end + end + end + end + print (string.format('[prob] predicted epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start)) + -- TODO: batch in soon-expiring queries + -- TODO: clusterize records often found together + -- Dispatch prefetch requests + prefetch.batch = #prefetch.queue / prefetch.window + event.after(0, prefetch.dispatch) +end + function prefetch.init(module) - event.recurrent(prefetch.window * sec, prefetch.batch) + prefetch.epoch = current_epoch() + event.recurrent(prefetch.window * minute, prefetch.process) end function prefetch.deinit(module) diff --git a/modules/stats/stats.c b/modules/stats/stats.c index db2c942ac..0ab33518a 100644 --- a/modules/stats/stats.c +++ b/modules/stats/stats.c @@ -122,10 +122,10 @@ static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_ { /* Sample key = {[2] type, [1-255] owner} */ char key[sizeof(uint16_t) + KNOT_DNAME_MAXLEN]; - /* Sample queries leading to iteration */ + /* Sample queries leading to iteration or expiring */ struct kr_query *qry = NULL; WALK_LIST(qry, rplan->resolved) { - if (!(qry->flags & QUERY_CACHED)) { + if (!(qry->flags & QUERY_CACHED) || (qry->flags & QUERY_EXPIRING)) { int key_len = collect_key(key, qry->sname, qry->stype); unsigned *count = lru_set(data->frequent.names, key, key_len); if (count) { -- GitLab