Skip to content
Snippets Groups Projects

Journal Synchronized

Merged Ghost User requested to merge journal_synchronized into master
+ 355
344
Compare changes
  • Side-by-side
  • Inline
Files
+ 273
263
@@ -23,6 +23,7 @@
#include <inttypes.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <assert.h>
#include "common/crc.h"
#include "libknot/common.h"
@@ -133,6 +134,237 @@ static int journal_recover(journal_t *j)
return KNOT_EOK;
}
/*!
 * \brief Open the journal file for r/w, lock it and load its metadata.
 *
 * If the file does not exist, or its magic bytes or CRC do not match, a new
 * journal is created with journal_create() and the open is retried.
 *
 * On success j->fd is the open descriptor and j->fsize, j->max_nodes,
 * j->qhead, j->qtail, j->free and j->nodes are loaded from the file.
 * On a read/consistency failure the file is unlocked and closed, j->nodes is
 * freed and j->fd is set to -1.
 *
 * \param j Journal (must not be NULL); j->path names the file to open.
 *
 * \retval KNOT_EOK on success.
 * \retval KNOT_ERROR if the file cannot be read or its metadata is invalid.
 * \return Mapped errno if open() fails for a reason other than ENOENT, or the
 *         journal_create() result if (re)creating the journal fails.
 */
static int journal_open_file(journal_t *j)
{
	assert(j != NULL);

	int ret = KNOT_EOK;

	/* Log before open() so that nothing runs between the failing call
	 * and the errno check below (logging may overwrite errno). */
	dbg_journal_verb("journal: open_file '%s'\n", j->path);
	j->fd = open(j->path, O_RDWR);
	if (j->fd < 0) {
		if (errno != ENOENT) {
			return knot_map_errno(errno);
		}

		/* Create new journal file and open if not exists. */
		ret = journal_create(j->path, JOURNAL_NCOUNT);
		if (ret == KNOT_EOK) {
			return journal_open_file(j);
		}
		return ret;
	}

	/* File lock (whole file, exclusive). */
	memset(&j->fl, 0, sizeof(struct flock));
	j->fl.l_type = F_WRLCK;
	j->fl.l_whence = SEEK_SET;
	j->fl.l_start = 0;
	j->fl.l_len = 0;
	j->fl.l_pid = getpid();

	/* Attempt to lock without blocking first. */
	dbg_journal_verb("journal: locking journal %s\n", j->path);
	ret = fcntl(j->fd, F_SETLK, &j->fl);

	/* Lock is held by somebody else, report the holder and wait. */
	if (ret < 0) {
		struct flock efl;
		memcpy(&efl, &j->fl, sizeof(struct flock));
		fcntl(j->fd, F_GETLK, &efl);
		log_server_warning("Journal file '%s' is locked by process "
		                   "PID=%d, waiting for process to "
		                   "release lock.\n",
		                   j->path, efl.l_pid);
		ret = fcntl(j->fd, F_SETLKW, &j->fl);
	}
	/* NOTE(review): a failed F_SETLKW is only logged here and the open
	 * continues without the lock - confirm this is intentional. */
	dbg_journal("journal: locked journal %s (returned %d)\n", j->path, ret);

	/* Read magic bytes. */
	dbg_journal("journal: reading magic bytes\n");
	const char magic_req[MAGIC_LENGTH] = JOURNAL_MAGIC;
	char magic[MAGIC_LENGTH];
	if (!sfread(magic, MAGIC_LENGTH, j->fd)) {
		dbg_journal_verb("journal: cannot read magic bytes\n");
		goto open_file_error;
	}
	if (memcmp(magic, magic_req, MAGIC_LENGTH) != 0) {
		log_server_warning("Journal file '%s' version is too old, "
		                   "it will be purged.\n", j->path);
		j->fl.l_type = F_UNLCK;
		fcntl(j->fd, F_SETLK, &j->fl);
		assert(j->fd > -1);
		close(j->fd);
		j->fd = -1;
		ret = journal_create(j->path, JOURNAL_NCOUNT);
		if (ret == KNOT_EOK) {
			return journal_open_file(j);
		}
		return ret;
	}

	/* Read stored CRC. */
	crc_t crc = 0;
	if (!sfread(&crc, sizeof(crc_t), j->fd)) {
		dbg_journal_verb("journal: cannot read CRC\n");
		goto open_file_error;
	}

	/* Recalculate CRC over the rest of the file. */
	char buf[4096];
	ssize_t rb = 0;
	crc_t crc_calc = crc_init();
	while ((rb = read(j->fd, buf, sizeof(buf))) > 0) {
		crc_calc = crc_update(crc_calc, (const unsigned char *)buf, rb);
	}
	if (rb < 0) {
		/* An I/O error is not a CRC mismatch: fail the open instead
		 * of purging a journal that may be perfectly valid. */
		dbg_journal_verb("journal: cannot read journal data\n");
		goto open_file_error;
	}

	/* Compare */
	if (crc == crc_calc) {
		/* Rewind to the first byte after magic + CRC. */
		if (lseek(j->fd, MAGIC_LENGTH + sizeof(crc_t), SEEK_SET) < 0) {
			goto open_file_error;
		}
	} else {
		log_server_warning("Journal file '%s' CRC error, "
		                   "it will be purged.\n", j->path);
		j->fl.l_type = F_UNLCK;
		fcntl(j->fd, F_SETLK, &j->fl);
		close(j->fd);
		j->fd = -1;
		ret = journal_create(j->path, JOURNAL_NCOUNT);
		if (ret == KNOT_EOK) {
			return journal_open_file(j);
		}
		return ret;
	}

	/* Get journal file size from the open descriptor, so the size always
	 * belongs to the file that was just verified. */
	struct stat st;
	if (fstat(j->fd, &st) < 0) {
		dbg_journal_verb("journal: cannot get journal fsize\n");
		goto open_file_error;
	}

	/* Set file size. */
	j->fsize = st.st_size;

	/* Read maximum number of entries. */
	if (!sfread(&j->max_nodes, sizeof(uint16_t), j->fd)) {
		dbg_journal_verb("journal: cannot read max_nodes\n");
		goto open_file_error;
	}

	/* Check max_nodes, a journal with no nodes is invalid. */
	if (j->max_nodes == 0) {
		dbg_journal_verb("journal: max_nodes can't be zero\n");
		goto open_file_error;
	}

	/* Allocate zeroed node table. */
	const size_t node_len = sizeof(journal_node_t);
	j->nodes = calloc(j->max_nodes, node_len);
	if (j->nodes == NULL) {
		dbg_journal_verb("journal: can't allocate nodes\n");
		goto open_file_error;
	}

	/* Load node queue state. */
	j->qhead = j->qtail = 0;
	if (!sfread(&j->qhead, sizeof(uint16_t), j->fd)) {
		dbg_journal_verb("journal: cannot read qhead\n");
		goto open_file_error;
	}

	/* Load queue tail. */
	if (!sfread(&j->qtail, sizeof(uint16_t), j->fd)) {
		dbg_journal_verb("journal: cannot read qtail\n");
		goto open_file_error;
	}

	/* Check head + tail: the node table has max_nodes entries, so both
	 * must be strictly below max_nodes (qtail is passed to jnode_flags()
	 * below, presumably as an index into j->nodes). */
	if (j->qtail >= j->max_nodes || j->qhead >= j->max_nodes) {
		dbg_journal_verb("journal: queue pointers corrupted\n");
		goto open_file_error;
	}

	/* Load empty segment descriptor. */
	if (!sfread(&j->free, node_len, j->fd)) {
		dbg_journal_verb("journal: cannot read free segment ptr\n");
		goto open_file_error;
	}

	/* Read journal descriptors table. */
	if (!sfread(j->nodes, j->max_nodes * node_len, j->fd)) {
		dbg_journal_verb("journal: cannot read node table\n");
		goto open_file_error;
	}

	dbg_journal("journal: opened journal size=%u, queue=<%u, %u>, fd=%d\n",
	            j->max_nodes, j->qhead, j->qtail, j->fd);

	/* Check node queue. */
	unsigned qtail_free = (jnode_flags(j, j->qtail) <= JOURNAL_FREE);
	unsigned qhead_free = j->max_nodes - 1; /* Left of qhead must be free.*/
	if (j->qhead > 0) {
		qhead_free = (j->qhead - 1);
	}
	qhead_free = (jnode_flags(j, qhead_free) <= JOURNAL_FREE);
	if ((j->qhead != j->qtail) && (!qtail_free || !qhead_free)) {
		log_server_warning("Recovering journal '%s' metadata "
		                   "after crash.\n",
		                   j->path);
		ret = journal_recover(j);
		if (ret != KNOT_EOK) {
			log_server_error("Journal file '%s' is unrecoverable, "
			                 "metadata corrupted - %s\n",
			                 j->path, knot_strerror(ret));
			goto open_file_error;
		}
	}

	/* File stays open and locked (lock is kept in j->fl). */
	return KNOT_EOK;

	/* Unlock and close file and return error. */
open_file_error:
	free(j->nodes);
	j->nodes = NULL;
	j->fl.l_type = F_UNLCK;
	fcntl(j->fd, F_SETLK, &j->fl);
	close(j->fd);
	j->fd = -1;
	return KNOT_ERROR;
}
/*!
 * \brief Close the journal file.
 *
 * Refreshes the CRC, releases the file lock, closes the descriptor and frees
 * the node table, in that order.
 *
 * \param journal Journal whose file should be closed.
 *
 * \retval KNOT_EINVAL if journal is NULL.
 * \return Result of journal_update_crc() otherwise.
 */
static int journal_close_file(journal_t *journal)
{
	if (!journal) {
		return KNOT_EINVAL;
	}

	/* The checksum is refreshed while the descriptor is still open. */
	const int crc_result = journal_update_crc(journal->fd);

	/* Drop the file lock. */
	journal->fl.l_type = F_UNLCK;
	fcntl(journal->fd, F_SETLK, &journal->fl);
	dbg_journal("journal: unlocked journal %p\n", journal);

	/* Close the descriptor and mark the journal as not opened. */
	close(journal->fd);
	journal->fd = -1;

	/* Release the node table. */
	free(journal->nodes);
	journal->nodes = NULL;
	dbg_journal("journal: closed journal %p\n", journal);

	return crc_result;
}
int journal_write_in(journal_t *j, journal_node_t **rn, uint64_t id, size_t len)
{
const size_t node_len = sizeof(journal_node_t);
@@ -435,248 +667,41 @@ int journal_create(const char *fn, uint16_t max_nodes)
return KNOT_EOK;
}
journal_t* journal_open(const char *fn, size_t fslimit, int mode, uint16_t bflags)
journal_t* journal_open(const char *fn, size_t fslimit, uint16_t bflags)
{
/*! \todo Memory mapping may be faster than stdio? (issue #964) */
if (fn == NULL) {
return NULL;
}
/* Check for lazy mode. */
if (mode & JOURNAL_LAZY) {
dbg_journal("journal: opening journal %s lazily\n", fn);
journal_t *j = malloc(sizeof(journal_t));
if (j != NULL) {
memset(j, 0, sizeof(journal_t));
j->fd = -1;
j->path = strdup(fn);
j->fslimit = fslimit;
j->bflags = bflags;
j->refs = 1;
}
return j;
}
/* Open journal file for r/w (returns error if not exists). */
int fd = open(fn, O_RDWR);
if (fd < 0) {
if (errno == ENOENT) {
if(journal_create(fn, JOURNAL_NCOUNT) == KNOT_EOK) {
return journal_open(fn, fslimit, mode, bflags);
}
}
return NULL;
}
/* File lock. */
struct flock fl;
memset(&fl, 0, sizeof(struct flock));
fl.l_type = F_WRLCK;
fl.l_whence = SEEK_SET;
fl.l_start = 0;
fl.l_len = 0;
fl.l_pid = getpid();
/* Attempt to lock. */
dbg_journal_verb("journal: locking journal %s\n", fn);
int ret = fcntl(fd, F_SETLK, &fl);
/* Lock. */
if (ret < 0) {
struct flock efl;
memcpy(&efl, &fl, sizeof(struct flock));
fcntl(fd, F_GETLK, &efl);
log_server_warning("Journal file '%s' is locked by process "
"PID=%d, waiting for process to "
"release lock.\n",
fn, efl.l_pid);
ret = fcntl(fd, F_SETLKW, &fl);
}
fl.l_type = F_UNLCK;
dbg_journal("journal: locked journal %s (returned %d)\n", fn, ret);
/* Read magic bytes. */
dbg_journal("journal: reading magic bytes\n");
const char magic_req[MAGIC_LENGTH] = JOURNAL_MAGIC;
char magic[MAGIC_LENGTH];
if (!sfread(magic, MAGIC_LENGTH, fd)) {
dbg_journal_detail("journal: cannot read magic bytes\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
return NULL;
}
if (memcmp(magic, magic_req, MAGIC_LENGTH) != 0) {
log_server_warning("Journal file '%s' version is too old, "
"it will be flushed.\n", fn);
fcntl(fd, F_SETLK, &fl);
close(fd);
if (journal_create(fn, JOURNAL_NCOUNT) == KNOT_EOK) {
return journal_open(fn, fslimit, mode, bflags);
}
return NULL;
}
crc_t crc = 0;
if (!sfread(&crc, sizeof(crc_t), fd)) {
dbg_journal_detail("journal: cannot read CRC\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
return NULL;
}
/* Recalculate CRC. */
char buf[4096];
ssize_t rb = 0;
crc_t crc_calc = crc_init();
while((rb = read(fd, buf, sizeof(buf))) > 0) {
crc_calc = crc_update(crc_calc, (const unsigned char *)buf, rb);
}
/* Compare */
if (crc == crc_calc) {
/* Rewind. */
if (lseek(fd, MAGIC_LENGTH + sizeof(crc_t), SEEK_SET) < 0) {
fcntl(fd, F_SETLK, &fl);
close(fd);
return NULL;
}
} else {
log_server_warning("Journal file '%s' CRC error, "
"it will be flushed.\n", fn);
fcntl(fd, F_SETLK, &fl);
close(fd);
if (journal_create(fn, JOURNAL_NCOUNT) == KNOT_EOK) {
return journal_open(fn, fslimit, mode, bflags);
}
return NULL;
}
/* Read maximum number of entries. */
uint16_t max_nodes = 512;
if (!sfread(&max_nodes, sizeof(uint16_t), fd)) {
dbg_journal_detail("journal: cannot read max_nodes\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
return NULL;
}
/* Check max_nodes, but this is riddiculous. */
if (max_nodes == 0) {
dbg_journal_detail("journal: max_nodes is invalid\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
return NULL;
}
/* Allocate journal structure. */
const size_t node_len = sizeof(journal_node_t);
journal_t *j = malloc(sizeof(journal_t) + max_nodes * node_len);
journal_t *j = malloc(sizeof(journal_t));
if (j == NULL) {
dbg_journal_detail("journal: cannot allocate journal\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
return NULL;
}
memset(j, 0, sizeof(journal_t) + max_nodes * node_len);
j->qhead = j->qtail = 0;
j->fd = fd;
j->max_nodes = max_nodes;
j->bflags = bflags;
j->refs = 1;
/* Load node queue state. */
if (!sfread(&j->qhead, sizeof(uint16_t), fd)) {
dbg_journal_detail("journal: cannot read qhead\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
free(j);
return NULL;
}
/* Load queue tail. */
if (!sfread(&j->qtail, sizeof(uint16_t), fd)) {
dbg_journal_detail("journal: cannot read qtail\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
free(j);
return NULL;
}
/* Check head + tail */
if (j->qtail > max_nodes || j->qhead > max_nodes) {
dbg_journal_detail("journal: queue pointers corrupted\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
free(j);
return NULL;
}
/* Load empty segment descriptor. */
if (!sfread(&j->free, node_len, fd)) {
dbg_journal_detail("journal: cannot read free segment ptr\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
free(j);
return NULL;
}
/* Read journal descriptors table. */
if (!sfread(&j->nodes, max_nodes * node_len, fd)) {
dbg_journal_detail("journal: cannot read node table\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
free(j);
return NULL;
}
/* Get journal file size. */
struct stat st;
if (stat(fn, &st) < 0) {
dbg_journal_detail("journal: cannot get journal fsize\n");
fcntl(fd, F_SETLK, &fl);
close(fd);
free(j);
return NULL;
}
memset(j, 0, sizeof(journal_t));
j->bflags = bflags;
j->fd = -1;
/* Set file size. */
j->fsize = st.st_size;
if (fslimit == 0) {
j->fslimit = FSLIMIT_INF;
} else {
j->fslimit = fslimit;
}
dbg_journal("journal: opened journal size=%u, queue=<%u, %u>, fd=%d\n",
max_nodes, j->qhead, j->qtail, j->fd);
/* Check node queue. */
unsigned qtail_free = (jnode_flags(j, j->qtail) <= JOURNAL_FREE);
unsigned qhead_free = j->max_nodes - 1; /* Left of qhead must be free.*/
if (j->qhead > 0) {
qhead_free = (j->qhead - 1);
/* Copy path. */
j->path = strdup(fn);
if (j->path == NULL) {
free(j);
return NULL;
}
qhead_free = (jnode_flags(j, qhead_free) <= JOURNAL_FREE);
if ((j->qhead != j->qtail) && (!qtail_free || !qhead_free)) {
log_server_warning("Recovering journal '%s' metadata "
"after crash.\n",
fn);
ret = journal_recover(j);
if (ret != KNOT_EOK) {
log_server_error("Journal file '%s' is unrecoverable, "
"metadata corrupted - %s\n",
fn, knot_strerror(ret));
fcntl(fd, F_SETLK, &fl);
close(fd);
free(j);
return NULL;
}
/* Initialize mutex. */
if (pthread_mutex_init(&j->mutex, NULL) != 0) {
free(j->path);
free(j);
return NULL;
}
/* Save file lock. */
fl.l_type = F_WRLCK;
memcpy(&j->fl, &fl, sizeof(struct flock));
return j;
}
@@ -986,57 +1011,42 @@ int journal_close(journal_t *journal)
return KNOT_EINVAL;
}
/* Check if lazy. */
int ret = KNOT_EOK;
if (journal->fd < 0) {
free(journal->path);
} else {
/* Recalculate CRC. */
ret = journal_update_crc(journal->fd);
/* Unlock journal file. */
journal->fl.l_type = F_UNLCK;
fcntl(journal->fd, F_SETLK, &journal->fl);
dbg_journal("journal: unlocked journal %p\n", journal);
/* Close file. */
close(journal->fd);
}
dbg_journal("journal: closed journal %p\n", journal);
/* Free allocated resources. */
pthread_mutex_destroy(&journal->mutex);
free(journal->path);
free(journal);
return ret;
return KNOT_EOK;
}
journal_t *journal_retain(journal_t *journal)
int journal_retain(journal_t *journal)
{
/* Return active journal if opened lazily. */
if (journal != NULL) {
if (journal->fd < 0) {
dbg_journal("journal: retain(), opening for rw\n");
journal = journal_open(journal->path, journal->fslimit,
0, journal->bflags);
} else {
++journal->refs;
dbg_journal("journal: retain(), ++refcount\n");
}
if (journal == NULL) {
return KNOT_EINVAL;
}
return journal;
dbg_journal("%s: lock(%p)\n", __func__, journal);
pthread_mutex_lock(&journal->mutex);
dbg_journal("%s: open(%p)\n", __func__, journal);
int ret = journal_open_file(journal);
if (ret != KNOT_EOK) {
dbg_journal("%s: open(%p) FAIL\n", __func__, journal);
pthread_mutex_unlock(&journal->mutex);
}
return ret;
}
void journal_release(journal_t *journal) {
if (journal != NULL) {
if (journal->refs == 1) {
dbg_journal("journal: release(), closing last\n");
journal_close(journal);
} else {
--journal->refs;
dbg_journal_verb("journal: release(), --refcount\n");
}
void journal_release(journal_t *journal)
{
if (journal == NULL) {
return;
}
dbg_journal("%s: close(%p)\n", __func__, journal);
journal_close_file(journal);
dbg_journal("%s: unlock(%p)\n", __func__, journal);
pthread_mutex_unlock(&journal->mutex);
}
Loading