# This file is part of A Boneless Datastructure Language
# Copyright (C) 2020 Soni L.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""[Internal] The VM Interpreter.
This module holds the VM instructions and the interpreter loop.
"""
import collections.abc
import re
from abdl import predicates
from abdl import exceptions
class PatternElement:
"""A parsed pattern element (token) and VM instruction."""
def on_not_in_key(self, frame, path, defs):
"""Called while the key matching step of the pattern isn't done.
Returns whether the key matching step is ready.
"""
raise RuntimeError(self)
def on_in_key(self, frame, path, defs):
"""Called while the key matching step of the pattern is done.
Returns whether the key matching step is ready.
"""
raise RuntimeError(self)
def on_end(self, frame, path, defs, in_key):
"""Called when the pattern has reached the end.
Returns the new value of in_key and a dict to be yielded, or
None and a dict to be yielded.
"""
raise RuntimeError(self)
def collect_params(self, res: list):
"""Appends parameter names used in this pattern to ``res``.
"""
@classmethod
def action(cls, toks):
"""Parse action, for pyparsing.
Returns:
PatternElement: Parsed token.
"""
return [cls(toks)]
class Arrow(PatternElement):
"""The 'arrow' token."""
def on_not_in_key(self, frame, path, defs):
assert not path[-1].empty
path.append(Holder(value=None, parent=path[-1].value, empty=True))
return False
@classmethod
def action(cls, toks):
return [cls()]
class StringKey(PatternElement):
"""The 'literal' token."""
def __init__(self, toks):
self.key = toks[0]
self.skippable = toks[1] == '?'
def on_in_key(self, frame, path, defs):
return self.on_not_in_key(frame, path, defs)
def on_not_in_key(self, frame, path, defs):
path[-1].iterator = self._extract(path[-1].parent)
path[-1].empty = False
return True
def _extract(self, obj):
try:
yield (self.key, obj[self.key])
except (TypeError, IndexError, KeyError):
if not self.skippable:
raise exceptions.ValidationError
class RegexKey(PatternElement):
"""The 'regex' token."""
def __init__(self, toks):
self.key = toks[0]
self.compiled = re.compile(self.key)
self.skippable = toks[1] == '?'
def on_in_key(self, frame, path, defs):
return self.on_not_in_key(frame, path, defs)
def on_not_in_key(self, frame, path, defs):
filtered_iterator = self._filter(path[-1].iterator)
del path[-1].iterator
path[-1].iterator = filtered_iterator
del filtered_iterator
path[-1].empty = False
return True
def _filter(self, iter_):
for elem in iter_:
try:
if self.compiled.search(elem[0]):
yield elem
elif not self.skippable:
raise exceptions.ValidationError
except TypeError:
if not self.skippable:
raise exceptions.ValidationError
class KeySubtree(PatternElement):
"""The 'keymatch' token."""
def __init__(self, toks):
self.key = toks[0]
self.skippable = toks[1] == '?'
def on_in_key(self, frame, path, defs):
return self.on_not_in_key(frame, path, defs)
def on_not_in_key(self, frame, path, defs):
path[-1].subtree = True
filtered_iterator = self._filter(path[-1].iterator, defs, name=path[-1].name)
del path[-1].iterator
path[-1].iterator = filtered_iterator
del filtered_iterator
path[-1].empty = False
return True
def _filter(self, iter_, defs, name):
for pair in iter_:
for matches in match_helper(self.key, defs, pair[0]):
if name:
# FIXME this "name" thing is a bit suboptimal
matches.setdefault(name, pair)
yield (matches, pair[1])
def collect_params(self, res: list):
for sub in self.key:
sub.collect_params(res)
class ValueSubtree(PatternElement):
"""The 'subvalue' token."""
def __init__(self, toks):
self.key = toks[0]
self.optional = toks[1] == '?'
def on_not_in_key(self, frame, path, defs):
assert not path[-1].empty
path.append(Holder(value=None, parent=path[-1].value, empty=False, subtree=True))
path[-1].iterator = self._filter(path[-1].parent, defs)
return True
def _filter(self, parent, defs):
has_results = False
for pair in match_helper(self.key, defs, parent):
has_results = True
yield (pair, parent)
# support for optional subtrees
if self.optional and not has_results:
yield ({}, parent)
def collect_params(self, res: list):
for sub in self.key:
sub.collect_params(res)
class Ident(PatternElement):
"""The 'identifier' token."""
def __init__(self, toks):
self.key = toks[0]
def on_not_in_key(self, frame, path, defs):
path[-1].name = self.key
path[-1].empty = False
return True
class Param(PatternElement):
"""The 'parameter' token."""
def __init__(self, toks):
assert isinstance(toks[1], Ident)
self.skippable = toks[0] == '?'
self.key = toks[1].key
def on_in_key(self, frame, path, defs):
return self.on_not_in_key(frame, path, defs)
def on_not_in_key(self, frame, path, defs):
path[-1].iterator = self._extract(path[-1].parent, defs[self.key])
path[-1].empty = False
return True
def _extract(self, obj, key):
try:
yield (key, obj[key])
except (TypeError, IndexError, KeyError):
if not self.skippable:
raise exceptions.ValidationError
def collect_params(self, res: list):
res.append(self.key)
class ApplyPredicate(PatternElement):
"""The 'predicate' token."""
def __init__(self, toks):
assert isinstance(toks[1], Ident)
self.skippable = toks[0] == '?'
self.key = toks[1].key
def on_in_key(self, frame, path, defs):
filtered_iterator = self._filter(path[-1].iterator, defs)
del path[-1].iterator
path[-1].iterator = filtered_iterator
del filtered_iterator
path[-1].empty = False
return True
def _check(self, defs, obj):
# pylint: disable=protected-access
if predicates._to_predicate(defs[self.key]).accept(obj):
return True
if self.skippable:
return False
raise exceptions.ValidationError
def on_not_in_key(self, frame, path, defs):
assert len(path) == 1
if not self._check(defs, path[-1].value):
path.clear()
return False
def _filter(self, iter_, defs):
for elem in iter_:
if self._check(defs, elem[1]):
yield elem
def collect_params(self, res: list):
res.append(self.key)
def on_end(self, frame, path, defs, in_key):
assert not in_key
res = {}
for holder in path:
if holder.subtree:
for name, pair in holder.match.items():
res[name] = pair
elif holder.name is not None:
res[holder.name] = (holder.match, holder.value)
path.clear()
return (False, res)
class End(PatternElement):
"""Pseudo-token, used to advance iteration."""
def on_in_key(self, frame, path, defs):
try:
path[-1].next()
return False
except StopIteration:
path.pop()
while frame.prev() and not isinstance(frame.current_op, End):
pass
if not frame.prev():
path.clear()
return True
def on_end(self, frame, path, defs, in_key):
assert not path[-1].empty
res = {}
for holder in path:
if holder.subtree:
for name, pair in holder.match.items():
res[name] = pair
elif holder.name is not None:
res[holder.name] = (holder.match, holder.value)
if not frame.prev():
# this should never happen
assert False
return (True, res)
@classmethod
def action(cls, toks):
return [cls()]
def _pairs(obj):
if isinstance(obj, collections.abc.Mapping):
return iter(obj.items())
if isinstance(obj, collections.abc.Sequence):
return iter(enumerate(obj, 0))
if isinstance(obj, collections.abc.Set):
return iter(((e, e) for e in obj))
# maybe there's more stuff I can implement later
raise TypeError
class Holder:
"""Stores a single match and associated metadata.
A single match is generally a key-value pair, but may be a collection of
named pairs in the case of subtree matches.
"""
def __init__(self, value, parent=None, iterator=None, empty=False, subtree=False):
self.name = None
self.match = None
self.value = value
self.empty = empty
self._iterator = iterator
self.parent = parent
self.subtree = subtree
@property
def iterator(self):
"""Returns the iterator for this match."""
if self._iterator is None:
self._iterator = _pairs(self.parent)
return self._iterator
@iterator.setter
def iterator(self, value):
assert self._iterator is None
self._iterator = value
@iterator.deleter
def iterator(self):
self._iterator = None
def next(self):
"""Updates the stored match."""
self.match, self.value = next(self.iterator)
class _Frame:
def __init__(self, ops):
self.ops = ops
self.iar = -1
def next(self):
"""Advances the instruction address register.
Returns:
``True`` if successful, ``False``otherwise.
"""
iar = self.iar + 1
if iar >= len(self.ops):
return False
self.iar = iar
return True
@property
def current_op(self):
"""Returns the current instruction."""
return self.ops[self.iar]
def prev(self):
"""Rewinds the instruction address register.
Returns:
``True`` if successful, ``False``otherwise.
"""
iar = self.iar - 1
if iar < 0:
return False
self.iar = iar
return True
def match_helper(ops, defs, tree):
"""The interpreter loop itself.
The opcode/token dispatch logic is implemented through ``PatternElement``.
Yields:
dict: Matches.
"""
frame = _Frame(ops)
if not len(frame.ops): # no ops?
yield {}
return # do nothing
path = [Holder(value=tree, parent=None, iterator=iter(()))]
in_key = False
while path:
if not frame.next():
in_key, res = frame.current_op.on_end(frame, path, defs, in_key)
yield res
else:
if in_key:
in_key = frame.current_op.on_in_key(frame, path, defs)
else:
in_key = frame.current_op.on_not_in_key(frame, path, defs)