archivebox.machine.models
Module Contents
Classes
Binary | Tracks a binary on a specific machine.
ProcessManager | Manager for Process model.
Process | Tracks a single OS process execution.
BinaryMachine | State machine for managing Binary installation lifecycle.
ProcessMachine | State machine for managing Process (OS subprocess) lifecycle.
Functions
_sanitize_machine_config | Validate Machine.config in place.
Data
API
- archivebox.machine.models._CURRENT_MACHINE: archivebox.machine.models.Machine | None
None
- archivebox.machine.models._CURRENT_INTERFACE: archivebox.machine.models.NetworkInterface | None
None
- archivebox.machine.models._CURRENT_BINARIES: dict[str, archivebox.machine.models.Binary]
None
- archivebox.machine.models._CURRENT_PROCESS: archivebox.machine.models.Process | None
None
- archivebox.machine.models._find_existing_binary_for_reference(machine: Machine, reference: str) -> Binary | None
- archivebox.machine.models._get_process_binary_env_keys(plugin_name: str, hook_path: str, env: dict[str, Any] | None) -> list[str]
- archivebox.machine.models._sanitize_machine_config(config: dict[str, Any] | None, *, lib_dir: str | pathlib.Path | None = None) -> dict[str, Any]
Validate Machine.config in place.
Drops stale *_BINARY overrides whose path no longer exists or whose path falls outside of LIB_DIR (so a binary uninstall or a lib_dir move clears the override automatically). Non-_BINARY keys (BASE_URL, SERVER_SECURITY_MODE, plugin tunables, etc.) are passed through unchanged: they're arbitrary config overrides and not ours to filter.
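The filtering rule described above can be sketched in a few lines. This is an illustration only, not ArchiveBox's implementation: the function name `sanitize_config_sketch` is hypothetical, it returns a new dict instead of mutating in place, and it assumes every `*_BINARY` value is a filesystem path.

```python
from __future__ import annotations

from pathlib import Path
from typing import Any


def sanitize_config_sketch(
    config: dict[str, Any] | None,
    *,
    lib_dir: str | Path | None = None,
) -> dict[str, Any]:
    """Drop stale *_BINARY overrides; pass every other key through untouched."""
    lib_root = Path(lib_dir).resolve() if lib_dir is not None else None
    cleaned: dict[str, Any] = {}
    for key, value in (config or {}).items():
        if not key.endswith("_BINARY"):
            cleaned[key] = value  # arbitrary override, not ours to filter
            continue
        path = Path(str(value))
        if not path.exists():
            continue  # binary was uninstalled or moved away
        if lib_root is not None:
            try:
                path.resolve().relative_to(lib_root)
            except ValueError:
                continue  # path falls outside of LIB_DIR
        cleaned[key] = value
    return cleaned
```

When `lib_dir` is omitted, the sketch only checks that the path exists; whether the real function behaves the same way is an assumption.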
- class archivebox.machine.models.Machine
Bases:
archivebox.base_models.models.ModelWithHealthStats
- networkinterface_set: django.db.models.Manager[NetworkInterface]
None
- class Meta
Bases:
archivebox.base_models.models.ModelWithHealthStats.Meta
- classmethod current(refresh: bool = False) -> archivebox.machine.models.Machine
- classmethod _sanitize_config(machine: archivebox.machine.models.Machine) -> archivebox.machine.models.Machine
- class archivebox.machine.models.NetworkInterface
Bases:
archivebox.base_models.models.ModelWithHealthStats
- class Meta
Bases:
archivebox.base_models.models.ModelWithHealthStats.Meta
- classmethod current(refresh: bool = False) -> archivebox.machine.models.NetworkInterface
- class archivebox.machine.models.BinaryManager
Bases:
django.db.models.Manager
- get_from_db_or_cache(name: str, abspath: str = '', version: str = '', sha256: str = '', binprovider: str = 'env') -> archivebox.machine.models.Binary
Get or create a Binary record from the database or cache.
- get_valid_binary(name: str, machine: archivebox.machine.models.Machine | None = None) -> archivebox.machine.models.Binary | None
Get a valid Binary for the given name on the current machine, or None if not found.
- class archivebox.machine.models.Binary
Bases:
archivebox.base_models.models.ModelWithHealthStats, archivebox.workers.models.ModelWithStateMachine
Tracks a binary on a specific machine.
Simple state machine with 2 states:
queued: Binary needs to be installed
installed: Binary installed successfully (abspath, version, sha256 populated)
Installation is synchronous during the queued→installed transition. If installation fails, the Binary stays in queued with retry_at set for a later retry.
The state machine calls run(), which emits an abxpkg BinaryRequestEvent through the ArchiveBox runner and installs the binary using the specified providers.
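The two-state lifecycle can be modelled without Django as a small sketch. Everything here is hypothetical and only mirrors the description above: `BinarySketch`, `tick`, the `install` callback, and the five-minute `RETRY_DELAY` are illustrative names and values, not ArchiveBox's own.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Callable

RETRY_DELAY = timedelta(minutes=5)  # hypothetical back-off, not taken from ArchiveBox


@dataclass
class BinarySketch:
    name: str
    status: str = "queued"
    abspath: str = ""
    version: str = ""
    sha256: str = ""
    retry_at: datetime | None = None

    @property
    def is_valid(self) -> bool:
        # valid = marked installed and has a resolved path
        return self.status == "installed" and bool(self.abspath)


def tick(
    binary: BinarySketch,
    install: Callable[[str], dict[str, str]],
    now: datetime | None = None,
) -> BinarySketch:
    """Attempt the queued -> installed transition synchronously."""
    now = now or datetime.now(timezone.utc)
    if binary.status != "queued":
        return binary
    if binary.retry_at is not None and binary.retry_at > now:
        return binary  # not due for a retry yet
    try:
        info = install(binary.name)
    except Exception:
        binary.retry_at = now + RETRY_DELAY  # stay queued, try again later
        return binary
    binary.abspath = info["abspath"]
    binary.version = info["version"]
    binary.sha256 = info["sha256"]
    binary.status = "installed"
    binary.retry_at = None
    return binary
```

Because installation happens inside the transition, a failed install never leaves a half-populated record: the row either gains all three fields and moves to installed, or stays queued with a later retry_at.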
- class Meta
Bases:
archivebox.base_models.models.ModelWithHealthStats.Meta, archivebox.workers.models.ModelWithStateMachine.Meta
- property is_valid: bool
A binary is valid if it has a resolved path and is marked installed.
- property output_dir: pathlib.Path
Get output directory for this binary's hook logs.
Path: data/machines/{machine_uuid}/binaries/{binary_name}/{binary_uuid}
- static from_json(record: dict[str, Any], overrides: dict[str, Any] | None = None)
Create/update Binary from JSON dict.
Handles two cases:
From binaries.json: creates queued binary with name, binproviders, overrides
From hook output: updates binary with abspath, version, sha256, binprovider
Args:
record: JSON dict with 'name' and either:
- 'binproviders', 'overrides' (from binaries.json)
- 'abspath', 'version', 'sha256', 'binprovider' (from hook output)
overrides: Not used
Returns: Binary instance or None
- _allowed_binproviders() -> set[str] | None
Return the allowed binproviders for this binary, or None for wildcard.
- cleanup()
Clean up background binary installation hooks.
Called by state machine if needed (not typically used for binaries since installations are foreground, but included for consistency).
- symlink_to_lib_bin(lib_bin_dir: str | pathlib.Path) -> pathlib.Path | None
Symlink this binary into LIB_BIN_DIR for human-facing convenience.
After a binary is installed by any binprovider (pip, npm, brew, apt, etc), we can optionally expose a flat convenience directory for shell users. ArchiveBox/abx-dl runtime lookup must use the provider-specific LIB_DIR paths, not this indirection.
Args:
lib_bin_dir: Path to LIB_BIN_DIR (e.g., /data/lib/arm64-darwin/bin)
Returns: Path to the created symlink, or None if symlinking failed
Example:
>>> binary = Binary.objects.get(name='yt-dlp')
>>> binary.symlink_to_lib_bin('/data/lib/arm64-darwin/bin')
Path('/data/lib/arm64-darwin/bin/yt-dlp')
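A standalone sketch of the same idea, using only pathlib, is shown here. The helper name `symlink_into_bin_dir` is hypothetical, and replacing an existing link is an assumption about the desired behaviour, not something the docstring states.

```python
from __future__ import annotations

from pathlib import Path


def symlink_into_bin_dir(abspath: str | Path, lib_bin_dir: str | Path) -> Path | None:
    """Expose an installed binary under a flat bin directory via a symlink."""
    target = Path(abspath)
    if not target.exists():
        return None  # nothing to link to
    bin_dir = Path(lib_bin_dir)
    link = bin_dir / target.name
    try:
        bin_dir.mkdir(parents=True, exist_ok=True)
        if link.is_symlink() or link.exists():
            link.unlink()  # assumed: replace a stale link instead of failing
        link.symlink_to(target)
    except OSError:
        return None  # mirrors "None if symlinking failed"
    return link
```

The link is a convenience for shell users only; code that resolves binaries at runtime should keep using the provider-specific path stored on the record.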
- symlink_to_lib_bin_after_commit(lib_bin_dir: str | pathlib.Path) -> None
Symlink after the current DB transaction commits.
Binary rows are projections of provider/hook state and are allowed to be updated directly, but filesystem writes must not run while an outer transaction is still open. Refetch after commit so the symlink points at the committed row, not a possibly-rolled-back in-memory value.
- class archivebox.machine.models.ProcessManager
Bases:
django.db.models.Manager
Manager for Process model.
- current() -> archivebox.machine.models.Process
Get the Process record for the current OS process.
- get_by_pid(pid: int, machine: archivebox.machine.models.Machine | None = None) -> archivebox.machine.models.Process | None
Find a Process by PID with proper validation against PID reuse.
IMPORTANT: PIDs are reused by the OS! This method:
Filters by machine (required - PIDs are only unique per machine)
Filters by time window (processes older than 24h are stale)
Validates via psutil that start times match
Args:
pid: OS process ID
machine: Machine instance (defaults to current machine)
Returns: Process if found and validated, None otherwise
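The three checks can be illustrated over plain in-memory rows. This is a sketch under assumptions: `ProcessRow`, `find_by_pid`, and the 5-second `START_TIME_TOLERANCE` are hypothetical, and the caller is expected to supply the OS start time itself (for example from `psutil.Process(pid).create_time()`), so the block runs without psutil installed.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta

PID_REUSE_WINDOW = timedelta(hours=24)  # "older than 24h are stale"
START_TIME_TOLERANCE = 5.0  # seconds; hypothetical tolerance


@dataclass
class ProcessRow:
    pid: int
    machine_id: str
    started_at: datetime


def find_by_pid(
    rows: list[ProcessRow],
    pid: int,
    machine_id: str,
    os_start_time: float | None,
    now: datetime,
) -> ProcessRow | None:
    """Return the row for pid only if machine, age, and start time all agree.

    os_start_time is the epoch timestamp the OS reports for pid,
    or None when no such OS process exists.
    """
    if os_start_time is None:
        return None
    for row in rows:
        if row.pid != pid or row.machine_id != machine_id:
            continue  # PIDs are only unique per machine
        if now - row.started_at > PID_REUSE_WINDOW:
            continue  # too old to trust
        if abs(row.started_at.timestamp() - os_start_time) > START_TIME_TOLERANCE:
            continue  # PID was recycled by a different process
        return row
    return None
```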
- class archivebox.machine.models.Process
Bases:
archivebox.base_models.models.ModelWithDeleteAfter, django.db.models.Model
Tracks a single OS process execution.
Process represents the actual subprocess spawned to execute a hook. One Process can optionally be associated with an ArchiveResult (via OneToOne), but Process can also exist standalone for internal operations.
Follows the unified state machine pattern:
queued: Process ready to launch
running: Process actively executing
exited: Process completed (check exit_code for success/failure)
State machine calls launch() to spawn the process and monitors its lifecycle.
- children: django.db.models.Manager[archivebox.machine.models.Process]
None
- archiveresult: archivebox.core.models.ArchiveResult
None
- class Meta
Bases:
archivebox.base_models.models.ModelWithDeleteAfter.Meta
- hydrate_binary_from_context(*, plugin_name: str = '', hook_path: str = '') -> archivebox.machine.models.Binary | None
- classmethod parse_records_from_text(text: str) -> list[dict]
Parse JSONL records from raw text using the shared JSONL parser.
- static from_json(record: dict[str, Any], overrides: dict[str, Any] | None = None)
Create/update Process from JSON dict.
Args:
record: JSON dict with 'id' or process details
overrides: Optional dict of field overrides
Returns: Process instance or None
- safe_update(update_fields: dict[str, Any], *, refresh: bool = True, extra_filter: dict[str, Any] | None = None) -> bool
Compare-and-swap update for short Process scheduler writes.
Process is not a ModelWithStateMachine subclass yet, but its state-machine methods still need the same modified_at CAS behavior as Crawl/Snapshot/Binary without falling back to save().
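A compare-and-swap on modified_at can be demonstrated with the standard-library sqlite3 module. This is a sketch of the general technique, not the method's actual code: the `process` table, its columns, and the `safe_update_sketch` helper are all hypothetical.

```python
import sqlite3
from typing import Any


def safe_update_sketch(
    conn: sqlite3.Connection,
    row_id: int,
    expected_modified_at: str,
    new_modified_at: str,
    fields: dict[str, Any],
) -> bool:
    """Apply fields only if modified_at still holds the value we last read."""
    # Column names come from trusted code in this sketch, never from user input.
    assignments = ", ".join([*(f"{col} = ?" for col in fields), "modified_at = ?"])
    sql = f"UPDATE process SET {assignments} WHERE id = ? AND modified_at = ?"
    params = [*fields.values(), new_modified_at, row_id, expected_modified_at]
    cur = conn.execute(sql, params)
    conn.commit()
    return cur.rowcount == 1  # 0 rows means another writer got there first
```

In Django the same pattern is usually written as a filtered `QuerySet.update()`, which returns the number of matched rows, so a result of 0 signals that the write lost the race.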
- mark_running(*, process_type: str | None = None, pwd: str | pathlib.Path | None = None, url: str | None = None, worker_type: str = '', timeout: int | None = None) -> None
Record the current process role without changing ownership state elsewhere.
- heartbeat() -> None
Touch modified_at so standby/leader selection can see this parent is alive.
- mark_exited(*, exit_code: int = 0) -> None
Mark a foreground/internal process row exited after command cleanup.
- classmethod current() -> archivebox.machine.models.Process
Get or create the Process record for the current OS process.
Similar to Machine.current(), this:
Checks cache for existing Process with matching PID
Validates the cached Process is still valid (PID not reused)
Creates new Process if needed
IMPORTANT: Uses psutil to validate PID hasn't been reused. PIDs are recycled by OS, so we compare start times.
- classmethod _find_parent_process(machine: archivebox.machine.models.Machine | None = None) -> archivebox.machine.models.Process | None
Find the parent Process record by looking up PPID.
IMPORTANT: Validates against PID reuse by checking:
Same machine (PIDs are only unique per machine)
Start time matches OS process start time
Process is still RUNNING and recent
Returns None if parent is not an ArchiveBox process.
- classmethod _detect_process_type() -> str
Detect the type of the current process from sys.argv.
archivebox add --bg is a fire-and-forget queue write: it does not run the runner or own the runtime stack, so it's classified as CLI instead of ADD. The ADD process_type is reserved for the foreground archivebox add flow that actually takes over the runtime stack via current_command(TypeChoices.ADD, ...). Misclassifying --bg as ADD makes runtime_stack_owner treat it as a newer stack owner for the few seconds it's alive, knocks the running archivebox server out of leadership, and triggers a supervisord tear-down + respawn cycle (~5s of dead time per add). Detecting bg here at insert time avoids any race window where the row briefly exists as ADD before a higher-level demotion.
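The add versus add --bg distinction comes down to a small argv check. The sketch below covers only that one rule; the function name and the string values "add" and "cli" are assumptions standing in for the real TypeChoices members, and other process types are deliberately left out.

```python
from __future__ import annotations


def detect_process_type_sketch(argv: list[str]) -> str:
    """Classify an invocation as 'add' only when it runs in the foreground."""
    args = argv[1:]  # drop the program name
    if args and args[0] == "add":
        # --bg only enqueues work and exits, so it must not claim the ADD role
        return "cli" if "--bg" in args[1:] else "add"
    return "cli"  # assumed default for everything this sketch does not model
```

Deciding at insert time means the row is never stored with the wrong type, so nothing that reads the table can observe a short-lived ADD row.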
- classmethod cleanup_stale_running(machine: archivebox.machine.models.Machine | None = None) -> int
Mark stale RUNNING processes as EXITED in the DB.
Processes are stale if:
Status is RUNNING but OS process no longer exists
Status is RUNNING but exceeded its timeout plus a small grace margin
Status is RUNNING but started_at is older than PID_REUSE_WINDOW
Returns count of processes cleaned up.
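The three staleness conditions can be expressed as one predicate. This is a sketch under assumptions: `RunningRow` and `stale_reason` are hypothetical names, the 30-second `TIMEOUT_GRACE` stands in for the unspecified "small grace margin", and the 24-hour `PID_REUSE_WINDOW` value is assumed from the time window described for get_by_pid.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timedelta

PID_REUSE_WINDOW = timedelta(hours=24)  # assumed value
TIMEOUT_GRACE = timedelta(seconds=30)  # hypothetical grace margin


@dataclass
class RunningRow:
    started_at: datetime
    timeout: int | None  # seconds, or None when no timeout is set
    os_process_exists: bool  # result of an OS-level liveness check


def stale_reason(row: RunningRow, now: datetime) -> str | None:
    """Return why a RUNNING row is stale, or None if it still looks alive."""
    if not row.os_process_exists:
        return "os_process_missing"
    age = now - row.started_at
    if row.timeout is not None and age > timedelta(seconds=row.timeout) + TIMEOUT_GRACE:
        return "timeout_exceeded"
    if age > PID_REUSE_WINDOW:
        return "older_than_pid_reuse_window"
    return None
```

A cleanup pass would mark every row with a non-None reason as EXITED and return how many rows it changed.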
- property root: archivebox.machine.models.Process
Get the root process (CLI command) of this hierarchy.
- property ancestors: list[archivebox.machine.models.Process]
Get all ancestor processes from parent to root.
- property proc: psutil.Process | None
Get validated psutil.Process for this record.
Returns psutil.Process ONLY if:
Process with this PID exists in OS
OS process start time matches our started_at (within tolerance)
Process is on current machine
Returns None if:
PID doesn't exist (process exited)
PID was reused by a different process (start times don't match)
We're on a different machine than where process ran
psutil is not available
This prevents accidentally matching a stale/recycled PID.
- property is_running: bool
Check if process is currently running via psutil.
More reliable than checking status field since it validates the actual OS process exists and matches our record.
- property hook_script_name: str | None
Best-effort hook filename extracted from the process command.
- property runtime_dir: pathlib.Path | None
Directory where this process stores runtime stdout/stderr logs.
- tail_stdout(lines: int = 50, follow: bool = False)
Tail stdout log file (like tail or tail -f).
Args:
lines: Number of lines to show (default 50)
follow: If True, follow the file and yield new lines as they appear
Yields: Lines from stdout
- tail_stderr(lines: int = 50, follow: bool = False)
Tail stderr log file (like tail or tail -f).
Args:
lines: Number of lines to show (default 50)
follow: If True, follow the file and yield new lines as they appear
Yields: Lines from stderr
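The tail behaviour shared by tail_stdout and tail_stderr can be sketched as a generator over any log file. The helper `tail_file` and its `poll_interval` parameter are hypothetical; the real methods locate the log file themselves.

```python
from __future__ import annotations

import time
from collections import deque
from pathlib import Path
from typing import Iterator


def tail_file(
    path: str | Path,
    lines: int = 50,
    follow: bool = False,
    poll_interval: float = 0.2,
) -> Iterator[str]:
    """Yield the last `lines` lines, then optionally keep yielding new ones."""
    with open(path, "r", errors="replace") as fh:
        # deque with maxlen keeps only the final N lines while reading once
        for line in deque(fh, maxlen=lines):
            yield line.rstrip("\n")
        while follow:
            line = fh.readline()
            if line:
                yield line.rstrip("\n")
            else:
                time.sleep(poll_interval)  # nothing new yet, wait and retry
```

With follow=True the generator never finishes on its own, so the caller decides when to stop consuming it.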
- pipe_stdout(lines: int = 10, follow: bool = True)
Pipe stdout to sys.stdout.
Args:
lines: Number of initial lines to show
follow: If True, follow the file and print new lines as they appear
- pipe_stderr(lines: int = 10, follow: bool = True)
Pipe stderr to sys.stderr.
Args:
lines: Number of initial lines to show
follow: If True, follow the file and print new lines as they appear
- launch(background: bool = False, cwd: str | None = None) -> archivebox.machine.models.Process
Spawn the subprocess and update this Process record.
Args:
background: If True, don't wait for completion (for daemons/bg hooks)
cwd: Working directory for the subprocess (defaults to self.pwd)
Returns: self (updated with pid, started_at, etc.)
- kill(signal_num: int = 15) -> bool
Kill this process and update status.
Uses self.proc for safe killing - only kills if PID matches our recorded process (prevents killing recycled PIDs).
Args:
signal_num: Signal to send (default SIGTERM=15)
Returns: True if killed successfully, False otherwise
- poll() -> int | None
Check if process has exited and update status if so.
Cleanup when process exits:
Copy stdout/stderr to DB (keep files for debugging)
Delete PID file
Returns: exit_code if exited, None if still running
- wait(timeout: int | None = None) -> int
Wait for process to exit, polling periodically.
Args:
timeout: Max seconds to wait (None = use self.timeout)
Returns: exit_code
Raises: TimeoutError if process doesn't exit in time
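The poll-until-deadline loop behind wait() can be sketched against a plain `subprocess.Popen` handle. The helper `wait_for_exit` and its `poll_interval` parameter are hypothetical; the real method polls the Process record instead of a Popen object.

```python
import subprocess
import time


def wait_for_exit(
    proc: subprocess.Popen,
    timeout: float,
    poll_interval: float = 0.1,
) -> int:
    """Poll until the child exits; raise TimeoutError once the deadline passes."""
    deadline = time.monotonic() + timeout  # monotonic clock ignores wall-clock jumps
    while True:
        exit_code = proc.poll()
        if exit_code is not None:
            return exit_code
        if time.monotonic() >= deadline:
            raise TimeoutError(f"process {proc.pid} still running after {timeout}s")
        time.sleep(poll_interval)
```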
- terminate(graceful_timeout: float = 5.0) -> bool
Gracefully terminate process: SIGTERM → wait → SIGKILL.
This consolidates the scattered SIGTERM/SIGKILL logic from:
crawls/models.py Crawl.cleanup()
workers/pid_utils.py stop_worker()
supervisord_util.py stop_existing_supervisord_process()
Args:
graceful_timeout: Seconds to wait after SIGTERM before SIGKILL
Returns: True if process was terminated, False if already dead
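The SIGTERM → wait → SIGKILL escalation looks roughly like this when written against `subprocess.Popen`. The helper name `terminate_gracefully` is hypothetical, and the signal names in the comments describe POSIX behaviour.

```python
import subprocess


def terminate_gracefully(proc: subprocess.Popen, graceful_timeout: float = 5.0) -> bool:
    """Ask the child to exit, then force it if it ignores the request."""
    if proc.poll() is not None:
        return False  # already dead, nothing to do
    proc.terminate()  # SIGTERM on POSIX
    try:
        proc.wait(timeout=graceful_timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # SIGKILL on POSIX, cannot be ignored
        proc.wait()  # reap the child so it does not linger as a zombie
    return True
```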
- kill_tree(graceful_timeout: float = 2.0) -> int
Kill this process and all its children (OS children, not DB children) in parallel.
Uses parallel polling approach - sends SIGTERM to all processes at once, then polls all simultaneously with individual deadline tracking.
This consolidates the scattered child-killing logic from:
crawls/models.py Crawl.cleanup() os.killpg()
supervisord_util.py stop_existing_supervisord_process()
Args:
graceful_timeout: Seconds to wait after SIGTERM before SIGKILL
Returns: Number of processes killed (including self)
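The parallel approach (signal everything first, then poll all of them against per-process deadlines) can be sketched with a list of `subprocess.Popen` handles. This is an illustration only: `kill_all_parallel` is a hypothetical helper, and it takes the handles as input instead of discovering OS children the way the real method does.

```python
import subprocess
import time


def kill_all_parallel(
    procs: list[subprocess.Popen],
    graceful_timeout: float = 2.0,
    poll_interval: float = 0.05,
) -> int:
    """SIGTERM every live process at once, then escalate each one at its own deadline."""
    pending: dict[subprocess.Popen, float] = {}
    for proc in procs:
        if proc.poll() is None:
            proc.terminate()
            pending[proc] = time.monotonic() + graceful_timeout
    killed = len(pending)
    while pending:
        for proc, deadline in list(pending.items()):
            if proc.poll() is not None:
                del pending[proc]  # exited on its own after SIGTERM
            elif time.monotonic() >= deadline:
                proc.kill()  # grace period over, force it
                proc.wait()
                del pending[proc]
        if pending:
            time.sleep(poll_interval)
    return killed
```

Sending all signals up front bounds the total wait at roughly one grace period, whereas terminating processes one after another could take one grace period per process.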
- kill_children_db() -> int
Kill all DB-tracked child processes (via parent FK).
Different from kill_tree() which uses OS children. This kills processes created via Process.create(parent=self).
Returns: Number of child Process records killed
- classmethod get_running(process_type: str | None = None, machine: archivebox.machine.models.Machine | None = None) -> django.db.models.QuerySet[archivebox.machine.models.Process]
Get all running processes, optionally filtered by type.
Replaces:
workers/pid_utils.py get_all_worker_pids()
workers/orchestrator.py get_total_worker_count()
Args:
process_type: Filter by TypeChoices (e.g., 'worker', 'hook')
machine: Filter by machine (defaults to current)
Returns: QuerySet of running Process records
- classmethod get_running_count(process_type: str | None = None, machine: archivebox.machine.models.Machine | None = None) -> int
Get count of running processes.
Replaces:
workers/pid_utils.py get_running_worker_count()
- classmethod stop_all(process_type: str | None = None, machine: archivebox.machine.models.Machine | None = None, graceful: bool = True) -> int
Stop all running processes of a given type.
Args:
process_type: Filter by TypeChoices
machine: Filter by machine
graceful: If True, use terminate() (SIGTERM→SIGKILL), else kill()
Returns: Number of processes stopped
- classmethod get_next_worker_id(process_type: str = 'worker', machine: archivebox.machine.models.Machine | None = None) -> int
Get the next available worker ID for spawning new workers.
Replaces workers/pid_utils.py get_next_worker_id(). Simply returns count of running workers of this type.
Args:
process_type: Worker type to count
machine: Machine to scope query
Returns: Next available worker ID (0-indexed)
- classmethod cleanup_orphaned_chrome() -> int
Kill orphaned Chrome processes using chrome_utils.js killZombieChrome.
Scans DATA_DIR for chrome/*.pid files from stale crawls (>5 min old) and kills any orphaned Chrome processes.
Called by:
Orchestrator on startup (cleanup from previous crashes)
Orchestrator periodically (every N minutes)
Returns: Number of zombie Chrome processes killed
- class archivebox.machine.models.BinaryMachine(obj, *args, **kwargs)
Bases:
archivebox.workers.models.BaseStateMachine
State machine for managing Binary installation lifecycle.
Simple 2-state machine:
QUEUED State
• Binary needs to be installed
↓ tick() when can_install()
↓ Synchronous installation during transition
INSTALLED State
• Binary installed (abspath, version, sha256 set)
• Health stats incremented
If installation fails, Binary stays in QUEUED with retry_at bumped.
Initialization
- binary: archivebox.machine.models.Binary
None
- class archivebox.machine.models.ProcessMachine(obj, *args, **kwargs)
Bases:
archivebox.workers.models.BaseStateMachine
State machine for managing Process (OS subprocess) lifecycle.
Process Lifecycle:
QUEUED State
• Process ready to launch, waiting for resources
↓ tick() when can_start()
RUNNING State → enter_running()
1. process.launch()
• Spawn subprocess with cmd, pwd, env, timeout
• Set pid, started_at
• Process runs in background or foreground
2. Monitor process completion
• Check exit code when process completes
↓ tick() checks is_exited()
EXITED State
• Process completed (exit_code set)
• Health stats incremented
• stdout/stderr captured
Note: This is a simpler state machine than ArchiveResult. Process is just about execution lifecycle. ArchiveResult handles the archival-specific logic (status, output parsing, etc.).
Initialization
- process: archivebox.machine.models.Process
None