archivebox.ideas.process_plugin

Module Contents

Classes

ProcessRecord

ProcessLaunch

ProcessStarted

ProcessExited

ProcessKill

_RunningProcess

ProcessPlugin

Spawn and monitor processes using events (no Django required).

Functions

_utcnow

Data

JsonEventAdapter

__all__

API

archivebox.ideas.process_plugin._utcnow() datetime.datetime[source]
class archivebox.ideas.process_plugin.ProcessRecord[source]

Bases: pydantic.BaseModel

id: str[source]

‘Field(…)’

cmd: list[str][source]

None

cwd: str | None[source]

None

env: dict[str, str][source]

‘Field(…)’

pid: int | None[source]

None

started_at: datetime.datetime | None[source]

None

ended_at: datetime.datetime | None[source]

None

exit_code: int | None[source]

None

stdout_path: str | None[source]

None

stderr_path: str | None[source]

None

cmd_path: str | None[source]

None

pid_path: str | None[source]

None

is_background: bool[source]

False

parent_process_id: str | None[source]

None

class archivebox.ideas.process_plugin.ProcessLaunch[source]

Bases: archivebox.ideas.process_plugin.BaseEvent[archivebox.ideas.process_plugin.ProcessRecord]

cmd: list[str][source]

None

cwd: str | None[source]

None

env: dict[str, str] | None[source]

None

timeout: float | None[source]

None

output_dir: str | None[source]

None

log_prefix: str | None[source]

None

is_background: bool[source]

False

parent_process_id: str | None[source]

None

parse_stdout_events: bool[source]

True

class archivebox.ideas.process_plugin.ProcessStarted[source]

Bases: archivebox.ideas.process_plugin.BaseEvent[None]

process: archivebox.ideas.process_plugin.ProcessRecord[source]

None

class archivebox.ideas.process_plugin.ProcessExited[source]

Bases: archivebox.ideas.process_plugin.BaseEvent[None]

process: archivebox.ideas.process_plugin.ProcessRecord[source]

None

class archivebox.ideas.process_plugin.ProcessKill[source]

Bases: archivebox.ideas.process_plugin.BaseEvent[archivebox.ideas.process_plugin.ProcessRecord]

process_id: str[source]

None

signal: int[source]

None

timeout: float | None[source]

10.0

class archivebox.ideas.process_plugin._RunningProcess[source]
process: asyncio.subprocess.Process[source]

None

record: archivebox.ideas.process_plugin.ProcessRecord[source]

None

stdout_task: asyncio.Task[None] | None[source]

None

stderr_task: asyncio.Task[None] | None[source]

None

watcher_task: asyncio.Task[None] | None[source]

None

parent_event_id: str | None[source]

None

archivebox.ideas.process_plugin.JsonEventAdapter[source]

None

class archivebox.ideas.process_plugin.ProcessPlugin(bus: archivebox.ideas.process_plugin.EventBus, *, env: collections.abc.Mapping[str, str] | None = None, json_event_adapter: archivebox.ideas.process_plugin.JsonEventAdapter | None = None)[source]

Spawn and monitor processes using events (no Django required).

Initialization

register_event_handlers() None[source]
async on_ProcessLaunch(event: archivebox.ideas.process_plugin.ProcessLaunch) archivebox.ideas.process_plugin.ProcessRecord[source]
async on_ProcessKill(event: archivebox.ideas.process_plugin.ProcessKill) archivebox.ideas.process_plugin.ProcessRecord[source]
async _watch_process(process_id: str, timeout: float | None) None[source]
async _finalize_process(process_id: str) None[source]
async _consume_stream(stream: asyncio.StreamReader | None, path: pathlib.Path, parent_event_id: str | None, parse_events: bool) None[source]
async _maybe_dispatch_json_event(line: str, parent_event_id: str | None) None[source]
static _write_cmd_file(path: pathlib.Path, cmd: list[str]) None[source]
static _write_pid_file(path: pathlib.Path, pid: int) None[source]
static _terminate_process(proc: asyncio.subprocess.Process, sig: int) None[source]
archivebox.ideas.process_plugin.__all__[source]

[‘ProcessRecord’, ‘ProcessLaunch’, ‘ProcessStarted’, ‘ProcessExited’, ‘ProcessKill’, ‘ProcessPlugin’…