diff --git a/tests/test_model.py b/tests/test_model.py index 560df61c8000ee2834da79e858cc67b3f364ad4f..18f94436d808a501448c570f577fda831f4dd8ad 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -121,7 +121,7 @@ def test_context(data_model): tid = Context.last_revision("test") stid = Context.last_revision("subtest") tbid = Context.last_revision("testb") - assert Context.modules[tid].argument == "test" + assert Context.modules[tid].statement.argument == "test" assert Context.translate_pname("t:foo", tbid) == ("foo", "test") assert Context.translate_pname("sd:foo", stid) == ("foo", "defs") with pytest.raises(UnknownPrefix): @@ -313,7 +313,7 @@ def test_instance(instance): def test_xpath(instance): def xptest(expr, res=True, node=instance, module="test"): - mid = (module, Context.revisions[module][0]) + mid = Context.last_revision(module) assert XPathParser(expr, mid).parse().evaluate(node) == res conta = instance.member("test:contA") lr = conta.member("testb:leafR") diff --git a/yangson/context.py b/yangson/context.py index 0d469ff6692dff37d6dc7e17e123e6cc8c4eecaa..9fc812e4a0c1bcb6fe904c415c7ed826554ed6bf 100644 --- a/yangson/context.py +++ b/yangson/context.py @@ -1,4 +1,4 @@ -from typing import Dict, List, MutableSet +from typing import Dict, List, MutableSet, Optional, Tuple from .constants import YangsonException from .parser import Parser, ParserException from .statement import DefinitionNotFound, ModuleParser, Statement @@ -9,12 +9,14 @@ Essential data model structures and methods. This module implements the following classes: +* ModuleData: Data related to a YANG module or submodule. * Context: Repository of data model structures and methods. * FeatureExprParser: Parser for if-feature expressions. The module defines the following exceptions: * ModuleNotFound: YANG module not found. +* ModuleNotRegistered: Module is not registered in YANG library. * BadYangLibraryData: Invalid YANG library data. * BadPath: Invalid schema path * UnknownPrefix: Unknown namespace prefix. @@ -26,30 +28,36 @@ The module defines the following exceptions: * CyclicImports: Imports of YANG modules form a cycle. """ +class ModuleData: + """Data related to a YANG module or submodule.""" + + def __init__(self, main_module: YangIdentifier): + """Initialize the class instance.""" + self.main_module = main_module # type: ModuleId + """Main module of the receiver.""" + self.statement = None # type: Statement + """Corresponding (sub)module statements.""" + self.prefix_map = {} # type: Dict[YangIdentifier, ModuleId] + """Map of prefixes to module identifiers.""" + self.features = set() # type: MutableSet[YangIdentifier] + """Set of supported features.""" + self.submodules = set() # type: MutableSet[ModuleId] + """Set of submodules.""" + class Context: """Global repository of data model structures and utility methods.""" @classmethod def _initialize(cls) -> None: - """Initialize the context variables.""" - cls.features = set() # type: MutableSet[QualName] - """Set of supported features.""" - cls.identity_bases = {} # type: Dict[QualName, MutableSet[QualName]] - """Dictionary of identity bases.""" - cls.implement = set() # type: MutableSet[YangIdentifier] - """Set of main modules with conformance type "implement".""" + """Initialize the context structures.""" cls.module_search_path = [] # type: List[str] """List of directories where to look for YANG modules.""" - cls.modules = {} # type: Dict[ModuleId, Statement] - """Dictionary of modules and submodules comprising the data model.""" - cls.prefix_map = {} # type: Dict[ModuleId, Dict[YangIdentifier, ModuleId]] - """Dictionary of prefix mappings.""" - cls.revisions = {} # type: Dict[YangIdentifier, List[str]] - """Dictionary of module and submodule revisions.""" - cls.submodules = {} # type: Dict[ModuleId, List[ModuleId]] - """Dictionary of submodules belonging to a main module.""" - cls._main_module = {} # type: Dict[YangIdentifier, YangIdentifier] - """Dictionary mapping submodules to their main modules.""" + cls.modules = {} # type: Dict[ModuleId, ModuleData] + """Dictionary of module data.""" + cls.implement = {} # type: Dict[YangIdentifier, RevisionDate] + """Dictionary of implemented revisions.""" + cls.identity_bases = {} # type: Dict[QualName, MutableSet[QualName]] + """Dictionary of identity bases.""" cls._module_sequence = [] # type: List[ModuleId] """List that defines the order of module processing.""" @@ -59,7 +67,7 @@ class Context: """Set the data model structures from YANG library data. This method requires that the class variable `schema` be - initialized with a GroupNode instance. + initialized beforehand with a GroupNode instance. Args: yang_lib: Dictionary with YANG library data. @@ -79,40 +87,38 @@ class Context: try: for item in yang_lib["ietf-yang-library:modules-state"]["module"]: name = item["name"] - if "feature" in item: - cls.features.update( - [ (f,name) for f in item["feature"] ]) rev = item["revision"] mid = (name, rev) - ct = item["conformance-type"] - if ct == "implement": + mdata = ModuleData(mid) + cls.modules[mid] = mdata + if item["conformance-type"] == "implement": if name in cls.implement: raise MultipleImplementedRevisions(name) - cls.implement.add(name) - cls.revisions.setdefault(name, []).append(rev) + cls.implement[name] = rev mod = cls._load_module(name, rev) + mdata.statement = mod + if "feature" in item: + mdata.features.update(item["feature"]) locpref = mod.find1("prefix", required=True).argument - cls.prefix_map[mid] = { locpref: mid } + mdata.prefix_map[locpref] = mid if "submodule" in item: for s in item["submodule"]: sname = s["name"] - cls._main_module[sname] = name - rev = s["revision"] - smid = (sname, rev) - cls.revisions.setdefault(sname, []).append(rev) - cls.submodules.setdefault(mid, []).append((sname, rev)) - submod = cls._load_module(sname, rev) + smid = (sname, s["revision"]) + sdata = ModuleData(mid) + cls.modules[smid] = sdata + mdata.submodules.add(smid) + submod = cls._load_module(*smid) + sdata.statement = submod bt = submod.find1("belongs-to", name, required=True) locpref = bt.find1("prefix", required=True).argument - cls.prefix_map[smid] = { locpref: mid } + sdata.prefix_map[locpref] = mid except (KeyError, AttributeError) as e: raise BadYangLibraryData() - for mod in cls.revisions: - cls.revisions[mod].sort() cls._process_imports() cls._check_feature_dependences() for mid in cls._module_sequence: - cls.schema._handle_substatements(cls.modules[mid], mid) + cls.schema._handle_substatements(cls.modules[mid].statement, mid) cls._apply_augments() cls.schema._post_process() cls.schema._make_schema_patterns() @@ -120,7 +126,7 @@ class Context: @classmethod def _load_module(cls, name: YangIdentifier, rev: RevisionDate) -> Statement: - """Read, parse and register YANG module or submodule.""" + """Read and parse a YANG module or submodule.""" for d in cls.module_search_path: fn = "{}/{}".format(d, name) if rev: fn += "@" + rev @@ -130,73 +136,73 @@ class Context: res = ModuleParser(infile.read()).parse() except FileNotFoundError: continue - cls.modules[(name, rev)] = res return res raise ModuleNotFound(name, rev) @classmethod - def main_module(cls, name: YangIdentifier) -> YangIdentifier: - """For any module, return the corresponding main module. + def namespace(cls, mid: ModuleId) -> YangIdentifier: + """Return the namespace corresponding to a module or submodule. Args: - name: Name of a main module or submodule. + mid: Module identifier. """ - return cls._main_module.get(name, name) + return cls.modules[mid].main_module[0] @classmethod - def last_revision(cls, mname: YangIdentifier) -> ModuleId: + def last_revision(cls, name: YangIdentifier) -> ModuleId: """Return last revision of a module that's part of the data model.""" - return (mname, cls.revisions[mname][-1]) + revs = [mn for mn in cls.modules if mn[0] == name] + if not revs: + raise ModuleNotRegistered(impn, rev) + return sorted(revs, key=lambda x: x[1])[-1] @classmethod def _process_imports(cls) -> None: - deps = { mn: set() for mn in cls.implement } - impby = { mn: set() for mn in cls.implement } + impl = set(cls.implement.items()) + deps = { mid: set() for mid in impl } + impby = { mid: set() for mid in impl } for mid in cls.modules: - mod = cls.modules[mid] + mod = cls.modules[mid].statement for impst in mod.find_all("import"): impn = impst.argument prefix = impst.find1("prefix", required=True).argument revst = impst.find1("revision-date") if revst: - rev = revst.argument - if rev in cls.revisions[impn]: - imid = (impn, rev) - else: - raise ModuleNotFound(impn, rev) + imid = (impn, revst.argument) + if imid not in cls.modules: + raise ModuleNotRegistered(impn, rev) else: # use last revision imid = cls.last_revision(impn) - cls.prefix_map[mid][prefix] = imid - mm = cls.main_module(mid[0]) - if mm in cls.implement and impn in cls.implement: - deps[mm].add(impn) - impby[impn].add(mm) - free = [mn for mn in deps if len(deps[mn]) == 0] + cls.modules[mid].prefix_map[prefix] = imid + mm = cls.modules[mid].main_module + if mm in impl and imid in impl: + deps[mm].add(imid) + impby[imid].add(mm) + free = [mid for mid in deps if len(deps[mid]) == 0] if not free: raise CyclicImports() while free: - n = free.pop() - nid = cls.last_revision(n) + nid = free.pop() cls._module_sequence.append(nid) - if nid in cls.submodules: - cls._module_sequence.extend(cls.submodules[nid]) - for m in impby[n]: - deps[m].remove(n) - if len(deps[m]) == 0: - free.append(m) - if [mn for mn in deps if len(deps[mn]) > 0]: raise CyclicImports() + cls._module_sequence.extend(cls.modules[nid].submodules) + for mid in impby[nid]: + deps[mid].remove(nid) + if len(deps[mid]) == 0: + free.append(mid) + if [mid for mid in deps if len(deps[mid]) > 0]: + raise CyclicImports() @classmethod def _apply_augments(cls) -> None: """Apply top-level augments from all implemented modules.""" for mid in cls._module_sequence: - mod = cls.modules[mid] + mod = cls.modules[mid].statement for aug in mod.find_all("augment"): cls.schema._augment_stmt(aug, mid, True) @classmethod def prefix2ns(cls, prefix: YangIdentifier, mid: ModuleId) -> YangIdentifier: """Return the namespace corresponding to the prefix.""" - return cls.prefix_map[mid][prefix][0] + return cls.modules[mid].prefix_map[prefix][0] @classmethod def resolve_pname(cls, pname: PrefName, @@ -212,7 +218,7 @@ class Context: """ p, s, loc = pname.partition(":") try: - return (loc, cls.prefix_map[mid][p]) if s else (p, mid) + return (loc, cls.modules[mid].prefix_map[p]) if s else (p, mid) except KeyError: raise UnknownPrefix(pname) from None @@ -225,7 +231,7 @@ class Context: mid: Identifier of the context module. """ loc, nid = cls.resolve_pname(pname, mid) - return (loc, cls.main_module(nid[0])) + return (loc, cls.namespace(nid)) @classmethod def sid2route(cls, sid: str, mid: ModuleId) -> SchemaRoute: @@ -239,8 +245,8 @@ class Context: return [ cls.translate_pname(qn, mid) for qn in (nlist[1:] if sid[0] == "/" else nlist) ] - @classmethod - def path2route(cls, path: SchemaPath) -> SchemaRoute: + @staticmethod + def path2route(path: SchemaPath) -> SchemaRoute: """Translate a schema/data path to a schema/data route. Args: @@ -276,12 +282,11 @@ class Context: kw = "grouping" if stmt.keyword == "uses" else "typedef" loc, did = cls.resolve_pname(stmt.argument, mid) if did == mid: return (stmt.get_definition(loc, kw), mid) - dstmt = cls.modules[did].find1(kw, loc) + dstmt = cls.modules[did].statement.find1(kw, loc) if dstmt: return (dstmt, did) - if did in cls.submodules: - for sid in cls.submodules[did]: - dstmt = cls.modules[sid].find1(kw, loc) - if dstmt: return (dstmt, sid) + for sid in cls.modules[did].submodules: + dstmt = cls.modules[sid].statement.find1(kw, loc) + if dstmt: return (dstmt, sid) raise DefinitionNotFound(kw, stmt.argument) @classmethod @@ -300,15 +305,15 @@ class Context: def _check_feature_dependences(cls): """Verify feature dependences.""" for mid in cls.modules: - for fst in cls.modules[mid].find_all("feature"): - fn = cls.translate_pname(fst.argument, mid) - if fn not in cls.features: continue + for fst in cls.modules[mid].statement.find_all("feature"): + fn, fid = cls.resolve_pname(fst.argument, mid) + if fn not in cls.modules[fid].features: continue if not cls.if_features(fst, mid): raise FeaturePrerequisiteError(*fn) @classmethod def if_features(cls, stmt: Statement, mid: ModuleId) -> bool: - """Evaluate ``if-feature`` substatements, if any. + """Evaluate ``if-feature`` substatements on a statement, if any. Args: stmt: Yang statement that is tested on if-features. @@ -375,14 +380,13 @@ class FeatureExprParser(Parser): return res n, p = self.qualified_name() self.skip_ws() - ns = (Context.prefix2ns(p, self.mid) if p - else Context.main_module(self.mid[0])) - return (n, ns) in Context.features + fid = Context.modules[self.mid].prefix_map[p] if p else self.mid + return n in Context.modules[fid].features -class ModuleNotFound(YangsonException): - """A module is not found.""" +class MissingModule(YangsonException): + """Abstract exception – a module is missing.""" - def __init__(self, name: YangIdentifier, rev: str = None) -> None: + def __init__(self, name: YangIdentifier, rev: str = "") -> None: self.name = name self.rev = rev @@ -391,6 +395,14 @@ class ModuleNotFound(YangsonException): return self.name + "@" + self.rev return self.name +class ModuleNotFound(MissingModule): + """A module that is listed in YANG library is not found.""" + pass + +class ModuleNotRegistered(MissingModule): + """A module is not registered in YANG library.""" + pass + class BadYangLibraryData(YangsonException): """Broken YANG Library data.""" diff --git a/yangson/datamodel.py b/yangson/datamodel.py index 023603e2fa2733f91d49810e64559a478e68e373..892891cb5b7c137a9482cebec0d3654a3f800fb7 100644 --- a/yangson/datamodel.py +++ b/yangson/datamodel.py @@ -70,7 +70,7 @@ class DataModel(metaclass=Singleton): Returns: String consisting of hexadecimal digits. """ - fnames = sorted(["@".join(m) for m in Context.modules.keys()]) + fnames = sorted(["@".join(m) for m in Context.modules]) return hashlib.sha1("".join(fnames).encode("ascii")).hexdigest() @staticmethod diff --git a/yangson/datatype.py b/yangson/datatype.py index 70df3f670b658670c5f074e0d62b8ee79d67dee5..a8be0927661ebd339f6f4e5f9f91e2d6b718ce7b 100644 --- a/yangson/datatype.py +++ b/yangson/datatype.py @@ -473,7 +473,7 @@ class IdentityrefType(DataType): def _convert_raw(self, raw: str) -> QualName: i1, s, i2 = raw.partition(":") - return (i2, i1) if s else (i1, Context.main_module(self.module_id[0])) + return (i2, i1) if s else (i1, Context.namespace(self.module_id)) def _constraints(self, val: QualName) -> bool: for b in self.bases: diff --git a/yangson/schema.py b/yangson/schema.py index 95396d2de05715784a20f1383e20a69d88ff1d8a..ea337a72a3ad56a0705db5a84be692d06e503dcf 100644 --- a/yangson/schema.py +++ b/yangson/schema.py @@ -88,7 +88,7 @@ class SchemaNode: """Dispatch actions for substatements of `stmt`.""" for s in stmt.substatements: if s.prefix: - key = Context.prefix_map[mid][s.prefix][0] + ":" + s.keyword + key = Context.modules[mid].prefix_map[s.prefix][0] + ":" + s.keyword else: key = s.keyword mname = SchemaNode._stmt_callback.get(key, "_noop") @@ -349,7 +349,7 @@ class InternalNode(SchemaNode): """Add child node to the receiver and handle substatements.""" if not Context.if_features(stmt, mid): return node.name = stmt.argument - node.ns = Context.main_module(mid[0]) if self._nsswitch else self.ns + node.ns = Context.namespace(mid) if self._nsswitch else self.ns self.add_child(node) node._handle_substatements(stmt, mid) @@ -370,12 +370,10 @@ class InternalNode(SchemaNode): """Handle **refine** statement.""" target = self.get_schema_descendant( Context.sid2route(stmt.argument, mid)) - for ist in stmt.find_all("if-feature"): - if Context.translate_pname( - stmt.argument, mid) not in Context.features: - target.parent.remove_child(target) - return - target._handle_substatements(stmt, mid) + if not Context.if_features(stmt, mid): + target.parent.remove_child(target) + else: + target._handle_substatements(stmt, mid) def _uses_stmt(self, stmt: Statement, mid: ModuleId) -> None: """Handle uses statement.""" @@ -401,7 +399,7 @@ class InternalNode(SchemaNode): if not Context.if_features(stmt, mid): return bases = stmt.find_all("base") Context.identity_bases[ - (stmt.argument, Context.main_module(mid[0]))] = set( + (stmt.argument, Context.namespace(mid))] = set( [Context.translate_pname(ist.argument, mid) for ist in bases]) def _list_stmt(self, stmt: Statement, mid: ModuleId) -> None: @@ -775,7 +773,7 @@ class ChoiceNode(InternalNode): else: cn = CaseNode() cn.name = stmt.argument - cn.ns = Context.main_module(mid[0]) if self._nsswitch else self.ns + cn.ns = Context.namespace(mid) if self._nsswitch else self.ns self.add_child(cn) cn._handle_child(node, stmt, mid) diff --git a/yangson/xpathast.py b/yangson/xpathast.py index d76e6d26196a0e379880caed652a2d8de422f9a8..dc6df986a8af445f83f3bef7836a65640dea69a9 100644 --- a/yangson/xpathast.py +++ b/yangson/xpathast.py @@ -388,8 +388,8 @@ class FuncDerivedFrom(BinaryExpr): self.mid = mid def _properties_str(self): - return (("OR-SELF, " if self.or_self else "") + - Context.main_module(self.mid[0])) + return ("OR-SELF, " if self.or_self + else "") + Context.namespace(self.mid) def _eval(self, xctx: XPathContext) -> bool: ns = self.left._eval(xctx) diff --git a/yangson/xpathparser.py b/yangson/xpathparser.py index b43f9d57b52dc106e9728c55066becb4c116c905..dec8d42de641b7e418440d495afefdd165cafb4e 100644 --- a/yangson/xpathparser.py +++ b/yangson/xpathparser.py @@ -217,7 +217,7 @@ class XPathParser(Parser): try: next = self.peek() except EndOfInput: - return (Axis.child, (yid, Context.main_module(self.mid[0]))) + return (Axis.child, (yid, Context.namespace(self.mid))) if next == "(": return (Axis.child, self._node_type(yid)) if next == ":": @@ -239,7 +239,7 @@ class XPathParser(Parser): loc = self.yang_identifier() self.skip_ws() return (Axis.child, (loc, nsp)) - return (Axis.child, (yid, Context.main_module(self.mid[0]))) + return (Axis.child, (yid, Context.namespace(self.mid))) def _node_type(self, typ): if typ == "node": @@ -261,13 +261,13 @@ class XPathParser(Parser): try: next = self.peek() except EndOfInput: - return (ident, Context.main_module(self.mid[0])) + return (ident, Context.namespace(self.mid)) if next == "(": return self._node_type(ident) if not ws and self.test_string(":"): res = (self.yang_identifier(), Context.prefix2ns(ident, self.mid)) else: - res = (ident, Context.main_module(self.mid[0])) + res = (ident, Context.namespace(self.mid)) self.skip_ws() return res