Skip to content

dp3.bin.shcmd.common

Shared helpers for the shell-oriented DP3 CLI.

APIError

Bases: RuntimeError

Raised when an API request fails.

DP3APIClient

DP3APIClient(config_dir: str, base_url: Optional[str] = None, timeout: float = 5.0, model_spec: Optional[ModelSpec] = None)

Small HTTP client for the DP3 API.

Source code in dp3/bin/shcmd/common.py
def __init__(
    self,
    config_dir: str,
    base_url: Optional[str] = None,
    timeout: float = 5.0,
    model_spec: Optional[ModelSpec] = None,
) -> None:
    """Set up the client state and open an HTTP session.

    Args:
        config_dir: Configuration directory; stored as an absolute path.
        base_url: Optional API base URL, passed to ``_resolve_base_url``.
        timeout: Timeout value stored on the client (defaults to 5.0).
        model_spec: Optional model specification stored on the client.
    """
    self.config_dir = os.path.abspath(config_dir)
    self.model_spec = model_spec
    # NOTE(review): _resolve_base_url is not visible here; it presumably reads
    # config_dir and/or model_spec set above -- confirm before reordering.
    self.base_url = self._resolve_base_url(base_url)
    self.timeout = timeout
    # A requests.Session is created for this client instance.
    self.session = requests.Session()

read_json_value

read_json_value(raw_value: str) -> Any

Decode a JSON literal from a command-line argument.

Source code in dp3/bin/shcmd/common.py
def read_json_value(raw_value: str) -> Any:
    """Parse a command-line argument as JSON text.

    Args:
        raw_value: The JSON literal as typed on the command line.

    Returns:
        The decoded Python value.

    Raises:
        APIError: If ``raw_value`` is not valid JSON.
    """
    try:
        decoded = json.loads(raw_value)
    except json.JSONDecodeError as exc:
        # Report malformed input using the CLI's own error type.
        raise APIError(f"Invalid JSON value: {exc}") from exc
    return decoded

read_json_input

read_json_input(path: Optional[str]) -> Any

Decode JSON from a file path or standard input.

Source code in dp3/bin/shcmd/common.py
def read_json_input(path: Optional[str]) -> Any:
    """Load and parse JSON text from a file or from standard input.

    Args:
        path: File to read as UTF-8; ``None`` or ``"-"`` selects standard input.

    Returns:
        The decoded Python value.

    Raises:
        APIError: If the text that was read is not valid JSON.
    """
    from_stdin = path in (None, "-")
    if not from_stdin:
        with open(path, encoding="utf-8") as source:
            text = source.read()
    else:
        text = sys.stdin.read()

    try:
        decoded = json.loads(text)
    except json.JSONDecodeError as exc:
        raise APIError(f"Invalid JSON input: {exc}") from exc
    return decoded

print_response_json

print_response_json(response: Response) -> int

Write an API JSON response to standard output.

Source code in dp3/bin/shcmd/common.py
def print_response_json(response: requests.Response) -> int:
    """Copy the response body text to standard output.

    A newline is appended when the body is non-empty and does not already
    end with one.

    Returns:
        Always ``0``.
    """
    out = sys.stdout
    out.write(response.text)
    needs_newline = bool(response.text) and not response.text.endswith("\n")
    if needs_newline:
        out.write("\n")
    return 0

common_time_params

common_time_params(args) -> dict[str, Any]

Build shared time-range query parameters from parsed arguments.

Source code in dp3/bin/shcmd/common.py
def common_time_params(args) -> dict[str, Any]:
    """Collect the time-range query parameters present on ``args``.

    Args:
        args: Parsed arguments; ``date_from`` and ``date_to`` are looked up
            as attributes and may be absent.

    Returns:
        A dict holding ``date_from`` and/or ``date_to`` for each attribute
        that exists and is not ``None``.
    """
    params = {}
    for key in ("date_from", "date_to"):
        value = getattr(args, key, None)
        if value is None:
            continue
        params[key] = value
    return params

read_json_object

read_json_object(raw_value: str, flag_name: str) -> dict[str, Any]

Decode a JSON object from a command-line argument.

Source code in dp3/bin/shcmd/common.py
def read_json_object(raw_value: str, flag_name: str) -> dict[str, Any]:
    """Parse a command-line argument as JSON and require an object.

    Args:
        raw_value: The JSON text supplied on the command line.
        flag_name: Name used in the error message when the value is not an object.

    Returns:
        The decoded dict.

    Raises:
        APIError: If the decoded value is not a dict (decoding itself is
            delegated to ``read_json_value``).
    """
    decoded = read_json_value(raw_value)
    if isinstance(decoded, dict):
        return decoded
    raise APIError(f"{flag_name} must decode to a JSON object.")

stream_json_pages

stream_json_pages(client: DP3APIClient, path: str, params: dict[str, Any], start_skip: int, requested_limit: int, page_size: int = 100) -> int

Stream paged JSON API results as NDJSON.

Source code in dp3/bin/shcmd/common.py
def stream_json_pages(
    client: DP3APIClient,
    path: str,
    params: dict[str, Any],
    start_skip: int,
    requested_limit: int,
    page_size: int = 100,
) -> int:
    """Fetch a paged endpoint and print every item as one JSON line.

    Each page is requested with ``GET`` on ``path`` using a copy of ``params``
    extended with ``skip`` and ``limit``. Items are written to standard output
    one per line and flushed immediately.

    Args:
        client: API client used to issue the requests.
        path: Endpoint path to query.
        params: Base query parameters; not modified.
        start_skip: Offset of the first item to request.
        requested_limit: Maximum number of items to emit; ``0`` means no limit.
        page_size: Maximum number of items requested per page.

    Returns:
        Always ``0``, including when the output pipe is closed early.
    """
    unlimited = requested_limit == 0
    written = 0
    offset = start_skip
    budget = requested_limit

    try:
        while True:
            # Never ask for more than what is still needed when a limit applies.
            if unlimited:
                chunk = page_size
            else:
                chunk = min(page_size, budget)

            query = {**params, "skip": offset, "limit": chunk}
            reply = client.request("GET", path, params=query)
            page = _extract_page_items(reply.json())
            if not page:
                break

            for record in page:
                sys.stdout.write(json.dumps(record))
                sys.stdout.write("\n")
                sys.stdout.flush()
                written += 1
                if not unlimited and written >= requested_limit:
                    return 0

            count = len(page)
            offset += count
            if not unlimited:
                budget -= count
                if budget <= 0:
                    break
            # A page shorter than requested ends the stream.
            if count < chunk:
                break
    except BrokenPipeError:
        # The reader closed the pipe; stop quietly.
        pass
    return 0

resolve_config_dir

resolve_config_dir(config_dir: Optional[str]) -> str

Resolve the configuration directory for the shell-oriented CLI.

Source code in dp3/bin/shcmd/common.py
def resolve_config_dir(config_dir: Optional[str]) -> str:
    """Pick the configuration directory and return it as an absolute path.

    Precedence: the explicit ``config_dir`` argument (when not ``None``), then
    a non-empty ``DP3_CONFIG_DIR`` environment variable, then ``"config"``.
    """
    chosen = config_dir
    if chosen is None:
        # An unset or empty environment variable falls through to the default.
        chosen = os.environ.get("DP3_CONFIG_DIR") or "config"
    return os.path.abspath(chosen)

load_completion_model_spec cached

load_completion_model_spec(config_dir: str) -> Optional[ModelSpec]

Load the model specification used by shell completion.

Source code in dp3/bin/shcmd/common.py
@lru_cache(maxsize=32)
def load_completion_model_spec(config_dir: str) -> Optional[ModelSpec]:
    """Build the model specification for shell completion from ``config_dir``.

    The configuration is read recursively and its ``db_entities`` entry is
    passed to ``ModelSpec``. Results are memoized per ``config_dir``.

    Returns:
        The model specification, or ``None`` if reading or building it raised
        any exception.
    """
    try:
        loaded = read_config_dir(config_dir, recursive=True)
        spec = ModelSpec(loaded.get("db_entities"))
    except Exception:
        # Any failure is reported to the caller as "no spec" rather than raised.
        spec = None
    return spec

load_completion_entity_catalog cached

load_completion_entity_catalog(config_dir: str, base_url: Optional[str], timeout: float) -> Optional[dict[str, Any]]

Load entity metadata from the API when config-based completion is unavailable.

Source code in dp3/bin/shcmd/common.py
@lru_cache(maxsize=32)
def load_completion_entity_catalog(
    config_dir: str, base_url: Optional[str], timeout: float
) -> Optional[dict[str, Any]]:
    """Fetch entity metadata from the API's ``/entities`` endpoint.

    Results are memoized per ``(config_dir, base_url, timeout)``.

    Returns:
        The decoded payload when it is a dict; ``None`` when the request or
        decoding raised any exception, or the payload is not a dict.
    """
    try:
        api = DP3APIClient(config_dir, base_url, timeout)
        reply = api.request("GET", "/entities")
        catalog = reply.json()
    except Exception:
        # Any failure is reported to the caller as "no catalog" rather than raised.
        return None
    if not isinstance(catalog, dict):
        return None
    return catalog

get_completion_context

get_completion_context(parsed_args) -> tuple[Optional[ModelSpec], Optional[dict[str, Any]]]

Return completion metadata derived from config and API sources.

Source code in dp3/bin/shcmd/common.py
def get_completion_context(
    parsed_args,
) -> tuple[Optional[ModelSpec], Optional[dict[str, Any]]]:
    """Gather completion metadata, preferring local config over the API.

    Args:
        parsed_args: Parsed arguments; ``config``, ``url`` and ``timeout`` are
            read as optional attributes.

    Returns:
        ``(model_spec, None)`` when the model specification could be loaded
        from the config directory; otherwise ``(None, entity_catalog)``, where
        the catalog itself may be ``None``.
    """
    config_dir = resolve_config_dir(getattr(parsed_args, "config", None))
    model_spec = load_completion_model_spec(config_dir)
    if model_spec is not None:
        return model_spec, None

    # Config-based loading failed; fall back to asking the API.
    url = getattr(parsed_args, "url", None)
    timeout = getattr(parsed_args, "timeout", 5.0)
    return None, load_completion_entity_catalog(config_dir, url, timeout)

complete_entity_type_names

complete_entity_type_names(prefix: str, parsed_args, **_kwargs) -> dict[str, str]

Complete entity type names from config or API metadata.

Source code in dp3/bin/shcmd/common.py
def complete_entity_type_names(prefix: str, parsed_args, **_kwargs) -> dict[str, str]:
    """Offer entity type names that start with ``prefix``.

    Names come from the model specification's ``entities`` when available,
    otherwise from the API entity catalog, otherwise there are none.

    Args:
        prefix: Text typed so far.
        parsed_args: Parsed arguments used to locate the completion metadata.
        **_kwargs: Extra keyword arguments; ignored.

    Returns:
        A dict mapping each matching name, in sorted order, to its description.
    """
    model_spec, entity_catalog = get_completion_context(parsed_args)
    if model_spec is not None:
        source = model_spec.entities
    elif entity_catalog is not None:
        source = entity_catalog
    else:
        source = ()

    matches = {}
    for name in sorted(source):
        if name.startswith(prefix):
            matches[name] = _entity_type_description(name, model_spec, entity_catalog)
    return matches