Skip to content
Snippets Groups Projects

Journal evolution

Merged Libor Peltan requested to merge journal_evolution into master
1 file
+ 79
71
Compare changes
  • Side-by-side
  • Inline
+ 79
71
@@ -114,6 +114,7 @@ typedef struct {
journal_t *j;
knot_db_txn_t *txn;
int ret;
bool opened;
bool is_rw;
@@ -133,14 +134,10 @@ static void md_set32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, ui
static void txn_init(txn_t *txn, knot_db_txn_t *db_txn, journal_t *j)
{
memset(txn, 0, sizeof(*txn));
txn->j = j;
txn->txn = db_txn;
txn->ret = KNOT_ESEMCHECK;
txn->iter = NULL;
txn->key.len = 0;
txn->key.data = &txn->key_raw;
txn->val.len = 0;
txn->val.data = NULL;
}
#define local_txn_t(txn_name, journal) \
@@ -248,18 +245,15 @@ static void txn_val_u64(txn_t *txn, uint64_t *res)
#define txn_begin_md(md) md_get32(txn, txn->j->zone, #md, &txn->shadow_md.md)
#define txn_commit_md(md) md_set32(txn, txn->j->zone, #md, txn->shadow_md.md)
#define txn_check(txn) if ((txn)->ret != KNOT_EOK) return
#define txn_check_ret(txn) if ((txn)->ret != KNOT_EOK) return ((txn)->ret)
#define txn_ret(txn) return ((txn)->ret == KNOT_ESEMCHECK ? KNOT_EOK : (txn)->ret)
#define txn_finished_ok(txn) ((txn)->ret == KNOT_ESEMCHECK || (txn)->ret == KNOT_EOK)
#define txn_check_open(txn) if (((txn)->ret = ((txn)->opened ? (txn)->ret : KNOT_EINVAL)) != KNOT_EOK) return
#define txn_check_ret(txn) if (((txn)->ret = ((txn)->opened ? (txn)->ret : KNOT_EINVAL)) != KNOT_EOK) return ((txn)->ret)
static void txn_begin(txn_t *txn, bool write_allowed)
{
if (txn->ret == KNOT_EOK) {
if (txn->ret == KNOT_EOK && txn->opened) {
txn->ret = KNOT_EINVAL;
}
if (txn->ret != KNOT_ESEMCHECK) {
if (txn->ret != KNOT_EOK) {
return;
}
@@ -267,6 +261,7 @@ static void txn_begin(txn_t *txn, bool write_allowed)
(write_allowed ? 0 : KNOT_DB_RDONLY));
txn->is_rw = write_allowed;
txn->opened = true;
txn_begin_md(first_serial);
txn_begin_md(last_serial);
@@ -280,41 +275,38 @@ static void txn_begin(txn_t *txn, bool write_allowed)
static void txn_find_force(txn_t *txn)
{
if (txn->ret == KNOT_EOK) {
txn->ret = txn->j->db->db_api->find(txn->txn, &txn->key, &txn->val, 0);
}
txn_check_open(txn);
txn->ret = txn->j->db->db_api->find(txn->txn, &txn->key, &txn->val, 0);
}
static int txn_find(txn_t *txn)
static bool txn_find(txn_t *txn)
{
if (txn->ret != KNOT_EOK) {
return 0;
if (txn->ret != KNOT_EOK || !txn->opened) {
return false;
}
txn_find_force(txn);
if (txn->ret == KNOT_ENOENT) {
txn->ret = KNOT_EOK;
return 0;
return false;
}
return (txn->ret == KNOT_EOK ? 1 : 0);
return (txn->ret == KNOT_EOK);
}
static void txn_insert(txn_t *txn)
{
if (txn->ret == KNOT_EOK) {
txn->ret = txn->j->db->db_api->insert(txn->txn, &txn->key, &txn->val, 0);
}
txn_check_open(txn);
txn->ret = txn->j->db->db_api->insert(txn->txn, &txn->key, &txn->val, 0);
}
static void txn_del(txn_t *txn)
{
if (txn->ret == KNOT_EOK) {
txn->ret = txn->j->db->db_api->del(txn->txn, &txn->key);
}
txn_check_open(txn);
txn->ret = txn->j->db->db_api->del(txn->txn, &txn->key);
}
static void txn_iter_begin(txn_t *txn)
{
txn_check(txn);
txn_check_open(txn);
txn->iter = txn->j->db->db_api->iter_begin(txn->txn, KNOT_DB_FIRST);
if (txn->iter == NULL) {
txn->ret = KNOT_ENOMEM;
@@ -364,13 +356,10 @@ static void txn_iter_finish(txn_t *txn)
static void txn_abort(txn_t *txn)
{
if (txn->ret == KNOT_ESEMCHECK) {
return;
}
txn_iter_finish(txn);
txn->j->db->db_api->txn_abort(txn->txn);
if (txn->ret == KNOT_EOK) {
txn->ret = KNOT_ESEMCHECK;
if (txn->opened) {
txn_iter_finish(txn);
txn->j->db->db_api->txn_abort(txn->txn);
txn->opened = false;
}
}
@@ -396,7 +385,7 @@ static void txn_commit(txn_t *txn)
txn->ret = txn->j->db->db_api->txn_commit(txn->txn);
if (txn->ret == KNOT_EOK) {
txn->ret = KNOT_ESEMCHECK;
txn->opened = false;
}
txn_abort(txn); // no effect if all ok
}
@@ -404,7 +393,8 @@ static void txn_commit(txn_t *txn)
static void txn_restart(txn_t *txn)
{
txn_commit(txn);
if (txn->ret == KNOT_ESEMCHECK) {
assert(!txn->opened);
if (txn->ret == KNOT_EOK) {
txn_begin(txn, txn->is_rw);
}
}
@@ -439,7 +429,7 @@ static void txn_unreuse(txn_t **txn, txn_t *reused)
static void md_get(txn_t *txn, const knot_dname_t *zone, const char *mdkey, uint64_t *res)
{
txn_check(txn);
txn_check_open(txn);
txn_key_str(txn, zone, mdkey);
uint64_t res1 = 0;
if (txn_find(txn)) {
@@ -462,7 +452,7 @@ static void md_get32(txn_t *txn, const knot_dname_t *zone, const char *mdkey, ui
// allocates res
static void md_get_common_last_inserter_zone(txn_t *txn, knot_dname_t **res)
{
txn_check(txn);
txn_check_open(txn);
txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
if (txn_find(txn)) {
*res = knot_dname_copy(txn->val.data, NULL);
@@ -484,7 +474,7 @@ static int md_set_common_last_inserter_zone(txn_t *txn, knot_dname_t *zone)
static void md_del_last_inserter_zone(txn_t *txn, knot_dname_t *if_equals)
{
txn_check(txn);
txn_check_open(txn);
txn_key_str(txn, NULL, MDKEY_GLOBAL_LAST_INSERTER_ZONE);
if (txn_find(txn)) {
if (if_equals == NULL || knot_dname_is_equal(txn->val.data, if_equals)) {
@@ -622,7 +612,7 @@ static int initial_md_check(journal_t *j, bool *dirty_present)
txn_abort(txn);
}
txn_ret(txn);
return txn->ret;
}
/*
@@ -663,12 +653,17 @@ static void get_iter_next(iteration_ctx_t *ctx, iteration_cb_t key_cb)
{
knot_db_val_t other_key = { 0 };
txn_check(ctx->txn);
txn_check_open(ctx->txn);
txn_iter_next(ctx->txn);
txn_iter_key(ctx->txn, &other_key);
key_cb(ctx);
if (ctx->txn->ret == KNOT_ENOENT ||
(ctx->txn->ret == KNOT_EOK && txn_cmpkey(ctx->txn, &other_key) != 0)) {
ctx->txn->ret = KNOT_EOK;
if (ctx->txn->iter != NULL) {
txn_iter_finish(ctx->txn);
}
txn_iter_begin(ctx->txn);
txn_iter_seek(ctx->txn);
}
}
@@ -746,7 +741,7 @@ static int iterate(journal_t *j, txn_t *_txn, iteration_cb_t cb, int method,
unreuse_txn(txn, _txn);
txn_ret(txn);
return txn->ret;
}
static int normal_iterkeycb(iteration_ctx_t *ctx)
@@ -822,11 +817,11 @@ static int load_one(journal_t *j, txn_t *_txn, uint32_t serial, changeset_t **ch
changeset_t *rch = NULL;
iterate(j, txn, load_one_itercb, JOURNAL_ITERATION_CHANGESETS, &rch, serial, serial, normal_iterkeycb);
unreuse_txn(txn, _txn);
if (txn_finished_ok(txn)) {
if (txn->ret == KNOT_EOK) {
if (rch == NULL) txn->ret = KNOT_ENOENT;
else *ch = rch;
}
txn_ret(txn);
return txn->ret;
}
static int load_merged_changeset(journal_t *j, txn_t *_txn, changeset_t **mch,
@@ -844,7 +839,7 @@ static int load_merged_changeset(journal_t *j, txn_t *_txn, changeset_t **mch,
}
unreuse_txn(txn, _txn);
txn_ret(txn);
return txn->ret;
}
int journal_load_changesets(journal_t *j, list_t *dst, uint32_t from)
@@ -866,7 +861,7 @@ int journal_load_changesets(journal_t *j, list_t *dst, uint32_t from)
ls, normal_iterkeycb);
txn_commit(txn);
txn_ret(txn);
return txn->ret;
}
int load_bootstrap_iterkeycb(iteration_ctx_t *ctx)
@@ -894,11 +889,11 @@ static int load_bootstrap_changeset(journal_t *j, txn_t *_txn, changeset_t **ch)
iterate(j, txn, load_bootstrap_itercb, JOURNAL_ITERATION_CHANGESETS, &rch,
0, 0, load_bootstrap_iterkeycb);
unreuse_txn(txn, _txn);
if (txn_finished_ok(txn)) {
if (txn->ret == KNOT_EOK) {
if (rch == NULL) txn->ret = KNOT_ENOENT;
else *ch = rch;
}
txn_ret(txn);
return txn->ret;
}
int journal_load_bootstrap(journal_t *j, list_t *dst)
@@ -925,7 +920,7 @@ int journal_load_bootstrap(journal_t *j, list_t *dst)
}
jlb_end:
txn_commit(txn);
txn_ret(txn);
return txn->ret;
}
/*
@@ -988,7 +983,7 @@ static int delete_merged_changeset(journal_t *j, txn_t *t)
delete_upto(j, txn, txn->shadow_md.merged_serial, txn->shadow_md.merged_serial);
}
unreuse_txn(txn, t);
txn_ret(txn);
return txn->ret;
}
static int drop_journal(journal_t *j, txn_t *_txn)
@@ -1004,7 +999,7 @@ static int drop_journal(journal_t *j, txn_t *_txn)
md_del_last_inserter_zone(txn, j->zone);
md_set(txn, j->zone, MDKEY_PERZONE_OCCUPIED, 0);
unreuse_txn(txn, _txn);
txn_ret(txn);
return txn->ret;
}
static int del_tofree_itercb(iteration_ctx_t *ctx)
@@ -1061,8 +1056,10 @@ static int delete_tofree(journal_t *j, txn_t *_txn, size_t to_be_freed, size_t *
txn->shadow_md.first_serial, txn->shadow_md.last_serial, normal_iterkeycb);
unreuse_txn(txn, _txn);
if (txn_finished_ok(txn)) *really_freed = ds.freed_approx;
txn_ret(txn);
if (txn->ret == KNOT_EOK) {
*really_freed = ds.freed_approx;
}
return txn->ret;
}
static int del_count_itercb(iteration_ctx_t *ctx)
@@ -1113,8 +1110,10 @@ static int delete_count(journal_t *j, txn_t *_txn, size_t to_be_deleted, size_t
txn->shadow_md.first_serial, txn->shadow_md.last_serial, normal_iterkeycb);
unreuse_txn(txn, _txn);
if (txn_finished_ok(txn)) *really_deleted = ds.freed_approx;
txn_ret(txn);
if (txn->ret == KNOT_EOK) {
*really_deleted = ds.freed_approx;
}
return txn->ret;
}
static int delete_dirty_serial(journal_t *j, txn_t *_txn)
@@ -1132,10 +1131,10 @@ static int delete_dirty_serial(journal_t *j, txn_t *_txn)
txn_key_2u32(txn, j->zone, ds, ++chunk);
}
unreuse_txn(txn, _txn);
if (txn_finished_ok(txn)) {
if (txn->ret == KNOT_EOK) {
txn->shadow_md.flags &= ~DIRTY_SERIAL_VALID;
}
txn_ret(txn);
return txn->ret;
}
/*
@@ -1173,6 +1172,7 @@ static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **m
txn->shadow_md.first_serial);
txn->ret = load_one(j, txn, from, mch);
if (!was_merged && was_flushed && txn->ret == KNOT_EOK) {
// we have to jump to ONE AFTER last_flushed
from = knot_soa_serial(&(*mch)->soa_to->rrs);
changeset_free(*mch);
*mch = NULL;
@@ -1183,16 +1183,18 @@ static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **m
}
from = knot_soa_serial(&(*mch)->soa_to->rrs);
txn->ret = iterate(j, txn, merge_itercb, JOURNAL_ITERATION_CHANGESETS,
mch, from, txn->shadow_md.last_serial, normal_iterkeycb);
if (serial_compare(from, txn->shadow_md.last_serial_to) != 0) {
txn->ret = iterate(j, txn, merge_itercb, JOURNAL_ITERATION_CHANGESETS,
mch, from, txn->shadow_md.last_serial, normal_iterkeycb);
}
m_u_ch_end:
unreuse_txn(txn, _txn);
if (!txn_finished_ok(txn) && *mch != NULL) {
if (txn->ret != KNOT_EOK && *mch != NULL) {
changeset_free(*mch);
*mch = NULL;
}
txn_ret(txn);
return txn->ret;
}
// uses local context, e.g.: j, txn, changesets, nchs, serialized_size_total, store_changeset_cleanup, inserting_merged
@@ -1201,6 +1203,9 @@ m_u_ch_end:
if (journal_merge_allowed(j)) { \
changeset_t *merged; \
merge_unflushed_changesets(j, txn, &merged); \
if (txn->ret != KNOT_EOK) { \
goto store_changeset_cleanup; \
} \
add_tail(changesets, &merged->n); \
nchs++; \
serialized_size_total += changeset_serialized_size(merged); \
@@ -1301,7 +1306,10 @@ static int store_changesets(journal_t *j, list_t *changesets)
serial_compare(txn->shadow_md.last_serial_to, serial) != 0) {
log_zone_warning(j->zone, "journal, discontinuity in changes history (%u -> %u), dropping older changesets",
txn->shadow_md.last_serial_to, serial);
try_flush
if (!md_flushed(txn) && !journal_merge_allowed(j)) {
txn->ret = KNOT_EBUSY; // aka try_flush, but don't merge
goto store_changeset_cleanup;
}
drop_journal(j, txn);
txn_restart(txn);
}
@@ -1422,7 +1430,7 @@ store_changeset_cleanup:
txn_commit(txn);
if (txn->ret != KNOT_ESEMCHECK) {
if (txn->ret != KNOT_EOK) {
local_txn_t(ddtxn, j);
txn_begin(ddtxn, true);
if (md_flag(ddtxn, DIRTY_SERIAL_VALID)) {
@@ -1444,7 +1452,7 @@ store_changeset_cleanup:
changeset_free(dbgchst);
}
txn_ret(txn);
return txn->ret;
}
#undef try_flush
@@ -1633,7 +1641,7 @@ int journal_flush(journal_t *journal)
txn_begin(txn, true);
md_flush(txn);
txn_commit(txn);
txn_ret(txn);
return txn->ret;
}
bool journal_exists(journal_db_t **db, knot_dname_t *zone_name)
@@ -1657,10 +1665,10 @@ bool journal_exists(journal_db_t **db, knot_dname_t *zone_name)
local_txn_t(txn, &fake_journal);
txn_begin(txn, false);
txn_key_str(txn, zone_name, MDKEY_PERZONE_FLAGS);
int res = txn_find(txn);
bool res = txn_find(txn);
txn_abort(txn);
return (res == 1);
return res;
}
static knot_db_val_t *dbval_copy(const knot_db_val_t *from)
@@ -1737,7 +1745,7 @@ void journal_metadata_info(journal_t *j, bool *is_empty, uint32_t *serial_from,
local_txn_t(txn, j);
txn_begin(txn, false);
txn_check(txn);
txn_check_open(txn);
*is_empty = !md_flag(txn, SERIAL_TO_VALID);
*serial_from = txn->shadow_md.first_serial;
@@ -1798,7 +1806,7 @@ int journal_db_list_zones(journal_db_t **db, list_t *zones)
if (list_size(zones) != expected_count) {
txn->ret = KNOT_EMALF;
}
txn_ret(txn);
return txn->ret;
}
/*
@@ -1840,7 +1848,7 @@ static void _jch_print(const knot_dname_t *zname, int warn_level, const char *fo
#define jch_print(wl, fmt_args...) if ((wl) <= warn_level) _jch_print(j->zone, warn_level, fmt_args)
#define jch_info(fmt_args...) jch_print(JOURNAL_CHECK_INFO, fmt_args)
#define jch_warn(fmt_args...) jch_print((allok = 0, JOURNAL_CHECK_WARN), fmt_args)
#define jch_txn(comment, fatal) do { if (txn->ret != KNOT_EOK && txn->ret != KNOT_ESEMCHECK) { \
#define jch_txn(comment, fatal) do { if (txn->ret != KNOT_EOK) { \
jch_warn("failed transaction: %s (%s)", (comment), knot_strerror(txn->ret)); \
if (fatal) return txn->ret; } } while (0)
@@ -1986,7 +1994,7 @@ int journal_check(journal_t *j, journal_check_level_t warn_level)
changesets_free(&l);
check_merged:
if (txn->ret != KNOT_ESEMCHECK) txn_abort(txn);
if (txn->opened) txn_abort(txn);
txn_begin(txn, false);
jch_txn("begin2", true);
if (md_flag(txn, MERGED_SERIAL_VALID)) {
Loading