Skip to content
Snippets Groups Projects
Commit 2d8f9207 authored by Marek Vavruša's avatar Marek Vavruša
Browse files

cache: implemented as namedb/lmdb backend

- removed excess copies in query, cache time tracking
- simple insert/replace semantics
- zero-copy query if TTL recalculation is not needed
- doc
parent 1093ce48
Branches
Tags
No related merge requests found
......@@ -4,8 +4,8 @@
#include <unistd.h>
#include <errno.h>
#include <lmdb.h>
#include <libknot/internal/mempattern.h>
#include <libknot/internal/namedb/namedb_lmdb.h>
#include <libknot/errcode.h>
#include <libknot/descriptor.h>
......@@ -13,352 +13,145 @@
#include "lib/defines.h"
#define DEBUG_MSG(fmt, ...) fprintf(stderr, "[cache] " fmt, ## __VA_ARGS__)
#define db_api namedb_lmdb_api()
struct kr_cache
namedb_t *kr_cache_open(const char *handle, mm_ctx_t *mm)
{
MDB_dbi dbi;
MDB_env *env;
mm_ctx_t *pool;
};
struct namedb_lmdb_opts opts = NAMEDB_LMDB_OPTS_INITIALIZER;
opts.path = handle;
/* State of one cache transaction wrapping an LMDB transaction handle. */
struct kr_txn
{
MDB_dbi dbi; /* database handle, copied from the owning cache when the transaction begins */
unsigned flags; /* caller-supplied flags (KR_CACHE_RDONLY selects a read-only LMDB transaction) */
MDB_txn *txn; /* LMDB transaction filled in by mdb_txn_begin() */
MDB_txn *parent; /* LMDB handle of the parent transaction, NULL when there is none */
mm_ctx_t *mm; /* memory context this structure was allocated from; used to free it on commit/abort */
};
/* MDB access */
/*
 * Try to create the directory that will hold the LMDB environment.
 *
 * The directory is created with mode 0770. The outcome of mkdir() is
 * intentionally discarded; the caller goes on to open the environment
 * at the same path and reports any failure from there.
 */
static void create_env_dir(const char *path)
{
	const int rc = mkdir(path, 0770);
	(void) rc;
}
/*
 * Create and open the LMDB environment at `handle` and open its unnamed
 * database (with MDB_DUPSORT) inside a short-lived setup transaction.
 *
 * On success cache->env and cache->dbi are initialized and 0 is returned.
 * On failure the non-zero code of the failing mdb_* call is returned; once
 * the environment has been created, every later failure closes it again.
 */
static int dbase_open(struct kr_cache *cache, const char *handle)
{
	MDB_txn *setup_txn = NULL;

	int rc = mdb_env_create(&cache->env);
	if (rc != 0) {
		/* Nothing has been set up yet, so there is nothing to close. */
		return rc;
	}

	create_env_dir(handle);
	rc = mdb_env_open(cache->env, handle, 0, 0644);
	if (rc != 0) {
		goto fail_env;
	}

	rc = mdb_txn_begin(cache->env, NULL, 0, &setup_txn);
	if (rc != 0) {
		goto fail_env;
	}

	rc = mdb_open(setup_txn, NULL, MDB_DUPSORT, &cache->dbi);
	if (rc != 0) {
		/* The setup transaction is still live here and must be aborted. */
		mdb_txn_abort(setup_txn);
		goto fail_env;
	}

	rc = mdb_txn_commit(setup_txn);
	if (rc != 0) {
		goto fail_env;
	}

	DEBUG_MSG("open '%s'\n", handle);
	return 0;

fail_env:
	mdb_env_close(cache->env);
	return rc;
}
static void dbase_close(struct kr_cache *cache)
{
mdb_close(cache->env, cache->dbi);
mdb_env_close(cache->env);
DEBUG_MSG("close\n");
}
/* data access */
static MDB_cursor *cursor_acquire(struct kr_txn *txn)
{
MDB_cursor *cursor = NULL;
int ret = mdb_cursor_open(txn->txn, txn->dbi, &cursor);
if (ret != 0) {
namedb_t *db = NULL;
int ret = db_api->init(&db, mm, &opts);
if (ret != KNOT_EOK) {
return NULL;
}
return cursor;
return db;
}
static void cursor_release(MDB_cursor *cursor)
void kr_cache_close(namedb_t *cache)
{
mdb_cursor_close(cursor);
db_api->deinit(cache);
}
/* data serialization */
/*
 * Accessors for one packed cache value laid out as
 *   [uint16_t RR type (host byte order)][rdata bytes].
 */
/* RR type stored at the start of packed value d (fully parenthesized so it
 * composes safely inside larger expressions). */
#define PACKED_RRTYPE(d) (*((uint16_t *)(d)))
/* Pointer to the rdata following the type field. The offset is applied in
 * bytes, so it does not depend on sizeof(knot_rdata_t). */
#define PACKED_RDATA(d) ((knot_rdata_t *)((uint8_t *)(d) + sizeof(uint16_t)))
static MDB_val pack_key(const knot_dname_t *name)
int kr_cache_txn_begin(namedb_t *cache, namedb_txn_t *txn, unsigned flags)
{
MDB_val key = { knot_dname_size(name), (void *)name };
return key;
return db_api->txn_begin(cache, txn, flags);
}
static int del_entry(MDB_cursor *cur)
int kr_cache_txn_commit(namedb_txn_t *txn)
{
/* Remember duplicate data count. */
size_t rr_count = 0;
mdb_cursor_count(cur, &rr_count);
/* Remove key if last entry. */
int ret = MDB_SUCCESS;
if (rr_count == 1) {
ret = mdb_cursor_del(cur, MDB_NODUPDATA);
} else {
ret = mdb_cursor_del(cur, 0);
}
if (ret == MDB_SUCCESS) {
return KNOT_EOK;
}
return KNOT_ERROR;
return db_api->txn_commit(txn);
}
/* NOTE(review): this span appears to interleave two definitions from the diff
 * view: a pack_entry() helper that writes one rdata through an LMDB cursor,
 * and kr_cache_txn_abort(), which forwards to db_api->txn_abort(). Confirm
 * which lines belong to which side before editing. */
static int pack_entry(MDB_cursor *cur, const knot_dname_t *name, uint16_t type,
const knot_rdata_t *rdata, uint32_t expire)
void kr_cache_txn_abort(namedb_txn_t *txn)
{
/* pack_entry: value layout is [uint16_t type][rdata], with the TTL field of
 * the copied rdata overwritten by `expire`. */
size_t rdlen = knot_rdata_array_size(knot_rdata_rdlen(rdata));
size_t datalen = rdlen + sizeof(type);
uint8_t buf[datalen];
memcpy(buf, &type, sizeof(type));
memcpy(buf + sizeof(type), rdata, rdlen);
knot_rdata_set_ttl(buf + sizeof(type), expire);
MDB_val key = pack_key(name);
MDB_val data = { datalen, buf };
int ret = mdb_cursor_put(cur, &key, &data, 0);
if (ret != MDB_SUCCESS) {
DEBUG_MSG("cache insert failed => %s\n", mdb_strerror(ret));
return KNOT_ERROR;
}
return KNOT_EOK;
/* NOTE(review): kr_cache_txn_abort() is declared void, yet the statement
 * below is `return <expression>;`. ISO C does not allow a return statement
 * with an expression in a void function; presumably this should be a plain
 * call without `return` -- confirm. */
return db_api->txn_abort(txn);
}
/* NOTE(review): this span appears to interleave two definitions from the diff
 * view: a pack_list() helper that stores every rdata of an RRSet via
 * pack_entry(), and cache_key(), which serializes (owner name, type) into a
 * lookup key. Confirm which lines belong to which side before editing. */
static int pack_list(MDB_cursor *cur, const knot_rrset_t *rr)
static size_t cache_key(uint8_t *buf, const knot_dname_t *name, uint16_t type)
{
/* pack_list: every record gets the same absolute expiry (now + RRSet TTL). */
uint32_t expire = time(NULL) + knot_rrset_ttl(rr);
int ret = KNOT_EOK;
const knot_rdataset_t *rrs = &rr->rrs;
for (uint16_t i = 0; i < rrs->rr_count; i++) {
knot_rdata_t *rd = knot_rdataset_at(rrs, i);
ret = pack_entry(cur, rr->owner, rr->type, rd, expire);
if (ret != KNOT_EOK) {
break;
}
}
#ifndef NDEBUG
char owner[KNOT_DNAME_MAXLEN], type_str[16];
knot_dname_to_str(owner, rr->owner, sizeof(owner));
knot_rrtype_to_string(rr->type, type_str, sizeof(type_str));
DEBUG_MSG("store '%s' type '%s' => %s\n", owner, type_str, knot_strerror(ret));
#endif
return ret;
/* cache_key: the key is the wire-format owner name followed by the 16-bit
 * type copied in host byte order; the returned value is the key length.
 * Callers in this file pass a buffer of KNOT_DNAME_MAXLEN + sizeof(uint16_t)
 * bytes. */
/* NOTE(review): the knot_dname_to_wire() result is stored directly in a
 * size_t and used as the memcpy offset with no error check. Presumably the
 * function can return a negative error code, which would become a huge
 * offset here -- confirm against libknot and check before copying. */
size_t len = knot_dname_to_wire(buf, name, KNOT_DNAME_MAXLEN);
memcpy(buf + len, &type, sizeof(uint16_t));
return len + sizeof(uint16_t);
}
static int unpack_entry(MDB_cursor *cur, knot_rrset_t *rr, MDB_val *data, uint32_t now, mm_ctx_t *mm)
static struct kr_cache_rrset *cache_rr(namedb_txn_t *txn, const knot_dname_t *name, uint16_t type)
{
knot_rdata_t *rd = PACKED_RDATA(data->mv_data);
uint16_t rr_type = PACKED_RRTYPE(data->mv_data);
if (rr_type != rr->type) {
return KNOT_EOK;
}
uint8_t keybuf[KNOT_DNAME_MAXLEN + sizeof(uint16_t)];
size_t key_len = cache_key(keybuf, name, type);
/* Check if TTL expired (with negative grace period). */
if (knot_rdata_ttl(rd) <= now + KR_TTL_GRACE) {
return del_entry(cur);
/* Look up and return value */
namedb_val_t key = { keybuf, key_len };
namedb_val_t val = { NULL, 0 };
int ret = db_api->find(txn, &key, &val, 0);
if (ret != KNOT_EOK) {
return NULL;
}
return knot_rdataset_add(&rr->rrs, rd, mm);
return (struct kr_cache_rrset *)val.data;
}
static int unpack_list(MDB_cursor *cur, knot_rrset_t *rr, mm_ctx_t *mm)
int kr_cache_query(namedb_txn_t *txn, knot_rrset_t *rr, uint32_t *timestamp)
{
uint32_t now = time(NULL);
MDB_val key = pack_key(rr->owner);
MDB_val data = { 0, NULL };
/* Fetch first entry. */
int ret = mdb_cursor_get(cur, &key, &data, MDB_SET_KEY);
/* Unpack, and find chained duplicates. */
while (ret == MDB_SUCCESS) {
ret = unpack_entry(cur, rr, &data, now, mm);
if (ret != KNOT_EOK) {
return ret;
}
ret = mdb_cursor_get(cur, &key, &data, MDB_NEXT_DUP);
}
/* No results. */
if (knot_rrset_empty(rr)) {
return KNOT_ENOENT;
}
/* Update TTL for all records. */
for (uint16_t i = 0; i < rr->rrs.rr_count; ++i) {
knot_rdata_t *rd = knot_rdataset_at(&rr->rrs, i);
knot_rdata_set_ttl(rd, knot_rdata_ttl(rd) - now);
}
/* Check if the RRSet is in the cache. */
struct kr_cache_rrset *found_rr = cache_rr(txn, rr->owner, rr->type);
if (found_rr != NULL) {
#ifndef NDEBUG
char owner[KNOT_DNAME_MAXLEN], type_str[16];
knot_dname_to_str(owner, rr->owner, sizeof(owner));
knot_rrtype_to_string(rr->type, type_str, sizeof(type_str));
DEBUG_MSG("load '%s' type '%s' => %u records\n", owner, type_str, rr->rrs.rr_count);
char name_str[KNOT_DNAME_MAXLEN];
knot_dname_to_str(name_str, rr->owner, sizeof(name_str));
char type_str[16];
knot_rrtype_to_string(rr->type, type_str, sizeof(type_str));
DEBUG_MSG("query '%s %s' => %u RRs\n", name_str, type_str, found_rr->count);
#endif
/* Assign data and return success. */
rr->rrs.rr_count = found_rr->count;
rr->rrs.data = found_rr->data;
return KNOT_EOK;
}
/* No time constraint */
if (timestamp == NULL) {
return KNOT_EOK;
}
struct kr_cache *kr_cache_open(const char *handle, unsigned flags, mm_ctx_t *mm)
{
struct kr_cache *cache = mm_alloc(mm, sizeof(struct kr_cache));
if (cache == NULL) {
return NULL;
}
memset(cache, 0, sizeof(struct kr_cache));
/* Check if all RRs are still valid. */
uint32_t drift = *timestamp - found_rr->timestamp;
for (unsigned i = 0; i < rr->rrs.rr_count; ++i) {
const knot_rdata_t *rd = knot_rdataset_at(&rr->rrs, i);
if (drift >= knot_rdata_ttl(rd)) {
return KNOT_ENOENT;
}
}
int ret = dbase_open(cache, handle);
if (ret != 0) {
mm_free(mm, cache);
return NULL;
*timestamp = drift;
return KNOT_EOK;
}
cache->pool = mm;
return cache;
/* Not found. */
return KNOT_ENOENT;
}
void kr_cache_close(struct kr_cache *cache)
int kr_cache_insert(namedb_txn_t *txn, const knot_rrset_t *rr, uint32_t timestamp)
{
dbase_close(cache);
mm_free(cache->pool, cache);
}
struct kr_txn *kr_cache_txn_begin(struct kr_cache *cache, struct kr_txn *parent, unsigned flags, mm_ctx_t *mm)
{
assert(cache);
struct kr_txn *txn = mm_alloc(mm, sizeof(struct kr_txn));
if (txn == NULL) {
return NULL;
}
memset(txn, 0, sizeof(struct kr_txn));
txn->dbi = cache->dbi;
txn->mm = mm;
if (parent) {
txn->parent = parent->txn;
}
unsigned mdb_flags = 0;
txn->flags = flags;
if (flags & KR_CACHE_RDONLY) {
mdb_flags |= MDB_RDONLY;
}
int ret = mdb_txn_begin(cache->env, txn->parent, mdb_flags, &txn->txn);
if (ret != 0) {
mm_free(mm, txn);
return NULL;
/* Ignore empty records. */
if (knot_rrset_empty(rr)) {
return KNOT_EOK;
}
return txn;
}
uint8_t keybuf[KNOT_DNAME_MAXLEN + sizeof(uint16_t)];
size_t key_len = cache_key(keybuf, rr->owner, rr->type);
namedb_val_t key = { keybuf, key_len };
int kr_cache_txn_commit(struct kr_txn *txn)
{
int ret = mdb_txn_commit(txn->txn);
namedb_val_t val = { NULL, sizeof(struct kr_cache_rrset) + knot_rdataset_size(&rr->rrs) };
#ifndef NDEBUG
MDB_stat stat;
mdb_stat(txn->txn, txn->dbi, &stat);
DEBUG_MSG("commit, %zu entries\n", stat.ms_entries);
char name_str[KNOT_DNAME_MAXLEN];
knot_dname_to_str(name_str, rr->owner, sizeof(name_str));
char type_str[16];
knot_rrtype_to_string(rr->type, type_str, sizeof(type_str));
DEBUG_MSG("insert '%s %s' => %u RRs (%zuB)\n", name_str, type_str, rr->rrs.rr_count, val.len);
#endif
mm_free(txn->mm, txn);
return ret;
}
void kr_cache_txn_abort(struct kr_txn *txn)
{
mdb_txn_abort(txn->txn);
mm_free(txn->mm, txn);
}
int kr_cache_query(struct kr_txn *txn, knot_rrset_t *rr)
{
MDB_cursor *cursor = cursor_acquire(txn);
if (cursor == NULL) {
return KNOT_ENOMEM;
}
int ret = unpack_list(cursor, rr, txn->mm);
cursor_release(cursor);
return ret;
}
int kr_cache_insert(struct kr_txn *txn, const knot_rrset_t *rr, unsigned flags)
{
MDB_cursor *cursor = cursor_acquire(txn);
if (cursor == NULL) {
return KNOT_ERROR;
int ret = db_api->insert(txn, &key, &val, 0);
if (ret != KNOT_EOK) {
return ret;
}
/* TODO: cache eviction if full */
int ret = pack_list(cursor, rr);
/* Write cached record. */
struct kr_cache_rrset *cache_rr = val.data;
cache_rr->timestamp = timestamp;
cache_rr->count = rr->rrs.rr_count;
memcpy(cache_rr->data, rr->rrs.data, knot_rdataset_size(&rr->rrs));
cursor_release(cursor);
return ret;
return KNOT_EOK;
}
int kr_cache_remove(struct kr_txn *txn, const knot_rrset_t *rr)
int kr_cache_remove(namedb_txn_t *txn, const knot_rrset_t *rr)
{
MDB_cursor *cursor = cursor_acquire(txn);
if (cursor == NULL) {
return -1;
}
int ret = 0;
MDB_val key = pack_key(rr->owner);
MDB_val data;
uint8_t keybuf[KNOT_DNAME_MAXLEN + sizeof(uint16_t)];
size_t key_len = cache_key(keybuf, rr->owner, rr->type);
namedb_val_t key = { keybuf, key_len };
while ((ret = mdb_cursor_get(cursor, &key, &data, MDB_NEXT_DUP)) == 0) {
if (PACKED_RRTYPE(data.mv_data) == rr->type &&
knot_rdataset_member(&rr->rrs, PACKED_RDATA(data.mv_data), false)) {
mdb_cursor_del(cursor, 0);
}
}
/* TODO: selective deletion by RRSet subtraction */
cursor_release(cursor);
return ret;
return db_api->del(txn, &key);
}
......@@ -16,22 +16,80 @@ limitations under the License.
#pragma once
#include <libknot/rrset.h>
#include <libknot/internal/namedb/namedb.h>
enum kr_cache_flag {
KR_CACHE_NOFLAG = 0,
KR_CACHE_RDONLY = 1 << 0
/*!
* \brief Serialized form of the RRSet with inception timestamp.
*/
struct kr_cache_rrset
{
uint32_t timestamp; /* time value passed to kr_cache_insert() when the entry was written */
uint16_t count; /* number of RRs in the entry (copied from the rdataset's rr_count) */
uint8_t data[]; /* serialized rdataset bytes, knot_rdataset_size() long at insert time */
};
struct kr_cache;
struct kr_txn;
/*!
* \brief Open/create persistent cache in given path.
* \param handle Path to existing directory where the DB should be created.
* \param mm Memory context.
* \return database instance or NULL
*/
namedb_t *kr_cache_open(const char *handle, mm_ctx_t *mm);
struct kr_cache *kr_cache_open(const char *handle, unsigned flags, mm_ctx_t *mm);
void kr_cache_close(struct kr_cache *cache);
/*!
* \brief Close persistent cache.
* \note This doesn't clear the data, just closes the connection to the database.
* \param cache database instance
*/
void kr_cache_close(namedb_t *cache);
struct kr_txn *kr_cache_txn_begin(struct kr_cache *cache, struct kr_txn *parent, unsigned flags, mm_ctx_t *mm);
int kr_cache_txn_commit(struct kr_txn *txn);
void kr_cache_txn_abort(struct kr_txn *txn);
/*!
* \brief Begin cache transaction (read-only or write).
*
* \param cache database instance
* \param txn transaction instance to be initialized (output)
* \param flags transaction flags (see namedb.h in libknot)
* \return KNOT_E*
*/
int kr_cache_txn_begin(namedb_t *cache, namedb_txn_t *txn, unsigned flags);
int kr_cache_query(struct kr_txn *txn, knot_rrset_t *rr);
int kr_cache_insert(struct kr_txn *txn, const knot_rrset_t *rr, unsigned flags);
int kr_cache_remove(struct kr_txn *txn, const knot_rrset_t *rr);
/*!
* \brief Commit existing transaction.
* \param txn transaction instance
* \return KNOT_E*
*/
int kr_cache_txn_commit(namedb_txn_t *txn);
/*!
* \brief Abort existing transaction instance.
* \param txn transaction instance
*/
void kr_cache_txn_abort(namedb_txn_t *txn);
/*!
* \brief Query the cache for the given RRSet (looked up by owner name and type).
* \note The 'drift' is the time passed between the cache time of the RRSet and now (in seconds).
* \param txn transaction instance
* \param rr query RRSet (its rdataset may be changed depending on the result)
* \param timestamp current time (replaced with the drift on success), or NULL to skip the TTL validity check
* \return KNOT_E*
*/
int kr_cache_query(namedb_txn_t *txn, knot_rrset_t *rr, uint32_t *timestamp);
/*!
* \brief Insert RRSet into cache, replacing any existing data.
* \param txn transaction instance
* \param rr inserted RRSet
* \param timestamp current time
* \return KNOT_E*
*/
int kr_cache_insert(namedb_txn_t *txn, const knot_rrset_t *rr, uint32_t timestamp);
/*!
* \brief Remove RRSet from cache.
* \param txn transaction instance
* \param rr removed RRSet
* \return KNOT_E*
*/
int kr_cache_remove(namedb_txn_t *txn, const knot_rrset_t *rr);
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment