From 2a344c63c4145c2c96ac2096dcf02d64694a3dc8 Mon Sep 17 00:00:00 2001 From: SoniEx2 Date: Wed, 7 Feb 2024 20:59:05 -0300 Subject: Handle untrusted input more better --- ganarchy/data.py | 136 +++++++++++++++++++++++++++++++++---------------------- 1 file changed, 81 insertions(+), 55 deletions(-) (limited to 'ganarchy/data.py') diff --git a/ganarchy/data.py b/ganarchy/data.py index 34d5a2a..c803b9b 100644 --- a/ganarchy/data.py +++ b/ganarchy/data.py @@ -71,7 +71,9 @@ def _check_uri(obj, ports=range(1,65536), schemes=('https',)): _commit_pattern = re.compile(r"^[0-9a-fA-F]{40}$") _commit_sha256_pattern = re.compile(r"^[0-9a-fA-F]{40}$|^[0-9a-fA-F]{64}$") -def _is_commit(obj, sha256ready=True): +def _is_commit_id(obj, sha256ready=True): + if not isinstance(obj, str): + return False if sha256ready: return _commit_sha256_pattern.match(obj) else: @@ -126,7 +128,7 @@ class PCTP(OverridableProperty): if branch == "HEAD": self.branch = None else: - self.branch = branch or None + self.branch = branch self.options = options def as_key(self): @@ -221,19 +223,21 @@ class DataSource(abc.ABC): try: # note: unpacking ret, = iterator - except LookupError as exc: raise RuntimeError from exc # don't accidentally swallow bugs in the iterator + except LookupError as exc: + # don't accidentally swallow bugs in the iterator + raise RuntimeError from exc return ret @abc.abstractmethod def get_property_values(self, prop): - """Yields the values associated with the given property. + """Returns the values associated with the given property as an iterable. If duplicated, earlier values should override later values. Args: prop (DataProperty): The property. - Yields: + Returns: The values associated with the given property. Raises: @@ -254,49 +258,77 @@ class ObjectDataSource(DataSource): Updates to the backing object will be immediately reflected in this DataSource. """ - # these must all be generators... + + @staticmethod + def _get_instance_title(obj): + result = obj.get('title') + if not isinstance(result, str): + raise _ValidationError + return [result] + + @staticmethod + def _get_instance_base_uri(obj): + result = obj.get('base_url') + if not isinstance(result, str): + raise _ValidationError + if not result.isprintable() and not _is_uri(result): + raise _ValidationError + return [result] + + @staticmethod + def _get_instance_fedito(obj): + result = obj.get('fedi-to') + if not isinstance(result, int): + raise _ValidationError + return [result] + + @staticmethod + def _get_vcs_repos(obj): + projects = obj.get('projects') + if not isinstance(projects, dict): + raise _ValidationError + return ( + PCTP(commit, uri, branch, + {k: v + for k, v in options.items() + if (k in {'active', 'federate', 'pinned'} + and isinstance(v, bool)) + }) + for (commit, uris) in projects.items() + if _is_commit_id(commit) + if isinstance(uris, dict) + for (uri, branches) in uris.items() + if isinstance(uri, str) and uri.isprintable() and _is_uri(uri) + if isinstance(branches, dict) + for (branch, options) in branches.items() + if branch is None or isinstance(branch, str) + and branch.isprintable() + if isinstance(options, dict) + and isinstance(options.get('active'), bool) + ) + + @staticmethod + def _get_repo_list_sources(obj): + sources = obj.get('repo_list_srcs') + if not isinstance(sources, dict): + raise _ValidationError + return ( + RepoListSource(src, options) + for (src, options) in sources.items() + if isinstance(src, str) + and _is_uri(src, schemes=('https','file')) + if isinstance(options, dict) + and isinstance(options.get('active'), bool) + # TODO it would probably make sense to add + # options.get('type', 'toml') somewhere... + ) + _SUPPORTED_PROPERTIES = { - DataProperty.INSTANCE_TITLE: - lambda obj: (yield _check_type(obj.get('title'), str)), - DataProperty.INSTANCE_BASE_URL: - lambda obj: (yield _check_uri(obj.get('base_url'))), - DataProperty.INSTANCE_FEDITO: - lambda obj: (yield _check_type(obj.get('fedi-to'), int)), - DataProperty.VCS_REPOS: - lambda obj: ( - PCTP(commit, uri, branch, - {k: v - for k, v in options.items() - if (k in {'active', 'federate', 'pinned'} - and isinstance(v, bool)) - }) - for lazy_obj in (obj,) - for (commit, uris) in _check_type(lazy_obj.get('projects'), - dict).items() - if isinstance(commit, str) - if _is_commit(commit) - if isinstance(uris, dict) - for (uri, branches) in uris.items() - if isinstance(uri, str) - if _is_uri(uri) - if isinstance(branches, dict) - for (branch, options) in branches.items() - if isinstance(options, dict) - if isinstance(options.get('active'), bool) - ), - DataProperty.REPO_LIST_SOURCES: - lambda obj: ( - RepoListSource(src, options) - for lazy_obj in (obj,) - for (src, options) in _check_type(lazy_obj.get('repo_list_srcs'), - dict).items() - if isinstance(src, str) - if _is_uri(src, schemes=('https','file')) - if isinstance(options, dict) - if isinstance(options.get('active'), bool) - # TODO it would probably make sense to add - # options.get('type', 'toml') somewhere... - ), + DataProperty.INSTANCE_TITLE: _get_instance_title, + DataProperty.INSTANCE_BASE_URL: _get_instance_base_uri, + DataProperty.INSTANCE_FEDITO: _get_instance_fedito, + DataProperty.VCS_REPOS: _get_vcs_repos, + DataProperty.REPO_LIST_SOURCES: _get_repo_list_sources, } def __init__(self, obj): @@ -313,17 +345,11 @@ class ObjectDataSource(DataSource): factory = self.get_supported_properties()[prop] except KeyError as exc: raise PropertyError from exc - iterator = factory(self._obj) try: - first = next(iterator) - except StopIteration: - return (x for x in ()) + iterable = factory(self._obj) except _ValidationError as exc: raise LookupError from exc - except LookupError as exc: - # don't accidentally swallow bugs in the iterator - raise RuntimeError from exc - return itertools.chain([first], iterator) + return iterable @classmethod def get_supported_properties(cls): -- cgit 1.4.1