Skip to content
Snippets Groups Projects
Commit 3019d54b authored by Marek Vavrusa's avatar Marek Vavrusa
Browse files

Implemented working journal subsystem with unit test.

Commit refs #927, #928, #935.
parent 818a3049
No related branches found
No related tags found
No related merge requests found
......@@ -60,6 +60,8 @@ unittests_SOURCES = \
tests/knot/conf_tests.h \
tests/knot/dthreads_tests.c \
tests/knot/dthreads_tests.h \
tests/knot/journal_tests.c \
tests/knot/journal_tests.h \
tests/knot/server_tests.c \
tests/knot/server_tests.h \
tests/unittests_main.c
......
......@@ -19,6 +19,7 @@
#define NS_DEBUG
//#define SERVER_DEBUG
//#define DT_DEBUG
//#define JOURNAL_DEBUG
//#define NET_DEBUG
//#define ZONES_DEBUG
#define XFR_DEBUG
......@@ -47,6 +48,12 @@
#define debug_dt_hex(data, len)
#endif
#ifdef JOURNAL_DEBUG
#define debug_journal(msg...) log_msg(LOG_SERVER, LOG_DEBUG, msg)
#else
#define debug_journal(msg...)
#endif
#ifdef NS_DEBUG
#define debug_ns(msg...) log_msg(LOG_ANSWER, LOG_DEBUG, msg)
#define debug_ns_hex(data, len) hex_log(LOG_ANSWER, (data), (len))
......
......@@ -2,16 +2,17 @@
#include <stdio.h>
#include <string.h>
#include "knot/other/debug.h"
#include "journal.h"
static inline int sfread(void *dst, size_t len, FILE *fp)
{
return fread(dst, len, 1, fp) == len;
return fread(dst, len, 1, fp) == 1;
}
static inline int sfwrite(const void *src, size_t len, FILE *fp)
{
return fwrite(src, len, 1, fp) == len;
return fwrite(src, len, 1, fp) == 1;
}
int journal_create(const char *fn, uint16_t max_nodes)
......@@ -19,6 +20,7 @@ int journal_create(const char *fn, uint16_t max_nodes)
/* Create journal file. */
FILE *fp = fopen(fn, "w");
if (!fp) {
debug_journal("journal: failed to create file '%s'\n", fn);
return -1;
}
......@@ -28,16 +30,30 @@ int journal_create(const char *fn, uint16_t max_nodes)
/*! \todo Unlink created file on error? */
/* Create journal header. */
debug_journal("journal: creating header\n");
if (!sfwrite(&max_nodes, sizeof(uint16_t), fp)) {
fclose(fp);
return -1;
}
/* Create empty queue head + tail. */
uint16_t zval = 0;
if (!sfwrite(&zval, sizeof(uint16_t), fp)) {
fclose(fp);
return -1;
}
if (!sfwrite(&zval, sizeof(uint16_t), fp)) {
fclose(fp);
return -1;
}
debug_journal("journal: creating free segment descriptor\n");
/* Create free segment descriptor. */
journal_node_t jn;
jn.id = 0;
jn.flags = JOURNAL_VALID;
jn.pos = 0;
jn.pos = JOURNAL_HSIZE + (max_nodes + 1) * sizeof(journal_node_t);
jn.len = 0;
if (!sfwrite(&jn, sizeof(journal_node_t), fp)) {
fclose(fp);
......@@ -45,6 +61,7 @@ int journal_create(const char *fn, uint16_t max_nodes)
}
/* Create nodes. */
debug_journal("journal: creating node table, size=%u\n", max_nodes);
memset(&jn, 0, sizeof(journal_node_t));
for(uint16_t i = 0; i < max_nodes; ++i) {
if (!sfwrite(&jn, sizeof(journal_node_t), fp)) {
......@@ -54,6 +71,7 @@ int journal_create(const char *fn, uint16_t max_nodes)
}
/* Journal file created. */
debug_journal("journal: file '%s' initialized\n", fn);
return 0;
}
......@@ -64,8 +82,9 @@ journal_t* journal_open(const char *fn)
/*! \todo Lock file. */
/* Open journal file for r/w. */
FILE *fp = fopen(fn, "rw");
FILE *fp = fopen(fn, "r+");
if (!fp) {
debug_journal("journal: failed to open file '%s'\n", fn);
return 0;
}
......@@ -86,27 +105,42 @@ journal_t* journal_open(const char *fn)
fclose(fp);
return 0;
}
/* Read journal header. */
j->n_next = 0;
j->qhead = j->qtail = 0;
j->fp = fp;
j->max_nodes = max_nodes;
if (!sfread(&j->free, node_len, fp)) {
/* Load node queue state. */
if (!sfread(&j->qhead, sizeof(uint16_t), fp)) {
fclose(fp);
free(j);
return 0;
}
/*! \todo Load node queue state. */
/* Load queue tail. */
if (!sfread(&j->qtail, sizeof(uint16_t), fp)) {
fclose(fp);
free(j);
return 0;
}
/* Load empty segment descriptor. */
if (!sfread(&j->free, node_len, fp)) {
fclose(fp);
free(j);
return 0;
}
/* Read journal descriptors table. */
if (fread(&j->nodes, node_len, max_nodes, fp) != node_len * max_nodes) {
if (fread(&j->nodes, node_len, max_nodes, fp) != max_nodes) {
fclose(fp);
free(j);
return 0;
}
/*! \todo Some file checksum, check node integrity. */
debug_journal("journal: opened journal size=%u, queue=<%u, %u>, fd=%d\n",
max_nodes, j->qhead, j->qtail, fileno(j->fp));
return j;
}
......@@ -114,7 +148,8 @@ int journal_fetch(journal_t *journal, int id, const journal_node_t** dst)
{
/*! \todo Organize journal descriptors in btree? */
/*! \todo Or store pointer to last fetch for sequential lookup? */
for(uint16_t i = 0; i < journal->max_nodes; ++i) {
for(uint16_t i = 0; i != journal->max_nodes; ++i) {
if (journal->nodes[i].id == id) {
*dst = journal->nodes + i;
return 0;
......@@ -128,14 +163,20 @@ int journal_read(journal_t *journal, int id, char *dst)
{
const journal_node_t *n = 0;
if(journal_fetch(journal, id, &n) != 0) {
debug_journal("journal: failed to fetch node with id=%d\n",
id);
return -1;
}
/* Check valid flag. */
if (n->flags == JOURNAL_NULL) {
if (n->flags != JOURNAL_VALID) {
debug_journal("journal: node with id=%d is invalid\n", id);
return -1;
}
debug_journal("journal: reading node with id=%d, data=<%u, %u>\n",
id, n->pos, n->pos + n->len);
/* Seek journal node. */
fseek(journal->fp, n->pos, SEEK_SET);
......@@ -145,65 +186,138 @@ int journal_read(journal_t *journal, int id, char *dst)
int journal_write(journal_t *journal, int id, const char *src, size_t size)
{
/*! \todo Free nodes if necessary. */
/*! \todo Find key with already existing identifier? */
const size_t node_len = sizeof(journal_node_t);
/* Find next free node. */
uint16_t jnext = (journal->qtail + 1) % journal->max_nodes;
debug_journal("journal: writing id=%d, node=%u, next=%u\n",
id, journal->qtail, jnext);
/* Increase free segment if on the end of file. */
journal_node_t *n = journal->nodes + journal->qtail;
if (journal->free.len < size) {
return -1;
/* Increase on the end of node queue / uninitialized. */
if (journal->qtail > jnext || n->flags == JOURNAL_NULL) {
debug_journal("journal: growing by +%zu, pos=%u\n",
size - journal->free.len, journal->free.pos);
journal->free.len = size;
}
}
/*! \todo Find first unused node, need to treat nodes as queue. */
uint16_t jn = (journal->n_next) % journal->max_nodes;
long jn_fpos = JOURNAL_HSIZE + (jn + 1) * sizeof(journal_node_t);
journal_node_t *n = journal->nodes + jn;
/*! \todo Free nodes if necessary. */
while (journal->free.len < size) {
/* Evict least recent node if not empty. */
journal_node_t *head = journal->nodes + journal->qhead;
head->flags = JOURNAL_FREE;
/* Write back evicted node. */
fseek(journal->fp, JOURNAL_HSIZE + (journal->qhead + 1) * node_len, SEEK_SET);
if (!sfwrite(head, node_len, journal->fp)) {
return -1;
}
debug_journal("journal: evicted node=%u, growing by +%u\n",
journal->qhead, head->len);
/* Write back query state. */
journal->qhead = (journal->qhead + 1) % journal->max_nodes;
uint16_t qstate[2] = {journal->qhead, journal->qtail};
fseek(journal->fp, JOURNAL_HSIZE - 2 * sizeof(uint16_t), SEEK_SET);
if (!sfwrite(qstate, 2 * sizeof(uint16_t), journal->fp)) {
return -1;
}
/* Increase free segment. */
journal->free.len += head->len;
}
/* Calculate node position in permanent storage. */
long jn_fpos = JOURNAL_HSIZE + (journal->qtail + 1) * node_len;
/* Invalidate node and write back. */
n->id = id;
n->pos = journal->free.pos;
n->len = size;
n->flags = JOURNAL_NULL;
fseek(journal->fp, jn_fpos, SEEK_SET);
if (fwrite(n, sizeof(journal_node_t), 1, journal->fp) != sizeof(journal_node_t)) {
n->flags = JOURNAL_FREE;
if (!sfwrite(n, node_len, journal->fp)) {
debug_journal("journal: failed to writeback node=%d to %u\n",
n->id, jn_fpos);
return -1;
}
/* Write data to permanent storage. */
fseek(journal->fp, n->pos, SEEK_SET);
if (fwrite(src, size, 1, journal->fp) != size) {
if (!sfwrite(src, size, journal->fp)) {
return -1;
}
/* Mark node as valid and write back. */
n->flags = JOURNAL_VALID;
fseek(journal->fp, jn_fpos, SEEK_SET);
if (fwrite(n, sizeof(journal_node_t), 1, journal->fp) != sizeof(journal_node_t)) {
if (!sfwrite(n, node_len, journal->fp)) {
return -1;
}
debug_journal("journal: written node=%u, data=<%u, %u>\n",
journal->qtail, n->pos, n->pos + n->len);
/* Trim free space on the last node. */
if (journal->qtail > jnext) {
debug_journal("journal: trimming free space, next=%u\n",
jnext);
journal_node_t *next = journal->nodes + jnext;
journal->free.pos = next->pos;
journal->free.len = 0;
} else {
/* Mark used space. */
journal->free.pos += size;
journal->free.len -= size;
debug_journal("journal: free segment <%u, %u>\n",
journal->free.pos, journal->free.pos + journal->free.len);
}
/* Node write successful. */
++journal->n_next;
journal->free.pos += size;
journal->free.len -= size;
journal->qtail = jnext;
/* Write back free segment state. */
fseek(journal->fp, JOURNAL_HSIZE, SEEK_SET);
if (fwrite(&journal->free, sizeof(journal_node_t), 1, journal->fp) != sizeof(journal_node_t)) {
if (!sfwrite(&journal->free, node_len, journal->fp)) {
/*! \todo Node is marked valid and failed to shrink free space,
node will be overwritten on the next open - this may be
a problem, how to solve it properly? */
return -1;
}
/* Write back query state, not essential as it may be recovered.
* qhead - lowest valid node identifier (least recent)
* qtail - highest valid node identifier (most recently used)
*/
uint16_t qstate[2] = {journal->qhead, journal->qtail};
fseek(journal->fp, JOURNAL_HSIZE - 2 * sizeof(uint16_t), SEEK_SET);
if (!sfwrite(qstate, 2 * sizeof(uint16_t), journal->fp)) {
return -1;
}
/*! \todo Delayed write-back? */
return 0;
debug_journal("journal: write finished, nqueue=<%u, %u>\n",
journal->qhead, journal->qtail);
return size;
}
int journal_close(journal_t *journal)
{
/*! \todo Store node queue state. */
/* Close file. */
fclose(journal->fp);
debug_journal("journal: closed journal %p\n", journal);
/* Free allocated resources. */
free(journal);
return 0;
......
......@@ -15,14 +15,17 @@
#include <stdint.h>
/* File structure
* uint16_t max_entries
* <max_entries + 1> * journal_entry_t
* uint16_t max_nodes
* uint16_t qhead
* uint16_t qtail
* <max_nodes + 1> * journal_entry_t
* <data>
*/
typedef enum journal_flag_t {
JOURNAL_NULL = 0 << 0, /*!< Invalid journal entry. */
JOURNAL_VALID = 1 << 0 /*!< Valid journal entry. */
JOURNAL_FREE = 1 << 0, /*!< Free journal entry. */
JOURNAL_VALID = 1 << 1 /*!< Valid journal entry. */
} journal_flag_t;
/*! 12 bytes. */
......@@ -40,16 +43,16 @@ typedef struct journal_t
{
FILE *fp;
uint16_t max_nodes; /*!< Number of nodes. */
uint16_t n_next; /*! \todo Temporary index to next free node. */
uint16_t qhead; /*!< Node queue head. */
uint16_t qtail; /*!< Node queue tail. */
journal_node_t free; /*!< Free segment. */
journal_node_t nodes[]; /*!< Array of nodes. */
/* Implement nodes as queue? */
} journal_t;
#define JOURNAL_NCOUNT 512 /*!< Default node count. */
/*! max_entries */
#define JOURNAL_HSIZE (sizeof(uint16_t))
/*! max_entries, qhead, qtail */
#define JOURNAL_HSIZE (sizeof(uint16_t)*3)
/*! \todo Document functions. */
......
#include <string.h>
#include "tests/knot/journal_tests.h"
#include "knot/server/journal.h"
static int journal_tests_count(int argc, char *argv[]);
static int journal_tests_run(int argc, char *argv[]);
/*
* Unit API.
*/
unit_api journal_tests_api = {
"Journal",
&journal_tests_count,
&journal_tests_run
};
/*
* Unit implementation.
*/
static const int JOURNAL_TEST_COUNT = 8;
/*! \brief Generate random string with given length. */
static int randstr(char* dst, size_t len)
{
for (int i = 0; i < len - 1; ++i) {
dst[i] = '0' + (int) (('Z'-'0') * (rand() / (RAND_MAX + 1.0)));
}
dst[len - 1] = '\0';
return 0;
}
/*! API: return number of tests. */
static int journal_tests_count(int argc, char *argv[])
{
return JOURNAL_TEST_COUNT;
}
/*! API: run tests. */
static int journal_tests_run(int argc, char *argv[])
{
/* Test 1: Create journal. */
const int jsize = 512;
char jfn_buf[] = "/tmp/journal.XXXXXX";
mkstemp(jfn_buf);
const char *jfilename = jfn_buf;
int ret = journal_create(jfilename, jsize);
ok(ret == 0, "journal: create journal '%s'", jfilename);
/* Test 2: Open journal. */
journal_t *j = journal_open(jfilename);
ok(j != 0, "journal: open");
/* Test 3: Write entry to log. */
const char *sample = "deadbeef";
ret = journal_write(j, 0x0a, sample, strlen(sample));
ok(ret > 0, "journal: write");
/* Test 4: Read entry from log. */
char tmpbuf[64] = {'\0'};
ret = journal_read(j, 0x0a, tmpbuf);
ok(ret > 0, "journal: read entry");
/* Test 5: Compare read data. */
ret = strncmp(sample, tmpbuf, strlen(sample));
ok(ret == 0, "journal: read data integrity check");
/* Test 6: Write random data. */
int chk_key = 0;
char chk_buf[64] = {'\0'};
ret = 0;
for (int i = 0; i < jsize * 2; ++i) {
int key = rand() % 65535;
randstr(tmpbuf, sizeof(tmpbuf));
if (journal_write(j, key, tmpbuf, sizeof(tmpbuf)) <= 0) {
ret = -1;
break;
}
/* Store some key on the end. */
if (i == (jsize * 2) - jsize/5) {
chk_key = key;
memcpy(chk_buf, tmpbuf, sizeof(chk_buf));
}
}
ok(ret == 0, "journal: sustained looped writes");
/* Test 7: Check data integrity. */
memset(tmpbuf, 0, sizeof(tmpbuf));
journal_read(j, chk_key, tmpbuf);
ret = strncmp(chk_buf, tmpbuf, sizeof(chk_buf));
ok(ret == 0, "journal: read data integrity check");
/* Test 8: Reopen log and re-read value. */
memset(tmpbuf, 0, sizeof(tmpbuf));
journal_close(j);
j = journal_open(jfilename);
journal_read(j, chk_key, tmpbuf);
ret = strncmp(chk_buf, tmpbuf, sizeof(chk_buf));
ok(ret == 0, "journal: read data integrity check after close/open");
/* Close journal. */
journal_close(j);
/* Delete journal. */
unlink(jfilename);
return 0;
}
#ifndef _KNOT_JOURNAL_TESTS_H_
#define _KNOT_JOURNAL_TESTS_H_
#include "common/libtap/tap_unit.h"
/* Unit API. */
unit_api journal_tests_api;
#endif /* _KNOT_JOURNAL_TESTS_H_ */
......@@ -9,6 +9,7 @@
#include "tests/common/da_tests.h"
#include "tests/common/acl_tests.h"
#include "tests/knot/dthreads_tests.h"
#include "tests/knot/journal_tests.h"
#include "tests/knot/server_tests.h"
#include "tests/knot/conf_tests.h"
......@@ -21,6 +22,7 @@ int main(int argc, char *argv[])
// Build test set
unit_api *tests[] = {
/* Core data structures. */
&journal_tests_api, //! Journal unit
&slab_tests_api, //! SLAB allocator unit
&skiplist_tests_api, //! Skip list unit
&dthreads_tests_api, //! DThreads testing unit
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment