Commit de175b90 authored by Vitezslav Kriz

predict: fix enqueuing from predict log

 * changed stype to type
 * fix enqueuing queries from predict log
 * keep heuristic in function generate to predict upcoming epoch
parent e7908bd5
1 merge request: !330 predict: fix enqueuing from predict log
Pipeline #8572 canceled in 1 hour, 15 minutes, and 19 seconds
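
For orientation before the diff: both predict.queue and predict.log are keyed by a string built from a sampled query's type and name, and this commit switches that lookup from entry.stype to entry.type. The sketch below is not part of the commit; the entry table is a hypothetical stand-in for one element of what stats.frequent() or stats.expiring() returns.

-- Hedged sketch of the key format shared by predict.queue and predict.log.
-- 'entry' is a made-up sample record; only 'type' and 'name' matter for the
-- key, and 'type' replaces the older 'stype' field.
local entry = { type = 'AAAA', name = 'example.com.' }

-- Keys have the form "<query type> <query name>".
local key = string.format('%s %s', entry.type, entry.name)
print(key) --> AAAA example.com.

-- A predict.log slot for one epoch maps such keys to a truthy value;
-- enqueue_from_log() (added in this commit) copies them into predict.queue.
local log_slot = { [key] = 1 }
assert(log_slot['AAAA example.com.'] == 1)
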
@@ -54,7 +54,7 @@ local function enqueue(queries)
 	local nr_queries = #queries
 	for i = 1, nr_queries do
 		local entry = queries[i]
-		local key = string.format('%s %s', entry.stype, entry.name)
+		local key = string.format('%s %s', entry.type, entry.name)
 		if not predict.queue[key] then
 			predict.queue[key] = 1
 			queued = queued + 1
@@ -63,6 +63,19 @@
 	return queued
 end
 
+-- Enqueue queries from same format as predict.queue or predict.log
+local function enqueue_from_log(current)
+	if not current then return 0 end
+	local queued = 0
+	for key, val in pairs(current) do
+		if val and not predict.queue[key] then
+			predict.queue[key] = val
+			queued = queued + 1
+		end
+	end
+	return queued
+end
+
 -- Prefetch soon-to-expire records
 function predict.prefetch()
 	local queries = stats.expiring()
@@ -73,24 +86,17 @@ end
 -- Sample current epoch, return number of sampled queries
 function predict.sample(epoch_now)
 	if not epoch_now then return 0, 0 end
+	local current = predict.log[epoch_now] or {}
 	local queries = stats.frequent()
 	stats.clear_frequent()
-	local queued = 0
-	local current = predict.log[epoch_now]
-	if predict.epoch ~= epoch_now or current == nil then
-		if current ~= nil then
-			queued = enqueue(current)
-		end
-		current = {}
-	end
 	local nr_samples = #queries
 	for i = 1, nr_samples do
 		local entry = queries[i]
-		local key = string.format('%s %s', entry.stype, entry.name)
+		local key = string.format('%s %s', entry.type, entry.name)
 		current[key] = 1
 	end
 	predict.log[epoch_now] = current
-	return nr_samples, queued
+	return nr_samples
 end
 
 -- Predict queries for the upcoming epoch
@@ -118,13 +124,23 @@ function predict.process(ev)
 	-- Start a new epoch, or continue sampling
 	predict.ev_sample = nil
 	local epoch_now = current_epoch()
-	local nr_learned, nr_queued = predict.sample(epoch_now)
-	-- End of epoch, predict next
+	local nr_queued = 0
+
+	-- End of epoch
 	if predict.epoch ~= epoch_now then
 		stats['predict.epoch'] = epoch_now
 		predict.epoch = epoch_now
+		-- enqueue records from upcoming epoch
+		nr_queued = enqueue_from_log(predict.log[epoch_now])
+		-- predict next epoch
 		nr_queued = nr_queued + generate(epoch_now)
+		-- clear log for new epoch
+		predict.log[epoch_now] = {}
 	end
+
+	-- Sample current epoch
+	local nr_learned = predict.sample(epoch_now)
+
 	-- Prefetch expiring records
 	nr_queued = nr_queued + predict.prefetch()
 	-- Dispatch predicted queries
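For context on how these epochs are driven in practice, here is a minimal configuration sketch. The window/period values are illustrative and assume the module's documented options (a 15-minute sampling window tracked over the last 6 hours); they are not introduced by this commit.

-- Illustrative kresd configuration sketch (assumed documented options):
-- sample queries in 15-minute epochs and keep 6 hours of history.
modules = {
	predict = {
		window = 15,         -- length of one sampling epoch, in minutes
		period = 6*(60/15)   -- number of epochs kept, i.e. the last 6 hours
	}
}

With a setup along these lines, predict.process() runs once per window: at an epoch boundary it first enqueues whatever was logged for that slot in the previous period (enqueue_from_log), adds the generate() heuristic's predictions for the upcoming epoch, clears the slot, and only then samples current traffic via predict.sample().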