archivebox.cli.archivebox_update

Module Contents

Functions

  • _get_snapshot_crawl

  • _get_search_indexing_plugins

  • _build_filtered_snapshots_queryset

  • reindex_snapshots

  • update - Update snapshots: migrate old dirs, reconcile DB, and re-queue for archiving.

  • drain_old_archive_dirs - Drain old archive/ directories (0.8.x → 0.9.x migration).

  • process_all_db_snapshots - O(n) scan over entire DB from most recent to least recent.

  • process_filtered_snapshots - Process snapshots matching filters (DB query only).

  • print_stats - Print statistics for filtered mode.

  • print_combined_stats - Print statistics for full mode.

  • print_index_stats

  • main

API

archivebox.cli.archivebox_update._get_snapshot_crawl(snapshot: archivebox.core.models.Snapshot) → archivebox.crawls.models.Crawl | None

archivebox.cli.archivebox_update._get_search_indexing_plugins() → list[str]

archivebox.cli.archivebox_update._build_filtered_snapshots_queryset(**kwargs)

archivebox.cli.archivebox_update.reindex_snapshots(snapshots: django.db.models.QuerySet[archivebox.core.models.Snapshot, archivebox.core.models.Snapshot], *, search_plugins: list[str], batch_size: int, collect_ids: bool = False, wait_for_turn=None) → dict[str, Any]

archivebox.cli.archivebox_update.update(filter_patterns: collections.abc.Iterable[str] = (), filter_type: str = 'exact', status: str | None = None, url__icontains: str | None = None, url__istartswith: str | None = None, tag: str | None = None, crawl_id: str | None = None, limit: int | None = None, sort: str | None = None, search: str | None = None, before: float | None = None, after: float | None = None, resume: str | None = None, batch_size: int = 500, continuous: bool = False, index_only: bool = False, migrate_only: bool = False, stop_daemon_stack: bool = True) → None

Update snapshots: migrate old dirs, reconcile DB, and re-queue for archiving.

Operation without filters (full update):

  • Phase 1: Drain old archive/ dirs by moving them to the new filesystem location (0.8.x → 0.9.x)

  • Phase 2: O(n) scan over the entire DB, from most recent to least recent

No orphan scan is needed: after phase 1, the DB and the filesystem are trusted to map 1:1.

With filters: only phase 2 runs (DB query), with no filesystem operations. Without filters: all phases run (full update).
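The choice between filtered and full mode can be pictured with a small standalone sketch. It does not import ArchiveBox: select_phases and the phase labels are hypothetical names, and treating any argument other than None or an empty tuple as "a filter" is an assumption, not the documented rule.

```python
def select_phases(**filters: object) -> list:
    """Return hypothetical phase labels for the given filter arguments.

    Assumption: a value other than None or () means a filter was supplied.
    """
    has_filters = any(value not in (None, ()) for value in filters.values())
    if has_filters:
        # Filtered mode: DB query only, no filesystem operations.
        return ["scan_filtered"]
    # Full mode: drain old archive/ dirs first, then scan the whole DB.
    return ["drain_old_dirs", "scan_all"]
```

Called with no arguments, or with only default values such as filter_patterns=() and tag=None, the sketch selects both phases; passing something like tag="news" selects the filtered scan only.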

archivebox.cli.archivebox_update.drain_old_archive_dirs(resume_from: str | None = None, batch_size: int = 500) → dict[str, int]

Drain old archive/ directories (0.8.x → 0.9.x migration).

Only processes real directories (skips symlinks - those are already migrated). For each old dir found in archive/:

  1. Load or create DB snapshot

  2. Trigger fs migration on save() to move to data/archive/users/{user}/…

  3. Leave symlink in archive/ pointing to new location

Once draining completes, archive/ should contain only symlinks, so the DB and the filesystem can be trusted to map 1:1.
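The move-and-symlink pattern in the steps above can be sketched without ArchiveBox or a database. Everything here is illustrative: drain_dirs, old_root, and new_root are hypothetical names, the target layout (new_root / entry.name) stands in for the real per-user path, and the DB load and save() steps are omitted. The sketch assumes a platform where creating symlinks is permitted.

```python
import shutil
import tempfile
from pathlib import Path


def drain_dirs(old_root: Path, new_root: Path) -> dict:
    """Move each real directory in old_root under new_root, leaving a symlink behind."""
    stats = {"migrated": 0, "skipped": 0}
    new_root.mkdir(parents=True, exist_ok=True)
    # Materialize the listing first so moving entries does not disturb iteration.
    for entry in sorted(old_root.iterdir()):
        # Symlinks count as already migrated; plain files are not snapshot dirs.
        if entry.is_symlink() or not entry.is_dir():
            stats["skipped"] += 1
            continue
        target = new_root / entry.name  # hypothetical layout, not the real one
        shutil.move(str(entry), str(target))
        # Leave a symlink at the old path pointing to the new location.
        entry.symlink_to(target, target_is_directory=True)
        stats["migrated"] += 1
    return stats


if __name__ == "__main__":
    root = Path(tempfile.mkdtemp())
    (root / "archive" / "1700000000.0").mkdir(parents=True)
    print(drain_dirs(root / "archive", root / "migrated"))
    # A second run finds only symlinks, so nothing is moved again.
    print(drain_dirs(root / "archive", root / "migrated"))
```

Skipping symlinks is what makes the drain safe to re-run after an interruption: directories that were already moved are not touched a second time.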

archivebox.cli.archivebox_update.process_all_db_snapshots(batch_size: int = 500, resume: str | None = None, wait_for_turn=None) → dict[str, int]

O(n) scan over entire DB from most recent to least recent.

For each snapshot:

  1. Reconcile index.json with DB (merge titles, tags, archive results)

  2. Mark migrated snapshots sealed unless explicitly re-queued elsewhere

No orphan detection needed - we trust 1:1 mapping between DB and filesystem after Phase 1 has drained all old archive/ directories.
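A rough standalone picture of a newest-first batched scan with a reconcile step follows. None of it is ArchiveBox code: scan_batches and reconcile are hypothetical helpers, snapshots are plain dicts, resume is assumed to be a timestamp to continue from, and the merge rule (keep the DB title unless it is empty, take the union of tags) is an assumption made only for illustration.

```python
from collections.abc import Iterator
from typing import Optional


def scan_batches(timestamps: list, batch_size: int = 500,
                 resume: Optional[str] = None) -> Iterator:
    """Yield batches of timestamp strings, most recent first.

    Assumption: `resume` is a timestamp; entries newer than it are skipped.
    """
    ordered = sorted(timestamps, key=float, reverse=True)
    if resume is not None:
        ordered = [ts for ts in ordered if float(ts) <= float(resume)]
    for start in range(0, len(ordered), batch_size):
        yield ordered[start:start + batch_size]


def reconcile(db_row: dict, index_json: dict) -> dict:
    """Merge an index.json record into a DB row (illustrative rule only)."""
    merged = dict(db_row)
    # Keep the DB title unless it is empty, then fall back to index.json.
    merged["title"] = db_row.get("title") or index_json.get("title")
    # Tags from both sides are combined and de-duplicated.
    merged["tags"] = sorted(set(db_row.get("tags", [])) | set(index_json.get("tags", [])))
    return merged
```

Each snapshot is visited exactly once, so the scan stays linear in the number of rows, and a resume value lets an interrupted run continue from where it stopped instead of starting over.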

archivebox.cli.archivebox_update.process_filtered_snapshots(filter_patterns: collections.abc.Iterable[str], filter_type: str, status: str | None, url__icontains: str | None, url__istartswith: str | None, tag: str | None, crawl_id: str | None, limit: int | None, sort: str | None, search: str | None, before: float | None, after: float | None, resume: str | None, batch_size: int, queue_for_archiving: bool = True, wait_for_turn=None) → dict[str, Any]

Process snapshots matching filters (DB query only).
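The url__icontains and url__istartswith parameters share their names with Django field lookups (case-insensitive containment and case-insensitive prefix). Assuming they carry those meanings and that filters combine with AND, neither of which this page confirms, their effect can be mimicked on plain dicts. filter_snapshots is a hypothetical helper, not part of ArchiveBox.

```python
from typing import Optional


def filter_snapshots(rows: list, *, url__icontains: Optional[str] = None,
                     url__istartswith: Optional[str] = None,
                     limit: Optional[int] = None) -> list:
    """Filter rows (dicts with a "url" key) the way the same-named Django lookups would."""
    matches = []
    for row in rows:
        url = row["url"].lower()
        # icontains: case-insensitive substring match.
        if url__icontains is not None and url__icontains.lower() not in url:
            continue
        # istartswith: case-insensitive prefix match.
        if url__istartswith is not None and not url.startswith(url__istartswith.lower()):
            continue
        matches.append(row)
    # limit caps the number of matches returned, if given.
    return matches if limit is None else matches[:limit]
```

For example, filtering rows with url__icontains="DOCS" keeps any row whose URL contains "docs" in any letter case, and limit=1 would return only the first such row.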

archivebox.cli.archivebox_update.print_stats(stats: dict)

Print statistics for filtered mode.

archivebox.cli.archivebox_update.print_combined_stats(stats_combined: dict)

Print statistics for full mode.

archivebox.cli.archivebox_update.print_index_stats(stats: dict[str, Any]) → None

archivebox.cli.archivebox_update.main(**kwargs)