# This file is part of A Boneless Datastructure Language
# Copyright (C) 2020 Soni L.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""[Internal] The VM Interpreter.

This module holds the VM instructions and the interpreter loop.
"""

import collections.abc
import re

from abdl import predicates
from abdl import exceptions


class PatternElement:
    """A parsed pattern element (token) and VM instruction.

    The interpreter loop (``match_helper``) keeps a boolean ``in_key`` flag
    and dispatches each instruction to ``on_in_key`` or ``on_not_in_key``
    depending on that flag; the value the hook returns becomes the new
    ``in_key``. Subclasses override only the hooks that are valid for them;
    the defaults raise ``RuntimeError``.
    """

    def on_not_in_key(self, frame, path, defs):
        """Called while the key matching step of the pattern isn't done.

        Args:
            frame: The ``_Frame`` holding the instruction list and pointer.
            path: The list of ``Holder`` objects built so far.
            defs: The definitions (parameter values) for this match.

        Returns whether the key matching step is ready (the new ``in_key``).
        """
        raise RuntimeError(self)

    def on_in_key(self, frame, path, defs):
        """Called while the key matching step of the pattern is done.

        Takes the same arguments as ``on_not_in_key``.

        Returns whether the key matching step is ready (the new ``in_key``).
        """
        raise RuntimeError(self)

    def on_end(self, frame, path, defs, in_key):
        """Called when the pattern has reached the end.

        Returns the new value of in_key and a dict to be yielded, or None and
        a dict to be yielded.
        """
        raise RuntimeError(self)

    def collect_params(self, res: list):
        """Appends parameter names used in this pattern to ``res``.

        The base implementation contributes nothing.
        """

    @classmethod
    def action(cls, toks):
        """Parse action, for pyparsing.

        Returns:
            PatternElement: Parsed token.
        """
        return [cls(toks)]


class Arrow(PatternElement):
    """The 'arrow' token."""

    def on_not_in_key(self, frame, path, defs):
        """Pushes a new, still empty, holder below the current value."""
        assert not path[-1].empty
        path.append(Holder(value=None, parent=path[-1].value, empty=True))
        return False

    @classmethod
    def action(cls, toks):
        """Parse action, for pyparsing; the arrow carries no token data."""
        return [cls()]


class StringKey(PatternElement):
    """The 'literal' token."""

    def __init__(self, toks):
        self.key = toks[0]
        self.skippable = toks[1] == '?'

    def on_in_key(self, frame, path, defs):
        """Behaves exactly like ``on_not_in_key``."""
        return self.on_not_in_key(frame, path, defs)

    def on_not_in_key(self, frame, path, defs):
        """Makes the current holder iterate over the single literal key."""
        holder = path[-1]
        holder.iterator = self._extract(holder.parent)
        holder.empty = False
        return True

    def _extract(self, obj):
        """Yields ``(key, obj[key])`` if the lookup succeeds.

        Raises:
            exceptions.ValidationError: If the lookup fails and the key is
                not marked skippable (``?``).
        """
        try:
            value = obj[self.key]
        except (TypeError, IndexError, KeyError) as err:
            if not self.skippable:
                raise exceptions.ValidationError from err
        else:
            yield (self.key, value)


class RegexKey(PatternElement):
    """The 'regex' token."""

    def __init__(self, toks):
        self.key = toks[0]
        self.compiled = re.compile(self.key)
        self.skippable = toks[1] == '?'

    def on_in_key(self, frame, path, defs):
        """Behaves exactly like ``on_not_in_key``."""
        return self.on_not_in_key(frame, path, defs)

    def on_not_in_key(self, frame, path, defs):
        """Wraps the current holder's iterator with the regex filter."""
        holder = path[-1]
        source = holder.iterator
        # The iterator setter only accepts a value while the slot is unset,
        # so the old iterator has to be deleted before the wrapper goes in.
        del holder.iterator
        holder.iterator = self._filter(source)
        holder.empty = False
        return True

    def _filter(self, iter_):
        """Yields the pairs from ``iter_`` whose key matches the regex.

        Raises:
            exceptions.ValidationError: If a key does not match, or cannot be
                searched at all, and the token is not skippable (``?``).
        """
        for elem in iter_:
            try:
                found = self.compiled.search(elem[0])
            except TypeError as err:
                # e.g. the key is not something a regex can be run against.
                if not self.skippable:
                    raise exceptions.ValidationError from err
                continue
            if found:
                yield elem
            elif not self.skippable:
                raise exceptions.ValidationError


class KeySubtree(PatternElement):
    """The 'keymatch' token."""

    def __init__(self, toks):
        self.key = toks[0]
        self.skippable = toks[1] == '?'

    def on_in_key(self, frame, path, defs):
        """Behaves exactly like ``on_not_in_key``."""
        return self.on_not_in_key(frame, path, defs)

    def on_not_in_key(self, frame, path, defs):
        """Wraps the current holder's iterator with the key sub-pattern."""
        holder = path[-1]
        holder.subtree = True
        source = holder.iterator
        # The iterator setter only accepts a value while the slot is unset,
        # so the old iterator has to be deleted before the wrapper goes in.
        del holder.iterator
        holder.iterator = self._filter(source, defs, name=holder.name)
        holder.empty = False
        return True

    def _filter(self, iter_, defs, name):
        """Runs the sub-pattern against every key produced by ``iter_``.

        Yields ``(matches, value)`` once per sub-pattern match, where
        ``matches`` is the dict produced by ``match_helper`` for the key.
        """
        for pair in iter_:
            for matches in match_helper(self.key, defs, pair[0]):
                if name:
                    # FIXME this "name" thing is a bit suboptimal
                    matches.setdefault(name, pair)
                yield (matches, pair[1])

    def collect_params(self, res: list):
        """Collects the parameter names used inside the sub-pattern."""
        for sub in self.key:
            sub.collect_params(res)


class ValueSubtree(PatternElement):
    """The 'subvalue' token."""

    def __init__(self, toks):
        self.key = toks[0]
        self.optional = toks[1] == '?'

    def on_not_in_key(self, frame, path, defs):
        """Pushes a subtree holder that iterates the sub-pattern's matches."""
        assert not path[-1].empty
        path.append(Holder(value=None, parent=path[-1].value, empty=False,
                           subtree=True))
        path[-1].iterator = self._filter(path[-1].parent, defs)
        return True

    def _filter(self, parent, defs):
        """Yields ``(matches, parent)`` for each sub-pattern match.

        If the subtree is optional (``?``) and nothing matched, a single
        ``({}, parent)`` is yielded instead.
        """
        has_results = False
        for pair in match_helper(self.key, defs, parent):
            has_results = True
            yield (pair, parent)
        # support for optional subtrees
        if self.optional and not has_results:
            yield ({}, parent)

    def collect_params(self, res: list):
        """Collects the parameter names used inside the sub-pattern."""
        for sub in self.key:
            sub.collect_params(res)


class Ident(PatternElement):
    """The 'identifier' token."""

    def __init__(self, toks):
        self.key = toks[0]

    def on_not_in_key(self, frame, path, defs):
        """Names the current holder after this identifier."""
        holder = path[-1]
        holder.name = self.key
        holder.empty = False
        return True


class Param(PatternElement):
    """The 'parameter' token."""

    def __init__(self, toks):
        assert isinstance(toks[1], Ident)
        self.skippable = toks[0] == '?'
        self.key = toks[1].key

    def on_in_key(self, frame, path, defs):
        """Behaves exactly like ``on_not_in_key``."""
        return self.on_not_in_key(frame, path, defs)

    def on_not_in_key(self, frame, path, defs):
        """Makes the current holder iterate over the key named by the param.

        The actual key is looked up in ``defs`` under this parameter's name.
        """
        holder = path[-1]
        holder.iterator = self._extract(holder.parent, defs[self.key])
        holder.empty = False
        return True

    def _extract(self, obj, key):
        """Yields ``(key, obj[key])`` if the lookup succeeds.

        Raises:
            exceptions.ValidationError: If the lookup fails and the parameter
                is not marked skippable (``?``).
        """
        try:
            value = obj[key]
        except (TypeError, IndexError, KeyError) as err:
            if not self.skippable:
                raise exceptions.ValidationError from err
        else:
            yield (key, value)

    def collect_params(self, res: list):
        """Appends this parameter's name to ``res``."""
        res.append(self.key)


class ApplyPredicate(PatternElement):
    """The 'predicate' token."""

    def __init__(self, toks):
        assert isinstance(toks[1], Ident)
        self.skippable = toks[0] == '?'
        self.key = toks[1].key

    def on_in_key(self, frame, path, defs):
        """Wraps the current holder's iterator with the predicate filter."""
        holder = path[-1]
        source = holder.iterator
        # The iterator setter only accepts a value while the slot is unset,
        # so the old iterator has to be deleted before the wrapper goes in.
        del holder.iterator
        holder.iterator = self._filter(source, defs)
        holder.empty = False
        return True

    def _check(self, defs, obj):
        """Applies the predicate stored in ``defs`` to ``obj``.

        Returns:
            ``True`` if the predicate accepts ``obj``; ``False`` if it
            rejects it and the token is skippable (``?``).

        Raises:
            exceptions.ValidationError: If the predicate rejects ``obj`` and
                the token is not skippable.
        """
        # pylint: disable=protected-access
        if predicates._to_predicate(defs[self.key]).accept(obj):
            return True
        if self.skippable:
            return False
        raise exceptions.ValidationError

    def on_not_in_key(self, frame, path, defs):
        """Applies the predicate to the root value.

        Only valid while the path holds just the root holder; on rejection
        the path is cleared, which ends the interpreter loop.
        """
        assert len(path) == 1
        if not self._check(defs, path[-1].value):
            path.clear()
        return False

    def _filter(self, iter_, defs):
        """Yields the pairs from ``iter_`` whose value passes the predicate."""
        for elem in iter_:
            if self._check(defs, elem[1]):
                yield elem

    def collect_params(self, res: list):
        """Appends the predicate's name to ``res``."""
        res.append(self.key)

    def on_end(self, frame, path, defs, in_key):
        """Produces the final result and clears the path.

        Returns:
            tuple: ``(False, res)`` where ``res`` maps names to matches.
        """
        assert not in_key
        res = _collect_results(path)
        # Emptying the path stops the interpreter loop after this result.
        path.clear()
        return (False, res)


class End(PatternElement):
    """Pseudo-token, used to advance iteration."""

    def on_in_key(self, frame, path, defs):
        """Advances the innermost holder, backtracking when it is exhausted.

        Returns:
            ``False`` if the holder produced a new match; ``True`` after
            backtracking.
        """
        try:
            path[-1].next()
        except StopIteration:
            path.pop()
            # Rewind to the previous End instruction, or to the first
            # instruction if there is none.
            while frame.prev() and not isinstance(frame.current_op, End):
                pass
            # Step back once more so the loop's next advance lands on that
            # instruction again; if that is impossible there is nothing left
            # to backtrack into, so empty the path to stop the loop.
            if not frame.prev():
                path.clear()
            return True
        return False

    def on_end(self, frame, path, defs, in_key):
        """Produces a result and rewinds so this instruction runs again.

        Returns:
            tuple: ``(True, res)`` where ``res`` maps names to matches.
        """
        assert not path[-1].empty
        res = _collect_results(path)
        if not frame.prev():
            # this should never happen
            # Raised explicitly rather than via ``assert False`` so the check
            # is not stripped when Python runs with -O.
            raise AssertionError("End.on_end: no instruction to rewind to")
        return (True, res)

    @classmethod
    def action(cls, toks):
        """Parse action, for pyparsing; End carries no token data."""
        return [cls()]


def _collect_results(path):
    """Builds the result dict for the holders currently on ``path``.

    Subtree holders contribute every named pair of their match; other holders
    contribute ``(match, value)`` under their name, if they have one.
    """
    res = {}
    for holder in path:
        if holder.subtree:
            res.update(holder.match.items())
        elif holder.name is not None:
            res[holder.name] = (holder.match, holder.value)
    return res


def _pairs(obj):
    """Returns an iterator of ``(key, value)`` pairs for a collection.

    Mappings yield their items, sequences yield ``(index, element)`` and sets
    yield ``(element, element)``.

    Raises:
        TypeError: If ``obj`` is none of the above.
    """
    if isinstance(obj, collections.abc.Mapping):
        return iter(obj.items())
    if isinstance(obj, collections.abc.Sequence):
        return iter(enumerate(obj, 0))
    if isinstance(obj, collections.abc.Set):
        return iter(((e, e) for e in obj))
    # maybe there's more stuff I can implement later
    raise TypeError


class Holder:
    """Stores a single match and associated metadata.

    A single match is generally a key-value pair, but may be a collection of
    named pairs in the case of subtree matches.
    """

    def __init__(self, value, parent=None, iterator=None, empty=False,
                 subtree=False):
        self.name = None
        self.match = None
        self.value = value
        self.empty = empty
        self._iterator = iterator
        self.parent = parent
        self.subtree = subtree

    @property
    def iterator(self):
        """Returns the iterator for this match.

        If none has been set, one over the pairs of ``parent`` is created on
        first access.
        """
        if self._iterator is None:
            self._iterator = _pairs(self.parent)
        return self._iterator

    @iterator.setter
    def iterator(self, value):
        # An existing iterator must be deleted explicitly before replacing it.
        assert self._iterator is None
        self._iterator = value

    @iterator.deleter
    def iterator(self):
        self._iterator = None

    def next(self):
        """Updates the stored match.

        Raises:
            StopIteration: If the iterator is exhausted.
        """
        self.match, self.value = next(self.iterator)


class _Frame:
    """The instruction list together with its instruction pointer."""

    def __init__(self, ops):
        self.ops = ops
        # -1 means "before the first instruction".
        self.iar = -1

    def next(self):
        """Advances the instruction address register.

        Returns:
            ``True`` if successful, ``False`` otherwise.
        """
        iar = self.iar + 1
        if iar >= len(self.ops):
            return False
        self.iar = iar
        return True

    @property
    def current_op(self):
        """Returns the current instruction."""
        return self.ops[self.iar]

    def prev(self):
        """Rewinds the instruction address register.

        Returns:
            ``True`` if successful, ``False`` otherwise.
        """
        iar = self.iar - 1
        if iar < 0:
            return False
        self.iar = iar
        return True


def match_helper(ops, defs, tree):
    """The interpreter loop itself.

    The opcode/token dispatch logic is implemented through ``PatternElement``.

    Args:
        ops: The sequence of ``PatternElement`` instructions to run.
        defs: The definitions (parameter values) passed to every instruction.
        tree: The object to match against.

    Yields:
        dict: Matches.
    """
    frame = _Frame(ops)
    # Deliberately a length test: only ``len`` and indexing are used on
    # ``ops`` elsewhere in this module, so its truthiness is not relied upon.
    if not len(frame.ops):
        # no ops? yield a single empty match and do nothing else
        yield {}
        return
    path = [Holder(value=tree, parent=None, iterator=iter(()))]
    in_key = False
    # Instructions signal completion by emptying ``path``.
    while path:
        if not frame.next():
            # Ran off the end of the program: the last instruction decides
            # what to yield and how iteration continues.
            in_key, res = frame.current_op.on_end(frame, path, defs, in_key)
            yield res
        elif in_key:
            in_key = frame.current_op.on_in_key(frame, path, defs)
        else:
            in_key = frame.current_op.on_not_in_key(frame, path, defs)