From a6a600eb4756351bed4804b896805e7bf527bd64 Mon Sep 17 00:00:00 2001
From: Libor Peltan <libor.peltan@nic.cz>
Date: Mon, 12 Dec 2016 12:29:15 +0100
Subject: [PATCH] journal: added max-journal-usage and max-journal-depth conf
 parameters

---
 src/knot/conf/scheme.c     |  21 +++++---
 src/knot/conf/scheme.h     |   2 +
 src/knot/journal/journal.c | 104 ++++++++++++++++++++++++++++++++-----
 src/knot/journal/journal.h |   1 +
 tests/journal_lmdb.c       |  29 ++++++-----
 5 files changed, 124 insertions(+), 33 deletions(-)

diff --git a/src/knot/conf/scheme.c b/src/knot/conf/scheme.c
index 1af7dd9f3d..a6cc3fd656 100644
--- a/src/knot/conf/scheme.c
+++ b/src/knot/conf/scheme.c
@@ -45,6 +45,12 @@
 #define HOURS(x)	((x) * 3600)
 #define DAYS(x)		((x) * HOURS(24))
 
+#define GIGA		(1024LLU * 1024 * 1024) // 2^30 as unsigned long long
+#define TERA		(1024 * GIGA) // 2^40; unsigned long long via GIGA
+
+#define VIRT_MEM_TOP_32BIT (2 * GIGA) // cap used when pointers are narrower than 8 bytes
+#define VIRT_MEM_LIMIT(x) (((sizeof(void *) < 8) && ((x) > VIRT_MEM_TOP_32BIT)) ? VIRT_MEM_TOP_32BIT : (x)) // clamps x to that cap
+
 #define FMOD		(YP_FMULTI | CONF_IO_FRLD_MOD | CONF_IO_FRLD_ZONES)
 
 static const knot_lookup_t keystore_backends[] = {
@@ -225,11 +231,6 @@ static const yp_item_t desc_remote[] = {
 	{ NULL }
 };
 
-
-#define VIRT_MEM_TOP (2LLU * 1024 * 1204 * 1204)
-#define VIRT_MEM_LIMIT(x) (((sizeof(void *) < 8) && ((x) > VIRT_MEM_TOP)) ? VIRT_MEM_TOP : (x))
-
-
 #define ZONE_ITEMS(FLAGS) \
 	{ C_STORAGE,             YP_TSTR,  YP_VSTR = { STORAGE_DIR }, FLAGS }, \
 	{ C_FILE,                YP_TSTR,  YP_VNONE, FLAGS }, \
@@ -243,6 +244,9 @@ static const yp_item_t desc_remote[] = {
 	{ C_IXFR_DIFF,           YP_TBOOL, YP_VNONE }, \
 	{ C_MAX_ZONE_SIZE,       YP_TINT,  YP_VINT = { 0, INT64_MAX, INT64_MAX, YP_SSIZE }, \
 	                                   FLAGS }, \
+	{ C_MAX_JOURNAL_USAGE,   YP_TINT,  YP_VINT = { 40 * 1024, INT64_MAX, 100 * 1024 * 1024, \
+	                                               YP_SSIZE } }, \
+	{ C_MAX_JOURNAL_DEPTH,   YP_TINT,  YP_VINT = { 2, INT64_MAX, INT64_MAX, YP_SSIZE } }, \
 	{ C_KASP_DB,             YP_TSTR,  YP_VSTR = { "keys" }, FLAGS }, \
 	{ C_DNSSEC_SIGNING,      YP_TBOOL, YP_VNONE, FLAGS }, \
 	{ C_DNSSEC_POLICY,       YP_TREF,  YP_VREF = { C_POLICY }, FLAGS, { check_ref_dflt } }, \
@@ -258,9 +262,10 @@ static const yp_item_t desc_template[] = {
 	{ C_TIMER_DB,            YP_TSTR,  YP_VSTR = { "timers" }, CONF_IO_FRLD_ZONES }, \
 	{ C_GLOBAL_MODULE,       YP_TDATA, YP_VDATA = { 0, NULL, mod_id_to_bin, mod_id_to_txt }, \
 	                                   YP_FMULTI | CONF_IO_FRLD_MOD, { check_modref } }, \
-	{ C_JOURNAL,             YP_TSTR,  YP_VSTR = { "journal.db" }, CONF_IO_FRLD_ZONES }, \
-	{ C_MAX_JOURNAL_SIZE,    YP_TINT,  YP_VINT = { 1024 * 1024, VIRT_MEM_LIMIT(100LLU * 1024 * 1024 * 1024 * 1024), \
-	                                               VIRT_MEM_LIMIT(20LLU * 1024 * 1024 * 1024), YP_SSIZE } }, \
+	{ C_JOURNAL,             YP_TSTR,  YP_VSTR = { "journal.db" }, CONF_IO_FRLD_SRV }, \
+	{ C_MAX_JOURNAL_SIZE,    YP_TINT,  YP_VINT = { 1024 * 1024, VIRT_MEM_LIMIT(100 * TERA), \
+	                                               VIRT_MEM_LIMIT(20 * GIGA), YP_SSIZE }, \
+	                                               CONF_IO_FRLD_SRV }, \
 	{ NULL }
 };
 
diff --git a/src/knot/conf/scheme.h b/src/knot/conf/scheme.h
index 26820e917a..7deb2084d8 100644
--- a/src/knot/conf/scheme.h
+++ b/src/knot/conf/scheme.h
@@ -63,6 +63,8 @@
 #define C_MANUAL		"\x06""manual"
 #define C_MASTER		"\x06""master"
 #define C_MAX_JOURNAL_SIZE	"\x10""max-journal-size"
+#define C_MAX_JOURNAL_USAGE	"\x11""max-journal-usage"
+#define C_MAX_JOURNAL_DEPTH	"\x11""max-journal-depth"
 #define C_MAX_TCP_CLIENTS	"\x0F""max-tcp-clients"
 #define C_MAX_UDP_PAYLOAD	"\x0F""max-udp-payload"
 #define C_MAX_ZONE_SIZE		"\x0D""max-zone-size"
diff --git a/src/knot/journal/journal.c b/src/knot/journal/journal.c
index 0f8302a63d..89f351818d 100644
--- a/src/knot/journal/journal.c
+++ b/src/knot/journal/journal.c
@@ -52,7 +52,7 @@ enum {
 static int journal_flush_allowed(journal_t *j) {
 	conf_val_t val = conf_zone_get(conf(), C_ZONEFILE_SYNC, j->zone);
 	if (val.item == NULL || conf_int(&val) >= 0) {
-		return 1; // val->item == NULL  --->  default behaviour, ie standard flush, no merge.
+		return 1;
 	}
 	return 0;
 }
@@ -61,6 +61,18 @@ static int journal_merge_allowed(journal_t *j) {
 	return !journal_flush_allowed(j); // TODO think of other behaviour, e.g. setting
 }
 
+static size_t journal_max_usage(journal_t * j) // reads the zone's max-journal-usage setting
+{
+	conf_val_t val = conf_zone_get(conf(), C_MAX_JOURNAL_USAGE, j->zone);
+	return conf_int(&val); // NOTE(review): implicitly converted to size_t — confirm the configured range fits on 32-bit targets
+}
+
+static size_t journal_max_changesets(journal_t * j) // reads the zone's max-journal-depth setting
+{
+	conf_val_t val = conf_zone_get(conf(), C_MAX_JOURNAL_DEPTH, j->zone);
+	return conf_int(&val); // NOTE(review): implicitly converted to size_t — confirm the configured range fits on 32-bit targets
+}
+
 static float journal_tofree_factor(journal_t *j)
 {
 	return 2.0f;
@@ -194,6 +206,7 @@ static void txn_begin(txn_t *txn, int write_allowed)
 	txn_begin_md(last_flushed);
 	txn_begin_md(merged_serial);
 	txn_begin_md(dirty_serial);
+	txn_begin_md(changeset_count);
 	txn_begin_md(flags);
 }
 
@@ -301,6 +314,7 @@ static void txn_commit(txn_t *txn)
 		txn_commit_md(last_flushed);
 		txn_commit_md(merged_serial);
 		txn_commit_md(dirty_serial);
+		txn_commit_md(changeset_count);
 		txn_commit_md(flags);
 	}
 
@@ -453,13 +467,6 @@ static void unmake_header(const knot_db_val_t *from, uint32_t *serial_to,
 	if (header_size != NULL) *header_size = sizeof(*h);
 }
 
-static size_t journal_max_occupied(journal_t *j, txn_t *txn)
-{
-	uint32_t jcnt;
-	md_get(txn, NULL, MDKEY_GLOBAL_JOURNAL_COUNT, &jcnt);
-	return (txn->ret == KNOT_EOK ? (j->db->fslimit / (jcnt + 1)) : 0);
-}
-
 static uint32_t first_digit(uint32_t of)
 {
 	while (of > 9) of /= 10;
@@ -758,8 +765,10 @@ static int del_upto_itercb(iteration_ctx_t *ctx)
 	// one whole changeset has been deleted => update metadata. We are sure that the deleted changeset is first at this time. If it's not merged changeset, point first_serial to next one
 	if (ctx->chunk_index == ctx->chunk_count - 1) {
 		if (!md_flag(ctx->txn, MERGED_SERIAL_VALID) ||
-		    serial_compare(ctx->txn->shadow_md.merged_serial,ctx->serial) != 0)
+		    serial_compare(ctx->txn->shadow_md.merged_serial,ctx->serial) != 0) {
 			ctx->txn->shadow_md.first_serial = ctx->serial_to;
+			ctx->txn->shadow_md.changeset_count--;
+		}
 		if (serial_compare(ctx->txn->shadow_md.last_flushed, ctx->serial) == 0) ctx->txn->shadow_md.flags &= ~LAST_FLUSHED_VALID;
 		if (serial_compare(ctx->txn->shadow_md.last_serial,  ctx->serial) == 0) ctx->txn->shadow_md.flags &= ~SERIAL_TO_VALID;
 		if (serial_compare(ctx->txn->shadow_md.merged_serial,ctx->serial) == 0) ctx->txn->shadow_md.flags &= ~MERGED_SERIAL_VALID;
@@ -817,6 +826,7 @@ static int del_tofree_itercb(iteration_ctx_t *ctx)
 	// when whole changeset deleted, check target and update metadata
 	if (ctx->chunk_index == ctx->chunk_count - 1) {
 		ctx->txn->shadow_md.first_serial = ctx->serial_to;
+		ctx->txn->shadow_md.changeset_count--;
 		if (serial_compare(ctx->txn->shadow_md.last_flushed, ctx->serial) == 0) {
 			ctx->txn->shadow_md.flags &= ~LAST_FLUSHED_VALID;
 			ds->to_be_freed = 0; // prevents deleting unflushed changesets
@@ -855,6 +865,55 @@ static int delete_tofree(journal_t *j, txn_t *_txn, size_t to_be_freed, size_t *
 	txn_ret(txn);
 }
 
+static int del_count_itercb(iteration_ctx_t *ctx) // per-chunk callback passed to iterate() by delete_count()
+{
+	delete_status_t *ds = ctx->iter_context; // here freed_approx/to_be_freed count changesets
+	if (ds->freed_approx >= ds->to_be_freed) {
+		return KNOT_EOK; // target reached or deletion stopped: leave this chunk untouched
+	}
+	txn_key_2u32(ctx->txn, ctx->txn->j->zone, ctx->serial, ctx->chunk_index);
+	txn_del(ctx->txn);
+	txn_check_ret(ctx->txn); // NOTE(review): presumably returns early on a txn error — confirm against the macro
+
+	// last chunk of a changeset: the whole changeset is deleted, so update metadata and the progress counter
+	if (ctx->chunk_index == ctx->chunk_count - 1) {
+		ctx->txn->shadow_md.first_serial = ctx->serial_to;
+		ctx->txn->shadow_md.changeset_count--;
+		if (serial_compare(ctx->txn->shadow_md.last_flushed, ctx->serial) == 0) {
+			ctx->txn->shadow_md.flags &= ~LAST_FLUSHED_VALID;
+			ds->to_be_freed = ds->freed_approx; // lower the target so later calls stop: prevents deleting unflushed changesets
+		}
+		if (serial_compare(ctx->txn->shadow_md.last_serial, ctx->serial) == 0) {
+			ctx->txn->shadow_md.flags &= ~SERIAL_TO_VALID;
+		}
+		ds->freed_approx++; // one more whole changeset deleted
+	}
+	return KNOT_EOK;
+}
+
+/*!
+ * \brief Deletes up to the specified number of changesets, starting at first_serial.
+ *
+ * It tries deleting only flushed changesets, preserves all unflushed ones.
+ * \param[out] really_deleted  Number of changesets deleted; 0 if nothing was deleted or on error.
+ * \retval KNOT_EOK if no error, even if too little or nothing deleted (check really_deleted for result); KNOT_E* if error
+ */
+static int delete_count(journal_t *j, txn_t *_txn, size_t to_be_deleted, size_t *really_deleted)
+{
+	reuse_txn(txn, j, _txn, 1);
+
+	if (!md_flag(txn, LAST_FLUSHED_VALID)) {
+		*really_deleted = 0;
+		return KNOT_EOK; // NOTE(review): leaves without unreuse_txn() — confirm no txn stays open when _txn is NULL
+	}
+	delete_status_t ds = { .freed_approx = 0, .to_be_freed = to_be_deleted };
+	iterate(j, txn, del_count_itercb, JOURNAL_ITERATION_CHUNKS, &ds, txn->shadow_md.first_serial, txn->shadow_md.last_serial);
+	unreuse_txn(txn, _txn);
+
+	*really_deleted = (txn->ret == KNOT_EOK ? ds.freed_approx : 0); // always defined: the caller subtracts it without checking the return value
+	txn_ret(txn);
+}
+
 static int delete_dirty_serial(journal_t *j, txn_t *_txn)
 {
 	reuse_txn(txn, j, _txn, 1);
@@ -988,7 +1047,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
 	// PART 3 : check if we exceeded designed occupation and delete some
 	uint32_t occupied, occupied_max;
 	md_get(txn, j->zone, MDKEY_PERZONE_OCCUPIED, &occupied);
-	occupied_max = journal_max_occupied(j, txn);
+	occupied_max = journal_max_usage(j);
 	occupied += serialized_size_total;
 	if (occupied > occupied_max) {
 		size_t freed;
@@ -1008,6 +1067,20 @@ static int store_changesets(journal_t *j, list_t *changesets)
 		}
 	}
 
+	// PART 3.5 : check if we exceeded history depth
+	long over_limit = (long)txn->shadow_md.changeset_count - journal_max_changesets(j) +
+			  list_size(changesets) - (inserting_merged ? 1 : 0);
+	if (over_limit > 0) {
+		size_t deled;
+		delete_count(j, txn, over_limit, &deled);
+		over_limit -= deled;
+		if (over_limit > 0) {
+			try_flush
+			delete_count(j, txn, over_limit, &deled);
+			// ignore further errors here, the limit is not so important
+		}
+	}
+
 	// PART 4: continuity and duplicity check
 	changeset_t * chs_head = (HEAD(*changesets));
 	uint32_t serial = knot_soa_serial(&chs_head->soa_from->rrs);
@@ -1094,6 +1167,7 @@ static int store_changesets(journal_t *j, list_t *changesets)
 			txn->shadow_md.flags |= SERIAL_TO_VALID;
 			txn->shadow_md.last_serial = serial;
 			txn->shadow_md.last_serial_to = serial_to;
+			txn->shadow_md.changeset_count++;
 		}
 
 		free(allchunks);
@@ -1511,6 +1585,7 @@ int journal_check(journal_t *j, int warn_level)
 	changeset_t *ch;
 	uint32_t sfrom, sto;
 	uint32_t first_unflushed;
+	uint32_t chcount;
 
 	jch_info("started");
 
@@ -1523,9 +1598,10 @@ int journal_check(journal_t *j, int warn_level)
 	txn_begin(txn, 1);
 	jch_txn("begin", 1);
 
-	jch_info("metadata: flags >> %d << fs %u ls %u lst %u lf %u ms %u ds %u", txn->shadow_md.flags, txn->shadow_md.first_serial, txn->shadow_md.last_serial, txn->shadow_md.last_serial_to,
-                 txn->shadow_md.last_flushed, txn->shadow_md.merged_serial, txn->shadow_md.dirty_serial);
+	jch_info("metadata: flags >> %d << fs %u ls %u lst %u lf %u ms %u ds %u cnt %u", txn->shadow_md.flags, txn->shadow_md.first_serial, txn->shadow_md.last_serial, txn->shadow_md.last_serial_to,
+                 txn->shadow_md.last_flushed, txn->shadow_md.merged_serial, txn->shadow_md.dirty_serial, txn->shadow_md.changeset_count);
 
+	chcount = txn->shadow_md.changeset_count;
 	first_unflushed = txn->shadow_md.first_serial;
 
 	if (md_flag(txn, DIRTY_SERIAL_VALID)) {
@@ -1592,6 +1668,10 @@ int journal_check(journal_t *j, int warn_level)
 		goto check_merged;
 	}
 	jch_info("listed %zu changesets", list_size(&l));
+	if (list_size(&l) != chcount) {
+		jch_warn("expected %u changesets but found %zu", chcount, list_size(&l));
+	}
+
 	ch = HEAD(l);
 	if (serial_compare(sfrom, knot_soa_serial(&ch->soa_from->rrs)) != 0) {
 		jch_warn("first listed changeset's serial 'from' %u is not ok", knot_soa_serial(&ch->soa_from->rrs));
diff --git a/src/knot/journal/journal.h b/src/knot/journal/journal.h
index e2f175362f..b391cc33df 100644
--- a/src/knot/journal/journal.h
+++ b/src/knot/journal/journal.h
@@ -48,6 +48,7 @@ typedef struct {
 	uint32_t last_flushed;		// serial_from of the last flushed (or merged) chengeset
 	uint32_t merged_serial;		// "serial_from" of merged changeset
 	uint32_t dirty_serial;		// serial_from of an incompletely inserted changeset which shall be deleted (see DB_MAX_INSERT_TXN)
+	uint32_t changeset_count;	// # of changesets in this journal
 	uint32_t flags;			// LAST_FLUSHED_VALID, SERIAL_TO_VALID, MERGED_SERIAL_VALID
 	// specific metadata: occupied
 } journal_metadata_t;
diff --git a/tests/journal_lmdb.c b/tests/journal_lmdb.c
index 730b0871ac..b2ee468994 100644
--- a/tests/journal_lmdb.c
+++ b/tests/journal_lmdb.c
@@ -25,7 +25,6 @@
 #include <tap/files.h>
 
 #include "libknot/libknot.h"
-//#define JOURNAL_TEST_ENV
 #include "knot/journal/journal.c"
 #include "knot/zone/zone.h"
 #include "knot/zone/zone-diff.h"
@@ -43,6 +42,16 @@ journal_db_t * db; // global
 journal_t * j;
 uint8_t *apex = (uint8_t *)"\4test";
 
+static void set_conf(int zonefile_sync, size_t journal_usage)
+{
+	char yaml[512]; // zone section for the test zone; max-journal-depth is fixed at 1000
+	const char *domain = (const char *)(apex + 1); // apex without its first byte
+	snprintf(yaml, sizeof(yaml), "zone:\n  - domain: %s\n    zonefile-sync: %d\n"
+		 "    max-journal-usage: %zu\n    max-journal-depth: 1000\n", domain, zonefile_sync, journal_usage);
+	int ret = test_conf(yaml, NULL);
+	assert(ret == KNOT_EOK);
+}
+
 /*! \brief Generate random string with given length. */
 static int randstr(char* dst, size_t len)
 {
@@ -262,6 +271,8 @@ static void test_store_load(void)
 	if (ret == KNOT_EOK) ret2 = journal_open(j, &db, apex);
 	ok(ret == KNOT_EOK, "journal: open (%d, %d)", ret, ret2);
 
+	set_conf(1000, 512 * 1024);
+
 	/* Save and load changeset. */
 	changeset_t *m_ch = changeset_new(apex);
 	init_random_changeset(m_ch, 0, 1, 128, apex);
@@ -537,12 +548,7 @@ static void test_merge(void)
 	list_t l;
 
 	// allow merge
-	const char *conf_str =
-		"zone:\n"
-		"  - domain: test\n"
-		"    zonefile-sync: -1\n";
-	ret = test_conf(conf_str, NULL);
-	assert(ret == KNOT_EOK);
+	set_conf(-1, 512 * 1024);
 	ok(journal_merge_allowed(j), "journal: merge allowed");
 
 	ret = drop_journal(j, NULL);
@@ -574,12 +580,7 @@ static void test_merge(void)
 	assert(ret == KNOT_EOK);
 
 	// disallow merge
-	const char *conf_str2 =
-		"zone:\n"
-		"  - domain: test\n"
-		"    zonefile-sync: 10\n";
-	ret = test_conf(conf_str2, NULL);
-	assert(ret == KNOT_EOK);
+	set_conf(1000, 512 * 1024);
 	ok(!journal_merge_allowed(j), "journal: merge disallowed");
 }
 
@@ -598,6 +599,8 @@ static void test_stress_base(journal_t *j, size_t update_size, size_t file_size)
 	ret = journal_open(j, &db, apex);
 	assert(ret == KNOT_EOK);
 
+	set_conf(1000, file_size / 2);
+
 	changeset_t ch;
 	changeset_init(&ch, apex);
 	init_random_changeset(&ch, serial, serial + 1, update_size, apex);
-- 
GitLab