Unverified Commit dd209cd5 authored by Pavel Spirek's avatar Pavel Spirek
Browse files

Reworked zone data handlers, improved logging, fixed bugs and stability issues

parent 53550b6b
......@@ -6,260 +6,17 @@ import logging
import sys
import signal
from colorlog import info, warning as warn, error
from importlib import import_module
from yangson.instance import InstanceRoute, NonexistentInstance, ObjectValue, EntryKeys
from yangson.enumerations import ContentType
from . import usr_op_handlers, usr_state_data_handlers, knot_api
from . import usr_op_handlers, usr_state_data_handlers
from .rest_server import RestServer
from .config import CONFIG, load_config, print_config
from .nacm import NacmConfig
from .data import JsonDatastore, BaseDataListener, SchemaNode, PathFormat, ChangeType, DataChange, ConfHandlerResult
from .helpers import DataHelpers, ErrorHelpers
from .data import JsonDatastore
from .helpers import DataHelpers
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES, CONF_DATA_HANDLES
from .knot_api import knot_api_init, KnotConfig, SOARecord, KnotError, ARecord
epretty = ErrorHelpers.epretty
def knot_connect():
info("Connecting to KNOT socket")
knot_api.KNOT.knot_connect()
def knot_disconnect():
info("Disonnecting from KNOT socket")
knot_api.KNOT.knot_disconnect()
class KnotConfServerListener(BaseDataListener):
def process(self, sn: SchemaNode, ii: InstanceRoute, ch: DataChange):
print("Change at sn \"{}\", dn \"{}\"".format(sn.name, ii))
base_ii_str = self.schema_path
base_ii = self._ds.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self._ds.get_node(self._ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
knot_api.KNOT.set_item(section="server", item="comment", data=base_nv.get("description"))
knot_api.KNOT.set_item(section="server", item="async-start", data=base_nv.get("knot-dns:async-start"))
knot_api.KNOT.set_item(section="server", item="nsid", data=base_nv.get("nsid-identity", {}).get("nsid"))
listen_endpoints = base_nv.get("listen-endpoint") or []
ep_str_list = []
for ep in listen_endpoints:
ep_str = ep["ip-address"]
if ep.get("port"):
ep_str += "@" + str(ep["port"])
ep_str_list.append(ep_str)
knot_api.KNOT.set_item_list(section="server", item="listen", data=ep_str_list)
knot_api.KNOT.set_item(section="server", item="rundir", data=base_nv.get("filesystem-paths", {}).get("run-time-dir"))
knot_api.KNOT.set_item(section="server", item="pidfile", data=base_nv.get("filesystem-paths", {}).get("pid-file"))
knot_api.KNOT.set_item(section="server", item="tcp-workers", data=base_nv.get("resources", {}).get("knot-dns:tcp-workers"))
knot_api.KNOT.set_item(section="server", item="udp-workers", data=base_nv.get("resources", {}).get("knot-dns:udp-workers"))
knot_api.KNOT.set_item(section="server", item="rate-limit-table-size", data=base_nv.get("response-rate-limiting", {}).get("table-size"))
knot_api.KNOT.commit()
return ConfHandlerResult.OK
class KnotConfLogListener(BaseDataListener):
def process(self, sn: SchemaNode, ii: InstanceRoute, ch: DataChange):
print("lChange at sn \"{}\", dn \"{}\"".format(sn.name, ii))
base_ii_str = self.schema_path
base_ii = self._ds.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self._ds.get_node(self._ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
knot_api.KNOT.set_item(section="log", data=None)
for logitem in base_nv:
tgt = logitem.get("target")
if tgt is None:
continue
knot_api.KNOT.set_item(section="log", item="target", data=tgt)
knot_api.KNOT.set_item(section="log", identifier=tgt, item="comment", data=logitem.get("description"))
knot_api.KNOT.set_item(section="log", identifier=tgt, item="server", data=logitem.get("server"))
knot_api.KNOT.set_item(section="log", identifier=tgt, item="zone", data=logitem.get("zone"))
knot_api.KNOT.set_item(section="log", identifier=tgt, item="any", data=logitem.get("any"))
knot_api.KNOT.commit()
return ConfHandlerResult.OK
class KnotConfZoneListener(BaseDataListener):
def process(self, sn: SchemaNode, ii: InstanceRoute, ch: DataChange):
print("zChange at sn \"{}\", dn \"{}\"".format(sn.name, ii))
base_ii_str = self.schema_path
base_ii = self._ds.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self._ds.get_node(self._ds.get_data_root(), base_ii).value
zone_nv = self._ds.get_node(self._ds.get_data_root(), ii[0:(len(base_ii) + 1)]).value
print("zone nv={}".format(zone_nv))
nv = self._ds.get_node(self._ds.get_data_root(), ii).value
# zone_name = tuple(ii[len(base_ii)].keys.values())[0]
knot_api.KNOT.begin()
domain = zone_nv.get("domain")
knot_api.KNOT.set_item(section="zone", zone=domain, data=None)
knot_api.KNOT.set_item(section="zone", item="domain", data=domain)
print("zn={}".format(domain))
knot_api.KNOT.set_item(section="zone", zone=domain, item="comment", data=zone_nv.get("description"))
knot_api.KNOT.set_item(section="zone", zone=domain, item="file", data=zone_nv.get("file"))
knot_api.KNOT.set_item_list(section="zone", zone=domain, item="master", data=zone_nv.get("master"))
knot_api.KNOT.set_item_list(section="zone", zone=domain, item="notify", data=zone_nv.get("notify", {}).get("recipient"))
knot_api.KNOT.set_item_list(section="zone", zone=domain, item="acl", data=zone_nv.get("access-control-list"))
knot_api.KNOT.set_item(section="zone", zone=domain, item="serial-policy", data=zone_nv.get("serial-update-method"))
anytotcp = zone_nv.get("any-to-tcp")
disable_any_str = str(not anytotcp) if isinstance(anytotcp, bool) else None
knot_api.KNOT.set_item(section="zone", zone=domain, item="disable-any", data=disable_any_str)
knot_api.KNOT.set_item(section="zone", zone=domain, item="max-journal-size", data=zone_nv.get("journal", {}).get("maximum-journal-size"))
knot_api.KNOT.set_item(section="zone", zone=domain, item="zonefile-sync", data=zone_nv.get("journal", {}).get("zone-file-sync-delay"))
knot_api.KNOT.set_item(section="zone", zone=domain, item="ixfr-from-differences", data=zone_nv.get("journal", {}).get("from-differences"))
qms = zone_nv.get("query-module")
if qms is not None:
qm_str_list = list(map(lambda n: n["name"] + "/" + n["type"], qms))
else:
qm_str_list = None
knot_api.KNOT.set_item_list(section="zone", zone=domain, item="module", data=qm_str_list)
# dnssec-signing:dnssec-signing ?
knot_api.KNOT.set_item(section="zone", zone=domain, item="semantic-checks", data=zone_nv.get("knot-dns:semantic-checks"))
knot_api.KNOT.commit()
return ConfHandlerResult.OK
class KnotConfControlListener(BaseDataListener):
def process(self, sn: SchemaNode, ii: InstanceRoute, ch: DataChange):
print("cChange at sn \"{}\", dn \"{}\"".format(sn.name, ii))
base_ii_str = self.schema_path
base_ii = self._ds.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self._ds.get_node(self._ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
knot_api.KNOT.set_item(section="control", item="listen", data=base_nv.get("unix"))
knot_api.KNOT.commit()
return ConfHandlerResult.OK
class KnotConfAclListener(BaseDataListener):
def _process_list_item(self, acl_nv: ObjectValue):
name = acl_nv.get("name")
print("name={}".format(name))
knot_api.KNOT.set_item(section="acl", identifier=name, data=None)
knot_api.KNOT.set_item(section="acl", item="id", data=name)
knot_api.KNOT.set_item(section="acl", identifier=name, item="comment", data=acl_nv.get("description"))
knot_api.KNOT.set_item_list(section="acl", identifier=name, item="key", data=acl_nv.get("key"))
knot_api.KNOT.set_item_list(section="acl", identifier=name, item="action", data=acl_nv.get("operation"))
netws = acl_nv.get("network")
if netws is not None:
addrs = list(map(lambda n: n["ip-prefix"], netws))
knot_api.KNOT.set_item_list(section="acl", identifier=name, item="address", data=addrs)
action = acl_nv.get("action")
deny = "true" if action == "deny" else "false"
knot_api.KNOT.set_item(section="acl", identifier=name, item="deny", data=deny)
def process(self, sn: SchemaNode, ii: InstanceRoute, ch: DataChange):
base_ii_str = self.schema_path
print("aChange at sn \"{}\", dn \"{}\"".format(sn.name, ii))
base_ii = self._ds.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self._ds.get_node(self._ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
knot_api.KNOT.set_item(section="acl", data=None)
if (len(ii) > len(base_ii)) and isinstance(ii[len(base_ii)], EntryKeys):
# Write only changed list item
acl_nv = self._ds.get_node(self._ds.get_data_root(), ii[0:(len(base_ii) + 1)]).value
print("acl nv={}".format(acl_nv))
self._process_list_item(acl_nv)
else:
# Delete all list items from KNOT
knot_api.KNOT.set_item(section="acl", data=None)
# Write whole list
for acl_nv in base_nv:
print("acl nv={}".format(acl_nv))
self._process_list_item(acl_nv)
knot_api.KNOT.commit()
return ConfHandlerResult.OK
class KnotZoneDataListener(BaseDataListener):
def process(self, sn: SchemaNode, ii: InstanceRoute, ch: DataChange):
base_ii_str = self.schema_path
ii_str = "".join([str(seg) for seg in ii])
print("zdChange at sn \"{}\", dn \"{}\"".format(sn.name, ii_str))
base_ii = self._ds.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self._ds.get_node(self._ds.get_data_root(), base_ii).value
if (ii == base_ii) and (ch.change_type == ChangeType.CREATE):
name = ch.data["zone"]["name"]
print("--- Creating new zone \"{}\"".format(name))
knot_api.KNOT.begin()
knot_api.KNOT.zone_new(name)
knot_api.KNOT.commit()
elif (len(ii) == (len(base_ii) + 2)) and isinstance(ii[len(base_ii) + 1], EntryKeys) and (ch.change_type == ChangeType.DELETE):
name = ii[len(base_ii) + 1].keys["name"]
print("--- Deleting zone \"{}\"".format(name))
knot_api.KNOT.begin()
# knot_api.KNOT.zone_new(name)
knot_api.KNOT.commit()
elif (len(ii) > len(base_ii)) and isinstance(ii[len(base_ii) + 1], EntryKeys):
zone_name = ii[len(base_ii) + 1].keys["name"]
print("--- Zone \"{}\" resource {}".format(zone_name, ch.change_type.name.lower()))
if ch.change_type == ChangeType.CREATE:
soa = ch.data.get("SOA")
if soa is not None:
print("writing soa {}".format(soa))
knot_api.KNOT.begin_zone()
soarr = SOARecord()
soarr.mname = soa["mname"]
soarr.rname = soa["rname"]
soarr.serial = soa["serial"]
soarr.refresh = soa["refresh"]
soarr.retry = soa["retry"]
soarr.expire = soa["expire"]
soarr.minimum = soa["minimum"]
resp = knot_api.KNOT.zone_add_record(zone_name, soarr)
print("resp_soa = {}".format(resp))
knot_api.KNOT.commit_zone()
rr = ch.data.get("rrset", {})
rtype = rr.get("type")
if rtype == "iana-dns-parameters:A":
knot_api.KNOT.begin_zone()
arr = ARecord(zone_name)
arr.owner = rr["owner"]
arr.address = rr["rdata"][0]["A"]["address"]
resp = knot_api.KNOT.zone_add_record(zone_name, arr)
print("resp_a = {}".format(resp))
knot_api.KNOT.commit_zone()
return ConfHandlerResult.OK
from .knot_api import knot_api_init, knot_connect, knot_disconnect
from .usr_conf_data_handlers import *
def main():
......@@ -271,10 +28,16 @@ def main():
if CONFIG["GLOBAL"]["LOGFILE"] not in ("-", "stdout"):
info("Going to daemon mode.")
log_level = {
"error": logging.ERROR,
"warning": logging.WARNING,
"info": logging.INFO,
"debug": logging.INFO
}.get(CONFIG["GLOBAL"]["LOG_LEVEL"], logging.INFO)
logging.root.handlers.clear()
colorlog.basicConfig(
format="%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s",
level=logging.INFO,
logging.basicConfig(
format="%(asctime)s %(levelname)-8s %(message)s",
level=log_level,
filename=CONFIG["GLOBAL"]["LOGFILE"]
)
......@@ -318,8 +81,8 @@ def main():
datamodel = DataHelpers.load_data_model("data/", "data/yang-library-data.json")
# Datastore init
datastore = JsonDatastore(datamodel, "DNS data")
datastore.load("jetconf/example-data.json")
datastore = JsonDatastore(datamodel, "jetconf/example-data.json", "DNS data")
datastore.load()
datastore.load_yl_data("data/yang-library-data.json")
nacmc = NacmConfig(datastore)
datastore.register_nacm(nacmc)
......@@ -330,7 +93,7 @@ def main():
# Register schema listeners
CONF_DATA_HANDLES.register_handler(KnotConfServerListener(datastore, "/dns-server:dns-server/server-options"))
CONF_DATA_HANDLES.register_handler(KnotConfLogListener(datastore, "/dns-server:dns-server/knot-dns:log"))
CONF_DATA_HANDLES.register_handler(KnotConfZoneListener(datastore, "/dns-server:dns-server/zones/zone"))
CONF_DATA_HANDLES.register_handler(KnotConfZoneListener(datastore, "/dns-server:dns-server/zones"))
CONF_DATA_HANDLES.register_handler(KnotConfControlListener(datastore, "/dns-server:dns-server/knot-dns:control-socket"))
CONF_DATA_HANDLES.register_handler(KnotConfAclListener(datastore, "/dns-server:dns-server/access-control-list"))
CONF_DATA_HANDLES.register_handler(KnotZoneDataListener(datastore, "/dns-zones:zone-data"))
......@@ -358,12 +121,36 @@ def main():
if __name__ == "__main__":
opts, args = (None, None)
colorlog.basicConfig(
format="%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s",
level=logging.INFO,
stream=sys.stdout
log_level = {
"error": logging.ERROR,
"warning": logging.WARNING,
"info": logging.INFO,
"debug": logging.INFO
}.get(CONFIG["GLOBAL"]["LOG_LEVEL"], logging.INFO)
log_formatter = colorlog.ColoredFormatter(
"%(asctime)s %(log_color)s%(levelname)-8s%(reset)s %(message)s",
datefmt=None,
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red',
},
secondary_log_colors={},
style='%'
)
log_handler = colorlog.StreamHandler()
log_handler.setFormatter(log_formatter)
log_handler.stream = sys.stdout
logger = colorlog.getLogger()
logger.addHandler(log_handler)
logger.setLevel(log_level)
test_module = None
try:
......
import os
import yaml
from colorlog import warning as warn, info
from yaml.parser import ParserError
from colorlog import error, warning as warn, info
CONFIG_GLOBAL = {
"TIMEZONE": "GMT",
"LOGFILE": "-",
"PIDFILE": "/tmp/jetconf.pid"
"PIDFILE": "/tmp/jetconf.pid",
"PERSISTENT_CHANGES": True,
"LOG_LEVEL": "info",
"LOG_DBG_MODULES": ["*"]
}
CONFIG_HTTP = {
......@@ -59,6 +64,9 @@ def load_config(filename: str):
except FileNotFoundError:
warn("Configuration file does not exist")
except ParserError as e:
error("Configuration syntax error: " + str(e))
exit()
# Shortcuts
NACM_ADMINS = CONFIG["NACM"]["ALLOWED_USERS"]
......
......@@ -2,6 +2,9 @@ GLOBAL:
TIMEZONE: "Europe/Prague"
LOGFILE: "-"
PIDFILE: "/tmp/jetconf.pid"
PERSISTENT_CHANGES: true
LOG_LEVEL: "debug"
LOG_DBG_MODULES: ["usr_conf_data_handlers", "knot_api"]
HTTP_SERVER:
DOC_ROOT: "jetconf/doc-root"
......
import json
from threading import Lock
from enum import Enum
from colorlog import error, warning as warn, info, debug
from colorlog import error, warning as warn, info
from typing import List, Any, Dict, Callable
from yangson.schema import SchemaNode, NonexistentSchemaNode, ListNode, LeafListNode
from yangson.schema import SchemaNode, NonexistentSchemaNode, ListNode, LeafListNode, SchemaError, SemanticError
from yangson.datamodel import DataModel
from yangson.enumerations import ContentType
from yangson.instance import (
InstanceNode,
NonexistentInstance,
InstanceTypeError,
InstanceValueError,
ArrayValue,
ObjectValue,
MemberName,
......@@ -20,7 +21,11 @@ from yangson.instance import (
InstanceRoute
)
from .helpers import PathFormat
from .helpers import PathFormat, ErrorHelpers, LogHelpers
from .config import CONFIG
epretty = ErrorHelpers.epretty
debug_data = LogHelpers.create_module_dbg_logger(__name__)
class ChangeType(Enum):
......@@ -157,25 +162,34 @@ class UsrChangeJournal:
return chl_json
def commit(self, ds: "BaseDatastore"):
# ds.lock_data()
# Set new data root
if hash(ds.get_data_root()) == hash(self.root_origin):
info("Commiting new configuration (swapping roots)")
# Set new root
nr = self.get_root_head()
else:
info("Commiting new configuration (re-applying changes)")
nr = ds.get_data_root()
for cl in self.clists:
for change in cl.journal:
if change.change_type == ChangeType.CREATE:
nr = ds.create_node_rpc(nr, change.rpc_info, change.data)
elif change.change_type == ChangeType.REPLACE:
nr = ds.update_node_rpc(nr, change.rpc_info, change.data)
elif change.change_type == ChangeType.DELETE:
nr = ds.delete_node_rpc(nr, change.rpc_info)
try:
nr.validate(ContentType.config)
new_data_valid = True
except (SchemaError, SemanticError) as e:
error("Data validation error:")
error(epretty(e))
new_data_valid = False
if new_data_valid:
# Set new data root
if hash(ds.get_data_root()) == hash(self.root_origin):
info("Commiting new configuration (swapping roots)")
# Set new root
ds.set_data_root(self.get_root_head())
else:
info("Commiting new configuration (re-applying changes)")
nr = ds.get_data_root()
for cl in self.clists:
for change in cl.journal:
if change.change_type == ChangeType.CREATE:
nr = ds.create_node_rpc(nr, change.rpc_info, change.data)
elif change.change_type == ChangeType.REPLACE:
nr = ds.update_node_rpc(nr, change.rpc_info, change.data)
elif change.change_type == ChangeType.DELETE:
nr = ds.delete_node_rpc(nr, change.rpc_info)
ds.set_data_root(nr)
ds.set_data_root(nr)
# Notify schema node observers
for cl in self.clists:
......@@ -189,9 +203,6 @@ class UsrChangeJournal:
# Clear user changelists
self.clists.clear()
finally:
# ds.unlock_data()
pass
class BaseDatastore:
......@@ -300,7 +311,7 @@ class BaseDatastore:
sdh = STATE_DATA_HANDLES.get_handler(state_node_pth)
if sdh is not None:
root_val = sdh.update_node(ii, root, True)
root = self._data.update_from_raw(root_val)
root = self._data.update(root_val, raw=True)
else:
raise NoHandlerForStateDataError()
self.commit_end_callback()
......@@ -402,7 +413,7 @@ class BaseDatastore:
n = root.goto(ii)
sn = n.schema_node
sch_member_name = sn.iname2qname(input_member_name)
sch_member_name = sn._iname2qname(input_member_name)
member_sn = sn.get_data_child(*sch_member_name)
if isinstance(member_sn, ListNode):
......@@ -476,7 +487,7 @@ class BaseDatastore:
if nrpc.check_data_node_path(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
raise NacmForbiddenError()
new_n = n.update_from_raw(value)
new_n = n.update(value, raw=True)
return new_n.top()
......@@ -502,7 +513,7 @@ class BaseDatastore:
if isinstance(last_isel, MemberName):
new_n = n_parent.delete_member(last_isel.name)
else:
raise InstanceTypeError(n, "Invalid target node type")
raise InstanceValueError(n, "Invalid target node type")
return new_n.top()
......@@ -550,7 +561,16 @@ class BaseDatastore:
if usr_journal is not None:
if self.commit_begin_callback is not None:
self.commit_begin_callback()
usr_journal.commit(self)
try:
self.lock_data(rpc.username)
old_root = self._data
usr_journal.commit(self)
if CONFIG["GLOBAL"]["PERSISTENT_CHANGES"] is True:
self.save()
finally:
self.unlock_data()
if self.commit_end_callback is not None:
self.commit_end_callback()
del self._usr_journals[rpc.username]
......@@ -591,7 +611,7 @@ class BaseDatastore:
ret = self._data_lock.acquire(blocking=blocking, timeout=1)
if ret:
self._lock_username = username or "(unknown)"
debug("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))
debug_data("Acquired lock in datastore \"{}\" for user \"{}\"".format(self.name, username))
else:
raise DataLockError(
"Failed to acquire lock in datastore \"{}\" for user \"{}\", already locked by \"{}\"".format(
......@@ -604,22 +624,26 @@ class BaseDatastore:
# Unlock datastore data
def unlock_data(self):
self._data_lock.release()
debug("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, self._lock_username))
debug_data("Released lock in datastore \"{}\" for user \"{}\"".format(self.name, self._lock_username))
self._lock_username = None
# Load data from persistent storage
def load(self, filename: str):
def load(self):
raise NotImplementedError("Not implemented in base class")
# Save data to persistent storage
def save(self, filename: str):
def save(self):
raise NotImplementedError("Not implemented in base class")
class JsonDatastore(BaseDatastore):
def load(self, filename: str):
def __init__(self, dm: DataModel, json_file: str, name: str = ""):
super().__init__(dm, name)
self.json_file = json_file
def load(self):
self._data = None
with open(filename, "rt") as fp:
with open(self.json_file, "rt") as fp:
self._data = self._dm.from_raw(json.load(fp))
def load_yl_data(self, filename: str):
......@@ -627,11 +651,9 @@ class JsonDatastore(BaseDatastore):
with open(filename, "rt") as fp:
self._yang_lib_data = self._dm.from_raw(json.load(fp))
def save(self, filename: str):
with open(filename, "w") as jfd:
self.lock_data("json_save")
json.dump(self._data, jfd)
self.unlock_data()
def save(self):
with open(self.json_file, "w") as jfd:
json.dump(self._data.raw_value(), jfd, indent=4)
def test():
......
......@@ -308,7 +308,7 @@
},
{
"name": "permit-zone-access",
"path": "/dns-server:dns-server/zones/zone",
"path": "/dns-server:dns-server/zones",
"access-operations": "*",
"comment": "Users can write other zones.",
"action": "permit"
......