Commit 1c7c652d authored by Daniel Salzman's avatar Daniel Salzman
Browse files

Merge branch 'journal_fixing' into 'master'

Journal fixing

See merge request !626
parents de809b16 774e3886
......@@ -40,6 +40,10 @@
/*! \brief The number of unused bytes in DB key. */
#define DB_KEY_UNUSED_ZERO (4)
/*! \brief Metadata inserted on the beginning of each chunk:
* uint32_t serial_to + uint32_t chunk_count + 24B unused */
#define JOURNAL_HEADER_SIZE (32)
enum {
LAST_FLUSHED_VALID = 1 << 0, /* "last flush is valid" flag. */
SERIAL_TO_VALID = 1 << 1, /* "last serial_to is valid" flag. */
......@@ -189,6 +193,9 @@ static int txn_cmpkey(txn_t *txn, knot_db_val_t *key2)
if (txn->key.len != key2->len) {
return (txn->key.len < key2->len ? -1 : 1);
}
if (key2->len == 0) {
return 0;
}
return memcmp(txn->key.data, key2->data, key2->len);
}
......@@ -200,7 +207,9 @@ static void txn_val_u32(txn_t *txn, uint32_t *res)
if (txn->val.len != sizeof(uint32_t)) {
txn->ret = KNOT_EMALF;
}
*res = be32toh(*(uint32_t *)txn->val.data);
uint32_t beval;
memcpy(&beval, (uint32_t *)txn->val.data, sizeof(beval));
*res = be32toh(beval);
}
#define txn_begin_md(md) md_get(txn, txn->j->zone, #md, &txn->shadow_md.md)
......@@ -210,6 +219,8 @@ static void txn_val_u32(txn_t *txn, uint32_t *res)
#define txn_check_ret(txn) if ((txn)->ret != KNOT_EOK) return ((txn)->ret)
#define txn_ret(txn) return ((txn)->ret == KNOT_ESEMCHECK ? KNOT_EOK : (txn)->ret)
#define txn_finished_ok(txn) ((txn)->ret == KNOT_ESEMCHECK || (txn)->ret == KNOT_EOK)
static void txn_begin(txn_t *txn, bool write_allowed)
{
if (txn->ret != KNOT_ESEMCHECK) {
......@@ -427,7 +438,7 @@ static int md_set_common_last_inserter_zone(txn_t *txn, knot_dname_t *zone)
static void md_get_common_last_occupied(txn_t *txn, size_t *res)
{
uint32_t sres;
uint32_t sres = 0;
md_get(txn, NULL, MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED, &sres);
*res = (size_t) sres;
}
......@@ -462,36 +473,39 @@ static int md_flushed(txn_t *txn)
serial_compare(txn->shadow_md.last_flushed, txn->shadow_md.last_serial) == 0));
}
/*! \brief some "metadata" inserted to the beginning of each chunk */
typedef struct {
uint32_t serial_to; // changeset's SOA-to serial
uint32_t chunk_count; // # of changeset's chunks
uint8_t unused_for_future[24];
} journal_header_t;
static void make_header(knot_db_val_t *to, uint32_t serial_to, int chunk_count)
{
assert(to->len >= sizeof(journal_header_t));
assert(to->len >= JOURNAL_HEADER_SIZE);
assert(chunk_count > 0);
journal_header_t h;
memset(&h, 0, sizeof(h));
h.serial_to = htobe32(serial_to);
h.chunk_count = htobe32((uint32_t)chunk_count);
memcpy(to->data, &h, sizeof(h));
uint32_t be_serial_to = htobe32(serial_to);
uint32_t be_chunk_count = htobe32((uint32_t)chunk_count);
memcpy(to->data, &be_serial_to, sizeof(be_serial_to));
memcpy(to->data + sizeof(be_serial_to), &be_chunk_count, sizeof(be_chunk_count));
memset(to->data + sizeof(be_serial_to) + sizeof(be_chunk_count), 0,
JOURNAL_HEADER_SIZE - sizeof(be_serial_to) - sizeof(be_chunk_count));
}
/*! \brief read properties from chunk header "from". All the output params are optional */
static void unmake_header(const knot_db_val_t *from, uint32_t *serial_to,
int *chunk_count, size_t *header_size)
{
assert(from->len >= sizeof(journal_header_t));
journal_header_t *h = (journal_header_t *)from->data;
assert(from->len >= JOURNAL_HEADER_SIZE);
if (serial_to != NULL) *serial_to = be32toh(h->serial_to);
assert(be32toh(h->chunk_count) <= INT_MAX);
if (chunk_count != NULL) *chunk_count = (int)be32toh(h->chunk_count);
if (header_size != NULL) *header_size = sizeof(*h);
uint32_t be_serial_to, be_chunk_count;
if (serial_to != NULL) {
memcpy(&be_serial_to, from->data, sizeof(be_serial_to));
*serial_to = be32toh(be_serial_to);
}
if (chunk_count != NULL) {
memcpy(&be_chunk_count, from->data + sizeof(be_serial_to), sizeof(be_chunk_count));
assert(be32toh(be_chunk_count) <= INT_MAX);
*chunk_count = (int)be32toh(be_chunk_count);
}
if (header_size != NULL) {
*header_size = JOURNAL_HEADER_SIZE;
}
}
static int first_digit(char * of)
......@@ -501,7 +515,7 @@ static int first_digit(char * of)
static void md_update_journal_count(txn_t * txn, int change_amount)
{
uint32_t jcnt;
uint32_t jcnt = 0;
md_get(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, &jcnt);
md_set(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, jcnt + change_amount);
}
......@@ -578,7 +592,7 @@ typedef struct {
*/
static void get_iter_next(txn_t *txn, uint32_t expect_serial, int expect_chunk)
{
knot_db_val_t other_key;
knot_db_val_t other_key = { 0 };
txn_check(txn);
txn_iter_next(txn);
......@@ -683,8 +697,8 @@ static int vals_to_changeset(knot_db_val_t *vals, int nvals,
uint8_t *valps[nvals];
size_t vallens[nvals];
for (int i = 0; i < nvals; i++) {
valps[i] = vals[i].data + sizeof(journal_header_t);
vallens[i] = vals[i].len - sizeof(journal_header_t);
valps[i] = vals[i].data + JOURNAL_HEADER_SIZE;
vallens[i] = vals[i].len - JOURNAL_HEADER_SIZE;
}
changeset_t *t_ch = changeset_new(zone_name);
......@@ -734,7 +748,7 @@ static int load_one(journal_t *j, txn_t *_txn, uint32_t serial, changeset_t **ch
changeset_t *rch = NULL;
iterate(j, txn, load_one_itercb, JOURNAL_ITERATION_CHANGESETS, &rch, serial, serial);
unreuse_txn(txn, _txn);
if (txn->ret == KNOT_EOK) {
if (txn_finished_ok(txn)) {
if (rch == NULL) txn->ret = KNOT_ENOENT;
else *ch = rch;
}
......@@ -747,6 +761,7 @@ static int load_merged_changeset(journal_t *j, txn_t *_txn, changeset_t **mch,
assert(*mch == NULL);
reuse_txn(txn, j, _txn, false);
txn_check_ret(txn);
uint32_t ms = txn->shadow_md.merged_serial, fl = txn->shadow_md.flags;
if ((fl & MERGED_SERIAL_VALID) &&
......@@ -831,6 +846,7 @@ static int delete_upto(journal_t *j, txn_t *txn, uint32_t dbfirst, uint32_t last
static int delete_merged_changeset(journal_t *j, txn_t *t)
{
reuse_txn(txn, j, t, true);
txn_check_ret(txn);
if (!md_flag(txn, MERGED_SERIAL_VALID)) {
txn->ret = KNOT_ENOENT;
}
......@@ -844,6 +860,7 @@ static int delete_merged_changeset(journal_t *j, txn_t *t)
static int drop_journal(journal_t *j, txn_t *_txn)
{
reuse_txn(txn, j, _txn, true);
txn_check_ret(txn);
if (md_flag(txn, MERGED_SERIAL_VALID)) {
delete_merged_changeset(j, txn);
}
......@@ -897,6 +914,7 @@ static int del_tofree_itercb(iteration_ctx_t *ctx)
static int delete_tofree(journal_t *j, txn_t *_txn, size_t to_be_freed, size_t *really_freed)
{
reuse_txn(txn, j, _txn, true);
txn_check_ret(txn);
if (!md_flag(txn, LAST_FLUSHED_VALID)) {
*really_freed = 0;
......@@ -907,7 +925,7 @@ static int delete_tofree(journal_t *j, txn_t *_txn, size_t to_be_freed, size_t *
txn->shadow_md.first_serial, txn->shadow_md.last_serial);
unreuse_txn(txn, _txn);
if (txn->ret == KNOT_EOK) *really_freed = ds.freed_approx;
if (txn_finished_ok(txn)) *really_freed = ds.freed_approx;
txn_ret(txn);
}
......@@ -948,6 +966,7 @@ static int del_count_itercb(iteration_ctx_t *ctx)
static int delete_count(journal_t *j, txn_t *_txn, size_t to_be_deleted, size_t *really_deleted)
{
reuse_txn(txn, j, _txn, true);
txn_check_ret(txn);
if (!md_flag(txn, LAST_FLUSHED_VALID)) {
*really_deleted = 0;
......@@ -958,13 +977,14 @@ static int delete_count(journal_t *j, txn_t *_txn, size_t to_be_deleted, size_t
txn->shadow_md.first_serial, txn->shadow_md.last_serial);
unreuse_txn(txn, _txn);
if (txn->ret == KNOT_EOK) *really_deleted = ds.freed_approx;
if (txn_finished_ok(txn)) *really_deleted = ds.freed_approx;
txn_ret(txn);
}
static int delete_dirty_serial(journal_t *j, txn_t *_txn)
{
reuse_txn(txn, j, _txn, true);
txn_check_ret(txn);
if (!md_flag(txn, DIRTY_SERIAL_VALID)) return KNOT_EOK;
......@@ -976,7 +996,7 @@ static int delete_dirty_serial(journal_t *j, txn_t *_txn)
txn_key_2u32(txn, j->zone, ds, ++chunk);
}
unreuse_txn(txn, _txn);
if (txn->ret == KNOT_EOK) {
if (txn_finished_ok(txn)) {
txn->shadow_md.flags &= ~DIRTY_SERIAL_VALID;
}
txn_ret(txn);
......@@ -1005,6 +1025,7 @@ static int merge_itercb(iteration_ctx_t *ctx)
static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **mch)
{
reuse_txn(txn, j, _txn, false);
txn_check_ret(txn);
*mch = NULL;
if (md_flushed(txn)) {
goto m_u_ch_end;
......@@ -1031,7 +1052,7 @@ static int merge_unflushed_changesets(journal_t *j, txn_t *_txn, changeset_t **m
m_u_ch_end:
unreuse_txn(txn, _txn);
if (txn->ret != KNOT_EOK && *mch != NULL) {
if (!txn_finished_ok(txn) && *mch != NULL) {
changeset_free(*mch);
*mch = NULL;
}
......@@ -1086,7 +1107,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
occupied_now = knot_db_lmdb_get_usage(j->db->db);
md_set(txn, NULL, MDKEY_GLOBAL_LAST_TOTAL_OCCUPIED, occupied_now);
if (occupied_now != occupied_last) {
knot_dname_t *last_zone;
knot_dname_t *last_zone = NULL;
uint32_t lz_occupied;
md_get_common_last_inserter_zone(txn, &last_zone);
md_get(txn, last_zone, MDKEY_PERZONE_OCCUPIED, &lz_occupied);
......@@ -1097,7 +1118,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
md_set_common_last_inserter_zone(txn, j->zone);
// PART 3 : check if we exceeded designed occupation and delete some
uint32_t occupied, occupied_max;
uint32_t occupied = 0, occupied_max;
md_get(txn, j->zone, MDKEY_PERZONE_OCCUPIED, &occupied);
occupied_max = journal_max_usage(j);
occupied += serialized_size_total;
......@@ -1175,9 +1196,9 @@ static int store_changesets(journal_t *j, list_t *changesets)
break;
}
for (int i = 0; i < maxchunks; i++) {
chunkptrs[i] = allchunks + i*CHUNK_MAX + sizeof(journal_header_t);
chunkptrs[i] = allchunks + i*CHUNK_MAX + JOURNAL_HEADER_SIZE;
}
txn->ret = changeset_serialize(ch, chunkptrs, CHUNK_MAX - sizeof(journal_header_t),
txn->ret = changeset_serialize(ch, chunkptrs, CHUNK_MAX - JOURNAL_HEADER_SIZE,
maxchunks, chunksizes, &chunks);
if (txn->ret != KNOT_EOK) {
break;
......@@ -1188,7 +1209,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
for (int i = 0; i < chunks; i++) {
vals[i].data = allchunks + i*CHUNK_MAX;
vals[i].len = sizeof(journal_header_t) + chunksizes[i];
vals[i].len = JOURNAL_HEADER_SIZE + chunksizes[i];
make_header(vals + i, serial_to, chunks);
}
......@@ -1550,6 +1571,7 @@ void journal_metadata_info(journal_t *j, bool *is_empty, uint32_t *serial_from,
local_txn_t(txn, j);
txn_begin(txn, false);
txn_check(txn);
*is_empty = !md_flag(txn, SERIAL_TO_VALID);
*serial_from = txn->shadow_md.first_serial;
......@@ -1653,7 +1675,7 @@ static void _jch_print(const knot_dname_t *zname, int warn_level, const char *fo
int journal_check(journal_t *j, journal_check_level warn_level)
{
int ret, allok = 1;
changeset_t *ch;
changeset_t *ch = NULL;
uint32_t sfrom, sto;
uint32_t first_unflushed;
uint32_t chcount;
......
......@@ -568,14 +568,16 @@ static void test_merge(void)
ok(journal_merge_allowed(j), "journal: merge allowed");
ret = drop_journal(j, NULL);
assert(ret == KNOT_EOK);
ok(ret == KNOT_EOK, "journal: drop_journal must be ok");
// insert stuff and check the merge
for (i = 0; !merged_present(); i++) {
ret = journal_store_changeset(j, tm_chs(apex, i));
ok(ret == KNOT_EOK, "journal: journal_store_changeset must be ok");
}
init_list(&l);
ret = journal_load_changesets(j, &l, 0);
ok(ret == KNOT_EOK, "journal: journal_load_changesets must be ok");
ok(list_size(&l) == 2, "journal: read the merged and one following");
changeset_t * mch = (changeset_t *)HEAD(l);
ok(list_size(&l) >= 1 && tm_rrcnt(mch, 1) == 2, "journal: merged additions # = 2");
......@@ -586,10 +588,12 @@ static void test_merge(void)
journal_store_changeset(j, tm_chs(apex, i));
init_list(&l);
ret = journal_load_changesets(j, &l, 0);
ok(ret == KNOT_EOK, "journal: journal_load_changesets2 must be ok");
ok(list_size(&l) == 3, "journal: read merged together with new changeset");
changesets_free(&l);
init_list(&l);
ret = journal_load_changesets(j, &l, (uint32_t) (i - 3));
ok(ret == KNOT_EOK, "journal: journal_load_changesets3 must be ok");
ok(list_size(&l) == 4, "journal: read short history of merged/unmerged changesets");
changesets_free(&l);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment