Skip to content
Snippets Groups Projects
Commit c3ad1f83 authored by Marek Vavruša's avatar Marek Vavruša
Browse files

daemon/bindings: Lua API for worker (resolve, stats)

parent 66cc9ded
No related branches found
No related tags found
No related merge requests found
......@@ -15,6 +15,7 @@
*/
#include <uv.h>
#include <libknot/descriptor.h>
#include "lib/cache.h"
#include "daemon/bindings.h"
......@@ -543,3 +544,75 @@ int lib_event(lua_State *L)
register_lib(L, "event", lib);
return 1;
}
static inline struct worker_ctx *wrk_luaget(lua_State *L) {
lua_getglobal(L, "__worker");
struct worker_ctx *worker = lua_touserdata(L, -1);
lua_pop(L, 1);
return worker;
}
static int wrk_resolve(lua_State *L)
{
struct worker_ctx *worker = wrk_luaget(L);
if (!worker) {
return 0;
}
/* Create query packet */
knot_pkt_t *pkt = knot_pkt_new(NULL, KNOT_EDNS_MAX_UDP_PAYLOAD, NULL);
if (!pkt) {
lua_pushstring(L, strerror(ENOMEM));
lua_error(L);
}
uint8_t dname[KNOT_DNAME_MAXLEN];
knot_dname_from_str(dname, lua_tostring(L, 1), sizeof(dname));
/* Check class and type */
uint16_t rrtype = lua_tointeger(L, 2);
if (!lua_isnumber(L, 2)) {
lua_pushstring(L, "invalid RR type");
lua_error(L);
}
uint16_t rrclass = lua_tointeger(L, 3);
if (!lua_isnumber(L, 3)) { /* Default class is IN */
rrclass = KNOT_CLASS_IN;
}
knot_pkt_put_question(pkt, dname, rrclass, rrtype);
knot_wire_set_rd(pkt->wire);
/* Resolve it */
int ret = worker_resolve(worker, pkt);
knot_pkt_free(&pkt);
if (ret != 0) {
lua_pushstring(L, kr_strerror(ret));
lua_error(L);
}
lua_pushboolean(L, true);
return 1;
}
/** Return worker statistics. */
static int wrk_stats(lua_State *L)
{
struct worker_ctx *worker = wrk_luaget(L);
if (!worker) {
return 0;
}
lua_newtable(L);
lua_pushnumber(L, worker->stats.concurrent);
lua_setfield(L, -2, "concurrent");
lua_pushnumber(L, worker->stats.udp);
lua_setfield(L, -2, "udp");
lua_pushnumber(L, worker->stats.tcp);
lua_setfield(L, -2, "tcp");
return 1;
}
int lib_worker(lua_State *L)
{
static const luaL_Reg lib[] = {
{ "resolve", wrk_resolve },
{ "stats", wrk_stats },
{ NULL, NULL }
};
register_lib(L, "worker", lib);
return 1;
}
......@@ -75,4 +75,11 @@ int lib_cache(lua_State *L);
* @param L scriptable
* @return number of packages to load
*/
int lib_event(lua_State *L);
\ No newline at end of file
int lib_event(lua_State *L);
/**
* Load worker API.
* @param L scriptable
* @return number of packages to load
*/
int lib_worker(lua_State *L);
\ No newline at end of file
......@@ -135,6 +135,7 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, mm
engine_lualib(engine, "cache", lib_cache);
engine_lualib(engine, "event", lib_event);
engine_lualib(engine, "kres", lib_kres);
engine_lualib(engine, "worker", lib_worker);
/* Create main worker. */
struct worker_ctx *worker = mm_alloc(pool, sizeof(*worker));
......@@ -146,6 +147,9 @@ static struct worker_ctx *init_worker(uv_loop_t *loop, struct engine *engine, mm
worker->loop = loop;
loop->data = worker;
worker_reserve(worker, MP_FREELIST_SIZE);
/* Register worker in Lua thread */
lua_pushlightuserdata(engine->L, worker);
lua_setglobal(engine->L, "__worker");
return worker;
}
......
......@@ -117,9 +117,10 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
task->next_query = pktbuf;
/* Start resolution */
uv_timer_init(handle->loop, &task->timeout);
uv_timer_init(worker->loop, &task->timeout);
task->timeout.data = task;
kr_resolve_begin(&task->req, &engine->resolver, answer);
worker->stats.concurrent += 1;
return task;
}
......@@ -128,7 +129,7 @@ static void qr_task_free(uv_handle_t *handle)
struct qr_task *task = handle->data;
/* Return handle to the event loop in case
* it was exclusively taken by this task. */
if (!uv_has_ref(task->source.handle)) {
if (task->source.handle && !uv_has_ref(task->source.handle)) {
uv_ref(task->source.handle);
io_start_read(task->source.handle);
}
......@@ -149,6 +150,8 @@ static void qr_task_free(uv_handle_t *handle)
mp_delete_count = 0;
}
#endif
/* Update stats */
worker->stats.concurrent -= 1;
}
static void qr_task_timeout(uv_timer_t *req)
......@@ -178,9 +181,14 @@ static int qr_task_on_send(struct qr_task *task, int status)
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
int ret = 0;
if (!handle) {
return qr_task_on_send(task, kr_error(EIO));
}
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
if (handle != task->source.handle)
task->worker->stats.udp += 1;
} else {
uint16_t pkt_size = htons(pkt->size);
uv_buf_t buf[2] = {
......@@ -188,6 +196,8 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
{ (char *)pkt->wire, pkt->size }
};
ret = uv_try_write((uv_stream_t *)handle, buf, 2);
if (handle != task->source.handle)
task->worker->stats.tcp += 1;
}
return qr_task_on_send(task, (ret >= 0) ? 0 : -1);
}
......@@ -204,6 +214,7 @@ static void qr_task_on_connect(uv_connect_t *connect, int status)
static int qr_task_finalize(struct qr_task *task, int state)
{
kr_resolve_finish(&task->req, state);
/* Send back answer */
(void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
......@@ -238,8 +249,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
}
/* Create connection for iterative query */
uv_handle_t *source_handle = task->source.handle;
task->next_handle = io_create(source_handle->loop, sock_type);
task->next_handle = io_create(task->worker->loop, sock_type);
if (task->next_handle == NULL) {
return qr_task_finalize(task, KNOT_STATE_FAIL);
}
......@@ -256,6 +266,7 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
}
connect->data = task;
} else {
printf("sending: %s %u\n", knot_dname_to_str_alloc(knot_pkt_qname(next_query)), knot_pkt_qtype(next_query));
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
return qr_task_step(task, NULL);
}
......@@ -293,6 +304,20 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
return qr_task_step(task, query);
}
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query)
{
if (!worker) {
return kr_error(EINVAL);
}
/* Create task */
struct qr_task *task = qr_task_create(worker, NULL, query, NULL);
if (!task) {
return kr_error(ENOMEM);
}
return qr_task_step(task, query);
}
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
{
array_init(worker->pools);
......
......@@ -33,21 +33,26 @@ struct worker_ctx {
uv_loop_t *loop;
mm_ctx_t *mm;
uint8_t wire_buf[KNOT_WIRE_MAX_PKTSIZE];
struct {
size_t concurrent;
size_t udp;
size_t tcp;
} stats;
mp_freelist_t pools;
};
/**
* Resolve query.
*
* @param worker
* @param handle
* @param answer
* @param query
* @param addr
* @return 0, error code
* Process incoming packet (query or answer to subrequest).
* @return 0 or an error code
*/
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
/**
* Schedule query for resolution.
* @return 0 or an error code
*/
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query);
/** Reserve worker buffers */
int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment