# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
File-system registry implementation using a plain hierarchical file-based structure on a locally accessible file system.
"""
import abc
import functools
import logging
import pathlib
import shutil
import typing
import uuid
from forml import project as prj
from forml import setup
from forml.io import asset
# Module-level logger shared by the path matchers and the registry implementation in this module.
LOGGER = logging.getLogger(__name__)
class Path(type(pathlib.Path())):  # https://bugs.python.org/issue24132
    """Registry path utility.

    Instance represents the registry root directory and maps the registry levels onto this layout::

        <root>/<project>/<release>/package.<format>
        <root>/<project>/<release>/<generation>/tag.toml
        <root>/<project>/<release>/<generation>/<sid>.bin
        <root>/<project>/<release>/.stage/<sid>.bin

    The base class is the concrete flavour returned by ``pathlib.Path()`` because ``pathlib.Path`` itself
    could not be subclassed directly (see the linked Python bug).
    """

    class Matcher(metaclass=abc.ABCMeta):
        """Registry component validator.

        Subclasses bind a level key ``constructor`` and implement the level-specific ``content`` check.
        """

        @staticmethod
        @abc.abstractmethod
        def constructor(key: str) -> asset.Level.Key:
            """Level key constructor.

            Args:
                key: Key text value.

            Returns:
                Key instance.
            """

        @classmethod
        def key(cls, path: pathlib.Path) -> bool:
            """Check the given path is a valid registry level key.

            The path must be a directory whose name is accepted by the level key constructor.

            Args:
                path: Path to be checked.

            Returns:
                True if valid.
            """

            def constructs(name: str) -> bool:
                # Any TypeError/ValueError raised by the key constructor marks the name as invalid.
                try:
                    cls.constructor(name)
                except (TypeError, ValueError):
                    return False
                return True

            return path.is_dir() and constructs(path.name)

        @staticmethod
        @abc.abstractmethod
        def content(level: pathlib.Path) -> bool:
            """Test the level content is valid for given level.

            Args:
                level: Root path of given level

            Returns:
                True if valid.
            """

        @classmethod
        def valid(cls, path: pathlib.Path) -> bool:
            """Check given path is a valid level (both its key and its content).

            Args:
                path: Path to be checked.

            Returns:
                True if valid.
            """
            if not cls.key(path):
                LOGGER.debug('Path %s is not a valid level key', path)
                return False
            if not cls.content(path):
                LOGGER.debug('Path %s does not have a valid level content', path)
                return False
            return True

    class Project(Matcher):
        """Project matcher."""

        constructor = staticmethod(asset.Project.Key)

        @staticmethod
        def content(level: pathlib.Path) -> bool:
            # A project is valid only if it contains at least one valid release.
            return any(Path.Release.valid(i) for i in level.iterdir())

    class Release(Matcher):
        """Release matcher."""

        constructor = staticmethod(asset.Release.Key)

        @staticmethod
        def content(level: pathlib.Path) -> bool:
            # A release is valid only if its package file is present.
            return (level / Path.PKGFILE).exists()

    class Generation(Matcher):
        """Generation matcher."""

        constructor = staticmethod(asset.Generation.Key)

        @staticmethod
        def content(level: pathlib.Path) -> bool:
            # A generation is valid only once its tag file has been written.
            return (level / Path.TAGFILE).exists()

    STAGEDIR = '.stage'  # name of the staging directory (used in place of a generation for uncommitted states)
    STATESFX = 'bin'  # state file suffix
    TAGFILE = 'tag.toml'  # generation tag file name
    PKGFILE = f'package.{prj.Package.FORMAT}'  # release package file name

    # NOTE: the lru_cache decorators below key on ``self`` (a hashable, immutable path); each cache is bounded
    # by the default maxsize of 128.
    @functools.lru_cache
    def project(self, project: asset.Project.Key) -> pathlib.Path:
        """Get the project directory path.

        Args:
            project: Name of the project.

        Returns:
            Project directory path.
        """
        return self / project

    @functools.lru_cache
    def release(self, project: asset.Project.Key, release: asset.Release.Key) -> pathlib.Path:
        """Get the release directory path.

        Args:
            project: Name of the project.
            release: Release key.

        Returns:
            Release directory path.
        """
        return self.project(project) / str(release)

    @functools.lru_cache
    def generation(
        self, project: asset.Project.Key, release: asset.Release.Key, generation: asset.Generation.Key
    ) -> pathlib.Path:
        """Get the generation directory path.

        Args:
            project: Name of the project.
            release: Release key.
            generation: Generation key.

        Returns:
            Generation directory path.
        """
        return self.release(project, release) / str(generation)

    @functools.lru_cache
    def package(self, project: asset.Project.Key, release: asset.Release.Key) -> pathlib.Path:
        """Package file path of given project name/release.

        Args:
            project: Name of the project.
            release: Release key.

        Returns:
            Package file path.
        """
        return self.release(project, release) / self.PKGFILE

    @functools.lru_cache
    def state(
        self,
        sid: uuid.UUID,
        project: asset.Project.Key,
        release: asset.Release.Key,
        generation: typing.Optional[asset.Generation.Key] = None,
    ) -> pathlib.Path:
        """State file path of given sid and project release/generation.

        Args:
            sid: State id.
            project: Name of the project.
            release: Release key.
            generation: Generation key. If None, the path points into the release staging directory
                        (``STAGEDIR``) instead of a committed generation.

        Returns:
            State file path.
        """
        if generation is None:
            generation = self.STAGEDIR
        return self.generation(project, release, generation) / f'{sid}.{self.STATESFX}'

    @functools.lru_cache
    def tag(
        self, project: asset.Project.Key, release: asset.Release.Key, generation: asset.Generation.Key
    ) -> pathlib.Path:
        """Tag file path of given project release/generation.

        Args:
            project: Name of the project.
            release: Release key.
            generation: Generation key.

        Returns:
            Tag file path.
        """
        return self.generation(project, release, generation) / self.TAGFILE
class Registry(asset.Registry, alias='posix'):
    """File-based registry backed by a locally-accessible posix file system.

    Args:
        path: Registry root file system location.
              Defaults to :file:`$FORML_HOME/registry`.
        staging: File system location reachable from all runner nodes to be used for
                 :ref:`package staging <registry-staging>` (defaults to :file:`<path>/.stage`).

    The provider can be enabled using the following :ref:`platform configuration <platform-config>`:

    .. code-block:: toml
       :caption: config.toml

       [REGISTRY.devrepo]
       provider = "posix"
       path = "/mnt/forml/dev/repo/"
    """

    def __init__(
        self,
        path: typing.Union[str, pathlib.Path] = setup.USRDIR / 'registry',
        staging: typing.Optional[typing.Union[str, pathlib.Path]] = None,
    ):
        path = pathlib.Path(path).resolve()
        super().__init__(staging or path / Path.STAGEDIR)
        self._path: Path = Path(path)

    @staticmethod
    def _listing(path: pathlib.Path, matcher: type[Path.Matcher]) -> typing.Iterable:
        """Helper for listing given repository level.

        Args:
            path: Path to be listed.
            matcher: Item matcher.

        Returns:
            Repository level listing (empty if the path does not exist).

        Raises:
            asset.Level.Invalid: If the path is not a directory.
        """
        try:
            return [matcher.constructor(p.name) for p in path.iterdir() if matcher.valid(p)]
        except NotADirectoryError as err:
            raise asset.Level.Invalid(f'Path {path} is not a valid registry component') from err
        except FileNotFoundError:
            return ()

    def projects(self) -> typing.Iterable[asset.Project.Key]:
        """List the valid projects under the registry root."""
        return self._listing(self._path, Path.Project)

    def releases(self, project: asset.Project.Key) -> typing.Iterable[asset.Release.Key]:
        """List the valid releases of the given project."""
        return self._listing(self._path.project(project), Path.Release)

    def generations(
        self, project: asset.Project.Key, release: asset.Release.Key
    ) -> typing.Iterable[asset.Generation.Key]:
        """List the valid (tagged) generations of the given project release."""
        return self._listing(self._path.release(project, release), Path.Generation)

    def pull(self, project: asset.Project.Key, release: asset.Release.Key) -> 'prj.Package':
        """Load the package of the given project release from its package file path."""
        return prj.Package(self._path.package(project, release))

    def push(self, package: 'prj.Package') -> None:
        """Store the given package under its manifest name/version.

        Directory packages are copied as a tree (skipping ``__pycache__``); file packages are copied byte-wise.
        """
        project = package.manifest.name
        release = package.manifest.version
        path = self._path.package(project, release)
        path.parent.mkdir(parents=True, exist_ok=True)
        if package.path.is_dir():
            # NOTE(review): copytree raises if the target already exists whereas the file branch below silently
            # overwrites - confirm whether re-pushing an existing release is meant to be rejected.
            shutil.copytree(package.path, path, ignore=lambda *_: {'__pycache__'})
        else:
            assert package.path.is_file(), 'Expecting file package'
            path.write_bytes(package.path.read_bytes())

    def read(
        self,
        project: asset.Project.Key,
        release: asset.Release.Key,
        generation: asset.Generation.Key,
        sid: uuid.UUID,
    ) -> bytes:
        """Read the committed state of the given sid.

        Returns:
            State bytes or ``b''`` if the generation exists but holds no such state.

        Raises:
            asset.Level.Invalid: If the generation directory does not exist.
        """
        path = self._path.state(sid, project, release, generation)
        LOGGER.debug('Reading state from %s', path)
        if not path.parent.exists():
            raise asset.Level.Invalid(f'Invalid registry component {project}/{release}/{generation}')
        try:
            with path.open('rb') as statefile:
                return statefile.read()
        except FileNotFoundError:
            LOGGER.warning('No state %s under %s', sid, path)
            return b''

    def write(self, project: asset.Project.Key, release: asset.Release.Key, sid: uuid.UUID, state: bytes) -> None:
        """Write the given state into the release staging directory (to be committed later by ``close``)."""
        path = self._path.state(sid, project, release)
        LOGGER.debug('Staging state of %d bytes to %s', len(state), path)
        path.parent.mkdir(parents=True, exist_ok=True)
        with path.open('wb') as statefile:
            statefile.write(state)

    def open(
        self, project: asset.Project.Key, release: asset.Release.Key, generation: asset.Generation.Key
    ) -> asset.Tag:
        """Load the tag of the given generation.

        Raises:
            asset.Level.Listing.Empty: If the generation has no tag file.
        """
        path = self._path.tag(project, release, generation)
        try:
            with path.open('rb') as tagfile:
                return asset.Tag.loads(tagfile.read())
        except FileNotFoundError as err:
            raise asset.Level.Listing.Empty(f'No tag under {path}') from err

    def close(
        self,
        project: asset.Project.Key,
        release: asset.Release.Key,
        generation: asset.Generation.Key,
        tag: asset.Tag,
    ) -> None:
        """Commit the staged states referenced by the tag into the given generation and write the tag file.

        Raises:
            asset.Level.Invalid: If any of the tag states has not been staged (in which case nothing is moved).
        """
        path = self._path.tag(project, release, generation)
        LOGGER.debug('Committing states of tag %s as %s', tag, path)
        # Validate all the staged states before moving any of them so that a missing one does not leave
        # the commit half-done (with some states already taken out of staging and no tag written).
        moves = []
        for sid in tag.states:
            source = self._path.state(sid, project, release)
            if not source.exists():
                raise asset.Level.Invalid(f'State {sid} not staged')
            moves.append((source, self._path.state(sid, project, release, generation)))
        path.parent.mkdir(parents=True, exist_ok=True)
        for source, target in moves:
            source.rename(target)
        # The tag file is written last as its presence is what makes the generation valid.
        with path.open('wb') as tagfile:
            tagfile.write(tag.dumps())