Skip to content
Snippets Groups Projects
Commit d941a536 authored by Jan Včelák's avatar Jan Včelák :rocket:
Browse files

requestor: migrate to one layer instead of an overlay

parent c17f8fec
No related branches found
No related tags found
No related merge requests found
......@@ -114,7 +114,6 @@ static int request_recv(struct knot_request *request, int timeout_ms)
return ret;
}
_public_
struct knot_request *knot_request_make(knot_mm_t *mm,
const struct sockaddr *dst,
const struct sockaddr *src,
......@@ -145,7 +144,6 @@ struct knot_request *knot_request_make(knot_mm_t *mm,
return request;
}
_public_
void knot_request_free(struct knot_request *request, knot_mm_t *mm)
{
if (request == NULL) {
......@@ -161,7 +159,6 @@ void knot_request_free(struct knot_request *request, knot_mm_t *mm)
mm_free(mm, request);
}
_public_
int knot_requestor_init(struct knot_requestor *requestor, knot_mm_t *mm)
{
if (requestor == NULL) {
......@@ -176,19 +173,12 @@ int knot_requestor_init(struct knot_requestor *requestor, knot_mm_t *mm)
}
init_list(pending);
int ret = knot_overlay_init(&requestor->overlay, mm);
if (ret != KNOT_EOK) {
mm_free(mm, pending);
return ret;
}
requestor->mm = mm;
requestor->pending = pending;
return KNOT_EOK;
}
_public_
void knot_requestor_clear(struct knot_requestor *requestor)
{
if (requestor == NULL) {
......@@ -198,18 +188,19 @@ void knot_requestor_clear(struct knot_requestor *requestor)
while (knot_requestor_dequeue(requestor) == KNOT_EOK)
;
knot_overlay_finish(&requestor->overlay);
knot_overlay_deinit(&requestor->overlay);
// XXX: API migration, should be run per request
if (requestor->layer.api) {
knot_layer_finish(&requestor->layer);
}
mm_free(requestor->mm, PENDING(requestor));
}
_public_
bool knot_requestor_finished(struct knot_requestor *requestor)
{
return requestor == NULL || EMPTY_LIST(*PENDING(requestor));
}
_public_
int knot_requestor_overlay(struct knot_requestor *requestor,
const knot_layer_api_t *proc, void *param)
{
......@@ -217,10 +208,15 @@ int knot_requestor_overlay(struct knot_requestor *requestor,
return KNOT_EINVAL;
}
return knot_overlay_add(&requestor->overlay, proc, param);
// XXX: API migration, prevent adding multiple layers
assert(requestor->layer.api == NULL);
knot_layer_init(&requestor->layer, requestor->mm, proc);
requestor->layer.state = knot_layer_begin(&requestor->layer, param);
return KNOT_EOK;
}
_public_
int knot_requestor_enqueue(struct knot_requestor *requestor,
struct knot_request *request)
{
......@@ -243,7 +239,6 @@ int knot_requestor_enqueue(struct knot_requestor *requestor,
return KNOT_EOK;
}
_public_
int knot_requestor_dequeue(struct knot_requestor *requestor)
{
if (requestor == NULL) {
......@@ -270,12 +265,12 @@ static int request_io(struct knot_requestor *req, struct knot_request *last,
knot_pkt_t *resp = last->resp;
/* Data to be sent. */
if (req->overlay.state == KNOT_STATE_PRODUCE) {
if (req->layer.state == KNOT_STATE_PRODUCE) {
/* Process query and send it out. */
knot_overlay_produce(&req->overlay, query);
knot_layer_produce(&req->layer, query);
if (req->overlay.state == KNOT_STATE_CONSUME) {
if (req->layer.state == KNOT_STATE_CONSUME) {
ret = request_send(last, timeout_ms);
if (ret != KNOT_EOK) {
return ret;
......@@ -284,7 +279,7 @@ static int request_io(struct knot_requestor *req, struct knot_request *last,
}
/* Data to be read. */
if (req->overlay.state == KNOT_STATE_CONSUME) {
if (req->layer.state == KNOT_STATE_CONSUME) {
/* Read answer and process it. */
ret = request_recv(last, timeout_ms);
if (ret < 0) {
......@@ -292,7 +287,7 @@ static int request_io(struct knot_requestor *req, struct knot_request *last,
}
(void) knot_pkt_parse(resp, 0);
knot_overlay_consume(&req->overlay, resp);
knot_layer_consume(&req->layer, resp);
}
return KNOT_EOK;
......@@ -304,26 +299,25 @@ static int exec_request(struct knot_requestor *req, struct knot_request *last,
int ret = KNOT_EOK;
/* Do I/O until the processing is satisifed or fails. */
while (req->overlay.state & (KNOT_STATE_PRODUCE|KNOT_STATE_CONSUME)) {
while (req->layer.state & (KNOT_STATE_PRODUCE|KNOT_STATE_CONSUME)) {
ret = request_io(req, last, timeout_ms);
if (ret != KNOT_EOK) {
knot_overlay_reset(&req->overlay);
knot_layer_reset(&req->layer);
return ret;
}
}
/* Expect complete request. */
if (req->overlay.state != KNOT_STATE_DONE) {
if (req->layer.state != KNOT_STATE_DONE) {
ret = KNOT_LAYER_ERROR;
}
/* Finish current query processing. */
knot_overlay_reset(&req->overlay);
knot_layer_reset(&req->layer);
return ret;
}
_public_
int knot_requestor_exec(struct knot_requestor *requestor, int timeout_ms)
{
if (knot_requestor_finished(requestor)) {
......
......@@ -19,9 +19,9 @@
#include <sys/socket.h>
#include <sys/time.h>
#include "knot/query/overlay.h"
#include "libknot/rrtype/tsig.h"
#include "knot/query/layer.h"
#include "libknot/mm_ctx.h"
#include "libknot/rrtype/tsig.h"
struct knot_request;
......@@ -37,7 +37,7 @@ enum {
struct knot_requestor {
knot_mm_t *mm; /*!< Memory context. */
void *pending; /*!< Pending requests (FIFO). */
struct knot_overlay overlay; /*!< Response processing overlay. */
struct knot_layer layer; /*!< Response processing layer. */
};
/*! \brief Request data (socket, payload, response, TSIG and endpoints). */
......
......@@ -21,7 +21,8 @@
#include <string.h>
#include <fcntl.h>
#include "knot/conf/conf.h"
#include "libknot/descriptor.h"
#include "libknot/errcode.h"
#include "knot/query/layer.h"
#include "knot/query/requestor.h"
#include "contrib/mempattern.h"
......@@ -52,11 +53,12 @@ static void set_blocking_mode(int sock)
fcntl(sock, F_SETFL, flags);
}
static void* responder_thread(void *arg)
static void *responder_thread(void *arg)
{
int fd = *((int *)arg);
int fd = *(int *)arg;
set_blocking_mode(fd);
uint8_t buf[KNOT_WIRE_MAX_PKTSIZE];
uint8_t buf[KNOT_WIRE_MAX_PKTSIZE] = { 0 };
while (true) {
int client = accept(fd, NULL, NULL);
if (client < 0) {
......@@ -71,27 +73,32 @@ static void* responder_thread(void *arg)
net_dns_tcp_send(client, buf, len, -1);
close(client);
}
return NULL;
}
/* Test implementations. */
static struct knot_request *make_query(struct knot_requestor *requestor, conf_remote_t *remote)
static struct knot_request *make_query(struct knot_requestor *requestor,
const struct sockaddr_storage *dst,
const struct sockaddr_storage *src)
{
knot_pkt_t *pkt = knot_pkt_new(NULL, KNOT_WIRE_MAX_PKTSIZE, requestor->mm);
assert(pkt);
static const knot_dname_t *root = (uint8_t *)"";
knot_pkt_put_question(pkt, root, KNOT_CLASS_IN, KNOT_RRTYPE_SOA);
const struct sockaddr *dst = (const struct sockaddr *)&remote->addr;
const struct sockaddr *src = (const struct sockaddr *)&remote->via;
return knot_request_make(requestor->mm, dst, src, pkt, 0);
return knot_request_make(requestor->mm, (struct sockaddr *)dst,
(struct sockaddr *)src, pkt, 0);
}
static void test_disconnected(struct knot_requestor *requestor, conf_remote_t *remote)
static void test_disconnected(struct knot_requestor *requestor,
const struct sockaddr_storage *dst,
const struct sockaddr_storage *src)
{
/* Enqueue packet. */
int ret = knot_requestor_enqueue(requestor, make_query(requestor, remote));
struct knot_request *req = make_query(requestor, dst, src);
int ret = knot_requestor_enqueue(requestor, req);
is_int(KNOT_EOK, ret, "requestor: disconnected/enqueue");
/* Wait for completion. */
......@@ -99,10 +106,13 @@ static void test_disconnected(struct knot_requestor *requestor, conf_remote_t *r
is_int(KNOT_ECONN, ret, "requestor: disconnected/wait");
}
static void test_connected(struct knot_requestor *requestor, conf_remote_t *remote)
static void test_connected(struct knot_requestor *requestor,
const struct sockaddr_storage *dst,
const struct sockaddr_storage *src)
{
/* Enqueue packet. */
int ret = knot_requestor_enqueue(requestor, make_query(requestor, remote));
struct knot_request *req = make_query(requestor, dst, src);
int ret = knot_requestor_enqueue(requestor, req);
is_int(KNOT_EOK, ret, "requestor: connected/enqueue");
/* Wait for completion. */
......@@ -112,7 +122,8 @@ static void test_connected(struct knot_requestor *requestor, conf_remote_t *remo
/* Enqueue multiple queries. */
ret = KNOT_EOK;
for (unsigned i = 0; i < 10; ++i) {
ret |= knot_requestor_enqueue(requestor, make_query(requestor, remote));
struct knot_request *req = make_query(requestor, dst, src);
ret |= knot_requestor_enqueue(requestor, req);
}
is_int(KNOT_EOK, ret, "requestor: multiple enqueue");
......@@ -131,50 +142,44 @@ int main(int argc, char *argv[])
knot_mm_t mm;
mm_ctx_mempool(&mm, MM_DEFAULT_BLKSIZE);
conf_remote_t remote;
memset(&remote, 0, sizeof(conf_remote_t));
sockaddr_set(&remote.addr, AF_INET, "127.0.0.1", 0);
sockaddr_set(&remote.via, AF_INET, "127.0.0.1", 0);
/* Initialize requestor. */
struct knot_requestor requestor;
knot_requestor_init(&requestor, &mm);
knot_requestor_overlay(&requestor, &dummy_module, NULL);
/* Test requestor in disconnected environment. */
test_disconnected(&requestor, &remote);
/* Define endpoints. */
struct sockaddr_storage client = { 0 };
sockaddr_set(&client, AF_INET, "127.0.0.1", 0);
struct sockaddr_storage server = { 0 };
sockaddr_set(&server, AF_INET, "127.0.0.1", 0);
/* Bind to random port. */
int origin_fd = net_bound_socket(SOCK_STREAM, &remote.addr, 0);
assert(origin_fd > 0);
socklen_t addr_len = sockaddr_len((struct sockaddr *)&remote.addr);
getsockname(origin_fd, (struct sockaddr *)&remote.addr, &addr_len);
int ret = listen(origin_fd, 10);
assert(ret == 0);
int responder_fd = net_bound_socket(SOCK_STREAM, &server, 0);
assert(responder_fd >= 0);
socklen_t addr_len = sockaddr_len((struct sockaddr *)&server);
getsockname(responder_fd, (struct sockaddr *)&server, &addr_len);
/* Responder thread. */
/* Test requestor in disconnected environment. */
test_disconnected(&requestor, &server, &client);
/* Start responder. */
int ret = listen(responder_fd, 10);
assert(ret == 0);
pthread_t thread;
pthread_create(&thread, 0, responder_thread, &origin_fd);
pthread_create(&thread, 0, responder_thread, &responder_fd);
/* Test requestor in connected environment. */
test_connected(&requestor, &remote);
/*! \todo #243 TSIG secured requests test should be implemented. */
test_connected(&requestor, &server, &client);
/* Terminate responder. */
int responder = net_connected_socket(SOCK_STREAM, &remote.addr, NULL);
assert(responder > 0);
net_dns_tcp_send(responder, (const uint8_t *)"", 1, TIMEOUT);
(void) pthread_join(thread, 0);
close(responder);
/* Close requestor. */
knot_requestor_clear(&requestor);
close(origin_fd);
int conn = net_connected_socket(SOCK_STREAM, &server, NULL);
assert(conn > 0);
net_dns_tcp_send(conn, (uint8_t *)"", 1, TIMEOUT);
pthread_join(thread, NULL);
close(responder_fd);
/* Cleanup. */
mp_delete((struct mempool *)mm.ctx);
conf_free(conf());
return 0;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment