Skip to content
Snippets Groups Projects
Verified Commit 0b355484 authored by Libor Peltan's avatar Libor Peltan Committed by Petr Špaček
Browse files

kr_cache_gc: separated DB transactions to prevent blocking; config; bugfix

parent f20864eb
No related branches found
No related tags found
1 merge request!817cache garbage collector
......@@ -12,6 +12,8 @@
#include <lib/cache/impl.h>
#include <lib/defines.h>
#include "kr_cache_gc.h"
// TODO remove and use time(NULL) ! this is just for debug with pre-generated cache
int64_t now = 1523701784;
......@@ -34,6 +36,14 @@ static double gc_timer_end(gc_timer_t *t)
return (((double)end.tv_sec - (double)start->tv_sec) + ((double)end.tv_nsec - (double)start->tv_nsec) / 1e9);
}
static unsigned long gc_timer_usecs(gc_timer_t *t)
{
gc_timer_t *start = t == NULL ? &gc_timer_internal : t;
gc_timer_t end = { 0 };
(void)clock_gettime(CLOCK_MONOTONIC, &end);
return ((end.tv_sec - start->tv_sec) * 1000000UL + (end.tv_nsec - start->tv_nsec) / 1000UL);
}
// section: function key_consistent
static const uint16_t *key_consistent(knot_db_val_t key)
......@@ -91,6 +101,19 @@ static knot_db_t *knot_db_t_kres2libknot(const knot_db_t *db)
return libknot_db;
}
// section: dbval_copy
static knot_db_val_t *dbval_copy(const knot_db_val_t *from)
{
knot_db_val_t *to = malloc(sizeof(knot_db_val_t) + from->len);
if (to != NULL) {
memcpy(to, from, sizeof(knot_db_val_t));
to->data = to + 1; // == ((uit8_t *)to) + sizeof(knot_db_val_t)
memcpy(to->data, from->data, from->len);
}
return to;
}
// section: rrtype list
dynarray_declare(rrtype, uint16_t, DYNARRAY_VISIBILITY_STATIC, 64);
......@@ -122,24 +145,24 @@ static void rrtypelist_print(rrtype_dynarray_t *arr)
// section: main
dynarray_declare(entry, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC, 256);
dynarray_define(entry, knot_db_val_t, DYNARRAY_VISIBILITY_STATIC);
dynarray_declare(entry, knot_db_val_t*, DYNARRAY_VISIBILITY_STATIC, 256);
dynarray_define(entry, knot_db_val_t*, DYNARRAY_VISIBILITY_STATIC);
int kr_cache_gc(const char *cache)
int kr_cache_gc(kr_cache_gc_cfg_t *cfg)
{
char cache_data[strlen(cache) + 10];
snprintf(cache_data, sizeof(cache_data), "%s/data.mdb", cache);
char cache_data[strlen(cfg->cache_path) + 10];
snprintf(cache_data, sizeof(cache_data), "%s/data.mdb", cfg->cache_path);
struct stat st = { 0 };
if (stat(cache, &st) || !(st.st_mode & S_IFDIR) || stat(cache_data, &st)) {
printf("Error: %s does not exist or is not a LMDB.\n", cache);
if (stat(cfg->cache_path, &st) || !(st.st_mode & S_IFDIR) || stat(cache_data, &st)) {
printf("Error: %s does not exist or is not a LMDB.\n", cfg->cache_path);
return -ENOENT;
}
size_t cache_size = st.st_size;
struct kr_cdb_opts opts = { cache, cache_size };
struct kr_cdb_opts opts = { cfg->cache_path, cache_size };
struct kr_cache krc = { 0 };
int ret = kr_cache_open(&krc, NULL, &opts, NULL);
......@@ -148,8 +171,14 @@ int kr_cache_gc(const char *cache)
return -EINVAL;
}
entry_dynarray_t to_del = { 0 };
rrtype_dynarray_t cache_rrtypes = { 0 };
gc_timer_t timer_analyze = { 0 }, timer_delete = { 0 }, timer_rw_txn = { 0 };
const knot_db_api_t *api = knot_db_lmdb_api();
knot_db_txn_t txn = { 0 };
knot_db_iter_t *it = NULL;
knot_db_t *db = knot_db_t_kres2libknot(krc.db);
if (db == NULL) {
printf("Out of memory.\n");
......@@ -160,13 +189,14 @@ int kr_cache_gc(const char *cache)
size_t real_size = knot_db_lmdb_get_mapsize(db), usage = knot_db_lmdb_get_usage(db);
printf("Cache size: %zu, Usage: %zu (%.2lf%%)\n", real_size, usage, (double)usage / real_size * 100.0);
ret = api->txn_begin(db, &txn, 0);
gc_timer_start(&timer_analyze);
ret = api->txn_begin(db, &txn, KNOT_DB_RDONLY);
if (ret != KNOT_EOK) {
printf("Error starting DB transaction (%s).\n", knot_strerror(ret));
goto fail;
}
knot_db_iter_t *it = NULL;
it = api->iter_begin(&txn, KNOT_DB_FIRST);
if (it == NULL) {
printf("Error iterating DB.\n");
......@@ -174,10 +204,9 @@ int kr_cache_gc(const char *cache)
goto fail;
}
entry_dynarray_t to_del = { 0 };
rrtype_dynarray_t cache_rrtypes = { 0 };
gc_timer_start(NULL);
size_t cache_records = 0, deleted_records = 0;
size_t oversize_records = 0, already_gone = 0;;
size_t used_space = 0, rw_txn_count = 1;
while (it != NULL) {
knot_db_val_t key = { 0 }, val = { 0 };
......@@ -194,44 +223,87 @@ int kr_cache_gc(const char *cache)
int64_t over = entry->time + entry->ttl;
over -= now;
if (over < 0) {
entry_dynarray_add(&to_del, &key);
knot_db_val_t *todelete;
if ((cfg->temp_keys_space > 0 &&
used_space + key.len + sizeof(key) > cfg->temp_keys_space) ||
(todelete = dbval_copy(&key)) == NULL) {
oversize_records++;
} else {
used_space += todelete->len + sizeof(*todelete);
entry_dynarray_add(&to_del, &todelete);
}
}
}
it = api->iter_next(it);
}
printf("Cache analyzed in %.2lf secs, %zu records types", gc_timer_end(NULL), cache_records);
api->txn_abort(&txn);
printf("Cache analyzed in %.2lf secs, %zu records types", gc_timer_end(&timer_analyze), cache_records);
rrtypelist_print(&cache_rrtypes);
printf("%zu records to be deleted using %.2lf MBytes of temporary memory, %zu records skipped due to memory limit.\n",
to_del.size, ((double)used_space / 1048576.0), oversize_records);
rrtype_dynarray_free(&cache_rrtypes);
gc_timer_start(NULL);
gc_timer_start(&timer_delete);
gc_timer_start(&timer_rw_txn);
rrtype_dynarray_t deleted_rrtypes = { 0 };
dynarray_foreach(entry, knot_db_val_t, i, to_del) {
ret = api->del(&txn, i);
if (ret != KNOT_EOK) {
printf("Warning: skipping deleting because of error (%s)\n", knot_strerror(ret));
} else {
ret = api->txn_begin(db, &txn, 0);
if (ret != KNOT_EOK) {
printf("Error starting DB transaction (%s).\n", knot_strerror(ret));
goto fail;
}
dynarray_foreach(entry, knot_db_val_t*, i, to_del) {
ret = api->del(&txn, *i);
switch (ret) {
case KNOT_EOK:
deleted_records++;
const uint16_t *entry_type = ret == KNOT_EOK ? key_consistent(*i) : NULL;
const uint16_t *entry_type = ret == KNOT_EOK ? key_consistent(**i) : NULL;
assert(entry_type != NULL);
rrtypelist_add(&deleted_rrtypes, *entry_type);
break;
case KNOT_ENOENT:
already_gone++;
break;
default:
printf("Warning: skipping deleting because of error (%s)\n", knot_strerror(ret));
continue;
}
if ((cfg->rw_txn_items > 0 &&
(deleted_records + already_gone) % cfg->rw_txn_items == 0) ||
(cfg->rw_txn_duration > 0 &&
gc_timer_usecs(&timer_rw_txn) > cfg->rw_txn_duration)) {
ret = api->txn_commit(&txn);
if (ret == KNOT_EOK) {
rw_txn_count++;
usleep(cfg->rw_txn_delay);
gc_timer_start(&timer_rw_txn);
ret = api->txn_begin(db, &txn, 0);
}
if (ret != KNOT_EOK) {
printf("Error restarting DB transaction (%s)\n", knot_strerror(ret));
goto fail;
}
}
}
printf("Deleted in %.2lf secs %zu records types", gc_timer_end(NULL), deleted_records);
printf("Deleted %zu records (%zu already gone) types", deleted_records, already_gone);
rrtypelist_print(&deleted_rrtypes);
rrtype_dynarray_free(&deleted_rrtypes);
entry_dynarray_free(&to_del);
printf("It took %.2lf secs, %zu transactions \n", gc_timer_end(&timer_delete), rw_txn_count);
//api->iter_finish(it);
//it = NULL;
ret = api->txn_commit(&txn);
txn.txn = NULL;
fail:
rrtype_dynarray_free(&deleted_rrtypes);
dynarray_foreach(entry, knot_db_val_t*, i, to_del) {
free(*i);
}
entry_dynarray_free(&to_del);
api->iter_finish(it);
if (txn.txn) {
api->txn_abort(&txn);
......
int kr_cache_gc(const char *cache);
#pragma once
#include <stddef.h>
typedef struct {
const char *cache_path; // path to the LMDB with resolver cache
unsigned long gc_interval; // waiting time between two whole garbage collections in usecs (0 = just one-time cleanup)
size_t temp_keys_space; // maximum amount of temporary memory for copied keys in bytes (0 = unlimited)
size_t rw_txn_items; // maximum number of deleted records per RW transaction (0 = unlimited)
unsigned long rw_txn_duration; // maximum duration of RW transaction in usecs (0 = unlimited)
unsigned long rw_txn_delay; // waiting time between two RW transactions in usecs
} kr_cache_gc_cfg_t;
int kr_cache_gc(kr_cache_gc_cfg_t *cfg);
#define KR_CACHE_GC_VERSION "0.1"
......@@ -28,7 +28,13 @@ static void got_killed(int signum)
static void print_help()
{
printf("Usage: kr_cache_gc -c <resolver_cache> [ -d <garbage_interval(ms)> ]\n");
printf("Usage: kr_cache_gc -c <resolver_cache> [ optional params... ]\n");
printf("Optional params:\n");
printf(" -d <garbage_interval(millis)>\n");
printf(" -l <deletes_per_txn>\n");
printf(" -m <rw_txn_duration(usecs)>\n");
printf(" -w <wait_next_rw_txn(usecs)>\n");
printf(" -t <temporary_memory(MBytes)>\n");
}
int main(int argc, char *argv[])
......@@ -41,22 +47,33 @@ int main(int argc, char *argv[])
signal(SIGCHLD, got_killed);
signal(SIGINT, got_killed);
const char *cache_path = NULL;
unsigned long interval = 0;
kr_cache_gc_cfg_t cfg = { 0 };
int o;
while ((o = getopt(argc, argv, "hc:d:")) != -1) {
while ((o = getopt(argc, argv, "hc:d:l:m:w:t:")) != -1) {
switch (o) {
case 'c':
cache_path = optarg;
cfg.cache_path = optarg;
break;
#define get_nonneg_optarg(to) do { if (atol(optarg) < 0) { print_help(); return 2; } to = atol(optarg); } while (0)
case 'd':
if (atol(optarg) < 0) {
print_help();
return 2;
}
interval = atol(optarg) * 1000;
get_nonneg_optarg(cfg.gc_interval);
cfg.gc_interval *= 1000;
break;
case 'l':
get_nonneg_optarg(cfg.rw_txn_items);
break;
case 'm':
get_nonneg_optarg(cfg.rw_txn_duration);
break;
case 'w':
get_nonneg_optarg(cfg.rw_txn_delay);
break;
case 't':
get_nonneg_optarg(cfg.temp_keys_space);
cfg.temp_keys_space *= 1048576;
break;
#undef get_nonneg_optarg
case ':':
case '?':
case 'h':
......@@ -67,20 +84,20 @@ int main(int argc, char *argv[])
}
}
if (cache_path == NULL) {
if (cfg.cache_path == NULL) {
print_help();
return 1;
}
do {
int ret = kr_cache_gc(cache_path);
int ret = kr_cache_gc(&cfg);
if (ret) {
printf("Error (%s)\n", kr_strerror(ret));
return 10;
}
usleep(interval);
} while (interval > 0 && !killed);
usleep(cfg.gc_interval);
} while (cfg.gc_interval > 0 && !killed);
return 0;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment