archivebox.core.models
Module Contents
Classes
UngroupedSubquery | Scalar subquery that should not be copied into the outer GROUP BY.
SnapshotQuerySet | Custom QuerySet for Snapshot model with export methods that persist through .filter() etc.
SnapshotManager | Manager for Snapshot model - uses SnapshotQuerySet for chainable methods
SnapshotMachine | State machine for managing Snapshot lifecycle.
API
- class archivebox.core.models.UngroupedSubquery[source]
Bases:
django.db.models.Subquery
Scalar subquery that should not be copied into the outer GROUP BY.
- class archivebox.core.models.Tag[source]
- class archivebox.core.models.SnapshotQuerySet[source]
Bases:
django.db.models.QuerySet
Custom QuerySet for Snapshot model with export methods that persist through .filter() etc.
- paged_iterator(chunk_size: int = 500)[source]
Iterate snapshots using bounded keyset pages instead of one streaming cursor.
Django’s iterator(chunk_size=…) still keeps a single SQLite SELECT cursor open until the full queryset is exhausted. That is fine for read-only exports, but update/migration code does filesystem work and writes while iterating; a long-lived read cursor there can stretch lock waits across thousands of rows. This respects the queryset’s existing filters, order_by(), select_related(), and prefetch_related() state; if no ordering is defined, it falls back to primary-key order.
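The keyset-paging idea can be sketched with the standard-library sqlite3 module instead of the Django ORM. This is only an illustration under stated assumptions: the table name `snapshot`, the helper `paged_rows`, and ordering by `id` are hypothetical and are not ArchiveBox's implementation.

```python
import sqlite3
from typing import Iterator


def paged_rows(conn: sqlite3.Connection, chunk_size: int = 500) -> Iterator[tuple]:
    """Yield rows in primary-key order, issuing one bounded SELECT per page."""
    last_id = None
    while True:
        if last_id is None:
            page = conn.execute(
                "SELECT id, url FROM snapshot ORDER BY id LIMIT ?",
                (chunk_size,),
            ).fetchall()
        else:
            # Keyset step: resume strictly after the last row already seen.
            page = conn.execute(
                "SELECT id, url FROM snapshot WHERE id > ? ORDER BY id LIMIT ?",
                (last_id, chunk_size),
            ).fetchall()
        if not page:
            return
        # fetchall() drained the cursor, so no read cursor stays open
        # while the caller writes between rows.
        yield from page
        last_id = page[-1][0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE snapshot (id INTEGER PRIMARY KEY, url TEXT)")
conn.executemany(
    "INSERT INTO snapshot (url) VALUES (?)",
    [(f"https://example.com/{i}",) for i in range(7)],
)

seen = []
for row_id, url in paged_rows(conn, chunk_size=3):
    # Writes happen between short reads instead of under one long cursor.
    conn.execute("UPDATE snapshot SET url = ? WHERE id = ?", (url + "#done", row_id))
    seen.append(row_id)
```

Each page is a short, fully consumed query, which is the property the docstring relies on to keep lock waits bounded.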
- FILTER_ARG_KEYS[source]
(‘after’, ‘before’, ‘filter_type’, ‘filter_patterns’, ‘status’, ‘url__icontains’, ‘url__istartswith’…
- filter_by_patterns(patterns: list[str], filter_type: str = 'exact') archivebox.core.models.SnapshotQuerySet[source]
Filter snapshots by URL patterns using specified filter type
- search(**kwargs) archivebox.core.models.SnapshotQuerySet[source]
- class archivebox.core.models.SnapshotManager[source]
Bases:
models.Manager.from_queryset(SnapshotQuerySet)
Manager for Snapshot model - uses SnapshotQuerySet for chainable methods
- class archivebox.core.models.Snapshot[source]
Bases:
archivebox.base_models.models.ModelWithDeleteAfter, archivebox.base_models.models.ModelWithOutputDir, archivebox.base_models.models.ModelWithConfig, archivebox.base_models.models.ModelWithNotes, archivebox.base_models.models.ModelWithHealthStats, archivebox.workers.models.ModelWithStateMachine
- crawl: archivebox.crawls.models.Crawl[source]
‘ForeignKey(…)’
- archiveresult_set: django.db.models.Manager[ArchiveResult][source]
None
- class Meta[source]
Bases:
archivebox.base_models.models.ModelWithDeleteAfter.Meta, archivebox.base_models.models.ModelWithOutputDir.Meta, archivebox.base_models.models.ModelWithConfig.Meta, archivebox.base_models.models.ModelWithNotes.Meta, archivebox.base_models.models.ModelWithHealthStats.Meta, archivebox.workers.models.ModelWithStateMachine.Meta
- classmethod crawl_count_subquery(*, status: str | None = None, outer_ref: str = 'pk') django.db.models.QuerySet[source]
Return a scalar subquery counting Snapshots for one outer Crawl.
- classmethod crawl_total_and_status_counts(crawl_ids: collections.abc.Iterable[Any], *, status: str) dict[str, dict[str, int]][source]
Return total and status-filtered Snapshot counts keyed by Crawl ID.
- update_and_requeue(**kwargs) bool[source]
Update this Snapshot through the shared retry_at ownership path.
Any non-final Snapshot work means the parent Crawl must also be visible to the runner. Keep that invariant here so CLI/admin callers do not hand-edit the parent Crawl state every time they retry a hook.
- queue_for_extraction(*, when=None) bool[source]
Queue this Snapshot for the runner using the normal state path.
- restore_paused_scheduler_marker() None[source]
Keep explicit maintenance from accidentally resuming paused snapshots.
Targeted jobs such as archivebox update --index-only may bump retry_at so the orchestrator can run only queued search ArchiveResult rows. After that maintenance pass, the lifecycle must remain PAUSED and retry_at must go back to MAX until a real resume transition happens.
- reconcile_parent_lifecycle(*, lock_seconds: int = 60) bool | None[source]
Follow parent Crawl pause/seal state before any Snapshot work runs.
Crawl.pause()/cancel() only wake child rows. The runner claims each due Snapshot and lets this method perform the actual child transition, so cancellation stays fast and Snapshot cleanup still runs from the normal state-machine owner.
- property created_by[source]
Convenience property to access the user who created this snapshot via its crawl.
- _fs_next_version(version: str) str[source]
Get next version in migration chain (0.7/0.8 had same layout, only 0.8→0.9 migration needed)
- static is_legacy_archive_dir(path: pathlib.Path) bool[source]
Return True for old-style archive/{timestamp} snapshot directories.
- migrate_filesystem_to_current_version(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None) None[source]
Copy legacy snapshot output into the current layout and defer old-dir cleanup.
The ordering is intentionally crash-safe:
Copy from the legacy directory into the new directory idempotently.
Verify the new directory has every old file.
Convert metadata in the new directory.
Update fs_version in memory for the caller to save.
Cleanup is scheduled only after the DB commit succeeds.
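The first two steps of that ordering (idempotent copy, then verification) can be sketched as follows. The helper `copy_and_verify` and its size-based skip check are hypothetical; the real method also converts metadata and defers cleanup until after the DB commit, which this sketch does not attempt.

```python
import shutil
import tempfile
from pathlib import Path


def copy_and_verify(old_dir: Path, new_dir: Path) -> bool:
    """Copy old_dir into new_dir idempotently, then check nothing is missing."""
    for src in old_dir.rglob("*"):
        if not src.is_file():
            continue
        dst = new_dir / src.relative_to(old_dir)
        dst.parent.mkdir(parents=True, exist_ok=True)
        # Skip files already copied by an earlier, interrupted run.
        if not (dst.exists() and dst.stat().st_size == src.stat().st_size):
            shutil.copy2(src, dst)
    # Verification: every old file must exist in the new layout.
    return all(
        (new_dir / src.relative_to(old_dir)).is_file()
        for src in old_dir.rglob("*")
        if src.is_file()
    )


root = Path(tempfile.mkdtemp())
old, new = root / "old", root / "new"
(old / "sub").mkdir(parents=True)
(old / "index.json").write_text("{}")
(old / "sub" / "output.html").write_text("<html></html>")

first = copy_and_verify(old, new)
second = copy_and_verify(old, new)  # re-running after a crash is harmless
```

Because the old directory is never touched here, a crash at any point leaves the original data intact, which is what makes the ordering safe to retry.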
- _fs_migrate_from_0_7_0_to_0_9_0(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]
- _fs_migrate_from_0_8_0_to_0_9_0(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]
- _fs_migrate_from_0_9_0_to_0_9_4(source_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]
- _fs_migrate_legacy_to_0_9_0(source_dir: pathlib.Path | None = None, target_dir: pathlib.Path | None = None, config: ArchiveBoxBaseConfig | None = None)[source]
Migrate from flat to nested structure.
0.8.x: archive/{timestamp}/
0.9.x: archive/users/{user}/snapshots/YYYYMMDD/{domain}/{uuid}/
- _cleanup_old_migration_dir(old_dir: pathlib.Path, new_dir: pathlib.Path)[source]
Delete old directory and create symlink after successful migration.
- static extract_domain_from_url(url: str) str[source]
Extract domain from URL for 0.9.x path structure. Uses full hostname with sanitized special chars.
Examples:
https://example.com:8080 → example.com_8080
https://sub.example.com → sub.example.com
file:///path → localhost
data:text/html → data
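A minimal sketch that reproduces the four documented examples, assuming the sanitizing rule is "replace anything other than letters, digits, dots, and hyphens with an underscore". The function name `domain_for_path` and that exact rule are assumptions; the real method may handle more cases.

```python
import re
from urllib.parse import urlparse


def domain_for_path(url: str) -> str:
    """Map a URL to a filesystem-safe domain segment (illustrative only)."""
    parsed = urlparse(url)
    if parsed.scheme == "data":
        return "data"
    if not parsed.netloc:
        # file:// URLs and other host-less URLs have no hostname to use.
        return "localhost"
    # Keep the full hostname (and port), replacing unsafe characters.
    return re.sub(r"[^A-Za-z0-9.-]", "_", parsed.netloc)


examples = {
    url: domain_for_path(url)
    for url in (
        "https://example.com:8080",
        "https://sub.example.com",
        "file:///path",
        "data:text/html",
    )
}
```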
- get_storage_path_for_version(version: str) pathlib.Path[source]
Calculate storage path for specific filesystem version. Centralizes path logic so it’s reusable.
0.7.x/0.8.x: archive/{timestamp}
0.9.x: archive/users/{username}/snapshots/YYYYMMDD/{domain}/{uuid}/
- classmethod load_from_directory(snapshot_dir: pathlib.Path) Optional[archivebox.core.models.Snapshot][source]
Load existing Snapshot from DB by reading index.jsonl or index.json.
Reads index file, extracts url+timestamp, queries DB. Returns existing Snapshot or None if not found/invalid. Does NOT create new snapshots.
ONLY used by: archivebox update (for orphan detection)
- classmethod create_from_directory(snapshot_dir: pathlib.Path) Optional[archivebox.core.models.Snapshot][source]
Create new Snapshot from orphaned directory.
Validates timestamp, ensures uniqueness. Returns new UNSAVED Snapshot or None if invalid.
ONLY used by: archivebox update (for orphan import)
- static _select_best_timestamp(index_timestamp: object | None, folder_name: str) str | None[source]
Select best timestamp from index.json vs folder name.
Validates range (1995-2035). When a valid legacy folder name is available it is the stable filesystem identity, so preserve it over normalized variants like “1508259732.0” found in old index files.
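The selection rule can be sketched like this. The helper names and the exact bounds (start of 1995 to start of 2035, UTC) are assumptions; the source states only the year range and that a valid folder name wins.

```python
from datetime import datetime, timezone

# Assumed bounds for the documented 1995-2035 range.
MIN_TS = datetime(1995, 1, 1, tzinfo=timezone.utc).timestamp()
MAX_TS = datetime(2035, 1, 1, tzinfo=timezone.utc).timestamp()


def is_valid_timestamp(value: object) -> bool:
    """Return True if value parses as a Unix timestamp inside the range."""
    try:
        return MIN_TS <= float(value) <= MAX_TS
    except (TypeError, ValueError):
        return False


def pick_timestamp(index_timestamp: object, folder_name: str):
    # A valid folder name is the stable filesystem identity, so it wins.
    if is_valid_timestamp(folder_name):
        return folder_name
    if is_valid_timestamp(index_timestamp):
        return str(index_timestamp)
    return None
```

With these rules, a folder named `1508259732` is kept as-is even when the index file stores the normalized variant `1508259732.0`.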
- classmethod _ensure_unique_timestamp(url: str, timestamp: str) str[source]
Ensure timestamp is globally unique. If there is a collision, add a tiny fractional suffix until unique.
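One way to picture the collision handling, using an in-memory set in place of the database lookup. The helper name `ensure_unique` and the six-digit suffix format are assumptions chosen for illustration; the source says only that a tiny fractional suffix is added until the value is unique.

```python
def ensure_unique(timestamp: str, taken: set[str]) -> str:
    """Return timestamp, or a slightly larger variant not present in taken."""
    candidate = timestamp
    base = timestamp.split(".")[0]  # integer part of the timestamp
    suffix = 0
    while candidate in taken:
        suffix += 1
        # Hypothetical suffix format: microsecond-sized increments.
        candidate = f"{base}.{suffix:06d}"
    return candidate


taken = {"1508259732", "1508259732.000001"}
unique = ensure_unique("1508259732", taken)
```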
- static _detect_fs_version_from_index(data: dict) str[source]
Detect fs_version from index.json structure.
Has fs_version field: use it
Has history dict: 0.7.0
Has archive_results list: 0.8.0
Default: 0.7.0
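The four detection rules translate almost directly into code. The function name `detect_fs_version` is a stand-in for the private method, and the type checks are an assumption about how "has history dict" and "has archive_results list" are tested.

```python
def detect_fs_version(data: dict) -> str:
    """Guess the filesystem layout version from a parsed index file."""
    if data.get("fs_version"):
        return str(data["fs_version"])  # explicit field wins
    if isinstance(data.get("history"), dict):
        return "0.7.0"
    if isinstance(data.get("archive_results"), list):
        return "0.8.0"
    return "0.7.0"  # documented default
```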
- reconcile_with_index(output_dir: pathlib.Path | None = None, update_existing_archive_results: bool = True)[source]
Merge index.json/index.jsonl with DB. DB is source of truth.
Title: longest non-URL
Tags: union
ArchiveResults: keep both (by plugin+start_ts)
Converts index.json to index.jsonl if needed, then writes back in JSONL format.
Used by: archivebox update (to sync index with DB)
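The three merge rules above can be sketched on plain dicts and lists. All helper names are hypothetical, the `"://"` test for "looks like a URL" is an assumption, and letting the DB row win on a key collision is an interpretation of "DB is source of truth".

```python
def looks_like_url(text: str) -> bool:
    return "://" in text  # assumed heuristic


def merge_title(db_title, index_title):
    """Title: the longest candidate that is not just a URL."""
    candidates = [t for t in (db_title, index_title) if t and not looks_like_url(t)]
    return max(candidates, key=len) if candidates else None


def merge_tags(db_tags, index_tags):
    """Tags: set union of both sides."""
    return sorted(set(db_tags) | set(index_tags))


def merge_results(db_results, index_results):
    """ArchiveResults: keep both sides, keyed by (plugin, start_ts)."""
    merged = {}
    # DB rows are applied last so they win when the key collides.
    for row in [*index_results, *db_results]:
        merged[(row["plugin"], row["start_ts"])] = row
    return list(merged.values())


db_rows = [{"plugin": "wget", "start_ts": "t1", "status": "succeeded"}]
index_rows = [
    {"plugin": "wget", "start_ts": "t1", "status": "failed"},
    {"plugin": "title", "start_ts": "t2", "status": "succeeded"},
]
merged_rows = merge_results(db_rows, index_rows)
```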
- reconcile_with_index_json(output_dir: pathlib.Path | None = None, update_existing_archive_results: bool = True)[source]
Deprecated: use reconcile_with_index() instead.
- _merge_archive_results_from_index(index_data: dict, update_existing: bool = True)[source]
Merge ArchiveResults one row per hook; retries update the existing row.
- _create_archive_result_if_missing(result_data: dict, existing: dict, update_existing: bool = True)[source]
Create ArchiveResult if not already in DB.
- write_index_jsonl(output_dir: pathlib.Path | None = None)[source]
Write index.jsonl in flat JSONL format.
Each line is a JSON record with a ‘type’ field:
Snapshot: snapshot metadata (crawl_id, url, tags, etc.)
ArchiveResult: extractor results (plugin, status, output, etc.)
Binary: binary info used for the extraction
Process: process execution details (cmd, exit_code, timing, etc.)
- read_index_jsonl(output_dir: pathlib.Path | None = None) dict[source]
Read index.jsonl and return parsed records grouped by type.
Returns dict with keys: ‘snapshot’, ‘archive_results’, ‘binaries’, ‘processes’
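A minimal sketch of grouping JSONL records by their `type` field into the four documented keys. The helper `group_records`, the choice to store `snapshot` as a single record, and the sample field values are assumptions.

```python
import json

# Record 'type' values from write_index_jsonl mapped to the returned keys.
TYPE_TO_KEY = {
    "ArchiveResult": "archive_results",
    "Binary": "binaries",
    "Process": "processes",
}


def group_records(lines):
    """Parse JSONL lines and group the records by their 'type' field."""
    grouped = {"snapshot": None, "archive_results": [], "binaries": [], "processes": []}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        record = json.loads(line)
        kind = record.get("type")
        if kind == "Snapshot":
            grouped["snapshot"] = record
        elif kind in TYPE_TO_KEY:
            grouped[TYPE_TO_KEY[kind]].append(record)
    return grouped


sample = [
    '{"type": "Snapshot", "url": "https://example.com"}',
    '{"type": "ArchiveResult", "plugin": "wget", "status": "succeeded"}',
    "",
    '{"type": "Process", "cmd": ["wget", "https://example.com"], "exit_code": 0}',
]
grouped = group_records(sample)
```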
- convert_index_json_to_jsonl(output_dir: pathlib.Path | None = None) bool[source]
Convert index.json to index.jsonl format.
Reads existing index.json, creates index.jsonl, and removes index.json. Returns True if conversion was performed, False if no conversion needed.
- static move_directory_to_invalid(snapshot_dir: pathlib.Path)[source]
Move invalid directory to data/invalid/YYYYMMDD/.
Used by: archivebox update (when encountering invalid directories)
- classmethod find_and_merge_duplicates() int[source]
Find and merge snapshots with same url:timestamp. Returns count of duplicate sets merged.
Used by: archivebox update (Phase 3: deduplication)
- classmethod _merge_snapshots(snapshots: collections.abc.Sequence[archivebox.core.models.Snapshot])[source]
Merge exact duplicates. Keep oldest, union files + ArchiveResults.
- icons(path: str | None = None) str[source]
Generate HTML icons showing which extractor plugins have succeeded for this snapshot
- ensure_legacy_archive_symlink() None[source]
Ensure the legacy archive/ path resolves to this snapshot.
- ensure_crawl_symlink(*, crawl_dir: pathlib.Path | None = None, snapshot_dir: pathlib.Path | None = None) None[source]
Ensure snapshot is symlinked under its crawl output directory.
- pending_archiveresults() django.db.models.QuerySet[archivebox.core.models.ArchiveResult][source]
- run() list[archivebox.core.models.ArchiveResult][source]
Execute snapshot by creating pending ArchiveResults for all enabled hooks.
Returns: list[ArchiveResult]: Newly created pending results
- cleanup()[source]
Clean up background ArchiveResult hooks and empty results.
Called by the state machine when entering the ‘sealed’ state. Deletes empty ArchiveResults after the abx-dl cleanup phase has finished.
- to_json() dict[source]
Convert Snapshot model instance to a JSON-serializable dict. Includes all fields needed to fully reconstruct/identify this snapshot.
- static from_json(record: dict[str, Any], overrides: dict[str, Any] | None = None, queue_for_extraction: bool = True)[source]
Create/update Snapshot from JSON dict.
Unified method that handles:
ID-based patching: {“id”: “…”, “title”: “new title”}
URL-based create/update: {“url”: “…”, “title”: “…”, “tags”: “…”}
Auto-creates Crawl if not provided
Optionally queues for extraction
Args:
record: Dict with ‘url’ (for create) or ‘id’ (for patch), plus other fields
overrides: Dict with ‘crawl’, ‘snapshot’ (parent), ‘created_by_id’
queue_for_extraction: If True, sets status=QUEUED and retry_at (default: True)
Returns: Snapshot instance or None
- create_pending_archiveresults() list[archivebox.core.models.ArchiveResult][source]
Create ArchiveResult records for all enabled hooks.
Uses the hooks system to discover available hooks from:
abx_plugins/plugins//on_Snapshot__.{py,sh,js}
data/custom_plugins//on_Snapshot__.{py,sh,js}
Creates one ArchiveResult per hook (not per plugin), with hook_name set. This enables step-based execution where all hooks in a step can run in parallel.
- is_finished_processing() bool[source]
Check if all ArchiveResults are finished.
Note: This is only called for observability/progress tracking. The shared runner owns execution and does not poll this.
- get_progress_stats() dict[source]
Get progress statistics for this snapshot’s archiving process.
Returns dict with:
total: Total number of archive results
succeeded: Number of succeeded results
failed: Number of failed results
running: Number of currently running results
pending: Number of pending/queued results
percent: Completion percentage (0-100)
output_size: Total output size in bytes
is_sealed: Whether the snapshot is in a final state
- retry_failed_archiveresults() int[source]
Reset failed/skipped ArchiveResults to queued for retry.
Returns count of ArchiveResults reset.
- latest_outputs(status: str | None = None) dict[str, Any][source]
Get the latest output that each plugin produced
- discover_outputs(include_filesystem_fallback: bool = True) list[dict][source]
Discover output files from ArchiveResults and filesystem.
- to_dict(extended: bool = False) dict[str, Any][source]
Convert Snapshot to a dictionary (replacement for Link._asdict())
- to_json_str(indent: int = 4) str[source]
Convert to JSON string (legacy method, use to_json() for dict)
- to_csv(cols: list[str] | None = None, separator: str = ',', ljust: int = 0) str[source]
Convert to CSV string
- write_json_details(out_dir: pathlib.Path | str | None = None) None[source]
Write JSON index file for this snapshot to its output directory
- write_html_details(out_dir: pathlib.Path | str | None = None) None[source]
Write HTML detail page for this snapshot to its output directory
- class archivebox.core.models.SnapshotMachine(obj, *args, **kwargs)[source]
Bases:
archivebox.workers.models.BaseStateMachine
State machine for managing Snapshot lifecycle.
Hook Lifecycle:
┌─────────────────────────────────────────────────────────────┐
│ QUEUED State                                                │
│ • Waiting for snapshot to be ready                          │
└─────────────────────────────────────────────────────────────┘
                            ↓ tick() when can_start()
┌─────────────────────────────────────────────────────────────┐
│ STARTED State → enter_started()                             │
│   1. snapshot.run()                                         │
│      • discover_hooks(‘Snapshot’) → finds all plugin hooks  │
│      • create_pending_archiveresults() → creates ONE        │
│        ArchiveResult per hook (NO execution yet)            │
│   2. The shared abx-dl runner executes hooks and the        │
│      projector updates ArchiveResult rows from events       │
│   3. Advance through steps 0-9 as foreground hooks complete │
└─────────────────────────────────────────────────────────────┘
                            ↓ tick() when is_finished()
┌─────────────────────────────────────────────────────────────┐
│ SEALED State → enter_sealed()                               │
│ • cleanup() → kills any background hooks still running      │
│ • Set retry_at=None (no more processing)                    │
└─────────────────────────────────────────────────────────────┘
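The QUEUED → STARTED → SEALED flow in the lifecycle diagram can be illustrated with a toy state machine. This is a sketch only: `ToySnapshotMachine`, its attributes, and the status strings are hypothetical and do not use BaseStateMachine or the real runner.

```python
FINAL_STATUSES = {"succeeded", "failed", "skipped"}  # assumed final statuses


class ToySnapshotMachine:
    """Toy version of the lifecycle: tick() advances when a guard passes."""

    def __init__(self, url: str, hook_names: list[str]):
        self.url = url
        self.hook_names = hook_names
        self.state = "queued"
        self.results: dict[str, str] = {}  # hook name -> result status
        self.retry_at = 0  # placeholder meaning "due now"

    def can_start(self) -> bool:
        return bool(self.url)

    def is_finished(self) -> bool:
        return bool(self.results) and all(
            status in FINAL_STATUSES for status in self.results.values()
        )

    def enter_started(self) -> None:
        # One pending result per hook; nothing is executed here.
        self.results = {name: "queued" for name in self.hook_names}

    def enter_sealed(self) -> None:
        self.retry_at = None  # no more processing

    def tick(self) -> str:
        if self.state == "queued" and self.can_start():
            self.state = "started"
            self.enter_started()
        elif self.state == "started" and self.is_finished():
            self.state = "sealed"
            self.enter_sealed()
        return self.state


machine = ToySnapshotMachine("https://example.com", ["title", "wget"])
states = [machine.tick(), machine.tick()]  # second tick waits for results
machine.results = {"title": "succeeded", "wget": "failed"}  # set by a runner
states.append(machine.tick())
```

In the toy version, as in the diagram, entering STARTED only creates pending rows; something external updates them, and the next tick() seals the snapshot once every row is final.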
https://github.com/ArchiveBox/ArchiveBox/wiki/ArchiveBox-Architecture-Diagrams
Initialization
- snapshot: archivebox.core.models.Snapshot[source]
None
- has_finished_archive_results() bool[source]
A queued snapshot with only final projected rows was interrupted after hook completion.
- class archivebox.core.models.ArchiveResult[source]
Bases:
archivebox.base_models.models.ModelWithDeleteAfter, archivebox.base_models.models.ModelWithOutputDir, archivebox.base_models.models.ModelWithNotes
- classmethod get_plugin_choices()[source]
Get plugin choices from discovered hooks (for forms/admin).
- classmethod snapshot_count_subquery(*, status: str | None = None, outer_ref: str = 'pk') django.db.models.QuerySet[source]
Return a scalar subquery counting ArchiveResults for one outer Snapshot.
Use this instead of filtered join aggregates for per-row Snapshot counts: the scalar form lets SQLite probe the covering (snapshot_id, status) or (status, snapshot_id) indexes once per visible Snapshot row, instead of joining and grouping the whole candidate Snapshot queryset.
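In raw SQL terms, a correlated scalar count looks roughly like the query below, shown with the standard-library sqlite3 module. The table, column, and index names are illustrative assumptions, not the schema ArchiveBox generates.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE snapshot (id INTEGER PRIMARY KEY, url TEXT);
    CREATE TABLE archiveresult (
        id INTEGER PRIMARY KEY,
        snapshot_id INTEGER REFERENCES snapshot(id),
        status TEXT
    );
    -- Covering index the per-row probe can use.
    CREATE INDEX ar_snapshot_status ON archiveresult (snapshot_id, status);
    """
)
conn.executemany(
    "INSERT INTO snapshot (id, url) VALUES (?, ?)",
    [(1, "https://example.com"), (2, "https://example.org")],
)
conn.executemany(
    "INSERT INTO archiveresult (snapshot_id, status) VALUES (?, ?)",
    [(1, "succeeded"), (1, "succeeded"), (1, "failed"), (2, "failed")],
)

# One scalar COUNT per outer Snapshot row, with no JOIN and no GROUP BY.
counts = conn.execute(
    """
    SELECT s.id,
           (SELECT COUNT(*)
              FROM archiveresult ar
             WHERE ar.snapshot_id = s.id AND ar.status = ?) AS succeeded_count
      FROM snapshot s
     ORDER BY s.id
    """,
    ("succeeded",),
).fetchall()
```

Snapshots with no matching rows still appear with a count of 0, which a filtered inner join would drop.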
- classmethod snapshot_half_count_subquery(*, outer_ref: str = 'snapshot_id') django.db.models.QuerySet[source]
- classmethod status_counts(queryset: django.db.models.QuerySet | None = None, statuses: collections.abc.Iterable[str] | None = None) dict[str, int][source]
Count requested statuses with separate indexed COUNT probes.
- classmethod snapshot_ids_with_majority_status(status: str | collections.abc.Iterable[str]) django.db.models.QuerySet[source]
Return Snapshot IDs where more than half of ArchiveResults have status.
Start from ArchiveResult.status for every majority-status filter. The (status, snapshot_id) index keeps the plan predictable even when a user’s collection has an unusual status distribution.
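The "more than half" condition can be shown in plain Python over (snapshot_id, status) pairs. The helper name and input shape are hypothetical; the real method returns a QuerySet and does the counting in the database.

```python
from collections import Counter


def ids_with_majority_status(rows, status: str) -> list[str]:
    """Return snapshot IDs where strictly more than half the rows match."""
    totals: Counter = Counter()
    matching: Counter = Counter()
    for snapshot_id, row_status in rows:
        totals[snapshot_id] += 1
        if row_status == status:
            matching[snapshot_id] += 1
    # n * 2 > total avoids float division and excludes an exact 50% tie.
    return sorted(sid for sid, n in matching.items() if n * 2 > totals[sid])


rows = [
    ("a", "failed"), ("a", "failed"), ("a", "succeeded"),  # 2 of 3 failed
    ("b", "failed"), ("b", "succeeded"),                   # exactly half
    ("c", "succeeded"),                                    # none failed
]
majority_failed = ids_with_majority_status(rows, "failed")
```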
- classmethod cached_snapshot_ids_with_majority_status(status: str | collections.abc.Iterable[str], *, timeout: int = 60) tuple[str, ...][source]
- snapshot: archivebox.core.models.Snapshot[source]
‘ForeignKey(…)’
- class Meta[source]
Bases:
archivebox.base_models.models.ModelWithDeleteAfter.Meta, archivebox.base_models.models.ModelWithOutputDir.Meta, archivebox.base_models.models.ModelWithNotes.Meta
- property created_by[source]
Convenience property to access the user who created this archive result via its snapshot’s crawl.
- to_json(*, snapshot_output_dir: pathlib.Path | None = None) dict[source]
Convert ArchiveResult model instance to a JSON-serializable dict.
- static from_json(record: dict[str, Any], overrides: dict[str, Any] | None = None)[source]
Create/update ArchiveResult from JSON dict.
Args:
record: JSON dict with ‘snapshot_id’, ‘plugin’, etc.
overrides: Optional dict of field overrides
Returns: ArchiveResult instance or None
- update_output_metadata_from_filesystem(snapshot_dir: pathlib.Path | None = None, save: bool = True) bool[source]
- static _looks_like_output_path(raw_output: str | None, plugin_name: str | None = None) bool[source]
- static _fallback_output_file_path(output_file_paths: collections.abc.Sequence[str], plugin_name: str | None = None, output_file_map: dict[str, dict[str, Any]] | None = None) str | None[source]
- static _find_best_output_file(dir_path: pathlib.Path, plugin_name: str | None = None) pathlib.Path | None[source]
- embed_path() str | None[source]
Get the relative path to the embeddable output file for this result.
This is intentionally DB-backed only so snapshot/admin rendering stays fast and predictable without filesystem probes.
- property pwd: str[source]
Working directory, derived from the snapshot/plugin path if the Process row is gone.
- update_from_output()[source]
Update this ArchiveResult from filesystem logs and output files.
Used for Snapshot cleanup / orphan recovery when a hook’s output exists on disk but the projector did not finalize the row in the database.
Updates:
status, output_str, output_json from ArchiveResult JSONL record
output_files, output_size, output_mimetypes by walking filesystem
end_ts, cmd, cmd_version, binary FK
Processes side-effect records (Snapshot, Tag, etc.) via process_hook_records()
- _set_binary_from_cmd(cmd: list) None[source]
Find Binary for command and set binary FK.
Tries matching by absolute path first, then by binary name. Only matches binaries on the current machine.
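The two-stage lookup can be sketched over an in-memory list. `BinaryRecord`, `find_binary`, and the `machine` field are hypothetical stand-ins for the real Binary model and its machine relation.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from typing import Optional


@dataclass
class BinaryRecord:
    """Hypothetical stand-in for a Binary row."""
    name: str
    abspath: str
    machine: str


def find_binary(
    cmd: list, binaries: list[BinaryRecord], current_machine: str
) -> Optional[BinaryRecord]:
    if not cmd:
        return None
    # Only binaries installed on the current machine are candidates.
    local = [b for b in binaries if b.machine == current_machine]
    executable = str(cmd[0])
    # 1. Prefer an exact absolute-path match.
    for binary in local:
        if binary.abspath == executable:
            return binary
    # 2. Fall back to matching on the binary's name.
    name = PurePosixPath(executable).name
    for binary in local:
        if binary.name == name:
            return binary
    return None


known = [
    BinaryRecord("wget", "/usr/bin/wget", "host-a"),
    BinaryRecord("wget", "/opt/wget/bin/wget", "host-b"),
    BinaryRecord("curl", "/usr/bin/curl", "host-a"),
]
by_path = find_binary(["/usr/bin/wget", "https://example.com"], known, "host-a")
by_name = find_binary(["curl", "-L"], known, "host-a")
other_machine = find_binary(["/opt/wget/bin/wget"], known, "host-a")
```

In the last call the absolute path belongs to another machine, so the lookup falls back to the name and returns the local wget instead.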