Source code for debusine.db.models.work_requests

# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db work requests."""

import json
import logging
from collections.abc import Generator
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import (
    Any,
    Literal,
    Optional,
    Self,
    TYPE_CHECKING,
    TypeAlias,
    assert_never,
    overload,
)

from django.conf import settings
from django.core.exceptions import FieldError, ValidationError
from django.core.serializers.json import DjangoJSONEncoder
from django.db import models
from django.db.models import F, JSONField, Q, QuerySet, UniqueConstraint
from django.db.models.functions import Coalesce
from django.utils import timezone

from debusine.artifacts.models import BareDataCategory, CollectionCategory
from debusine.client.models import LookupChildType
from debusine.db.models import CollectionItem
from debusine.server import notifications
from debusine.tasks import BaseTask, TaskConfigError
from debusine.tasks.models import (
    ActionRetryWithDelays,
    ActionTypes,
    ActionUpdateCollectionWithArtifacts,
    ActionUpdateCollectionWithData,
    EventReaction,
    EventReactions,
    LookupMultiple,
    LookupSingle,
    NotificationDataEmail,
    TaskTypes,
)
from debusine.tasks.server import TaskDatabaseInterface

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta

    from debusine.db.models.auth import User
    from debusine.db.models.workers import Worker
    from debusine.server.workflows.models import (
        WorkRequestManualUnblockAction,
        WorkRequestWorkflowData,
    )
else:
    TypedModelMeta = object

logger = logging.getLogger(__name__)


class CannotRetry(Exception):
    """Exception raised if a work request cannot be retried."""


class CannotUnblock(Exception):
    """Exception raised if a work request cannot be unblocked."""


class InternalTaskError(Exception):
    """Exception raised when trying to instantiate an internal task."""


[docs]class WorkflowTemplate(models.Model): """ Database model for Workflow templates. Workflow templates contain the information needed to instantiate a workflow, with a Workflow orchestrator and mandatory parameters. """ class Meta(TypedModelMeta): constraints = [ UniqueConstraint( fields=["name", "workspace"], name="%(app_label)s_%(class)s_unique_name_workspace", ), ] name = models.CharField(max_length=255) workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT) task_name = models.CharField( max_length=100, verbose_name='Name of the Workflow orchestrator class' ) task_data = JSONField(default=dict, blank=True, encoder=DjangoJSONEncoder) priority = models.IntegerField( default=0, help_text="Base priority for work requests created from this template", )
[docs] def clean(self): """ Ensure that task_name and task data are valid. :raise ValidationError: for invalid data. """ # Import here to prevent circular imports from debusine.server.workflows import Workflow, WorkflowValidationError if not isinstance(self.task_data, dict): raise ValidationError( {"task_data": "task data must be a dictionary"} ) # Instantiate the orchestrator and use it to validate task_data workflow_cls = Workflow.from_name(self.task_name) try: workflow_cls.validate_template_data(self.task_data) except ( KeyError, ValueError, RuntimeError, WorkflowValidationError, ) as exc: raise ValidationError({"task_data": str(exc)})
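# --- Illustrative usage sketch (editor's note, not part of the module) ---
# How a caller might validate a WorkflowTemplate before saving it.  The
# workspace object and the "sbuild" workflow name are hypothetical
# placeholders; clean() delegates validation to the named Workflow class.
#
#     template = WorkflowTemplate(
#         name="build-amd64",
#         workspace=workspace,        # an existing Workspace instance
#         task_name="sbuild",         # hypothetical workflow name
#         task_data={},               # template-level defaults
#     )
#     template.full_clean()           # runs clean(); raises ValidationError
#     template.save()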
class _WorkRequestStatuses(models.TextChoices): """Choices for WorkRequest.status.""" PENDING = "pending", "Pending" RUNNING = "running", "Running" COMPLETED = "completed", "Completed" ABORTED = "aborted", "Aborted" BLOCKED = "blocked", "Blocked" class WorkRequestManager(models.Manager["WorkRequest"]): """Manager for WorkRequest model.""" def create_workflow_callback( self, *, parent: "WorkRequest", step: str, display_name: str | None = None, status: _WorkRequestStatuses | None = None, ): """ Create a workflow callback WorkRequest. A parent is always required, as a callback only makes sense as part of a workflow. :param step: string set by the workflow to identify this callback """ # Import here to prevent circular imports from debusine.server.workflows.models import WorkRequestWorkflowData return self.create( workspace=parent.workspace, parent=parent, created_by=parent.created_by, status=status or WorkRequest.Statuses.PENDING, task_type=TaskTypes.INTERNAL, task_name="workflow", task_data={}, priority_base=parent.priority_effective, workflow_data_json=WorkRequestWorkflowData( step=step, display_name=display_name or step ).dict(exclude_unset=True), ) def create_workflow( self, *, template: WorkflowTemplate, data: dict[str, Any], created_by: "User", parent: Optional["WorkRequest"] = None, status: _WorkRequestStatuses | None = None, ) -> "WorkRequest": """Create a workflow from a template and user-provided data.""" # Import here to prevent circular imports from debusine.db.models import Collection from debusine.server.workflows import Workflow, WorkflowValidationError # Lookup the orchestrator workflow_cls = Workflow.from_name(template.task_name) # Merge user provided data into template data task_data = workflow_cls.build_workflow_data(template.task_data, data) # Build the WorkRequest work_request = self.create( workspace=template.workspace, parent=parent, created_by=created_by, status=status or WorkRequest.Statuses.PENDING, task_type=TaskTypes.WORKFLOW, task_name=template.task_name, task_data=task_data, priority_base=template.priority, ) # Root work requests need an internal collection # (:ref:`collection-workflow-internal`) if parent is None: work_request.internal_collection = Collection.objects.create( name=f"workflow-{work_request.id}", category=CollectionCategory.WORKFLOW_INTERNAL, workspace=work_request.workspace, retains_artifacts=Collection.RetainsArtifacts.WORKFLOW, ) work_request.save() try: # Instantiate the orchestrator orchestrator = workflow_cls(work_request) # Thorough input validation orchestrator.validate_input() except ( KeyError, ValueError, RuntimeError, WorkflowValidationError, TaskConfigError, ): # TODO: How can we store the error so that the user can see it? logger.exception("Cannot create a workflow") work_request.mark_completed(WorkRequest.Results.ERROR) return work_request def create_synchronization_point( self, *, parent: "WorkRequest", step: str, display_name: str | None = None, status: _WorkRequestStatuses | None = None, ): """ Create a synchronization point WorkRequest. A parent is always required, as a synchronization point only makes sense as part of a workflow. 
""" # Import here to prevent circular imports from debusine.server.workflows.models import WorkRequestWorkflowData return self.create( workspace=parent.workspace, parent=parent, created_by=parent.created_by, status=status or WorkRequest.Statuses.PENDING, task_type=TaskTypes.INTERNAL, task_name="synchronization_point", task_data={}, priority_base=parent.priority_effective, workflow_data_json=WorkRequestWorkflowData( step=step, display_name=display_name or step ).dict(exclude_unset=True), ) def pending( self, exclude_assigned: bool = False, worker: Optional["Worker"] = None ) -> QuerySet["WorkRequest"]: """ Return a QuerySet of tasks in WorkRequest.Statuses.PENDING status. QuerySet is ordered by descending effective priority, then by created_at. Filter out the assigned pending ones if exclude_assigned=True, and include only the WorkRequest for worker. PENDING is the default status of a task on creation. """ if exclude_assigned and worker is not None: raise ValueError("Cannot exclude_assigned and filter by worker") qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.PENDING) if exclude_assigned: qs = qs.exclude(worker__isnull=False) if worker is not None: qs = qs.filter(worker=worker) qs = qs.order_by( (F("priority_base") + F("priority_adjustment")).desc(), "created_at" ) return qs def running( self, worker: Optional["Worker"] = None ) -> QuerySet["WorkRequest"]: """Return a QuerySet of tasks in running status.""" qs = WorkRequest.objects.filter(status=WorkRequest.Statuses.RUNNING) if worker is not None: qs = qs.filter(worker=worker) return qs def completed(self) -> QuerySet["WorkRequest"]: """Return a QuerySet of tasks in completed status.""" return WorkRequest.objects.filter(status=WorkRequest.Statuses.COMPLETED) def aborted(self) -> QuerySet["WorkRequest"]: """Return a QuerySet of tasks in aborted status.""" return WorkRequest.objects.filter(status=WorkRequest.Statuses.ABORTED) def aborted_or_failed(self) -> QuerySet["WorkRequest"]: """Return a QuerySet of tasks that aborted or failed.""" return WorkRequest.objects.filter( Q(status=WorkRequest.Statuses.ABORTED) | Q( status=WorkRequest.Statuses.COMPLETED, result=WorkRequest.Results.FAILURE, ) ) def expired(self, at: datetime) -> QuerySet["WorkRequest"]: """ Return queryset with work requests that have expired. :param at: datetime to check if the work requests are expired. :return: work requests which expire before the given datetime. """ return ( self.get_queryset() .annotate( effective_expiration_delay=Coalesce( "expiration_delay", "workspace__default_expiration_delay", ) ) .exclude(effective_expiration_delay=timedelta(0)) .filter( # https://github.com/typeddjango/django-stubs/issues/1548 created_at__lte=( at - F("effective_expiration_delay") # type: ignore[operator] # noqa: E501 ) ) )
[docs]class WorkRequest(models.Model):
    """
    Database model of a request to execute a task.

    Time-consuming operations are offloaded to Workers, using Artifacts
    (and associated Files) as input and output.

    The submission API needs to check whether the request is valid using
    ontological rules, e.g. whether the specified distribution for a build
    task exists.

    Avoid exposing the status of tasks to the admin interface to avoid
    runaway changes whilst the scheduler process is running.

    The WorkRequest uses the non-Django tasks module to check whether a
    task can run on a particular worker.

    WorkRequest State Machine
    =========================

    New WorkRequest database entries default to
    ``WorkRequest.Statuses.PENDING``.

    Once the WorkRequest is assigned to a worker and starts running, its
    status is changed to ``WorkRequest.Statuses.RUNNING``.

    If the WorkRequest is aborted, its status is set to
    ``WorkRequest.Statuses.ABORTED``.

    If the task finishes on the Worker, the WorkRequest status is set to
    ``WorkRequest.Statuses.COMPLETED`` and a result is recorded:
    ``WorkRequest.Results.SUCCESS``, ``WorkRequest.Results.FAILURE`` or
    ``WorkRequest.Results.ERROR``.

    .. graphviz::

        digraph {
            Statuses_PENDING -> Statuses_RUNNING -> Statuses_COMPLETED;
            Statuses_PENDING -> Statuses_COMPLETED;
            Statuses_PENDING -> Statuses_ABORTED;
            Statuses_PENDING -> Statuses_RUNNING -> Statuses_ABORTED;
        }

    ``WorkRequest.started_at`` is set when the WorkRequest moves from
    ``WorkRequest.Statuses.PENDING`` to ``WorkRequest.Statuses.RUNNING``.
    ``WorkRequest.completed_at`` is set when the WorkRequest moves from
    ``WorkRequest.Statuses.RUNNING`` to ``WorkRequest.Statuses.COMPLETED``.
    """

    objects = WorkRequestManager()

    Statuses: TypeAlias = _WorkRequestStatuses
[docs] class Results(models.TextChoices): NONE = "", "" SUCCESS = "success", "Success" FAILURE = "failure", "Failure" ERROR = "error", "Error"
[docs] class UnblockStrategy(models.TextChoices): DEPS = "deps", "Dependencies have completed" MANUAL = "manual", "Manually unblocked"
workspace = models.ForeignKey("Workspace", on_delete=models.PROTECT) created_at = models.DateTimeField(auto_now_add=True) started_at = models.DateTimeField(blank=True, null=True) completed_at = models.DateTimeField(blank=True, null=True) created_by = models.ForeignKey("User", on_delete=models.PROTECT) status = models.CharField( max_length=9, choices=Statuses.choices, default=Statuses.PENDING, editable=False, ) result = models.CharField( max_length=7, choices=Results.choices, default=Results.NONE, editable=False, ) # any one work request can only be on one worker # even if the worker can handle multiple work request. worker = models.ForeignKey( "Worker", null=True, blank=True, on_delete=models.CASCADE, related_name="assigned_work_requests", ) task_type = models.CharField( max_length=8, choices=TaskTypes.choices, default=TaskTypes.WORKER, editable=False, verbose_name="Type of task to execute", ) task_name = models.CharField( max_length=100, verbose_name='Name of the task to execute' ) task_data = JSONField(default=dict, blank=True, encoder=DjangoJSONEncoder) dynamic_task_data = JSONField( null=True, blank=True, encoder=DjangoJSONEncoder ) priority_base = models.IntegerField( default=0, help_text="Base priority of this work request" ) priority_adjustment = models.IntegerField( default=0, help_text=( "Administrator adjustment to the priority of this work request" ), ) # Workflows unblock_strategy = models.CharField( max_length=6, choices=UnblockStrategy.choices, default=UnblockStrategy.DEPS, editable=False, ) # workflow hierarchy tree (optional, null if stand-alone) parent = models.ForeignKey( "self", # WorkRequests are only deleted through expiration, use CASCADE on_delete=models.CASCADE, blank=True, null=True, related_name="children", ) # order of execution within a workflow hierarchy (optional) dependencies = models.ManyToManyField( "self", symmetrical=False, related_name="reverse_dependencies" ) workflow_data_json = JSONField( default=dict, blank=True, db_column="workflow_data", encoder=DjangoJSONEncoder, ) event_reactions_json = JSONField( default=dict, blank=True, db_column="event_reactions", encoder=DjangoJSONEncoder, ) internal_collection = models.OneToOneField( "Collection", related_name="workflow", null=True, blank=True, on_delete=models.PROTECT, ) expiration_delay = models.DurationField(blank=True, null=True) supersedes = models.OneToOneField( "WorkRequest", on_delete=models.SET_NULL, related_name="superseded", null=True, blank=True, ) class Meta(TypedModelMeta): indexes = [ # Handles the main scheduler query. models.Index( (F("priority_base") + F("priority_adjustment")).desc(), "created_at", name="%(app_label)s_%(class)s_pending_idx", condition=Q(status=_WorkRequestStatuses.PENDING), ), # Handles queries from workers. models.Index( "worker", name="%(app_label)s_%(class)s_worker_idx", condition=Q( status__in=( _WorkRequestStatuses.PENDING, _WorkRequestStatuses.RUNNING, ) ), ), ] permissions = [ ( "manage_workrequest_priorities", "Can set positive priority adjustments on work requests", ) ] ordering = ["id"] def __str__(self) -> str: """Return the id of the WorkRequest.""" return str(self.id)
[docs] def save(self, *args: Any, **kwargs: Any) -> None: """Save the current instance.""" if self._state.adding: self.process_event_reactions("on_creation") super().save(*args, **kwargs)
@property def workflow_data(self) -> Optional["WorkRequestWorkflowData"]: """Access workflow_data_json as a python structure.""" # Import here to avoid a circular dependency from debusine.server.workflows.models import WorkRequestWorkflowData if self.workflow_data_json: return WorkRequestWorkflowData(**self.workflow_data_json) else: return None @workflow_data.setter def workflow_data(self, value: Optional["WorkRequestWorkflowData"]) -> None: """Set workflow_data_json from a python structure.""" if value is None: self.workflow_data_json = {} else: # TODO: WorkRequestWorkflowData may contain # non-JSON-serializable values (timestamps in the manual unblock # log), and pydantic 1's BaseModel.dict doesn't have a good way # to deal with those, so we have to round-trip via JSON. Once # we can assume pydantic 2, we should use something like # BaseModel.model_dump(mode="json") here. self.workflow_data_json = json.loads(value.json(exclude_unset=True)) @property def event_reactions(self) -> EventReactions: """Access event_reactions_json as a pydantic structure.""" return EventReactions(**self.event_reactions_json) @event_reactions.setter def event_reactions(self, value: EventReactions) -> None: """Set event_reactions_json from a pydantic structure.""" self.event_reactions_json = value.dict()
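# --- Illustrative sketch (editor's note, not part of the module) ---
# The properties above keep the JSON columns and the pydantic models in
# sync: assigning a model serialises it into the *_json column, and reading
# parses it back.  Field values below are hypothetical.
#
#     wr.event_reactions = EventReactions()        # empty reactions
#     wr.event_reactions_json                      # -> {"on_success": [], ...}
#     wr.workflow_data_json = {"step": "build", "display_name": "Build"}
#     wr.workflow_data.display_name                # -> "Build"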
[docs] @contextmanager def scheduling_disabled(self) -> Generator[None, None, None]: """Temporarily disable scheduling on changes to this work request.""" try: # See debusine.server.scheduler._work_request_changed. self._disable_signals = True yield finally: del self._disable_signals
[docs] def create_child( self, task_name: str, status: Statuses = Statuses.BLOCKED, task_type: TaskTypes = TaskTypes.WORKER, task_data: dict[str, Any] | None = None, workflow_data: Optional["WorkRequestWorkflowData"] = None, event_reactions: EventReactions | None = None, ) -> "WorkRequest": """Create a child WorkRequest.""" if self.task_type != TaskTypes.WORKFLOW: raise ValueError("Only workflows may have child work requests.") return WorkRequest.objects.create( workspace=self.workspace, parent=self, created_by=self.created_by, status=status, task_type=task_type, task_name=task_name, task_data=task_data or {}, workflow_data_json=( workflow_data.dict(exclude_unset=True) if workflow_data else {} ), event_reactions_json=( event_reactions.dict() if event_reactions else {} ), )
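# --- Illustrative sketch (editor's note, not part of the module) ---
# Workflow orchestrators populate their graph with create_child().  The
# task name and data below are hypothetical; children default to BLOCKED
# until a dependency or manual unblock makes them PENDING.
# (WorkRequestWorkflowData lives in debusine.server.workflows.models.)
#
#     child = workflow_wr.create_child(
#         task_name="sbuild",                      # hypothetical worker task
#         task_data={"architecture": "amd64"},     # hypothetical task data
#         workflow_data=WorkRequestWorkflowData(
#             step="build-amd64", display_name="Build amd64"
#         ),
#     )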
[docs] def can_satisfy_dynamic_data(self) -> bool: """Check if dynamic data can still be looked up.""" try: task = self.get_task() except TaskConfigError: return False try: task.compute_dynamic_data(TaskDatabase(self)) except LookupError: return False return True
[docs] def is_aborted_failed(self) -> bool: """Check if the work request was aborted or failed.""" return self.status == WorkRequest.Statuses.ABORTED or ( self.status == WorkRequest.Statuses.COMPLETED and self.result == WorkRequest.Results.FAILURE )
    def _check_retry(self) -> None:
        """
        Check if a work request can be retried.

        :raises CannotRetry: raised with an explanation of why it cannot
          be retried
        """
        if self.task_type not in {
            TaskTypes.WORKFLOW,
            TaskTypes.WORKER,
            TaskTypes.INTERNAL,
            TaskTypes.SIGNING,
        }:
            raise CannotRetry(
                "Only workflow, worker, internal, or signing tasks"
                " can be retried"
            )
        if hasattr(self, "superseded"):
            raise CannotRetry("Cannot retry old superseded tasks")
        if not self.is_aborted_failed():
            raise CannotRetry("Only aborted or failed tasks can be retried")
        if self.task_type in {TaskTypes.INTERNAL, TaskTypes.WORKFLOW}:
            return
        if not self.can_satisfy_dynamic_data():
            raise CannotRetry("Task dependencies cannot be satisfied")
[docs] def can_retry(self) -> bool: """Check if this work request can be retried.""" try: self._check_retry() except CannotRetry: return False return True
def _retry_workflow(self) -> Self: """Retry logic for workflows.""" # Import here to prevent circular imports from debusine.server.workflows.base import orchestrate_workflow from debusine.server.workflows.models import WorkRequestWorkflowData # Set the workflow back to running, so that further actions execute in # a running workflow self.status = self.Statuses.PENDING self.mark_running() # Increment the retry count if (workflow_data := self.workflow_data) is None: workflow_data = WorkRequestWorkflowData() workflow_data.retry_count += 1 self.workflow_data = workflow_data self.save() try: # TODO: workflow orchestrators may be slow; this should move to # a Celery task # Give the workflow a chance to update its graph orchestrate_workflow(self) # Retry failed child work requests for child in ( WorkRequest.objects.aborted_or_failed() .filter(parent=self) .order_by("id") ): try: child.retry() except CannotRetry as exc: raise CannotRetry( f"child work request {child.pk} cannot be retried" ) from exc finally: self.maybe_finish_workflow() return self def _retry_supersede(self) -> "WorkRequest": """Retry logic superseding work requests.""" # Import here to prevent circular imports from debusine.server.workflows.models import WorkRequestWorkflowData work_request = WorkRequest.objects.create( workspace=self.workspace, created_by=self.created_by, status=WorkRequest.Statuses.BLOCKED, task_type=self.task_type, task_name=self.task_name, task_data=self.task_data, priority_base=self.priority_base, priority_adjustment=self.priority_adjustment, unblock_strategy=self.unblock_strategy, parent=self.parent, workflow_data_json=self.workflow_data_json, event_reactions_json=self.event_reactions_json, expiration_delay=self.expiration_delay, supersedes=self, ) # Copy forward dependencies for dep in self.dependencies.all(): work_request.add_dependency(dep) # Update any reverse-dependencies of the old work request to point to # the new one, and move them from pending to blocked for dep in self.reverse_dependencies.all(): dep.dependencies.remove(self) dep.add_dependency(work_request) # If the task was part of an aborted workflow, set it back to running if (workflow := work_request.parent) and workflow.is_aborted_failed(): # We cannot use mark_running, as it would fail to run when the # workflow is failed or aborted workflow.status = WorkRequest.Statuses.RUNNING workflow.result = WorkRequest.Results.NONE workflow.completed_at = None workflow.save() # Increment the retry count if (workflow_data := work_request.workflow_data) is None: workflow_data = WorkRequestWorkflowData() workflow_data.retry_count += 1 work_request.workflow_data = workflow_data work_request.save() if work_request.can_be_automatically_unblocked(): work_request.mark_pending() # Update promises associated with the previous work request if ( self.parent is not None and self.parent.internal_collection is not None ): for debusine_promise in CollectionItem.objects.filter( category=BareDataCategory.PROMISE, parent_collection=self.parent.internal_collection, data__promise_work_request_id=self.id, ): debusine_promise.data["promise_work_request_id"] = ( work_request.id ) debusine_promise.save() return work_request
[docs]    def retry(self) -> "WorkRequest":
        """Retry this work request, creating a superseding one if needed."""
        self._check_retry()
        if self.task_type == TaskTypes.WORKFLOW:
            return self._retry_workflow()
        else:
            return self._retry_supersede()
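# --- Illustrative sketch (editor's note, not part of the module) ---
# Retrying an aborted or failed work request creates a superseding request;
# for workflows the same request is reused and its failed children retried.
#
#     if wr.can_retry():
#         new_wr = wr.retry()
#         assert new_wr.supersedes == wr       # for non-workflow tasks
#     else:
#         ...                                  # retry() would raise CannotRetry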
[docs] def clean(self): """ Ensure that task data is valid for this task name. :raise ValidationError: for invalid data. """ if not isinstance(self.task_data, dict): raise ValidationError( {"task_data": "task data must be a dictionary"} ) match self.task_type: case ( TaskTypes.WORKER | TaskTypes.SERVER | TaskTypes.SIGNING | TaskTypes.WAIT ): try: task_cls = BaseTask.class_from_name( self.task_type, self.task_name ) except (KeyError, ValueError) as e: raise ValidationError( { "task_name": f"{self.task_name}:" f" invalid {self.task_type} task name" } ) from e try: task_cls(task_data=self.task_data) except TaskConfigError as e: raise ValidationError( { "task_data": f"invalid {self.task_type}" f" task data: {e}" } ) from e case TaskTypes.WORKFLOW: # Import here to prevent circular imports from debusine.server.workflows import Workflow try: workflow_cls = Workflow.from_name(self.task_name) except (KeyError, ValueError) as e: raise ValidationError( { "task_name": f"{self.task_name}:" f" invalid workflow name" } ) from e try: workflow_cls(self) except TaskConfigError as e: raise ValidationError( {"task_data": f"invalid workflow data: {e}"} ) from e # TODO: do we want to run expensive tests here # (Workflow.validate_input), like looking up the types of # references artifacts to validate them? case TaskTypes.INTERNAL: if self.task_name not in ("synchronization_point", "workflow"): raise ValidationError( { "task_name": f"{self.task_name}:" " invalid task name for internal task" } ) # Without this pass, python coverage is currently unable to # detect that code does flow through here pass case _: raise NotImplementedError( f"task type {self.task_type} not yet supported" )
[docs] def get_task(self, worker: "Worker | None" = None) -> BaseTask[Any, Any]: """ Instantiate the Task for this work request. :param worker: if set, the worker that this work request is intended to run on; `worker_host_architecture` will be set to its host architecture :raise InternalTaskError: if the work request is for an internal task other than a workflow callback """ # Import here to prevent circular imports from debusine.server.workflows import Workflow match (self.task_type, self.task_name): case ( (TaskTypes.WORKER, _) | (TaskTypes.SERVER, _) | (TaskTypes.SIGNING, _) | (TaskTypes.WAIT, _) ): task_cls = BaseTask.class_from_name( TaskTypes(self.task_type), self.task_name ) task = task_cls( task_data=self.task_data, dynamic_task_data=self.dynamic_task_data, ) case (TaskTypes.WORKFLOW, _): workflow_cls = Workflow.from_name(self.task_name) task = workflow_cls(self) case (TaskTypes.INTERNAL, "workflow"): if (workflow := self.parent) is None: raise InternalTaskError( "Workflow callback is not contained in a workflow" ) assert workflow.task_type == TaskTypes.WORKFLOW workflow_cls = Workflow.from_name(workflow.task_name) task = workflow_cls(workflow) case (TaskTypes.INTERNAL, _): raise InternalTaskError( "Internal tasks other than workflow callbacks cannot be " "instantiated" ) case _ as unreachable: # noqa: F841 raise NotImplementedError( f"task type {self.task_type} not yet supported" ) if worker is not None: task.worker_host_architecture = worker.metadata().get( "system:host_architecture" ) return task
[docs]    def mark_running(self) -> bool:
        """Worker has begun executing the task."""
        if (
            self.task_type
            in {TaskTypes.WORKER, TaskTypes.SERVER, TaskTypes.SIGNING}
            and self.worker is None
        ):
            logger.debug(
                "Cannot mark WorkRequest %s as running: it does not have "
                "an assigned worker",
                self.pk,
            )
            return False
        if self.status == self.Statuses.RUNNING:
            # It was already running - nothing to do
            return True
        if self.status != self.Statuses.PENDING:
            logger.debug(
                "Cannot mark as running - current status %s", self.status
            )
            return False
        if self.worker is not None:
            work_requests_running_for_worker = WorkRequest.objects.running(
                worker=self.worker
            )
            # There is a possible race condition here.  This check (and
            # other checks in this class) currently helps to avoid
            # development mistakes, rather than enforce full database
            # integrity.
            if (
                work_requests_running_for_worker.count()
                >= self.worker.concurrency
            ):
                logger.debug(
                    "Cannot mark WorkRequest %s as running - the assigned "
                    "worker %s is running too many other WorkRequests: %s",
                    self.pk,
                    self.worker,
                    list(work_requests_running_for_worker.order_by("id")),
                )
                return False
        self.started_at = timezone.now()
        self.status = self.Statuses.RUNNING
        self.save()
        logger.debug("Marked WorkRequest %s as running", self.pk)
        return True
[docs] def mark_completed(self, result: "WorkRequest.Results") -> bool: """Worker has finished executing the task.""" if self.status not in (self.Statuses.PENDING, self.Statuses.RUNNING): logger.debug( "Cannot mark WorkRequest %s as completed: current status is %s", self.pk, self.status, ) return False if result == self.Results.NONE: raise AssertionError("result cannot be NONE") self.result = result self.completed_at = timezone.now() self.status = self.Statuses.COMPLETED self.save() logger.debug("Marked WorkRequest %s as completed", self.pk) # mark dependencies ready before sending notification self.unblock_reverse_dependencies() match self.result: case self.Results.SUCCESS: self.process_event_reactions("on_success") case self.Results.FAILURE | self.Results.ERROR: self.process_event_reactions("on_failure") case _ as unreachable: assert_never(unreachable) if self.parent is not None: self.parent.maybe_finish_workflow() return True
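# --- Illustrative sketch (editor's note, not part of the module) ---
# Typical lifecycle driven by the scheduler and a worker: assignment, then
# mark_running(), then mark_completed() with a result.  Completing unblocks
# reverse dependencies and may finish a parent workflow.
#
#     wr.assign_worker(worker)                     # worker: a Worker instance
#     if wr.mark_running():
#         ...                                      # the worker executes the task
#         wr.mark_completed(WorkRequest.Results.SUCCESS)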
[docs] def process_event_reactions( self, event_name: Literal[ "on_creation", "on_unblock", "on_success", "on_failure" ], ) -> None: """Process list of actions to perform.""" actions = self.get_triggered_actions(event_name) if event_name in {"on_success", "on_failure"}: notifications.notify_work_request_completed( self, actions[ActionTypes.SEND_NOTIFICATION] ) self.process_update_collection_with_artifacts( actions[ActionTypes.UPDATE_COLLECTION_WITH_ARTIFACTS] ) self.process_update_collection_with_data( actions[ActionTypes.UPDATE_COLLECTION_WITH_DATA] ) self.process_retry_with_delays(actions[ActionTypes.RETRY_WITH_DELAYS])
[docs] def get_triggered_actions( self, event_name: Literal[ "on_creation", "on_unblock", "on_success", "on_failure" ], ) -> dict[str, list[EventReaction]]: """ Filter events to trigger, grouped by type. :param event_name: the name of the event being triggered. """ actions: dict[str, list[EventReaction]] = { action: [] for action in ActionTypes } for action in getattr(self.event_reactions, event_name): action_type = action.action actions[action_type].append(action) return actions
[docs] def process_update_collection_with_artifacts( self, actions: list[EventReaction] ) -> None: """Update collection with artifacts following event_reactions.""" # local import to avoid circular dependency from debusine.db.models.collections import CollectionItem from debusine.server.collections import ( CollectionManagerInterface, ItemAdditionError, ItemRemovalError, ) from debusine.server.collections.lookup import lookup_single for update in actions: assert isinstance(update, ActionUpdateCollectionWithArtifacts) try: collection = lookup_single( update.collection, self.workspace, user=self.created_by, workflow_root=self.get_workflow_root(), expect_type=LookupChildType.COLLECTION, ).collection except LookupError as e: logger.error("%s in WorkRequest %s", e, self.pk) # noqa: G200 continue manager = CollectionManagerInterface.get_manager_for(collection) try: artifacts_to_add = self.artifact_set.filter( **update.artifact_filters ) except FieldError: logger.exception( "Invalid update-collection-with-artifacts" " artifact_filters in WorkRequest %s", self.pk, ) continue for artifact in artifacts_to_add: try: expanded_variables = CollectionItem.expand_variables( update.variables or {}, artifact.data ) except (KeyError, ValueError): logger.exception( "Invalid update-collection-with-artifacts variables " "in WorkRequest %s", self.pk, ) continue if update.name_template is not None: item_name = CollectionItem.expand_name( update.name_template, expanded_variables ) item_variables = None else: item_name = None item_variables = expanded_variables try: manager.add_artifact( artifact, user=self.created_by, workflow=self.parent, variables=item_variables, name=item_name, replace=True, ) except (ItemAdditionError, ItemRemovalError): logger.exception( "Cannot replace or add artifact %s to collection %s" " from WorkRequest %s", artifact, collection, self.pk, )
[docs] def process_update_collection_with_data( self, actions: list[EventReaction] ) -> None: """Update collection with bare data following event_reactions.""" # local import to avoid circular dependency from debusine.db.models.collections import CollectionItem from debusine.server.collections import ( CollectionManagerInterface, ItemAdditionError, ItemRemovalError, ) from debusine.server.collections.lookup import lookup_single for update in actions: assert isinstance(update, ActionUpdateCollectionWithData) try: collection = lookup_single( update.collection, self.workspace, user=self.created_by, workflow_root=self.get_workflow_root(), expect_type=LookupChildType.COLLECTION, ).collection except LookupError as e: logger.error("%s in WorkRequest %s", e, self.pk) # noqa: G200 continue manager = CollectionManagerInterface.get_manager_for(collection) data = update.data or {} if update.name_template is not None: item_name = CollectionItem.expand_name( update.name_template, data ) else: item_name = None try: manager.add_bare_data( update.category, user=self.created_by, workflow=self.parent, data=data, name=item_name, replace=True, ) except (ItemAdditionError, ItemRemovalError): logger.exception( "Cannot replace or add bare data of category %s to" " collection %s from WorkRequest %s", update.category, collection, self.pk, )
[docs] def process_retry_with_delays(self, actions: list[EventReaction]) -> None: """Retry a work request with delays.""" # Import here to prevent circular imports from debusine.server.tasks.wait.models import DelayData for action in actions: assert isinstance(action, ActionRetryWithDelays) if self.parent is None: raise CannotRetry( "retry-with-delays action may only be used in a workflow" ) retry_count = ( 0 if self.workflow_data is None else self.workflow_data.retry_count ) if retry_count >= len(action.delays): continue m = ActionRetryWithDelays._delay_re.match( action.delays[retry_count] ) # Checked by ActionRetryWithDelays.validate_delays. assert m is not None match m.group(2): case "m": delay = timedelta(minutes=int(m.group(1))) case "h": delay = timedelta(hours=int(m.group(1))) case "d": delay = timedelta(days=int(m.group(1))) case "w": delay = timedelta(weeks=int(m.group(1))) case _ as unreachable: raise AssertionError( f"Unexpected delay unit: {unreachable}" ) wr_delay = self.parent.create_child( task_type=TaskTypes.WAIT, task_name="delay", task_data=DelayData(delay_until=timezone.now() + delay).dict(), status=self.Statuses.PENDING, ) wr_retried = self.retry() wr_retried.add_dependency(wr_delay)
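# --- Illustrative sketch (editor's note, not part of the module) ---
# The retry-with-delays action stores one delay per retry attempt, indexed
# by workflow_data.retry_count; the suffixes map to minutes/hours/days/weeks
# as parsed above.  A reaction along these lines (field values hypothetical)
# would retry a failed request after 30 minutes, then 1 hour, then 1 day:
#
#     wr.event_reactions = EventReactions(
#         on_failure=[ActionRetryWithDelays(delays=["30m", "1h", "1d"])]
#     )
#     wr.save()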
[docs]    def mark_pending(self) -> bool:
        """Mark the work request as ready for execution."""
        if self.status != self.Statuses.BLOCKED:
            logger.debug(
                "Cannot mark WorkRequest %s as pending: current status is %s",
                self.pk,
                self.status,
            )
            return False
        self.status = self.Statuses.PENDING
        self.save()
        logger.debug("Marked WorkRequest %s as pending", self.pk)
        self.process_event_reactions("on_unblock")
        return True
[docs] def add_dependency(self, dependency: "WorkRequest") -> None: """Make this work request depend on another one.""" if self.is_part_of_workflow: # Work requests in a workflow may only depend on other work # requests in the same workflow. my_root = self.get_workflow_root() assert my_root is not None if dependency.get_workflow_root() != my_root: raise ValueError( "Work requests in a workflow may not depend on other work " "requests outside that workflow" ) self.dependencies.add(dependency) if ( self.status == WorkRequest.Statuses.PENDING and self.unblock_strategy == WorkRequest.UnblockStrategy.DEPS and dependency.status != WorkRequest.Statuses.COMPLETED ): self.status = WorkRequest.Statuses.BLOCKED self.save()
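# --- Illustrative sketch (editor's note, not part of the module) ---
# Adding a dependency on an incomplete request moves a PENDING request
# (with the default DEPS unblock strategy) to BLOCKED; it becomes PENDING
# again via unblock_reverse_dependencies() when the dependency completes
# successfully.  Stand-alone requests shown; inside a workflow the parent
# must also be running.
#
#     wr_b.add_dependency(wr_a)
#     wr_b.status            # -> BLOCKED while wr_a is not COMPLETED
#     wr_a.mark_completed(WorkRequest.Results.SUCCESS)
#     wr_b.refresh_from_db()
#     wr_b.status            # -> PENDING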
[docs] def maybe_finish_workflow(self) -> bool: """Update workflow status if its children are no longer in progress.""" # Only relevant for running workflows where all work requests have been # either completed or aborted if ( self.task_type != TaskTypes.WORKFLOW or self.status != WorkRequest.Statuses.RUNNING or self.has_children_in_progress ): return False # If there are aborted child requests, abort the workflow if ( self.children.filter(superseded__isnull=True) .exclude(status=WorkRequest.Statuses.COMPLETED) .exists() ): self.mark_aborted() return True # If there are failed/errored child requests, fail the workflow if ( self.children.filter(superseded__isnull=True) .exclude( Q(result=WorkRequest.Results.SUCCESS) | Q(workflow_data_json__contains={"allow_failure": True}) ) .exists() ): self.mark_completed(WorkRequest.Results.FAILURE) return True # All child requests succeeded self.mark_completed(WorkRequest.Results.SUCCESS) return True
[docs] def can_be_unblocked(self) -> bool: """Return True iff this work request can be unblocked at all.""" # The work request must currently be blocked. if self.status != WorkRequest.Statuses.BLOCKED: return False # If this work request is in a workflow, then that workflow must # be running. if ( self.parent is not None and self.parent.status != WorkRequest.Statuses.RUNNING ): return False return True
[docs] def can_be_automatically_unblocked(self) -> bool: """Return True iff this work request can be automatically unblocked.""" if not self.can_be_unblocked(): return False if self.unblock_strategy == WorkRequest.UnblockStrategy.DEPS: # This work request can be unblocked if and only if all its # dependencies have completed. return not self.dependencies.filter( ~Q(status=WorkRequest.Statuses.COMPLETED) ).exists() else: # We don't know how to unblock this work request automatically. return False
[docs] def unblock_reverse_dependencies(self): """Unblock reverse dependencies.""" # Shortcuts to keep line length sane r = WorkRequest.Results s = WorkRequest.Statuses allow_failure = self.workflow_data and self.workflow_data.allow_failure if self.result == r.SUCCESS or allow_failure: # lookup work requests that depend on the completed work # request and unblock them if no other work request is # blocking them for rdep in self.reverse_dependencies.filter(status=s.BLOCKED): if rdep.can_be_automatically_unblocked(): rdep.mark_pending() else: # failure and !allow_failure for rdep in self.reverse_dependencies.filter(status=s.BLOCKED): rdep.mark_aborted()
[docs] def unblock_workflow_children(self) -> None: """Unblock children of a workflow that has just started running.""" for child in self.children.filter(status=WorkRequest.Statuses.BLOCKED): if child.can_be_automatically_unblocked(): child.mark_pending()
[docs] def review_manual_unblock( self, *, user: "User", notes: str, action: "WorkRequestManualUnblockAction | None", ) -> None: """Review a work request awaiting manual unblocking.""" # Import here to prevent circular imports from debusine.server.workflows.models import ( WorkRequestManualUnblockAction, WorkRequestManualUnblockData, WorkRequestManualUnblockLog, ) if self.unblock_strategy != WorkRequest.UnblockStrategy.MANUAL: raise CannotUnblock( f"Work request {self.id} cannot be manually unblocked" ) if ( not self.is_part_of_workflow or (workflow_data := self.workflow_data) is None ): raise CannotUnblock( f"Work request {self.id} is not part of a workflow" ) if not self.can_be_unblocked(): raise CannotUnblock(f"Work request {self.id} cannot be unblocked") if workflow_data.manual_unblock is None: workflow_data.manual_unblock = WorkRequestManualUnblockData() manual_unblock_log = list(workflow_data.manual_unblock.log) manual_unblock_log.append( WorkRequestManualUnblockLog( user_id=user.id, timestamp=timezone.now(), notes=notes, action=action, ) ) workflow_data.manual_unblock.log = manual_unblock_log self.workflow_data = workflow_data self.save() match action: case WorkRequestManualUnblockAction.ACCEPT: # Always succeeds, since we already checked that the current # status is BLOCKED (via `work_request.can_be_unblocked()`). if not self.mark_pending(): raise AssertionError( "Work request could not be marked pending" ) case WorkRequestManualUnblockAction.REJECT: # The current implementation always aborts the work request. if not self.mark_aborted(): raise AssertionError("Work request could not be aborted")
@property def requires_signature(self) -> bool: """True if this task is blocked on `debusine provide-signature`.""" # TODO: At some point it may be worth having this delegate to a # property on the task, but for now this is good enough. return ( self.task_type == TaskTypes.WAIT and self.task_name == "externaldebsign" and self.status == self.Statuses.BLOCKED )
[docs]    def mark_aborted(self) -> bool:
        """
        Worker has aborted the task after a request from the UI.

        The task will typically be in PENDING or RUNNING status.
        """
        previous_status = self.status
        self.completed_at = timezone.now()
        self.status = self.Statuses.ABORTED
        self.save()
        self.unblock_reverse_dependencies()
        if self.parent is not None:
            self.parent.maybe_finish_workflow()
        logger.debug(
            "Marked WorkRequest %s as aborted (from status %s)",
            self.pk,
            previous_status,
        )
        return True
[docs] def assign_worker(self, worker: "Worker") -> None: """Assign worker and save it.""" self.worker = worker self.save() notifications.notify_work_request_assigned(self)
[docs] def de_assign_worker(self) -> bool: """ De-assign a worker from this work request. Only allowed if the status is RUNNING or PENDING. """ if self.status not in { WorkRequest.Statuses.RUNNING, WorkRequest.Statuses.PENDING, }: logger.debug( "WorkRequest %d cannot be de-assigned: current status: %s", self.pk, self.status, ) return False self.worker = None self.started_at = None self.status = WorkRequest.Statuses.PENDING self.save() return True
@property def duration(self) -> float | None: """Return duration in seconds between started_at and completed_at.""" if self.started_at and self.completed_at: return (self.completed_at - self.started_at).total_seconds() else: return None @property def priority_effective(self) -> int: """The effective priority of this work request.""" return self.priority_base + self.priority_adjustment @property def is_workflow(self) -> bool: """Return whether this work request is a workflow.""" return self.task_type == TaskTypes.WORKFLOW @property def is_part_of_workflow(self) -> bool: """Return whether this work request is part of a workflow.""" return self.parent is not None @property def is_workflow_root(self) -> bool: """Return whether this work request is a root workflow.""" return self.is_workflow and self.parent is None
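# --- Illustrative note (editor's addition) ---
# Worked example of the properties above (hypothetical values):
#
#     wr.priority_base = 100
#     wr.priority_adjustment = -10
#     wr.priority_effective          # -> 90
#     wr.duration                    # -> seconds between started_at and
#                                    #    completed_at, or None if incomplete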
[docs] def get_workflow_root(self) -> Optional["WorkRequest"]: """Return the root of this work request's workflow, if any.""" if not self.is_workflow and not self.is_part_of_workflow: return None parent = self while parent.parent is not None: parent = parent.parent return parent
@property def workflow_display_name(self) -> str: """Return this work request's name for display in a workflow.""" if ( self.workflow_data is not None and self.workflow_data.display_name is not None ): return self.workflow_data.display_name else: return self.task_name @property def has_children_in_progress(self) -> bool: """Return whether child work requests are still in progress.""" return self.children.exclude( status__in={ WorkRequest.Statuses.COMPLETED, WorkRequest.Statuses.ABORTED, } ).exists()
[docs] def effective_expiration_delay(self): """Return expiration_delay, inherited if None.""" expiration_delay = self.expiration_delay if self.expiration_delay is None: # inherit expiration_delay = self.workspace.default_expiration_delay return expiration_delay
@property def expire_at(self) -> datetime | None: """Return computed expiration date.""" delay = self.effective_expiration_delay() if delay == timedelta(0): return None return self.created_at + delay
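# --- Illustrative note (editor's addition) ---
# Expiration combines the per-request delay with the workspace default:
#
#     wr.expiration_delay = None          # inherit the workspace default
#     wr.expire_at                        # -> created_at + default (if non-zero)
#     wr.expiration_delay = timedelta(0)  # never expire
#     wr.expire_at                        # -> None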
[docs] def get_label(self, task: BaseTask[Any, Any] | None = None) -> str: """ Return a label for this work request. Optionally reuse an already instantiated task. """ try: if task is None: task = self.get_task() except TaskConfigError: if settings.DEBUG: raise logger.warning("task %s has invalid data", self, exc_info=True) return self.workflow_display_name except InternalTaskError: # Internal tasks can't be instantiated, but it's still useful to # be able to render some kind of label for them. return self.workflow_display_name return task.get_label()
[docs]class TaskDatabase(TaskDatabaseInterface): """Implementation of database interaction in worker tasks."""
[docs] def __init__(self, work_request: WorkRequest) -> None: """Construct a :py:class:`TaskDatabase`.""" self.work_request = work_request
@overload def lookup_single_artifact( self, lookup: LookupSingle, default_category: CollectionCategory | None = None, ) -> int: ... @overload def lookup_single_artifact( self, lookup: None, default_category: CollectionCategory | None = None, ) -> None: ...
[docs] def lookup_single_artifact( self, lookup: LookupSingle | None, default_category: CollectionCategory | None = None, ) -> int | None: """Look up a single artifact using :ref:`lookup-single`.""" # Import here to prevent circular imports from debusine.server.collections.lookup import lookup_single return ( None if lookup is None else lookup_single( lookup=lookup, workspace=self.work_request.workspace, user=self.work_request.created_by, default_category=default_category, workflow_root=self.work_request.get_workflow_root(), expect_type=LookupChildType.ARTIFACT, ).artifact.id )
[docs] def lookup_multiple_artifacts( self, lookup: LookupMultiple | None, default_category: CollectionCategory | None = None, ) -> list[int]: """Look up multiple artifacts using :ref:`lookup-multiple`.""" # Import here to prevent circular imports from debusine.server.collections.lookup import lookup_multiple return ( [] if lookup is None else [ result.artifact.id for result in lookup_multiple( lookup=lookup, workspace=self.work_request.workspace, user=self.work_request.created_by, default_category=default_category, workflow_root=self.work_request.get_workflow_root(), expect_type=LookupChildType.ARTIFACT, ) ] )
@overload def lookup_single_collection( self, lookup: LookupSingle, default_category: CollectionCategory | None = None, ) -> int: ... @overload def lookup_single_collection( self, lookup: None, default_category: CollectionCategory | None = None, ) -> None: ...
[docs] def lookup_single_collection( self, lookup: LookupSingle | None, default_category: CollectionCategory | None = None, ) -> int | None: """Look up a single collection using :ref:`lookup-single`.""" # Import here to prevent circular imports from debusine.server.collections.lookup import lookup_single return ( None if lookup is None else lookup_single( lookup=lookup, workspace=self.work_request.workspace, user=self.work_request.created_by, default_category=default_category, workflow_root=self.work_request.get_workflow_root(), expect_type=LookupChildType.COLLECTION, ).collection.id )
[docs] def get_server_setting(self, setting: str) -> str: """Look up a Django setting (strings only).""" value = getattr(settings, setting) assert isinstance(value, str) return value
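# --- Illustrative sketch (editor's note, not part of the module) ---
# Server-side tasks receive this interface to resolve lookups against the
# work request's workspace and workflow.  None lookups short-circuit as
# shown; the setting name below is a hypothetical example.
#
#     task_db = TaskDatabase(work_request)
#     task_db.lookup_single_artifact(None)         # -> None
#     task_db.lookup_multiple_artifacts(None)      # -> []
#     task_db.lookup_single_artifact(lookup)       # lookup: a LookupSingle value
#     task_db.get_server_setting("SOME_SETTING")   # hypothetical setting name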
[docs]class NotificationChannel(models.Model): """Model to store notification configuration."""
[docs] class Methods(models.TextChoices): EMAIL = "email", "Email"
data_validators = {Methods.EMAIL: NotificationDataEmail} name = models.CharField( max_length=20, unique=True, ) method = models.CharField(max_length=10, choices=Methods.choices) data = models.JSONField(default=dict, blank=True)
[docs] def clean(self): """ Ensure that data is valid for the specific method. :raise ValidationError: for invalid data. """ try: self.data_validators[self.method](**self.data) except (TypeError, ValueError) as exc: raise ValidationError( f"NotificationChannel data is not valid: {exc}" ) return super().clean()
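# --- Illustrative sketch (editor's note, not part of the module) ---
# save() below runs full_clean(), so invalid data raises ValidationError at
# save time.  The data payload is validated against NotificationDataEmail;
# the field name shown is a hypothetical example.
#
#     channel = NotificationChannel(
#         name="deb-builds",
#         method=NotificationChannel.Methods.EMAIL,
#         data={"to": ["builds@example.org"]},   # hypothetical field name
#     )
#     channel.save()                             # raises ValidationError if invalid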
[docs] def save(self, *args, **kwargs): """Run validators and save the instance.""" self.full_clean() return super().save(*args, **kwargs)
def __str__(self): """Return name.""" return self.name