summary refs log blame commit diff stats
path: root/ganarchy/data.py
blob: 4d675cbac67a17acca377f2382058ef0495fbfd4 (plain) (tree)
1
2
                                                           
                                         

























                                                                             
              
 




                                 

                    
                     


                                                                           
 








                                                           
                        


                                                                                   
                                                      
















                                                                           


                                         



                                                
 




                                                                                                                     





                                                                         



























                                                                       
                                                                      





                                                             


                              
                                




                                                             



                                                 



                                                





















                                                               
                                

                                           
                              






















































                                                                           


                                                             



                                        
                                                                                





                                                                   
                









                                                                           



                                          





                                                                       
































































                                                                          
                             
                                                         
                                                               


                                                               
     












                                                           

                                        
            
                                         

                                       
                       
















                                                                    

                                                    
                                   
                                                                           





















                                                                  

                                                                              















                                                                                                    

                                                                      












































                                                                       
                                                                                                                     
                                                 










































































                                                                                            
                                       









                                                       
                                    
 
                             
                                                     






                                                                                      








                                                                                 




                                                               















































                                                                                 
# This file is part of GAnarchy - decentralized project hub
# Copyright (C) 2019, 2020, 2024  Soni L.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

"""This module handles GAnarchy's data and config sources.

A data source can be either a config source or a repo list source, but be
careful: they use identical syntax, but have different semantics! Mistaking
a repo list source for a config source is a recipe for security bugs!
"""

import abc
import itertools
import os
import re
import time
import tomllib

import requests

from enum import Enum
from urllib.parse import urlparse

import ganarchy.dirs

# TODO move elsewhere
class _ValidationError(Exception):
    # we have no idea how classes work in python anymore, it's been 2 years
    pass

def _check_type(obj, ty):
    if isinstance(obj, ty):
        return obj
    raise _ValidationError # TODO...

def _is_uri(obj, ports=range(1,65536), schemes=('https',)):
    try:
        u = urlparse(obj)
        if not u:
            return False
        # also raises for invalid ports, see
        # https://docs.python.org/3/library/urllib.parse.html#urllib.parse.urlparse
        # "Reading the port attribute will raise a ValueError if an
        # invalid port is specified in the URI. [...]"
        if u.port is not None and u.port not in ports:
            return False
        if u.scheme not in schemes:
            return False
    except ValueError:
        return False
    return True

def _check_uri(obj, ports=range(1,65536), schemes=('https',)):
    _check_type(obj, str)
    if _is_uri(obj, ports, schemes):
        return obj
    raise _ValidationError # TODO...

_commit_pattern = re.compile(r"^[0-9a-fA-F]{40}$")
_commit_sha256_pattern = re.compile(r"^[0-9a-fA-F]{40}$|^[0-9a-fA-F]{64}$")

def _is_commit_id(obj, sha256ready=True):
    if not isinstance(obj, str):
        return False
    if sha256ready:
        return _commit_sha256_pattern.match(obj)
    else:
        return _commit_pattern.match(obj)

# TODO
#_MATCHER_ALIASES = abdl.compile("""->'project_settings':$dict
#                                     ->commit/[0-9a-fA-F]{40}|[0-9a-fA-F]{64}/?:?$dict
#                                       """, {'dict': dict}) # FIXME check for aliases, might require changes to abdl

# TODO
#_MATCHER_URI_FILTERS = abdl.compile("""->'uri_filters':$dict
#                                         ->filter[:?$str]:?$dict
#                                           (->'active'?:?$bool)""",
#                                    dict(dict=dict, str=str, bool=bool))

class OverridableProperty(abc.ABC):
    """An overridable property, with options.

    Attributes:
        options (dict): Options.
    """

    @abc.abstractmethod
    def as_key(self):
        """Returns an opaque representation of this OverridablePRoperty
        suitable for use as a dict key.

        The returned object is not suitable for other purposes.
        """
        return ()

    @property
    def active(self):
        """Whether this property is active.
        """
        return self.options.get('active', False)

class PCTP(OverridableProperty):
    """A Project Commit-Tree Path.

    Attributes:
        project_commit (str): The project commit.
        uri (str): The URI of a fork of the project.
        branch (str): The branch name, or None for the default branch.
        options (dict): A dict of fork-specific options.
    """

    def __init__(self, project_commit, uri, branch, options):
        self.project_commit = project_commit
        self.uri = uri
        if branch == "HEAD":
            self.branch = None
        else:
            self.branch = branch
        self.options = options

    def as_key(self):
        return (self.project_commit, self.uri, self.branch, )

    @property
    def federate(self):
        return self.options.get('federate', True)

    @property
    def pinned(self):
        return self.options.get('pinned', False)

class RepoListSource(OverridableProperty):
    """A source for a repo list.

    Attributes:
        uri (str): The URI of the repo list.
        options (dict): A dict of repo list-specific options.
    """

    def __init__(self, uri, options):
        self.uri = uri
        self.options = options

    def as_key(self):
        return (self.uri, )

class DataProperty(Enum):
    """Represents values that can be returned by a data source.

    See documentation for DataSource get_property_value and
    DataSource get_property_values for more details.
    """
    INSTANCE_TITLE = (1, str)
    INSTANCE_BASE_URI = (2, str)
    VCS_REPOS = (3, PCTP)
    REPO_LIST_SOURCES = (4, RepoListSource)
    INSTANCE_FEDITO = (5, int)

    def get_type(self):
        """Returns the expected type for values from this DataProperty.
        """
        return self.value[1]

class PropertyError(LookupError):
    """Raised to indicate improper use of a DataProperty.
    """
    pass

class DataSource(abc.ABC):
    @abc.abstractmethod
    def update(self):
        """Refreshes the data associated with this source, if necessary.
        """
        pass

    @abc.abstractmethod
    def exists(self):
        """Returns whether this source has usable data.
        """
        pass

    @abc.abstractmethod
    def get_supported_properties(self):
        """Returns an iterable of properties supported by this data source.

        Returns:
            Iterable of DataProperty: Supported properties.

        """
        return ()

    def get_property_value(self, prop):
        """Returns the value associated with the given property.

        If duplicated, an earlier value should override a later value.

        Args:
            prop (DataProperty): The property.

        Returns:
            The value associated with the given property.

        Raises:
            PropertyError: If the property is not supported by this data
            source.
            LookupError: If the property is supported, but isn't available.
            ValueError: If the property doesn't have exactly one value.
        """
        iterator = self.get_property_values(prop)
        try:
            # note: unpacking
            ret, = iterator
        except LookupError as exc:
            # don't accidentally swallow bugs in the iterator
            raise RuntimeError from exc
        return ret

    @abc.abstractmethod
    def get_property_values(self, prop):
        """Returns the values associated with the given property as an iterable.

        If duplicated, earlier values should override later values.

        Args:
            prop (DataProperty): The property.

        Returns:
            The values associated with the given property.

        Raises:
            PropertyError: If the property is not supported by this data
            source.
            LookupError: If the property is supported, but isn't available.

        """
        raise PropertyError

class DummyDataSource(DataSource):
    """A DataSource that provides nothing.
    """

class ObjectDataSource(DataSource):
    """A DataSource backed by a Python object.

    Updates to the backing object will be immediately reflected in this
    DataSource.
    """

    @staticmethod
    def _get_instance_title(obj):
        result = obj.get('title')
        if not isinstance(result, str):
            raise _ValidationError
        return [result]

    @staticmethod
    def _get_instance_base_uri(obj):
        result = obj.get('base_url')
        if not isinstance(result, str):
            raise _ValidationError
        if not result.isprintable() and not _is_uri(result):
            raise _ValidationError
        return [result]

    @staticmethod
    def _get_instance_fedito(obj):
        result = obj.get('fedi-to')
        if not isinstance(result, int):
            raise _ValidationError
        return [result]

    @staticmethod
    def _get_vcs_repos(obj):
        projects = obj.get('projects')
        if not isinstance(projects, dict):
            raise _ValidationError
        return (
            PCTP(commit, uri, branch,
                 {k: v
                  for k, v in options.items()
                  if (k in {'active', 'federate', 'pinned'}
                      and isinstance(v, bool))
                 })
            for (commit, uris) in projects.items()
            if _is_commit_id(commit)
            if isinstance(uris, dict)
            for (uri, branches) in uris.items()
            if isinstance(uri, str) and uri.isprintable() and _is_uri(uri)
            if isinstance(branches, dict)
            for (branch, options) in branches.items()
            if branch is None or isinstance(branch, str)
            and branch.isprintable()
            if isinstance(options, dict)
            and isinstance(options.get('active'), bool)
        )

    @staticmethod
    def _get_repo_list_sources(obj):
        sources = obj.get('repo_list_srcs')
        if not isinstance(sources, dict):
            raise _ValidationError
        return (
            RepoListSource(src, options)
            for (src, options) in sources.items()
            if isinstance(src, str)
            and _is_uri(src, schemes=('https','file'))
            if isinstance(options, dict)
            and isinstance(options.get('active'), bool)
            # TODO it would probably make sense to add
            # options.get('type', 'toml') somewhere...
        )

    _SUPPORTED_PROPERTIES = {
        DataProperty.INSTANCE_TITLE: _get_instance_title,
        DataProperty.INSTANCE_BASE_URI: _get_instance_base_uri,
        DataProperty.INSTANCE_FEDITO: _get_instance_fedito,
        DataProperty.VCS_REPOS: _get_vcs_repos,
        DataProperty.REPO_LIST_SOURCES: _get_repo_list_sources,
    }

    def __init__(self, obj):
        self._obj = obj

    def update(self):
        pass

    def exists(self):
        return True

    def get_property_values(self, prop):
        try:
            factory = self.get_supported_properties()[prop]
        except KeyError as exc:
            raise PropertyError from exc
        try:
            iterable = factory(self._obj)
        except _ValidationError as exc:
            raise LookupError from exc
        return iterable

    @classmethod
    def get_supported_properties(cls):
        return cls._SUPPORTED_PROPERTIES

class LocalDataSource(ObjectDataSource):
    def __init__(self, filename):
        super().__init__({})
        self.file_exists = False
        self.last_updated = None
        self.filename = filename

    def update(self):
        try:
            updtime = self.last_updated
            self.last_updated = os.stat(self.filename).st_mtime
            if not self.file_exists or updtime != self.last_updated:
                with open(self.filename, 'rb') as f:
                    self._obj = tomllib.load(f)
            self.file_exists = True
        except (OSError, UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
            self.file_exists = False
            self.last_updated = None
            self._obj = {}
            return e

    def exists(self):
        return self.file_exists

    def __repr__(self):
        return "LocalDataSource({!r})".format(self.filename)

class RemoteDataSource(ObjectDataSource):
    def __init__(self, uri):
        super().__init__({})
        self.uri = uri
        self.remote_exists = False
        self.next_update = 0

    def update(self):
        if self.next_update > time.time():
            return
        # I long for the day when toml has a registered media type
        # FIXME this should be JSON
        # (also doesn't it have one nowadays? -- nvm, not a registered one :/)
        response = requests.get(self.uri, headers={'user-agent': 'ganarchy/0.0.0', 'accept': '*/*'})
        self.remote_exists = response.status_code == 200
        seconds = 3600
        if (refresh := response.headers.get('Refresh', None)) is not None:
            try:
                seconds = int(refresh)
            except ValueError:
                refresh = refresh.split(';', 1)
                try:
                    seconds = int(refresh[0])
                except ValueError:
                    pass
        self.next_update = time.time() + seconds
        if self.remote_exists:
            response.encoding = 'utf-8'
            try:
                self._obj = tomllib.loads(response.text)
            except (UnicodeDecodeError, tomllib.TOMLDecodeError) as e:
                self._obj = {}
                return e
        else:
            return response

    def exists(self):
        return self.remote_exists

    def __repr__(self):
        return "RemoteDataSource({!r})".format(self.uri)

class DefaultsDataSource(ObjectDataSource):
    """Provides a way for contributors to define/encourage some default
    settings.

    In particular, enables contributors to have a say in default domain
    blocks.
    """
    DEFAULTS = {}

    def __init__(self):
        super().__init__(self.DEFAULTS)

    def exists(self):
        return True

    def update(self):
        return

    def __repr__(self):
        return "DefaultsDataSource()"


class ConfigManager(DataSource):
    """A ConfigManager takes care of managing config sources and
    collecting their details.

    Args:
        sources (list of DataSource): The config sources to be managed.
    """
    def __init__(self, sources):
        self.sources = sources

    @classmethod
    def new_default(cls):
        srcs = [LocalDataSource(d + "/config.toml") for d in [ganarchy.dirs.CONFIG_HOME] + ganarchy.dirs.CONFIG_DIRS]
        return cls(srcs + [DefaultsDataSource()])

    def exists(self):
        return True

    def update(self):
        excs = []
        for source in self.sources:
            excs.append(source.update())
        return excs

    def get_supported_properties(self):
        return DataProperty

    def get_property_values(self, prop):
        if prop not in self.get_supported_properties():
            raise PropertyError
        elif prop == DataProperty.VCS_REPOS:
            return self._get_vcs_repos()
        elif prop == DataProperty.REPO_LIST_SOURCES:
            return self._get_repo_list_sources()
        else:
            # short-circuiting, as these are only supposed to return a single value
            for source in self.sources:
                try:
                    return source.get_property_values(prop)
                except PropertyError:
                    pass
                except LookupError:
                    pass
            raise LookupError

    def _get_vcs_repos(self):
        for source in self.sources:
            if DataProperty.VCS_REPOS in source.get_supported_properties():
                try:
                    iterator = source.get_property_values(DataProperty.VCS_REPOS)
                except LookupError:
                    pass
                else:
                    yield from iterator

    def _get_repo_list_sources(self):
        for source in self.sources:
            if DataProperty.REPO_LIST_SOURCES in source.get_supported_properties():
                try:
                    iterator = source.get_property_values(DataProperty.REPO_LIST_SOURCES)
                except LookupError:
                    pass
                else:
                    yield from iterator

class RepoListManager(DataSource):
    """A RepoListManager takes care of managing repo lists.

    Args:
        config_manager (DataSource): The config manager from which the repo
            lists come.
    """
    def __init__(self, config_manager):
        self.config_manager = EffectiveSource(config_manager)
        self.sources = [self.config_manager]

    def exists(self):
        return True

    def update(self):
        excs = [self.config_manager.update()]
        if DataProperty.REPO_LIST_SOURCES in self.config_manager.get_supported_properties():
            self.sources = [self.config_manager]
            try:
                it = self.config_manager.get_property_values(DataProperty.REPO_LIST_SOURCES)
            except LookupError:
                pass
            else:
                self.sources.extend(RemoteDataSource(rls.uri) for rls in it if rls.active)
        for source in self.sources[1:]:
            excs.append(source.update())
        return excs

    def get_supported_properties(self):
        return {DataProperty.VCS_REPOS}

    def get_property_values(self, prop):
        if prop not in self.get_supported_properties():
            raise PropertyError
        assert prop == DataProperty.VCS_REPOS
        return self._get_vcs_repos()

    def _get_vcs_repos(self):
        assert self.config_manager == self.sources[0]
        try:
            # config manager may override repo lists
            iterator = self.config_manager.get_property_values(DataProperty.VCS_REPOS)
        except (PropertyError, LookupError):
            pass
        else:
            yield from iterator
        for source in self.sources:
            if DataProperty.VCS_REPOS in source.get_supported_properties():
                try:
                    iterator = source.get_property_values(DataProperty.VCS_REPOS)
                except LookupError:
                    pass
                else:
                    for pctp in iterator:
                        # but repo lists aren't allowed to override anything
                        for filtered in ['federate', 'pinned']:
                            try:
                                del pctp.options[filtered]
                            except KeyError:
                                pass
                        if pctp.active:
                            yield pctp

class EffectiveSource(DataSource):
    """Wraps another ``DataSource`` and yields "unique" results suitable
    for general use.

    Methods on this class, in particular ``get_property_values``, handle
    ``OverridableProperty`` overrides both to avoid code duplication and
    so the user doesn't have to.

    Args:
        raw_source (DataSource): The raw backing source.
    """
    def __init__(self, raw_source):
        self.raw_source = raw_source

    def exists(self):
        return self.raw_source.exists()

    def update(self):
        return self.raw_source.update()

    def get_property_value(self, prop):
        return self.raw_source.get_property_value(prop)

    def get_supported_properties(self):
        return self.raw_source.get_supported_properties()

    def get_property_values(self, prop):
        # must raise exceptions *now*
        # not when the generator runs
        return self._wrap_values(prop, self.raw_source.get_property_values(prop))

    def _wrap_values(self, prop, it):
        if issubclass(prop.get_type(), OverridableProperty):
            seen = {}
            for v in it:
                k = v.as_key()
                if k in seen:
                    continue
                seen[k] = v
                yield v
        else:
            yield from it

    def __repr__(self):
        return "EffectiveSource({!r})".format(self.raw_source)