archivebox.services.binary_service

Module Contents

Classes

ArchiveBoxDBBinaryCacheBackend

ArchiveBox machine.Binary projection backend for abxpkg BinaryCacheService.

ArchiveBoxBinaryService

Preserve ArchiveBox’s legacy Binary Process rows around abxpkg requests.

Functions

_provider_names

_binproviders_to_str

_providers_for_names

_provider_for_name

_persisted_overrides_for_request

Data

_LIB_DIR_MANAGED_PROVIDERS

API

archivebox.services.binary_service._LIB_DIR_MANAGED_PROVIDERS

None

class archivebox.services.binary_service.ArchiveBoxDBBinaryCacheBackend

ArchiveBox machine.Binary projection backend for abxpkg BinaryCacheService.

async get(request: abxpkg.binary_service.BinaryRequestEvent) → abxpkg.Binary | None
async set(request: abxpkg.binary_service.BinaryRequestEvent | None, binary: abxpkg.Binary) → None
async invalidate(request: abxpkg.binary_service.BinaryRequestEvent, binary: abxpkg.Binary, reason: str) → None
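
The three coroutine methods above form a get/set/invalidate cache interface. The following is a minimal in-memory sketch of that shape, not the ArchiveBox implementation (which, per its description, projects onto machine.Binary rows). FakeRequest, FakeBinary, and InMemoryBinaryCacheBackend are hypothetical stand-ins invented for illustration, and keying the cache on the binary name is an assumption.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeRequest:
    """Hypothetical stand-in for abxpkg.binary_service.BinaryRequestEvent."""
    name: str


@dataclass
class FakeBinary:
    """Hypothetical stand-in for abxpkg.Binary."""
    name: str
    abspath: str


class InMemoryBinaryCacheBackend:
    """Toy backend exposing the same three coroutine methods as the documented class."""

    def __init__(self) -> None:
        self._cache: dict[str, FakeBinary] = {}
        self.invalidations: list[tuple[str, str]] = []

    async def get(self, request: FakeRequest) -> FakeBinary | None:
        # Return the cached binary for this request, or None on a miss.
        return self._cache.get(request.name)

    async def set(self, request: FakeRequest | None, binary: FakeBinary) -> None:
        # The documented signature allows request to be None,
        # so this sketch keys on the binary's own name (an assumption).
        self._cache[binary.name] = binary

    async def invalidate(self, request: FakeRequest, binary: FakeBinary, reason: str) -> None:
        # Drop the entry and record why it was dropped.
        self._cache.pop(binary.name, None)
        self.invalidations.append((binary.name, reason))


async def demo() -> None:
    backend = InMemoryBinaryCacheBackend()
    request = FakeRequest(name="wget")
    binary = FakeBinary(name="wget", abspath="/usr/bin/wget")
    await backend.set(request, binary)
    print(await backend.get(request))
    await backend.invalidate(request, binary, reason="file missing on disk")
    print(await backend.get(request))


if __name__ == "__main__":
    asyncio.run(demo())
```

A database-backed variant would replace the dictionary with queries, but the calling code would await the same three methods.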
class archivebox.services.binary_service.ArchiveBoxBinaryService(bus: abxbus.EventBus)

Bases: abx_dl.services.base.BaseService

Preserve ArchiveBox’s legacy Binary Process rows around abxpkg requests.

Initialization

LISTENS_TO

None

EMITS: list[type[abxbus.BaseEvent]]

[]

async on_BinaryRequestEvent__project_process(request: abxpkg.binary_service.BinaryRequestEvent) → None
async on_BinaryEvent__finalize_process(event: abxpkg.binary_service.BinaryEvent) → None
async _get_or_create_binary(machine, binary_name: str, request: abxpkg.binary_service.BinaryRequestEvent)
_process_cmd(request: abxpkg.binary_service.BinaryRequestEvent) → list[str]
_binary_event_json(event: abxpkg.binary_service.BinaryEvent, binary) → dict[str, Any]
async _finalize_missing_process(request: abxpkg.binary_service.BinaryRequestEvent) → None
async _finalize_request_when_done(request: abxpkg.binary_service.BinaryRequestEvent) → None
_schedule_missing_finalize(request: abxpkg.binary_service.BinaryRequestEvent) → None
async on_BinaryRequestEvent__schedule_missing_finalize(request: abxpkg.binary_service.BinaryRequestEvent) → None
_process_output_dir(binary, request: abxpkg.binary_service.BinaryRequestEvent) → pathlib.Path
_write_binary_index(binary, process, output_dir: pathlib.Path) → None
archivebox.services.binary_service._provider_names(binproviders: str | list[str] | None) → list[str]
archivebox.services.binary_service._binproviders_to_str(binproviders: str | list[str] | None) → str
archivebox.services.binary_service._providers_for_names(names: list[str]) → list[abxpkg.BinProvider]
archivebox.services.binary_service._provider_for_name(provider_name: str, binary_name: str, overrides: dict[str, Any] | None) → abxpkg.BinProvider | None
archivebox.services.binary_service._persisted_overrides_for_request(request: abxpkg.binary_service.BinaryRequestEvent | None) → dict[str, Any]
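
The signatures of _provider_names and _binproviders_to_str suggest a pair of helpers that normalize a provider list between its string and list forms. The sketch below shows one plausible way to do that; it is not the module's actual code. The function names provider_names_sketch and binproviders_to_str_sketch are hypothetical, and the comma-separated string format, whitespace stripping, and de-duplication are all assumptions.

```python
from __future__ import annotations


def provider_names_sketch(binproviders: str | list[str] | None) -> list[str]:
    """Normalize a provider spec into an ordered list of unique names.

    Assumption: a string spec is comma-separated, e.g. "apt,brew".
    """
    if binproviders is None:
        return []
    parts = binproviders.split(",") if isinstance(binproviders, str) else binproviders
    names: list[str] = []
    seen: set[str] = set()
    for part in parts:
        name = part.strip()
        # Skip empty entries and duplicates while preserving first-seen order.
        if name and name not in seen:
            seen.add(name)
            names.append(name)
    return names


def binproviders_to_str_sketch(binproviders: str | list[str] | None) -> str:
    """Serialize a provider spec back to the assumed comma-separated form."""
    return ",".join(provider_names_sketch(binproviders))


if __name__ == "__main__":
    print(provider_names_sketch("apt, brew,,apt"))
    print(binproviders_to_str_sketch(["pip", " npm "]))
```

Routing both helpers through one normalizer keeps the two representations consistent, so a value can round-trip between list and string form without drift.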