# This file is part of GAnarchy - decentralized project hub
# Copyright (C) 2019, 2020, 2024 Soni L.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""This module handles GAnarchy's data and config sources.
A data source can be either a config source or a repo list source, but be
careful: they use identical syntax, but have different semantics! Mistaking
a repo list source for a config source is a recipe for security bugs!
"""
import abc
import itertools
import os
import re
import time
import tomllib
import requests
from enum import Enum
from urllib.parse import urlparse
import ganarchy.dirs
# TODO move elsewhere
class _ValidationError(Exception):
# we have no idea how classes work in python anymore, it's been 2 years
pass
def _check_type(obj, ty):
if isinstance(obj, ty):
return obj
raise _ValidationError # TODO...
def _is_uri(obj, ports=range(1,65536), schemes=('https',)):
try:
u = urlparse(obj)
if not u:
return False
# also raises for invalid ports, see
# https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urlparse
# "Reading the port attribute will raise a ValueError if an
# invalid port is specified in the URI. [...]"
if u.port is not None and u.port not in ports:
return False
if u.scheme not in schemes:
return False
except ValueError:
return False
return True
def _check_uri(obj, ports=range(1,65536), schemes=('https',)):
_check_type(obj, str)
if _is_uri(obj, ports, schemes):
return obj
raise _ValidationError # TODO...
_commit_pattern = re.compile(r"^[0-9a-fA-F]{40}$")
_commit_sha256_pattern = re.compile(r"^[0-9a-fA-F]{40}$|^[0-9a-fA-F]{64}$")
def _is_commit_id(obj, sha256ready=True):
if not isinstance(obj, str):
return False
if sha256ready:
return _commit_sha256_pattern.match(obj)
else:
return _commit_pattern.match(obj)
# TODO
#_MATCHER_ALIASES = abdl.compile("""->'project_settings':$dict
# ->commit/[0-9a-fA-F]{40}|[0-9a-fA-F]{64}/?:?$dict
# """, {'dict': dict}) # FIXME check for aliases, might require changes to abdl
# TODO
#_MATCHER_URI_FILTERS = abdl.compile("""->'uri_filters':$dict
# ->filter[:?$str]:?$dict
# (->'active'?:?$bool)""",
# dict(dict=dict, str=str, bool=bool))
class OverridableProperty(abc.ABC):
"""An overridable property, with options.
Attributes:
options (dict): Options.
"""
@abc.abstractmethod
def as_key(self):
"""Returns an opaque representation of this OverridablePRoperty
suitable for use as a dict key.
The returned object is not suitable for other purposes.
"""
return ()
@property
def active(self):
"""Whether this property is active.
"""
return self.options.get('active', False)
class PCTP(OverridableProperty):
"""A Project Commit-Tree Path.
Attributes:
project_commit (str): The project commit.
uri (str): The URI of a fork of the project.
branch (str): The branch name, or None for the default branch.
options (dict): A dict of fork-specific options.
"""
def __init__(self, project_commit, uri, branch, options):
self.project_commit = project_commit
self.uri = uri
if branch == "HEAD":
self.branch = None
else:
self.branch = branch
self.options = options
def as_key(self):
return (self.project_commit, self.uri, self.branch, )
@property
def federate(self):
return self.options.get('federate', True)
@property
def pinned(self):
return self.options.get('pinned', False)
class RepoListSource(OverridableProperty):
"""A source for a repo list.
Attributes:
uri (str): The URI of the repo list.
options (dict): A dict of repo list-specific options.
"""
def __init__(self, uri, options):
self.uri = uri
self.options = options
def as_key(self):
return (self.uri, )
class DataProperty(Enum):
"""Represents values that can be returned by a data source.
See documentation for DataSource get_property_value and
DataSource get_property_values for more details.
"""
INSTANCE_TITLE = (1, str)
INSTANCE_BASE_URI = (2, str)
VCS_REPOS = (3, PCTP)
REPO_LIST_SOURCES = (4, RepoListSource)
INSTANCE_FEDITO = (5, int)
def get_type(self):
"""Returns the expected type for values from this DataProperty.
"""
return self.value[1]
class PropertyError(LookupError):
"""Raised to indicate improper use of a DataProperty.
"""
pass
class DataSource(abc.ABC):
@abc.abstractmethod
def update(self):
"""Refreshes the data associated with this source, if necessary.
"""
pass
@abc.abstractmethod
def exists(self):
"""Returns whether this source has usable data.
"""
pass
@abc.abstractmethod
def get_supported_properties(self):
"""Returns an iterable of properties supported by this data source.
Returns:
Iterable of DataProperty: Supported properties.
"""
return ()
def get_property_value(self, prop):
"""Returns the value associated with the given property.
If duplicated, an earlier value should override a later value.
Args:
prop (DataProperty): The property.
Returns:
The value associated with the given property.
Raises:
PropertyError: If the property is not supported by this data
source.
LookupError: If the property is supported, but isn't available.
ValueError: If the property doesn't have exactly one value.
"""
iterator = self.get_property_values(prop)
try:
# note: unpacking
ret, = iterator
except LookupError as exc:
# don't accidentally swallow bugs in the iterator
raise RuntimeError from exc
return ret
@abc.abstractmethod
def get_property_values(self, prop):
"""Returns the values associated with the given property as an iterable.
If duplicated, earlier values should override later values.
Args:
prop (DataProperty): The property.
Returns:
The values associated with the given property.
Raises:
PropertyError: If the property is not supported by this data
source.
LookupError: If the property is supported, but isn't available.
"""
raise PropertyError
class DummyDataSource(DataSource):
"""A DataSource that provides nothing.
"""
class ObjectDataSource(DataSource):
"""A DataSource backed by a Python object.
Updates to the backing object will be immediately reflected in this
DataSource.
"""
@staticmethod
def _get_instance_title(obj):
result = obj.get('title')
if not isinstance(result, str):
raise _ValidationError
return [result]
@staticmethod
def _get_instance_base_uri(obj):
result = obj.get('base_url')
if not isinstance(result, str):
raise _ValidationError
if not result.isprintable() and not _is_uri(result):
raise _ValidationError
return [result]
@staticmethod
def _get_instance_fedito(obj):
result = obj.get('fedi-to')
if not isinstance(result, int):
raise _ValidationError
return [result]
@staticmethod
def _get_vcs_repos(obj):
projects = obj.get('projects')
if not isinstance(projects, dict):
raise _ValidationError
return (
PCTP(commit, uri, branch,
{k: v
for k, v in options.items()
if (k in {'active', 'federate', 'pinned'}
and isinstance(v, bool))
})
for (commit, uris) in projects.items()
if _is_commit_id(commit)
if isinstance(uris, dict)
for (uri, branches) in uris.items()
if isinstance(uri, str) and uri.isprintable() and _is_uri(uri)
if isinstance(branches, dict)
for (branch, options) in branches.items()
if branch is None or isinstance(branch, str)
and branch.isprintable()
if isinstance(options, dict)
and isinstance(options.get('active'), bool)
)
@staticmethod
def _get_repo_list_sources(obj):
sources = obj.get('repo_list_srcs')
if not isinstance(sources, dict):
raise _ValidationError
return (
RepoListSource(src, options)
for (src, options) in sources.items()
if isinstance(src, str)
and _is_uri(src, schemes=('https','file'))
if isinstance(options, dict)
and isinstance(options.get('active'), bool)
# TODO it would probably make sense to add
# options.get('type', 'toml') somewhere...
)
_SUPPORTED_PROPERTIES = {
DataProperty.INSTANCE_TITLE: _get_instance_title,
DataProperty.INSTANCE_BASE_URI: _get_instance_base_uri,
DataProperty.INSTANCE_FEDITO: _get_instance_fedito,
DataProperty.VCS_REPOS: _get_vcs_repos,
DataProperty.REPO_LIST_SOURCES: _get_repo_list_sources,
}
def __init__(self, obj):
self._obj = obj
def update(self):
pass
def exists(self):
return True
def get_property_values(self, prop):
try:
factory = self.get_supported_properties()[prop]
except KeyError as exc:
raise PropertyError from exc
try:
iterable = factory(self._obj)
except _ValidationError as exc:
raise LookupError from exc
return iterable
@classmethod
def get_supported_properties(cls):
return cls._SUPPORTED_PROPERTIES
class LocalDataSource(ObjectDataSource):
def __init__(self, filename):
super().__init__({})
self.file_exists = False
self.last_updated = None
self.filename = filename
def update(self):
try:
updtime = self.last_updated
self.last_updated = os.stat(self.filename).st_mtime
if not self.file_exists or updtime != self.last_updated:
with open(self.filename, 'rb') as f:
self._obj = tomllib.load(f)
self.file_exists = True
except (OSError, UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
self.file_exists = False
self.last_updated = None
self._obj = {}
return e
def exists(self):
return self.file_exists
def __repr__(self):
return "LocalDataSource({!r})".format(self.filename)
class RemoteDataSource(ObjectDataSource):
def __init__(self, uri):
super().__init__({})
self.uri = uri
self.remote_exists = False
self.next_update = 0
def update(self):
if self.next_update > time.time():
return
# I long for the day when toml has a registered media type
# FIXME this should be JSON
# (also doesn't it have one nowadays? -- nvm, not a registered one :/)
response = requests.get(self.uri, headers={'user-agent': 'ganarchy/0.0.0', 'accept': '*/*'})
self.remote_exists = response.status_code == 200
seconds = 3600
if (refresh := response.headers.get('Refresh', None)) is not None:
try:
seconds = int(refresh)
except ValueError:
refresh = refresh.split(';', 1)
try:
seconds = int(refresh[0])
except ValueError:
pass
self.next_update = time.time() + seconds
if self.remote_exists:
response.encoding = 'utf-8'
try:
self._obj = tomllib.loads(response.text)
except (UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
self._obj = {}
return e
else:
return response
def exists(self):
return self.remote_exists
def __repr__(self):
return "RemoteDataSource({!r})".format(self.uri)
class DefaultsDataSource(ObjectDataSource):
"""Provides a way for contributors to define/encourage some default
settings.
In particular, enables contributors to have a say in default domain
blocks.
"""
DEFAULTS = {}
def __init__(self):
super().__init__(self.DEFAULTS)
def exists(self):
return True
def update(self):
return
def __repr__(self):
return "DefaultsDataSource()"
class ConfigManager(DataSource):
"""A ConfigManager takes care of managing config sources and
collecting their details.
Args:
sources (list of DataSource): The config sources to be managed.
"""
def __init__(self, sources):
self.sources = sources
@classmethod
def new_default(cls):
srcs = [LocalDataSource(d + "/config.toml") for d in [ganarchy.dirs.CONFIG_HOME] + ganarchy.dirs.CONFIG_DIRS]
return cls(srcs + [DefaultsDataSource()])
def exists(self):
return True
def update(self):
excs = []
for source in self.sources:
excs.append(source.update())
return excs
def get_supported_properties(self):
return DataProperty
def get_property_values(self, prop):
if prop not in self.get_supported_properties():
raise PropertyError
elif prop == DataProperty.VCS_REPOS:
return self._get_vcs_repos()
elif prop == DataProperty.REPO_LIST_SOURCES:
return self._get_repo_list_sources()
else:
# short-circuiting, as these are only supposed to return a single value
for source in self.sources:
try:
return source.get_property_values(prop)
except PropertyError:
pass
except LookupError:
pass
raise LookupError
def _get_vcs_repos(self):
for source in self.sources:
if DataProperty.VCS_REPOS in source.get_supported_properties():
try:
iterator = source.get_property_values(DataProperty.VCS_REPOS)
except LookupError:
pass
else:
yield from iterator
def _get_repo_list_sources(self):
for source in self.sources:
if DataProperty.REPO_LIST_SOURCES in source.get_supported_properties():
try:
iterator = source.get_property_values(DataProperty.REPO_LIST_SOURCES)
except LookupError:
pass
else:
yield from iterator
class RepoListManager(DataSource):
"""A RepoListManager takes care of managing repo lists.
Args:
config_manager (DataSource): The config manager from which the repo
lists come.
"""
def __init__(self, config_manager):
self.config_manager = EffectiveSource(config_manager)
self.sources = [self.config_manager]
def exists(self):
return True
def update(self):
excs = [self.config_manager.update()]
if DataProperty.REPO_LIST_SOURCES in self.config_manager.get_supported_properties():
self.sources = [self.config_manager]
try:
it = self.config_manager.get_property_values(DataProperty.REPO_LIST_SOURCES)
except LookupError:
pass
else:
self.sources.extend(RemoteDataSource(rls.uri) for rls in it if rls.active)
for source in self.sources[1:]:
excs.append(source.update())
return excs
def get_supported_properties(self):
return {DataProperty.VCS_REPOS}
def get_property_values(self, prop):
if prop not in self.get_supported_properties():
raise PropertyError
assert prop == DataProperty.VCS_REPOS
return self._get_vcs_repos()
def _get_vcs_repos(self):
assert self.config_manager == self.sources[0]
try:
# config manager may override repo lists
iterator = self.config_manager.get_property_values(DataProperty.VCS_REPOS)
except (PropertyError, LookupError):
pass
else:
yield from iterator
for source in self.sources:
if DataProperty.VCS_REPOS in source.get_supported_properties():
try:
iterator = source.get_property_values(DataProperty.VCS_REPOS)
except LookupError:
pass
else:
for pctp in iterator:
# but repo lists aren't allowed to override anything
for filtered in ['federate', 'pinned']:
try:
del pctp.options[filtered]
except KeyError:
pass
if pctp.active:
yield pctp
class EffectiveSource(DataSource):
"""Wraps another ``DataSource`` and yields "unique" results suitable
for general use.
Methods on this class, in particular ``get_property_values``, handle
``OverridableProperty`` overrides both to avoid code duplication and
so the user doesn't have to.
Args:
raw_source (DataSource): The raw backing source.
"""
def __init__(self, raw_source):
self.raw_source = raw_source
def exists(self):
return self.raw_source.exists()
def update(self):
return self.raw_source.update()
def get_property_value(self, prop):
return self.raw_source.get_property_value(prop)
def get_supported_properties(self):
return self.raw_source.get_supported_properties()
def get_property_values(self, prop):
# must raise exceptions *now*
# not when the generator runs
return self._wrap_values(prop, self.raw_source.get_property_values(prop))
def _wrap_values(self, prop, it):
if issubclass(prop.get_type(), OverridableProperty):
seen = {}
for v in it:
k = v.as_key()
if k in seen:
continue
seen[k] = v
yield v
else:
yield from it
def __repr__(self):
return "EffectiveSource({!r})".format(self.raw_source)