Skip to content
Snippets Groups Projects
Commit fcf6ae51 authored by Vaclav Sraier's avatar Vaclav Sraier Committed by Aleš Mrázek
Browse files

kresctl: basic implementation using the new HTTP API

parent 18d0d688
No related branches found
No related tags found
1 merge request!1331manager: fully featured HTTP API
def main():
print("Knot Resolver CLI successfully running...")
print("... unfortunatelly, it does nothing at the moment")
from typing import TYPE_CHECKING, cast
from typing_extensions import Literal
from knot_resolver_manager.utils.requests import request
if TYPE_CHECKING:
from knot_resolver_manager.cli.__main__ import Args, ConfigArgs
def config(args: "Args") -> None:
cfg: "ConfigArgs" = cast("ConfigArgs", args.command)
if not cfg.path.startswith("/"):
cfg.path = "/" + cfg.path
method: Literal["GET", "POST"] = "GET" if cfg.replacement_value is None else "POST"
url = f"{args.socket}/v1/config{cfg.path}"
response = request(method, url, cfg.replacement_value)
print(response)
def stop(args: "Args") -> None:
url = f"{args.socket}/stop"
response = request("POST", url)
print(response)
from knot_resolver_manager.cli import main
import argparse
from abc import ABC
from typing import Optional
from knot_resolver_manager.cli import config, stop
from knot_resolver_manager.compat.dataclasses import dataclass
class Cmd(ABC):
def __init__(self, ns: argparse.Namespace) -> None:
pass
def run(self, args: "Args") -> None:
raise NotImplementedError()
class ConfigArgs(Cmd):
def __init__(self, ns: argparse.Namespace) -> None:
super().__init__(ns)
self.path: str = str(ns.path)
self.replacement_value: Optional[str] = ns.new_value
self.delete: bool = ns.delete
self.stdin: bool = ns.stdin
def run(self, args: "Args") -> None:
config(args)
class StopArgs(Cmd):
def run(self, args: "Args") -> None:
stop(args)
@dataclass
class Args:
socket: str
command: Cmd # union in the future
def parse_args() -> Args:
# pylint: disable=redefined-outer-name
parser = argparse.ArgumentParser("kresctl", description="CLI for controlling Knot Resolver")
parser.add_argument(
"-s",
"--socket",
action="store",
type=str,
help="manager API listen address",
default="http+unix://%2Fvar%2Frun%2Fknot-resolver%2Fmanager.sock",
nargs=1,
required=True,
)
subparsers = parser.add_subparsers()
config = subparsers.add_parser(
"config", help="dynamically change configuration of a running resolver", aliases=["c", "conf"]
)
config.add_argument("path", type=str, help="which part of config should we work with")
config.add_argument(
"new_value",
type=str,
nargs="?",
help="optional, what value should we set for the given path (JSON)",
default=None,
)
config.add_argument("-d", "--delete", action="store_true", help="delete part of the config tree", default=False)
config.add_argument("--stdin", help="read new config value on stdin", action="store_true", default=False)
config.set_defaults(command_type=ConfigArgs)
stop = subparsers.add_parser("stop", help="shutdown everything")
stop.set_defaults(command_type=StopArgs)
ns = parser.parse_args()
return Args(socket=ns.socket[0], command=ns.command_type(ns)) # type: ignore[call-arg]
if __name__ == "__main__":
main()
_args = parse_args()
_args.command.run(_args)
import ipaddress
import json
import multiprocessing
import subprocess
import time
import urllib.parse
from pathlib import Path
from typing import Dict, List, Union
import requests
from knot_resolver_manager import compat
from knot_resolver_manager.server import start_server
from knot_resolver_manager.utils.modeling import ParsedTree
class KnotManagerClient:
def __init__(self, url: str):
self._url = url
def _create_url(self, path: str) -> str:
return urllib.parse.urljoin(self._url, path)
def stop(self) -> None:
response = requests.post(self._create_url("/stop"))
print(response.text)
def set_num_workers(self, n: int) -> None:
response = requests.post(self._create_url("/config/server/workers"), data=str(n))
print(response.text)
def set_groupid(self, gid: str) -> None:
response = requests.post(self._create_url("/config/server/groupid"), data=f'"{gid}"')
print(response.text)
def set_static_hints(self, hints: Dict[str, List[Union[ipaddress.IPv4Address, ipaddress.IPv6Address]]]) -> None:
payload = {name: [str(a) for a in addrs] for name, addrs in hints.items()}
response = requests.post(self._create_url("/config/static-hints/hints"), json=payload)
print(response.text)
def set_listen_ip_address(self, ip: Union[ipaddress.IPv4Address, ipaddress.IPv6Address], port: int) -> None:
payload = [{"listen": {"ip": str(ip), "port": port}}]
response = requests.post(self._create_url("/config/network/interfaces"), json=payload)
print(response)
def wait_for_initialization(self, timeout_sec: float = 5, time_step: float = 0.4) -> None:
started = time.time()
while True:
try:
response = requests.get(self._create_url("/"))
data = json.loads(response.text)
if data["status"] == "RUNNING":
return
except BaseException:
pass
if time.time() - started > timeout_sec:
raise TimeoutError("The manager did not start in time")
time.sleep(time_step)
def count_running_kresds() -> int:
"""
Inteded use-case is testing... Nothing more
Looks at running processes in the system and returns the number of kresd instances observed.
"""
cmd = subprocess.run(
"ps aux | grep kresd | grep -v grep", shell=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=False
)
return len(str(cmd.stdout, "utf8").strip().split("\n"))
class _DefaultSentinel:
pass
_DEFAULT_SENTINEL = _DefaultSentinel()
def start_manager_in_background(
initial_config: Union[Path, ParsedTree, _DefaultSentinel] = _DEFAULT_SENTINEL
) -> multiprocessing.Process:
if isinstance(initial_config, _DefaultSentinel):
p = multiprocessing.Process(target=compat.asyncio.run, args=(start_server(),))
else:
p = multiprocessing.Process(target=compat.asyncio.run, args=(start_server(config=initial_config),))
p.start()
return p
import ipaddress
import sys
import click
from click.exceptions import ClickException
from knot_resolver_manager.client import KnotManagerClient
from knot_resolver_manager.datamodel.config_schema import KresConfig
from knot_resolver_manager.exceptions import KresManagerException
from knot_resolver_manager.utils.modeling import parse_yaml
BASE_URL = "base_url"
@click.group()
@click.option(
"-u",
"--url",
"base_url",
nargs=1,
default="http://localhost:5000/",
help="Set base URL on which the manager communicates",
)
@click.pass_context
def main(ctx: click.Context, base_url: str) -> None:
ctx.ensure_object(dict)
ctx.obj[BASE_URL] = base_url
@main.command(help="Shutdown the manager and all workers")
@click.pass_context
def stop(ctx: click.Context) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.stop()
@main.command("gen-lua", help="Generate LUA config from a given declarative config")
@click.argument("config_path", type=str, nargs=1)
def gen_lua(config_path: str) -> None:
try:
with open(config_path, "r", encoding="utf8") as f:
data = f.read()
parsed = parse_yaml(data)
config = KresConfig(parsed)
lua = config.render_lua()
click.echo_via_pager(lua)
except KresManagerException as e:
ne = ClickException(str(e))
ne.exit_code = 1
raise ne
@main.command(help="Set number of workers")
@click.argument("instances", type=int, nargs=1)
@click.pass_context
def workers(ctx: click.Context, instances: int) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_num_workers(instances)
@main.command(help="Set the manager groupid")
@click.argument("gid", type=str, nargs=1)
@click.pass_context
def groupid(ctx: click.Context, gid: str) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_groupid(gid)
@main.command("one-static-hint", help="Set one inline static-hint hints (replaces old static hints)")
@click.argument("name", type=str, nargs=1)
@click.argument("ip", type=str, nargs=1)
@click.pass_context
def one_static_hint(ctx: click.Context, name: str, ip: str) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_static_hints({name: [ipaddress.ip_address(ip)]})
@main.command("listen-ip", help="Configure where the resolver should listen (replaces all previous locations)")
@click.argument("ip", type=str, nargs=1)
@click.argument("port", type=int, nargs=1)
@click.pass_context
def listen_ip(ctx: click.Context, ip: str, port: int) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
client.set_listen_ip_address(ipaddress.ip_address(ip), port)
@main.command(help="Wait for manager initialization")
@click.pass_context
def wait(ctx: click.Context) -> None:
client = KnotManagerClient(ctx.obj[BASE_URL])
try:
client.wait_for_initialization()
except TimeoutError as e:
click.echo(f"ERR: {e}")
sys.exit(1)
if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter
......@@ -26,7 +26,9 @@ class ConfigStore:
err_res = filter(lambda r: r.is_err(), results)
errs = list(map(lambda r: r.unwrap_err(), err_res))
if len(errs) > 0:
raise KresManagerException("Validation of the new config failed. The reasons are:", *errs)
raise KresManagerException(
"Validation of the new config failed. The reasons are:\n - " + "\n - ".join(errs)
)
async with self._update_lock:
# update the stored config with the new version
......
......@@ -50,10 +50,11 @@ async def error_handler(request: web.Request, handler: Any) -> web.Response:
try:
return await handler(request)
except DataValidationError as e:
return web.Response(text=f"validation of configuration failed: {e}", status=HTTPStatus.BAD_REQUEST)
return web.Response(text=f"validation of configuration failed:\n{e}", status=HTTPStatus.BAD_REQUEST)
except DataParsingError as e:
return web.Response(text=f"request processing error:\n{e}", status=HTTPStatus.BAD_REQUEST)
except KresManagerException as e:
logger.error("Request processing failed", exc_info=True)
return web.Response(text=f"Request processing failed: {e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
return web.Response(text=f"request processing failed:\n{e}", status=HTTPStatus.INTERNAL_SERVER_ERROR)
class Server:
......
......@@ -553,6 +553,10 @@ class BaseSchema(Serializable):
if self._LAYER is not None:
source = self._LAYER(source, object_path=object_path) # pylint: disable=not-callable
# prevent failure when user provides a different type than object
if isinstance(source, ParsedTree) and not source.is_dict():
raise DataValidationError(f"expected object, found '{source.type()}'", object_path)
# assign fields
used_keys = self._assign_fields(source, object_path)
......
......@@ -2,7 +2,7 @@ import base64
import json
from enum import Enum, auto
from hashlib import blake2b
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Dict, List, Optional, Set, Tuple, Type, Union
import yaml
from typing_extensions import Literal
......@@ -44,6 +44,12 @@ class ParsedTree:
assert isinstance(self._data, dict)
return self._data[ParsedTree._convert_internal_field_name_to_external(key)]
def is_dict(self) -> bool:
return isinstance(self._data, dict)
def type(self) -> Type[Any]:
return type(self._data)
def __contains__(self, key: str) -> bool:
assert isinstance(self._data, dict)
return ParsedTree._convert_internal_field_name_to_external(key) in self._data
......@@ -142,7 +148,9 @@ class _Format(Enum):
"text/vnd.yaml": _Format.YAML,
}
if mime_type not in formats:
raise DataParsingError("Unsupported MIME type")
raise DataParsingError(
f"unsupported MIME type '{mime_type}', expected 'application/json' or 'text/vnd.yaml'"
)
return formats[mime_type]
......
import socket
from http.client import HTTPConnection
from typing import Any, Optional, Union
from urllib.error import HTTPError
from urllib.request import AbstractHTTPHandler, Request, build_opener, install_opener, urlopen
from typing_extensions import Literal
class Response:
def __init__(self, status: int, body: str) -> None:
self.status = status
self.body = body
def __repr__(self) -> str:
return f"status: {self.status}\nbody:\n{self.body}"
def request(
method: Literal["GET", "POST", "HEAD", "PUT", "DELETE"],
url: str,
body: Optional[str] = None,
content_type: str = "application/json",
) -> Response:
req = Request(
url,
method=method,
data=body.encode("utf8") if body is not None else None,
headers={"Content-Type": content_type},
)
# req.add_header("Authorization", _authorization_header)
try:
with urlopen(req) as response:
return Response(response.status, response.read().decode("utf8"))
except HTTPError as err:
return Response(err.code, response.read().decode("utf8"))
# Code heavily inspired by requests-unixsocket
# https://github.com/msabramo/requests-unixsocket/blob/master/requests_unixsocket/adapters.py
class UnixHTTPConnection(HTTPConnection):
def __init__(self, unix_socket_url: str, timeout: Union[int, float] = 60):
"""Create an HTTP connection to a unix domain socket
:param unix_socket_url: A URL with a scheme of 'http+unix' and the
netloc is a percent-encoded path to a unix domain socket. E.g.:
'http+unix://%2Ftmp%2Fprofilesvc.sock/status/pid'
"""
super().__init__("localhost", timeout=timeout)
self.unix_socket_path = unix_socket_url
self.timeout = timeout
self.sock: Optional[socket.socket] = None
def __del__(self): # base class does not have d'tor
if self.sock:
self.sock.close()
def connect(self):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(1) # there is something weird stored in self.timeout
sock.connect(self.unix_socket_path)
self.sock = sock
class UnixHTTPHandler(AbstractHTTPHandler):
def __init__(self) -> None:
super().__init__()
def open_(self: UnixHTTPHandler, req: Any) -> Any:
return self.do_open(UnixHTTPConnection, req)
setattr(UnixHTTPHandler, "http+unix_open", open_)
setattr(UnixHTTPHandler, "http+unix_request", AbstractHTTPHandler.do_request_)
opener = build_opener(UnixHTTPHandler())
install_opener(opener)
......@@ -17,9 +17,7 @@ generate-setup-file = true
python = "^3.6.8"
aiohttp = "^3.6.12"
Jinja2 = "^2.11.3"
click = "^7.1.2"
PyYAML = "^5.4.1"
requests = "^2.25.1"
typing-extensions = ">=3.7.2"
prometheus-client = "^0.6"
supervisor = "^4.2.2"
......@@ -31,19 +29,14 @@ black = "^20.8b1"
tox = "^3.21.4"
tox-pyenv = "^1.1.0"
poethepoet = "^0.13.0"
requests = "^2.25.1"
requests-unixsocket = "^0.2.0"
click = "^7.1.2"
toml = "^0.10.2"
debugpy = "^1.2.1"
Sphinx = "^4.0.2"
pylint = "^2.11.1"
pytest-asyncio = "^0.16.0"
pytest = "^6.2.5"
types-requests = "^2.26.3"
types-PyYAML = "^6.0.1"
mypy = "^0.930"
types-click = "^7.1.8"
types-Jinja2 = "^2.11.9"
types-dataclasses = "^0.6.4"
poetry = "^1.1.12"
......
......@@ -4,7 +4,6 @@ from setuptools import setup
packages = \
['knot_resolver_manager',
'knot_resolver_manager.cli',
'knot_resolver_manager.client',
'knot_resolver_manager.compat',
'knot_resolver_manager.datamodel',
'knot_resolver_manager.datamodel.types',
......@@ -22,9 +21,7 @@ install_requires = \
['Jinja2>=2.11.3',
'PyYAML>=5.4.1',
'aiohttp>=3.6.12',
'click>=7.1.2',
'prometheus-client>=0.6',
'requests>=2.25.1',
'supervisor>=4.2.2',
'typing-extensions>=3.7.2']
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment