Unverified Commit 261d8540 authored by Pavel Spirek's avatar Pavel Spirek
Browse files

Adapted to yangson 1.1, improvements in NACM module

parent 65d51760
......@@ -15,7 +15,7 @@ HTTP_SERVER:
SERVER_SSL_CERT: "jetconf/server.crt"
SERVER_SSL_PRIVKEY: "jetconf/server.key"
CA_CERT: "jetconf/ca.pem"
DBG_DISABLE_CERTS: true
DBG_DISABLE_CERTS: false
NACM:
ALLOWED_USERS: ["lojza@mail.cz"]
......
......@@ -4,9 +4,17 @@ from enum import Enum
from colorlog import error, warning as warn, info
from typing import List, Any, Dict, Callable, Optional
from yangson.schema import SchemaNode, NonexistentSchemaNode, ListNode, LeafListNode, SchemaError, SemanticError
from yangson.datamodel import DataModel
from yangson.enumerations import ContentType
from yangson.enumerations import ContentType, ValidationScope
from yangson.schema import (
SchemaNode,
NonexistentSchemaNode,
ListNode,
LeafListNode,
SchemaError,
SemanticError,
InternalNode
)
from yangson.instance import (
InstanceNode,
NonexistentInstance,
......@@ -16,13 +24,14 @@ from yangson.instance import (
MemberName,
EntryKeys,
EntryIndex,
InstanceIdParser,
ResourceIdParser,
InstanceRoute
InstanceRoute,
ArrayEntry
)
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers
from .config import CONFIG
from .nacm import NacmConfig, Permission, Action
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES, CONF_DATA_HANDLES
epretty = ErrorHelpers.epretty
debug_data = LogHelpers.create_module_dbg_logger(__name__)
......@@ -184,7 +193,8 @@ class UsrChangeJournal:
nr = ds.delete_node_rpc(nr, change.rpc_info)
try:
nr.validate(ContentType.config)
# Validate syntax and semantics of new data
nr.validate(ValidationScope.all, ContentType.config)
new_data_valid = True
except (SchemaError, SemanticError) as e:
error("Data validation error:")
......@@ -198,7 +208,7 @@ class UsrChangeJournal:
# Run schema node handlers
for cl in self.clists:
for change in cl.journal:
ii = ds.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
ii = DataHelpers.parse_ii(change.rpc_info.path, change.rpc_info.path_format)
try:
ds.run_conf_edit_handler(ii, change)
except Exception as e:
......@@ -260,18 +270,10 @@ class BaseDatastore:
def get_schema_node(self, sch_pth: str) -> SchemaNode:
sn = self._dm.get_schema_node(sch_pth)
if sn is None:
raise NonexistentSchemaNode(sch_pth)
# raise NonexistentSchemaNode(sch_pth)
debug_data("Cannot find schema node for " + sch_pth)
return sn
# Parse Instance Identifier from string
def parse_ii(self, path: str, path_format: PathFormat) -> InstanceRoute:
if path_format == PathFormat.URL:
ii = ResourceIdParser(path).parse()
else:
ii = InstanceIdParser(path).parse()
return ii
# Notify data observers about change in datastore
def run_conf_edit_handler(self, ii: InstanceRoute, ch: DataChange) -> Optional[ConfHandlerResult]:
h_res = None
......@@ -308,7 +310,7 @@ class BaseDatastore:
# Get data node, evaluate NACM if required
def get_node_rpc(self, rpc: RpcInfo, yl_data=False) -> InstanceNode:
ii = self.parse_ii(rpc.path, rpc.path_format)
ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
if yl_data:
root = self._yang_lib_data
else:
......@@ -335,9 +337,7 @@ class BaseDatastore:
try:
with_defs = rpc.qs["with-defaults"][0]
except KeyError:
with_defs = None
except IndexError:
except (IndexError, KeyError):
with_defs = None
if with_defs == "report-all":
......@@ -345,17 +345,15 @@ class BaseDatastore:
if self.nacm:
nrpc = self.nacm.get_user_nacm(rpc.username)
if nrpc.check_data_node_path(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
raise NacmForbiddenError()
else:
# Prun subtree data
n = nrpc.check_data_read_path(n, root, ii)
# Prune nodes that should not be accessible to user
n = nrpc.prune_data_tree(n, root, ii, Permission.NACM_ACCESS_READ)
try:
max_depth = int(rpc.qs["depth"][0])
except KeyError:
max_depth = None
except IndexError:
except (IndexError, KeyError):
max_depth = None
except ValueError:
raise ValueError("Invalid value of query param \"depth\"")
......@@ -367,14 +365,14 @@ class BaseDatastore:
node.value = ObjectValue({})
else:
for child_key in sorted(node.value.keys()):
m = node.member(child_key)
m = node[child_key]
node = _tree_limit_depth(m, depth + 1).up()
elif isinstance(node.value, ArrayValue):
if depth > max_depth:
node.value = ArrayValue([])
else:
for i in range(len(node.value)):
e = node.entry(i)
e = node[i]
node = _tree_limit_depth(e, depth + 1).up()
return node
......@@ -384,29 +382,32 @@ class BaseDatastore:
# Get staging data node, evaluate NACM if required
def get_node_staging_rpc(self, rpc: RpcInfo) -> InstanceNode:
ii = self.parse_ii(rpc.path, rpc.path_format)
ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
root = self.get_data_root_staging(rpc.username)
n = root.goto(ii)
if self.nacm:
nrpc = self.nacm.get_user_nacm(rpc.username)
if nrpc.check_data_node_path(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_READ) == Action.DENY:
raise NacmForbiddenError()
else:
# Prun subtree data
n = nrpc.check_data_read_path(root, ii)
# Prune nodes that should not be accessible to user
n = nrpc.prune_data_tree(n, root, ii, Permission.NACM_ACCESS_READ)
return n
# Create new data node (Restconf draft compliant version)
def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any, insert=None, point=None) -> InstanceNode:
ii = self.parse_ii(rpc.path, rpc.path_format)
def create_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
n = root.goto(ii)
insert = rpc.qs.get("insert", [None])[0]
point = rpc.qs.get("point", [None])[0]
if self.nacm:
nrpc = self.nacm.get_user_nacm(rpc.username)
if nrpc.check_data_node_path(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_CREATE) == Action.DENY:
raise NacmForbiddenError()
# Get target member name
......@@ -420,14 +421,14 @@ class BaseDatastore:
# Check if target member already exists
try:
existing_member = n.member(input_member_name)
existing_member = n[input_member_name]
except NonexistentInstance:
existing_member = None
# Get target schema node
n = root.goto(ii)
sn = n.schema_node
sn = n.schema_node # type: InternalNode
sch_member_name = sn._iname2qname(input_member_name)
member_sn = sn.get_data_child(*sch_member_name)
......@@ -439,24 +440,35 @@ class BaseDatastore:
existing_member = n.put_member(input_member_name, ArrayValue([]))
# Convert input data from List/Dict to ArrayValue/ObjectValue
new_value_data = member_sn.from_raw([input_member_value])[0]
new_value_list = member_sn.from_raw([input_member_value])
new_value_item = new_value_list[0] # type: ObjectValue
list_node_key = member_sn.keys[0][0]
if new_value_data[list_node_key] in map(lambda x: x[list_node_key], existing_member.value):
if new_value_item[list_node_key] in map(lambda x: x[list_node_key], existing_member.value):
raise InstanceAlreadyPresent("Duplicate key")
if insert == "first":
new_member = existing_member.update(ArrayValue([new_value_data] + existing_member.value))
# Optimization
if len(existing_member) > 0:
list_entry_first = existing_member[0] # type: ArrayEntry
new_member = list_entry_first.insert_after(new_value_item).up()
else:
new_member = existing_member.update(new_value_list)
elif (insert == "last") or insert is None:
new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_data]))
# Optimization
if len(existing_member) > 0:
list_entry_last = existing_member[-1] # type: ArrayEntry
new_member = list_entry_last.insert_after(new_value_item).up()
else:
new_member = existing_member.update(new_value_list)
elif insert == "before":
entry_sel = EntryKeys({list_node_key: point})
list_entry = entry_sel.goto_step(existing_member)
new_member = list_entry.insert_before(new_value_data).up()
list_entry = entry_sel.goto_step(existing_member) # type: ArrayEntry
new_member = list_entry.insert_before(new_value_item).up()
elif insert == "after":
entry_sel = EntryKeys({list_node_key: point})
list_entry = entry_sel.goto_step(existing_member)
new_member = list_entry.insert_after(new_value_data).up()
list_entry = entry_sel.goto_step(existing_member) # type: ArrayEntry
new_member = list_entry.insert_after(new_value_item).up()
else:
raise ValueError("Invalid 'insert' value")
elif isinstance(member_sn, LeafListNode):
......@@ -467,12 +479,12 @@ class BaseDatastore:
existing_member = n.put_member(input_member_name, ArrayValue([]))
# Convert input data from List/Dict to ArrayValue/ObjectValue
new_value_data = member_sn.from_raw([input_member_value])[0]
new_value_item = member_sn.from_raw([input_member_value])[0]
if insert == "first":
new_member = existing_member.update(ArrayValue([new_value_data] + existing_member.value))
new_member = existing_member.update(ArrayValue([new_value_item] + existing_member.value))
elif (insert == "last") or insert is None:
new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_data]))
new_member = existing_member.update(ArrayValue(existing_member.value + [new_value_item]))
else:
raise ValueError("Invalid 'insert' value")
else:
......@@ -480,10 +492,10 @@ class BaseDatastore:
# Create new data node
# Convert input data from List/Dict to ArrayValue/ObjectValue
new_value_data = member_sn.from_raw(input_member_value)
new_value_item = member_sn.from_raw(input_member_value)
# Create new node (object member)
new_member = n.put_member(input_member_name, new_value_data)
new_member = n.put_member(input_member_name, new_value_item)
else:
# Data node already exists
raise InstanceAlreadyPresent("Member \"{}\" already present in \"{}\"".format(input_member_name, ii))
......@@ -492,12 +504,12 @@ class BaseDatastore:
# PUT data node
def update_node_rpc(self, root: InstanceNode, rpc: RpcInfo, value: Any) -> InstanceNode:
ii = self.parse_ii(rpc.path, rpc.path_format)
ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
n = root.goto(ii)
if self.nacm:
nrpc = self.nacm.get_user_nacm(rpc.username)
if nrpc.check_data_node_path(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_UPDATE) == Action.DENY:
raise NacmForbiddenError()
new_n = n.update(value, raw=True)
......@@ -506,25 +518,25 @@ class BaseDatastore:
# Delete data node
def delete_node_rpc(self, root: InstanceNode, rpc: RpcInfo) -> InstanceNode:
ii = self.parse_ii(rpc.path, rpc.path_format)
ii = DataHelpers.parse_ii(rpc.path, rpc.path_format)
n = root.goto(ii)
n_parent = n.up()
new_n = n_parent
last_isel = ii[-1]
if self.nacm:
nrpc = self.nacm.get_user_nacm(rpc.username)
if nrpc.check_data_node_path(root, ii, Permission.NACM_ACCESS_DELETE) == Action.DENY:
if nrpc.check_data_node_permission(root, ii, Permission.NACM_ACCESS_DELETE) == Action.DENY:
raise NacmForbiddenError()
new_n = n_parent
if isinstance(n_parent.value, ArrayValue):
if isinstance(last_isel, EntryIndex):
new_n = n_parent.delete_entry(last_isel.index)
new_n = n_parent.delete_item(last_isel.key)
elif isinstance(last_isel, EntryKeys):
new_n = n_parent.delete_entry(n.index)
new_n = n_parent.delete_item(n.index)
elif isinstance(n_parent.value, ObjectValue):
if isinstance(last_isel, MemberName):
new_n = n_parent.delete_member(last_isel.name)
new_n = n_parent.delete_item(last_isel.key)
else:
raise InstanceValueError(n, "Invalid target node type")
......@@ -674,7 +686,3 @@ class JsonDatastore(BaseDatastore):
def test():
error("Tests moved to tests/tests_jetconf.py")
from .nacm import NacmConfig, Permission, Action
from .handler_list import OP_HANDLERS, STATE_DATA_HANDLES, CONF_DATA_HANDLES
......@@ -2,7 +2,7 @@ import logging
from colorlog import debug, getLogger
from enum import Enum
from typing import Dict, Any, Iterable
from typing import List, Dict, Union, Any, Iterable
from datetime import datetime
from pytz import timezone
from yangson.instance import InstanceRoute, MemberName, EntryKeys, InstanceIdParser, ResourceIdParser
......@@ -10,6 +10,7 @@ from yangson.datamodel import DataModel
from .config import CONFIG_GLOBAL, CONFIG_HTTP
JsonNodeT = Union[Dict[str, Any], List]
SSLCertT = Dict[str, Any]
......@@ -34,19 +35,14 @@ class CertHelpers:
class DataHelpers:
# Create parent data nodes to JSON subtree up to top level
@staticmethod
def node2doc(id: InstanceRoute, val: Any) -> Dict[str, Any]:
def node2doc(ii: InstanceRoute, val: Any) -> Dict[str, Any]:
n = val
for isel in reversed(id):
for isel in reversed(ii):
if isinstance(isel, MemberName):
new_node = {}
new_node[isel.name] = n
n = new_node
n = {isel.key: n}
if isinstance(isel, EntryKeys):
new_node = []
for k in isel.keys:
n[k] = isel.keys[k]
new_node.append(n)
n = new_node
n.update(isel.keys)
n = [n]
return n
@staticmethod
......
......@@ -228,6 +228,7 @@ def _post(ds: BaseDatastore, pth: str, username: str, data: str) -> HttpResponse
rpc1 = RpcInfo()
rpc1.username = username
rpc1.path = url_path
rpc1.qs = query_string
try:
json_data = json.loads(data) if len(data) > 0 else {}
......@@ -237,9 +238,7 @@ def _post(ds: BaseDatastore, pth: str, username: str, data: str) -> HttpResponse
try:
ds.lock_data(username)
ins_pos = (query_string.get("insert") or [None])[0]
point = (query_string.get("point") or [None])[0]
new_root = ds.create_node_rpc(ds.get_data_root_staging(rpc1.username), rpc1, json_data, insert=ins_pos, point=point)
new_root = ds.create_node_rpc(ds.get_data_root_staging(rpc1.username), rpc1, json_data)
ds.add_to_journal_rpc(ChangeType.CREATE, rpc1, json_data, new_root)
http_resp = HttpResponse.empty(HttpStatus.Created)
except DataLockError as e:
......
......@@ -186,7 +186,7 @@ class NacmConfig:
self._user_nacm_rpc = {}
try:
nacm_json = self.nacm_ds.get_data_root().member("ietf-netconf-acm:nacm").value
nacm_json = self.nacm_ds.get_data_root()["ietf-netconf-acm:nacm"].value
except NonexistentInstance:
raise ValueError("Data does not contain \"ietf-netconf-acm:nacm\" root element")
......@@ -303,7 +303,7 @@ class UserNacm:
self.rule_tree = DataRuleTree(self.rule_lists)
debug_nacm("Rule tree for user \"{}\":\n{}".format(username, self.rule_tree.print_rule_tree()))
def check_data_node_path(self, root: InstanceNode, ii: InstanceRoute, access: Permission) -> Action:
def check_data_node_permission(self, root: InstanceNode, ii: InstanceRoute, access: Permission) -> Action:
if not self.nacm_enabled:
return Action.PERMIT
......@@ -341,22 +341,22 @@ class UserNacm:
# debug_nacm("check_data_node_path, result = {}".format(retval.name))
return retval
def _check_data_read_path(self, node: InstanceNode, root: InstanceNode, ii: InstanceRoute) -> InstanceNode:
def _prune_data_tree(self, node: InstanceNode, root: InstanceNode, ii: InstanceRoute, access: Permission) -> InstanceNode:
if isinstance(node.value, ObjectValue):
# print("obj: {}".format(node.value))
nsel = MemberName("")
mii = ii + [nsel]
for child_key in node.value.keys():
nsel.name = child_key
nsel.key = child_key
m = nsel.goto_step(node)
# debug_nacm("checking mii {}".format(mii))
if self.check_data_node_path(root, mii, Permission.NACM_ACCESS_READ) == Action.DENY:
if self.check_data_node_permission(root, mii, access) == Action.DENY:
# debug_nacm("Pruning node {} {}".format(id(node.value[child_key]), node.value[child_key]))
debug_nacm("Pruning node {}".format(DataHelpers.ii2str(mii)))
node = node.delete_member(child_key)
node = node.delete_item(child_key)
else:
node = self._check_data_read_path(m, root, mii).up()
node = self._prune_data_tree(m, root, mii, access).up()
elif isinstance(node.value, ArrayValue):
# print("array: {}".format(node.value))
nsel = EntryIndex(0)
......@@ -368,22 +368,22 @@ class UserNacm:
e = nsel.goto_step(node)
# debug_nacm("checking eii {}".format(eii))
if self.check_data_node_path(root, eii, Permission.NACM_ACCESS_READ) == Action.DENY:
if self.check_data_node_permission(root, eii, access) == Action.DENY:
# debug_nacm("Pruning node {} {}".format(id(node.value[i]), node.value[i]))
debug_nacm("Pruning node {}".format(DataHelpers.ii2str(eii)))
node = node.delete_entry(i)
node = node.delete_item(i)
arr_len -= 1
else:
i += 1
node = self._check_data_read_path(e, root, eii).up()
node = self._prune_data_tree(e, root, eii, access).up()
return node
def check_data_read_path(self, node: InstanceNode, root: InstanceNode, ii: InstanceRoute) -> InstanceNode:
def prune_data_tree(self, node: InstanceNode, root: InstanceNode, ii: InstanceRoute, access: Permission) -> InstanceNode:
if not self.nacm_enabled:
return node
else:
return self._check_data_read_path(node, root, ii)
return self._prune_data_tree(node, root, ii, access)
def check_rpc_name(self, rpc_name: str) -> Action:
if not self.nacm_enabled:
......
......@@ -3,8 +3,8 @@ from typing import List, Dict, Union, Any
from yangson.instance import InstanceRoute, ObjectValue, EntryKeys, MemberName
from . import knot_api
from .data import BaseDataListener, SchemaNode, PathFormat, ChangeType, DataChange, ConfHandlerResult
from .helpers import ErrorHelpers, LogHelpers
from .data import BaseDataListener, SchemaNode, ChangeType, DataChange, ConfHandlerResult
from .helpers import PathFormat, ErrorHelpers, LogHelpers, DataHelpers
from .knot_api import RRecordBase, SOARecord, ARecord, AAAARecord, NSRecord, MXRecord
JsonNodeT = Union[Dict[str, Any], List]
......@@ -17,7 +17,7 @@ class KnotConfServerListener(BaseDataListener):
debug_confh(self.__class__.__name__ + " triggered")
base_ii_str = self.schema_path
base_ii = self.ds.parse_ii(base_ii_str, PathFormat.URL)
base_ii = DataHelpers.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self.ds.get_node(self.ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
......@@ -26,7 +26,7 @@ class KnotConfServerListener(BaseDataListener):
knot_api.KNOT.set_item(section="server", item="async-start", data=base_nv.get("knot-dns:async-start"))
knot_api.KNOT.set_item(section="server", item="nsid", data=base_nv.get("nsid-identity", {}).get("nsid"))
listen_endpoints = base_nv.get("listen-endpoint") or []
listen_endpoints = base_nv.get("listen-endpoint", [])
ep_str_list = []
for ep in listen_endpoints:
......@@ -51,7 +51,7 @@ class KnotConfLogListener(BaseDataListener):
debug_confh(self.__class__.__name__ + " triggered")
base_ii_str = self.schema_path
base_ii = self.ds.parse_ii(base_ii_str, PathFormat.URL)
base_ii = DataHelpers.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self.ds.get_node(self.ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
......@@ -78,7 +78,7 @@ class KnotConfZoneListener(BaseDataListener):
# ii_str = "".join([str(seg) for seg in ii])
base_ii_str = self.schema_path
base_ii = self.ds.parse_ii(base_ii_str, PathFormat.URL)
base_ii = DataHelpers.parse_ii(base_ii_str, PathFormat.URL)
knot_api.KNOT.begin()
......@@ -131,7 +131,7 @@ class KnotConfControlListener(BaseDataListener):
debug_confh(self.__class__.__name__ + " triggered")
base_ii_str = self.schema_path
base_ii = self.ds.parse_ii(base_ii_str, PathFormat.URL)
base_ii = DataHelpers.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self.ds.get_node(self.ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
......@@ -163,7 +163,7 @@ class KnotConfAclListener(BaseDataListener):
debug_confh(self.__class__.__name__ + " triggered")
base_ii_str = self.schema_path
base_ii = self.ds.parse_ii(base_ii_str, PathFormat.URL)
base_ii = DataHelpers.parse_ii(base_ii_str, PathFormat.URL)
base_nv = self.ds.get_node(self.ds.get_data_root(), base_ii).value
knot_api.KNOT.begin()
......@@ -218,7 +218,7 @@ class KnotZoneDataListener(BaseDataListener):
debug_confh(self.__class__.__name__ + " triggered")
base_ii_str = "/dns-zones:zone-data"
base_ii = self.ds.parse_ii(base_ii_str, PathFormat.URL)
base_ii = DataHelpers.parse_ii(base_ii_str, PathFormat.URL)
base_ii_len = len(base_ii)
ii_str = "".join([str(seg) for seg in ii])
......@@ -251,7 +251,7 @@ class KnotZoneDataListener(BaseDataListener):
# Add resource record to particular zone
elif (
len(ii) == (base_ii_len + 2)) \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].name == "zone") \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].key == "zone") \
and isinstance(ii[base_ii_len + 1], EntryKeys) \
and (ch.change_type == ChangeType.CREATE) \
and (ch.data.get("rrset") is not None):
......@@ -275,9 +275,9 @@ class KnotZoneDataListener(BaseDataListener):
# Add resource record to particular zone (only specific "rdata" item)
elif (
len(ii) == (base_ii_len + 4)) \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].name == "zone") \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].key == "zone") \
and isinstance(ii[base_ii_len + 1], EntryKeys) \
and isinstance(ii[base_ii_len + 2], MemberName) and (ii[base_ii_len + 2].name == "rrset") \
and isinstance(ii[base_ii_len + 2], MemberName) and (ii[base_ii_len + 2].key == "rrset") \
and isinstance(ii[base_ii_len + 3], EntryKeys) \
and (ch.change_type == ChangeType.CREATE) \
and (ch.data.get("rdata") is not None):
......@@ -304,9 +304,9 @@ class KnotZoneDataListener(BaseDataListener):
# Delete resource record from particular zone
elif (
len(ii) == (base_ii_len + 4)) \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].name == "zone") \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].key == "zone") \
and isinstance(ii[base_ii_len + 1], EntryKeys) \
and isinstance(ii[base_ii_len + 2], MemberName) and (ii[base_ii_len + 2].name == "rrset") \
and isinstance(ii[base_ii_len + 2], MemberName) and (ii[base_ii_len + 2].key == "rrset") \
and isinstance(ii[base_ii_len + 3], EntryKeys) \
and (ch.change_type == ChangeType.DELETE):
domain_name = ii[base_ii_len + 1].keys["name"]
......@@ -322,11 +322,11 @@ class KnotZoneDataListener(BaseDataListener):
# Delete resource record from particular zone (only specific "rdata" item)
elif (
len(ii) == (base_ii_len + 6)) \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].name == "zone") \
and isinstance(ii[base_ii_len], MemberName) and (ii[base_ii_len].key == "zone") \
and isinstance(ii[base_ii_len + 1], EntryKeys) \
and isinstance(ii[base_ii_len + 2], MemberName) and (ii[base_ii_len + 2].name == "rrset") \
and isinstance(ii[base_ii_len + 2], MemberName) and (ii[base_ii_len + 2].key == "rrset") \
and isinstance(ii[base_ii_len + 3], EntryKeys) \
and isinstance(ii[base_ii_len + 4], MemberName) and (ii[base_ii_len + 4].name == "rdata") \
and isinstance(ii[base_ii_len + 4], MemberName) and (ii[base_ii_len + 4].key == "rdata") \
and isinstance(ii[base_ii_len + 5], EntryKeys) \
and (ch.change_type == ChangeType.DELETE):
domain_name = ii[base_ii_len + 1].keys["name"]
......
......@@ -7,11 +7,9 @@ from yangson.instance import InstanceRoute, InstanceNode, Value, EntryKeys, None
from jetconf.knot_api import KnotInternalError
from .libknot.control import KnotCtl
from . import knot_api
from .helpers import DataHelpers
from .helpers import DataHelpers, JsonNodeT
from .handler_list import StateDataHandlerList
JsonNodeT = Union[Dict[str, Any], List]
class StateNonexistentInstance(NonexistentInstance):
def __init__(self, ii: InstanceRoute, text: str) -> None:
......
......@@ -99,8 +99,8 @@ def test_nacm(datastore_1, nacm_datastore_1):
# debug("Node contents: {}".format(datanode.value))
test_ii = data.parse_ii(test_path[0], PathFormat.XPATH)
rule = []
action = nacm_conf.get_user_nacm(test_user).check_data_node_path(data.get_data_root(), test_ii, test_path[1],
out_matching_rule=rule)
action = nacm_conf.get_user_nacm(test_user).check_data_node_permission(data.get_data_root(), test_ii, test_path[1],
out_matching_rule=rule)