Skip to content
Snippets Groups Projects
Commit 818a3049 authored by Marek Vavrusa's avatar Marek Vavrusa
Browse files

Initial implementation of journal. Probably not working yet.

* Fast in-memory lookup.
* ACID traits.
* Fragmentation trimming.

Commit refs #925, #927, #928, #935.
parent 9a2219ef
No related merge requests found
......@@ -150,6 +150,8 @@ src/knot/server/udp-handler.c
src/knot/server/udp-handler.h
src/knot/server/zones.c
src/knot/server/zones.h
src/knot/server/journal.c
src/knot/server/journal.h
src/knot/server/xfr-in.c
src/knot/server/xfr-in.h
src/knot/server/notify.c
......
......@@ -204,6 +204,7 @@ libknot_la_SOURCES = \
knot/ctl/process.c \
knot/ctl/process.h \
knot/server/dthreads.c \
knot/server/journal.c \
knot/server/socket.c \
knot/server/name-server.c \
knot/server/server.c \
......@@ -217,6 +218,7 @@ libknot_la_SOURCES = \
knot/server/tcp-handler.h \
knot/server/xfr-handler.h \
knot/server/dthreads.h \
knot/server/journal.h \
knot/server/zones.h \
knot/server/xfr-in.h \
knot/server/xfr-in.c \
......
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "journal.h"
static inline int sfread(void *dst, size_t len, FILE *fp)
{
return fread(dst, len, 1, fp) == len;
}
static inline int sfwrite(const void *src, size_t len, FILE *fp)
{
return fwrite(src, len, 1, fp) == len;
}
int journal_create(const char *fn, uint16_t max_nodes)
{
/* Create journal file. */
FILE *fp = fopen(fn, "w");
if (!fp) {
return -1;
}
/* Disable buffering. */
setvbuf(fp, (char *)0, _IONBF, 0);
/*! \todo Unlink created file on error? */
/* Create journal header. */
if (!sfwrite(&max_nodes, sizeof(uint16_t), fp)) {
fclose(fp);
return -1;
}
/* Create free segment descriptor. */
journal_node_t jn;
jn.id = 0;
jn.flags = JOURNAL_VALID;
jn.pos = 0;
jn.len = 0;
if (!sfwrite(&jn, sizeof(journal_node_t), fp)) {
fclose(fp);
return -1;
}
/* Create nodes. */
memset(&jn, 0, sizeof(journal_node_t));
for(uint16_t i = 0; i < max_nodes; ++i) {
if (!sfwrite(&jn, sizeof(journal_node_t), fp)) {
fclose(fp);
return -1;
}
}
/* Journal file created. */
return 0;
}
journal_t* journal_open(const char *fn)
{
/*! \todo Memory mapping may be faster than stdio? */
/*! \todo Lock file. */
/* Open journal file for r/w. */
FILE *fp = fopen(fn, "rw");
if (!fp) {
return 0;
}
/* Disable buffering. */
setvbuf(fp, (char *)0, _IONBF, 0);
/* Read maximum number of entries. */
uint16_t max_nodes = 512;
if (!sfread(&max_nodes, sizeof(uint16_t), fp)) {
fclose(fp);
return 0;
}
/* Allocate journal structure. */
const size_t node_len = sizeof(journal_node_t);
journal_t *j = malloc(sizeof(journal_t) + max_nodes * node_len);
if (!j) {
fclose(fp);
return 0;
}
/* Read journal header. */
j->n_next = 0;
j->fp = fp;
j->max_nodes = max_nodes;
if (!sfread(&j->free, node_len, fp)) {
fclose(fp);
free(j);
return 0;
}
/*! \todo Load node queue state. */
/* Read journal descriptors table. */
if (fread(&j->nodes, node_len, max_nodes, fp) != node_len * max_nodes) {
fclose(fp);
free(j);
return 0;
}
/*! \todo Some file checksum, check node integrity. */
return j;
}
int journal_fetch(journal_t *journal, int id, const journal_node_t** dst)
{
/*! \todo Organize journal descriptors in btree? */
/*! \todo Or store pointer to last fetch for sequential lookup? */
for(uint16_t i = 0; i < journal->max_nodes; ++i) {
if (journal->nodes[i].id == id) {
*dst = journal->nodes + i;
return 0;
}
}
return -1;
}
int journal_read(journal_t *journal, int id, char *dst)
{
const journal_node_t *n = 0;
if(journal_fetch(journal, id, &n) != 0) {
return -1;
}
/* Check valid flag. */
if (n->flags == JOURNAL_NULL) {
return -1;
}
/* Seek journal node. */
fseek(journal->fp, n->pos, SEEK_SET);
/* Read journal node content. */
return fread(dst, n->len, 1, journal->fp);
}
int journal_write(journal_t *journal, int id, const char *src, size_t size)
{
/*! \todo Free nodes if necessary. */
if (journal->free.len < size) {
return -1;
}
/*! \todo Find first unused node, need to treat nodes as queue. */
uint16_t jn = (journal->n_next) % journal->max_nodes;
long jn_fpos = JOURNAL_HSIZE + (jn + 1) * sizeof(journal_node_t);
journal_node_t *n = journal->nodes + jn;
/* Invalidate node and write back. */
n->pos = journal->free.pos;
n->len = size;
n->flags = JOURNAL_NULL;
fseek(journal->fp, jn_fpos, SEEK_SET);
if (fwrite(n, sizeof(journal_node_t), 1, journal->fp) != sizeof(journal_node_t)) {
return -1;
}
/* Write data to permanent storage. */
fseek(journal->fp, n->pos, SEEK_SET);
if (fwrite(src, size, 1, journal->fp) != size) {
return -1;
}
/* Mark node as valid and write back. */
n->flags = JOURNAL_VALID;
fseek(journal->fp, jn_fpos, SEEK_SET);
if (fwrite(n, sizeof(journal_node_t), 1, journal->fp) != sizeof(journal_node_t)) {
return -1;
}
/* Node write successful. */
++journal->n_next;
journal->free.pos += size;
journal->free.len -= size;
/* Write back free segment state. */
fseek(journal->fp, JOURNAL_HSIZE, SEEK_SET);
if (fwrite(&journal->free, sizeof(journal_node_t), 1, journal->fp) != sizeof(journal_node_t)) {
/*! \todo Node is marked valid and failed to shrink free space,
node will be overwritten on the next open - this may be
a problem, how to solve it properly? */
return -1;
}
/*! \todo Delayed write-back? */
return 0;
}
int journal_close(journal_t *journal)
{
/*! \todo Store node queue state. */
/* Close file. */
fclose(journal->fp);
/* Free allocated resources. */
free(journal);
return 0;
}
/*!
* \file journal.h
*
* \author Marek Vavrusa <marek.vavrusa@nic.cz>
*
* \brief Journal for storing transactions on permanent storage.
*
* \addtogroup utils
* @{
*/
#ifndef _KNOT_JOURNAL_H_
#define _KNOT_JOURNAL_H_
#include <stdint.h>
/* File structure
* uint16_t max_entries
* <max_entries + 1> * journal_entry_t
* <data>
*/
typedef enum journal_flag_t {
JOURNAL_NULL = 0 << 0, /*!< Invalid journal entry. */
JOURNAL_VALID = 1 << 0 /*!< Valid journal entry. */
} journal_flag_t;
/*! 12 bytes. */
typedef struct journal_node_t
{
uint16_t id; /*!< Node ID. */
uint16_t flags; /*!< Node flags. */
uint32_t pos; /*!< Position in journal file. */
uint32_t len; /*!< Entry data length. */
} journal_node_t;
/*! \todo Review nodes data storage type. */
typedef struct journal_t
{
FILE *fp;
uint16_t max_nodes; /*!< Number of nodes. */
uint16_t n_next; /*! \todo Temporary index to next free node. */
journal_node_t free; /*!< Free segment. */
journal_node_t nodes[]; /*!< Array of nodes. */
/* Implement nodes as queue? */
} journal_t;
#define JOURNAL_NCOUNT 512 /*!< Default node count. */
/*! max_entries */
#define JOURNAL_HSIZE (sizeof(uint16_t))
/*! \todo Document functions. */
int journal_create(const char *fn, uint16_t max_nodes);
journal_t* journal_open(const char *fn);
int journal_fetch(journal_t *journal, int id, const journal_node_t** dst);
int journal_read(journal_t *journal, int id, char *dst);
int journal_write(journal_t *journal, int id, const char *src, size_t size);
int journal_close(journal_t *journal);
#endif /* _KNOT_JOURNAL_H_ */
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment