Skip to content
Snippets Groups Projects
Forked from Knot projects / Knot DNS
9578 commits behind the upstream repository.
journal.c 29.73 KiB
/*  Copyright (C) 2011 CZ.NIC, z.s.p.o. <knot-dns@labs.nic.cz>

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <assert.h>

#include "knot/common/debug.h"
#include "knot/server/journal.h"
#include "knot/server/serialization.h"
#include "libknot/rrtype/soa.h"

/*! \brief Infinite file size limit. */
#define FSLIMIT_INF (~((size_t)0))

/*! \brief Node classification macros. */
#define jnode_flags(j, i) ((j)->nodes[(i)].flags)

/*! \brief Next node. */
#define jnode_next(j, i) (((i) + 1) % (j)->max_nodes)

/*! \brief Previous node. */
#define jnode_prev(j, i) (((i) == 0) ? (j)->max_nodes - 1 : (i) - 1)

typedef uint32_t crc_t;
static const crc_t CRC_PLACEHOLDER = 0;

static inline int sfread(void *dst, size_t len, int fd)
{
	return read(fd, dst, len) == len;
}

static inline int sfwrite(const void *src, size_t len, int fd)
{
	return write(fd, src, len) == len;
}

static inline journal_node_t *journal_end(journal_t *journal) {
	return journal->nodes +  journal->qtail;
}

/*! \brief Equality compare function. */
static inline int journal_cmp_eq(uint64_t k1, uint64_t k2)
{
	if (k1 > k2) return 1;
	if (k1 < k2) return -1;
	return 0;
}

/*! \brief Return 'serial_from' part of the key. */
static inline uint32_t journal_key_from(uint64_t k)
{
	/*      64    32       0
	 * key = [TO   |   FROM]
	 * Need: Least significant 32 bits.
	 */
	return (uint32_t)(k & ((uint64_t)0x00000000ffffffff));
}

/*----------------------------------------------------------------------------*/

/*! \brief Compare function to match entries with starting serial. */
static inline int journal_key_from_cmp(uint64_t k, uint64_t from)
{
	/*      64    32       0
	 * key = [TO   |   FROM]
	 * Need: Least significant 32 bits.
	 */
	return ((uint64_t)journal_key_from(k)) - from;
}

/*! \brief Make key for journal from serials. */
static inline uint64_t ixfrdb_key_make(uint32_t from, uint32_t to)
{
	/*      64    32       0
	 * key = [TO   |   FROM]
	 */
	return (((uint64_t)to) << ((uint64_t)32)) | ((uint64_t)from);
}

/*! \brief Recover metadata from journal. */
static int journal_recover(journal_t *j)
{
	if (j == NULL) {
		return KNOT_EINVAL;
	}

	/* Attempt to recover queue. */
	int qstate[2] = { -1, -1 };
	unsigned c = 0, p = j->max_nodes - 1;
	while (1) {

		/* Fetch previous and current node. */
		journal_node_t *np = j->nodes + p;
		journal_node_t *nc = j->nodes + c;

		/* Check flags
		 * p c (0 = free, 1 = non-free)
		 * 0 0 - in free segment
		 * 0 1 - c-node is qhead
		 * 1 0 - c-node is qtail
		 * 1 1 - in full segment
		 */
		unsigned c_set = (nc->flags > JOURNAL_FREE);
		unsigned p_set = (np->flags > JOURNAL_FREE);
		if (!p_set && c_set && qstate[0] < 0) {
			qstate[0] = c; /* Recovered qhead. */
			dbg_journal_verb("journal: recovered qhead=%u\n",
			                 qstate[0]);
		}
		if (p_set && !c_set && qstate[1] < 0) {\
			qstate[1] = c; /* Recovered qtail. */
			dbg_journal_verb("journal: recovered qtail=%u\n",
			                 qstate[1]);
		}

		/* Both qstates set. */
		if (qstate[0] > -1 && qstate[1] > -1) {
			break;
		}
		/* Set prev and next. */
		p = c;
		c = (c + 1) % j->max_nodes;

		/* All nodes probed. */
		if (c == 0) {
			dbg_journal("journal: failed to recover node queue\n");
			break;
		}
	}

	/* Evaluate */
	if (qstate[0] < 0 || qstate[1] < 0) {
		return KNOT_ERANGE;
	}

	/* Write back. */
	int seek_ret = lseek(j->fd, JOURNAL_HSIZE - 2 * sizeof(uint16_t), SEEK_SET);
	if (seek_ret < 0 || !sfwrite(qstate, 2 * sizeof(uint16_t), j->fd)) {
		dbg_journal("journal: failed to write back queue state\n");
		return KNOT_ERROR;
	}

	/* Reset queue state. */
	j->qhead = qstate[0];
	j->qtail = qstate[1];
	dbg_journal("journal: node queue=<%u,%u> recovered\n",
	            qstate[0], qstate[1]);


	return KNOT_EOK;
}

/*! \brief Create new journal. */
static int journal_create_file(const char *fn, uint16_t max_nodes)
{
	if (fn == NULL) {
		return KNOT_EINVAL;
	}

	/* File lock. */
	struct flock fl = { .l_type = F_WRLCK, .l_whence = SEEK_SET,
	                    .l_start = 0, .l_len = 0, .l_pid = getpid() };

	/* Create journal file. */
	int fd = open(fn, O_RDWR|O_CREAT|O_TRUNC, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP);
	if (fd < 0) {
		dbg_journal("journal: failed to create file '%s'\n", fn);
		return knot_map_errno(errno);
	}

	/* Lock. */
	if (fcntl(fd, F_SETLKW, &fl) == -1) {
		close(fd);
		remove(fn);
		return KNOT_ERROR;
	}

	/* Create journal header. */
	dbg_journal("journal: creating header\n");
	const char magic[MAGIC_LENGTH] = JOURNAL_MAGIC;
	if (!sfwrite(magic, MAGIC_LENGTH, fd)) {
		close(fd);
		remove(fn);
		return KNOT_ERROR;
	}

	if (!sfwrite(&CRC_PLACEHOLDER, sizeof(CRC_PLACEHOLDER), fd)) {
		close(fd);
		remove(fn);
		return KNOT_ERROR;
	}
	if (!sfwrite(&max_nodes, sizeof(uint16_t), fd)) {
		close(fd);
		remove(fn);
		return KNOT_ERROR;
	}

	/* Create node queue head + tail.
	 * qhead points to least recent node
	 * qtail points to next free node
	 * qhead == qtail means empty queue
	 */
	uint16_t zval = 0;
	if (!sfwrite(&zval, sizeof(uint16_t), fd)) {
		close(fd);
		remove(fn);
		return KNOT_ERROR;
	}

	if (!sfwrite(&zval, sizeof(uint16_t), fd)) {
		close(fd);
		remove(fn);
		return KNOT_ERROR;
	}

	dbg_journal_verb("journal: creating free segment descriptor\n");

	/* Create free segment descriptor. */
	journal_node_t jn;
	memset(&jn, 0, sizeof(journal_node_t));
	jn.id = 0;
	jn.flags = JOURNAL_VALID;
	jn.pos = JOURNAL_HSIZE + (max_nodes + 1) * sizeof(journal_node_t);
	jn.len = 0;
	if (!sfwrite(&jn, sizeof(journal_node_t), fd)) {
		close(fd);
		remove(fn);
		return KNOT_ERROR;
	}

	/* Create nodes. */
	dbg_journal("journal: creating node table, size=%u\n", max_nodes);
	memset(&jn, 0, sizeof(journal_node_t));
	for(uint16_t i = 0; i < max_nodes; ++i) {
		if (!sfwrite(&jn, sizeof(journal_node_t), fd)) {
			close(fd);
			if (remove(fn) < 0) {
				dbg_journal("journal: failed to remove journal file after error\n");
			}
			return KNOT_ERROR;
		}
	}

	/* Unlock and close. */
	close(fd);

	/* Journal file created. */
	dbg_journal("journal: file '%s' initialized\n", fn);
	return KNOT_EOK;
}

/*! \brief Open journal file for r/w (returns error if not exists). */
static int journal_open_file(journal_t *j)
{
	assert(j != NULL);

	int ret = KNOT_EOK;
	j->fd = open(j->path, O_RDWR);
	dbg_journal_verb("journal: open_file '%s'\n", j->path);
	if (j->fd < 0) {
		if (errno != ENOENT) {
			return knot_map_errno(errno);
		}

		/* Create new journal file and open if not exists. */
		ret = journal_create_file(j->path, JOURNAL_NCOUNT);
		if(ret == KNOT_EOK) {
			return journal_open_file(j);
		}
		return ret;
	}

	/* File lock. */
	struct flock lock = { .l_type = F_WRLCK, .l_whence = SEEK_SET,
	                      .l_start  = 0, .l_len = 0, .l_pid = 0 };
	/* Attempt to lock. */
	dbg_journal_verb("journal: locking journal %s\n", j->path);
	ret = fcntl(j->fd, F_SETLKW, &lock);
	if (ret < 0) {
		return knot_map_errno(errno);
	}

	/* Read magic bytes. */
	dbg_journal("journal: reading magic bytes\n");
	const char magic_req[MAGIC_LENGTH] = JOURNAL_MAGIC;
	char magic[MAGIC_LENGTH];
	if (!sfread(magic, MAGIC_LENGTH, j->fd)) {
		dbg_journal_verb("journal: cannot read magic bytes\n");
		goto open_file_error;
	}
	if (memcmp(magic, magic_req, MAGIC_LENGTH) != 0) {
		log_warning("journal '%s', version too old, purging", j->path);
		close(j->fd);
		j->fd = -1;
		ret = journal_create_file(j->path, JOURNAL_NCOUNT);
		if(ret == KNOT_EOK) {
			return journal_open_file(j);
		}
		return ret;
	}

	/* Skip CRC */
	if (lseek(j->fd, MAGIC_LENGTH + sizeof(crc_t), SEEK_SET) < 0) {
		goto open_file_error;
	}

	/* Get journal file size. */
	struct stat st;
	if (fstat(j->fd, &st) < 0) {
		dbg_journal_verb("journal: cannot get journal fsize\n");
		goto open_file_error;
	}

	/* Set file size. */
	j->fsize = st.st_size;

	/* Read maximum number of entries. */
	if (!sfread(&j->max_nodes, sizeof(uint16_t), j->fd)) {
		dbg_journal_verb("journal: cannot read max_nodes\n");
		goto open_file_error;
	}

	/* Check max_nodes, but this is riddiculous. */
	if (j->max_nodes == 0) {
		dbg_journal_verb("journal: invalid max_nodes\n");
		goto open_file_error;
	}

	/* Allocate nodes. */
	const size_t node_len = sizeof(journal_node_t);
	j->nodes = malloc(j->max_nodes * node_len);
	if (j->nodes == NULL) {
		dbg_journal_verb("journal: can't allocate nodes\n");
		goto open_file_error;
	} else {
		memset(j->nodes, 0, j->max_nodes * node_len);
	}

	/* Load node queue state. */
	j->qhead = j->qtail = 0;
	if (!sfread(&j->qhead, sizeof(uint16_t), j->fd)) {
		dbg_journal_verb("journal: cannot read qhead\n");
		goto open_file_error;
	}

	/* Load queue tail. */
	if (!sfread(&j->qtail, sizeof(uint16_t), j->fd)) {
		dbg_journal_verb("journal: cannot read qtail\n");
		goto open_file_error;
	}

	/* Check head + tail */
	if (j->qtail >= j->max_nodes || j->qhead >= j->max_nodes) {
		dbg_journal_verb("journal: queue pointers corrupted\n");
		goto open_file_error;
	}

	/* Load empty segment descriptor. */
	if (!sfread(&j->free, node_len, j->fd)) {
		dbg_journal_verb("journal: cannot read free segment ptr\n");
		goto open_file_error;
	}

	/* Read journal descriptors table. */
	if (!sfread(j->nodes, j->max_nodes * node_len, j->fd)) {
		dbg_journal_verb("journal: cannot read node table\n");
		goto open_file_error;
	}

	dbg_journal("journal: opened journal size=%u, queue=<%u, %u>, fd=%d\n",
	            j->max_nodes, j->qhead, j->qtail, j->fd);

	/* Check node queue. */
	unsigned qtail_free = (jnode_flags(j, j->qtail) <= JOURNAL_FREE);
	unsigned qhead_free = j->max_nodes - 1; /* Left of qhead must be free.*/
	if (j->qhead > 0) {
		qhead_free = (j->qhead - 1);
	}
	qhead_free = (jnode_flags(j, qhead_free) <= JOURNAL_FREE);
	if ((j->qhead != j->qtail) && (!qtail_free || !qhead_free)) {
		log_warning("journal '%s', recovering metadata after crash", j->path);
		ret = journal_recover(j);
		if (ret != KNOT_EOK) {
			log_error("journal '%s', unrecoverable corruption (%s)",
			          j->path, knot_strerror(ret));
			goto open_file_error;
		}
	}

	/* Save file lock and return. */
	return KNOT_EOK;

	/* Unlock and close file and return error. */
open_file_error:
	free(j->nodes);
	j->nodes = NULL;
	close(j->fd);
	j->fd = -1;
	return KNOT_ERROR;
}

/*! \brief Close journal file. */
static int journal_close_file(journal_t *journal)
{
	/* Check journal. */
	if (journal == NULL) {
		return KNOT_EINVAL;
	}

	/* Close file. */
	if (journal->fd > 0) {
		close(journal->fd);
		journal->fd = -1;
	}

	/* Free nodes. */
	free(journal->nodes);
	journal->nodes = NULL;

	return KNOT_EOK;
}

/*!  \brief Sync node state to permanent storage. */
static int journal_update(journal_t *journal, journal_node_t *n)
{
	if (journal == NULL || n == NULL) {
		return KNOT_EINVAL;
	}

	/* Calculate node offset. */
	const size_t node_len = sizeof(journal_node_t);
	size_t i = n - journal->nodes;
	if (i > journal->max_nodes) {
		return KNOT_EINVAL;
	}

	/* Calculate node position in permanent storage. */
	long jn_fpos = JOURNAL_HSIZE + (i + 1) * node_len;

	dbg_journal("journal: syncing journal node=%zu id=%llu flags=0x%x\n",
		      i, (unsigned long long)n->id, n->flags);

	/* Write back. */
	int seek_ret = lseek(journal->fd, jn_fpos, SEEK_SET);
	if (seek_ret < 0 || !sfwrite(n, node_len, journal->fd)) {
		dbg_journal("journal: failed to writeback node=%llu to %ld\n",
		            (unsigned long long)n->id, jn_fpos);
		return KNOT_ERROR;
	}

	return KNOT_EOK;
}

int journal_write_in(journal_t *j, journal_node_t **rn, uint64_t id, size_t len)
{
	const size_t node_len = sizeof(journal_node_t);
	*rn = NULL;

	/* Find next free node. */
	uint16_t jnext = (j->qtail + 1) % j->max_nodes;

	dbg_journal("journal: will write id=%llu, node=%u, size=%zu, fsize=%zu\n",
	            (unsigned long long)id, j->qtail, len, j->fsize);

	/* Calculate remaining bytes to reach file size limit. */
	size_t fs_remaining = 0;
	if (j->fsize < j->fslimit) {
		fs_remaining = j->fslimit - j->fsize;
	}

	int seek_ret = 0;

	/* Increase free segment if on the end of file. */
	dbg_journal("journal: free.pos = %u free.len = %u\n",
	            j->free.pos, j->free.len);
	journal_node_t *n = j->nodes + j->qtail;
	if (j->free.pos + j->free.len == j->fsize) {

		dbg_journal_verb("journal: * is last node\n");

		/* Grow journal file until the size limit. */
		if(j->free.len < len && len <= fs_remaining) {
			size_t diff = len - j->free.len;
			dbg_journal("journal: * growing by +%zu, pos=%u, "
			            "new fsize=%zu\n",
			            diff, j->free.pos,
			            j->fsize + diff);
			j->fsize += diff; /* Appending increases file size. */
			j->free.len += diff;

		}

		/*  Rewind if resize is needed, but the limit is reached. */
		if(j->free.len < len && len > fs_remaining) {
			journal_node_t *head = j->nodes + j->qhead;
			j->fsize = j->free.pos;
			j->free.pos = head->pos;
			j->free.len = 0;
			dbg_journal_verb("journal: * fslimit reached, "
			                 "rewinding to %u\n",
			                 head->pos);
			dbg_journal_verb("journal: * file size trimmed to %zu\n",
			                 j->fsize);
		}
	}

	/* Count node visits to prevent looping. */
	uint16_t visit_count = 0;

	/* Evict occupied nodes if necessary. */
	while (j->free.len < len || j->nodes[jnext].flags > JOURNAL_FREE) {

		/* Evict least recent node if not empty. */
		journal_node_t *head = j->nodes + j->qhead;

		/* Check if it has been synced to disk. */
		if ((head->flags & JOURNAL_DIRTY) && (head->flags & JOURNAL_VALID)) {
			return KNOT_EBUSY;
		}

		/* Write back evicted node. */
		head->flags = JOURNAL_FREE;
		seek_ret = lseek(j->fd, JOURNAL_HSIZE + (j->qhead + 1) * node_len, SEEK_SET);
		if (seek_ret < 0 || !sfwrite(head, node_len, j->fd)) {
			return KNOT_ERROR;
		}

		dbg_journal("journal: * evicted node=%u, growing by +%u\n",
			      j->qhead, head->len);

		/* Write back query state. */
		j->qhead = (j->qhead + 1) % j->max_nodes;
		uint16_t qstate[2] = {j->qhead, j->qtail};
		seek_ret = lseek(j->fd, JOURNAL_HSIZE - 2 * sizeof(uint16_t), SEEK_SET);
		if (seek_ret < 0 || !sfwrite(qstate, 2 * sizeof(uint16_t), j->fd)) {
			return KNOT_ERROR;
		}

		/* Increase free segment. */
		j->free.len += head->len;

		/* Update node visit count. */
		visit_count += 1;
		if (visit_count >= j->max_nodes) {
			return KNOT_ESPACE;
		}
	}

	/* Invalidate node and write back. */
	n->id = id;
	n->pos = j->free.pos;
	n->len = len;
	n->flags = JOURNAL_FREE;
	n->next = jnext;
	journal_update(j, n);
	*rn = n;
	return KNOT_EOK;
}

int journal_write_out(journal_t *journal, journal_node_t *n)
{
	/* Mark node as valid and write back. */
	uint16_t jnext = n->next;
	size_t size = n->len;
	const size_t node_len = sizeof(journal_node_t);
	n->flags = JOURNAL_VALID | journal->bflags;
	n->next = 0;
	journal_update(journal, n);

	/* Handle free segment on node rotation. */
	if (journal->qtail > jnext && journal->fslimit == FSLIMIT_INF) {
		/* Trim free space. */
		journal->fsize -= journal->free.len;
		dbg_journal_verb("journal: * trimmed filesize to %zu\n",
		                 journal->fsize);

		/* Rewind free segment. */
		journal_node_t *n = journal->nodes + jnext;
		journal->free.pos = n->pos;
		journal->free.len = 0;

	} else {
		/* Mark used space. */
		journal->free.pos += size;
		journal->free.len -= size;
	}

	dbg_journal("journal: finishing node=%u id=%llu flags=0x%x, "
	            "data=<%u, %u> free=<%u, %u>\n",
	            journal->qtail, (unsigned long long)n->id,
	            n->flags, n->pos, n->pos + n->len,
	            journal->free.pos,
	            journal->free.pos + journal->free.len);

	/* Write back free segment state. */
	int seek_ret = lseek(journal->fd, JOURNAL_HSIZE, SEEK_SET);
	if (seek_ret < 0 || !sfwrite(&journal->free, node_len, journal->fd)) {
		/* Node is marked valid and failed to shrink free space,
		 * node will be overwritten on the next write. Return error.
		 */
		dbg_journal("journal: failed to write back "
		            "free segment descriptor\n");
		return KNOT_ERROR;
	}

	/* Node write successful. */
	journal->qtail = jnext;

	/* Write back queue state, not essential as it may be recovered.
	 * qhead - lowest valid node identifier (least recent)
	 * qtail - highest valid node identifier (most recently used)
	 */
	uint16_t qstate[2] = {journal->qhead, journal->qtail};
	seek_ret = lseek(journal->fd, JOURNAL_HSIZE - 2 * sizeof(uint16_t), SEEK_SET);
	if (seek_ret < 0 || !sfwrite(qstate, 2 * sizeof(uint16_t), journal->fd)) {
		dbg_journal("journal: failed to write back queue state\n");
		return KNOT_ERROR;
	}

	return KNOT_EOK;
}


journal_t* journal_open(const char *path, size_t fslimit)
{
	if (path == NULL) {
		return NULL;
	}

	journal_t *j = malloc(sizeof(journal_t));
	if (j == NULL) {
		return NULL;
	}

	memset(j, 0, sizeof(journal_t));
	j->bflags = JOURNAL_DIRTY;
	j->fd = -1;

	/* Set file size. */
	if (fslimit == 0) {
		j->fslimit = FSLIMIT_INF;
	} else {
		j->fslimit = fslimit;
	}

	/* Copy path. */
	j->path = strdup(path);
	if (j->path == NULL) {
		free(j);
		return NULL;
	}

	/* Open journal file. */
	int ret = journal_open_file(j);
	if (ret != KNOT_EOK) {
		journal_close(j);
		return NULL;
	}

	return j;
}

/*!
 * \brief Entry identifier compare function.
 *
 * \retval -n if k1 < k2
 * \retval +n if k1 > k2
 * \retval  0 if k1 == k2
 */
typedef int (*journal_cmp_t)(uint64_t k1, uint64_t k2);

static int journal_fetch(journal_t *journal, uint64_t id,
		  journal_cmp_t cf, journal_node_t** dst)
{
	if (journal == NULL || dst == NULL) {
		return KNOT_EINVAL;
	}

	/*! \todo Organize journal descriptors in btree? */
	size_t i = jnode_prev(journal, journal->qtail);
	size_t endp = jnode_prev(journal, journal->qhead);
	for(; i != endp; i = jnode_prev(journal, i)) {
		journal_node_t *n = journal->nodes + i;
		if (cf(n->id, id) == 0) {
			*dst = journal->nodes + i;
			return KNOT_EOK;
		}
	}

	return KNOT_ENOENT;
}

static int journal_read_node(journal_t *journal, journal_node_t *n, char *dst)
{
	dbg_journal("journal: reading node with id=%"PRIu64", data=<%u, %u>, flags=0x%hx\n",
	            n->id, n->pos, n->pos + n->len, n->flags);

	/* Check valid flag. */
	if (!(n->flags & JOURNAL_VALID)) {
		dbg_journal("journal: node with id=%llu is invalid "
		            "(flags=0x%hx)\n", (unsigned long long)n->id, n->flags);
		return KNOT_EINVAL;
	}

	/* Seek journal node. */
	int seek_ret = lseek(journal->fd, n->pos, SEEK_SET);

	/* Read journal node content. */
	if (seek_ret < 0 || !sfread(dst, n->len, journal->fd)) {
		return KNOT_ERROR;
	}

	return KNOT_EOK;
}

int journal_map(journal_t *journal, uint64_t id, char **dst, size_t size, bool rdonly)
{
	if (journal == NULL || dst == NULL) {
		return KNOT_EINVAL;
	}

	/* Check if entry exists. */
	journal_node_t *n = NULL;
	int ret = journal_fetch(journal, id, journal_cmp_eq, &n);
	if (ret != KNOT_EOK) {
		/* Return error if read only. */
		if (rdonly) {
			return ret;
		}

		/* Prepare journal write. */
		ret = journal_write_in(journal, &n, id, size);
		if (ret != KNOT_EOK) {
			return ret;
		}

		/* Reserve data in permanent storage. */
		/*! \todo This is only needed when inflating journal file. */
		if (lseek(journal->fd, n->pos, SEEK_SET) < 0) {
			return KNOT_ERROR;
		}
		char nbuf[4096] = {0};
		size_t wb = sizeof(nbuf);
		while (size > 0) {
			if (size < sizeof(nbuf)) {
				wb = size;
			}
			if (!sfwrite(nbuf, wb, journal->fd)) {
				return KNOT_ERROR;
			}
			size -= wb;
		}
	} else {
		/* Entry resizing is not really supported now. */
		if (n->len < size) {
			return KNOT_ESPACE;
		}
	}

	/* Align offset to page size (required). */
	const size_t ps = sysconf(_SC_PAGESIZE);
	off_t ps_delta = (n->pos % ps);
	off_t off = n->pos - ps_delta;

	/* Map file region. */
	*dst = mmap(NULL, n->len + ps_delta, PROT_READ | PROT_WRITE, MAP_SHARED,
	            journal->fd, off);
	if (*dst == ((void*)-1)) {
		dbg_journal("journal: couldn't mmap() fd=%d <%u,%u> %d\n",
		            journal->fd, n->pos, n->pos+n->len, errno);
		return KNOT_ERROR;
	}

	/* Advise usage of memory. */
#ifdef HAVE_MADVISE
	madvise(*dst, n->len + ps_delta, MADV_SEQUENTIAL);
#endif
	/* Correct dst pointer to alignment. */
	*dst += ps_delta;

	return KNOT_EOK;
}

int journal_unmap(journal_t *journal, uint64_t id, void *ptr, int finalize)
{
	if (journal == NULL || ptr == NULL) {
		return KNOT_EINVAL;
	}

	/* Mapped node is on tail. */
	/* @todo: This is hack to allow read-only correct unmap. */
	int ret = KNOT_EOK;
	journal_node_t *n = journal->nodes + journal->qtail;
	if (!finalize) {
		ret = journal_fetch(journal, id, journal_cmp_eq, &n);
		if (ret != KNOT_EOK) {
			return KNOT_ENOENT;
		}
	}
	if(n->id != id) {
		dbg_journal("journal: failed to find mmap node with id=%llu\n",
		            (unsigned long long)id);
		return KNOT_ENOENT;
	}

	/* Realign memory. */
	const size_t ps = sysconf(_SC_PAGESIZE);
	off_t ps_delta = (n->pos % ps);
	ptr = ((char*)ptr - ps_delta);

	/* Unmap memory. */
	if (munmap(ptr, n->len + ps_delta) != 0) {
		dbg_journal("journal: couldn't munmap() fd=%d <%u,%u> %d\n",
		            journal->fd, n->pos, n->pos+n->len, errno);
		return KNOT_ERROR;
	}

	/* Finalize. */
	if (finalize) {
		ret = journal_write_out(journal, n);
	}
	return ret;
}

int journal_close(journal_t *journal)
{
	/* Check journal. */
	if (journal == NULL) {
		return KNOT_EINVAL;
	}

	/* Close file. */
	journal_close_file(journal);

	/* Free allocated resources. */
	free(journal->path);
	free(journal);

	return KNOT_EOK;
}

bool journal_exists(const char *path)
{
	if (path == NULL) {
		return false;
	}

	/* Check journal file existence. */
	struct stat st;
	return stat(path, &st) == 0;
}

/*! \brief No doc here. Moved from zones.h (@mvavrusa) */
static int changesets_unpack(changeset_t *chs)
{

	/* Read changeset flags. */
	if (chs->data == NULL) {
		return KNOT_EMALF;
	}
	size_t remaining = chs->size;

	/* Read initial changeset RRSet - SOA. */
	uint8_t *stream = chs->data + (chs->size - remaining);
	knot_rrset_t rrset;
	int ret = rrset_deserialize(stream, &remaining, &rrset);
	if (ret != KNOT_EOK) {
		return KNOT_EMALF;
	}

	assert(rrset.type == KNOT_RRTYPE_SOA);
	chs->soa_from = knot_rrset_copy(&rrset, NULL);
	knot_rrset_clear(&rrset, NULL);
	if (chs->soa_from == NULL) {
		return KNOT_ENOMEM;
	}

	/* Read remaining RRSets */
	bool in_remove_section = true;
	while (remaining > 0) {

		/* Parse next RRSet. */
		stream = chs->data + (chs->size - remaining);
		knot_rrset_init_empty(&rrset);
		ret = rrset_deserialize(stream, &remaining, &rrset);
		if (ret != KNOT_EOK) {
			return KNOT_EMALF;
		}
		/* Check for next SOA. */
		if (rrset.type == KNOT_RRTYPE_SOA) {
			/* Move to ADD section if in REMOVE. */
			if (in_remove_section) {
				chs->soa_to = knot_rrset_copy(&rrset, NULL);
				if (chs->soa_to == NULL) {
					ret = KNOT_ENOMEM;
					break;
				}
				in_remove_section = false;
			} else {
				/* Final SOA, no-op. */
				;
			}
		} else {
			/* Remove RRSets. */
			if (in_remove_section) {
				ret = changeset_rem_rrset(chs, &rrset);
			} else {
				/* Add RRSets. */
				ret = changeset_add_rrset(chs, &rrset);
			}
		}
		knot_rrset_clear(&rrset, NULL);
		if (ret != KNOT_EOK) {
			break;
		}
	}

	return ret;
}

static int rrset_write_to_mem(const knot_rrset_t *rr, char **entry,
                              size_t *remaining) {
	size_t written = 0;
	int ret = rrset_serialize(rr, *((uint8_t **)entry),
	                          &written);
	if (ret == KNOT_EOK) {
		assert(written <= *remaining);
		*remaining -= written;
		*entry += written;
	}

	return ret;
}

static int serialize_and_store_chgset(const changeset_t *chs,
                                      char *entry, size_t max_size)
{
	/* Serialize SOA 'from'. */
	int ret = rrset_write_to_mem(chs->soa_from, &entry, &max_size);
	if (ret != KNOT_EOK) {
		return ret;
	}

	changeset_iter_t itt;
	ret = changeset_iter_rem(&itt, chs, false);
	if (ret != KNOT_EOK) {
		return ret;
	}

	knot_rrset_t rrset = changeset_iter_next(&itt);
	while (!knot_rrset_empty(&rrset)) {
		ret = rrset_write_to_mem(&rrset, &entry, &max_size);
		if (ret != KNOT_EOK) {
			changeset_iter_clear(&itt);
			return ret;
		}
		rrset = changeset_iter_next(&itt);
	}
	changeset_iter_clear(&itt);

	/* Serialize SOA 'to'. */
	ret = rrset_write_to_mem(chs->soa_to, &entry, &max_size);
	if (ret != KNOT_EOK) {
		return ret;
	}

	/* Serialize RRSets from the 'add' section. */
	ret = changeset_iter_add(&itt, chs, false);
	if (ret != KNOT_EOK) {
		return ret;
	}

	rrset = changeset_iter_next(&itt);
	while (!knot_rrset_empty(&rrset)) {
		ret = rrset_write_to_mem(&rrset, &entry, &max_size);
		if (ret != KNOT_EOK) {
			changeset_iter_clear(&itt);
			return ret;
		}
		rrset = changeset_iter_next(&itt);
	}
	changeset_iter_clear(&itt);

	return KNOT_EOK;
}

static int changeset_pack(const changeset_t *chs, journal_t *j)
{
	assert(chs != NULL);
	assert(j != NULL);

	uint64_t k = ixfrdb_key_make(knot_soa_serial(&chs->soa_from->rrs),
	                             knot_soa_serial(&chs->soa_to->rrs));

	/* Count the size of the entire changeset in serialized form. */
	size_t entry_size = 0;

	int ret = changeset_binary_size(chs, &entry_size);
	assert(ret == KNOT_EOK);

	/* Reserve space for the journal entry. */
	char *journal_entry = NULL;
	ret = journal_map(j, k, &journal_entry, entry_size, false);
	if (ret != KNOT_EOK) {
		return ret;
	}

	assert(journal_entry != NULL);

	/* Serialize changeset, saving it bit by bit. */
	ret = serialize_and_store_chgset(chs, journal_entry, entry_size);
	/* Unmap the journal entry.
	 * If successfuly written changeset to journal, validate the entry. */
	int unmap_ret = journal_unmap(j, k, journal_entry, ret == KNOT_EOK);
	if (ret == KNOT_EOK && unmap_ret != KNOT_EOK) {
		ret = unmap_ret; /* Propagate the result. */
	}

	return ret;
}

/*! \brief Helper for iterating journal (this is temporary until #80) */
typedef int (*journal_apply_t)(journal_t *, journal_node_t *, const zone_t *, list_t *);
static int journal_walk(const char *fn, uint32_t from, uint32_t to,
                        journal_apply_t cb, const zone_t *zone, list_t *chgs)
{
	/* Open journal for reading. */
	journal_t *journal = journal_open(fn, FSLIMIT_INF);
	if (journal == NULL) {
		return KNOT_ENOMEM;
	}

	/* Read entries from starting serial until finished. */
	uint32_t found_to = from;
	journal_node_t *n = 0;
	int ret = journal_fetch(journal, from, journal_key_from_cmp, &n);
	if (ret != KNOT_EOK) {
		goto finish;
	}

	while (n != 0 && n != journal_end(journal)) {
		/* Check for history end. */
		if (to == found_to) {
			break;
		}

		/* Skip wrong changesets. */
		if (!(n->flags & JOURNAL_VALID)) {
			++n;
			continue;
		}

		/* Callback. */
		ret = cb(journal, n, zone, chgs);
		if (ret != KNOT_EOK) {
			break;
		}

		++n;
	}

finish:
	/* Close journal. */
	journal_close(journal);
	return ret;
}

static int load_changeset(journal_t *journal, journal_node_t *n, const zone_t *zone, list_t *chgs)
{
	changeset_t *ch = changeset_new(zone->name);
	if (ch == NULL) {
		return KNOT_ENOMEM;
	}

	/* Initialize changeset. */
	ch->data = malloc(n->len);
	if (!ch->data) {
		return KNOT_ENOMEM;
	}

	/* Read journal entry. */
	int ret = journal_read_node(journal, n, (char*)ch->data);
	if (ret != KNOT_EOK) {
		return ret;
	}

	/* Update changeset binary size. */
	ch->size = n->len;

	/* Insert into changeset list. */
	add_tail(chgs, &ch->n);

	return KNOT_EOK;
}

int journal_load_changesets(const zone_t *zone, list_t *dst,
                            uint32_t from, uint32_t to)
{
	int ret = journal_walk(zone->conf->ixfr_db, from, to, &load_changeset, zone, dst);
	if (ret != KNOT_EOK) {
		return ret;
	}

	/* Unpack binary data. */
	assert(dst != NULL);
	/*
	 * Parses changesets from the binary format stored in chgsets->data
	 * into the changeset_t structures.
	 */
	changeset_t* chs = NULL;
	WALK_LIST(chs, *dst) {
		ret = changesets_unpack(chs);
		if (ret != KNOT_EOK) {
			return ret;
		}
	}

	/* Check for complete history. */
	changeset_t *last = TAIL(*dst);
	if (to != knot_soa_serial(&last->soa_to->rrs)) {
		return KNOT_ERANGE;
	}

	return KNOT_EOK;
}

int journal_store_changesets(list_t *src, const char *path, size_t size_limit)
{
	if (src == NULL || path == NULL) {
		return KNOT_EINVAL;
	}

	/* Open journal for reading. */
	int ret = KNOT_EOK;
	journal_t *journal = journal_open(path, size_limit);
	if (journal == NULL) {
		return KNOT_ENOMEM;
	}

	/* Begin writing to journal. */
	changeset_t *chs = NULL;
	WALK_LIST(chs, *src) {
		ret = changeset_pack(chs, journal);
		if (ret != KNOT_EOK) {
			break;
		}
	}

	journal_close(journal);
	return ret;
}

int journal_store_changeset(changeset_t *change, const char *path, size_t size_limit)
{
	if (change == NULL || path == NULL) {
		return KNOT_EINVAL;
	}

	/* Open journal for reading. */
	journal_t *journal = journal_open(path, size_limit);
	if (journal == NULL) {
		return KNOT_ENOMEM;
	}

	int ret = changeset_pack(change, journal);

	journal_close(journal);
	return ret;
}

static void mark_synced(journal_t *journal, journal_node_t *node)
{
	/* Check for dirty bit (not synced to permanent storage). */
	if (node->flags & JOURNAL_DIRTY) {
		/* Remove dirty bit. */
		node->flags = node->flags & ~JOURNAL_DIRTY;
		journal_update(journal, node);
	}
}

int journal_mark_synced(const char *path)
{
	if (!journal_exists(path)) {
		return KNOT_EOK;
	}

	journal_t *journal = journal_open(path, FSLIMIT_INF);
	if (journal == NULL) {
		return KNOT_ENOMEM;
	}

	size_t i = journal->qhead;
	for(; i != journal->qtail; i = (i + 1) % journal->max_nodes) {
		mark_synced(journal, journal->nodes + i);
	}

	journal_close(journal);

	return KNOT_EOK;
}