From 0acc6a7149da829ba2449f339bda4acd1bdee6c4 Mon Sep 17 00:00:00 2001 From: Libor Peltan <libor.peltan@nic.cz> Date: Wed, 5 Dec 2018 17:54:37 +0100 Subject: [PATCH] journal: add new implementation of LMDB layer and Journal --- Knot.files | 10 + src/knot/Makefile.inc | 10 + src/knot/conf/schema.c | 3 +- src/knot/conf/schema.h | 5 + src/knot/journal/journal.c | 2 +- src/knot/journal/journal.h | 11 +- src/knot/journal/journal_basic.c | 81 ++++ src/knot/journal/journal_basic.h | 103 +++++ src/knot/journal/journal_metadata.c | 350 +++++++++++++++ src/knot/journal/journal_metadata.h | 157 +++++++ src/knot/journal/journal_read.c | 403 ++++++++++++++++++ src/knot/journal/journal_read.h | 138 ++++++ src/knot/journal/journal_write.c | 239 +++++++++++ src/knot/journal/journal_write.h | 104 +++++ src/knot/journal/knot_lmdb.c | 635 ++++++++++++++++++++++++++++ src/knot/journal/knot_lmdb.h | 348 +++++++++++++++ src/knot/updates/changesets.c | 18 +- src/knot/updates/changesets.h | 20 +- tests/knot/test_journal.c | 450 +++++++++----------- 19 files changed, 2823 insertions(+), 264 deletions(-) create mode 100644 src/knot/journal/journal_basic.c create mode 100644 src/knot/journal/journal_basic.h create mode 100644 src/knot/journal/journal_metadata.c create mode 100644 src/knot/journal/journal_metadata.h create mode 100644 src/knot/journal/journal_read.c create mode 100644 src/knot/journal/journal_read.h create mode 100644 src/knot/journal/journal_write.c create mode 100644 src/knot/journal/journal_write.h create mode 100644 src/knot/journal/knot_lmdb.c create mode 100644 src/knot/journal/knot_lmdb.h diff --git a/Knot.files b/Knot.files index d835e739c1..b1a3c5ea1d 100644 --- a/Knot.files +++ b/Knot.files @@ -142,6 +142,16 @@ src/knot/journal/chgset_ctx.c src/knot/journal/chgset_ctx.h src/knot/journal/journal.c src/knot/journal/journal.h +src/knot/journal/journal_basic.c +src/knot/journal/journal_basic.h +src/knot/journal/journal_metadata.c +src/knot/journal/journal_metadata.h +src/knot/journal/journal_read.c +src/knot/journal/journal_read.h +src/knot/journal/journal_write.c +src/knot/journal/journal_write.h +src/knot/journal/knot_lmdb.c +src/knot/journal/knot_lmdb.h src/knot/journal/serialization.c src/knot/journal/serialization.h src/knot/modules/cookies/cookies.c diff --git a/src/knot/Makefile.inc b/src/knot/Makefile.inc index cb8a6f9fe0..6720799541 100644 --- a/src/knot/Makefile.inc +++ b/src/knot/Makefile.inc @@ -125,6 +125,16 @@ libknotd_la_SOURCES = \ knot/journal/chgset_ctx.h \ knot/journal/journal.c \ knot/journal/journal.h \ + knot/journal/journal_basic.c \ + knot/journal/journal_basic.h \ + knot/journal/journal_metadata.c \ + knot/journal/journal_metadata.h \ + knot/journal/journal_read.c \ + knot/journal/journal_read.h \ + knot/journal/journal_write.c \ + knot/journal/journal_write.h \ + knot/journal/knot_lmdb.c \ + knot/journal/knot_lmdb.h \ knot/journal/serialization.c \ knot/journal/serialization.h \ knot/server/server.c \ diff --git a/src/knot/conf/schema.c b/src/knot/conf/schema.c index b16b150c07..19ae1fa742 100644 --- a/src/knot/conf/schema.c +++ b/src/knot/conf/schema.c @@ -23,7 +23,6 @@ #include "knot/conf/confio.h" #include "knot/conf/tools.h" #include "knot/common/log.h" -#include "knot/journal/journal.h" #include "knot/updates/acl.h" #include "libknot/rrtype/opt.h" #include "libdnssec/tsig.h" @@ -332,7 +331,7 @@ static const yp_item_t desc_template[] = { { C_JOURNAL_DB, YP_TSTR, YP_VSTR = { "journal" }, CONF_IO_FRLD_SRV }, { C_JOURNAL_DB_MODE, YP_TOPT, YP_VOPT = { journal_modes, JOURNAL_MODE_ROBUST }, CONF_IO_FRLD_SRV }, - { C_MAX_JOURNAL_DB_SIZE, YP_TINT, YP_VINT = { JOURNAL_MIN_FSLIMIT, VIRT_MEM_LIMIT(TERA(100)), + { C_MAX_JOURNAL_DB_SIZE, YP_TINT, YP_VINT = { MEGA(1), VIRT_MEM_LIMIT(TERA(100)), VIRT_MEM_LIMIT(GIGA(20)), YP_SSIZE }, CONF_IO_FRLD_SRV }, { C_KASP_DB, YP_TSTR, YP_VSTR = { "keys" }, CONF_IO_FRLD_SRV }, diff --git a/src/knot/conf/schema.h b/src/knot/conf/schema.h index 6714cb0dc8..ec7456d61c 100644 --- a/src/knot/conf/schema.h +++ b/src/knot/conf/schema.h @@ -150,6 +150,11 @@ enum { JOURNAL_CONTENT_ALL = 2, }; +enum { + JOURNAL_MODE_ROBUST = 0, // Robust journal DB disk synchronization. + JOURNAL_MODE_ASYNC = 1, // Asynchronous journal DB disk synchronization. +}; + enum { ZONEFILE_LOAD_NONE = 0, ZONEFILE_LOAD_DIFF = 1, diff --git a/src/knot/journal/journal.c b/src/knot/journal/journal.c index 35d9262010..3b6a7ced1f 100644 --- a/src/knot/journal/journal.c +++ b/src/knot/journal/journal.c @@ -1708,7 +1708,7 @@ void journal_close(journal_t *journal) } int journal_db_init(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fslimit, - journal_mode_t mode) + int mode) { if (*db != NULL) { return KNOT_EOK; diff --git a/src/knot/journal/journal.h b/src/knot/journal/journal.h index b2ff3d3337..81ad539301 100644 --- a/src/knot/journal/journal.h +++ b/src/knot/journal/journal.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2017 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -28,17 +28,12 @@ /*! \brief Minimum journal size. */ #define JOURNAL_MIN_FSLIMIT (1 * 1024 * 1024) -typedef enum { - JOURNAL_MODE_ROBUST = 0, // Robust journal DB disk synchronization. - JOURNAL_MODE_ASYNC = 1, // Asynchronous journal DB disk synchronization. -} journal_mode_t; - typedef struct { knot_db_t *db; const knot_db_api_t *db_api; char *path; size_t fslimit; - journal_mode_t mode; + int mode; pthread_mutex_t db_mutex; // please delete this once you move DB opening from journal_open to db_init } journal_db_t; @@ -67,7 +62,7 @@ struct journal_txn; * \return KNOT_E* */ int journal_db_init(journal_db_t **db, const char *lmdb_dir_path, size_t lmdb_fslimit, - journal_mode_t mode); + int mode); /*! * \brief Close shared journal DB file. diff --git a/src/knot/journal/journal_basic.c b/src/knot/journal/journal_basic.c new file mode 100644 index 0000000000..d0aff1bdf4 --- /dev/null +++ b/src/knot/journal/journal_basic.c @@ -0,0 +1,81 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_basic.h" + +#include "knot/conf/conf.h" +#include "knot/journal/journal_metadata.h" +#include "libknot/error.h" + +MDB_val journal_changeset_id_to_key(bool zone_in_journal, uint32_t serial, const knot_dname_t *zone) +{ + if (zone_in_journal) { + return knot_lmdb_make_key("NIS", zone, (uint32_t)0, "bootstrap"); + } else { + return knot_lmdb_make_key("NII", zone, (uint32_t)0, serial); + } +} + +MDB_val journal_changeset_to_chunk_key(const changeset_t *ch, uint32_t chunk_id) +{ + if (ch->soa_from == NULL) { + return knot_lmdb_make_key("NISI", ch->add->apex->owner, (uint32_t)0, "bootstrap", chunk_id); + } else { + return knot_lmdb_make_key("NIII", ch->add->apex->owner, (uint32_t)0, changeset_from(ch), chunk_id); + } +} + +void journal_make_header(void *chunk, const changeset_t *ch) +{ + knot_lmdb_make_key_part(chunk, JOURNAL_HEADER_SIZE, "IILLL", changeset_to(ch), + (uint32_t)0 /* we no longer care for # of chunks */, + (uint64_t)0, (uint64_t)0, (uint64_t)0); +} + +uint32_t journal_next_serial(const MDB_val *chunk) +{ + return be32toh(*(uint32_t *)chunk->mv_data); +} + +bool journal_serial_to(knot_lmdb_txn_t *txn, bool zij, uint32_t serial, + const knot_dname_t *zone, uint32_t *serial_to) +{ + MDB_val key = journal_changeset_id_to_key(zij, serial, zone); + bool found = knot_lmdb_find_prefix(txn, &key); + if (found && serial_to != NULL) { + *serial_to = journal_next_serial(&txn->cur_val); + } + free(key.mv_data); + return found; +} + +bool journal_allow_flush(zone_journal_t j) +{ + conf_val_t val = conf_zone_get(conf(), C_ZONEFILE_SYNC, j.zone); + return conf_int(&val) >= 0; +} + +size_t journal_conf_max_usage(zone_journal_t j) +{ + conf_val_t val = conf_zone_get(conf(), C_MAX_JOURNAL_USAGE, j.zone); + return conf_int(&val); +} + +size_t journal_conf_max_changesets(zone_journal_t j) +{ + conf_val_t val = conf_zone_get(conf(), C_MAX_JOURNAL_DEPTH, j.zone); + return conf_int(&val); +} diff --git a/src/knot/journal/journal_basic.h b/src/knot/journal/journal_basic.h new file mode 100644 index 0000000000..f9637a995d --- /dev/null +++ b/src/knot/journal/journal_basic.h @@ -0,0 +1,103 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "knot/conf/schema.h" +#include "knot/journal/knot_lmdb.h" +#include "knot/updates/changesets.h" +#include "libknot/dname.h" + +typedef struct { + knot_lmdb_db_t *db; + const knot_dname_t *zone; +} zone_journal_t; + +#define JOURNAL_CHUNK_MAX (70 * 1024) +#define JOURNAL_HEADER_SIZE (32) + +/*! \brief Convert journal_mode to LMDB environment flags. */ +inline static unsigned journal_env_flags(int journal_mode) +{ + return journal_mode == JOURNAL_MODE_ASYNC ? (MDB_WRITEMAP | MDB_MAPASYNC) : 0; +} + +/*! + * \brief Create a database key prefix to search for a changeset. + * + * \param zone_in_journal True if searching for zone-in-journal special changeset. + * \param serial Serial-from of the changeset to be searched for. Ignored if 'zone_in_journal'. + * \param zone Name of the zone. + * + * \return DB key. 'mv_data' shall be freed later. 'mv_data' is NULL on failure. + */ +MDB_val journal_changeset_id_to_key(bool zone_in_journal, uint32_t serial, const knot_dname_t *zone); + +/*! + * \brief Create a database key for changeset chunk. + * + * \param ch Corresponding changeset (perhaps to be stored). + * \param chunk_id Ordinal number of this changeset's chunk. + * + * \return DB key. 'mv_data' shall be freed later. 'mv_data' is NULL on failure. + */ +MDB_val journal_changeset_to_chunk_key(const changeset_t *ch, uint32_t chunk_id); + +/*! + * \brief Initialise chunk header. + * + * \param chunk Pointer to the changeset chunk. It must be at least JOURNAL_HEADER_SIZE, perhaps more. + * \param ch Changeset to be serialized into the chunk. + */ +void journal_make_header(void *chunk, const changeset_t *ch); + +/*! + * \brief Obtain serial-to of the serialized changeset. + * + * \param chunk Any chunk of a serialized changeset. + * + * \return The changeset's serial-to. + */ +uint32_t journal_next_serial(const MDB_val *chunk); + +/*! + * \brief Obtain serial-to of a changeset stored in journal. + * + * \param txn Journal DB transaction. + * \param zij True if changeset in question is zone-in-journal. + * \param serial Serial-from of the changeset in question. + * \param zone Zone name. + * \param serial_to Output: serial-to of the changeset in question. + * + * \return True if the changeset exists in the journal. + */ +bool journal_serial_to(knot_lmdb_txn_t *txn, bool zij, uint32_t serial, + const knot_dname_t *zone, uint32_t *serial_to); + +/*! \brief Return true if the changeset in question exists in the journal. */ +inline static bool journal_contains(knot_lmdb_txn_t *txn, bool zone, uint32_t serial, const knot_dname_t *zone_name) +{ + return journal_serial_to(txn, zone, serial, zone_name, NULL); +} + +/*! \brief Return true if the journal may be flushed according to conf. */ +bool journal_allow_flush(zone_journal_t j); + +/*! \brief Return configured maximal per-zone usage of journal DB. */ +size_t journal_conf_max_usage(zone_journal_t j); + +/*! \brief Return configured maximal depth of journal. */ +size_t journal_conf_max_changesets(zone_journal_t j); diff --git a/src/knot/journal/journal_metadata.c b/src/knot/journal/journal_metadata.c new file mode 100644 index 0000000000..d1a3bccee0 --- /dev/null +++ b/src/knot/journal/journal_metadata.c @@ -0,0 +1,350 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_metadata.h" + +#include "libknot/error.h" + +static void fix_endian(void *data, size_t data_size, bool in) +{ + uint8_t tmp[data_size]; + memcpy(tmp, data, data_size); + switch (data_size) { + case sizeof(uint16_t): + *(uint16_t *)data = in ? be16toh(*(uint16_t *)tmp) : htobe16(*(uint16_t *)tmp); + break; + case sizeof(uint32_t): + *(uint32_t *)data = in ? be32toh(*(uint32_t *)tmp) : htobe32(*(uint32_t *)tmp); + break; + case sizeof(uint64_t): + *(uint64_t *)data = in ? be64toh(*(uint64_t *)tmp) : htobe64(*(uint64_t *)tmp); + break; + default: + assert(0); + } +} + +static MDB_val metadata_key(const knot_dname_t *zone, const char *metadata) +{ + if (zone == NULL) { + return knot_lmdb_make_key("IS", (uint32_t)0, metadata); + } else { + return knot_lmdb_make_key("NIS", zone, (uint32_t)0, metadata); + } +} + +static bool del_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata) +{ + MDB_val key = metadata_key(zone, metadata); + if (key.mv_data != NULL) { + knot_lmdb_del_prefix(txn, &key); + free(key.mv_data); + } + return (key.mv_data != NULL); +} + +static bool get_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata) +{ + MDB_val key = metadata_key(zone, metadata); + bool ret = knot_lmdb_find(txn, &key, KNOT_LMDB_EXACT); // not FORCE + free(key.mv_data); + return ret; +} + +static bool get_metadata_numeric(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + const char *metadata, void *result, size_t result_size) +{ + if (get_metadata(txn, zone, metadata)) { + if (txn->cur_val.mv_size == result_size) { + memcpy(result, txn->cur_val.mv_data, result_size); + fix_endian(result, result_size, true); + return true; + } else { + txn->ret = KNOT_EMALF; + } + } + return false; +} + +bool get_metadata32(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + const char *metadata, uint32_t *result) +{ + return get_metadata_numeric(txn, zone, metadata, result, sizeof(*result)); +} + +bool get_metadata64(knot_lmdb_txn_t *txn, const knot_dname_t *zone, + const char *metadata, uint64_t *result) +{ + return get_metadata_numeric(txn, zone, metadata, result, sizeof(*result)); +} + +void set_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const char *metadata, + const void *valp, size_t val_size, bool numeric) +{ + MDB_val key = metadata_key(zone, metadata); + MDB_val val = { val_size, NULL }; + if (knot_lmdb_insert(txn, &key, &val)) { + memcpy(val.mv_data, valp, val_size); + if (numeric) { + fix_endian(val.mv_data, val_size, false); + } + } + free(key.mv_data); +} + +void update_last_inserter(knot_lmdb_txn_t *txn, const knot_dname_t *new_inserter) +{ + uint64_t occupied_now = knot_lmdb_usage(txn), occupied_last = 0, lis_occupied = 0; + (void)get_metadata64(txn, NULL, "last_total_occupied", &occupied_last); + knot_dname_t *last_inserter = get_metadata(txn, NULL, "last_inserter_zone") ? + knot_dname_copy(txn->cur_val.mv_data, NULL) : NULL; + if (occupied_now == occupied_last || last_inserter == NULL) { + goto update_inserter; + } + (void)get_metadata64(txn, last_inserter, "occupied", &lis_occupied); + if (lis_occupied + occupied_now > occupied_last) { + lis_occupied += occupied_now; + lis_occupied -= occupied_last; + } else { + lis_occupied = 0; + } + set_metadata(txn, last_inserter, "occupied", &lis_occupied, sizeof(lis_occupied), true); + +update_inserter: + if (new_inserter == NULL) { + del_metadata(txn, NULL, "last_inserter_zone"); + } else if (last_inserter == NULL || !knot_dname_is_equal(last_inserter, new_inserter)) { + set_metadata(txn, NULL, "last_inserter_zone", new_inserter, knot_dname_size(new_inserter), false); + } + free(last_inserter); + set_metadata(txn, NULL, "last_total_occupied", &occupied_now, sizeof(occupied_now), true); +} + +uint64_t journal_get_occupied(knot_lmdb_txn_t *txn, const knot_dname_t *zone) +{ + uint64_t res = 0; + get_metadata64(txn, zone, "occupied", &res); + return res; +} + +static int first_digit(char * of) +{ + unsigned maj, min; + return sscanf(of, "%u.%u", &maj, &min) == 2 ? maj : -1; +} + +void journal_load_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, journal_metadata_t *md) +{ + memset(md, 0, sizeof(*md)); + if (get_metadata(txn, NULL, "version")) { + switch (first_digit(txn->cur_val.mv_data)) { + case 3: + // TODO warning about downgrade + // FALLTHROUGH + case 1: + // still supported + // FALLTHROUGH + case 2: + // normal operation + break; + case 0: + // failed to read version + txn->ret = KNOT_ENOENT; + return; + default: + txn->ret = KNOT_ENOTSUP; + return; + } + } + md->_new_zone = !get_metadata32(txn, zone, "flags", &md->flags); + (void)get_metadata32(txn, zone, "first_serial", &md->first_serial); + (void)get_metadata32(txn, zone, "last_serial_to", &md->serial_to); + (void)get_metadata32(txn, zone, "merged_serial", &md->merged_serial); + (void)get_metadata32(txn, zone, "changeset_count", &md->changeset_count); + if (!get_metadata32(txn, zone, "flushed_upto", &md->flushed_upto)) { + // importing from version 1.0 + if ((md->flags & JOURNAL_LAST_FLUSHED_VALID)) { + uint32_t last_flushed = 0; + if (!get_metadata32(txn, zone, "last_flushed", &last_flushed) || + !journal_serial_to(txn, false, last_flushed, zone, &md->flushed_upto)) { + txn->ret = KNOT_EMALF; + } else { + md->flags &= ~JOURNAL_LAST_FLUSHED_VALID; + } + } else { + md->flushed_upto = md->first_serial; + } + } + +} + +void journal_store_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const journal_metadata_t *md) +{ + set_metadata(txn, zone, "first_serial", &md->first_serial, sizeof(md->first_serial), true); + set_metadata(txn, zone, "last_serial_to", &md->serial_to, sizeof(md->serial_to), true); + set_metadata(txn, zone, "flushed_upto", &md->flushed_upto, sizeof(md->flushed_upto), true); + set_metadata(txn, zone, "merged_serial", &md->merged_serial, sizeof(md->merged_serial), true); + set_metadata(txn, zone, "changeset_count", &md->changeset_count, sizeof(md->changeset_count), true); + set_metadata(txn, zone, "flags", &md->flags, sizeof(md->flags), true); + set_metadata(txn, NULL, "version", "2.0", 4, false); + if (md->_new_zone) { + uint64_t journal_count = 0; + (void)get_metadata64(txn, NULL, "journal_count", &journal_count); + ++journal_count; + set_metadata(txn, NULL, "journal_count", &journal_count, sizeof(journal_count), true); + } +} + +void journal_metadata_after_delete(journal_metadata_t *md, uint32_t deleted_upto, + size_t deleted_count) +{ + if (deleted_count == 0) { + return; + } + assert((md->flags & JOURNAL_SERIAL_TO_VALID)); + if (deleted_upto == md->serial_to) { + assert(md->flushed_upto == md->serial_to); + assert(md->changeset_count == deleted_count); + md->flags &= ~JOURNAL_SERIAL_TO_VALID; + } + md->first_serial = deleted_upto; + md->changeset_count -= deleted_count; +} + +void journal_metadata_after_merge(journal_metadata_t *md, bool merged_zij, uint32_t merged_serial, + uint32_t merged_serial_to, uint32_t original_serial_to) +{ + md->flushed_upto = merged_serial_to; + if ((md->flags & JOURNAL_MERGED_SERIAL_VALID)) { + assert(!merged_zij); + assert(merged_serial == md->merged_serial); + } else if (!merged_zij) { + md->merged_serial = merged_serial; + md->flags |= JOURNAL_MERGED_SERIAL_VALID; + assert(merged_serial == md->first_serial); + journal_metadata_after_delete(md, original_serial_to, 1); // the merged changeset writes itself instead of first one + } +} + +void journal_metadata_after_insert(journal_metadata_t *md, uint32_t serial, uint32_t serial_to) +{ + if (md->first_serial == md->serial_to) { // no changesets yet + md->first_serial = serial; + md->flushed_upto = serial; + } + md->serial_to = serial_to; + md->flags |= JOURNAL_SERIAL_TO_VALID; + md->changeset_count++; +} + +int journal_scrape_with_md(zone_journal_t j) +{ + if (!journal_is_existing(j)) { + return KNOT_EOK; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + + update_last_inserter(&txn, NULL); + MDB_val prefix = { knot_dname_size(j.zone), (void *)j.zone }; + knot_lmdb_del_prefix(&txn, &prefix); + + knot_lmdb_commit(&txn); + return txn.ret; +} + +int journal_set_flushed(zone_journal_t j) +{ + knot_lmdb_txn_t txn = { 0 }; + journal_metadata_t md = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + journal_load_metadata(&txn, j.zone, &md); + + md.flushed_upto = md.serial_to; + + journal_store_metadata(&txn, j.zone, &md); + knot_lmdb_commit(&txn); + return txn.ret; +} + +int journal_info(zone_journal_t j, bool *exists, uint32_t *first_serial, + uint32_t *serial_to, bool *has_merged, uint32_t *merged_serial, + uint64_t *occupied, uint64_t *occupied_total) +{ + if (!knot_lmdb_exists(j.db)) { + *exists = false; + return KNOT_EOK; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + journal_metadata_t md = { 0 }; + knot_lmdb_begin(j.db, &txn, false); + journal_load_metadata(&txn, j.zone, &md); + *exists = (md.flags & JOURNAL_SERIAL_TO_VALID); + if (first_serial != NULL) { + *first_serial = md.first_serial; + } + if (serial_to != NULL) { + *serial_to = md.serial_to; + } + if (has_merged != NULL) { + *has_merged = (md.flags & JOURNAL_MERGED_SERIAL_VALID); + } + if (merged_serial != NULL) { + *merged_serial = md.merged_serial; + } + if (occupied != NULL) { + get_metadata64(&txn, j.zone, "occupied", occupied); + } + if (occupied_total != NULL) { + *occupied_total = knot_lmdb_usage(&txn); + } + knot_lmdb_abort(&txn); + return txn.ret; +} + +int journals_walk(knot_lmdb_db_t *db, journals_walk_cb_t cb, void *ctx) +{ + if (!knot_lmdb_exists(db)) { + return KNOT_EOK; + } + int ret = knot_lmdb_open(db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(db, &txn, false); + uint8_t search_data[KNOT_DNAME_MAXLEN] = { 0 }; + MDB_val search = { 1, search_data }; + while (knot_lmdb_find(&txn, &search, KNOT_LMDB_GEQ)) { + knot_dname_t *found = txn.cur_key.mv_data; + uint32_t unused_flags; + if (get_metadata32(&txn, found, "flags", &unused_flags)) { + // matched journal DB key appears to be a zone name + txn.ret = cb(found, ctx); + } + + // update searched key to next after found zone + search.mv_size = knot_dname_size(found); + memcpy(search.mv_data, found, search.mv_size); + ((uint8_t *)search.mv_data)[search.mv_size - 1]++; + } + knot_lmdb_abort(&txn); + return txn.ret; +} diff --git a/src/knot/journal/journal_metadata.h b/src/knot/journal/journal_metadata.h new file mode 100644 index 0000000000..705289dde4 --- /dev/null +++ b/src/knot/journal/journal_metadata.h @@ -0,0 +1,157 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "knot/journal/journal_basic.h" + +typedef struct { + uint32_t first_serial; + uint32_t serial_to; + uint32_t flushed_upto; + uint32_t merged_serial; + uint32_t changeset_count; + uint32_t flags; // a bitmap of flags, see enum below + bool _new_zone; // private: if there were no metadata at all previously +} journal_metadata_t; + +enum journal_metadata_flags { + JOURNAL_LAST_FLUSHED_VALID = (1 << 0), // deprecated + JOURNAL_SERIAL_TO_VALID = (1 << 1), + JOURNAL_MERGED_SERIAL_VALID = (1 << 2), +}; + +typedef int (*journals_walk_cb_t)(const knot_dname_t *zone, void *ctx); + +/*! + * \brief Update the computation of DB resources used by each zone. + * + * Because the amount of used space is bigger than sum of changesets' serialized_sizes, + * journal uses a complicated way to compute each zone's used space: there is a metadata + * showing always the previously-inserting zone. Before the next insert, it is computed + * how the total usage of the DB changed during the previous insert (or delete), and the + * usage increase (or decrease) is accounted on the bill of the previous inserter. + * + * \param txn Journal DB transaction. + * \param new_inserter Name of the zone that is going to insert now. Might be NULL if no insert nor delete will be done. + */ +void update_last_inserter(knot_lmdb_txn_t *txn, const knot_dname_t *new_inserter); + +/* \brief Return the journal database usage by given zone. */ +uint64_t journal_get_occupied(knot_lmdb_txn_t *txn, const knot_dname_t *zone); + +/*! + * \brief Load the metadata from DB into structure. + * + * \param txn Journal DB transaction. + * \param zone Zone name. + * \param md Output: metadata structure. + */ +void journal_load_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, journal_metadata_t *md); + +/*! + * \brief Store the metadata from structure into DB. + * + * \param txn Journal DB transaction. + * \param zone Zone name. + * \param md Metadata structure. + */ +void journal_store_metadata(knot_lmdb_txn_t *txn, const knot_dname_t *zone, const journal_metadata_t *md); + +/*! + * \brief Update metadata according to what was deleted. + * + * \param md Metadata structure to be updated. + * \param deleted_upto Serial-to of the last deleted changeset. + * \param deleted_count Number of deleted changesets. + */ +void journal_metadata_after_delete(journal_metadata_t *md, uint32_t deleted_upto, + size_t deleted_count); + +/*! + * \brief Update metadata according to what was merged. + * + * \param md Metadata structure to be updated. + * \param merged_zij True if it was a merge into zone-in-journal. + * \param merged_serial Serial-from of the merged changeset (ignored if 'merged_zij'). + * \param merged_serial_to Serial-to of the merged changeset. + * \param original_serial_to Previous serial-to of the merged changeset before the merge. + */ +void journal_metadata_after_merge(journal_metadata_t *md, bool merged_zij, uint32_t merged_serial, + uint32_t merged_serial_to, uint32_t original_serial_to); + +/*! + * \brief Update metadata according to what was inserted. + * + * \param md Metadata structure to be updated. + * \param serial Serial-from of the inserted changeset. + * \param serial_to Serial-to of the inserted changeset. + */ +void journal_metadata_after_insert(journal_metadata_t *md, uint32_t serial, uint32_t serial_to); + +/*! + * \brief Completely delete all journal records belonging to this zone, including metadata. + * + * \param j Journal to be scraped. + * + * \return KNOT_E* + */ +int journal_scrape_with_md(zone_journal_t j); + +/*! + * \brief Update the metadata stored in journal DB after a zone flush. + * + * \param j Journal to be notified about flush. + * + * \return KNOT_E* + */ +int journal_set_flushed(zone_journal_t j); + +/*! + * \brief Obtain information about the zone's journal from the DB (mostly metadata). + * + * \param j Zone journal. + * \param exists Output: bool if the zone exists in the journal. + * \param first_serial Optional output: serial-from of the first changeset in journal. + * \param serial_to Optional output: serial.to of the last changeset in journal. + * \param has_merged Optional output: bool if there is a special (non zone-in-journal) merged changeset. + * \param merged_serial Optional output: serial-from of the merged changeset. + * \param occupied Optional output: DB space occupied by this zones. + * \param occupied_total Optional output: DB space occupied in total by all zones. + * + * \return KNOT_E* + */ +int journal_info(zone_journal_t j, bool *exists, uint32_t *first_serial, + uint32_t *serial_to, bool *has_merged, uint32_t *merged_serial, + uint64_t *occupied, uint64_t *occupied_total); + +/*! \brief Return true if this zone exists in journal DB. */ +inline static bool journal_is_existing(zone_journal_t j) { + bool ex = false; + journal_info(j, &ex, NULL, NULL, NULL, NULL, NULL, NULL); + return ex; +} + +/*! + * \brief Call a function for each zone being in the journal DB. + * + * \param db Journal database. + * \param cb Callback to be called for each zone-name found. + * \param ctx Arbitrary context to be passed to the callback. + * + * \return An error code from either journal operations or from the callback. + */ +int journals_walk(knot_lmdb_db_t *db, journals_walk_cb_t cb, void *ctx); diff --git a/src/knot/journal/journal_read.c b/src/knot/journal/journal_read.c new file mode 100644 index 0000000000..eecd45cb09 --- /dev/null +++ b/src/knot/journal/journal_read.c @@ -0,0 +1,403 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_read.h" + +#include "knot/journal/journal_metadata.h" +#include "knot/journal/knot_lmdb.h" + +#include "contrib/macros.h" +#include "contrib/ucw/lists.h" +#include "contrib/wire_ctx.h" +#include "libknot/error.h" + +#include <stdlib.h> + +struct journal_read { + knot_lmdb_txn_t txn; + MDB_val key_prefix; + const knot_dname_t *zone; + wire_ctx_t wire; + uint32_t next; +}; + +int journal_read_get_error(const journal_read_t *ctx, int another_error) +{ + return (ctx == NULL || ctx->txn.ret == KNOT_EOK ? another_error : ctx->txn.ret); +} + +static void update_ctx_wire(journal_read_t *ctx) +{ + ctx->wire = wire_ctx_init_const(ctx->txn.cur_val.mv_data, ctx->txn.cur_val.mv_size); + wire_ctx_skip(&ctx->wire, JOURNAL_HEADER_SIZE); +} + +static bool go_next_changeset(journal_read_t *ctx, bool go_zone, const knot_dname_t *zone) +{ + free(ctx->key_prefix.mv_data); + ctx->key_prefix = journal_changeset_id_to_key(go_zone, ctx->next, zone); + if (!knot_lmdb_find_prefix(&ctx->txn, &ctx->key_prefix)) { + return false; + } + ctx->next = journal_next_serial(&ctx->txn.cur_val); + update_ctx_wire(ctx); + return true; +} + +int journal_read_begin(zone_journal_t j, bool read_zone, uint32_t serial_from, journal_read_t **ctx) +{ + *ctx = NULL; + if (!journal_is_existing(j)) { // this also opens the LMDB if not already + return KNOT_ENOENT; + } + + journal_read_t *newctx = calloc(1, sizeof(*newctx)); + if (newctx == NULL) { + return KNOT_ENOMEM; + } + + newctx->zone = j.zone; + newctx->next = serial_from; + + knot_lmdb_begin(j.db, &newctx->txn, false); + + if (go_next_changeset(newctx, read_zone, j.zone)) { + *ctx = newctx; + return KNOT_EOK; + } else { + journal_read_end(newctx); + return KNOT_ENOENT; + } +} + +void journal_read_end(journal_read_t *ctx) +{ + if (ctx != NULL) { + free(ctx->key_prefix.mv_data); + knot_lmdb_abort(&ctx->txn); + free(ctx); + } +} + +static bool make_data_available(journal_read_t *ctx) +{ + if (wire_ctx_available(&ctx->wire) == 0) { + if (!knot_lmdb_next(&ctx->txn)) { + return false; + } + if (!knot_lmdb_is_prefix_of(&ctx->key_prefix, &ctx->txn.cur_key)) { + return false; + } + update_ctx_wire(ctx); + } + return true; +} + +// thoughts for next design of journal serialization: +// - one TTL per rrset +// - endian +// - optionally storing whole rdataset at once? + +bool journal_read_rrset(journal_read_t *ctx, knot_rrset_t *rrset, bool allow_next_changeset) +{ + //knot_rdataset_clear(&rrset->rrs, NULL); + //memset(rrset, 0, sizeof(*rrset)); + if (!make_data_available(ctx)) { + if (!allow_next_changeset || !go_next_changeset(ctx, false, ctx->zone)) { + return false; + } + } + rrset->owner = knot_dname_copy(ctx->wire.position, NULL); + wire_ctx_skip(&ctx->wire, knot_dname_size(rrset->owner)); + rrset->type = wire_ctx_read_u16(&ctx->wire); + rrset->rclass = wire_ctx_read_u16(&ctx->wire); + uint16_t rrs_count = wire_ctx_read_u16(&ctx->wire); + for (int i = 0; i < rrs_count && ctx->wire.error == KNOT_EOK; i++) { + if (!make_data_available(ctx)) { + ctx->wire.error = KNOT_EFEWDATA; + } + // TODO think of how to export serialized rr directly to knot_rdataset_add + // focus on: even address aligning + uint32_t ttl = wire_ctx_read_u32(&ctx->wire); + if (i == 0) { + rrset->ttl = ttl; + } + uint16_t len = wire_ctx_read_u16(&ctx->wire); + if (ctx->wire.error == KNOT_EOK) { + ctx->wire.error = knot_rrset_add_rdata(rrset, ctx->wire.position, len, NULL); + } + wire_ctx_skip(&ctx->wire, len); + } + if (ctx->txn.ret == KNOT_EOK) { + ctx->txn.ret = ctx->wire.error == KNOT_ERANGE ? KNOT_EMALF : ctx->wire.error; + } + if (ctx->txn.ret == KNOT_EOK) { + return true; + } else { + journal_read_clear_rrset(rrset); + return false; + } +} + +void journal_read_clear_rrset(knot_rrset_t *rr) +{ + knot_rrset_clear(rr, NULL); +} + +int journal_read_rrsets(journal_read_t *read, journal_read_cb_t cb, void *ctx) +{ + knot_rrset_t rr = { 0 }; + bool in_remove_section = false; + int ret = KNOT_EOK; + while (ret == KNOT_EOK && journal_read_rrset(read, &rr, true)) { + if (rr_is_apex_soa(&rr, read->zone)) { + in_remove_section = !in_remove_section; + } + ret = cb(in_remove_section, &rr, ctx); + journal_read_clear_rrset(&rr); + } + ret = journal_read_get_error(read, ret); + journal_read_end(read); + return ret; +} + +static int add_rr_to_contents(zone_contents_t *z, const knot_rrset_t *rrset) +{ + zone_node_t *n = NULL; + return zone_contents_add_rr(z, rrset, &n); + // Shall we ignore ETTL ? +} + +bool journal_read_changeset(journal_read_t *ctx, changeset_t *ch) +{ + zone_contents_t *tree = zone_contents_new(ctx->zone); + knot_rrset_t *soa = calloc(1, sizeof(*soa)), rr = { 0 }; + if (tree == NULL || soa == NULL) { + ctx->txn.ret = KNOT_ENOMEM; + goto fail; + } + memset(ch, 0, sizeof(*ch)); + + if (!journal_read_rrset(ctx, soa, true)) { + goto fail; + } + while (journal_read_rrset(ctx, &rr, false)) { + if (rr_is_apex_soa(&rr, ctx->zone)) { + ch->soa_from = soa; + ch->remove = tree; + soa = malloc(sizeof(*soa)); + tree = zone_contents_new(ctx->zone); + if (tree == NULL || soa == NULL) { + ctx->txn.ret = KNOT_ENOMEM; + goto fail; + } + *soa = rr; // note this tricky assignment + memset(&rr, 0, sizeof(rr)); + } else { + ctx->txn.ret = add_rr_to_contents(tree, &rr); + journal_read_clear_rrset(&rr); + } + } + + if (ctx->txn.ret == KNOT_EOK) { + ch->soa_to = soa; + ch->add = tree; + return true; + } else { +fail: + journal_read_clear_rrset(&rr); + journal_read_clear_rrset(soa); + free(soa); + changeset_clear(ch); + zone_contents_deep_free(tree); + return false; + } +} + +void journal_read_clear_changeset(changeset_t *ch) +{ + changeset_clear(ch); + memset(ch, 0, sizeof(*ch)); +} + +static int just_load_md(zone_journal_t j, journal_metadata_t *md, bool *has_zij) +{ + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(j.db, &txn, false); + journal_load_metadata(&txn, j.zone, md); + if (has_zij != NULL) { + *has_zij = journal_contains(&txn, true, 0, j.zone); + } + knot_lmdb_abort(&txn); + return txn.ret; +} + +// beware, this function does not operate in single txn! +int journal_walk(zone_journal_t j, journal_walk_cb_t cb, void *ctx) +{ + int ret; + if (!knot_lmdb_exists(j.db)) { + ret = cb(true, NULL, ctx); + if (ret == KNOT_EOK) { + ret = cb(false, NULL, ctx); + } + return ret; + } + ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + journal_metadata_t md = { 0 }; + journal_read_t *read = NULL; + changeset_t ch; + bool at_least_one = false, zone_in_j = false; + ret = just_load_md(j, &md, &zone_in_j); + if (ret != KNOT_EOK) { + return ret; + } + if (zone_in_j) { + ret = journal_read_begin(j, true, 0, &read); + goto read_one_special; + } else if ((md.flags & JOURNAL_MERGED_SERIAL_VALID)) { + ret = journal_read_begin(j, false, md.merged_serial, &read); +read_one_special: + if (ret == KNOT_EOK && journal_read_changeset(read, &ch)) { + ret = cb(true, &ch, ctx); + journal_read_clear_changeset(&ch); + } + ret = journal_read_get_error(read, ret); + journal_read_end(read); + read = NULL; + } else { + ret = cb(true, NULL, ctx); + } + + if ((md.flags & JOURNAL_SERIAL_TO_VALID) && md.first_serial != md.serial_to && + ret == KNOT_EOK) { + ret = journal_read_begin(j, false, md.first_serial, &read); + while (ret == KNOT_EOK && journal_read_changeset(read, &ch)) { + ret = cb(false, &ch, ctx); + at_least_one = true; + journal_read_clear_changeset(&ch); + } + ret = journal_read_get_error(read, ret); + journal_read_end(read); + } + if (!at_least_one && ret == KNOT_EOK) { + ret = cb(false, NULL, ctx); + } + return ret; +} + +typedef struct { + size_t observed_count; + size_t observed_merged; + uint32_t merged_serial; + size_t observed_zij; + uint32_t first_serial; + bool first_serial_valid; + uint32_t last_serial; + bool last_serial_valid; +} check_ctx_t; + +static int check_cb(bool special, const changeset_t *ch, void *vctx) +{ + check_ctx_t *ctx = vctx; + if (special && ch != NULL) { + if (ch->remove == NULL) { + ctx->observed_zij++; + ctx->last_serial = changeset_to(ch); + ctx->last_serial_valid = true; + } else { + ctx->merged_serial = changeset_from(ch); + ctx->observed_merged++; + } + } else if (ch != NULL) { + if (!ctx->first_serial_valid) { + ctx->first_serial = changeset_from(ch); + ctx->first_serial_valid = true; + } + ctx->last_serial = changeset_to(ch); + ctx->last_serial_valid = true; + ctx->observed_count++; + } + return KNOT_EOK; +} + +static bool eq(bool a, bool b) +{ + return a ? b : !b; +} + +int journal_sem_check(zone_journal_t j) +{ + check_ctx_t ctx = { 0 }; + journal_metadata_t md = { 0 }; + bool has_zij = false; + + if (!journal_is_existing(j)) { + return KNOT_EOK; + } + + int ret = just_load_md(j, &md, &has_zij); + if (ret == KNOT_EOK) { + ret = journal_walk(j, check_cb, &ctx); + } + if (ret != KNOT_EOK) { + return ret; + } + + if (!eq((md.flags & JOURNAL_SERIAL_TO_VALID), ctx.last_serial_valid)) { + return 101; + } + if (ctx.last_serial_valid && ctx.last_serial != md.serial_to) { + return 102; + } + if (!eq((md.flags & JOURNAL_MERGED_SERIAL_VALID), (ctx.observed_merged > 0))) { + return 103; + } + if (ctx.observed_merged > 1) { + return 104; + } + if (ctx.observed_merged == 1 && ctx.merged_serial != md.merged_serial) { + return 105; + } + if (!eq(has_zij, (ctx.observed_zij > 0))) { + return 106; + } + if (ctx.observed_zij > 1) { + return 107; + } + if (ctx.observed_zij + ctx.observed_merged > 1) { + return 108; + } + if (!eq(((md.flags & JOURNAL_SERIAL_TO_VALID) && md.first_serial != md.serial_to), ctx.first_serial_valid)) { + return 109; + } + if (!eq(ctx.first_serial_valid, (ctx.observed_count > 0))) { + return 110; + } + if (ctx.first_serial_valid && ctx.first_serial != md.first_serial) { + return 111; + } + if (ctx.observed_count != md.changeset_count) { + return 112; + } + if (ctx.observed_merged > 0 && ctx.observed_count == 0) { + return 113; + } + return KNOT_EOK; +} diff --git a/src/knot/journal/journal_read.h b/src/knot/journal/journal_read.h new file mode 100644 index 0000000000..5659ca0b5a --- /dev/null +++ b/src/knot/journal/journal_read.h @@ -0,0 +1,138 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "knot/journal/journal_basic.h" + +typedef struct journal_read journal_read_t; + +typedef int (*journal_read_cb_t)(bool in_remove_section, const knot_rrset_t *rr, void *ctx); + +typedef int (*journal_walk_cb_t)(bool special, const changeset_t *ch, void *ctx); + +/*! + * \brief Start reading journal from specified changeset. + * + * \param j Journal to be read. + * \param read_zone True if reading shall start with zone-in-journal. + * \param serial_from Serial-from of the changeset to be started at (ignored if 'read_zone'). + * \param ctx Output: journal reading context initialised. + * + * \return KNOT_E* + */ +int journal_read_begin(zone_journal_t j, bool read_zone, uint32_t serial_from, journal_read_t **ctx); + +/*! + * \brief Read a single RRSet from a journal changeset. + * + * \param ctx Journal reading context. + * \param rr Output: RRSet to be filled with serialized data. + * \param allow_next_changeset True to allow jumping to next changeset. + * + * \return False if no more RRSet in this changeset/journal, or failure. + */ +bool journal_read_rrset(journal_read_t *ctx, knot_rrset_t *rr, bool allow_next_changeset); + +/*! + * \brief Free up heap allocations by journal_read_rrset(). + * + * \param rr RRSet initialised by journal_read_rrset(). + */ +void journal_read_clear_rrset(knot_rrset_t *rr); + +// TODO move somewhere. Libknot? +inline static bool rr_is_apex_soa(const knot_rrset_t *rr, const knot_dname_t *apex) +{ + return (rr->type == KNOT_RRTYPE_SOA && knot_dname_is_equal(rr->owner, apex)); +} + +/*! + * \brief Read all RRSets up to the end of journal, calling a function for each. + * + * \note Closes reading context at the end. + * + * \param read Journal reading context. + * \param cb Callback to be called on each read. + * \param ctx Arbitrary context to be passed to the callback. + * + * \return An error code from either journal operations or from the callback. + */ +int journal_read_rrsets(journal_read_t *read, journal_read_cb_t cb, void *ctx); + +/*! + * \brief Read a single changeset from journal. + * + * \param ctx Journal reading context. + * \param ch Output: changeset to be filled with serialized data. + * + * \return False if no more changesets in the journal, or failure. + */ +bool journal_read_changeset(journal_read_t *ctx, changeset_t *ch); + +/*! + * \brief Free up heap allocations by journal_read_changeset(). + * + * \param ch Changeset initialised by journal_read_changeset(). + */ +void journal_read_clear_changeset(changeset_t *ch); + +/*! + * \brief Obtain error code from the journal_read operations previously performed. + * + * \param ctx Journal reading context. + * \param another_error An error code from outside the reading operations to be combined. + * + * \return KNOT_EOK if completely every operation succeeded, KNOT_E* + */ +int journal_read_get_error(const journal_read_t *ctx, int another_error); + +/*! + * \brief Finalise journal reading. + * + * \param ctx Journal reading context (will be freed). + */ +void journal_read_end(journal_read_t *ctx); + +/*! + * \brief Call a function for each changeset stored in journal. + * + * First, the callback will be called for the special changeset - + * either zone-in-journal or merged changeset, with special=true. + * If there is no such, it will be called anyway with ch=NULL. + * + * Than, the callback will be called for each regular changeset + * with special=false. If there is none, it will be called once + * with ch=NULL. + * + * \param j Zone journal to be read. + * \param cb Callback to be called for each changeset (or its non-existence). + * \param ctx Arbitrary context to be passed to the callback. + * + * \return An error code from either journal operations or from the callback. + */ +int journal_walk(zone_journal_t j, journal_walk_cb_t cb, void *ctx); + +/*! + * \brief Perform semantic check of the zone journal (consistency, metadata...). + * + * \param j Zone journal to be checked. + * + * \retval KNOT_E* ( < 0 ) if an error during journal operation. + * \retval > 100 if some inconsistency found. + * \return KNOT_EOK of all ok. + */ +int journal_sem_check(zone_journal_t j); diff --git a/src/knot/journal/journal_write.c b/src/knot/journal/journal_write.c new file mode 100644 index 0000000000..5aa95068cb --- /dev/null +++ b/src/knot/journal/journal_write.c @@ -0,0 +1,239 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/journal_write.h" + +#include "contrib/macros.h" +#include "knot/journal/journal_metadata.h" +#include "knot/journal/journal_read.h" +#include "knot/journal/serialization.h" +#include "libknot/error.h" + +void journal_write_changeset(knot_lmdb_txn_t *txn, const changeset_t *ch) +{ + MDB_val chunk; + serialize_ctx_t *ser = serialize_init(ch); + if (ser == NULL) { + txn->ret = KNOT_ENOMEM; + } + uint32_t i = 0; + while (serialize_unfinished(ser) && txn->ret == KNOT_EOK) { + serialize_prepare(ser, JOURNAL_CHUNK_MAX - JOURNAL_HEADER_SIZE, &chunk.mv_size); + if (chunk.mv_size == 0) { + break; // beware! If this is ommited, it creates empty chunk => EMALF when reading. + } + chunk.mv_size += JOURNAL_HEADER_SIZE; + chunk.mv_data = NULL; + MDB_val key = journal_changeset_to_chunk_key(ch, i); + if (knot_lmdb_insert(txn, &key, &chunk)) { + journal_make_header(chunk.mv_data, ch); + serialize_chunk(ser, chunk.mv_data + JOURNAL_HEADER_SIZE, chunk.mv_size - JOURNAL_HEADER_SIZE); + } + free(key.mv_data); + i++; + } + serialize_deinit(ser); + // return value is in the txn +} + +static int merge_cb(bool remove, const knot_rrset_t *rr, void *ctx) +{ + changeset_t *ch = ctx; + return remove ? (rr_is_apex_soa(rr, ch->soa_to->owner) ? + KNOT_EOK : changeset_add_removal(ch, rr, CHANGESET_CHECK)) + : changeset_add_addition(ch, rr, CHANGESET_CHECK); +} + +void journal_merge(zone_journal_t j, knot_lmdb_txn_t *txn, bool merge_zij, + uint32_t merge_serial, uint32_t *original_serial_to) +{ + changeset_t merge; + memset(&merge, 0, sizeof(merge)); + journal_read_t *read = NULL; + txn->ret = journal_read_begin(j, merge_zij, merge_serial, &read); + if (txn->ret != KNOT_EOK) { + return; + } + if (journal_read_changeset(read, &merge)) { + *original_serial_to = changeset_to(&merge); + } + txn->ret = journal_read_rrsets(read, merge_cb, &merge); + journal_write_changeset(txn, &merge); + //knot_rrset_clear(&rr, NULL); + journal_read_clear_changeset(&merge); +} + +static bool delete_one(knot_lmdb_txn_t *txn, bool del_zij, uint32_t del_serial, + const knot_dname_t *zone, size_t *freed, uint32_t *next_serial) +{ + *freed = 0; + MDB_val prefix = journal_changeset_id_to_key(del_zij, del_serial, zone); + knot_lmdb_foreach(txn, &prefix) { + *freed += txn->cur_val.mv_size; + *next_serial = journal_next_serial(&txn->cur_val); + knot_lmdb_del_cur(txn); + } + free(prefix.mv_data); + return (*freed > 0); +} + +bool journal_delete(knot_lmdb_txn_t *txn, uint32_t from, const knot_dname_t *zone, + size_t tofree_size, size_t tofree_count, uint32_t stop_at_serial, + size_t *freed_size, size_t *freed_count, uint32_t *stopped_at) +{ + *freed_size = 0; + *freed_count = 0; + size_t freed_now; + while (from != stop_at_serial && + (*freed_size < tofree_size || *freed_count < tofree_count) && + delete_one(txn, false, from, zone, &freed_now, stopped_at)) { + *freed_size += freed_now; + ++(*freed_count); + from = *stopped_at; + } + return (*freed_count > 0); +} + +void journal_try_flush(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md) +{ + bool flush = journal_allow_flush(j); + uint32_t merge_orig; + if (journal_contains(txn, true, 0, j.zone)) { + journal_merge(j, txn, true, 0, &merge_orig); + if (!flush) { + journal_metadata_after_merge(md, true, 0, md->serial_to, merge_orig); + } + } else if (!flush) { + uint32_t merge_serial = ((md->flags & JOURNAL_MERGED_SERIAL_VALID) ? md->merged_serial : md->first_serial); + journal_merge(j, txn, false, merge_serial, &merge_orig); + journal_metadata_after_merge(md, false, merge_serial, md->serial_to, merge_orig); + } + + if (flush) { + // delete merged serial if (very unlikely) exists + if ((md->flags & JOURNAL_MERGED_SERIAL_VALID)) { + size_t unused; + (void)delete_one(txn, false, md->merged_serial, j.zone, &unused, (uint32_t *)&unused); + md->flags &= ~JOURNAL_MERGED_SERIAL_VALID; + } + + // commit partial job and ask zone to flush itself + journal_store_metadata(txn, j.zone, md); + knot_lmdb_commit(txn); + if (txn->ret == KNOT_EOK) { + txn->ret = KNOT_EBUSY; + } + } +} + +void journal_fix_occupation(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md, + int64_t max_usage, ssize_t max_count) +{ + uint64_t occupied = journal_get_occupied(txn, j.zone), freed; + int64_t need_tofree = (int64_t)occupied - max_usage; + size_t count = md->changeset_count, removed; + ssize_t need_todel = (ssize_t)count - max_count; + + while ((need_tofree > 0 || need_todel > 0) && txn->ret == KNOT_EOK) { + uint32_t del_from = md->first_serial; // don't move this line outside of the loop + freed = 0; + removed = 0; + journal_delete(txn, del_from, j.zone, MAX(need_tofree, 0), MAX(need_todel, 0), + md->flushed_upto, &freed, &removed, &del_from); + if (freed == 0) { + if (md->flushed_upto != md->serial_to) { + journal_try_flush(j, txn, md); + } else { + break; + } + } else { + journal_metadata_after_delete(md, del_from, removed); + need_tofree -= freed; + need_todel -= removed; + } + } +} + +int journal_insert_zone(zone_journal_t j, const changeset_t *ch) +{ + if (ch->remove != NULL) { + return KNOT_EINVAL; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + + update_last_inserter(&txn, j.zone); + MDB_val prefix = { knot_dname_size(j.zone), (void *)j.zone }; + knot_lmdb_del_prefix(&txn, &prefix); + + journal_write_changeset(&txn, ch); + + journal_metadata_t md = { 0 }; + md.flags = JOURNAL_SERIAL_TO_VALID; + md.serial_to = changeset_to(ch); + md.first_serial = md.serial_to; + journal_store_metadata(&txn, j.zone, &md); + + knot_lmdb_commit(&txn); + return txn.ret; +} + +int journal_insert(zone_journal_t j, const changeset_t *ch) +{ + size_t ch_size = changeset_serialized_size(ch); + size_t max_usage = journal_conf_max_usage(j); + if (ch_size >= max_usage) { + return KNOT_ESPACE; + } + int ret = knot_lmdb_open(j.db); + if (ret != KNOT_EOK) { + return ret; + } + knot_lmdb_txn_t txn = { 0 }; + journal_metadata_t md = { 0 }; + knot_lmdb_begin(j.db, &txn, true); + journal_load_metadata(&txn, j.zone, &md); + + update_last_inserter(&txn, j.zone); + journal_fix_occupation(j, &txn, &md, max_usage - ch_size, journal_conf_max_changesets(j) - 1); + + // avoid discontinuity + if ((md.flags & JOURNAL_SERIAL_TO_VALID) && md.serial_to != changeset_from(ch)) { + if (journal_contains(&txn, true, 0, j.zone)) { + return KNOT_ESEMCHECK; + } else { + MDB_val prefix = { knot_dname_size(j.zone), (void *)j.zone }; + knot_lmdb_del_prefix(&txn, &prefix); + memset(&md, 0, sizeof(md)); + } + } + + // avoid cycle + if (journal_contains(&txn, false, changeset_to(ch), j.zone)) { + journal_fix_occupation(j, &txn, &md, INT64_MAX, 1); + } + + journal_write_changeset(&txn, ch); + journal_metadata_after_insert(&md, changeset_from(ch), changeset_to(ch)); + + journal_store_metadata(&txn, j.zone, &md); + knot_lmdb_commit(&txn); + return txn.ret; +} diff --git a/src/knot/journal/journal_write.h b/src/knot/journal/journal_write.h new file mode 100644 index 0000000000..37d70a143a --- /dev/null +++ b/src/knot/journal/journal_write.h @@ -0,0 +1,104 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "knot/journal/journal_basic.h" +#include "knot/journal/journal_metadata.h" + +/*! + * \brief Serialize a changeset into chunks and write it into DB with no checks and metadata update. + * + * \param txn Journal DB transaction. + * \param ch Changeset to be written. + */ +void journal_write_changeset(knot_lmdb_txn_t *txn, const changeset_t *ch); + +/*! + * \brief Merge all following changeset into one of journal changeset. + * + * \param j Zone journal. + * \param txn Journal DB transaction. + * \param merge_zij True if we shall merge into zone-in-journal. + * \param merge_serial Serial-from of the changeset to be merged into (ignored if 'merge_zij'). + * \param original_serial_to Output: previous serial-to of the merged changeset before merge. + * + * \note The error code will be in thx->ret. + */ +void journal_merge(zone_journal_t j, knot_lmdb_txn_t *txn, bool merge_zij, + uint32_t merge_serial, uint32_t *original_serial_to); + +/*! + * \brief Delete some journal changesets in attempt to fulfill usage quotas. + * + * \param txn Journal DB transaction. + * \param from Serial-from of the first cangeset to be deleted. + * \param zone Zone name. + * \param tofree_size Amount of data (in bytes) to be at least deleted. + * \param tofree_count Number of changesets to be at least deleted. + * \param stop_at_serial Must not delete the changeset with this serial-from. + * \param freed_size Output: amount of data really deleted. + * \param freed_count Output: number of changesets really freed. + * \param stopped_at Output: serial-to of the last deleted changeset. + * + * \return True if something was deleted (not necessarily fulfilling tofree_*). + */ +bool journal_delete(knot_lmdb_txn_t *txn, uint32_t from, const knot_dname_t *zone, + size_t tofree_size, size_t tofree_count, uint32_t stop_at_serial, + size_t *freed_size, size_t *freed_count, uint32_t *stopped_at); + +/*! + * \brief Perform a merge or zone flush in order to enable deleting more changesets. + * + * \param j Zone journal. + * \param txn Journal DB transaction. + * \param md Jounral metadata. + * + * \note It might set txn->ret to KNOT_EBUSY to fail out from this operation and let the zone flush itself. + */ +void journal_try_flush(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md); + +/*! + * \brief Perform delete/merge/flush operations to fulfill configured journal quotas. + * + * \param j Zone journal. + * \param txn Journal DB transaction. + * \param md Journal metadata. + * \param max_usage Configured maximum usage (in bytes) of journal DB by this zone. + * \param max_count Configured maximum number of changesets. + */ +void journal_fix_occupation(zone_journal_t j, knot_lmdb_txn_t *txn, journal_metadata_t *md, + int64_t max_usage, ssize_t max_count); + +/*! + * \brief Store zone-in-journal into the journal, update metadata. + * + * \param j Zone journal. + * \param ch Changeset containing zone-in-journal. + * + * \return KNOT_E* + */ +int journal_insert_zone(zone_journal_t j, const changeset_t *ch); + +/*! + * \brief Store changeset into journal, fulfilling quotas and updating metadata. + * + * \param j Zone journal. + * \param ch Changeset to be stored. + * + * \return KNOT_E* + */ +int journal_insert(zone_journal_t j, const changeset_t *ch); diff --git a/src/knot/journal/knot_lmdb.c b/src/knot/journal/knot_lmdb.c new file mode 100644 index 0000000000..7aa60147ad --- /dev/null +++ b/src/knot/journal/knot_lmdb.c @@ -0,0 +1,635 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#include "knot/journal/knot_lmdb.h" + +#include <pthread.h> +#include <stdarg.h> +#include <stdio.h> // snprintf +#include <stdlib.h> +#include <sys/stat.h> +#include <unistd.h> + +#include "contrib/wire_ctx.h" +#include "libknot/dname.h" +#include "libknot/endian.h" +#include "libknot/error.h" + +#define LMDB_DIR_MODE 0770 +#define LMDB_FILE_MODE 0660 + +static void err_to_knot(int *err) +{ + switch (*err) { + case MDB_SUCCESS: + *err = KNOT_EOK; + break; + case MDB_NOTFOUND: + *err = KNOT_ENOENT; + break; + case MDB_TXN_FULL: + *err = KNOT_ELIMIT; + break; + case MDB_MAP_FULL: + case ENOSPC: + *err = KNOT_ESPACE; + break; + default: + *err = (*err < 0 ? *err : -*err); + } +} + +void knot_lmdb_init(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags, const char *dbname) +{ +#ifdef __OpenBSD__ + /* + * Enforce that MDB_WRITEMAP is set. + * + * MDB assumes a unified buffer cache. + * + * See https://www.openldap.org/pub/hyc/mdm-paper.pdf section 3.1, + * references 17, 18, and 19. + * + * From Howard Chu: "This requirement can be relaxed in the + * current version of the library. If you create the environment + * with the MDB_WRITEMAP option then all reads and writes are + * performed using mmap, so the file buffer cache is irrelevant. + * Of course then you lose the protection that the read-only + * map offers." + */ + env_flags |= MDB_WRITEMAP; +#endif + db->env = NULL; + db->path = strdup(path); + db->mapsize = mapsize; + db->env_flags = env_flags; + db->dbname = dbname; + pthread_mutex_init(&db->opening_mutex, NULL); + db->maxdbs = 2; + db->maxreaders = 126/* = contrib/lmdb/mdb.c DEFAULT_READERS */; +} + +static bool lmdb_stat(const char *lmdb_path, struct stat *st) +{ + char data_mdb[strlen(lmdb_path) + 10]; + snprintf(data_mdb, sizeof(data_mdb), "%s/data.mdb", lmdb_path); + return (stat(data_mdb, st) == 0 && st->st_size > 0); +} + +bool knot_lmdb_exists(knot_lmdb_db_t *db) +{ + if (db->env != NULL) { + return true; + } + if (db->path == NULL) { + return false; + } + struct stat unused; + return lmdb_stat(db->path, &unused); +} + +static int fix_mapsize(knot_lmdb_db_t *db) +{ + if (db->mapsize == 0) { + struct stat st; + if (!lmdb_stat(db->path, &st)) { + return KNOT_ENOENT; + } + db->mapsize = st.st_size * 2; // twice the size as DB might grow while we read it + db->env_flags |= MDB_RDONLY; + } + return KNOT_EOK; +} + +static int _open(knot_lmdb_db_t *db) +{ + MDB_txn *init_txn = NULL; + + if (db->env != NULL) { + return KNOT_EOK; + } + + if (db->path == NULL) { + return KNOT_ENOMEM; + } + + int ret = fix_mapsize(db); + if (ret != KNOT_EOK) { + return ret; + } + + ret = mkdir(db->path, LMDB_DIR_MODE); + if (ret < 0 && errno != EEXIST) { + return -errno; + } + + long page_size = sysconf(_SC_PAGESIZE); + if (page_size <= 0) { + return KNOT_ERROR; + } + size_t mapsize = (db->mapsize / page_size + 1) * page_size; + + ret = mdb_env_create(&db->env); + if (ret != MDB_SUCCESS) { + err_to_knot(&ret); + return ret; + } + + ret = mdb_env_set_mapsize(db->env, mapsize); + if (ret == MDB_SUCCESS) { + ret = mdb_env_set_maxdbs(db->env, db->maxdbs); + } + if (ret == MDB_SUCCESS) { + ret = mdb_env_set_maxreaders(db->env, db->maxreaders); + } + if (ret == MDB_SUCCESS) { + ret = mdb_env_open(db->env, db->path, db->env_flags, LMDB_FILE_MODE); + } + if (ret == MDB_SUCCESS) { + unsigned init_txn_flags = (db->env_flags & MDB_RDONLY); + ret = mdb_txn_begin(db->env, NULL, init_txn_flags, &init_txn); + } + if (ret == MDB_SUCCESS) { + ret = mdb_dbi_open(init_txn, db->dbname, MDB_CREATE, &db->dbi); + } + if (ret == MDB_SUCCESS) { + ret = mdb_txn_commit(init_txn); + } + + if (ret != MDB_SUCCESS) { + if (init_txn != NULL) { + mdb_txn_abort(init_txn); + } + mdb_env_close(db->env); + db->env = NULL; + } + err_to_knot(&ret); + return ret; +} + +int knot_lmdb_open(knot_lmdb_db_t *db) +{ + pthread_mutex_lock(&db->opening_mutex); + int ret = _open(db); + pthread_mutex_unlock(&db->opening_mutex); + return ret; +} + +static void _close(knot_lmdb_db_t *db) +{ + if (db->env != NULL) { + mdb_dbi_close(db->env, db->dbi); + mdb_env_close(db->env); + db->env = NULL; + } +} + +void knot_lmdb_close(knot_lmdb_db_t *db) +{ + pthread_mutex_lock(&db->opening_mutex); + _close(db); + pthread_mutex_unlock(&db->opening_mutex); +} + +static int _reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags) +{ +#ifdef __OpenBSD__ + env_flags |= MDB_WRITEMAP; +#endif + if (strcmp(db->path, path) == 0 && db->mapsize == mapsize && db->env_flags == env_flags) { + return KNOT_EOK; + } + if (db->env != NULL) { + return KNOT_EISCONN; + } + free(db->path); + db->path = strdup(path); + db->mapsize = mapsize; + db->env_flags = env_flags; + return KNOT_EOK; +} + +int knot_lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags) +{ + pthread_mutex_lock(&db->opening_mutex); + int ret = _reinit(db, path, mapsize, env_flags); + pthread_mutex_unlock(&db->opening_mutex); + return ret; +} + +int knot_lmdb_reconfigure(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags) +{ + pthread_mutex_lock(&db->opening_mutex); + int ret = _reinit(db, path, mapsize, env_flags); + if (ret != KNOT_EOK) { + _close(db); + ret = _reinit(db, path, mapsize, env_flags); + if (ret == KNOT_EOK) { + ret = _open(db); + } + } + pthread_mutex_unlock(&db->opening_mutex); + return ret; +} + +void knot_lmdb_deinit(knot_lmdb_db_t *db) +{ + knot_lmdb_close(db); + pthread_mutex_destroy(&db->opening_mutex); + free(db->path); +} + +void knot_lmdb_begin(knot_lmdb_db_t *db, knot_lmdb_txn_t *txn, bool rw) +{ + txn->ret = mdb_txn_begin(db->env, NULL, rw ? 0 : MDB_RDONLY, &txn->txn); + err_to_knot(&txn->ret); + if (txn->ret == KNOT_EOK) { + txn->opened = true; + txn->db = db; + txn->is_rw = rw; + } +} + +void knot_lmdb_abort(knot_lmdb_txn_t *txn) +{ + if (txn->opened) { + if (txn->cursor != NULL) { + mdb_cursor_close(txn->cursor); + txn->cursor = false; + } + mdb_txn_abort(txn->txn); + txn->opened = false; + } +} + +static bool txn_semcheck(knot_lmdb_txn_t *txn) +{ + if (!txn->opened && txn->ret == KNOT_EOK) { + txn->ret = KNOT_ESEMCHECK; + } + if (txn->ret != KNOT_EOK) { + knot_lmdb_abort(txn); + return false; + } + return true; +} + +void knot_lmdb_commit(knot_lmdb_txn_t *txn) +{ + if (!txn_semcheck(txn)) { + return; + } + if (txn->cursor != NULL) { + mdb_cursor_close(txn->cursor); + txn->cursor = false; + } + txn->ret = mdb_txn_commit(txn->txn); + err_to_knot(&txn->ret); + if (txn->ret == KNOT_EOK) { + txn->opened = false; + } else { + knot_lmdb_abort(txn); + } +} + +// save the programmer's frequent checking for ENOMEM when creating search keys +static bool txn_enomem(knot_lmdb_txn_t *txn, const MDB_val *tocheck) +{ + if (tocheck->mv_data == NULL) { + txn->ret = KNOT_ENOMEM; + knot_lmdb_abort(txn); + return false; + } + return true; +} + +static bool init_cursor(knot_lmdb_txn_t *txn) +{ + if (txn->cursor == NULL) { + txn->ret = mdb_cursor_open(txn->txn, txn->db->dbi, &txn->cursor); + err_to_knot(&txn->ret); + if (txn->ret != KNOT_EOK) { + knot_lmdb_abort(txn); + return false; + } + } + return true; +} + +static bool curget(knot_lmdb_txn_t *txn, MDB_cursor_op op) +{ + txn->ret = mdb_cursor_get(txn->cursor, &txn->cur_key, &txn->cur_val, op); + err_to_knot(&txn->ret); + if (txn->ret == KNOT_ENOENT) { + txn->ret = KNOT_EOK; + return false; + } + return (txn->ret == KNOT_EOK); +} + +bool knot_lmdb_find(knot_lmdb_txn_t *txn, MDB_val *what, knot_lmdb_find_t how) +{ + if (!txn_semcheck(txn) || !init_cursor(txn) || !txn_enomem(txn, what)) { + return false; + } + txn->cur_key.mv_size = what->mv_size; + txn->cur_key.mv_data = what->mv_data; + txn->cur_val.mv_size = 0; + txn->cur_val.mv_data = NULL; + knot_lmdb_find_t cmp = (how & 3); + bool succ = curget(txn, cmp == KNOT_LMDB_EXACT ? MDB_SET : MDB_SET_RANGE); + if (cmp == KNOT_LMDB_LEQ && txn->ret == KNOT_EOK) { + // LEQ is not supported by LMDB, we use GEQ and go back + if (succ) { + if (txn->cur_key.mv_size != what->mv_size || + memcmp(txn->cur_key.mv_data, what->mv_data, what->mv_size) != 0) { + succ = curget(txn, MDB_PREV); + } + } else { + succ = curget(txn, MDB_LAST); + } + } + + if ((how & KNOT_LMDB_FORCE) && !succ && txn->ret == KNOT_EOK) { + txn->ret = KNOT_ENOENT; + } + + return succ; +} + +bool knot_lmdb_first(knot_lmdb_txn_t *txn) +{ + return txn_semcheck(txn) && init_cursor(txn) && curget(txn, MDB_FIRST); +} + +bool knot_lmdb_next(knot_lmdb_txn_t *txn) +{ + if (txn->cursor == NULL && txn->ret == KNOT_EOK) { + txn->ret = KNOT_EINVAL; + } + if (!txn_semcheck(txn)) { + return false; + } + return curget(txn, MDB_NEXT); +} + +bool knot_lmdb_is_prefix_of(MDB_val *prefix, MDB_val *of) +{ + return prefix->mv_size <= of->mv_size && + memcmp(prefix->mv_data, of->mv_data, prefix->mv_size) == 0; +} + +void knot_lmdb_del_cur(knot_lmdb_txn_t *txn) +{ + if (txn_semcheck(txn)) { + txn->ret = mdb_cursor_del(txn->cursor, 0); + err_to_knot(&txn->ret); + } +} + +void knot_lmdb_del_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix) +{ + knot_lmdb_foreach(txn, prefix) { + knot_lmdb_del_cur(txn); + } +} + +bool knot_lmdb_insert(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val) +{ + if (txn_semcheck(txn) && txn_enomem(txn, key)) { + unsigned flags = (val->mv_size > 0 && val->mv_data == NULL ? MDB_RESERVE : 0); + txn->ret = mdb_put(txn->txn, txn->db->dbi, key, val, flags); + err_to_knot(&txn->ret); + } + return (txn->ret == KNOT_EOK); +} + +int knot_lmdb_quick_insert(knot_lmdb_db_t *db, MDB_val key, MDB_val val) +{ + if (val.mv_data == NULL) { + free(key.mv_data); + return KNOT_ENOMEM; + } + knot_lmdb_txn_t txn = { 0 }; + knot_lmdb_begin(db, &txn, true); + knot_lmdb_insert(&txn, &key, &val); + free(key.mv_data); + free(val.mv_data); + knot_lmdb_commit(&txn); + return txn.ret; +} + +size_t knot_lmdb_usage(knot_lmdb_txn_t *txn) +{ + if (!txn_semcheck(txn)) { + return 0; + } + MDB_stat st = { 0 }; + txn->ret = mdb_stat(txn->txn, txn->db->dbi, &st); + err_to_knot(&txn->ret); + + size_t pgs_used = st.ms_branch_pages + st.ms_leaf_pages + st.ms_overflow_pages; + return (pgs_used * st.ms_psize); +} + +static bool make_key_part(void *key_data, size_t key_len, const char *format, va_list arg) +{ + wire_ctx_t wire = wire_ctx_init(key_data, key_len); + const char *tmp_s; + const knot_dname_t *tmp_d; + const void *tmp_v; + size_t tmp; + + for (const char *f = format; *f != '\0'; f++) { + switch (*f) { + case 'B': + wire_ctx_write_u8(&wire, va_arg(arg, int)); + break; + case 'H': + wire_ctx_write_u16(&wire, va_arg(arg, int)); + break; + case 'I': + wire_ctx_write_u32(&wire, va_arg(arg, uint32_t)); + break; + case 'L': + wire_ctx_write_u64(&wire, va_arg(arg, uint64_t)); + break; + case 'S': + tmp_s = va_arg(arg, const char *); + wire_ctx_write(&wire, tmp_s, strlen(tmp_s) + 1); + break; + case 'N': + tmp_d = va_arg(arg, const knot_dname_t *); + wire_ctx_write(&wire, tmp_d, knot_dname_size(tmp_d)); + break; + case 'D': + tmp_v = va_arg(arg, const void *); + tmp = va_arg(arg, size_t); + wire_ctx_write(&wire, tmp_v, tmp); + break; + } + } + + return wire.error == KNOT_EOK && wire_ctx_available(&wire) == 0; +} + +MDB_val knot_lmdb_make_key(const char *format, ...) +{ + MDB_val key = { 0 }; + va_list arg; + const char *tmp_s; + const knot_dname_t *tmp_d; + + // first, just determine the size of the key + va_start(arg, format); + for (const char *f = format; *f != '\0'; f++) { + switch (*f) { + case 'B': + key.mv_size += sizeof(uint8_t); + (void)va_arg(arg, int); // uint8_t will be promoted to int + break; + case 'H': + key.mv_size += sizeof(uint16_t); + (void)va_arg(arg, int); // uint16_t will be promoted to int + break; + case 'I': + key.mv_size += sizeof(uint32_t); + (void)va_arg(arg, uint32_t); + break; + case 'L': + key.mv_size += sizeof(uint64_t); + (void)va_arg(arg, uint64_t); + break; + case 'S': + tmp_s = va_arg(arg, const char *); + key.mv_size += strlen(tmp_s) + 1; + break; + case 'N': + tmp_d = va_arg(arg, const knot_dname_t *); + key.mv_size += knot_dname_size(tmp_d); + break; + case 'D': + (void)va_arg(arg, const void *); + key.mv_size += va_arg(arg, size_t); + break; + } + } + va_end(arg); + + // second, alloc the key and fill it + key.mv_data = malloc(key.mv_size); + if (key.mv_data == NULL) { + return key; + } + va_start(arg, format); + bool succ = make_key_part(key.mv_data, key.mv_size, format, arg); + assert(succ); + (void)succ; + va_end(arg); + return key; +} + +bool knot_lmdb_make_key_part(void *key_data, size_t key_len, const char *format, ...) +{ + va_list arg; + va_start(arg, format); + bool succ = make_key_part(key_data, key_len, format, arg); + va_end(arg); + return succ; +} + +static bool unmake_key_part(const void *key_data, size_t key_len, const char *format, va_list arg) +{ + if (key_data == NULL) { + return false; + } + wire_ctx_t wire = wire_ctx_init_const(key_data, key_len); + for (const char *f = format; *f != '\0' && wire.error == KNOT_EOK && wire_ctx_available(&wire) > 0; f++) { + void *tmp = va_arg(arg, void *); + size_t tmsize; + switch (*f) { + case 'B': + if (tmp == NULL) { + wire_ctx_skip(&wire, sizeof(uint8_t)); + } else { + *(uint8_t *)tmp = wire_ctx_read_u8(&wire); + } + break; + case 'H': + if (tmp == NULL) { + wire_ctx_skip(&wire, sizeof(uint16_t)); + } else { + *(uint16_t *)tmp = wire_ctx_read_u16(&wire); + } + break; + case 'I': + if (tmp == NULL) { + wire_ctx_skip(&wire, sizeof(uint32_t)); + } else { + *(uint32_t *)tmp = wire_ctx_read_u32(&wire); + } + break; + case 'L': + if (tmp == NULL) { + wire_ctx_skip(&wire, sizeof(uint64_t)); + } else { + *(uint64_t *)tmp = wire_ctx_read_u64(&wire); + } + break; + case 'S': + if (tmp != NULL) { + *(const char **)tmp = (const char *)wire.position; + } + wire_ctx_skip(&wire, strlen((const char *)wire.position) + 1); + break; + case 'N': + if (tmp != NULL) { + *(const knot_dname_t **)tmp = (const knot_dname_t *)wire.position; + } + wire_ctx_skip(&wire, knot_dname_size((const knot_dname_t *)wire.position)); + break; + case 'D': + tmsize = va_arg(arg, size_t); + if (tmp != NULL) { + memcpy(tmp, wire.position, tmsize); + } + wire_ctx_skip(&wire, tmsize); + break; + } + } + return (wire.error == KNOT_EOK && wire_ctx_available(&wire) == 0); +} + +bool knot_lmdb_unmake_key(const void *key_data, size_t key_len, const char *format, ...) +{ + va_list arg; + va_start(arg, format); + bool succ = unmake_key_part(key_data, key_len, format, arg); + va_end(arg); + return succ; +} + +bool knot_lmdb_unmake_curval(knot_lmdb_txn_t *txn, const char *format, ...) +{ + va_list arg; + va_start(arg, format); + bool succ = unmake_key_part(txn->cur_val.mv_data, txn->cur_val.mv_size, format, arg); + va_end(arg); + if (!succ && txn->ret == KNOT_EOK) { + txn->ret = KNOT_EMALF; + } + return succ; +} diff --git a/src/knot/journal/knot_lmdb.h b/src/knot/journal/knot_lmdb.h new file mode 100644 index 0000000000..2a4dfc6927 --- /dev/null +++ b/src/knot/journal/knot_lmdb.h @@ -0,0 +1,348 @@ +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see <https://www.gnu.org/licenses/>. + */ + +#pragma once + +#include "contrib/lmdb/lmdb.h" + +#include <stdbool.h> +#include <stdlib.h> + +typedef struct knot_lmdb_db { + MDB_dbi dbi; + MDB_env *env; + pthread_mutex_t opening_mutex; + + // those are static options. Set them after knot_lmdb_init(). + unsigned maxdbs; + unsigned maxreaders; + + // those are internal options. Please don't touch them directly. + size_t mapsize; + unsigned env_flags; // MDB_NOTLS, MDB_RDONLY, MDB_WRITEMAP, MDB_DUPSORT, MDB_NOSYNC, MDB_MAPASYNC + const char *dbname; + char *path; +} knot_lmdb_db_t; + +typedef struct { + MDB_txn *txn; + MDB_cursor *cursor; + MDB_val cur_key; + MDB_val cur_val; + + bool opened; + bool is_rw; + int ret; + knot_lmdb_db_t *db; +} knot_lmdb_txn_t; + +typedef enum { + KNOT_LMDB_EXACT = 3, /*! \brief Search for exactly matching key. */ + KNOT_LMDB_LEQ = 1, /*! \brief Search lexicographically lower or equal key. */ + KNOT_LMDB_GEQ = 2, /*! \brief Search lexicographically greater or equal key. */ + KNOT_LMDB_FORCE = 4, /*! \brief If no matching key found, consider it a transaction failure (KNOT_ENOENT). */ +} knot_lmdb_find_t; + +/*! + * \brief Initialise the DB handling structure. + * + * \param db DB handling structure. + * \param path Path to LMDB database on filesystem. + * \param mapsize Maximum size of the DB on FS. + * \param env_flags LMDB environment flags (e.g. MDB_RDONLY) + * \param dbname Optional: name of the sub-database. + */ +void knot_lmdb_init(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags, const char *dbname); + +/*! + * \brief Check if the databse exists on the filesystem. + * + * \param db The DB in question. + * + * \return True if it already exists. + */ +bool knot_lmdb_exists(knot_lmdb_db_t *db); + +/*! + * \brief Open the previously initialised DB. + * + * \param db The DB to be opened. + * + * \note If db->mapsize is zero, it will be set to twice the current size, and DB opened read-only! + * + * \return KNOT_E* + */ +int knot_lmdb_open(knot_lmdb_db_t *db); + +/*! + * \brief Close the database, but keep it initialised. + * + * \param db The DB to be closed. + */ +void knot_lmdb_close(knot_lmdb_db_t *db); + +/*! + * \brief Re-initialise existing DB with modified parameters. + * + * \note If the paramateres differ and DB is open, it will be refused. + * + * \param db The DB to be modified. + * \param path New path to the DB. + * \param mapsize New mapsize. + * \param env_flags New LMDB environment flags. + * + * \return KNOT_EOK on success, KNOT_EISCONN if not possible. + */ +int knot_lmdb_reinit(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags); + +/*! + * \brief Re-open opened DB with modified parameters. + * + * \note The DB will be first closed, re-initialised and finally opened again. + * + * \note There must not be any DB transaction during this process. + * + * \param db The DB to be modified. + * \param path New path to the DB. + * \param mapsize New mapsize. + * \param env_flags New LMDB environment flags. + * + * \return KNOT_E* + */ +int knot_lmdb_reconfigure(knot_lmdb_db_t *db, const char *path, size_t mapsize, unsigned env_flags); + +/*! + * \brief Close and de-initialise DB. + * + * \param db DB to be deinitialised. + */ +void knot_lmdb_deinit(knot_lmdb_db_t *db); + +/*! + * \brief Return true if DB is open. + */ +inline static bool knot_lmdb_is_open(knot_lmdb_db_t *db) { return db->env != NULL; } + +/*! + * \brief Start a DB transaction. + * + * \param db The database. + * \param txn Transaction handling structure to be initialised. + * \param rw True for read-write transaction, false for read-only. + * + * \note The error code will be stored in txn->ret. + */ +void knot_lmdb_begin(knot_lmdb_db_t *db, knot_lmdb_txn_t *txn, bool rw); + +/*! + * \brief Abort a transaction. + * + * \param txn Transaction to be aborted. + */ +void knot_lmdb_abort(knot_lmdb_txn_t *txn); + +/*! + * \brief Commit a transaction, or abort it if id had failured. + * + * \param txn Transaction to be commited. + * + * \note If txn->ret equals KNOT_EOK afterwards, whole DB transaction was successful. + */ +void knot_lmdb_commit(knot_lmdb_txn_t *txn); + +/*! + * \brief Find a key in database. The matched key will be in txn->cur_key and its value in txn->cur_val. + * + * \param txn DB transaction. + * \param what Key to be searched for. + * \param how Method of comparing keys. See comments at knot_lmdb_find_t. + * + * \note It's possible to use knot_lmdb_next() subsequently to iterate over following keys. + * + * \return True if a key found, false if none or failure. + */ +bool knot_lmdb_find(knot_lmdb_txn_t *txn, MDB_val *what, knot_lmdb_find_t how); + +/*! + * \brief Start iteration the whole DB from lexicographically first key. + * + * \note The first DB record will be in txn->cur_key and txn->cur_val. + * + * \param txn DB transaction. + * + * \return True if ok, false if no key at all or failure. + */ +bool knot_lmdb_first(knot_lmdb_txn_t *txn); + +/*! + * \brief Iterate to the lexicographically next key (sets txn->cur_key and txn->cur_val). + * + * \param txn DB transaction. + * + * \return True if ok, false if behind the end of DB or failure. + */ +bool knot_lmdb_next(knot_lmdb_txn_t *txn); + +/*! + * \brief Check if one DB key is a prefix of another, + * + * \param prefix DB key prefix. + * \param of Another DB key. + * + * \return True iff 'prefix' is a prefix of 'of'. + */ +bool knot_lmdb_is_prefix_of(MDB_val *prefix, MDB_val *of); + +/*! + * \brief Find leftmost key in DB matching given prefix. + * + * \param txn DB transaction. + * \param prefix Prefix searched for. + * + * \return True if found, false if none or failure. + */ +inline static bool knot_lmdb_find_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix) +{ + return knot_lmdb_find(txn, prefix, KNOT_LMDB_GEQ) && + knot_lmdb_is_prefix_of(prefix, &txn->cur_key); +} + +/*! + * \brief Execute following block of commands for every key in DB matching given prefix. + * + * \param txn DB transaction. + * \param prefix Prefix searched for. + */ +#define knot_lmdb_foreach(txn, prefix) \ + for (bool _knot_lmdb_foreach_found = knot_lmdb_find((txn), (prefix), KNOT_LMDB_GEQ); \ + _knot_lmdb_foreach_found && knot_lmdb_is_prefix_of((prefix), &(txn)->cur_key); \ + _knot_lmdb_foreach_found = knot_lmdb_next((txn))) + +/*! + * \brief Delete the one DB record, that the iteration is currently pointing to. + * + * \note It's safe to delete during an uncomplicated iteration, e.g. knot_lmdb_foreach(). + * + * \param txn DB transaction. + */ +void knot_lmdb_del_cur(knot_lmdb_txn_t *txn); + +/*! + * \brief Delete all DB records matching given key prefix. + * + * \param txn DB transaction. + * \param prefix Prefix to be deleted. + */ +void knot_lmdb_del_prefix(knot_lmdb_txn_t *txn, MDB_val *prefix); + +/*! + * \brief Insert a new record into the DB. + * + * \note If a record with equal key already exists in the DB, its value will be quietly overwritten. + * + * \param txn DB transaction. + * \param key Inserted key. + * \param val Inserted value. + * + * \return False if failure. + */ +bool knot_lmdb_insert(knot_lmdb_txn_t *txn, MDB_val *key, MDB_val *val); + +/*! + * \brief Open a transaction, insert a record, commmit and free key's and val's mv_data. + * + * \param db DB to be inserted into. + * \param key Inserted key. + * \param val Inserted val. + * + * \return KNOT_E* + */ +int knot_lmdb_quick_insert(knot_lmdb_db_t *db, MDB_val key, MDB_val val); + +/*! + * \brief Amount of bytes used by the DB storage. + * + * \note According to LMDB design, it will be a multiple of page size, which is usually 4096. + * + * \param txn DB transaction. + * + * \return DB usage. + */ +size_t knot_lmdb_usage(knot_lmdb_txn_t *txn); + +/*! + * \brief Serialize various parameters into a DB key. + * + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', one or two parameters with the actual values. + * + * \return DB key structure. 'mv_data' needs to be freed later. 'mv_data' is NULL on failure. + * + * Possible format characters are: + * - B for a byte + * - H for uint16 + * - I for uint32 + * - L for uint64, like H and I, the serialization converts them to big endian + * - S for zero-terminated string + * - N for a domain name (in knot_dname_t* format) + * - D for fixed-size data (takes two params: void* and size_t) + */ +MDB_val knot_lmdb_make_key(const char *format, ...); + +/*! + * \brief Serialize various parameters into prepared buffer. + * + * \param key_data Pointer to the buffer. + * \param key_len Size of the buffer. + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', one or two parameters with the actual values. + * + * \note See comment at knot_lmdb_make_key(). + * + * \return True if ok and the serialization took exactly 'key_len', false on failure. + */ +bool knot_lmdb_make_key_part(void *key_data, size_t key_len, const char *format, ...); + +/*! + * \brief Deserialize various parameters from a buffer. + * + * \note 'format' must exactly correspond with what the data in buffer actually are. + * + * \param key_data Pointer to the buffer. + * \param key_len Size of the buffer. + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', pointer to where the values will be stored. + * + * \note For B, H, I, L; provide simply pointers to variables of correspodning type. + * \note For S, N; provide pointer to pointer - it will be set to pointing inside the buffer, so no allocation here. + * \note For D, provide void* and size_t, the data will be copied. + * + * \return True if no failure. + */ +bool knot_lmdb_unmake_key(const void *key_data, size_t key_len, const char *format, ...); + +/*! + * \brief Deserialize various parameters from txn->cur_val. Set txn->ret to KNOT_EMALF if failure. + * + * \param txn DB transaction. + * \param format Specifies the number and type of parameters. + * \param ... For each character in 'format', pointer to where the values will be stored. + * + * \note See comment at knot_lmdb_unmake_key(). + * + * \return True if no failure. + */ +bool knot_lmdb_unmake_curval(knot_lmdb_txn_t *txn, const char *format, ...); diff --git a/src/knot/updates/changesets.c b/src/knot/updates/changesets.c index d886006ac2..484616cd48 100644 --- a/src/knot/updates/changesets.c +++ b/src/knot/updates/changesets.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -338,7 +338,7 @@ int changeset_add_removal(changeset_t *ch, const knot_rrset_t *rrset, changeset_ } const knot_rrset_t *to_remove = (rrset_cancelout == NULL ? rrset : rrset_cancelout); - int ret = knot_rrset_empty(to_remove) ? KNOT_EOK : add_rr_to_contents(ch->remove, to_remove); + int ret = (knot_rrset_empty(to_remove) || ch->remove == NULL) ? KNOT_EOK : add_rr_to_contents(ch->remove, to_remove); if (flags & CHANGESET_CHECK) { knot_rrset_free((knot_rrset_t *)rrset, NULL); @@ -423,6 +423,16 @@ int changeset_merge(changeset_t *ch1, const changeset_t *ch2, int flags) return KNOT_EOK; } +uint32_t changeset_from(const changeset_t *ch) +{ + return ch->soa_from == NULL ? 0 : knot_soa_serial(ch->soa_from->rrs.rdata); +} + +uint32_t changeset_to(const changeset_t *ch) +{ + return ch->soa_to == NULL ? 0 : knot_soa_serial(ch->soa_to->rrs.rdata); +} + typedef struct { const zone_contents_t *zone; changeset_t *fixing; @@ -594,6 +604,8 @@ changeset_t *changeset_from_contents(const zone_contents_t *contents) zone_contents_deep_free(res->add); res->add = copy; + zone_contents_deep_free(res->remove); + res->remove = NULL; return res; } @@ -601,7 +613,7 @@ void changeset_from_contents_free(changeset_t *ch) { assert(ch); assert(ch->soa_from == NULL); - assert(zone_contents_is_empty(ch->remove)); + assert(ch->remove == NULL); update_free_zone(ch->add); diff --git a/src/knot/updates/changesets.h b/src/knot/updates/changesets.h index c6a6a33baf..2cbd1bb9c1 100644 --- a/src/knot/updates/changesets.h +++ b/src/knot/updates/changesets.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -139,6 +139,24 @@ int changeset_remove_removal(changeset_t *ch, const knot_rrset_t *rrset); */ int changeset_merge(changeset_t *ch1, const changeset_t *ch2, int flags); +/*! + * \brief Get serial "from" of the changeset. + * + * \param ch Changeset in question. + * + * \return Its serial "from", or 0 if none. + */ +uint32_t changeset_from(const changeset_t *ch); + +/*! + * \brief Get serial "to" of the changeset. + * + * \param ch Changeset in question. + * + * \return Its serial "to", or 0 if none. + */ +uint32_t changeset_to(const changeset_t *ch); + /*! * \brief Remove from changeset those rdata which won't be added/removed from zone. * diff --git a/tests/knot/test_journal.c b/tests/knot/test_journal.c index 84dd30d85d..d166f05f9b 100644 --- a/tests/knot/test_journal.c +++ b/tests/knot/test_journal.c @@ -1,4 +1,4 @@ -/* Copyright (C) 2018 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> +/* Copyright (C) 2019 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz> This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -24,8 +24,10 @@ #include <tap/basic.h> #include <tap/files.h> +#include "knot/journal/journal_read.h" +#include "knot/journal/journal_write.h" + #include "libknot/libknot.h" -#include "knot/journal/journal.c" #include "knot/zone/zone.h" #include "knot/zone/zone-diff.h" #include "libknot/rrtype/soa.h" @@ -36,8 +38,11 @@ #define MIN_SOA_SIZE 22 char *test_dir_name; -journal_db_t *db; -journal_t *j; + +knot_lmdb_db_t jdb; +zone_journal_t jj; + +unsigned env_flag; static void set_conf(int zonefile_sync, size_t journal_usage, const knot_dname_t *apex) { @@ -114,6 +119,8 @@ static void init_random_changeset(changeset_t *ch, const uint32_t from, const ui if (is_bootstrap) { ch->soa_from = NULL; + zone_contents_deep_free(ch->remove); + ch->remove = NULL; } else { init_soa(&soa, from, apex); ch->soa_from = knot_rrset_copy(&soa, NULL); @@ -168,14 +175,26 @@ static void changeset_set_soa_serials(changeset_t *ch, uint32_t from, uint32_t t /*! \brief Compare two changesets for equality. */ static bool changesets_eq(const changeset_t *ch1, changeset_t *ch2) { - if (changeset_size(ch1) != changeset_size(ch2)) { + bool bootstrap = (ch1->remove == NULL); + + if (!bootstrap && changeset_size(ch1) != changeset_size(ch2)) { + return false; + } + + if ((bootstrap && ch2->remove != NULL) || + (!bootstrap && ch2->remove == NULL)) { return false; } changeset_iter_t it1; - changeset_iter_all(&it1, ch1); changeset_iter_t it2; - changeset_iter_all(&it2, ch2); + if (bootstrap) { + changeset_iter_add(&it1, ch1); + changeset_iter_add(&it2, ch2); + } else { + changeset_iter_all(&it1, ch1); + changeset_iter_all(&it2, ch2); + } knot_rrset_t rr1 = changeset_iter_next(&it1); knot_rrset_t rr2 = changeset_iter_next(&it2); @@ -243,82 +262,74 @@ static bool test_continuity(list_t *l) static void test_journal_db(void) { - int ret, ret2 = KNOT_EOK; + env_flag = journal_env_flags(JOURNAL_MODE_ASYNC); + knot_lmdb_init(&jdb, test_dir_name, 2048 * 1024, env_flag, NULL); - ret = journal_db_init(&db, test_dir_name, 2 * 1024 * 1024, JOURNAL_MODE_ASYNC); - is_int(KNOT_EOK, ret, "journal: init db (%d)", ret); + int ret = knot_lmdb_open(&jdb); + is_int(KNOT_EOK, ret, "journal: open db (%s)", knot_strerror(ret)); - ret = journal_open_db(&db); - is_int(KNOT_EOK, ret, "journal: open db (%d)", ret); + ret = knot_lmdb_reconfigure(&jdb, test_dir_name, 4096 * 1024, env_flag); + is_int(KNOT_EOK, ret, "journal: re-open with bigger mapsize (%s)", knot_strerror(ret)); - journal_db_close(&db); - ok(db == NULL, "journal: close and destroy db"); + ret = knot_lmdb_reconfigure(&jdb, test_dir_name, 1024 * 1024, env_flag); + is_int(KNOT_EOK, ret, "journal: re-open with smaller mapsize (%s)", knot_strerror(ret)); - ret = journal_db_init(&db, test_dir_name, 4 * 1024 * 1024, JOURNAL_MODE_ASYNC); - if (ret == KNOT_EOK) ret2 = journal_open_db(&db); - ok(ret == KNOT_EOK && ret2 == KNOT_EOK, "journal: open with bigger mapsize (%d, %d)", ret, ret2); - journal_db_close(&db); + knot_lmdb_deinit(&jdb); +} - ret = journal_db_init(&db, test_dir_name, 1024 * 1024, JOURNAL_MODE_ASYNC); - if (ret == KNOT_EOK) ret2 = journal_open_db(&db); - ok(ret == KNOT_EOK && ret2 == KNOT_EOK, "journal: open with smaller mapsize (%d, %d)", ret, ret2); - journal_db_close(&db); +static int load_j_list(zone_journal_t *zj, bool zij, uint32_t serial, + journal_read_t **read, list_t *list) +{ + changeset_t *ch; + init_list(list); + int ret = journal_read_begin(*zj, zij, serial, read); + if (ret == KNOT_EOK) { + while ((ch = calloc(1, sizeof(*ch))) != NULL && + journal_read_changeset(*read, ch)) { + add_tail(list, &ch->n); + } + free(ch); + ret = journal_read_get_error(*read, KNOT_EOK); + } + return ret; } /*! \brief Test behavior with real changesets. */ static void test_store_load(const knot_dname_t *apex) { - int ret, ret2 = KNOT_EOK; - set_conf(1000, 512 * 1024, apex); - j = journal_new(); - ok(j != NULL, "journal: new"); - - ret = journal_db_init(&db, test_dir_name, (512 + 1024) * 1024, JOURNAL_MODE_ASYNC); - if (ret == KNOT_EOK) ret2 = journal_open(j, &db, apex); - is_int(KNOT_EOK, ret, "journal: open (%d, %d)", ret, ret2); + knot_lmdb_init(&jdb, test_dir_name, 1536 * 1024, env_flag, NULL); + assert(knot_lmdb_open(&jdb) == KNOT_EOK); - /* Save and load changeset. */ - changeset_t *m_ch = changeset_new(apex); - init_random_changeset(m_ch, 0, 1, 128, apex, false); - ret = journal_store_changeset(j, m_ch); - is_int(KNOT_EOK, ret, "journal: store changeset (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_INFO); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); + jj.db = &jdb; + jj.zone = apex; list_t l, k; - init_list(&l); - init_list(&k); - ret = journal_load_changesets(j, &l, 0); - add_tail(&k, &m_ch->n); - ok(ret == KNOT_EOK && changesets_list_eq(&l, &k), "journal: load changeset (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); - - /* Load ctx's. */ - chgset_ctx_list_t cl = { { 0 }, 0 }; - ret = journal_load_chgset_ctx(j, &cl, 0); - ok(ret == KNOT_EOK, "journal: chgset_ctx: load (%s)", knot_strerror(ret)); - chgset_ctx_list_close(&cl); + changeset_t *m_ch = changeset_new(apex), r_ch, e_ch; + init_random_changeset(m_ch, 0, 1, 128, apex, false); + int ret = journal_insert(jj, m_ch); + is_int(KNOT_EOK, ret, "journal: store changeset (%s)", knot_strerror(ret)); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal: check after store changeset (%s)", knot_strerror(ret)); + journal_read_t *read = NULL; + + ret = load_j_list(&jj, false, changeset_from(m_ch), &read, &l); + is_int(KNOT_EOK, ret, "journal: read single changeset (%s)", knot_strerror(ret)); + ok(1 == list_size(&l), "journal: read exactly one changeset"); + ok(changesets_eq(m_ch, HEAD(l)), "journal: changeset equal after read"); changesets_free(&l); - changesets_free(&k); - /* Flush the journal. */ - ret = journal_flush(j); - is_int(KNOT_EOK, ret, "journal: first and simple flush (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); - init_list(&l); - init_list(&k); + journal_read_end(read); + ret = journal_set_flushed(jj); + is_int(KNOT_EOK, ret, "journal: first simple flush (%s)", knot_strerror(ret)); - /* Fill the journal. */ - ret = KNOT_EOK; + init_list(&k); uint32_t serial = 1; for (; ret == KNOT_EOK && serial < 40000; ++serial) { changeset_t *m_ch2 = changeset_new(apex); init_random_changeset(m_ch2, serial, serial + 1, 128, apex, false); - ret = journal_store_changeset(j, m_ch2); + ret = journal_insert(jj, m_ch2); if (ret != KNOT_EOK) { changeset_free(m_ch2); break; @@ -326,175 +337,118 @@ static void test_store_load(const knot_dname_t *apex) add_tail(&k, &m_ch2->n); } is_int(KNOT_EBUSY, ret, "journal: overfill with changesets (%d inserted) (%d should= %d)", - serial, ret, KNOT_EBUSY); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); - - /* Load all changesets stored until now. */ - ret = journal_load_changesets(j, &l, 1); - ok(ret == KNOT_EOK && changesets_list_eq(&l, &k), "journal: load changesets (%d)", ret); + serial, ret, KNOT_EBUSY); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal check (%s)", knot_strerror(ret)); + ret = load_j_list(&jj, false, 1, &read, &l); + is_int(KNOT_EOK, ret, "journal: load list (%s)", knot_strerror(ret)); + ok(changesets_list_eq(&l, &k), "journal: changeset lists equal after read"); + ok(test_continuity(&l) == KNOT_EOK, "journal: changesets are in order"); changesets_free(&l); - init_list(&l); - ret = journal_load_changesets(j, &l, 1); - ok(ret == KNOT_EOK && changesets_list_eq(&l, &k), "journal: re-load changesets (%d)", ret); - - ret = journal_load_chgset_ctx(j, &cl, 1); - ok(ret == KNOT_EOK, "journal: chgset_ctx: load 2 (%s)", knot_strerror(ret)); - ok(list_size(&cl.l) == list_size(&l), "journal: chgset_ctx: load size %zu ?== %zu", list_size(&cl.l), list_size(&l)); - chgset_ctx_list_close(&cl); + journal_read_end(read); + ret = load_j_list(&jj, false, 1, &read, &l); + is_int(KNOT_EOK, ret, "journal: load list 2nd (%s)", knot_strerror(ret)); + ok(changesets_list_eq(&l, &k), "journal: changeset lists equal after 2nd read"); changesets_free(&l); - init_list(&l); + journal_read_end(read); - /* Flush the journal. */ - ret = journal_flush(j); - is_int(KNOT_EOK, ret, "journal: second flush (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); + ret = journal_set_flushed(jj); + is_int(KNOT_EOK, ret, "journal: flush after overfill (%s)", knot_strerror(ret)); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal check (%s)", knot_strerror(ret)); - /* Test whether the journal kept changesets after flush. */ - ret = journal_load_changesets(j, &l, 1); - ok(ret == KNOT_EOK && changesets_list_eq(&l, &k), "journal: load right after flush (%d)", ret); + ret = load_j_list(&jj, false, 1, &read, &l); + is_int(KNOT_EOK, ret, "journal: load list (%s)", knot_strerror(ret)); + ok(changesets_list_eq(&l, &k), "journal: changeset lists equal after flush"); + changesets_free(&l); + journal_read_end(read); changesets_free(&k); - changesets_free(&l); - init_list(&k); - init_list(&l); - /* Store next changeset. */ changeset_t ch; ret = changeset_init(&ch, apex); ok(ret == KNOT_EOK, "journal: changeset init (%d)", ret); - init_random_changeset(&ch, serial, serial + 1, 128, apex, false); - ret = journal_store_changeset(j, &ch); - changeset_clear(&ch); + init_random_changeset(&ch, serial, serial + 1, 555, apex, false); + ret = journal_insert(jj, &ch); is_int(KNOT_EOK, ret, "journal: store after flush (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); - - /* Load last changesets. */ - init_list(&l); - ret = journal_load_changesets(j, &l, serial); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal check (%s)", knot_strerror(ret)); + ret = load_j_list(&jj, false, serial, &read, &l); + is_int(KNOT_EOK, ret, "journal: load after store after flush after overfill (%s)", knot_strerror(ret)); + is_int(1, list_size(&l), "journal: single changeset in list"); + ok(changesets_eq(&ch, HEAD(l)), "journal: changeset unmalformed after overfill"); changesets_free(&l); - is_int(KNOT_EOK, ret, "journal: load changesets after flush (%d)", ret); - - /* Flush the journal again. */ - ret = journal_flush(j); - is_int(KNOT_EOK, ret, "journal: flush again (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); - - /* Fill the journal using a list. */ - uint32_t m_serial = 1; - for (; m_serial < serial / 2; ++m_serial) { - changeset_t *m_ch7 = changeset_new(apex); - init_random_changeset(m_ch7, m_serial, m_serial + 1, 128, apex, false); - add_tail(&l, &m_ch7->n); - } - ret = journal_store_changesets(j, &l); - is_int(KNOT_EOK, ret, "journal: fill with changesets using a list (%d inserted)", m_serial); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); - - /* Cleanup. */ - changesets_free(&l); - init_list(&l); - - /* Load all previous changesets. */ - ret = journal_load_changesets(j, &l, 1); - ok(ret == KNOT_EOK && knot_soa_serial(((changeset_t *)TAIL(l))->soa_to->rrs.rdata) == m_serial, - "journal: load all changesets"); + journal_read_end(read); - /* Check for changeset ordering. */ - ok(test_continuity(&l) == KNOT_EOK, "journal: changesets are in order"); + changeset_clear(&ch); - /* Cleanup. */ + changeset_init(&ch, apex); + init_random_changeset(&ch, 2, 3, 100, apex, false); + ret = journal_insert(jj, &ch); + is_int(KNOT_EOK, ret, "journal: insert discontinuous changeset (%s)", knot_strerror(ret)); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal check (%s)", knot_strerror(ret)); + ret = load_j_list(&jj, false, 2, &read, &l); + is_int(KNOT_EOK, ret, "journal: read after discontinuity (%s)", knot_strerror(ret)); + is_int(1, list_size(&l), "journal: dicontinuity caused journal to drop"); changesets_free(&l); - init_list(&l); - ret = journal_flush(j); - is_int(KNOT_EOK, ret, "journal: allways ok journal_flush 0"); - ret = journal_drop_changesets(j); /* Clear the journal for the collision test */ - is_int(KNOT_EOK, ret, "journal: allways ok journal_drop_changesets"); - - /* Test for serial number collision handling. We insert changesets - * with valid serial sequence that overflows and then collides with itself. - * The sequence is 0 -> 1 -> 2 -> 2147483647 -> 4294967294 -> 1 which should - * remove changesets 0->1 and 1->2. */ - ok(EMPTY_LIST(k), "journal: empty list k"); - ok(EMPTY_LIST(l), "journal: empty list l"); - changeset_t *m_ch3 = changeset_new(apex); - init_random_changeset(m_ch3, 0, 1, 128, apex, false); - ret = journal_store_changeset(j, m_ch3); - is_int(KNOT_EOK, ret, "journal: allways ok journal_store_changeset 1"); - changeset_set_soa_serials(m_ch3, 1, 2, apex); - ret = journal_store_changeset(j, m_ch3); - is_int(KNOT_EOK, ret, "journal: allways ok journal_store_changeset 2"); - changeset_set_soa_serials(m_ch3, 2, 2147483647, apex); - add_tail(&k, &m_ch3->n); - ret = journal_store_changeset(j, m_ch3); - is_int(KNOT_EOK, ret, "journal: allways ok journal_store_changeset 3"); - changeset_t *m_ch4 = changeset_new(apex); - init_random_changeset(m_ch4, 2147483647, 4294967294, 128, apex, false); - add_tail(&k, &m_ch4->n); - ret = journal_store_changeset(j, m_ch4); - is_int(KNOT_EOK, ret, "journal: allways ok journal_store_changeset 4"); - changeset_t *m_ch5 = changeset_new(apex); - init_random_changeset(m_ch5, 4294967294, 1, 128, apex, false); - add_tail(&k, &m_ch5->n); - ret = journal_store_changeset(j, m_ch5); - is_int(KNOT_EBUSY, ret, "journal: allways ok journal_store_changeset 5"); - ret = journal_flush(j); - is_int(KNOT_EOK, ret, "journal: allways ok journal_flush 1"); - ret = journal_store_changeset(j, m_ch5); - is_int(KNOT_EOK, ret, "journal: allways ok journal_store_changeset 6"); - ret = journal_flush(j); - is_int(KNOT_EOK, ret, "journal: allways ok journal_flush 2"); - ret = journal_load_changesets(j, &l, 0); - ret2 = journal_load_changesets(j, &l, 1); - int ret3 = journal_load_changesets(j, &l, 2); - fprintf(stderr, "ret=%d ret2=%d ret3=%d\n", ret, ret2, ret3); - ok(ret == KNOT_ENOENT && ret2 == KNOT_ENOENT && ret3 == KNOT_EOK && - changesets_list_eq(&l, &k), "journal: serial collision"); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - is_int(KNOT_EOK, ret, "journal check (%d)", ret); - - /* Cleanup. */ + journal_read_end(read); + + // Test for serial number collision handling. We insert changesets + // with valid serial sequence that overflows and then collides with itself. + // The sequence is 0 -> 1 -> 2 -> 2147483647 -> 4294967294 -> 1 which should + // remove changesets 0->1 and 1->2. * + uint32_t serials[6] = { 0, 1, 2, 2147483647, 4294967294, 1 }; + for (int i = 0; i < 5; i++) { + changeset_clear(&ch); + changeset_init(&ch, apex); + init_random_changeset(&ch, serials[i], serials[i + 1], 100, apex, false); + ret = journal_insert(jj, &ch); + is_int(i == 4 ? KNOT_EBUSY : KNOT_EOK, ret, "journal: inserting cycle (%s)", knot_strerror(ret)); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal check (%s)", knot_strerror(ret)); + } + ret = journal_set_flushed(jj); + is_int(KNOT_EOK, ret, "journal: flush in cycle (%s)", knot_strerror(ret)); + ret = journal_insert(jj, &ch); + is_int(KNOT_EOK, ret, "journal: inserted cycle (%s)", knot_strerror(ret)); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal check (%s)", knot_strerror(ret)); + ret = journal_read_begin(jj, false, 0, &read); + is_int(KNOT_ENOENT, ret, "journal: cycle removed first changeset (%d should= %d)", ret, KNOT_ENOENT); + ret = journal_read_begin(jj, false, 1, &read); + is_int(KNOT_ENOENT, ret, "journal: cycle removed second changeset (%d should= %d)", ret, KNOT_ENOENT); + ret = load_j_list(&jj, false, 4294967294, &read, &l); + is_int(KNOT_EOK, ret, "journal: read after cycle (%s)", knot_strerror(ret)); + ok(3 >= list_size(&l), "journal: cycle caused journal to partly drop"); + ok(changesets_eq(&ch, HEAD(l)), "journal: changeset unmalformed after cycle"); changesets_free(&l); - changesets_free(&k); - - init_list(&l); - init_list(&k); - - /* Check bootstrap changeset */ - ret = journal_drop_changesets(j); - ok(ret == KNOT_EOK, "journal: journal_drop_changesets must be ok"); - - changeset_t *m_ch6 = changeset_new(apex); - init_random_changeset(m_ch6, 0, 1, 128, apex, true); - ret = journal_store_changeset(j, m_ch6); - ok(ret == KNOT_EOK, "journal: store bootstrap (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - ok(ret == KNOT_EOK, "journal check (%d)", ret); - changeset_t *m_ch7 = changeset_new(apex); - init_random_changeset(m_ch7, 1, 2, 128, apex, false); - ret = journal_store_changeset(j, m_ch7); - ok(ret == KNOT_EOK, "journal: store after bootstrap (%d)", ret); - add_tail(&k, &m_ch6->n); - add_tail(&k, &m_ch7->n); - ret = journal_load_bootstrap(j, &l); - ok(ret == KNOT_EOK && changesets_list_eq(&l, &k), "journal: load boostrap (%d)", ret); - ret = journal_check(j, JOURNAL_CHECK_STDERR); - ok(ret == KNOT_EOK, "journal check (%d)", ret); - + journal_read_end(read); + changeset_clear(&ch); + changeset_free(m_ch); + + changeset_init(&e_ch, apex); + init_random_changeset(&e_ch, 0, 1, 200, apex, true); + ret = journal_insert_zone(jj, &e_ch); + is_int(KNOT_EOK, ret, "journal: insert zone-in-journal (%s)", knot_strerror(ret)); + changeset_init(&r_ch, apex); + init_random_changeset(&r_ch, 1, 2, 200, apex, false); + ret = journal_insert(jj, &r_ch); + is_int(KNOT_EOK, ret, "journal: insert after zone-in-journal (%s)", knot_strerror(ret)); + ret = load_j_list(&jj, true, 0, &read, &l); + is_int(KNOT_EOK, ret, "journal: load zone-in-journal (%s)", knot_strerror(ret)); + is_int(2, list_size(&l), "journal: read two changesets from zone-in-journal"); + ok(changesets_eq(&e_ch, HEAD(l)), "journal: zone-in-journal unmalformed"); + ok(changesets_eq(&r_ch, TAIL(l)), "journal: after zone-in-journal unmalformed"); changesets_free(&l); - changesets_free(&k); + journal_read_end(read); + changeset_clear(&e_ch); + changeset_clear(&r_ch); - init_list(&l); - init_list(&k); - - ret = journal_scrape(j); - ok(ret == KNOT_EOK, "journal: scrape must be ok"); + ret = journal_scrape_with_md(jj); + is_int(KNOT_EOK, ret, "journal: scrape with md (%s)", knot_strerror(ret)); unset_conf(); } @@ -605,11 +559,8 @@ static changeset_t * tm_chs(const knot_dname_t * apex, int x) static int merged_present(void) { - local_txn_t(txn, j); - txn_begin(txn, 0); - int res = md_flag(txn, MERGED_SERIAL_VALID); - txn_abort(txn); - return res; + bool exists, has_merged; + return journal_info(jj, &exists, NULL, NULL, &has_merged, NULL, NULL, NULL) == KNOT_EOK && exists && has_merged; } static void test_merge(const knot_dname_t *apex) @@ -618,67 +569,67 @@ static void test_merge(const knot_dname_t *apex) list_t l; // allow merge - set_conf(-1, 512 * 1024, apex); - ok(journal_merge_allowed(j), "journal: merge allowed"); + set_conf(-1, 100 * 1024, apex); + ok(!journal_allow_flush(jj), "journal: merge allowed"); - ret = journal_drop_changesets(j); + ret = journal_scrape_with_md(jj); is_int(KNOT_EOK, ret, "journal: journal_drop_changesets must be ok"); // insert stuff and check the merge for (i = 0; !merged_present() && i < 40000; i++) { - ret = journal_store_changeset(j, tm_chs(apex, i)); + ret = journal_insert(jj, tm_chs(apex, i)); is_int(KNOT_EOK, ret, "journal: journal_store_changeset must be ok"); } - init_list(&l); - ret = journal_load_changesets(j, &l, 0); + ret = journal_sem_check(jj); + is_int(KNOT_EOK, ret, "journal: sem check (%s)", knot_strerror(ret)); + journal_read_t *read = NULL; + ret = load_j_list(&jj, false, 0, &read, &l); is_int(KNOT_EOK, ret, "journal: journal_load_changesets must be ok"); + assert(ret == KNOT_EOK); ok(list_size(&l) == 2, "journal: read the merged and one following"); changeset_t * mch = (changeset_t *)HEAD(l); ok(list_size(&l) >= 1 && tm_rrcnt(mch, 1) == 2, "journal: merged additions # = 2"); ok(list_size(&l) >= 1 && tm_rrcnt(mch, -1) == 1, "journal: merged removals # = 1"); changesets_free(&l); + journal_read_end(read); // insert one more and check the #s of results - journal_store_changeset(j, tm_chs(apex, i)); - init_list(&l); - ret = journal_load_changesets(j, &l, 0); + ret = journal_insert(jj, tm_chs(apex, i)); + is_int(KNOT_EOK, ret, "journal: insert one more (%s)", knot_strerror(ret)); + ret = load_j_list(&jj, false, 0, &read, &l); is_int(KNOT_EOK, ret, "journal: journal_load_changesets2 must be ok"); ok(list_size(&l) == 3, "journal: read merged together with new changeset"); changesets_free(&l); - init_list(&l); - ret = journal_load_changesets(j, &l, (uint32_t) (i - 3)); + journal_read_end(read); + ret = load_j_list(&jj, false, i - 3, &read, &l); is_int(KNOT_EOK, ret, "journal: journal_load_changesets3 must be ok"); ok(list_size(&l) == 4, "journal: read short history of merged/unmerged changesets"); changesets_free(&l); + journal_read_end(read); - ret = journal_drop_changesets(j); + + ret = journal_scrape_with_md(jj); assert(ret == KNOT_EOK); // disallow merge unset_conf(); set_conf(1000, 512 * 1024, apex); - ok(!journal_merge_allowed(j), "journal: merge disallowed"); + ok(journal_allow_flush(jj), "journal: merge disallowed"); tm_rrs(NULL, 0); tm_chs(NULL, 0); unset_conf(); } -static void test_stress_base(journal_t *journal, const knot_dname_t *apex, +static void test_stress_base(const knot_dname_t *apex, size_t update_size, size_t file_size) { int ret; uint32_t serial = 0; - journal_close(journal); - journal_db_close(&db); - db = NULL; - ret = journal_db_init(&db, test_dir_name, file_size, JOURNAL_MODE_ASYNC); - assert(ret == KNOT_EOK); - ret = journal_open_db(&db); - assert(ret == KNOT_EOK); - ret = journal_open(journal, &db, apex); - assert(ret == KNOT_EOK); + + ret = knot_lmdb_reconfigure(&jdb, test_dir_name, file_size, journal_env_flags(JOURNAL_MODE_ASYNC)); + is_int(KNOT_EOK, ret, "journal: recofigure to mapsize %zu (%s)", file_size, knot_strerror(ret)); set_conf(1000, file_size / 2, apex); @@ -691,7 +642,7 @@ static void test_stress_base(journal_t *journal, const knot_dname_t *apex, serial = 0; while (true) { changeset_set_soa_serials(&ch, serial, serial + 1, apex); - ret = journal_store_changeset(journal, &ch); + ret = journal_insert(jj, &ch); if (ret == KNOT_EOK) { serial++; } else { @@ -699,8 +650,11 @@ static void test_stress_base(journal_t *journal, const knot_dname_t *apex, } } - ret = journal_flush(journal); - ok(serial > 0 && ret == KNOT_EOK, "journal: pass #%d fillup run (%d inserts)", i, serial); + ret = journal_set_flushed(jj); + if (ret == KNOT_EOK) { + ret = journal_sem_check(jj); + } + ok(serial > 0 && ret == KNOT_EOK, "journal: pass #%d fillup run (%d inserts) (%s)", i, serial, knot_strerror(ret)); } changeset_clear(&ch); @@ -709,16 +663,16 @@ static void test_stress_base(journal_t *journal, const knot_dname_t *apex, } /*! \brief Test behavior when writing to jurnal and flushing it. */ -static void test_stress(journal_t *journal, const knot_dname_t *apex) +static void test_stress(const knot_dname_t *apex) { diag("stress test: small data"); - test_stress_base(journal, apex, 40, (1024 + 512) * 1024); + test_stress_base(apex, 40, (1024 + 512) * 1024); diag("stress test: medium data"); - test_stress_base(journal, apex, 400, 3 * 1024 * 1024); + test_stress_base(apex, 400, 3 * 1024 * 1024); diag("stress test: large data"); - test_stress_base(journal, apex, 4000, 10 * 1024 * 1024); + test_stress_base(apex, 4000, 10 * 1024 * 1024); } int main(int argc, char *argv[]) @@ -735,11 +689,9 @@ int main(int argc, char *argv[]) test_merge(apex); - test_stress(j, apex); + test_stress(apex); - journal_close(j); - journal_free(&j); - journal_db_close(&db); + knot_lmdb_deinit(&jdb); test_rm_rf(test_dir_name); free(test_dir_name); -- GitLab