archivebox.services.archive_result_service

Module Contents

Classes

ModelDumpable

ArchiveResultService

Functions

_perf_trace

_perf_span

_collect_output_metadata

_coerce_output_file_size

_normalize_output_files

_has_structured_output_metadata

_summarize_output_files

_resolve_output_metadata

_normalize_status

_normalize_snapshot_title

_extract_snapshot_title

_should_update_snapshot_title

_has_content_files

_iter_archiveresult_records

_save_archiveresult_event_to_db

Project one ArchiveResultEvent with a single thread-sensitive ORM hop.

API

archivebox.services.archive_result_service._perf_trace(label)
archivebox.services.archive_result_service._perf_span(label: str)
class archivebox.services.archive_result_service.ModelDumpable

Bases: typing.Protocol

model_dump() → dict[str, Any]
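
A minimal sketch of how a protocol with this shape can be used for structural typing. Only the `model_dump()` signature comes from the listing above. `FakeEvent` and `to_record` are hypothetical names for illustration, and `runtime_checkable` is added here only so the `isinstance` check works; whether the real class uses it is not stated.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable  # assumption: added for the isinstance() demo below
class ModelDumpable(Protocol):
    """Anything that can serialize itself to a plain dict."""

    def model_dump(self) -> dict[str, Any]: ...


class FakeEvent:
    """Hypothetical stand-in; it does not inherit from ModelDumpable."""

    def __init__(self, plugin: str, status: str) -> None:
        self.plugin = plugin
        self.status = status

    def model_dump(self) -> dict[str, Any]:
        return {"plugin": self.plugin, "status": self.status}


def to_record(obj: ModelDumpable) -> dict[str, Any]:
    """Accept any object that structurally satisfies the protocol."""
    return obj.model_dump()
```

Because the match is structural, any object exposing a compatible `model_dump()` method (Pydantic models are one common case) would satisfy the protocol without subclassing it.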
archivebox.services.archive_result_service._collect_output_metadata(plugin_dir: pathlib.Path) → tuple[dict[str, dict], int, str]
archivebox.services.archive_result_service._coerce_output_file_size(value: Any) → int
archivebox.services.archive_result_service._normalize_output_files(raw_output_files: Any) → dict[str, dict]
archivebox.services.archive_result_service._has_structured_output_metadata(output_files: dict[str, dict]) → bool
archivebox.services.archive_result_service._summarize_output_files(output_files: dict[str, dict]) → tuple[int, str]
archivebox.services.archive_result_service._resolve_output_metadata(raw_output_files: Any, plugin_dir: pathlib.Path) → tuple[dict[str, dict], int, str]
archivebox.services.archive_result_service._normalize_status(status: str) → str
archivebox.services.archive_result_service._normalize_snapshot_title(candidate: str, *, snapshot_url: str) → str
archivebox.services.archive_result_service._extract_snapshot_title(snapshot_output_dir: str, plugin: str, output_str: str, *, snapshot_url: str) → str
archivebox.services.archive_result_service._should_update_snapshot_title(current_title: str, next_title: str, *, snapshot_url: str) → bool
archivebox.services.archive_result_service._has_content_files(output_files: Any) → bool
archivebox.services.archive_result_service._iter_archiveresult_records(stdout: str) → list[dict]
archivebox.services.archive_result_service._save_archiveresult_event_to_db(event: abx_dl.events.ArchiveResultEvent, process_started: abx_dl.events.ProcessStartedEvent | None) → None

Project one ArchiveResultEvent with a single thread-sensitive ORM hop.

Django’s async ORM still delegates each query to synchronous Django code. The hot search/index maintenance path was paying that handoff separately for the Snapshot lookup, the Process lookup, the ArchiveResult lookup, the update, and the title checks. This function keeps the public ArchiveResultEvent path intact but runs the DB projection as one short synchronous block, so SQLite sees the same indexed reads and writes without per-query asyncio/threadpool churn.
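
The difference between per-query handoffs and a single hop can be illustrated with a standard-library sketch. This is not the ArchiveBox implementation: it uses `asyncio.to_thread` and a bare `sqlite3` table in place of Django's ORM, and every name in it (`run_in_thread`, `project_sync`, the `result` table, `hop_log`) is hypothetical. The docstring's "thread-sensitive" wording suggests the real code relies on something like asgiref's `sync_to_async(thread_sensitive=True)`, but that is an assumption.

```python
import asyncio
import sqlite3

# Hypothetical stand-in for the real models: one in-memory SQLite table.
# check_same_thread=False lets worker threads reuse this single connection;
# the calls below are awaited one at a time, so access is never concurrent.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute(
    "CREATE TABLE result (id INTEGER PRIMARY KEY, snapshot_id TEXT, plugin TEXT, status TEXT)"
)
conn.execute(
    "INSERT INTO result (snapshot_id, plugin, status) VALUES ('s1', 'title', 'started')"
)
conn.commit()

hop_log = []  # one entry per async -> worker-thread handoff


async def run_in_thread(fn, *args):
    """Record the handoff, then run the sync function in a worker thread."""
    hop_log.append(fn.__name__)
    return await asyncio.to_thread(fn, *args)


def lookup(snapshot_id, plugin):
    row = conn.execute(
        "SELECT id FROM result WHERE snapshot_id = ? AND plugin = ?",
        (snapshot_id, plugin),
    ).fetchone()
    return row[0] if row else None


def update(result_id, status):
    conn.execute("UPDATE result SET status = ? WHERE id = ?", (status, result_id))
    conn.commit()


def project_sync(snapshot_id, plugin, status):
    """Lookup and update together, as one short synchronous block."""
    result_id = lookup(snapshot_id, plugin)
    if result_id is not None:
        update(result_id, status)
    return result_id


async def save_per_query(snapshot_id, plugin, status):
    """Each query pays its own handoff (two hops here)."""
    result_id = await run_in_thread(lookup, snapshot_id, plugin)
    if result_id is not None:
        await run_in_thread(update, result_id, status)
    return result_id


async def save_single_hop(snapshot_id, plugin, status):
    """The whole projection pays one handoff."""
    return await run_in_thread(project_sync, snapshot_id, plugin, status)
```

Both coroutines issue the same SQL statements; only the number of trips between the event loop and a worker thread differs.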

class archivebox.services.archive_result_service.ArchiveResultService(bus)

Bases: abx_dl.services.base.BaseService

LISTENS_TO

None

EMITS

[]

async on_ArchiveResultEvent__save_to_db(event: abx_dl.events.ArchiveResultEvent) → None
async on_ProcessCompletedEvent__save_to_db(event: abx_dl.events.ProcessCompletedEvent) → None