archivebox.services.runner

Module Contents

Classes

CrawlRunner

Functions

_bus_name

_runner_short_id

_runner_label

_runner_console_line

_count_selected_hooks

_discover_archivebox_plugins

_runner_task_context

_is_external_task_cancelled

_emit_machine_config

_run_event_now

ensure_background_runner

run_crawl

_run_binary

run_binary

_snapshot_hook_names_by_plugin

queued_plugins_for_snapshot

run_snapshot_maintenance

run_due_crawl

run_due_snapshot

run_due_binary

_run_install

run_install

_first_due_id

_run_due_crawl_status

_run_due_snapshot_query

_run_due_snapshot_id

_run_due_queued_plugin_result

_run_due_binary

_fast_forward_same_path_snapshot_fs_versions

run_pending_crawls

Data

QUEUED_PLUGIN_RESULT_BATCH_SIZE

API

archivebox.services.runner.QUEUED_PLUGIN_RESULT_BATCH_SIZE[source]

100

archivebox.services.runner._bus_name(prefix: str, identifier: str) str[source]
archivebox.services.runner._runner_short_id(identifier) str[source]
archivebox.services.runner._runner_label(value: str, *, reserve: int) str[source]
archivebox.services.runner._runner_console_line(*, crawl=None, crawl_id=None, snapshot=None, status: str = 'STARTED') None[source]
archivebox.services.runner._count_selected_hooks(plugins: dict[str, abx_dl.models.Plugin], selected_plugins: list[str] | None) int[source]
archivebox.services.runner._discover_archivebox_plugins() dict[str, abx_dl.models.Plugin][source]
archivebox.services.runner._runner_task_context() contextvars.Context[source]
archivebox.services.runner._is_external_task_cancelled(error: asyncio.CancelledError) bool[source]
async archivebox.services.runner._emit_machine_config(bus, *, config: dict[str, Any], derived_config: dict[str, Any], parent_event=None) None[source]
async archivebox.services.runner._run_event_now(event, timeout: float | None = None)[source]
archivebox.services.runner.ensure_background_runner(*, allow_under_pytest: bool = False) bool[source]
class archivebox.services.runner.CrawlRunner(crawl, *, snapshot_ids: list[str] | None = None, selected_plugins: list[str] | None = None, process_discovered_snapshots_inline: bool = True, show_progress: bool = True, interactive_interrupts: bool = False, config_overrides: dict[str, Any] | None = None)[source]

Initialization

_request_abort_from_signal(_sig: signal.Signals) None[source]
async crawl_is_cancelled() bool[source]
async crawl_is_paused() bool[source]
async watch_for_cancelled_crawl(parent_event: abxbus.BaseEvent, *, poll_interval: float = 1.0) None[source]
runtime_plugins() dict[str, abx_dl.models.Plugin][source]
property allow_maintenance_on_inactive_crawl: bool[source]

Run the requested hooks on a snapshot whose parent crawl is paused or sealed.

Maintenance entry paths — direct snapshot_ids + selected_plugins invocations for search backend backfill, fs migration, plugin-targeted updates — are legitimately allowed to operate on finished/paused crawls. Without this gate, crawl_is_cancelled would treat a SEALED parent as a cancellation signal and short-circuit every guard before any hook ran, leaving the queued ArchiveResult rows stuck and the orchestrator looping on them.

async run() None[source]
async enqueue_snapshot(snapshot_id: str, crawl_start_event: abx_dl.events.CrawlStartEvent | None = None) None[source]
async stop_snapshot_tasks() None[source]
async wait_for_snapshot_tasks() None[source]
async heartbeat_active_leases() None[source]
async drain_snapshot_tasks() None[source]
async enqueue_pending_snapshots_from_projection() None[source]
load_run_state() list[str][source]
finalize_run_state() None[source]
_create_live_ui() abx_dl.cli.LiveBusUI | None[source]
load_snapshot_payload(snapshot_id: str) dict[str, Any][source]
async enqueue_discovered_snapshots_from_outputs(snapshot_payload: dict[str, Any]) None[source]
async run_crawl(root_snapshot_id: str, snapshot_ids: list[str]) None[source]
async run_snapshot(snapshot_id: str, crawl_start_event: abx_dl.events.CrawlStartEvent | None = None) None[source]
seal_snapshot_due_to_limit(snapshot_id: str) None[source]
archivebox.services.runner.run_crawl(crawl_id: str, *, snapshot_ids: list[str] | None = None, selected_plugins: list[str] | None = None, process_discovered_snapshots_inline: bool = True, show_progress: bool = True, interactive_interrupts: bool = False, config_overrides: dict[str, Any] | None = None) None[source]
async archivebox.services.runner._run_binary(binary_id: str) None[source]
archivebox.services.runner.run_binary(binary_id: str) None[source]
archivebox.services.runner._snapshot_hook_names_by_plugin() dict[str, frozenset[str]][source]
archivebox.services.runner.queued_plugins_for_snapshot(snapshot_id: str) list[str] | None[source]
archivebox.services.runner.run_snapshot_maintenance(snapshot_id: str, *, output_dir: pathlib.Path | None = None) bool[source]
archivebox.services.runner.run_due_crawl(crawl, *, lock_seconds: int, interactive_interrupts: bool = False) bool[source]
archivebox.services.runner.run_due_snapshot(snapshot, *, lock_seconds: int, interactive_interrupts: bool = False, runtime_config=None) bool[source]
archivebox.services.runner.run_due_binary(binary, *, lock_seconds: int) bool[source]
async archivebox.services.runner._run_install(plugin_names: list[str] | None = None) None[source]
archivebox.services.runner.run_install(*, plugin_names: list[str] | None = None) None[source]
archivebox.services.runner._first_due_id(queryset)[source]
archivebox.services.runner._run_due_crawl_status(status: str, *, crawl_id: str | None, lock_seconds: int, interactive_interrupts: bool) bool[source]
archivebox.services.runner._run_due_snapshot_query(queryset, *, lock_seconds: int, interactive_interrupts: bool, runtime_config) bool[source]
archivebox.services.runner._run_due_snapshot_id(snapshot_id, *, lock_seconds: int, interactive_interrupts: bool, runtime_config) bool[source]
archivebox.services.runner._run_due_queued_plugin_result(plugin_names: frozenset[str], *, crawl_id: str | None, lock_seconds: int, interactive_interrupts: bool, runtime_config) bool[source]
archivebox.services.runner._run_due_binary() bool[source]
archivebox.services.runner._fast_forward_same_path_snapshot_fs_versions(batch_size: int = 10000) bool[source]
archivebox.services.runner.run_pending_crawls(*, daemon: bool = False, crawl_id: str | None = None, maintenance_only: bool = False, interactive_interrupts: bool = False) int[source]