archivebox.core.models

Module Contents

Classes

UngroupedSubquery

Scalar subquery that should not be copied into the outer GROUP BY.

Tag

SnapshotTag

SnapshotQuerySet

Custom QuerySet for Snapshot model with export methods that persist through .filter() etc.

SnapshotManager

Manager for Snapshot model - uses SnapshotQuerySet for chainable methods

Snapshot

SnapshotMachine

State machine for managing Snapshot lifecycle.

ArchiveResult

API

class archivebox.core.models.UngroupedSubquery[source]

Bases: django.db.models.Subquery

Scalar subquery that should not be copied into the outer GROUP BY.

get_group_by_cols()[source]
class archivebox.core.models.Tag[source]

Bases: archivebox.base_models.models.ModelWithUUID

id[source]

‘AutoField(…)’

created_by[source]

‘ForeignKey(…)’

created_at[source]

‘DateTimeField(…)’

modified_at[source]

‘DateTimeField(…)’

name[source]

‘CharField(…)’

snapshot_set: django.db.models.Manager[Snapshot][source]

None

class Meta[source]

Bases: archivebox.base_models.models.ModelWithUUID.Meta

app_label[source]

‘core’

verbose_name[source]

‘Tag’

verbose_name_plural[source]

‘Tags’

__str__()[source]
property slug: str[source]

ASCII-safe slugified form of the tag name (derived, not stored).

property api_url: str[source]
to_json() dict[source]

Convert Tag model instance to a JSON-serializable dict.

static from_json(record: dict[str, Any], overrides: dict[str, Any] | None = None)[source]

Create/update Tag from JSON dict.

Args: record: JSON dict with ‘name’ field overrides: Optional dict with ‘snapshot’ to auto-attach tag

Returns: Tag instance or None

class archivebox.core.models.SnapshotTag[source]

Bases: django.db.models.Model

id[source]

‘AutoField(…)’

snapshot[source]

‘ForeignKey(…)’

tag[source]

‘ForeignKey(…)’

class Meta[source]
app_label[source]

‘core’

db_table[source]

‘core_snapshot_tags’

unique_together[source]

[(‘snapshot’, ‘tag’)]

class archivebox.core.models.SnapshotQuerySet[source]

Bases: django.db.models.QuerySet

Custom QuerySet for Snapshot model with export methods that persist through .filter() etc.

paged_iterator(chunk_size: int = 500)[source]

Iterate snapshots using bounded keyset pages instead of one streaming cursor.

Django’s iterator(chunk_size=…) still keeps a single SQLite SELECT cursor open until the full queryset is exhausted. That is fine for read-only exports, but update/migration code does filesystem work and writes while iterating; a long-lived read cursor there can stretch lock waits across thousands of rows. This respects the queryset’s existing filters, order_by(), select_related(), and prefetch_related() state; if no ordering is defined, it falls back to primary-key order.

FILTER_TYPES[source]

None

FILTER_TYPE_CHOICES[source]

‘tuple(…)’

FILTER_ARG_KEYS[source]

(‘after’, ‘before’, ‘filter_type’, ‘filter_patterns’, ‘status’, ‘url__icontains’, ‘url__istartswith’…

SPECIAL_FILTER_ARG_KEYS[source]

‘frozenset(…)’

filter_by_patterns(patterns: list[str], filter_type: str = 'exact') archivebox.core.models.SnapshotQuerySet[source]

Filter snapshots by URL patterns using specified filter type

search(**kwargs) archivebox.core.models.SnapshotQuerySet[source]
to_json(with_headers: bool = False) str[source]

Generate JSON index from snapshots

to_csv(cols: list[str] | None = None, header: bool = True, separator: str = ',', ljust: int = 0) str[source]

Generate CSV output from snapshots

to_html(with_headers: bool = True) str[source]

Generate main index HTML from snapshots

class archivebox.core.models.SnapshotManager[source]

Bases: models.Manager.from_queryset(SnapshotQuerySet)

Manager for Snapshot model - uses SnapshotQuerySet for chainable methods

filter(*args, **kwargs)[source]
get_queryset()[source]
remove(atomic: bool = False) tuple[source]

Remove snapshots from the database

class archivebox.core.models.Snapshot[source]

Bases: archivebox.base_models.models.ModelWithDeleteAfter, archivebox.base_models.models.ModelWithOutputDir, archivebox.base_models.models.ModelWithConfig, archivebox.base_models.models.ModelWithNotes, archivebox.base_models.models.ModelWithHealthStats, archivebox.workers.models.ModelWithStateMachine

id[source]

‘CompactUUIDField(…)’

created_at[source]

‘DateTimeField(…)’

modified_at[source]

‘DateTimeField(…)’

url[source]

‘CharField(…)’

timestamp[source]

‘CharField(…)’

bookmarked_at[source]

‘DateTimeField(…)’

crawl: archivebox.crawls.models.Crawl[source]

‘ForeignKey(…)’

parent_snapshot[source]

‘ForeignKey(…)’

title[source]

‘CharField(…)’

downloaded_at[source]

‘DateTimeField(…)’

depth[source]

‘PositiveSmallIntegerField(…)’

fs_version[source]

‘CharField(…)’

current_step[source]

‘PositiveSmallIntegerField(…)’

retry_at[source]

‘RetryAtField(…)’

status[source]

‘StatusField(…)’

config[source]

‘JSONField(…)’

permissions[source]

‘GeneratedField(…)’

output_size[source]

‘BigIntegerField(…)’

notes[source]

‘TextField(…)’

tags[source]

‘ManyToManyField(…)’

state_machine_name[source]

‘archivebox.core.models.SnapshotMachine’

state_field_name[source]

‘status’

retry_at_field_name[source]

‘retry_at’

StatusChoices[source]

None

active_state[source]

None

delete_after_final_statuses[source]

()

RUNNABLE_STATES[source]

()

OPEN_STATES[source]

()

crawl_id: uuid.UUID[source]

None

parent_snapshot_id: uuid.UUID | None[source]

None

_prefetched_objects_cache: dict[str, Any][source]

None

objects[source]

‘SnapshotManager(…)’

archiveresult_set: django.db.models.Manager[ArchiveResult][source]

None

class Meta[source]

Bases: archivebox.base_models.models.ModelWithDeleteAfter.Meta, archivebox.base_models.models.ModelWithOutputDir.Meta, archivebox.base_models.models.ModelWithConfig.Meta, archivebox.base_models.models.ModelWithNotes.Meta, archivebox.base_models.models.ModelWithHealthStats.Meta, archivebox.workers.models.ModelWithStateMachine.Meta

app_label[source]

‘core’

verbose_name[source]

‘Snapshot’

verbose_name_plural[source]

‘Snapshots’

indexes[source]

None

constraints[source]

None

__str__()[source]
classmethod crawl_count_subquery(*, status: str | None = None, outer_ref: str = 'pk') django.db.models.QuerySet[source]

Return a scalar subquery counting Snapshots for one outer Crawl.

classmethod crawl_count_expr(*, status: str | None = None, outer_ref: str = 'pk')[source]
classmethod crawl_total_and_status_counts(crawl_ids: collections.abc.Iterable[Any], *, status: str) dict[str, dict[str, int]][source]

Return total and status-filtered Snapshot counts keyed by Crawl ID.

update_and_requeue(**kwargs) bool[source]

Update this Snapshot through the shared retry_at ownership path.

Any non-final Snapshot work means the parent Crawl must also be visible to the runner. Keep that invariant here so CLI/admin callers do not hand-edit the parent Crawl state every time they retry a hook.

queue_for_extraction(*, when=None) bool[source]

Queue this Snapshot for the runner using the normal state path.

pause(*, save: bool = True) bool[source]
resume(*, when: datetime.datetime | None = None, save: bool = True) bool[source]
restore_paused_scheduler_marker() None[source]

Keep explicit maintenance from accidentally resuming paused snapshots.

Targeted jobs such as archivebox update --index-only may bump retry_at so the orchestrator can run only queued search ArchiveResult rows. After that maintenance pass, the lifecycle must remain PAUSED and retry_at must go back to MAX until a real resume transition happens.

reconcile_parent_lifecycle(*, lock_seconds: int = 60) bool | None[source]

Follow parent Crawl pause/seal state before any Snapshot work runs.

Crawl.pause()/cancel() only wake child rows. The runner claims each due Snapshot and lets this method perform the actual child transition, so cancellation stays fast and Snapshot cleanup still runs from the normal state-machine owner.

finalize_completed_upload_results() int[source]
reset_abandoned_results() tuple[int, int][source]
cancel() None[source]
get_delete_after_config_value()[source]
classmethod missing_delete_at_candidates()[source]
classmethod is_archivebox_internal_url(url: str) bool[source]
property created_by[source]

Convenience property to access the user who created this snapshot via its crawl.

property process_set[source]

Get all Process objects related to this snapshot’s ArchiveResults.

property binary_set[source]

Get all Binary objects used by processes related to this snapshot.

save(*args, **kwargs)[source]
static _fs_current_version() str[source]

Get current ArchiveBox filesystem layout version.

property fs_migration_needed: bool[source]

Check if snapshot needs filesystem migration

_fs_next_version(version: str) str[source]

Get next version in migration chain (0.7/0.8 had same layout, only 0.8→0.9 migration needed)

static is_legacy_archive_dir(path: pathlib.Path) bool[source]

Return True for old-style archive/{timestamp} snapshot directories.

migrate_filesystem_to_current_version(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None) None[source]

Copy legacy snapshot output into the current layout and defer old-dir cleanup.

The ordering is intentionally crash-safe:

  1. Copy from the legacy directory into the new directory idempotently.

  2. Verify the new directory has every old file.

  3. Convert metadata in the new directory.

  4. Update fs_version in memory for the caller to save.

  5. Cleanup is scheduled only after the DB commit succeeds.

_fs_migrate_from_0_7_0_to_0_9_0(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]
_fs_migrate_from_0_8_0_to_0_9_0(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]
_fs_migrate_from_0_9_0_to_0_9_4(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]
_fs_migrate_legacy_to_0_9_0(source_dir: pathlib.Path | None = None, target_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]

Migrate from flat to nested structure.

0.8.x: archive/{timestamp}/ 0.9.x: archive/users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/

_cleanup_old_migration_dir(old_dir: pathlib.Path, new_dir: pathlib.Path)[source]

Delete old directory and create symlink after successful migration.

static extract_domain_from_url(url: str) str[source]

Extract domain from URL for 0.9.x path structure. Uses full hostname with sanitized special chars.

Examples: https://example.com:8080 → example.com_8080 https://sub.example.comsub.example.com file:///path → localhost data:text/html → data

get_storage_path_for_version(version: str) pathlib.Path[source]

Calculate storage path for specific filesystem version. Centralizes path logic so it’s reusable.

0.7.x/0.8.x: archive/{timestamp} 0.9.x: archive/users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/

classmethod load_from_directory(snapshot_dir: pathlib.Path) Optional[archivebox.core.models.Snapshot][source]

Load existing Snapshot from DB by reading index.jsonl or index.json.

Reads index file, extracts url+timestamp, queries DB. Returns existing Snapshot or None if not found/invalid. Does NOT create new snapshots.

ONLY used by: archivebox update (for orphan detection)

classmethod create_from_directory(snapshot_dir: pathlib.Path) Optional[archivebox.core.models.Snapshot][source]

Create new Snapshot from orphaned directory.

Validates timestamp, ensures uniqueness. Returns new UNSAVED Snapshot or None if invalid.

ONLY used by: archivebox update (for orphan import)

static _select_best_timestamp(index_timestamp: object | None, folder_name: str) str | None[source]

Select best timestamp from index.json vs folder name.

Validates range (1995-2035). When a valid legacy folder name is available it is the stable filesystem identity, so preserve it over normalized variants like “1508259732.0” found in old index files.

classmethod _ensure_unique_timestamp(url: str, timestamp: str) str[source]

Ensure timestamp is globally unique. If there is a collision, add a tiny fractional suffix until unique.

static _detect_fs_version_from_index(data: dict) str[source]

Detect fs_version from index.json structure.

  • Has fs_version field: use it

  • Has history dict: 0.7.0

  • Has archive_results list: 0.8.0

  • Default: 0.7.0

reconcile_with_index(output_dir: pathlib.Path | None = None, update_existing_archive_results: bool = True)[source]

Merge index.json/index.jsonl with DB. DB is source of truth.

  • Title: longest non-URL

  • Tags: union

  • ArchiveResults: keep both (by plugin+start_ts)

Converts index.json to index.jsonl if needed, then writes back in JSONL format.

Used by: archivebox update (to sync index with DB)

reconcile_with_index_json(output_dir: pathlib.Path | None = None, update_existing_archive_results: bool = True)[source]

Deprecated: use reconcile_with_index() instead.

_merge_title_from_index(index_data: dict)[source]

Merge title - prefer longest non-URL title.

_merge_tags_from_index(index_data: dict)[source]

Merge tags - union of both sources.

_merge_archive_results_from_index(index_data: dict, update_existing: bool = True)[source]

Merge ArchiveResults one row per hook; retries update the existing row.

_create_archive_result_if_missing(result_data: dict, existing: dict, update_existing: bool = True)[source]

Create ArchiveResult if not already in DB.

write_index_json()[source]

Write index.json in 0.9.x format (deprecated, use write_index_jsonl).

write_index_jsonl(output_dir: pathlib.Path | None = None)[source]

Write index.jsonl in flat JSONL format.

Each line is a JSON record with a ‘type’ field:

  • Snapshot: snapshot metadata (crawl_id, url, tags, etc.)

  • ArchiveResult: extractor results (plugin, status, output, etc.)

  • Binary: binary info used for the extraction

  • Process: process execution details (cmd, exit_code, timing, etc.)

read_index_jsonl(output_dir: pathlib.Path | None = None) dict[source]

Read index.jsonl and return parsed records grouped by type.

Returns dict with keys: ‘snapshot’, ‘archive_results’, ‘binaries’, ‘processes’

convert_index_json_to_jsonl(output_dir: pathlib.Path | None = None) bool[source]

Convert index.json to index.jsonl format.

Reads existing index.json, creates index.jsonl, and removes index.json. Returns True if conversion was performed, False if no conversion needed.

static move_directory_to_invalid(snapshot_dir: pathlib.Path)[source]

Move invalid directory to data/invalid/YYYYMMDD/.

Used by: archivebox update (when encountering invalid directories)

classmethod find_and_merge_duplicates() int[source]

Find and merge snapshots with same url:timestamp. Returns count of duplicate sets merged.

Used by: archivebox update (Phase 3: deduplication)

classmethod _merge_snapshots(snapshots: collections.abc.Sequence[archivebox.core.models.Snapshot])[source]

Merge exact duplicates. Keep oldest, union files + ArchiveResults.

property output_dir_parent: str[source]
property output_dir_name: str[source]
archive(overwrite=False, methods=None)[source]
tags_str(nocache=True) str | None[source]
icons(path: str | None = None) str[source]

Generate HTML icons showing which extractor plugins have succeeded for this snapshot

property api_url: str[source]
get_absolute_url()[source]
domain() str[source]
property title_stripped: str[source]
static _normalize_title_candidate(candidate: str | None, *, snapshot_url: str) str[source]
property resolved_title: str[source]
hashes_index() dict[str, dict[str, Any]][source]
property output_dir: pathlib.Path[source]

The filesystem path to the snapshot’s output directory.

Ensure the legacy archive/ path resolves to this snapshot.

Ensure snapshot is symlinked under its crawl output directory.

legacy_archive_path() str[source]
archive_path_from_db() str[source]

Best-effort public URL path derived from DB fields only.

url_path() str[source]

URL path matching the current snapshot output_dir layout.

archive_path()[source]
archive_size()[source]
save_tags(tags: collections.abc.Iterable[str] = ()) None[source]
pending_archiveresults() django.db.models.QuerySet[archivebox.core.models.ArchiveResult][source]
run() list[archivebox.core.models.ArchiveResult][source]

Execute snapshot by creating pending ArchiveResults for all enabled hooks.

Returns: list[ArchiveResult]: Newly created pending results

cleanup()[source]

Clean up background ArchiveResult hooks and empty results.

Called by the state machine when entering the ‘sealed’ state. Deletes empty ArchiveResults after the abx-dl cleanup phase has finished.

to_json() dict[source]

Convert Snapshot model instance to a JSON-serializable dict. Includes all fields needed to fully reconstruct/identify this snapshot.

static from_json(record: dict[str, Any], overrides: dict[str, Any] | None = None, queue_for_extraction: bool = True)[source]

Create/update Snapshot from JSON dict.

Unified method that handles:

  • ID-based patching: {“id”: “…”, “title”: “new title”}

  • URL-based create/update: {“url”: “…”, “title”: “…”, “tags”: “…”}

  • Auto-creates Crawl if not provided

  • Optionally queues for extraction

Args: record: Dict with ‘url’ (for create) or ‘id’ (for patch), plus other fields overrides: Dict with ‘crawl’, ‘snapshot’ (parent), ‘created_by_id’ queue_for_extraction: If True, sets status=QUEUED and retry_at (default: True)

Returns: Snapshot instance or None

create_pending_archiveresults() list[archivebox.core.models.ArchiveResult][source]

Create ArchiveResult records for all enabled hooks.

Uses the hooks system to discover available hooks from:

  • abx_plugins/plugins//on_Snapshot__.{py,sh,js}

  • data/custom_plugins//on_Snapshot__.{py,sh,js}

Creates one ArchiveResult per hook (not per plugin), with hook_name set. This enables step-based execution where all hooks in a step can run in parallel.

is_finished_processing() bool[source]

Check if all ArchiveResults are finished.

Note: This is only called for observability/progress tracking. The shared runner owns execution and does not poll this.

get_progress_stats() dict[source]

Get progress statistics for this snapshot’s archiving process.

Returns dict with: - total: Total number of archive results - succeeded: Number of succeeded results - failed: Number of failed results - running: Number of currently running results - pending: Number of pending/queued results - percent: Completion percentage (0-100) - output_size: Total output size in bytes - is_sealed: Whether the snapshot is in a final state

retry_failed_archiveresults() int[source]

Reset failed/skipped ArchiveResults to queued for retry.

Returns count of ArchiveResults reset.

url_hash() str[source]
scheme() str[source]
path() str[source]
basename() str[source]
extension() str[source]
base_url() str[source]
is_static() bool[source]
is_archived() bool[source]
bookmarked_date() str | None[source]
downloaded_datestr() str | None[source]
archive_dates() list[datetime.datetime][source]
oldest_archive_date() datetime.datetime | None[source]
newest_archive_date() datetime.datetime | None[source]
num_outputs() int[source]
num_failures() int[source]
latest_outputs(status: str | None = None) dict[str, Any][source]

Get the latest output that each plugin produced

discover_outputs(include_filesystem_fallback: bool = True) list[dict][source]

Discover output files from ArchiveResults and filesystem.

to_dict(extended: bool = False) dict[str, Any][source]

Convert Snapshot to a dictionary (replacement for Link._asdict())

to_json_str(indent: int = 4) str[source]

Convert to JSON string (legacy method, use to_json() for dict)

to_csv(cols: list[str] | None = None, separator: str = ',', ljust: int = 0) str[source]

Convert to CSV string

write_json_details(out_dir: pathlib.Path | str | None = None) None[source]

Write JSON index file for this snapshot to its output directory

write_html_details(out_dir: pathlib.Path | str | None = None) None[source]

Write HTML detail page for this snapshot to its output directory

get_detail_page_auxiliary_items(outputs: list[dict] | None = None, hidden_card_plugins: set[str] | None = None) tuple[list[dict[str, object]], list[dict[str, object]]][source]
static _ts_to_date_str(dt: datetime.datetime | None) str | None[source]
class archivebox.core.models.SnapshotMachine(obj, *args, **kwargs)[source]

Bases: archivebox.workers.models.BaseStateMachine

State machine for managing Snapshot lifecycle.

Hook Lifecycle: ┌─────────────────────────────────────────────────────────────┐ │ QUEUED State │ │ • Waiting for snapshot to be ready │ └─────────────────────────────────────────────────────────────┘ ↓ tick() when can_start() ┌─────────────────────────────────────────────────────────────┐ │ STARTED State → enter_started() │ │ 1. snapshot.run() │ │ • discover_hooks(‘Snapshot’) → finds all plugin hooks │ │ • create_pending_archiveresults() → creates ONE │ │ ArchiveResult per hook (NO execution yet) │ │ 2. The shared abx-dl runner executes hooks and the │ │ projector updates ArchiveResult rows from events │ │ 3. Advance through steps 0-9 as foreground hooks complete │ └─────────────────────────────────────────────────────────────┘ ↓ tick() when is_finished() ┌─────────────────────────────────────────────────────────────┐ │ SEALED State → enter_sealed() │ │ • cleanup() → kills any background hooks still running │ │ • Set retry_at=None (no more processing) │ └─────────────────────────────────────────────────────────────┘

https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams

Initialization

model_attr_name[source]

‘snapshot’

queued[source]

‘State(…)’

started[source]

‘State(…)’

paused[source]

‘State(…)’

sealed[source]

‘State(…)’

tick[source]

None

seal[source]

None

pause_requested[source]

None

resume_requested[source]

‘to(…)’

snapshot: archivebox.core.models.Snapshot[source]

None

can_start() bool[source]
is_finished() bool[source]

Check if all ArchiveResults for this snapshot are finished.

has_finished_archive_results() bool[source]

A queued snapshot with only final projected rows was interrupted after hook completion.

enter_queued()[source]
enter_paused()[source]
enter_started()[source]

Just mark as started. The shared runner creates ArchiveResults and runs hooks.

enter_sealed()[source]
class archivebox.core.models.ArchiveResult[source]

Bases: archivebox.base_models.models.ModelWithDeleteAfter, archivebox.base_models.models.ModelWithOutputDir, archivebox.base_models.models.ModelWithNotes

class StatusChoices[source]

Bases: django.db.models.TextChoices

QUEUED[source]

(‘queued’, ‘Queued’)

STARTED[source]

(‘started’, ‘Started’)

PAUSED[source]

(‘paused’, ‘Paused’)

BACKOFF[source]

(‘backoff’, ‘Waiting to retry’)

SUCCEEDED[source]

(‘succeeded’, ‘Succeeded’)

FAILED[source]

(‘failed’, ‘Failed’)

SKIPPED[source]

(‘skipped’, ‘Skipped’)

NORESULTS[source]

(‘noresults’, ‘No Results’)

INITIAL_STATE[source]

None

ACTIVE_STATE[source]

None

FINAL_STATES[source]

()

FINAL_OR_ACTIVE_STATES[source]

()

delete_after_final_statuses[source]

None

classmethod normalize_status(status: str | None) str[source]
static output_files_upload_complete(output_files: dict[str, dict[str, Any]]) bool[source]
classmethod get_plugin_choices()[source]

Get plugin choices from discovered hooks (for forms/admin).

classmethod snapshot_count_subquery(*, status: str | None = None, outer_ref: str = 'pk') django.db.models.QuerySet[source]

Return a scalar subquery counting ArchiveResults for one outer Snapshot.

Use this instead of filtered join aggregates for per-row Snapshot counts: the scalar form lets SQLite probe the covering (snapshot_id, status) or (status, snapshot_id) indexes once per visible Snapshot row, instead of joining and grouping the whole candidate Snapshot queryset.

classmethod snapshot_half_count_subquery(*, outer_ref: str = 'snapshot_id') django.db.models.QuerySet[source]
classmethod snapshot_count_expr(*, status: str | None = None, outer_ref: str = 'pk')[source]
classmethod status_counts(queryset: django.db.models.QuerySet | None = None, statuses: collections.abc.Iterable[str] | None = None) dict[str, int][source]

Count requested statuses with separate indexed COUNT probes.

classmethod snapshot_ids_with_majority_status(status: str | collections.abc.Iterable[str]) django.db.models.QuerySet[source]

Return Snapshot IDs where more than half of ArchiveResults have status.

Start from ArchiveResult.status for every majority-status filter. The (status, snapshot_id) index keeps the plan predictable even when a user’s collection has an unusual status distribution.

classmethod cached_snapshot_ids_with_majority_status(status: str | collections.abc.Iterable[str], *, timeout: int = 60) tuple[str, ...][source]
classmethod clear_majority_status_cache() None[source]
id[source]

‘CompactUUIDField(…)’

created_at[source]

‘DateTimeField(…)’

modified_at[source]

‘DateTimeField(…)’

snapshot: archivebox.core.models.Snapshot[source]

‘ForeignKey(…)’

plugin[source]

‘CharField(…)’

hook_name[source]

‘CharField(…)’

process[source]

‘OneToOneField(…)’

output_str[source]

‘TextField(…)’

output_json[source]

‘JSONField(…)’

output_files[source]

‘JSONField(…)’

output_size[source]

‘BigIntegerField(…)’

output_mimetypes[source]

‘CharField(…)’

start_ts[source]

‘DateTimeField(…)’

end_ts[source]

‘DateTimeField(…)’

status[source]

‘CharField(…)’

retry_at[source]

‘DateTimeField(…)’

notes[source]

‘TextField(…)’

snapshot_id: uuid.UUID[source]

None

process_id: uuid.UUID | None[source]

None

class Meta[source]

Bases: archivebox.base_models.models.ModelWithDeleteAfter.Meta, archivebox.base_models.models.ModelWithOutputDir.Meta, archivebox.base_models.models.ModelWithNotes.Meta

app_label[source]

‘core’

verbose_name[source]

‘Archive Result’

verbose_name_plural[source]

‘Archive Results Log’

indexes[source]

None

constraints[source]

None

__str__()[source]
static _format_output_line_for_display(line: str) str[source]
output_str_for_display() str[source]
get_delete_after_config_value()[source]
classmethod missing_delete_at_candidates()[source]
property created_by[source]

Convenience property to access the user who created this archive result via its snapshot’s crawl.

to_json(*, snapshot_output_dir: pathlib.Path | None = None) dict[source]

Convert ArchiveResult model instance to a JSON-serializable dict.

static from_json(record: dict[str, Any], overrides: dict[str, Any] | None = None)[source]

Create/update ArchiveResult from JSON dict.

Args: record: JSON dict with ‘snapshot_id’, ‘plugin’, etc. overrides: Optional dict of field overrides

Returns: ArchiveResult instance or None

save(*args, **kwargs)[source]
delete(*args, **kwargs)[source]
static refresh_snapshot_output_sizes(snapshot_ids)[source]
snapshot_dir()[source]
url()[source]
property api_url: str[source]
get_absolute_url()[source]
reset_for_retry(*, save: bool = True) None[source]
property is_paused: bool[source]
classmethod pause_queryset(queryset) int[source]
classmethod resume_queryset(queryset, *, when: datetime.datetime | None = None) int[source]
pause(*, save: bool = True) bool[source]
resume(*, when: datetime.datetime | None = None, save: bool = True) bool[source]
property plugin_module: Any | None[source]
static _normalize_output_files(raw_output_files: Any) dict[str, dict[str, Any]][source]
static _coerce_output_file_size(value: Any) int[source]
output_file_map() dict[str, dict[str, Any]][source]
output_file_paths() list[str][source]
output_file_count() int[source]
output_size_from_files() int[source]
update_output_metadata_from_filesystem(snapshot_dir: pathlib.Path | None = None, save: bool = True) bool[source]
output_exists() bool[source]
static _looks_like_output_path(raw_output: str | None, plugin_name: str | None = None) bool[source]
_existing_output_path(raw_output: str | None) str | None[source]
static _fallback_output_file_path(output_file_paths: collections.abc.Sequence[str], plugin_name: str | None = None, output_file_map: dict[str, dict[str, Any]] | None = None) str | None[source]
static _find_best_output_file(dir_path: pathlib.Path, plugin_name: str | None = None) pathlib.Path | None[source]
embed_path_db(output_file_map: dict[str, dict[str, Any]] | None = None) str | None[source]
embed_path() str | None[source]

Get the relative path to the embeddable output file for this result.

This is intentionally DB-backed only so snapshot/admin rendering stays fast and predictable without filesystem probes.

property output_dir_name: str[source]
property output_dir_parent: str[source]
property process_record[source]
property pwd: str[source]

Working directory, derived from the snapshot/plugin path if the Process row is gone.

property cmd: list[source]

Command array (from Process).

property cmd_version: str[source]

Command version (from Process.binary).

property binary[source]

Binary FK (from Process).

property iface[source]

Network interface FK (from Process).

property machine[source]

Machine FK (from Process).

property timeout: int[source]

Timeout in seconds (from Process).

save_search_index()[source]
update_from_output()[source]

Update this ArchiveResult from filesystem logs and output files.

Used for Snapshot cleanup / orphan recovery when a hook’s output exists on disk but the projector did not finalize the row in the database.

Updates:

  • status, output_str, output_json from ArchiveResult JSONL record

  • output_files, output_size, output_mimetypes by walking filesystem

  • end_ts, cmd, cmd_version, binary FK

  • Processes side-effect records (Snapshot, Tag, etc.) via process_hook_records()

_set_binary_from_cmd(cmd: list) None[source]

Find Binary for command and set binary FK.

Tries matching by absolute path first, then by binary name. Only matches binaries on the current machine.

_url_passes_filters(url: str) bool[source]

Check if URL passes URL_ALLOWLIST and URL_DENYLIST config filters.

Uses the centralized config resolver so frozen crawl/snapshot values and live Machine/Persona execution values apply in their scoped order.

property output_dir: pathlib.Path[source]

Get the output directory for this plugin’s results.