archivebox.workers.models

Module Contents

Classes

DefaultStatusChoices

ModelStateMachine

BaseModelWithStateMachine

ModelWithStateMachine

BaseStateMachine

Base class for all ArchiveBox state machines.

Data

default_status_field

default_retry_at_field

RETRY_AT_MAX

ACTIVE_STATE_LEASE_SECONDS

logger

MODULE_PATH

REPO_ROOT

PACKAGE_ROOT

ObjectState

ObjectStateList

API

class archivebox.workers.models.DefaultStatusChoices[source]

Bases: django.db.models.TextChoices

QUEUED[source]

(‘queued’, ‘Queued’)

STARTED[source]

(‘started’, ‘Started’)

PAUSED[source]

(‘paused’, ‘Paused’)

SEALED[source]

(‘sealed’, ‘Sealed’)

archivebox.workers.models.default_status_field: django.db.models.CharField[source]

‘CharField(…)’

archivebox.workers.models.default_retry_at_field: django.db.models.DateTimeField[source]

‘DateTimeField(…)’

archivebox.workers.models.RETRY_AT_MAX[source]

‘replace(…)’

archivebox.workers.models.ACTIVE_STATE_LEASE_SECONDS[source]

60

archivebox.workers.models.logger[source]

‘getLogger(…)’

archivebox.workers.models.MODULE_PATH[source]

‘resolve(…)’

archivebox.workers.models.REPO_ROOT[source]

None

archivebox.workers.models.PACKAGE_ROOT[source]

None

archivebox.workers.models.ObjectState[source]

None

archivebox.workers.models.ObjectStateList[source]

None

class archivebox.workers.models.ModelStateMachine[source]

Bases: typing.Protocol

tick() Any[source]
pause_requested() Any[source]
resume_requested() Any[source]
class archivebox.workers.models.BaseModelWithStateMachine[source]

Bases: django.db.models.Model

StatusChoices: ClassVar[type[archivebox.workers.models.DefaultStatusChoices]][source]

None

state_machine_name: str | None[source]

None

state_field_name: str[source]

None

state_machine_attr: str[source]

‘sm’

bind_events_as_methods: bool[source]

False

warn_on_save_outside_runner: ClassVar[bool][source]

True

active_state: archivebox.workers.models.ObjectState[source]

None

retry_at_field_name: str[source]

None

class Meta[source]

Bases: django_stubs_ext.db.models.TypedModelMeta

app_label[source]

‘workers’

abstract[source]

True

property sm: statemachine.StateMachine[source]

Build the python-statemachine wrapper only at transition callsites.

This model is loaded by high-volume paths that do not drive lifecycle transitions: admin lists, progress polling, index-only maintenance, and bulk recovery scans all instantiate thousands of rows just to read or update ordinary columns. python-statemachine setup is correct but not free: it creates per-instance state wrappers, callback registries, queues, locks, and callback adapters. Paying that cost from Django’s model init made plain ORM materialization scale with state-machine setup instead of row decoding.

ArchiveBox drives lifecycle transitions explicitly through .sm (snapshot.sm.tick(), crawl.sm.seal(), etc.), so the machine can be cached on first use without changing the state model. Code that only needs database fields never constructs one.
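The lazy-construction idea above can be illustrated with a minimal standard-library sketch. `FakeMachine` and `Row` are hypothetical stand-ins (not ArchiveBox or python-statemachine classes), and `functools.cached_property` is only one possible way to cache on first use; the actual property may be implemented differently.

```python
from functools import cached_property


class FakeMachine:
    """Hypothetical stand-in for an expensive state-machine wrapper."""

    built = 0  # counts how many wrappers have been constructed

    def __init__(self, model):
        FakeMachine.built += 1
        self.model = model


class Row:
    """Hypothetical model row; only the `sm` name mirrors the documented attribute."""

    def __init__(self, status):
        self.status = status

    @cached_property
    def sm(self):
        # Built on first access, then cached on the instance.
        return FakeMachine(self)


rows = [Row("queued") for _ in range(1000)]
statuses = [row.status for row in rows]  # plain field reads build no machines
built_after_reads = FakeMachine.built

first = rows[0].sm
second = rows[0].sm  # same cached object, no rebuild
built_after_access = FakeMachine.built
```

Reading `status` on a thousand rows constructs no machines; only the row whose `.sm` is touched pays the setup cost, and it pays it once.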

classmethod status_counts(queryset: django.db.models.QuerySet | None = None, statuses: collections.abc.Iterable[str] | None = None) dict[str, int][source]

Count requested statuses with separate indexed COUNT probes.

For live/progress views this is often faster on large SQLite data dirs than a grouped aggregate, because each status can use the status index directly and the caller usually needs only a few states.
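As a rough illustration of the "separate indexed COUNT probes" approach, here is a sketch using the standard-library `sqlite3` module rather than the Django ORM. The `item` table, its index, and this free-standing `status_counts` function are assumptions made for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, status TEXT)")
conn.execute("CREATE INDEX item_status_idx ON item (status)")
conn.executemany(
    "INSERT INTO item (status) VALUES (?)",
    [("queued",)] * 3 + [("started",)] * 2 + [("sealed",)] * 5,
)


def status_counts(conn, statuses):
    """Run one COUNT query per requested status instead of a GROUP BY."""
    counts = {}
    for status in statuses:
        (n,) = conn.execute(
            "SELECT COUNT(*) FROM item WHERE status = ?", (status,)
        ).fetchone()
        counts[status] = n
    return counts


counts = status_counts(conn, ["queued", "started", "paused"])
```

Each probe is an equality lookup on `status`, and a status with no rows still appears in the result with a count of 0, which a grouped aggregate would omit.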

classmethod check(sender=None, **kwargs)[source]
static _state_to_str(state: archivebox.workers.models.ObjectState) str[source]

Convert a statemachine.State, models.TextChoices.choices value, or Enum value to a str
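A minimal sketch of this kind of normalization, assuming the input either exposes a `.value` attribute (as `Enum` members do) or is already a plain string. `FakeState` is a hypothetical stand-in for a state object, and the real helper may handle its inputs differently.

```python
from enum import Enum


class Status(str, Enum):
    QUEUED = "queued"
    SEALED = "sealed"


class FakeState:
    """Hypothetical stand-in for a state object carrying a `.value`."""

    def __init__(self, value):
        self.value = value


def state_to_str(state):
    # Prefer the wrapped value when there is one; plain strings pass through.
    return str(getattr(state, "value", state))


from_enum = state_to_str(Status.QUEUED)
from_plain = state_to_str("sealed")
from_state = state_to_str(FakeState("started"))
```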

property RETRY_AT: datetime.datetime[source]
property STATE: str[source]
bump_retry_at(seconds: int = 10)[source]
property is_paused: bool[source]
safe_update(update_fields: dict[str, Any], *, refresh: bool = True, extra_filter: dict[str, Any] | None = None) bool[source]

Atomic single-row UPDATE for scheduler writes that bypass save().

The write is unconditional unless the caller passes extra_filter — the previous implicit modified_at CAS predicate spuriously collided with concurrent writers to unrelated fields (every save bumps modified_at), which silently dropped state-machine transitions. Callers that need a transition guard (only advance from state A to state B; only requeue a row still holding lease X) pass extra_filter explicitly.
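The difference between an unconditional write and one guarded by `extra_filter` can be sketched with plain `sqlite3`. The table layout and this free-standing `safe_update` are assumptions made for illustration; the documented method operates on a Django model instance.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, status TEXT, notes TEXT)")
conn.execute("INSERT INTO item (id, status, notes) VALUES (1, 'queued', '')")


def safe_update(conn, row_id, update_fields, extra_filter=None):
    """Single-row UPDATE; returns True only if a row matched and was written."""
    # Column names are interpolated, so they must come from trusted code.
    # Values are always bound as parameters.
    where = {"id": row_id, **(extra_filter or {})}
    set_sql = ", ".join(f"{col} = ?" for col in update_fields)
    where_sql = " AND ".join(f"{col} = ?" for col in where)
    cur = conn.execute(
        f"UPDATE item SET {set_sql} WHERE {where_sql}",
        [*update_fields.values(), *where.values()],
    )
    conn.commit()
    return cur.rowcount == 1


# No guard: the write always lands on the row.
unguarded = safe_update(conn, 1, {"notes": "touched"})
# Transition guard: only advance a row that is still 'queued'.
advanced = safe_update(conn, 1, {"status": "started"}, extra_filter={"status": "queued"})
# The same guarded write again finds no 'queued' row and reports False.
stale = safe_update(conn, 1, {"status": "started"}, extra_filter={"status": "queued"})
final_status = conn.execute("SELECT status FROM item WHERE id = 1").fetchone()[0]
```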

save(*args, **kwargs)[source]
pause(*, save: bool = True) bool[source]
resume(*, when: datetime.datetime | None = None, save: bool = True) bool[source]
update_and_requeue(*, refresh: bool = True, **kwargs) bool[source]

Scheduler-facing wrapper around safe_update().

Call this when a state-machine row should become visible to the runner. It preserves the current retry_at lease as an additional guard while safe_update() performs the atomic single-row write and refresh.

classmethod get_queue()[source]

Get the sorted and filtered QuerySet of objects that are ready for processing. retry_at is the only scheduler signal; callers branch on status after selection.
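A sketch of selecting on `retry_at` alone, written against `sqlite3` instead of a Django QuerySet. The table, the numeric timestamps, the oldest-first ordering, and the treatment of a NULL `retry_at` as "not scheduled" are all assumptions for this example; the source only states that the queue is sorted and filtered.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, status TEXT, retry_at REAL)")
now = 1000.0
conn.executemany(
    "INSERT INTO item (id, status, retry_at) VALUES (?, ?, ?)",
    [
        (1, "queued", now - 5),    # due
        (2, "started", now - 50),  # due, and the oldest
        (3, "queued", now + 30),   # retry_at in the future: not due
        (4, "sealed", None),       # no retry_at: never selected
    ],
)


def get_queue(conn, now):
    """Select due rows by retry_at only; status is not part of the filter."""
    return conn.execute(
        "SELECT id, status FROM item WHERE retry_at <= ? ORDER BY retry_at",
        (now,),
    ).fetchall()


due = get_queue(conn, now)
# Callers branch on status only after selection.
started_ids = [row_id for row_id, status in due if status == "started"]
```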

classmethod claim_for_worker(obj: archivebox.workers.models.BaseModelWithStateMachine, lock_seconds: int = 60) bool[source]

Atomically claim a due object for processing using retry_at as the lock.

Correct lifecycle for any state-machine-driven work item:

  1. Queue the item by setting retry_at <= now

  2. Exactly one owner claims it by moving retry_at into the future

  3. Only that owner may call .sm.tick() and perform side effects

  4. State-machine callbacks update retry_at again when the work completes, backs off, or is re-queued

The critical rule is that future retry_at values are already owned. Callers must never “steal” those future timestamps and start another copy of the same work. That is what prevents duplicate installs, hook runs, and other concurrent side effects.

Returns True if successfully claimed, False if another worker got it first or the object is not currently due.
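The claim step of the lifecycle above can be sketched as one conditional UPDATE. This uses `sqlite3` with numeric timestamps; the table and the free-standing `claim` function are hypothetical, not the ArchiveBox implementation.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE item (id INTEGER PRIMARY KEY, retry_at REAL)")
conn.execute("INSERT INTO item (id, retry_at) VALUES (1, 100.0)")  # queued: due at t=100


def claim(conn, row_id, now, lock_seconds=60):
    """Move retry_at into the future only if the row is currently due."""
    cur = conn.execute(
        "UPDATE item SET retry_at = ? WHERE id = ? AND retry_at <= ?",
        (now + lock_seconds, row_id, now),
    )
    conn.commit()
    # Exactly one caller can match the WHERE clause for a given due row.
    return cur.rowcount == 1


now = 200.0
first = claim(conn, 1, now)   # the row is due, so this caller becomes the owner
second = claim(conn, 1, now)  # retry_at is now in the future: already owned
lease = conn.execute("SELECT retry_at FROM item WHERE id = 1").fetchone()[0]
```

Because the due check and the write happen in the same statement, a second caller cannot observe the row as due after the first caller has claimed it.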

claim_processing_lock(lock_seconds: int = 60) bool[source]

Claim this model instance immediately before executing one state-machine tick.

This helper is the safe entrypoint for any direct state-machine driver (workers, synchronous crawl dependency installers, one-off CLI helpers). Calling .sm.tick() without claiming first turns retry_at into “just a schedule” instead of the ownership lock it is meant to be.

Returns True only for the caller that successfully moved retry_at into the future. False means another process already owns the work item or it is not currently due.

tick_claimed(lock_seconds: int = 60) bool[source]

Claim ownership via retry_at and then execute exactly one .sm.tick().

Future maintainers should prefer this helper over calling .sm.tick() directly whenever there is any chance another process could see the same queued row. If this method returns False, someone else already owns the work and the caller must not run side effects for it.
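The claim-then-tick pattern can be sketched in memory as follows. `WorkItem` is a hypothetical class, its `threading.Lock` stands in for the database's atomic UPDATE, and the `now` parameter is added only to keep the example testable.

```python
import threading
import time


class WorkItem:
    """Hypothetical in-memory work item; not an ArchiveBox model."""

    def __init__(self, retry_at):
        self.retry_at = retry_at
        self.ticks = 0
        self._lock = threading.Lock()  # stands in for an atomic UPDATE

    def claim_processing_lock(self, lock_seconds=60, now=None):
        now = time.time() if now is None else now
        with self._lock:
            if self.retry_at is None or self.retry_at > now:
                return False  # not due, or already owned by someone else
            self.retry_at = now + lock_seconds
            return True

    def tick(self):
        self.ticks += 1  # side effects would happen here

    def tick_claimed(self, lock_seconds=60, now=None):
        if not self.claim_processing_lock(lock_seconds, now=now):
            return False
        self.tick()
        return True


item = WorkItem(retry_at=0.0)  # due immediately
results = []
threads = [
    threading.Thread(target=lambda: results.append(item.tick_claimed()))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Of the eight competing callers, only the one that wins the claim runs `tick()`; the others get False and must skip their side effects.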

ACTIVE_STATE() str[source]
INITIAL_STATE() str[source]
FINAL_STATES() list[str][source]
FINAL_OR_ACTIVE_STATES() list[str][source]
classmethod extend_choices(base_choices: type[django.db.models.TextChoices])[source]

Decorator to extend the base choices with extra choices, e.g.:

    class MyModel(ModelWithStateMachine):

        @ModelWithStateMachine.extend_choices(ModelWithStateMachine.StatusChoices)
        class StatusChoices(models.TextChoices):
            SUCCEEDED = 'succeeded'
            FAILED = 'failed'
            SKIPPED = 'skipped'
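Since Python enums that already define members cannot be subclassed, a decorator like this has to build a new class that merges both member sets. The sketch below shows one way to do that with the standard-library `enum` functional API; `BaseStatus` and this `extend_choices` are hypothetical stand-ins, and the real decorator works on Django `TextChoices` and may differ in detail.

```python
from enum import Enum


class BaseStatus(str, Enum):
    QUEUED = "queued"
    STARTED = "started"
    SEALED = "sealed"


def extend_choices(base):
    """Return a decorator that prepends the members of `base` to the decorated enum."""

    def decorator(extra):
        members = [(m.name, m.value) for m in base]
        members += [(m.name, m.value) for m in extra]
        # Build a fresh enum, because existing members cannot be inherited.
        return Enum(extra.__name__, members, type=str)

    return decorator


@extend_choices(BaseStatus)
class Status(str, Enum):
    SUCCEEDED = "succeeded"
    FAILED = "failed"


names = [m.name for m in Status]
```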
classmethod StatusField(**kwargs) django.db.models.CharField[source]

Used on subclasses to extend/modify the status field with updated kwargs. e.g.:

    class MyModel(ModelWithStateMachine):
        # A complete choices class; an enum that already has members cannot be subclassed
        class StatusChoices(models.TextChoices):
            QUEUED = 'queued', 'Queued'
            STARTED = 'started', 'Started'
            SEALED = 'sealed', 'Sealed'
            BACKOFF = 'backoff', 'Backoff'
            FAILED = 'failed', 'Failed'
            SKIPPED = 'skipped', 'Skipped'

        status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
classmethod RetryAtField(**kwargs) django.db.models.DateTimeField[source]

Used on subclasses to extend/modify the retry_at field with updated kwargs. e.g.:

    class MyModel(ModelWithStateMachine):
        retry_at = ModelWithStateMachine.RetryAtField(editable=False)

StateMachineClass() type[statemachine.StateMachine][source]

Get the StateMachine class for the given django Model.

class archivebox.workers.models.ModelWithStateMachine[source]

Bases: archivebox.workers.models.BaseModelWithStateMachine

StatusChoices[source]

None

status: django.db.models.CharField[source]

‘StatusField(…)’

retry_at: django.db.models.DateTimeField[source]

‘RetryAtField(…)’

state_machine_name: str | None[source]

None

state_field_name: str[source]

‘status’

state_machine_attr: str[source]

‘sm’

bind_events_as_methods: bool[source]

False

active_state[source]

None

retry_at_field_name: str[source]

‘retry_at’

class Meta[source]

Bases: archivebox.workers.models.BaseModelWithStateMachine

abstract[source]

True

class archivebox.workers.models.BaseStateMachine(obj, *args, **kwargs)[source]

Bases: statemachine.StateMachine

Base class for all ArchiveBox state machines.

Eliminates the boilerplate __init__, __repr__, and __str__ methods that were duplicated across all 4 state machines (Snapshot, ArchiveResult, Crawl, Binary).

Subclasses must set model_attr_name to specify the attribute name (e.g., ‘snapshot’, ‘archiveresult’, ‘crawl’, ‘binary’).

Example usage:

    class SnapshotMachine(BaseStateMachine):
        model_attr_name = 'snapshot'

        # States and transitions...
        queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True)
        # ...

The model instance is accessible via self.{model_attr_name} (e.g., self.snapshot, self.archiveresult, etc.)
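The `model_attr_name` convention can be sketched without python-statemachine. `BaseMachineSketch`, `SnapshotMachineSketch`, `FakeSnapshot`, and the `__repr__` format below are all invented for illustration; the real base class derives from `statemachine.StateMachine` and its output may look different.

```python
class BaseMachineSketch:
    """Hypothetical base class showing the shared __init__/__repr__/__str__ idea."""

    model_attr_name = "obj"  # subclasses override this

    def __init__(self, obj):
        self.obj = obj
        # Expose the same instance under the subclass-specific name.
        setattr(self, self.model_attr_name, obj)

    def __repr__(self):
        return f"{type(self).__name__}[{getattr(self.obj, 'id', '?')}]"

    def __str__(self):
        return repr(self)


class SnapshotMachineSketch(BaseMachineSketch):
    model_attr_name = "snapshot"


class FakeSnapshot:
    id = "abc123"


machine = SnapshotMachineSketch(FakeSnapshot())
label = repr(machine)
```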


model_attr_name: str[source]

‘obj’

_register_callbacks(listeners: list[object])[source]

Register transition callbacks without scanning the Django model.

python-statemachine normally treats the wrapped model as a callback listener. That is useful when transition specs point at methods on the domain object, but ArchiveBox keeps all transition guards/actions on the machine classes themselves (SnapshotMachine.can_start, CrawlMachine.enter_sealed, etc.). Scanning the Django model therefore only adds work: dir(model) is large, callback resolution walks that attribute set for every state/transition, and the cost lands on every .sm construction.

Keep support for explicit external listeners, but do not register self.model as an implicit listener. If a future machine wants model methods as callbacks, pass that model explicitly as a listener at the callsite so the cost is local and visible.

__repr__() str[source]
__str__() str[source]