summary refs log tree commit diff stats
path: root/ganarchy
diff options
context:
space:
mode:
authorSoniEx2 <endermoneymod@gmail.com>2024-02-07 20:59:05 -0300
committerSoniEx2 <endermoneymod@gmail.com>2024-02-07 20:59:05 -0300
commit2a344c63c4145c2c96ac2096dcf02d64694a3dc8 (patch)
tree37eac63acfa50e233ea623cd7bfc26a9df4bd3de /ganarchy
parent2f5654e49e2aae96b2a1bc0f5b6a1b784a00ff01 (diff)
Handle untrusted input more better
Diffstat (limited to 'ganarchy')
-rw-r--r--ganarchy/data.py136
1 files changed, 81 insertions, 55 deletions
diff --git a/ganarchy/data.py b/ganarchy/data.py
index 34d5a2a..c803b9b 100644
--- a/ganarchy/data.py
+++ b/ganarchy/data.py
@@ -71,7 +71,9 @@ def _check_uri(obj, ports=range(1,65536), schemes=('https',)):
 _commit_pattern = re.compile(r"^[0-9a-fA-F]{40}$")
 _commit_sha256_pattern = re.compile(r"^[0-9a-fA-F]{40}$|^[0-9a-fA-F]{64}$")
 
-def _is_commit(obj, sha256ready=True):
+def _is_commit_id(obj, sha256ready=True):
+    if not isinstance(obj, str):
+        return False
     if sha256ready:
         return _commit_sha256_pattern.match(obj)
     else:
@@ -126,7 +128,7 @@ class PCTP(OverridableProperty):
         if branch == "HEAD":
             self.branch = None
         else:
-            self.branch = branch or None
+            self.branch = branch
         self.options = options
 
     def as_key(self):
@@ -221,19 +223,21 @@ class DataSource(abc.ABC):
         try:
             # note: unpacking
             ret, = iterator
-        except LookupError as exc: raise RuntimeError from exc  # don't accidentally swallow bugs in the iterator
+        except LookupError as exc:
+            # don't accidentally swallow bugs in the iterator
+            raise RuntimeError from exc
         return ret
 
     @abc.abstractmethod
     def get_property_values(self, prop):
-        """Yields the values associated with the given property.
+        """Returns the values associated with the given property as an iterable.
 
         If duplicated, earlier values should override later values.
 
         Args:
             prop (DataProperty): The property.
 
-        Yields:
+        Returns:
             The values associated with the given property.
 
         Raises:
@@ -254,49 +258,77 @@ class ObjectDataSource(DataSource):
     Updates to the backing object will be immediately reflected in this
     DataSource.
     """
-    # these must all be generators...
+
+    @staticmethod
+    def _get_instance_title(obj):
+        result = obj.get('title')
+        if not isinstance(result, str):
+            raise _ValidationError
+        return [result]
+
+    @staticmethod
+    def _get_instance_base_uri(obj):
+        result = obj.get('base_url')
+        if not isinstance(result, str):
+            raise _ValidationError
+        if not result.isprintable() and not _is_uri(result):
+            raise _ValidationError
+        return [result]
+
+    @staticmethod
+    def _get_instance_fedito(obj):
+        result = obj.get('fedi-to')
+        if not isinstance(result, int):
+            raise _ValidationError
+        return [result]
+
+    @staticmethod
+    def _get_vcs_repos(obj):
+        projects = obj.get('projects')
+        if not isinstance(projects, dict):
+            raise _ValidationError
+        return (
+            PCTP(commit, uri, branch,
+                 {k: v
+                  for k, v in options.items()
+                  if (k in {'active', 'federate', 'pinned'}
+                      and isinstance(v, bool))
+                 })
+            for (commit, uris) in projects.items()
+            if _is_commit_id(commit)
+            if isinstance(uris, dict)
+            for (uri, branches) in uris.items()
+            if isinstance(uri, str) and uri.isprintable() and _is_uri(uri)
+            if isinstance(branches, dict)
+            for (branch, options) in branches.items()
+            if branch is None or isinstance(branch, str)
+            and branch.isprintable()
+            if isinstance(options, dict)
+            and isinstance(options.get('active'), bool)
+        )
+
+    @staticmethod
+    def _get_repo_list_sources(obj):
+        sources = obj.get('repo_list_srcs')
+        if not isinstance(sources, dict):
+            raise _ValidationError
+        return (
+            RepoListSource(src, options)
+            for (src, options) in sources.items()
+            if isinstance(src, str)
+            and _is_uri(src, schemes=('https','file'))
+            if isinstance(options, dict)
+            and isinstance(options.get('active'), bool)
+            # TODO it would probably make sense to add
+            # options.get('type', 'toml') somewhere...
+        )
+
     _SUPPORTED_PROPERTIES = {
-        DataProperty.INSTANCE_TITLE:
-            lambda obj: (yield _check_type(obj.get('title'), str)),
-        DataProperty.INSTANCE_BASE_URL:
-            lambda obj: (yield _check_uri(obj.get('base_url'))),
-        DataProperty.INSTANCE_FEDITO:
-            lambda obj: (yield _check_type(obj.get('fedi-to'), int)),
-        DataProperty.VCS_REPOS:
-            lambda obj: (
-                PCTP(commit, uri, branch,
-                     {k: v
-                      for k, v in options.items()
-                      if (k in {'active', 'federate', 'pinned'}
-                          and isinstance(v, bool))
-                     })
-                for lazy_obj in (obj,)
-                for (commit, uris) in _check_type(lazy_obj.get('projects'),
-                                                  dict).items()
-                if isinstance(commit, str)
-                if _is_commit(commit)
-                if isinstance(uris, dict)
-                for (uri, branches) in uris.items()
-                if isinstance(uri, str)
-                if _is_uri(uri)
-                if isinstance(branches, dict)
-                for (branch, options) in branches.items()
-                if isinstance(options, dict)
-                if isinstance(options.get('active'), bool)
-            ),
-        DataProperty.REPO_LIST_SOURCES:
-            lambda obj: (
-                RepoListSource(src, options)
-                for lazy_obj in (obj,)
-                for (src, options) in _check_type(lazy_obj.get('repo_list_srcs'),
-                                                  dict).items()
-                if isinstance(src, str)
-                if _is_uri(src, schemes=('https','file'))
-                if isinstance(options, dict)
-                if isinstance(options.get('active'), bool)
-                # TODO it would probably make sense to add
-                # options.get('type', 'toml') somewhere...
-            ),
+        DataProperty.INSTANCE_TITLE: _get_instance_title,
+        DataProperty.INSTANCE_BASE_URL: _get_instance_base_uri,
+        DataProperty.INSTANCE_FEDITO: _get_instance_fedito,
+        DataProperty.VCS_REPOS: _get_vcs_repos,
+        DataProperty.REPO_LIST_SOURCES: _get_repo_list_sources,
     }
 
     def __init__(self, obj):
@@ -313,17 +345,11 @@ class ObjectDataSource(DataSource):
             factory = self.get_supported_properties()[prop]
         except KeyError as exc:
             raise PropertyError from exc
-        iterator = factory(self._obj)
         try:
-            first = next(iterator)
-        except StopIteration:
-            return (x for x in ())
+            iterable = factory(self._obj)
         except _ValidationError as exc:
             raise LookupError from exc
-        except LookupError as exc:
-            # don't accidentally swallow bugs in the iterator
-            raise RuntimeError from exc
-        return itertools.chain([first], iterator)
+        return iterable
 
     @classmethod
     def get_supported_properties(cls):