archivebox.workers.models
Module Contents
Classes
Base class for all ArchiveBox state machines. |
Data
API
- archivebox.workers.models.default_retry_at_field: django.db.models.DateTimeField[source]
‘DateTimeField(…)’
- class archivebox.workers.models.BaseModelWithStateMachine[source]
Bases:
django.db.models.Model- StatusChoices: ClassVar[type[archivebox.workers.models.DefaultStatusChoices]][source]
None
- active_state: archivebox.workers.models.ObjectState[source]
None
- property sm: statemachine.StateMachine[source]
Build the python-statemachine wrapper only at transition callsites.
This model is loaded by high-volume paths that do not drive lifecycle transitions: admin lists, progress polling, index-only maintenance, and bulk recovery scans all instantiate thousands of rows just to read or update ordinary columns. python-statemachine setup is correct but not free: it creates per-instance state wrappers, callback registries, queues, locks, and callback adapters. Paying that cost from Django’s model init made plain ORM materialization scale with state-machine setup instead of row decoding.
ArchiveBox drives lifecycle transitions explicitly through
.sm(snapshot.sm.tick(),crawl.sm.seal(), etc.), so the machine can be cached on first use without changing the state model. Code that only needs database fields never constructs one.
- classmethod status_counts(queryset: django.db.models.QuerySet | None = None, statuses: collections.abc.Iterable[str] | None = None) dict[str, int][source]
Count requested statuses with separate indexed COUNT probes.
For live/progress views this is often faster on large SQLite data dirs than a grouped aggregate, because each status can use the status index directly and the caller usually needs only a few states.
- static _state_to_str(state: archivebox.workers.models.ObjectState) str[source]
Convert a statemachine.State, models.TextChoices.choices value, or Enum value to a str
- safe_update(update_fields: dict[str, Any], *, refresh: bool = True, extra_filter: dict[str, Any] | None = None) bool[source]
Atomic single-row UPDATE for scheduler writes that bypass save().
The write is unconditional unless the caller passes extra_filter — the previous implicit modified_at CAS predicate spuriously collided with concurrent writers to unrelated fields (every save bumps modified_at), which silently dropped state-machine transitions. Callers that need a transition guard (only advance from state A to state B; only requeue a row still holding lease X) pass extra_filter explicitly.
- update_and_requeue(*, refresh: bool = True, **kwargs) bool[source]
Scheduler-facing wrapper around safe_update().
Call this when a state-machine row should become visible to the runner. It preserves the current retry_at lease as an additional guard while safe_update() owns the modified_at CAS write and refresh.
- classmethod get_queue()[source]
Get the sorted and filtered QuerySet of objects that are ready for processing. retry_at is the only scheduler signal; callers branch on status after selection.
- classmethod claim_for_worker(obj: archivebox.workers.models.BaseModelWithStateMachine, lock_seconds: int = 60) bool[source]
Atomically claim a due object for processing using retry_at as the lock.
Correct lifecycle for any state-machine-driven work item:
Queue the item by setting retry_at <= now
Exactly one owner claims it by moving retry_at into the future
Only that owner may call .sm.tick() and perform side effects
State-machine callbacks update retry_at again when the work completes, backs off, or is re-queued
The critical rule is that future retry_at values are already owned. Callers must never “steal” those future timestamps and start another copy of the same work. That is what prevents duplicate installs, hook runs, and other concurrent side effects.
Returns True if successfully claimed, False if another worker got it first or the object is not currently due.
- claim_processing_lock(lock_seconds: int = 60) bool[source]
Claim this model instance immediately before executing one state-machine tick.
This helper is the safe entrypoint for any direct state-machine driver (workers, synchronous crawl dependency installers, one-off CLI helpers). Calling
.sm.tick()without claiming first turns retry_at into “just a schedule” instead of the ownership lock it is meant to be.Returns True only for the caller that successfully moved retry_at into the future. False means another process already owns the work item or it is not currently due.
- tick_claimed(lock_seconds: int = 60) bool[source]
Claim ownership via retry_at and then execute exactly one
.sm.tick().Future maintainers should prefer this helper over calling
.sm.tick()directly whenever there is any chance another process could see the same queued row. If this method returns False, someone else already owns the work and the caller must not run side effects for it.
- classmethod extend_choices(base_choices: type[django.db.models.TextChoices])[source]
Decorator to extend the base choices with extra choices, e.g.:
class MyModel(ModelWithStateMachine):
@ModelWithStateMachine.extend_choices(ModelWithStateMachine.StatusChoices) class StatusChoices(models.TextChoices): SUCCEEDED = 'succeeded' FAILED = 'failed' SKIPPED = 'skipped'
- classmethod StatusField(**kwargs) django.db.models.CharField[source]
Used on subclasses to extend/modify the status field with updated kwargs. e.g.:
class MyModel(ModelWithStateMachine): class StatusChoices(ModelWithStateMachine.StatusChoices): QUEUED = ‘queued’, ‘Queued’ STARTED = ‘started’, ‘Started’ SEALED = ‘sealed’, ‘Sealed’ BACKOFF = ‘backoff’, ‘Backoff’ FAILED = ‘failed’, ‘Failed’ SKIPPED = ‘skipped’, ‘Skipped’
status = ModelWithStateMachine.StatusField(choices=StatusChoices.choices, default=StatusChoices.QUEUED)
- class archivebox.workers.models.ModelWithStateMachine[source]
- class archivebox.workers.models.BaseStateMachine(obj, *args, **kwargs)[source]
Bases:
statemachine.StateMachineBase class for all ArchiveBox state machines.
Eliminates boilerplate init, repr, str methods that were duplicated across all 4 state machines (Snapshot, ArchiveResult, Crawl, Binary).
Subclasses must set model_attr_name to specify the attribute name (e.g., ‘snapshot’, ‘archiveresult’, ‘crawl’, ‘binary’).
Example usage: class SnapshotMachine(BaseStateMachine): model_attr_name = ‘snapshot’
# States and transitions... queued = State(value=Snapshot.StatusChoices.QUEUED, initial=True) # ...The model instance is accessible via self.{model_attr_name} (e.g., self.snapshot, self.archiveresult, etc.)
Initialization
- _register_callbacks(listeners: list[object])[source]
Register transition callbacks without scanning the Django model.
python-statemachine normally treats the wrapped model as a callback listener. That is useful when transition specs point at methods on the domain object, but ArchiveBox keeps all transition guards/actions on the machine classes themselves (
SnapshotMachine.can_start,CrawlMachine.enter_sealed, etc.). Scanning the Django model therefore only adds work:dir(model)is large, callback resolution walks that attribute set for every state/transition, and the cost lands on every.smconstruction.Keep support for explicit external listeners, but do not register
self.modelas an implicit listener. If a future machine wants model methods as callbacks, pass that model explicitly as a listener at the callsite so the cost is local and visible.