Source code for archivebox.index

__package__ = 'archivebox.index'

import os
import shutil
from pathlib import Path

from itertools import chain
from typing import List, Tuple, Dict, Optional, Iterable
from collections import OrderedDict
from contextlib import contextmanager
from urllib.parse import urlparse
from django.db.models import QuerySet, Q

from ..util import (
    scheme,
    enforce_types,
    ExtendedEncoder,
)
from ..config import (
    ARCHIVE_DIR_NAME,
    SQL_INDEX_FILENAME,
    JSON_INDEX_FILENAME,
    OUTPUT_DIR,
    TIMEOUT,
    URL_DENYLIST_PTN,
    URL_ALLOWLIST_PTN,
    stderr,
    OUTPUT_PERMISSIONS
)
from ..logging_util import (
    TimedProgress,
    log_indexing_process_started,
    log_indexing_process_finished,
    log_indexing_started,
    log_indexing_finished,
    log_parsing_finished,
    log_deduping_finished,
)

from .schema import Link, ArchiveResult
from .html import (
    write_html_link_details,
)
from .json import (
    pyjson,
    parse_json_link_details, 
    write_json_link_details,
)
from .sql import (
    write_sql_main_index,
    write_sql_link_details,
)

from ..search import search_backend_enabled, query_search_index

### Link filtering and checking
























@enforce_types
def lowest_uniq_timestamp(used_timestamps: OrderedDict, timestamp: str) -> str:
    """Return a timestamp key that is not already present in ``used_timestamps``.

    Any decimal suffix on ``timestamp`` is dropped first. The bare integer part
    is returned if it is unused; otherwise the suffixes ``.0``, ``.1``, ``.2``...
    are tried in order and the first unused candidate is returned, e.g.
    1234, 1234, 1234 -> 1234, 1234.0, 1234.1
    """
    base = timestamp.split('.')[0]

    # prefer the bare value (152323423) over a suffixed one (152323423.0)
    if base not in used_timestamps:
        return base

    suffix = 0
    candidate = f'{base}.{suffix}'
    while candidate in used_timestamps:
        suffix += 1
        candidate = f'{base}.{suffix}'

    return candidate
### Main Links Index
@contextmanager
@enforce_types
def timed_index_update(out_path: Path):
    """Context manager that logs and times the writing of one index file.

    Logs the start of indexing for ``out_path``, runs a ``TimedProgress`` timer
    while the ``with`` body executes, and always stops that timer on exit.
    When the body completes without raising, it then asserts that ``out_path``
    exists and logs that indexing finished.

    Raises:
        AssertionError: if ``out_path`` does not exist after the body ran.
    """
    log_indexing_started(out_path)
    # the progress timer is given twice the configured TIMEOUT
    timer = TimedProgress(TIMEOUT * 2, prefix=' ')
    try:
        yield
    finally:
        # stop the progress timer even if the with-body raised
        timer.end()

    # only reached when the with-body finished without raising
    # NOTE(review): `assert` is stripped under `python -O` — confirm that is acceptable here
    assert out_path.exists(), f'Failed to write index file: {out_path}'
    log_indexing_finished(out_path)
@enforce_types
def write_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR) -> None:
    """Write the given links to the sqlite3 main index inside ``out_dir``.

    If the write is interrupted by ``KeyboardInterrupt`` or ``SystemExit``, a
    warning is printed, the whole write is attempted once more, and the process
    then exits via ``SystemExit(0)``.
    """

    log_indexing_process_started(len(links))

    def _write_and_set_permissions() -> None:
        # one full write pass: timed SQL write followed by a chmod of the index file
        with timed_index_update(out_dir / SQL_INDEX_FILENAME):
            write_sql_main_index(links, out_dir=out_dir)
            # set here because we don't write it with atomic writes
            os.chmod(out_dir / SQL_INDEX_FILENAME, int(OUTPUT_PERMISSIONS, base=8))

    try:
        _write_and_set_permissions()
    except (KeyboardInterrupt, SystemExit):
        stderr('[!] Warning: Still writing index to disk...', color='lightyellow')
        stderr(' Run archivebox init to fix any inconsistencies from an ungraceful exit.')
        # retry the write so the index is not left half-written, then exit
        _write_and_set_permissions()
        raise SystemExit(0)

    log_indexing_process_finished()
@enforce_types
def load_main_index(out_dir: Path=OUTPUT_DIR, warn: bool=True) -> List[Link]:
    """Return every Snapshot in the main index via ``Snapshot.objects.all()``.

    ``out_dir`` and ``warn`` are accepted but not used by this function.
    A ``KeyboardInterrupt`` or ``SystemExit`` raised while building the query
    is converted into ``SystemExit(0)``.
    """
    # imported lazily, inside the function
    from core.models import Snapshot

    try:
        all_snapshots = Snapshot.objects.all()
    except (KeyboardInterrupt, SystemExit):
        raise SystemExit(0)

    return all_snapshots
@enforce_types
def load_main_index_meta(out_dir: Path=OUTPUT_DIR) -> Optional[dict]:
    """Load the JSON main index from ``out_dir`` with the ``links`` entry removed.

    Returns:
        The parsed top-level dict of ``out_dir / JSON_INDEX_FILENAME`` without
        its ``'links'`` key, or ``None`` when that file does not exist.
    """
    index_path = out_dir / JSON_INDEX_FILENAME
    if not index_path.exists():
        return None

    with open(index_path, 'r', encoding='utf-8') as f:
        meta_dict = pyjson.load(f)

    # an index without a 'links' key is still valid metadata, so don't raise KeyError
    meta_dict.pop('links', None)
    return meta_dict
### Link Details Index LINK_FILTERS = { 'exact': lambda pattern: Q(url=pattern), 'substring': lambda pattern: Q(url__icontains=pattern), 'regex': lambda pattern: Q(url__iregex=pattern), 'domain': lambda pattern: Q(url__istartswith=f"http://{pattern}") | Q(url__istartswith=f"https://{pattern}") | Q(url__istartswith=f"ftp://{pattern}"), 'tag': lambda pattern: Q(tags__name=pattern), 'timestamp': lambda pattern: Q(timestamp=pattern), }
@enforce_types
def q_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='exact') -> QuerySet:
    """Filter ``snapshots`` to those matching ANY of ``filter_patterns``.

    Each pattern is converted to a Q object by ``LINK_FILTERS[filter_type]`` and
    the results are OR-ed together. A ``KeyError`` while building a Q object
    (e.g. an unknown ``filter_type``) prints an error and exits with status 2.
    """
    combined = Q()

    for pattern in filter_patterns:
        try:
            combined |= LINK_FILTERS[filter_type](pattern)
        except KeyError:
            stderr()
            stderr(
                f'[X] Got invalid pattern for --filter-type={filter_type}:',
                color='red',
            )
            stderr(f' {pattern}')
            raise SystemExit(2)

    return snapshots.filter(combined)
def search_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='search') -> QuerySet:
    """Restrict ``snapshots`` to those returned by the search backend.

    Every pattern is sent to ``query_search_index`` and the resulting querysets
    are OR-ed together; the union is then intersected with ``snapshots``.
    ``filter_type`` is accepted for signature parity with ``q_filter`` but is
    not used here.

    Raises:
        SystemExit(2): if the search backend is disabled, or if querying the
            search index fails (the original error is attached as ``__cause__``).
    """
    if not search_backend_enabled():
        stderr()
        stderr(
            '[X] The search backend is not enabled, set config.USE_SEARCHING_BACKEND = True',
            color='red',
        )
        raise SystemExit(2)

    # imported lazily, inside the function
    from core.models import Snapshot

    qsearch = Snapshot.objects.none()
    for pattern in filter_patterns:
        try:
            qsearch |= query_search_index(pattern)
        except Exception as err:
            # was a bare `except:`, which also trapped KeyboardInterrupt/SystemExit
            # and discarded the cause; keep exit code 2 but chain the real error
            raise SystemExit(2) from err

    return snapshots & qsearch
@enforce_types
def snapshot_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type: str='exact') -> QuerySet:
    """Filter ``snapshots`` by ``filter_patterns`` using the requested filter type.

    ``'search'`` is delegated to ``search_filter``; every other filter type is
    handled by ``q_filter``.
    """
    if filter_type == 'search':
        return search_filter(snapshots, filter_patterns, filter_type)

    return q_filter(snapshots, filter_patterns, filter_type)
def get_indexed_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """indexed links without checking archive status or data directory validity

    Returns a dict mapping each snapshot's ``link_dir`` to its detailed Link.
    ``out_dir`` is accepted but not used.
    """
    folders = {}
    for snapshot in snapshots.iterator():
        link = snapshot.as_link_with_details()
        folders[link.link_dir] = link
    return folders
def get_archived_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """indexed links that are archived with a valid data directory

    Returns a dict mapping ``link_dir`` to Link for every snapshot whose
    detailed Link passes ``is_archived``. ``out_dir`` is accepted but not used.
    """
    folders = {}
    for snapshot in snapshots.iterator():
        link = snapshot.as_link_with_details()
        if is_archived(link):
            folders[link.link_dir] = link
    return folders
def get_unarchived_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """indexed links that are unarchived with no data directory or an empty data directory

    Returns a dict mapping ``link_dir`` to Link for every snapshot whose
    detailed Link passes ``is_unarchived``. ``out_dir`` is accepted but not used.
    """
    folders = {}
    for snapshot in snapshots.iterator():
        link = snapshot.as_link_with_details()
        if is_unarchived(link):
            folders[link.link_dir] = link
    return folders
def get_present_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """dirs that actually exist in the archive/ folder

    Returns a dict mapping each directory NAME (not full path) found under
    ``out_dir / ARCHIVE_DIR_NAME`` to the Link parsed from its details index,
    or to ``None`` when no Link could be parsed. ``snapshots`` is not used.
    """
    all_folders: Dict[str, Optional[Link]] = {}

    for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
        if not entry.is_dir():
            continue

        link = None
        try:
            # iterdir() yields pathlib.Path objects, which have no `.path`
            # attribute (that belongs to os.DirEntry). The old `entry.path`
            # raised AttributeError, which was swallowed below, so every
            # folder was reported with link=None.
            link = parse_json_link_details(str(entry))
        except Exception:
            # best-effort: an unparseable folder is still present, just without a link
            pass

        all_folders[entry.name] = link

    return all_folders
def get_valid_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """dirs with a valid index matched to the main index and archived content

    Returns a dict mapping ``link_dir`` to Link for every snapshot whose
    detailed Link passes ``is_valid``. ``out_dir`` is accepted but not used.
    """
    folders = {}
    for snapshot in snapshots.iterator():
        link = snapshot.as_link_with_details()
        if is_valid(link):
            folders[link.link_dir] = link
    return folders
def get_invalid_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """dirs that are invalid for any reason: corrupted/duplicate/orphaned/unrecognized

    Merges the results of the four specific checks into one dict. When the same
    folder is reported by several checks, the later check in the order
    duplicate -> orphaned -> corrupted -> unrecognized wins.
    """
    # forward the caller's out_dir; previously the global OUTPUT_DIR was passed,
    # so a non-default out_dir was silently ignored
    duplicate = get_duplicate_folders(snapshots, out_dir=out_dir)
    orphaned = get_orphaned_folders(snapshots, out_dir=out_dir)
    corrupted = get_corrupted_folders(snapshots, out_dir=out_dir)
    unrecognized = get_unrecognized_folders(snapshots, out_dir=out_dir)
    return {**duplicate, **orphaned, **corrupted, **unrecognized}
def get_duplicate_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """dirs that conflict with other directories that have the same link URL or timestamp

    Walks the snapshot folders first, then the on-disk folders that have no
    snapshot with a matching timestamp. The first folder seen for a given
    timestamp/URL is not reported; every later folder repeating either one is.
    """
    seen_timestamps = {}
    seen_urls = {}
    duplicates = {}

    # lazily yields on-disk dirs whose name matches no snapshot timestamp
    untracked_dirs = (
        str(entry)
        for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir()
        if entry.is_dir() and not snapshots.filter(timestamp=entry.name).exists()
    )

    for candidate in chain(snapshots.iterator(), untracked_dirs):
        # snapshots are resolved to their folder path; untracked dirs already are paths
        if isinstance(candidate, str):
            folder = candidate
        else:
            folder = candidate.as_link().link_dir

        try:
            link = parse_json_link_details(folder)
        except Exception:
            link = None

        if not link:
            continue

        # link folder has same timestamp as different link folder
        timestamp_hits = seen_timestamps.get(link.timestamp, 0) + 1
        seen_timestamps[link.timestamp] = timestamp_hits
        if timestamp_hits > 1:
            duplicates[folder] = link

        # link folder has same url as different link folder
        url_hits = seen_urls.get(link.url, 0) + 1
        seen_urls[link.url] = url_hits
        if url_hits > 1:
            duplicates[folder] = link

    return duplicates
def get_orphaned_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """dirs that contain a valid index but aren't listed in the main index

    Returns a dict mapping folder path to Link for every directory under
    ``out_dir / ARCHIVE_DIR_NAME`` whose details index parses but whose name
    matches no snapshot timestamp.
    """
    orphans = {}

    for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
        if not entry.is_dir():
            continue

        folder = str(entry)
        try:
            link = parse_json_link_details(folder)
        except Exception:
            link = None

        # folder is a valid link data dir with index details, but it's not in the main index
        if link and not snapshots.filter(timestamp=entry.name).exists():
            orphans[folder] = link

    return orphans
def get_corrupted_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """snapshot dirs that exist on disk but fail validation (see ``is_corrupt``)

    Returns a dict mapping ``link_dir`` to Link for every snapshot whose Link
    passes ``is_corrupt``. ``out_dir`` is accepted but not used.
    """
    links = (snapshot.as_link() for snapshot in snapshots.iterator())
    return {
        link.link_dir: link
        for link in links
        if is_corrupt(link)
    }
def get_unrecognized_folders(snapshots, out_dir: Path=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
    """dirs that don't contain recognizable archive data and aren't listed in the main index

    A directory under ``out_dir / ARCHIVE_DIR_NAME`` is reported when either:
      * it has an index.json that still cannot be parsed into a Link (after one
        repair attempt when parsing raised ``KeyError``), or
      * it has no index.json and its name matches no snapshot timestamp.
    """
    unrecognized: Dict[str, Optional[Link]] = {}

    for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
        if not entry.is_dir():
            continue

        folder = str(entry)
        has_index = (entry / "index.json").exists()

        link = None
        try:
            link = parse_json_link_details(folder)
        except KeyError:
            # Try to fix index
            if has_index:
                try:
                    # Last attempt to repair the detail index
                    guessed = parse_json_link_details(folder, guess=True)
                    write_json_link_details(guessed, out_dir=folder)
                    link = parse_json_link_details(folder)
                except Exception:
                    pass

        if has_index:
            # index exists but it's corrupted or unparseable
            if link is None:
                unrecognized[folder] = link
        elif not snapshots.filter(timestamp=entry.name).exists():
            # link details index doesn't exist and the folder isn't in the main index
            unrecognized[folder] = link

    return unrecognized
def is_valid(link: Link) -> bool:
    """Return True when the link's folder holds an index.json describing the same URL.

    False is returned when the folder is missing (unarchived links are not
    included in the valid list), when it has no index.json, when the index
    cannot be parsed, or when the parsed URL differs from ``link.url``.
    """
    link_dir = Path(link.link_dir)
    if not link_dir.exists():
        return False
    if not (link_dir / "index.json").exists():
        return False

    try:
        parsed_link = parse_json_link_details(link.link_dir, guess=True)
        return link.url == parsed_link.url
    except Exception:
        return False
def is_corrupt(link: Link) -> bool:
    """Return True when the link's folder exists on disk but is not valid.

    Unarchived links (no folder on disk) are not considered corrupt.
    """
    return Path(link.link_dir).exists() and not is_valid(link)
def is_archived(link: Link) -> bool:
    """Return ``link.is_archived`` for a valid link, and False for an invalid one."""
    if not is_valid(link):
        return False
    return link.is_archived
def is_unarchived(link: Link) -> bool:
    """Return True when the link's folder is missing or ``link.is_archived`` is falsy."""
    return not Path(link.link_dir).exists() or not link.is_archived
def fix_invalid_folder_locations(out_dir: Path=OUTPUT_DIR) -> Tuple[List[str], List[str]]:
    """Move archive folders whose name does not match their index's timestamp.

    Scans ``out_dir / ARCHIVE_DIR_NAME`` for directories containing an
    index.json. When the parsed link's timestamp is not the last path component
    of the folder, the folder is moved to ``out_dir / ARCHIVE_DIR_NAME / <timestamp>``,
    unless that destination already exists.

    Returns:
        ``(fixed, cant_fix)``: destinations of the folders that were moved, and
        paths of the folders that could not be moved because the destination
        already existed.
    """
    # NOTE(review): statement nesting in this function should be confirmed against the canonical source
    fixed = []
    cant_fix = []
    for entry in os.scandir(out_dir / ARCHIVE_DIR_NAME):
        if entry.is_dir(follow_symlinks=True):
            if (Path(entry.path) / 'index.json').exists():
                try:
                    link = parse_json_link_details(entry.path)
                except KeyError:
                    link = None
                # skip folders whose index yields no link
                if not link:
                    continue

                # folder name disagrees with the timestamp stored in its index
                if not entry.path.endswith(f'/{link.timestamp}'):
                    dest = out_dir / ARCHIVE_DIR_NAME / link.timestamp
                    if dest.exists():
                        # never overwrite an existing folder; report it instead
                        cant_fix.append(entry.path)
                    else:
                        shutil.move(entry.path, dest)
                        # NOTE(review): `dest` is a Path, while the return annotation says List[str]
                        fixed.append(dest)
                        # NOTE(review): `timestamp` is taken from the OLD path, which this branch
                        # only reaches when it does not end with link.timestamp, so the asserts
                        # below look like they cannot hold — confirm the intended behavior
                        timestamp = entry.path.rsplit('/', 1)[-1]
                        assert link.link_dir == entry.path
                        assert link.timestamp == timestamp
                        # NOTE(review): writes to entry.path after it was moved to dest — verify
                        write_json_link_details(link, out_dir=entry.path)

    return fixed, cant_fix