Commit 87b7712b authored by Libor Peltan's avatar Libor Peltan
Browse files

journal: large code cleanup; identifiers renamed etc

parent 5f385feb
......@@ -15,7 +15,6 @@
*/
#include <limits.h>
#include <stdio.h>
#include <sys/stat.h>
#include <stdarg.h>
......@@ -25,41 +24,35 @@
#include "contrib/files.h"
#include "contrib/endian.h"
/*! \brief journal database name. */
#define DATA_DB_NAME "data"
/*! \brief Minimum journal size. */
#define FSLIMIT_MIN (1 * 1024 * 1024)
/*! \brief Journal version. */
#define JOURNAL_VERSION "1.0"
/*! \brief Changeset chunk size. */
#define CHUNK_MAX (60 * 1024)
/*! \brief Journal version */
#define JOURNAL_VERSION "1.0"
/*! \brief various metadata DB key strings */
#define MDKEY_GLOBAL_VERSION "version"
#define MDKEY_GLOBAL_JOURNAL_COUNT "journal_count"
#define MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED "last_total_occupied"
#define MDKEY_GLOBAL_LAST_INSERTER_ZONE "last_inserter_zone"
#define MDKEY_PERZONE_OCCUPIED "occupied"
#define MDKEY_PERZONE_FLAGS "flags" // this one is also hardcoded in macro txn_commit_md()
#define CHUNK_MAX (60 * 1024)
/*! \brief Various metadata DB key strings. Also hardcoded in macro txn_commit()! */
#define MDKEY_GLOBAL_VERSION "version"
#define MDKEY_GLOBAL_JOURNAL_COUNT "journal_count"
#define MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED "last_total_occupied"
#define MDKEY_GLOBAL_LAST_INSERTER_ZONE "last_inserter_zone"
#define MDKEY_PERZONE_OCCUPIED "occupied"
#define MDKEY_PERZONE_FLAGS "flags"
/*! \brief The number of unused DB key items. */
#define DB_KEY_UNUSED_ZERO (4)
enum {
LAST_FLUSHED_VALID = 1 << 0, /* "last flush is valid" flag. */
SERIAL_TO_VALID = 1 << 1, /* "last serial_to is valid" flag. */
MERGED_SERIAL_VALID = 1 << 2, /* "serial_from" of merged changeset */
DIRTY_SERIAL_VALID = 1 << 3, /* "dirty_serial" is present in the DB */
MERGED_SERIAL_VALID = 1 << 2, /* "serial_from" of merged changeset. */
DIRTY_SERIAL_VALID = 1 << 3, /* "dirty_serial" is present in the DB. */
};
static int journal_flush_allowed(journal_t *j) {
static bool journal_flush_allowed(journal_t *j) {
conf_val_t val = conf_zone_get(conf(), C_ZONEFILE_SYNC, j->zone);
if (val.item == NULL || conf_int(&val) >= 0) {
return 1;
}
return 0;
return conf_int(&val) >= 0;
}
static int journal_merge_allowed(journal_t *j) {
static bool journal_merge_allowed(journal_t *j) {
return !journal_flush_allowed(j); // TODO think of other behaviour, e.g. setting
}
......@@ -98,6 +91,17 @@ static float journal_max_txn(journal_t *j)
* ********************************************************************
*/
typedef struct {
uint32_t first_serial; // Serial_from of the first changeset.
uint32_t last_serial; // Serial_from of the last changeset.
uint32_t last_serial_to; // Serial_to of the last changeset.
uint32_t last_flushed; // Serial_from of the last flushed (or merged) chengeset.
uint32_t merged_serial; // "serial_from" of merged changeset.
uint32_t dirty_serial; // Serial_from of an incompletely inserted changeset which shall be deleted (see DB_MAX_INSERT_TXN).
uint32_t changeset_count; // Number of changesets in this journal.
uint32_t flags; // LAST_FLUSHED_VALID, SERIAL_TO_VALID, MERGED_SERIAL_VALID.
} metadata_t;
typedef struct {
journal_t *j;
knot_db_txn_t *txn;
......@@ -111,7 +115,7 @@ typedef struct {
knot_db_val_t val;
uint8_t key_raw[512];
journal_metadata_t shadow_md;
metadata_t shadow_md;
} txn_t;
static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint32_t *res);
......@@ -588,7 +592,8 @@ static void get_iter_next(txn_t *txn, uint32_t expect_serial, int expect_chunk)
typedef int (*iteration_cb_t)(iteration_ctx_t *ctx);
static int iterate(journal_t *j, txn_t *_txn, iteration_cb_t cb, int method, void *iter_context, uint32_t first, uint32_t last)
static int iterate(journal_t *j, txn_t *_txn, iteration_cb_t cb, int method,
void *iter_context, uint32_t first, uint32_t last)
{
reuse_txn(txn, j, _txn, 1);
......@@ -672,7 +677,8 @@ static int iterate(journal_t *j, txn_t *_txn, iteration_cb_t cb, int method, voi
*/
/*! \brief Deserialize changeset from chunks (in vals) */
static int vals_to_changeset(knot_db_val_t *vals, int nvals, const knot_dname_t *zone_name, changeset_t **ch)
static int vals_to_changeset(knot_db_val_t *vals, int nvals,
const knot_dname_t *zone_name, changeset_t **ch)
{
uint8_t *valps[nvals];
size_t vallens[nvals];
......@@ -735,11 +741,11 @@ static int load_one(journal_t *j, txn_t *_txn, uint32_t serial, changeset_t **ch
txn_ret(txn);
}
static int load_merged_changeset(journal_t *j, txn_t *_txn, changeset_t **mch, const uint32_t *only_if_serial)
static int load_merged_changeset(journal_t *j, txn_t *_txn, changeset_t **mch,
const uint32_t *only_if_serial)
{
assert(*mch == NULL);
reuse_txn(txn, j, _txn, 0);
uint32_t ms = txn->shadow_md.merged_serial, fl = txn->shadow_md.flags;
......@@ -891,7 +897,8 @@ static int delete_tofree(journal_t *j, txn_t *_txn, size_t to_be_freed, size_t *
return KNOT_EOK;
}
delete_status_t ds = { .freed_approx = 0, .to_be_freed = to_be_freed };
iterate(j, txn, del_tofree_itercb, JOURNAL_ITERATION_CHUNKS, &ds, txn->shadow_md.first_serial, txn->shadow_md.last_serial);
iterate(j, txn, del_tofree_itercb, JOURNAL_ITERATION_CHUNKS, &ds,
txn->shadow_md.first_serial, txn->shadow_md.last_serial);
unreuse_txn(txn, _txn);
if (txn->ret == KNOT_EOK) *really_freed = ds.freed_approx;
......@@ -941,7 +948,8 @@ static int delete_count(journal_t *j, txn_t *_txn, size_t to_be_deleted, size_t
return KNOT_EOK;
}
delete_status_t ds = { .freed_approx = 0, .to_be_freed = to_be_deleted };
iterate(j, txn, del_count_itercb, JOURNAL_ITERATION_CHUNKS, &ds, txn->shadow_md.first_serial, txn->shadow_md.last_serial);
iterate(j, txn, del_count_itercb, JOURNAL_ITERATION_CHUNKS, &ds,
txn->shadow_md.first_serial, txn->shadow_md.last_serial);
unreuse_txn(txn, _txn);
if (txn->ret == KNOT_EOK) *really_deleted = ds.freed_approx;
......@@ -995,9 +1003,11 @@ static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **m
if (md_flushed(txn)) {
goto m_u_ch_end;
}
int was_merged = md_flag(txn, MERGED_SERIAL_VALID), was_flushed = md_flag(txn, LAST_FLUSHED_VALID);
int was_merged = md_flag(txn, MERGED_SERIAL_VALID);
int was_flushed = md_flag(txn, LAST_FLUSHED_VALID);
uint32_t from = was_merged ? txn->shadow_md.merged_serial :
(was_flushed ? txn->shadow_md.last_flushed : txn->shadow_md.first_serial);
(was_flushed ? txn->shadow_md.last_flushed :
txn->shadow_md.first_serial);
txn->ret = load_one(j, txn, from, mch);
if (!was_merged && was_flushed && txn->ret == KNOT_EOK) {
from = knot_soa_serial(&(*mch)->soa_to->rrs);
......@@ -1010,9 +1020,10 @@ static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **m
}
from = knot_soa_serial(&(*mch)->soa_to->rrs);
txn->ret = iterate(j, txn, merge_itercb, JOURNAL_ITERATION_CHANGESETS, mch, from, txn->shadow_md.last_serial);
txn->ret = iterate(j, txn, merge_itercb, JOURNAL_ITERATION_CHANGESETS,
mch, from, txn->shadow_md.last_serial);
m_u_ch_end:
m_u_ch_end:
unreuse_txn(txn, _txn);
if (txn->ret != KNOT_EOK && *mch != NULL) {
changeset_free(*mch);
......@@ -1095,9 +1106,9 @@ static int store_changesets(journal_t *j, list_t *changesets)
try_flush
delete_tofree(j, txn, tofree, &freed);
if (freed < free_min) {
txn->ret = KNOT_ESPACE;
log_zone_warning(j->zone, "journal: unable to make free space for insert");
goto store_changeset_cleanup;
txn->ret = KNOT_ESPACE;
log_zone_warning(j->zone, "journal: unable to make free space for insert");
goto store_changeset_cleanup;
}
}
}
......@@ -1120,7 +1131,8 @@ static int store_changesets(journal_t *j, list_t *changesets)
changeset_t * chs_head = (HEAD(*changesets));
uint32_t serial = knot_soa_serial(&chs_head->soa_from->rrs);
if (md_flag(txn, SERIAL_TO_VALID) && serial_compare(txn->shadow_md.last_serial_to, serial) != 0) {
log_zone_warning(j->zone, "discontinuity in chages history (%u -> %u), dropping older changesets", txn->shadow_md.last_serial_to, serial);
log_zone_warning(j->zone, "discontinuity in chages history (%u -> %u), dropping older changesets",
txn->shadow_md.last_serial_to, serial);
try_flush
drop_journal(j, txn);
txn_restart(txn);
......@@ -1132,7 +1144,8 @@ static int store_changesets(journal_t *j, list_t *changesets)
}
txn_key_2u32(txn, j->zone, serial_to, 0);
if (txn_find(txn)) {
log_zone_warning(j->zone, "duplicite changeset serial (%u), dropping older changesets", serial_to);
log_zone_warning(j->zone, "duplicite changeset serial (%u), dropping older changesets",
serial_to);
try_flush
delete_upto(j, txn, txn->shadow_md.first_serial, serial_to);
txn_restart(txn);
......@@ -1145,7 +1158,8 @@ static int store_changesets(journal_t *j, list_t *changesets)
break;
}
size_t maxchunks = changeset_serialized_size(ch) * 2 / CHUNK_MAX + 1, chunks; // twice chsize seems like enough room to store all chunks together
// Twice chsize seems like enough room to store all chunks together.
size_t maxchunks = changeset_serialized_size(ch) * 2 / CHUNK_MAX + 1, chunks;
allchunks = malloc(maxchunks * CHUNK_MAX);
chunkptrs = malloc(maxchunks * sizeof(uint8_t *));
chunksizes = malloc(maxchunks * sizeof(size_t));
......@@ -1157,7 +1171,8 @@ static int store_changesets(journal_t *j, list_t *changesets)
for (int i = 0; i < maxchunks; i++) {
chunkptrs[i] = allchunks + i*CHUNK_MAX + sizeof(journal_header_t);
}
txn->ret = changeset_serialize(ch, chunkptrs, CHUNK_MAX - sizeof(journal_header_t), maxchunks, chunksizes, &chunks);
txn->ret = changeset_serialize(ch, chunkptrs, CHUNK_MAX - sizeof(journal_header_t),
maxchunks, chunksizes, &chunks);
uint32_t serial = knot_soa_serial(&ch->soa_from->rrs);
uint32_t serial_to = knot_soa_serial(&ch->soa_to->rrs);
......@@ -1187,8 +1202,6 @@ static int store_changesets(journal_t *j, list_t *changesets)
// PART 7: metadata update
if (txn->ret != KNOT_EOK) {
log_zone_warning(j->zone, "failed to insert a changeset %lu -> %lu into journal (%s)",
(unsigned long)serial, (unsigned long)serial_to, knot_strerror(txn->ret)); // TODO consider removing
break;
}
if (inserting_merged && ch == TAIL(*changesets)) {
......@@ -1217,7 +1230,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
// PART X : finalization and cleanup
store_changeset_cleanup:
store_changeset_cleanup:
txn_commit(txn);
......@@ -1309,7 +1322,6 @@ static int open_journal_db_unsafe(journal_db_t **db)
opts.mapsize = (*db)->fslimit;
opts.maxdbs = 1;
opts.dbname = DATA_DB_NAME;
int ret = (*db)->db_api->init(&(*db)->db, NULL, &opts);
if (ret != KNOT_EOK) {
(*db)->db = NULL;
......@@ -1372,7 +1384,7 @@ void journal_close(journal_t *j)
j->zone = NULL;
}
int init_journal_db(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fslimit)
int journal_db_init(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fslimit)
{
if (*db != NULL) {
return KNOT_EOK;
......@@ -1381,8 +1393,12 @@ int init_journal_db(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fs
if (*db == NULL) {
return KNOT_ENOMEM;
}
journal_db_t dbinit = { .db = NULL, .db_api = knot_db_lmdb_api(), .path = strdup(lmdb_dir_path),
.fslimit = ((lmdb_fslimit < FSLIMIT_MIN) ? FSLIMIT_MIN : lmdb_fslimit) };
journal_db_t dbinit = {
.db = NULL,
.db_api = knot_db_lmdb_api(),
.path = strdup(lmdb_dir_path),
.fslimit = ((lmdb_fslimit < JOURNAL_MIN_FSLIMIT) ? JOURNAL_MIN_FSLIMIT : lmdb_fslimit)
};
memcpy(*db, &dbinit, sizeof(journal_db_t));
pthread_mutex_init(&(*db)->db_mutex, NULL);
return KNOT_EOK;
......@@ -1390,7 +1406,6 @@ int init_journal_db(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fs
static void destroy_journal_db(journal_db_t **db)
{
if (*db == NULL) return;
assert((*db)->db == NULL);
pthread_mutex_destroy(&(*db)->db_mutex);
......@@ -1399,9 +1414,11 @@ static void destroy_journal_db(journal_db_t **db)
*db = NULL;
}
void close_journal_db(journal_db_t **db)
void journal_db_close(journal_db_t **db)
{
assert((*db) != NULL);
if (db == NULL || *db == NULL) {
return;
}
pthread_mutex_lock(&(*db)->db_mutex);
if ((*db)->db != NULL) {
......@@ -1415,7 +1432,9 @@ void close_journal_db(journal_db_t **db)
int journal_flush(journal_t *journal)
{
if (journal == NULL || journal->db == NULL) return KNOT_EINVAL;
if (journal == NULL || journal->db == NULL) {
return KNOT_EINVAL;
}
local_txn_t(txn, journal);
txn_begin(txn, 1);
......@@ -1426,7 +1445,10 @@ int journal_flush(journal_t *journal)
bool journal_exists(journal_db_t **db, knot_dname_t *zone_name)
{
if (db == NULL || *db == NULL || zone_name == NULL) return false;
if (db == NULL || *db == NULL || zone_name == NULL) {
return false;
}
if ((*db)->db == NULL) {
struct stat st;
if (stat((*db)->path, &st) != 0 || st.st_size == 0) {
......@@ -1448,9 +1470,9 @@ bool journal_exists(journal_db_t **db, knot_dname_t *zone_name)
return (res == 1);
}
static knot_db_val_t * dbval_copy(const knot_db_val_t * from)
static knot_db_val_t *dbval_copy(const knot_db_val_t *from)
{
knot_db_val_t * to = malloc(sizeof(knot_db_val_t) + from->len);
knot_db_val_t *to = malloc(sizeof(knot_db_val_t) + from->len);
if (to != NULL) {
memcpy(to, from, sizeof(knot_db_val_t));
to->data = to + 1; // == ((uit8_t *)to) + sizeof(knot_db_val_t)
......@@ -1489,15 +1511,20 @@ int scrape_journal(journal_t *j)
}
txn_iter_finish(txn);
ptrnode_t *del_one;
if (txn->ret == KNOT_EOK) {
ptrnode_t * del_one;
WALK_LIST(del_one, to_del) {
txn->ret = j->db->db_api->del(txn->txn, (knot_db_val_t *)del_one->d);
}
md_update_journal_count(txn, -1);
txn->ret = j->db->db_api->txn_commit(txn->txn);
}
scrape_end:
scrape_end:
WALK_LIST(del_one, to_del) {
free(del_one->d);
}
ptrlist_free(&to_del, NULL);
return txn->ret;
......@@ -1552,12 +1579,12 @@ int journal_db_list_zones(journal_db_t **db, list_t *zones)
txn_iter_key(txn, &key);
int metaflag_len = strlen(MDKEY_PERZONE_FLAGS);
char * compare_metaflag = key.data;
char *compare_metaflag = key.data;
compare_metaflag += key.len - 1;
if (txn->ret == KNOT_EOK && *compare_metaflag == '\0') {
compare_metaflag -= metaflag_len;
if (strcmp(compare_metaflag, MDKEY_PERZONE_FLAGS) == 0) {
char * found_zone = knot_dname_to_str_alloc((const knot_dname_t *) key.data);
char *found_zone = knot_dname_to_str_alloc((const knot_dname_t *)key.data);
ptrlist_add(zones, found_zone, NULL);
}
}
......@@ -1572,7 +1599,6 @@ int journal_db_list_zones(journal_db_t **db, list_t *zones)
txn->ret = KNOT_ENOENT;
}
if (list_size(zones) != expected_count) {
fprintf(stderr, "Expected %u zones, found %zu.\n", expected_count, list_size(zones));
txn->ret = KNOT_EMALF;
}
txn_ret(txn);
......@@ -1597,23 +1623,25 @@ static void _jch_print(const knot_dname_t *zname, int warn_level, const char *fo
va_end(args);
switch (warn_level) {
case KNOT_JOURNAL_CHECK_INFO:
case JOURNAL_CHECK_INFO:
log_zone_info(zname, "%s", buf);
break;
case KNOT_JOURNAL_CHECK_WARN:
case JOURNAL_CHECK_WARN:
log_zone_error(zname, "%s", buf);
break;
default:
break;
}
}
#define jch_print(wl, fmt_args...) if ((wl) <= warn_level) _jch_print(j->zone, wl, fmt_args)
#define jch_info(fmt_args...) jch_print(KNOT_JOURNAL_CHECK_INFO, fmt_args)
#define jch_warn(fmt_args...) jch_print((allok = 0, KNOT_JOURNAL_CHECK_WARN), fmt_args)
#define jch_info(fmt_args...) jch_print(JOURNAL_CHECK_INFO, fmt_args)
#define jch_warn(fmt_args...) jch_print((allok = 0, JOURNAL_CHECK_WARN), fmt_args)
#define jch_txn(comment, fatal) do { if (txn->ret != KNOT_EOK && txn->ret != KNOT_ESEMCHECK) { \
jch_warn("failed transaction: %s (%s)", (comment), knot_strerror(txn->ret)); \
if (fatal) return txn->ret; } } while (0)
int journal_check(journal_t *j, int warn_level)
int journal_check(journal_t *j, journal_check_level warn_level)
{
int ret, allok = 1;
changeset_t *ch;
......@@ -1632,9 +1660,10 @@ int journal_check(journal_t *j, int warn_level)
txn_begin(txn, 1);
jch_txn("begin", 1);
jch_info("metadata: flags >> %d << fs %u ls %u lst %u lf %u ms %u ds %u cnt %u", txn->shadow_md.flags,
txn->shadow_md.first_serial, txn->shadow_md.last_serial, txn->shadow_md.last_serial_to,
txn->shadow_md.last_flushed, txn->shadow_md.merged_serial, txn->shadow_md.dirty_serial, txn->shadow_md.changeset_count);
jch_info("metadata: flags >> %d << fs %u ls %u lst %u lf %u ms %u ds %u cnt %u",
txn->shadow_md.flags, txn->shadow_md.first_serial, txn->shadow_md.last_serial,
txn->shadow_md.last_serial_to, txn->shadow_md.last_flushed, txn->shadow_md.merged_serial,
txn->shadow_md.dirty_serial, txn->shadow_md.changeset_count);
chcount = txn->shadow_md.changeset_count;
first_unflushed = txn->shadow_md.first_serial;
......@@ -1644,14 +1673,19 @@ int journal_check(journal_t *j, int warn_level)
}
if (!md_flag(txn, SERIAL_TO_VALID)) {
if (md_flag(txn, LAST_FLUSHED_VALID)) jch_warn("journal flagged empty but last_flushed valid");
if (md_flag(txn, MERGED_SERIAL_VALID)) jch_warn("no other than merged changeset present, this should not happen");
if (md_flag(txn, LAST_FLUSHED_VALID)) {
jch_warn("journal flagged empty but last_flushed valid");
}
if (md_flag(txn, MERGED_SERIAL_VALID)) {
jch_warn("no other than merged changeset present, this should not happen");
}
goto check_merged;
}
ret = load_one(j, txn, txn->shadow_md.first_serial, &ch);
if (ret != KNOT_EOK) {
jch_warn("can't read first changeset %u (%s)", txn->shadow_md.first_serial, knot_strerror(ret));
jch_warn("can't read first changeset %u (%s)",
txn->shadow_md.first_serial, knot_strerror(ret));
goto check_merged;
}
......@@ -1664,7 +1698,8 @@ int journal_check(journal_t *j, int warn_level)
changeset_free(ch);
ret = load_one(j, txn, txn->shadow_md.last_flushed, &ch);
if (ret != KNOT_EOK) {
jch_warn("can't read last flushed changeset %u (%s)", txn->shadow_md.last_flushed, knot_strerror(ret));
jch_warn("can't read last flushed changeset %u (%s)",
txn->shadow_md.last_flushed, knot_strerror(ret));
}
else {
first_unflushed = knot_soa_serial(&ch->soa_to->rrs);
......@@ -1709,15 +1744,17 @@ int journal_check(journal_t *j, int warn_level)
ch = HEAD(l);
if (serial_compare(sfrom, knot_soa_serial(&ch->soa_from->rrs)) != 0) {
jch_warn("first listed changeset's serial 'from' %u is not ok", knot_soa_serial(&ch->soa_from->rrs));
jch_warn("first listed changeset's serial 'from' %u is not ok",
knot_soa_serial(&ch->soa_from->rrs));
}
ch = TAIL(l);
if (serial_compare(sto, knot_soa_serial(&ch->soa_to->rrs)) != 0) {
jch_warn("last listed changeset's serial 'to' %u is not ok", knot_soa_serial(&ch->soa_to->rrs));
jch_warn("last listed changeset's serial 'to' %u is not ok",
knot_soa_serial(&ch->soa_to->rrs));
}
changesets_free(&l);
check_merged:
check_merged:
if (txn->ret != KNOT_ESEMCHECK) txn_abort(txn);
txn_begin(txn, 0);
jch_txn("begin2", 1);
......@@ -1730,7 +1767,8 @@ int journal_check(journal_t *j, int warn_level)
else {
sfrom = knot_soa_serial(&ch->soa_from->rrs);
sto = knot_soa_serial(&ch->soa_to->rrs);
jch_info("merged changeset %u -> %u (size %zu)", sfrom, sto, changeset_serialized_size(ch));
jch_info("merged changeset %u -> %u (size %zu)", sfrom, sto,
changeset_serialized_size(ch));
if (serial_compare(sfrom, txn->shadow_md.merged_serial) != 0) {
jch_warn("merged changeset's serial 'from' is not ok");
}
......
......@@ -18,19 +18,13 @@
#include <pthread.h>
#include "libknot/libknot.h"
#include "libknot/db/db.h"
#include "contrib/ucw/lists.h"
#include "knot/updates/changesets.h"
#include "knot/journal/serialization.h"
/*!
* \brief j->fslimit special value to open with minimal possible mapsize
*
* ...it is equal to the actual DB file size.
* Beware of using this value for the first time initialized DB !
* It is mostly useful for read only access
*/
#define KNOT_JOURNAL_FSLIMIT_SAMEASLAST (400 * 1024)
/*! \brief Minimum journal size. */
#define JOURNAL_MIN_FSLIMIT (1 * 1024 * 1024)
typedef struct {
knot_db_t *db;
......@@ -38,32 +32,49 @@ typedef struct {
char *path;
size_t fslimit;
pthread_mutex_t db_mutex; // please delete this once you move DB opening from journal_open to db_init
// common metadata: last_inserter_zone, last_total_occupied, journal_count
} journal_db_t;
typedef struct {
uint32_t first_serial; // serial_from of the first changeset
uint32_t last_serial; // serial_from of the last changeset
uint32_t last_serial_to; // serial_to of the last changeset
uint32_t last_flushed; // serial_from of the last flushed (or merged) chengeset
uint32_t merged_serial; // "serial_from" of merged changeset
uint32_t dirty_serial; // serial_from of an incompletely inserted changeset which shall be deleted (see DB_MAX_INSERT_TXN)
uint32_t changeset_count; // # of changesets in this journal
uint32_t flags; // LAST_FLUSHED_VALID, SERIAL_TO_VALID, MERGED_SERIAL_VALID
// specific metadata: occupied
} journal_metadata_t;
typedef struct {
journal_db_t *db;
knot_dname_t *zone;
//journal_metadata_t md;
} journal_t;
enum {
KNOT_JOURNAL_CHECK_SILENT = 0,
KNOT_JOURNAL_CHECK_WARN = 1,
KNOT_JOURNAL_CHECK_INFO = 2
};
typedef enum {
JOURNAL_CHECK_SILENT = 0, // No logging, just curious for return value.
JOURNAL_CHECK_WARN = 1, // Log journal inconsistencies.
JOURNAL_CHECK_INFO = 2 // Log journal state.
} journal_check_level;
/*!
* \brief Initialize shared journal DB file. The DB will be open on first use.
*
* \param db Database to be initialized. Must be (*db == NULL) before!
* \param lmdb_dir_path Path to the directory with DB
* \param lmdb_fslimit Maximum size of DB data file
*
* \return KNOT_E*
*/
int journal_db_init(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fslimit);
/*!
* \brief Close shared journal DB file.
*
* \param db DB to close.
*/
void journal_db_close(journal_db_t **db);
/*!
* \brief List the zones contained in journal DB.
*
* \param db[in] Shared journal DB
* \param zones[out] List of strings (char *) of zone names
*
* \return KNOT_EOK ok
* \retval KNOT_ENOMEM no zones found
* \retval KNOT_EMALF different # of zones found than expected
* \retval KNOT_E* other error
*/
int journal_db_list_zones(journal_db_t **db, list_t *zones);
/*!
* \brief Allocate a new journal structure.
......@@ -99,24 +110,6 @@ int journal_open(journal_t *j, journal_db_t **db, const knot_dname_t *zone_name)
*/
void journal_close(journal_t *journal);
/*!
* \brief Initialize shared journal DB file. The DB will be open on first use.
*
* \param db Database to be initialized. Must be (*db == NULL) before!
* \param lmdb_dir_path Path to the directory with DB
* \param lmdb_fslimit Maximum size of DB data file
*
* \return KNOT_E*
*/
int init_journal_db(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fslimit);
/*!
* \brief Close shared journal DB file.
*
* \param db DB to close.
*/
void close_journal_db(journal_db_t **db);
/*!
* \brief Load changesets from journal.
*
......@@ -157,8 +150,8 @@ int journal_store_changeset(journal_t *journal, changeset_t *change);
/*!
* \brief Check if this (zone's) journal is present in shared journal DB.
*
* \param db Shared journal DB
* \param zone_name Name of the zone of the journal in question
* \param db Shared journal DB
* \param zone_name Name of the zone of the journal in question
*
* \return true or false
*/
......@@ -184,33 +177,20 @@ int scrape_journal(journal_t *j);
/*! \brief Obtain public information from journal metadata
*
* \param[in] j Journal
* \param[out] is_empty 1 if j contains no changesets
* \param[out] serial_from [if !is_empty] starting serial of changesets history
* \param[out] serial_to [if !is_empty] ending serial of changesets history
* \param[in] j Journal
* \param[out] is_empty 1 if j contains no changesets
* \param[out] serial_from [if !is_empty] starting serial of changesets history
* \param[out] serial_to [if !is_empty] ending serial of changesets history
*/
void journal_metadata_info(journal_t *j, int *is_empty, uint32_t *serial_from, uint32_t *serial_to);
/*!
* \brief List the zones contained in journal DB.
*
* \param db[in] Shared journal DB
* \param zones[out] List of strings (char *) of zone names