Source code for debusine.db.models.workspaces

# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db workspaces."""

import re
from collections.abc import Sequence
from datetime import timedelta
from typing import Any, Generic, TYPE_CHECKING, TypeAlias, TypeVar, Union, cast

from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import PermissionDenied, ValidationError
from django.db import models
from django.db.models import Case, Q, QuerySet, UniqueConstraint, When

from debusine.db.context import ContextConsistencyError, context
from debusine.db.models import permissions
from debusine.db.models.auth import Group
from debusine.db.models.files import File, FileStore
from debusine.db.models.permissions import (
    PermissionUser,
    ROLES,
    permission_check,
    permission_filter,
)
from debusine.db.models.scopes import Scope
from debusine.utils.typing_utils import copy_signature_from

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    from debusine.db.models import Collection, User
else:
    TypedModelMeta = object

A = TypeVar("A")

#: Workspace names reserved for use in toplevel URL path components
RESERVED_WORKSPACE_NAMES = frozenset(
    # TODO: trim after !1274
    (
        "accounts",
        "artifact",
        "task-status",
        "user",
        "workers",
        "work-request",
        "workspaces",
    )
)

#: Regexp matching the structure of workspace names
workspace_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_workspace_name(value: str) -> bool:
    """Check if value is a valid scope name."""
    if value in RESERVED_WORKSPACE_NAMES:
        return False
    return bool(workspace_name_regex.match(value))


def validate_workspace_name(value: str) -> None:
    """Validate workspace names."""
    if not is_valid_workspace_name(value):
        raise ValidationError(
            "%(value)r is not a valid workspace name", params={"value": value}
        )


class WorkspaceQuerySet(QuerySet["Workspace", A], Generic[A]):
    """Custom QuerySet for Workspace."""

    def in_current_scope(self) -> "WorkspaceQuerySet[A]":
        """Filter to workspaces in the current scope."""
        if context.scope is None:
            raise ContextConsistencyError("scope is not set")
        return self.filter(scope=context.scope)

    @permission_filter
    def can_display(self, user: PermissionUser) -> "WorkspaceQuerySet[A]":
        """Keep only Scopes that can be displayed."""
        assert user is not None  # Enforced by decorator
        # Workers can currently access all workspaces
        # TODO: see #523
        if context.worker_token:
            return self
        constraints = Q(public=True)
        # This is the same check done in Workspace.set_current, and if changed
        # they need to be kept in sync
        if user.is_authenticated:
            # Delegate to scope ownership
            owned_scopes = Scope.objects.filter(ROLES(user, Scope.Roles.OWNER))
            constraints |= Q(scope__in=owned_scopes)
            # Add owned workspaces
            constraints |= ROLES(user, Workspace.Roles.OWNER)
        return self.filter(constraints)


class WorkspaceManager(models.Manager["Workspace"]):
    """Manager for Workspace model."""

    def get_roles_model(self) -> type["WorkspaceRole"]:
        """Get the model used for role assignment."""
        return WorkspaceRole

    def get_queryset(self) -> WorkspaceQuerySet[Any]:
        """Use the custom QuerySet."""
        return WorkspaceQuerySet(self.model, using=self._db)


DEFAULT_WORKSPACE_NAME = "System"


[docs]def default_workspace() -> "Workspace": """Return the default Workspace.""" return Workspace.objects.get( scope__name=settings.DEBUSINE_DEFAULT_SCOPE, name=DEFAULT_WORKSPACE_NAME )
class WorkspaceRoles(permissions.Roles): """Available roles for a Workspace.""" OWNER = "owner", "Owner"
[docs]class Workspace(models.Model): """Workspace model.""" Roles: TypeAlias = WorkspaceRoles objects = WorkspaceManager.from_queryset(WorkspaceQuerySet)() name = models.CharField( max_length=255, validators=[validate_workspace_name] ) default_file_store = models.ForeignKey( FileStore, on_delete=models.PROTECT, related_name="default_workspaces" ) other_file_stores = models.ManyToManyField( FileStore, related_name="other_workspaces" ) public = models.BooleanField(default=False) default_expiration_delay = models.DurationField( default=timedelta(0), help_text="minimal time that a new artifact is kept in the" " workspace before being expired", ) inherits = models.ManyToManyField( "db.Workspace", through="db.WorkspaceChain", through_fields=("child", "parent"), related_name="inherited_by", ) scope = models.ForeignKey( Scope, on_delete=models.PROTECT, related_name="workspaces" ) class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["scope", "name"], name="%(app_label)s_%(class)s_unique_scope_name", ), ]
[docs] @copy_signature_from(models.Model.save) def save(self, **kwargs: Any) -> None: """Wrap save with permission checks.""" from debusine.db.context import context if self._state.adding: # Create if not self.scope.can_create_workspace(context.user): raise PermissionDenied( f"{context.user} cannot create workspaces in {self.scope}" ) else: # Update ... # TODO: check for update permissions return super().save(**kwargs)
[docs] def set_current(self) -> None: """ Set this as the current workspace. This needs to be called after ``context.set_scope`` and ``context.set_user``. """ if (old_workspace := context.workspace) is not None: raise ContextConsistencyError( f"Workspace was already set to {old_workspace}" ) if (scope := context.scope) is None: raise ContextConsistencyError("Cannot set workspace before scope") if self.scope != scope: raise ContextConsistencyError( f"workspace scope {self.scope.name!r}" f" does not match current scope {scope.name!r}" ) if (user := context.user) is None: if context.worker_token: user = AnonymousUser() else: raise ContextConsistencyError( "Cannot set workspace before user" ) workspace_roles: frozenset[Workspace.Roles] if not user.is_authenticated and context.worker_token: workspace_roles = frozenset() else: workspace_roles = frozenset(self.get_roles(user)) # Check workspace visibility. This is the same as the can_display # predicate, and if changed they need to be kept in sync if ( self.public or context.worker_token or Scope.Roles.OWNER in context.scope_roles or Workspace.Roles.OWNER in workspace_roles ): context._workspace.set(self) context._workspace_roles.set(workspace_roles) else: raise ContextConsistencyError( f"User {user} cannot access workspace {self}" )
[docs] @permission_check def can_display(self, user: PermissionUser) -> bool: """Check if the workspace can be displayed.""" assert user is not None # enforced by decorator # Shortcuts to avoid hitting the database for common cases if self.public: return True # Workers can currently access all workspaces # TODO: see #523 if context.worker_token: return True if user == context.user: if self == context.workspace: # Already checked in set_current return True elif ( self.scope == context.scope and Scope.Roles.OWNER in context.scope_roles ): # We can delegate to roles for current scope return True return Workspace.objects.can_display(user).filter(pk=self.pk).exists()
[docs] def assign_role( self, role: "WorkspaceRole.Roles", group: "Group" ) -> "WorkspaceRole": """Assign a role to a group.""" if group.scope != self.scope: raise ValueError(f"group {group} is not in scope {self.scope}") workspace_role, _ = WorkspaceRole.objects.get_or_create( resource=self, group=group, role=role ) return workspace_role
# See https://github.com/typeddjango/django-stubs/issues/1047 for the typing
[docs] def get_roles( self, user: Union["User", "AnonymousUser"] ) -> QuerySet["WorkspaceRole", "WorkspaceRoles"]: """Get the roles of the user on this workspace.""" if not user.is_authenticated: result = WorkspaceRole.objects.none().values_list("role", flat=True) else: result = ( self.roles.filter(group__users=user) .values_list("role", flat=True) .distinct() ) return cast(QuerySet["WorkspaceRole", "WorkspaceRoles"], result)
[docs] def file_stores(self, fileobj: File) -> QuerySet[FileStore]: """ Find the file stores in this workspace that have fileobj, if any. The returned query set is ordered so that the workspace's default file store comes first if it has the file, followed by any of the workspace's other file stores that have it. """ return FileStore.objects.filter( Q(default_workspaces=self) | Q(other_workspaces=self), files=fileobj, ).order_by(Case(When(default_workspaces=self, then=0), default=1).asc())
[docs] def is_file_in_workspace(self, fileobj: File) -> bool: """ Return True if fileobj is part of this workspace. To be considered part of a workspace, a file must be in one of its stores, and must also be in an artifact which is in the workspace. """ from debusine.db.models import FileInArtifact if not self.file_stores(fileobj).exists(): return False if not FileInArtifact.objects.filter( artifact__workspace=self, file=fileobj, complete=True ).exists(): return False return True
[docs] def set_inheritance(self, chain: Sequence["Workspace"]) -> None: """Set the inheritance chain for this workspace.""" # Check for duplicates in the chain before altering the database seen: set[int] = set() for workspace in chain: if workspace.pk in seen: raise ValueError( f"duplicate workspace {workspace.name!r}" " in inheritance chain" ) seen.add(workspace.pk) WorkspaceChain.objects.filter(child=self).delete() for idx, workspace in enumerate(chain): WorkspaceChain.objects.create( child=self, parent=workspace, order=idx )
[docs] def get_collection( self, *, # TODO: allow user to be None to mean take it from context? user: Union["User", "AnonymousUser"], category: str, name: str, visited: set[int] | None = None, ) -> "Collection": """ Lookup a collection by category and name. If the collection is not found in this workspace, it follows the workspace inheritance chain using a depth-first search. :param user: user to use for permission checking :param category: collection category :param name: collection name :param visited: for internal use only: state used during graph traversal :raises Collection.DoesNotExist: if the collection was not found """ from debusine.db.models import Collection # Ensure that the user can access this workspace if not self.can_display(user): raise Collection.DoesNotExist # Lookup in this workspace try: return Collection.objects.get( workspace=self, category=category, name=name ) except Collection.DoesNotExist: pass if visited is None: visited = set() visited.add(self.pk) # Follow the inheritance chain for node in self.chain_parents.order_by("order").select_related( "parent" ): workspace = node.parent # Break inheritance loops if workspace.pk in visited: continue try: return workspace.get_collection( user=user, category=category, name=name, visited=visited ) except Collection.DoesNotExist: pass raise Collection.DoesNotExist
[docs] def get_singleton_collection( self, *, user: Union["User", "AnonymousUser"], category: str ) -> "Collection": """ Lookup a singleton collection by category. If the collection is not found in this workspace, it follows the workspace inheritance chain using a depth-first search. :param user: user to use for permission checking :param category: collection category :raises Collection.DoesNotExist: if the collection was not found """ return self.get_collection(user=user, category=category, name="_")
def __str__(self) -> str: """Return basic information of Workspace.""" return f"{self.scope.name}/{self.name}"
class WorkspaceChain(models.Model): """Workspace chaining model.""" child = models.ForeignKey( Workspace, on_delete=models.CASCADE, related_name="chain_parents", help_text="Workspace that falls back on `parent` for lookups", ) parent = models.ForeignKey( Workspace, on_delete=models.CASCADE, related_name="chain_children", help_text="Workspace to be looked up if lookup in `child` fails", ) order = models.IntegerField( help_text="Lookup order of this element in the chain", ) class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["child", "parent"], name="%(app_label)s_%(class)s_unique_child_parent", ), UniqueConstraint( fields=["child", "order"], name="%(app_label)s_%(class)s_unique_child_order", ), ] def __str__(self) -> str: """Return basic information of Workspace.""" return f"{self.order}:{self.child.name}{self.parent.name}" class WorkspaceRole(models.Model): """Role assignments for workspaces.""" Roles: TypeAlias = WorkspaceRoles resource = models.ForeignKey( Workspace, on_delete=models.PROTECT, related_name="roles", ) group = models.ForeignKey( "Group", on_delete=models.PROTECT, related_name="workspace_roles", ) role = models.CharField(max_length=16, choices=Roles.choices) class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["resource", "group", "role"], name="%(app_label)s_%(class)s_unique_resource_group_role", ), ] def __str__(self) -> str: """Return a description of the role assignment.""" return f"{self.group}{self.role}{self.resource}"