From 7d34bfea09828a62ff368cabb82ec217ed7ef263 Mon Sep 17 00:00:00 2001 From: Jan Kadlec <jan.kadlec@nic.cz> Date: Fri, 12 Dec 2014 18:09:10 +0100 Subject: [PATCH] backport: Merge branch 'journal_patches' Journal patches See merge request !321 --- src/knot/nameserver/ixfr.c | 7 +- src/knot/server/journal.c | 361 +++++++------------------- src/knot/server/journal.h | 2 +- src/knot/zone/zone-load.c | 4 + src/knot/zone/zone.c | 21 +- src/knot/zone/zone.h | 3 + src/knot/zone/zonedb.c | 4 + tests-extra/tests/ixfr/stress/test.py | 77 ++++++ tests-extra/tools/dnstest/server.py | 3 + tests/journal.c | 267 +++++++++++++++++-- 10 files changed, 447 insertions(+), 302 deletions(-) create mode 100644 tests-extra/tests/ixfr/stress/test.py diff --git a/src/knot/nameserver/ixfr.c b/src/knot/nameserver/ixfr.c index a052d3cd2..c9b08d8f2 100644 --- a/src/knot/nameserver/ixfr.c +++ b/src/knot/nameserver/ixfr.c @@ -162,7 +162,7 @@ static int ixfr_process_changeset(knot_pkt_t *pkt, const void *item, #undef IXFR_SAFE_PUT /*! \brief Loads IXFRs from journal. */ -static int ixfr_load_chsets(list_t *chgsets, const zone_t *zone, +static int ixfr_load_chsets(list_t *chgsets, zone_t *zone, const knot_rrset_t *their_soa) { assert(chgsets); @@ -176,7 +176,10 @@ static int ixfr_load_chsets(list_t *chgsets, const zone_t *zone, return KNOT_EUPTODATE; } + pthread_mutex_lock(&zone->journal_lock); ret = journal_load_changesets(zone, chgsets, serial_from, serial_to); + pthread_mutex_unlock(&zone->journal_lock); + if (ret != KNOT_EOK) { changesets_free(chgsets); } @@ -241,7 +244,7 @@ static int ixfr_answer_init(struct query_data *qdata) const knot_rrset_t *their_soa = &knot_pkt_section(qdata->query, KNOT_AUTHORITY)->rr[0]; list_t chgsets; init_list(&chgsets); - int ret = ixfr_load_chsets(&chgsets, qdata->zone, their_soa); + int ret = ixfr_load_chsets(&chgsets, (zone_t *)qdata->zone, their_soa); if (ret != KNOT_EOK) { dbg_ns("%s: failed to load changesets => %d\n", __func__, ret); return ret; diff --git a/src/knot/server/journal.c b/src/knot/server/journal.c index 455e792c7..0e102134e 100644 --- a/src/knot/server/journal.c +++ b/src/knot/server/journal.c @@ -34,15 +34,18 @@ /*! \brief Infinite file size limit. */ #define FSLIMIT_INF (~((size_t)0)) -/*! \brief Node classification macros. */ -#define jnode_flags(j, i) ((j)->nodes[(i)].flags) - /*! \brief Next node. */ #define jnode_next(j, i) (((i) + 1) % (j)->max_nodes) /*! \brief Previous node. */ #define jnode_prev(j, i) (((i) == 0) ? (j)->max_nodes - 1 : (i) - 1) +/*! \bref Starting node data position. */ +#define jnode_base_pos(max_nodes) (JOURNAL_HSIZE + (max_nodes + 1) * sizeof(journal_node_t)) + +typedef uint32_t crc_t; +static const crc_t CRC_PLACEHOLDER = 0; + static inline int sfread(void *dst, size_t len, int fd) { return read(fd, dst, len) == len; @@ -53,10 +56,6 @@ static inline int sfwrite(const void *src, size_t len, int fd) return write(fd, src, len) == len; } -static inline journal_node_t *journal_end(journal_t *journal) { - return journal->nodes + journal->qtail; -} - /*! \brief Equality compare function. */ static inline int journal_cmp_eq(uint64_t k1, uint64_t k2) { @@ -96,108 +95,6 @@ static inline uint64_t ixfrdb_key_make(uint32_t from, uint32_t to) return (((uint64_t)to) << ((uint64_t)32)) | ((uint64_t)from); } -/*! \brief Recover metadata from journal. */ -static int journal_recover(journal_t *j) -{ - if (j == NULL) { - return KNOT_EINVAL; - } - - /* Attempt to recover queue. */ - int qstate[2] = { -1, -1 }; - unsigned c = 0, p = j->max_nodes - 1; - while (1) { - - /* Fetch previous and current node. */ - journal_node_t *np = j->nodes + p; - journal_node_t *nc = j->nodes + c; - - /* Check flags - * p c (0 = free, 1 = non-free) - * 0 0 - in free segment - * 0 1 - c-node is qhead - * 1 0 - c-node is qtail - * 1 1 - in full segment - */ - unsigned c_set = (nc->flags > JOURNAL_FREE); - unsigned p_set = (np->flags > JOURNAL_FREE); - if (!p_set && c_set && qstate[0] < 0) { - qstate[0] = c; /* Recovered qhead. */ - dbg_journal_verb("journal: recovered qhead=%u\n", - qstate[0]); - } - if (p_set && !c_set && qstate[1] < 0) {\ - qstate[1] = c; /* Recovered qtail. */ - dbg_journal_verb("journal: recovered qtail=%u\n", - qstate[1]); - } - - /* Both qstates set. */ - if (qstate[0] > -1 && qstate[1] > -1) { - break; - } - - /* Set prev and next. */ - p = c; - c = (c + 1) % j->max_nodes; - - /* All nodes probed. */ - if (c == 0) { - dbg_journal("journal: failed to recover node queue\n"); - break; - } - } - - /* Evaluate */ - if (qstate[0] < 0 || qstate[1] < 0) { - return KNOT_ERANGE; - } - - /* Write back. */ - int seek_ret = lseek(j->fd, JOURNAL_HSIZE - 2 * sizeof(uint16_t), SEEK_SET); - if (seek_ret < 0 || !sfwrite(qstate, 2 * sizeof(uint16_t), j->fd)) { - dbg_journal("journal: failed to write back queue state\n"); - return KNOT_ERROR; - } - - /* Reset queue state. */ - j->qhead = qstate[0]; - j->qtail = qstate[1]; - dbg_journal("journal: node queue=<%u,%u> recovered\n", - qstate[0], qstate[1]); - - - return KNOT_EOK; -} - -/* Recalculate CRC. */ -static int journal_update_crc(int fd) -{ - if (fcntl(fd, F_GETFL) < 0) { - return KNOT_EINVAL; - } - - char buf[4096]; - ssize_t rb = 0; - crc_t crc = crc_init(); - if (lseek(fd, MAGIC_LENGTH + sizeof(crc_t), SEEK_SET) < 0) { - return KNOT_ERROR; - } - while((rb = read(fd, buf, sizeof(buf))) > 0) { - crc = crc_update(crc, (const unsigned char *)buf, rb); - } - if (lseek(fd, MAGIC_LENGTH, SEEK_SET) < 0) { - return KNOT_ERROR; - } - if (!sfwrite(&crc, sizeof(crc_t), fd)) { - dbg_journal("journal: couldn't write CRC to fd=%d\n", fd); - return KNOT_ERROR; - } - - return KNOT_EOK; -} - - /*! \brief Create new journal. */ static int journal_create_file(const char *fn, uint16_t max_nodes) { @@ -231,8 +128,8 @@ static int journal_create_file(const char *fn, uint16_t max_nodes) remove(fn); return KNOT_ERROR; } - crc_t crc = crc_init(); - if (!sfwrite(&crc, sizeof(crc_t), fd)) { + + if (!sfwrite(&CRC_PLACEHOLDER, sizeof(CRC_PLACEHOLDER), fd)) { close(fd); remove(fn); return KNOT_ERROR; @@ -268,7 +165,7 @@ static int journal_create_file(const char *fn, uint16_t max_nodes) memset(&jn, 0, sizeof(journal_node_t)); jn.id = 0; jn.flags = JOURNAL_VALID; - jn.pos = JOURNAL_HSIZE + (max_nodes + 1) * sizeof(journal_node_t); + jn.pos = jnode_base_pos(max_nodes); jn.len = 0; if (!sfwrite(&jn, sizeof(journal_node_t), fd)) { close(fd); @@ -289,15 +186,6 @@ static int journal_create_file(const char *fn, uint16_t max_nodes) } } - /* Recalculate CRC. */ - if (journal_update_crc(fd) != KNOT_EOK) { - close(fd); - if(remove(fn) < 0) { - dbg_journal("journal: failed to remove journal file after error\n"); - } - return KNOT_ERROR; - } - /* Unlock and close. */ close(fd); @@ -355,35 +243,10 @@ static int journal_open_file(journal_t *j) } return ret; } - crc_t crc = 0; - if (!sfread(&crc, sizeof(crc_t), j->fd)) { - dbg_journal_verb("journal: cannot read CRC\n"); - goto open_file_error; - } - - /* Recalculate CRC. */ - char buf[4096]; - ssize_t rb = 0; - crc_t crc_calc = crc_init(); - while((rb = read(j->fd, buf, sizeof(buf))) > 0) { - crc_calc = crc_update(crc_calc, (const unsigned char *)buf, rb); - } - /* Compare */ - if (crc == crc_calc) { - /* Rewind. */ - if (lseek(j->fd, MAGIC_LENGTH + sizeof(crc_t), SEEK_SET) < 0) { - goto open_file_error; - } - } else { - log_warning("journal '%s', CRC error, purging", j->path); - close(j->fd); - j->fd = -1; - ret = journal_create_file(j->path, JOURNAL_NCOUNT); - if(ret == KNOT_EOK) { - return journal_open_file(j); - } - return ret; + /* Skip CRC */ + if (lseek(j->fd, MAGIC_LENGTH + sizeof(crc_t), SEEK_SET) < 0) { + goto open_file_error; } /* Get journal file size. */ @@ -408,6 +271,13 @@ static int journal_open_file(journal_t *j) goto open_file_error; } + /* Check minimum fsize limit. */ + size_t fslimit_min = jnode_base_pos(j->max_nodes) + 1024; /* At least 1K block */ + if (j->fslimit < fslimit_min) { + log_error("journal '%s', filesize limit smaller than '%zu'", j->path, fslimit_min); + goto open_file_error; + } + /* Allocate nodes. */ const size_t node_len = sizeof(journal_node_t); j->nodes = malloc(j->max_nodes * node_len); @@ -452,23 +322,6 @@ static int journal_open_file(journal_t *j) dbg_journal("journal: opened journal size=%u, queue=<%u, %u>, fd=%d\n", j->max_nodes, j->qhead, j->qtail, j->fd); - /* Check node queue. */ - unsigned qtail_free = (jnode_flags(j, j->qtail) <= JOURNAL_FREE); - unsigned qhead_free = j->max_nodes - 1; /* Left of qhead must be free.*/ - if (j->qhead > 0) { - qhead_free = (j->qhead - 1); - } - qhead_free = (jnode_flags(j, qhead_free) <= JOURNAL_FREE); - if ((j->qhead != j->qtail) && (!qtail_free || !qhead_free)) { - log_warning("journal '%s', recovering metadata after crash", j->path); - ret = journal_recover(j); - if (ret != KNOT_EOK) { - log_error("journal '%s', unrecoverable corruption (%s)", - j->path, knot_strerror(ret)); - goto open_file_error; - } - } - /* Save file lock and return. */ return KNOT_EOK; @@ -489,9 +342,6 @@ static int journal_close_file(journal_t *journal) return KNOT_EINVAL; } - /* Recalculate CRC. */ - int ret = journal_update_crc(journal->fd); - /* Close file. */ if (journal->fd > 0) { close(journal->fd); @@ -502,7 +352,7 @@ static int journal_close_file(journal_t *journal) free(journal->nodes); journal->nodes = NULL; - return ret; + return KNOT_EOK; } /*! \brief Sync node state to permanent storage. */ @@ -515,9 +365,7 @@ static int journal_update(journal_t *journal, journal_node_t *n) /* Calculate node offset. */ const size_t node_len = sizeof(journal_node_t); size_t i = n - journal->nodes; - if (i > journal->max_nodes) { - return KNOT_EINVAL; - } + assert(i < journal->max_nodes); /* Calculate node position in permanent storage. */ long jn_fpos = JOURNAL_HSIZE + (i + 1) * node_len; @@ -541,60 +389,53 @@ int journal_write_in(journal_t *j, journal_node_t **rn, uint64_t id, size_t len) const size_t node_len = sizeof(journal_node_t); *rn = NULL; - /* Find next free node. */ - uint16_t jnext = (j->qtail + 1) % j->max_nodes; - dbg_journal("journal: will write id=%llu, node=%u, size=%zu, fsize=%zu\n", (unsigned long long)id, j->qtail, len, j->fsize); - /* Calculate file end position (with imposed limits). */ - size_t file_end = j->fsize; - if (file_end > j->fslimit) { - file_end = j->fslimit; - } - - int seek_ret = 0; - - /* Increase free segment if on the end of file. */ - dbg_journal("journal: free.pos = %u free.len = %u\n", - j->free.pos, j->free.len); - journal_node_t *n = j->nodes + j->qtail; - if (j->free.pos + len >= file_end) { - - dbg_journal_verb("journal: * is last node\n"); - - /* Grow journal file until the size limit. */ - if(j->free.pos + len < j->fslimit) { - size_t diff = len - j->free.len; - dbg_journal("journal: * growing by +%zu, pos=%u, " - "new fsize=%zu\n", - diff, j->free.pos, - j->fsize + diff); - j->fsize += diff; /* Appending increases file size. */ - j->free.len += diff; - - } else { - /* Rewind if resize is needed, but the limit is reached. */ - journal_node_t *head = j->nodes + j->qhead; - j->fsize = j->free.pos; - j->free.pos = head->pos; - j->free.len = 0; - dbg_journal_verb("journal: * fslimit reached, " - "rewinding to %u\n", - head->pos); - dbg_journal_verb("journal: * file size trimmed to %zu\n", - j->fsize); - } - } - - /* Count node visits to prevent looping. */ - uint16_t visit_count = 0; + /* Count rewinds. */ + bool already_rewound = false; /* Evict occupied nodes if necessary. */ - while (j->free.len < len || j->nodes[jnext].flags > JOURNAL_FREE) { + while (j->free.len < len || jnode_next(j, j->qtail) == j->qhead) { - /* Evict least recent node if not empty. */ + /* Increase free segment if on the end of file. */ + bool is_empty = (j->qtail == j->qhead); journal_node_t *head = j->nodes + j->qhead; + journal_node_t *last = j->nodes + jnode_prev(j, j->qtail); + if (is_empty || (head->pos <= last->pos && j->free.pos > last->pos)) { + + dbg_journal_verb("journal: * is last node\n"); + + /* Grow journal file until the size limit. */ + if(j->free.pos + len < j->fslimit && jnode_next(j, j->qtail) != j->qhead) { + size_t diff = len - j->free.len; + dbg_journal("journal: * growing by +%zu, pos=%u, " + "new fsize=%zu\n", + diff, j->free.pos, + j->fsize + diff); + j->fsize += diff; /* Appending increases file size. */ + j->free.len += diff; + continue; + + } else if (!already_rewound) { + /* Rewind if resize is needed, but the limit is reached. */ + j->free.pos = jnode_base_pos(j->max_nodes); + j->free.len = 0; + if (!is_empty) { + j->free.len = head->pos - j->free.pos; + } + dbg_journal_verb("journal: * fslimit/nodelimit reached, " + "rewinding to %u\n", + j->free.pos); + already_rewound = true; + } else { + /* Already rewound, but couldn't collect enough free space. */ + return KNOT_ESPACE; + } + + /* Continue until enough free space is collected. */ + continue; + } /* Check if it has been synced to disk. */ if ((head->flags & JOURNAL_DIRTY) && (head->flags & JOURNAL_VALID)) { @@ -603,7 +444,7 @@ int journal_write_in(journal_t *j, journal_node_t **rn, uint64_t id, size_t len) /* Write back evicted node. */ head->flags = JOURNAL_FREE; - seek_ret = lseek(j->fd, JOURNAL_HSIZE + (j->qhead + 1) * node_len, SEEK_SET); + int seek_ret = lseek(j->fd, JOURNAL_HSIZE + (j->qhead + 1) * node_len, SEEK_SET); if (seek_ret < 0 || !sfwrite(head, node_len, j->fd)) { return KNOT_ERROR; } @@ -621,20 +462,14 @@ int journal_write_in(journal_t *j, journal_node_t **rn, uint64_t id, size_t len) /* Increase free segment. */ j->free.len += head->len; - - /* Update node visit count. */ - visit_count += 1; - if (visit_count >= j->max_nodes) { - return KNOT_ESPACE; - } } - /* Invalidate node and write back. */ + /* Invalidate tail node and write back. */ + journal_node_t *n = j->nodes + j->qtail; n->id = id; n->pos = j->free.pos; n->len = len; n->flags = JOURNAL_FREE; - n->next = jnext; journal_update(j, n); *rn = n; return KNOT_EOK; @@ -643,30 +478,15 @@ int journal_write_in(journal_t *j, journal_node_t **rn, uint64_t id, size_t len) int journal_write_out(journal_t *journal, journal_node_t *n) { /* Mark node as valid and write back. */ - uint16_t jnext = n->next; + uint16_t jnext = (journal->qtail + 1) % journal->max_nodes; size_t size = n->len; const size_t node_len = sizeof(journal_node_t); n->flags = JOURNAL_VALID | journal->bflags; - n->next = 0; journal_update(journal, n); - /* Handle free segment on node rotation. */ - if (journal->qtail > jnext && journal->fslimit == FSLIMIT_INF) { - /* Trim free space. */ - journal->fsize -= journal->free.len; - dbg_journal_verb("journal: * trimmed filesize to %zu\n", - journal->fsize); - - /* Rewind free segment. */ - journal_node_t *n = journal->nodes + jnext; - journal->free.pos = n->pos; - journal->free.len = 0; - - } else { - /* Mark used space. */ - journal->free.pos += size; - journal->free.len -= size; - } + /* Mark used space. */ + journal->free.pos += size; + journal->free.len -= size; dbg_journal("journal: finishing node=%u id=%llu flags=0x%x, " "data=<%u, %u> free=<%u, %u>\n", @@ -764,6 +584,12 @@ static int journal_fetch(journal_t *journal, uint64_t id, size_t endp = jnode_prev(journal, journal->qhead); for(; i != endp; i = jnode_prev(journal, i)) { journal_node_t *n = journal->nodes + i; + + /* Skip invalid nodes. */ + if (!(n->flags & JOURNAL_VALID)) { + continue; + } + if (cf(n->id, id) == 0) { *dst = journal->nodes + i; return KNOT_EOK; @@ -805,12 +631,13 @@ int journal_map(journal_t *journal, uint64_t id, char **dst, size_t size, bool r /* Check if entry exists. */ journal_node_t *n = NULL; int ret = journal_fetch(journal, id, journal_cmp_eq, &n); - if (ret != KNOT_EOK) { - /* Return error if read only. */ - if (rdonly) { + + /* Return if read-only, invalidate if rewritten to avoid duplicates. */ + if (rdonly) { + if (ret != KNOT_EOK) { return ret; } - + } else { /* Prepare journal write. */ ret = journal_write_in(journal, &n, id, size); if (ret != KNOT_EOK) { @@ -833,11 +660,6 @@ int journal_map(journal_t *journal, uint64_t id, char **dst, size_t size, bool r } size -= wb; } - } else { - /* Entry resizing is not really supported now. */ - if (n->len < size) { - return KNOT_ESPACE; - } } /* Align offset to page size (required). */ @@ -1121,26 +943,28 @@ static int journal_walk(const char *fn, uint32_t from, uint32_t to, if (ret != KNOT_EOK) { goto finish; } + + size_t i = n - journal->nodes; + assert(i < journal->max_nodes); - while (n != 0 && n != journal_end(journal)) { - /* Check for history end. */ - if (to == found_to) { - break; - } + for (; i != journal->qtail; i = jnode_next(journal, i)) { + journal_node_t *n = journal->nodes + i; - /* Skip wrong changesets. */ + /* Skip invalid nodes. */ if (!(n->flags & JOURNAL_VALID)) { - ++n; continue; } + /* Check for history end. */ + if (to == found_to) { + break; + } + /* Callback. */ ret = cb(journal, n, zone, chgs); if (ret != KNOT_EOK) { break; } - - ++n; } finish: @@ -1177,8 +1001,7 @@ static int load_changeset(journal_t *journal, journal_node_t *n, const zone_t *z return KNOT_EOK; } -int journal_load_changesets(const zone_t *zone, list_t *dst, - uint32_t from, uint32_t to) +int journal_load_changesets(const zone_t *zone, list_t *dst, uint32_t from, uint32_t to) { int ret = journal_walk(zone->conf->ixfr_db, from, to, &load_changeset, zone, dst); if (ret != KNOT_EOK) { @@ -1274,7 +1097,7 @@ int journal_mark_synced(const char *path) } size_t i = journal->qhead; - for(; i != journal->qtail; i = (i + 1) % journal->max_nodes) { + for(; i != journal->qtail; i = jnode_next(journal, i)) { mark_synced(journal, journal->nodes + i); } diff --git a/src/knot/server/journal.h b/src/knot/server/journal.h index 81c3e981c..8c788b4d0 100644 --- a/src/knot/server/journal.h +++ b/src/knot/server/journal.h @@ -67,7 +67,7 @@ typedef struct journal_node_t { uint64_t id; /*!< Node ID. */ uint16_t flags; /*!< Node flags. */ - uint16_t next; /*!< Next node ptr. */ + uint16_t next; /*!< UNUSED */ uint32_t pos; /*!< Position in journal file. */ uint32_t len; /*!< Entry data length. */ } journal_node_t; diff --git a/src/knot/zone/zone-load.c b/src/knot/zone/zone-load.c index eefafb95c..a72577fa6 100644 --- a/src/knot/zone/zone-load.c +++ b/src/knot/zone/zone-load.c @@ -97,7 +97,11 @@ int zone_load_journal(zone_t *zone, zone_contents_t *contents) /*! \todo Check what should be the upper bound. */ list_t chgs; init_list(&chgs); + + pthread_mutex_lock(&zone->journal_lock); int ret = journal_load_changesets(zone, &chgs, serial, serial - 1); + pthread_mutex_unlock(&zone->journal_lock); + if ((ret != KNOT_EOK && ret != KNOT_ERANGE) || EMPTY_LIST(chgs)) { changesets_free(&chgs); /* Absence of records is not an error. */ diff --git a/src/knot/zone/zone.c b/src/knot/zone/zone.c index e3557ebbb..5b13fed3f 100644 --- a/src/knot/zone/zone.c +++ b/src/knot/zone/zone.c @@ -76,6 +76,9 @@ zone_t* zone_new(conf_zone_t *conf) zone->ddns_queue_size = 0; init_list(&zone->ddns_queue); + // Journal lock + pthread_mutex_init(&zone->journal_lock, NULL); + // Initialize events zone_events_init(zone); @@ -96,6 +99,7 @@ void zone_free(zone_t **zone_ptr) free_ddns_queue(zone); pthread_mutex_destroy(&zone->ddns_lock); + pthread_mutex_destroy(&zone->journal_lock); /* Free assigned config. */ conf_free_zone(zone->conf); @@ -114,18 +118,18 @@ int zone_change_store(zone_t *zone, changeset_t *change) conf_zone_t *conf = zone->conf; + pthread_mutex_lock(&zone->journal_lock); int ret = journal_store_changeset(change, conf->ixfr_db, conf->ixfr_fslimit); if (ret == KNOT_EBUSY) { log_zone_notice(zone->name, "journal is full, flushing"); /* Transaction rolled back, journal released, we may flush. */ ret = zone_flush_journal(zone); - if (ret != KNOT_EOK) { - return ret; + if (ret == KNOT_EOK) { + ret = journal_store_changeset(change, conf->ixfr_db, conf->ixfr_fslimit); } - - return journal_store_changeset(change, conf->ixfr_db, conf->ixfr_fslimit); } + pthread_mutex_unlock(&zone->journal_lock); return ret; } @@ -137,18 +141,19 @@ int zone_changes_store(zone_t *zone, list_t *chgs) conf_zone_t *conf = zone->conf; + pthread_mutex_lock(&zone->journal_lock); int ret = journal_store_changesets(chgs, conf->ixfr_db, conf->ixfr_fslimit); + if (ret == KNOT_EBUSY) { log_zone_notice(zone->name, "journal is full, flushing"); /* Transaction rolled back, journal released, we may flush. */ ret = zone_flush_journal(zone); - if (ret != KNOT_EOK) { - return ret; + if (ret == KNOT_EOK) { + ret = journal_store_changesets(chgs, conf->ixfr_db, conf->ixfr_fslimit); } - - return journal_store_changesets(chgs, conf->ixfr_db, conf->ixfr_fslimit); } + pthread_mutex_unlock(&zone->journal_lock); return ret; } diff --git a/src/knot/zone/zone.h b/src/knot/zone/zone.h index 069f60c8f..e10456d23 100644 --- a/src/knot/zone/zone.h +++ b/src/knot/zone/zone.h @@ -61,6 +61,9 @@ typedef struct zone_t pthread_mutex_t ddns_lock; size_t ddns_queue_size; list_t ddns_queue; + + /*! \brief Journal access lock. */ + pthread_mutex_t journal_lock; /*! \brief Zone events. */ zone_events_t events; /*!< Zone events timers. */ diff --git a/src/knot/zone/zonedb.c b/src/knot/zone/zonedb.c index 705db05b0..c90c0ee73 100644 --- a/src/knot/zone/zonedb.c +++ b/src/knot/zone/zonedb.c @@ -41,7 +41,11 @@ static void discard_zone(zone_t *zone) { /* Flush bootstrapped zones. */ if (zone->zonefile_mtime == 0) { + + pthread_mutex_lock(&zone->journal_lock); zone_flush_journal(zone); + pthread_mutex_unlock(&zone->journal_lock); + } zone_free(&zone); } diff --git a/tests-extra/tests/ixfr/stress/test.py b/tests-extra/tests/ixfr/stress/test.py new file mode 100644 index 000000000..a56e70686 --- /dev/null +++ b/tests-extra/tests/ixfr/stress/test.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 + +'''Stress test for multiple incoming DDNS and outgoing IXFRs''' + +import random, socket, os + +from dnstest.utils import * +from dnstest.test import Test + +UPDATE_COUNT = 512 +UPDATE_SIZE = 64 + +chars="qwertyuiopasdfghjklzxcvbnm123456789" + +def randstr(): + return ''.join(random.choice(chars) for _ in range(63)) + +def flood(server, zone): + rr = None + updates = [] + for i in range(UPDATE_COUNT): + update = server.update(zone) + for j in range(UPDATE_SIZE): + rr = [randstr() + "." + zone[0].name, 3600, "TXT", randstr()] + update.add(*rr) + update.send() + return rr + +random.seed() + +t = Test() + +zone = t.zone_rnd(1, dnssec=False) +master = t.server("knot") + +# set journal limit for the master +master.ixfr_fslimit = "1000k" + +slaves = [t.server("knot") for _ in range(2)] +# set journal limit for one of the slaves +slaves[0].ixfr_fslimit = "500k" + +for s in slaves: + t.link(zone, master, s, ddns=True, ixfr=True) + +t.start() + +for s in slaves + [master]: + s.zone_wait(zone) + +# flood server with updates +last_rr = flood(master, zone) + +# wait for update and ixfr processing +t.sleep(10) + +# restart servers and dig for last change +for s in slaves + [master]: + s.stop() + s.start() + s.zone_wait(zone) + resp = s.dig(last_rr[0], "TXT") + resp.check(rdata = last_rr[3]) + +# check journal sizes +st = os.stat(master.dir + "/" + zone[0].name.lower() + "diff.db") +if st.st_size > 1300 * 1024: + detail_log("Journal too big, should be max 1000k, is: " + str(st.st_size // 1024) + "k") + set_err("JOURNAL SIZE OVERFLOW") + +st = os.stat(slaves[0].dir + "/" + zone[0].name.lower() + "diff.db") +if st.st_size > 650 * 1024: + detail_log("Journal too big, should be max 500k, is: " + str(st.st_size // 1024) + "k") + set_err("JOURNAL SIZE OVERFLOW") + +t.stop() + diff --git a/tests-extra/tools/dnstest/server.py b/tests-extra/tools/dnstest/server.py index 699c871c3..7197d063e 100644 --- a/tests-extra/tools/dnstest/server.py +++ b/tests-extra/tools/dnstest/server.py @@ -155,6 +155,7 @@ class Server(object): self.disable_notify = None self.max_conn_idle = None self.zonefile_sync = None + self.ixfr_fslimit = None self.inquirer = None @@ -937,6 +938,8 @@ class Knot(Server): s.item("zonefile-sync", self.zonefile_sync) else: s.item("zonefile-sync", "1d") + if self.ixfr_fslimit: + s.item("ixfr-fslimit", self.ixfr_fslimit) s.item("notify-timeout", "5") s.item("notify-retries", "5") s.item("semantic-checks", "on") diff --git a/tests/journal.c b/tests/journal.c index 8aba457e6..851b06589 100644 --- a/tests/journal.c +++ b/tests/journal.c @@ -23,6 +23,11 @@ #include <tap/basic.h> #include "knot/server/journal.h" +#include "knot/zone/zone-diff.h" + +#define RAND_RR_LABEL 16 +#define RAND_RR_PAYLOAD 64 +#define MIN_SOA_SIZE 22 /*! \brief Generate random string with given length. */ static int randstr(char* dst, size_t len) @@ -35,37 +40,244 @@ static int randstr(char* dst, size_t len) return 0; } +/*! \brief Init RRSet with type SOA and given serial. */ +static void init_soa(knot_rrset_t *rr, const uint32_t serial, const knot_dname_t *apex) +{ + knot_rrset_init(rr, knot_dname_copy(apex, NULL), KNOT_RRTYPE_SOA, KNOT_CLASS_IN); + + assert(serial < 256); + uint8_t soa_data[MIN_SOA_SIZE] = { 0, 0, 0, 0, 0, serial }; + int ret = knot_rrset_add_rdata(rr, soa_data, sizeof(soa_data), 3600, NULL); + assert(ret == KNOT_EOK); +} + +/*! \brief Init RRSet with type TXT, random owner and random payload. */ +static void init_random_rr(knot_rrset_t *rr , const knot_dname_t *apex) +{ + /* Create random label. */ + char owner[RAND_RR_LABEL + knot_dname_size(apex)]; + owner[0] = RAND_RR_LABEL - 1; + randstr(owner + 1, RAND_RR_LABEL); + + /* Append zone apex. */ + memcpy(owner + RAND_RR_LABEL, apex, knot_dname_size(apex)); + knot_rrset_init(rr, knot_dname_copy((knot_dname_t *)owner, NULL), KNOT_RRTYPE_TXT, KNOT_CLASS_IN); + + /* Create random RDATA. */ + uint8_t txt[RAND_RR_PAYLOAD + 1]; + txt[0] = RAND_RR_PAYLOAD - 1; + randstr((char *)(txt + 1), RAND_RR_PAYLOAD); + + int ret = knot_rrset_add_rdata(rr, txt, RAND_RR_PAYLOAD, 3600, NULL); + assert(ret == KNOT_EOK); +} + +/*! \brief Init changeset with random changes. */ +static void init_random_changeset(changeset_t *ch, const uint32_t from, const uint32_t to, const size_t size, const knot_dname_t *apex) +{ + int ret = changeset_init(ch, apex); + assert(ret == KNOT_EOK); + + // Add SOAs + knot_rrset_t soa; + init_soa(&soa, from, apex); + + ch->soa_from = knot_rrset_copy(&soa, NULL); + assert(ch->soa_from); + knot_rrset_clear(&soa, NULL); + + init_soa(&soa, to, apex); + ch->soa_to = knot_rrset_copy(&soa, NULL); + assert(ch->soa_to); + knot_rrset_clear(&soa, NULL); + + // Add RRs to add section + for (size_t i = 0; i < size / 2; ++i) { + knot_rrset_t rr; + init_random_rr(&rr, apex); + int ret = changeset_add_rrset(ch, &rr); + assert(ret == KNOT_EOK); + knot_rrset_clear(&rr, NULL); + } + + // Add RRs to remove section + for (size_t i = 0; i < size / 2; ++i) { + knot_rrset_t rr; + init_random_rr(&rr, apex); + int ret = changeset_rem_rrset(ch, &rr); + assert(ret == KNOT_EOK); + knot_rrset_clear(&rr, NULL); + } +} + +/*! \brief Compare two changesets for equality. */ +static bool changesets_eq(const changeset_t *ch1, changeset_t *ch2) +{ + if (changeset_size(ch1) != changeset_size(ch2)) { + return false; + } + + changeset_iter_t it1; + changeset_iter_all(&it1, ch1, true); + changeset_iter_t it2; + changeset_iter_all(&it2, ch2, true); + + knot_rrset_t rr1 = changeset_iter_next(&it1); + knot_rrset_t rr2 = changeset_iter_next(&it2); + bool ret = true; + while (!knot_rrset_empty(&rr1)) { + if (!knot_rrset_equal(&rr1, &rr2, KNOT_RRSET_COMPARE_WHOLE)) { + ret = false; + break; + } + rr1 = changeset_iter_next(&it1); + rr2 = changeset_iter_next(&it2); + } + + changeset_iter_clear(&it1); + changeset_iter_clear(&it2); + + return ret; +} + /*! \brief Journal fillup test with size check. */ -static void test_fillup(journal_t *journal, size_t fsize, unsigned iter) +static void test_fillup(journal_t *journal, size_t fsize, unsigned iter, size_t chunk_size) { - const unsigned chunk = 512 + rand() % 512; int ret = KNOT_EOK; char *mptr = NULL; - size_t large_entry_len = chunk * 1024; - char *large_entry = malloc(chunk * 1024); + char *large_entry = malloc(chunk_size); + randstr(large_entry, chunk_size); assert(large_entry); - randstr(large_entry, large_entry_len); + unsigned i = 0; - for (; i < 512; ++i) { - uint64_t chk_key = 0xBEBE + (iter * 512) + i; - ret = journal_map(journal, chk_key, &mptr, large_entry_len, false); + bool read_passed = true; + for (; i < 2 * JOURNAL_NCOUNT; ++i) { + uint64_t chk_key = 0xBEBE + i; + size_t entry_len = chunk_size/2 + rand() % (chunk_size/2); + + /* Write */ + ret = journal_map(journal, chk_key, &mptr, entry_len, false); if (ret != KNOT_EOK) { break; } + memcpy(mptr, large_entry, entry_len); + ret = journal_unmap(journal, chk_key, mptr, 1); + if (ret != KNOT_EOK) { + diag("journal_unmap = %s", knot_strerror(ret)); + read_passed = true; + break; + } + + /* Read */ + ret = journal_map(journal, chk_key, &mptr, entry_len, true); + if (ret == KNOT_EOK) { + ret = memcmp(large_entry, mptr, entry_len); + if (ret != 0) { + diag("integrity check failed"); + read_passed = false; + } else { + ret = journal_unmap(journal, chk_key, mptr, 0); + if (ret != KNOT_EOK) { + diag("journal_unmap(rdonly) = %s", knot_strerror(ret)); + read_passed = false; + } + } + } else { + diag("journal_map(rdonly) = %s", knot_strerror(ret)); + read_passed = false; + } - memcpy(mptr, large_entry, large_entry_len); - journal_unmap(journal, chk_key, mptr, 1); + if (!read_passed) { + break; + } } - is_int(KNOT_EBUSY, ret, "journal: fillup #%u (%d entries)", iter, i); + ok(read_passed, "journal: fillup #%u, reading written entries", iter); + ok(ret != KNOT_EOK, "journal: fillup #%u (%d entries)", iter, i); free(large_entry); /* Check file size. */ struct stat st; fstat(journal->fd, &st); - ok(st.st_size < fsize + large_entry_len, "journal: fillup / size check #%u", iter); - if (st.st_size > fsize + large_entry_len) { - diag("journal: fillup / size check #%u fsize(%zu) > max(%zu)", iter, st.st_size, fsize + large_entry_len); + ok(st.st_size < fsize + chunk_size, "journal: fillup / size check #%u", iter); + if (st.st_size > fsize + chunk_size) { + diag("journal: fillup / size check #%u fsize(%zu) > max(%zu)", + iter, (size_t)st.st_size, fsize + chunk_size); + } +} + +/*! \brief Test behavior with real changesets. */ +static void test_store_load(const char *jfilename) +{ + const size_t filesize = 100 * 1024; + uint8_t *apex = (uint8_t *)"\4test"; + + /* Create fake zone. */ + conf_zone_t zconf = { .ixfr_db = (char *)jfilename, .ixfr_fslimit = filesize }; + zone_t z = { .name = apex, .conf = &zconf }; + + /* Save and load changeset. */ + changeset_t ch; + init_random_changeset(&ch, 0, 1, 128, apex); + int ret = journal_store_changeset(&ch, jfilename, filesize); + ok(ret == KNOT_EOK, "journal: store changeset"); + list_t l; + init_list(&l); + ret = journal_load_changesets(&z, &l, 0, 1); + ok(ret == KNOT_EOK && changesets_eq(TAIL(l), &ch), "journal: load changeset"); + changeset_clear(&ch); + changesets_free(&l); + init_list(&l); + + /* Fill the journal. */ + ret = KNOT_EOK; + uint32_t serial = 1; + for (; ret == KNOT_EOK; ++serial) { + init_random_changeset(&ch, serial, serial + 1, 128, apex); + ret = journal_store_changeset(&ch, jfilename, filesize); + changeset_clear(&ch); + } + ok(ret == KNOT_EBUSY, "journal: overfill with changesets"); + + /* Load all changesets stored until now. */ + serial--; + ret = journal_load_changesets(&z, &l, 0, serial); + changesets_free(&l); + ok(ret == KNOT_EOK, "journal: load changesets"); + + /* Flush the journal. */ + ret = journal_mark_synced(jfilename); + ok(ret == KNOT_EOK, "journal: flush"); + + /* Store next changeset. */ + init_random_changeset(&ch, serial, serial + 1, 128, apex); + ret = journal_store_changeset(&ch, jfilename, filesize); + changeset_clear(&ch); + ok(ret == KNOT_EOK, "journal: store after flush"); + + /* Load all changesets, except the first one that got evicted. */ + init_list(&l); + ret = journal_load_changesets(&z, &l, 1, serial + 1); + changesets_free(&l); + ok(ret == KNOT_EOK, "journal: load changesets after flush"); +} + +/*! \brief Test behavior when writing to jurnal and flushing it. */ +static void test_stress(const char *jfilename) +{ + uint8_t *apex = (uint8_t *)"\4test"; + const size_t filesize = 100 * 1024; + int ret = KNOT_EOK; + uint32_t serial = 0; + size_t update_size = 3; + for (; ret == KNOT_EOK && serial < 32; ++serial) { + changeset_t ch; + init_random_changeset(&ch, serial, serial + 1, update_size, apex); + update_size *= 1.5; + ret = journal_store_changeset(&ch, jfilename, filesize); + changeset_clear(&ch); + journal_mark_synced(jfilename); } + ok(ret == KNOT_ESPACE, "journal: does not overfill under load"); } int main(int argc, char *argv[]) @@ -87,8 +299,12 @@ int main(int argc, char *argv[]) close(tmp_fd); remove(jfilename); + /* Try to open journal with too small fsize. */ + journal_t *journal = journal_open(jfilename, 1024); + ok(journal == NULL, "journal: open too small"); + /* Open/create new journal. */ - journal_t *journal = journal_open(jfilename, fsize); + journal = journal_open(jfilename, fsize); ok(journal != NULL, "journal: open journal '%s'", jfilename); if (journal == NULL) { goto skip_all; @@ -145,13 +361,14 @@ int main(int argc, char *argv[]) } is_int(KNOT_EOK, ret, "journal: sustained mmap r/w"); - /* Overfill */ + /* Overfill (yields ESPACE/EBUSY) */ ret = journal_map(journal, chk_key, &mptr, fsize, false); - is_int(KNOT_ESPACE, ret, "journal: overfill"); + ok(ret != KNOT_EOK, "journal: overfill"); /* Fillup */ - unsigned iterations = 10; - for (unsigned i = 0; i < iterations; ++i) { + size_t sizes[] = {16, 64, 1024, 4096, 512 * 1024, 1024 * 1024 }; + const int num_sizes = sizeof(sizes)/sizeof(size_t); + for (unsigned i = 0; i < 2 * num_sizes; ++i) { /* Journal flush. */ journal_close(journal); ret = journal_mark_synced(jfilename); @@ -159,15 +376,21 @@ int main(int argc, char *argv[]) journal = journal_open(jfilename, fsize); ok(journal != NULL, "journal: reopen after flush #%u", i); /* Journal fillup. */ - test_fillup(journal, fsize, i); + test_fillup(journal, fsize, i, sizes[i % num_sizes]); } - - + /* Close journal. */ journal_close(journal); /* Delete journal. */ remove(jfilename); + + test_store_load(jfilename); + remove(jfilename); + + test_stress(jfilename); + remove(jfilename); + free(tmpdir); skip_all: -- GitLab