Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cwmscli/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ def cli():
cli.add_command(commands_cwms.shefcritimport)
cli.add_command(commands_cwms.csv2cwms_cmd)
cli.add_command(commands_cwms.blob_group)
cli.add_command(commands_cwms.clob_group)
117 changes: 10 additions & 107 deletions cwmscli/commands/blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
import os
import re
import sys
from typing import Optional, Sequence
from typing import Optional, Sequence, Union

import cwms
import pandas as pd
import requests

from cwmscli.utils import get_api_key
from cwmscli.utils import get_api_key, has_invalid_chars
from cwmscli.utils.deps import requires

# used to rebuild data URL for images
Expand All @@ -27,7 +27,7 @@
"link": "https://docs.python.org/3/library/imghdr.html",
}
)
def _determine_ext(data: bytes | str, write_type: str) -> str:
def _determine_ext(data: Union[bytes, str], write_type: str) -> str:
"""
Attempt to determine the file extension from the data itself.
Requires the imghdr module (lazy import) to inspect the bytes for image types.
Expand All @@ -51,7 +51,7 @@ def _determine_ext(data: bytes | str, write_type: str) -> str:
def _save_base64(
b64_or_dataurl: str,
dest: str,
media_type_hint: str | None = None,
media_type_hint: Optional[str] = None,
) -> str:
m = DATA_URL_RE.match(b64_or_dataurl.strip())
if m:
Expand Down Expand Up @@ -90,107 +90,6 @@ def _save_base64(
return dest


def store_blob(**kwargs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for removing this! I missed it

file_data = kwargs.get("file_data")
blob_id = kwargs.get("blob_id", "").upper()
# Attempt to determine what media type should be used for the mime-type if one is not presented based on the file extension
media = kwargs.get("media_type") or get_media_type(kwargs.get("input_file"))

logging.debug(
f"Office: {kwargs.get('office')} Output ID: {blob_id} Media: {media}"
)

blob = {
"office-id": kwargs.get("office"),
"id": blob_id,
"description": json.dumps(kwargs.get("description")),
"media-type-id": media,
"value": base64.b64encode(file_data).decode("utf-8"),
}

params = {"fail-if-exists": not kwargs.get("overwrite")}

if kwargs.get("dry_run"):
logging.info(
f"--dry-run enabled. Would POST to {kwargs.get('api_root')}/blobs with params={params}"
)
logging.info(
f"Blob payload summary: office-id={kwargs.get('office')}, id={blob_id}, media={media}",
)
logging.info(
json.dumps(
{
"url": f"{kwargs.get('api_root')}blobs",
"params": params,
"blob": {**blob, "value": f"<base64:{len(blob['value'])} chars>"},
},
indent=2,
)
)
sys.exit(0)

try:
cwms.store_blobs(blob, fail_if_exists=kwargs.get("overwrite"))
logging.info(f"Successfully stored blob with ID: {blob_id}")
logging.info(
f"View: {kwargs.get('api_root')}blobs/{blob_id}?office={kwargs.get('office')}"
)
except requests.HTTPError as e:
# Include response text when available
detail = getattr(e.response, "text", "") or str(e)
logging.error(f"Failed to store blob (HTTP): {detail}")
sys.exit(1)
except Exception as e:
logging.error(f"Failed to store blob: {e}")
sys.exit(1)


def retrieve_blob(**kwargs):
    """Download a blob by id and save it to a file named after the id.

    Expects ``blob_id`` and ``office`` in kwargs. Exits 0 with a warning when
    no blob id is given; exits 1 on any retrieval or save failure.
    """
    office = kwargs.get("office")
    blob_id = kwargs.get("blob_id", "").upper()
    if not blob_id:
        logging.warning(
            "Valid blob_id required to download a blob. cwms-cli blob download --blob-id=myid. Run the list directive to see options for your office."
        )
        sys.exit(0)

    logging.debug(f"Office: {office} Blob ID: {blob_id}")
    try:
        payload = cwms.get_blob(
            office_id=office,
            blob_id=blob_id,
        )
        logging.info(
            f"Successfully retrieved blob with ID: {blob_id}",
        )
        # Decode the base64/data-URL payload and write it next to the cwd,
        # using the blob id itself as the destination filename.
        _save_base64(payload, dest=blob_id)
        logging.info(f"Downloaded blob to: {blob_id}")
    except requests.HTTPError as e:
        # Prefer the server's response body when one is available
        detail = getattr(e.response, "text", "") or str(e)
        logging.error(f"Failed to retrieve blob (HTTP): {detail}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Failed to retrieve blob: {e}")
        sys.exit(1)


def delete_blob(**kwargs):
    """Delete a blob by id for an office.

    Expects ``blob_id`` and ``office`` in kwargs. Logs the outcome and exits
    with status 1 on failure.
    """
    blob_id = kwargs.get("blob_id").upper()
    logging.debug(f"Office: {kwargs.get('office')} Blob ID: {blob_id}")

    try:
        # Perform the delete. Previously this call was commented out, so the
        # function logged "Successfully deleted" without deleting anything.
        cwms.delete_blob(
            office_id=kwargs.get("office"),
            blob_id=blob_id,
        )
        logging.info(f"Successfully deleted blob with ID: {blob_id}")
    except requests.HTTPError as e:
        # Prefer the server's response body when one is available
        details = getattr(e.response, "text", "") or str(e)
        logging.error(f"Failed to delete blob (HTTP): {details}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Failed to delete blob: {e}")
        sys.exit(1)


def list_blobs(
office: Optional[str] = None,
blob_id_like: Optional[str] = None,
Expand Down Expand Up @@ -295,7 +194,10 @@ def upload_cmd(
try:
cwms.store_blobs(blob, fail_if_exists=not overwrite)
logging.info(f"Uploaded blob: {blob_id_up}")
logging.info(f"View: {api_root}blobs/{blob_id_up}?office={office}")
if has_invalid_chars(blob_id_up):
logging.info(f"View: {api_root}blobs/ignored?blob-id={blob_id_up}&office={office}")
Copy link
Collaborator

@krowvin krowvin Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blobs/ignored? Is this a hidden endpoint to allow for blob id in a URI with a path?

If this requires your fix to CDA (i.e. it's not a hidden endpoint now) we might need to lock the package to a specific CDA version and call the version endpoint to test the version on cwms-cli first run. Instead of assuming the /ignored endpoint exists in a given instance.

Think delayed adoption type stuff. And the /v2 /v3 etc does not exist in CDA yet.

Copy link
Collaborator Author

@DanielTOsborne DanielTOsborne Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

'ignored' is recommended in the CDA documentation, see /blobs/{blob-id} endpoint.

else:
logging.info(f"View: {api_root}blobs/{blob_id_up}?office={office}")
except requests.HTTPError as e:
detail = getattr(e.response, "text", "") or str(e)
logging.error(f"Failed to upload (HTTP): {detail}")
Expand Down Expand Up @@ -415,4 +317,5 @@ def list_cmd(
else:
# Friendly console preview
with pd.option_context("display.max_rows", 500, "display.max_columns", None):
logging.info(df.to_string(index=False))
# Left-align all columns
logging.info("\n" + df.apply(lambda s: (s:=s.astype(str).str.strip()).str.ljust(s.str.len().max())).to_string(index=False, justify='left'))
231 changes: 231 additions & 0 deletions cwmscli/commands/clob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
import base64
import json
import logging
import mimetypes
import os
import re
import sys
from typing import Optional, Sequence

import cwms
import pandas as pd
import requests

from cwmscli.utils import get_api_key, has_invalid_chars

def list_clobs(
    office: Optional[str] = None,
    clob_id_like: Optional[str] = None,
    columns: Optional[Sequence[str]] = None,
    sort_by: Optional[Sequence[str]] = None,
    ascending: bool = True,
    limit: Optional[int] = None,
) -> pd.DataFrame:
    """List CLOBs for an office and return them as a DataFrame.

    Parameters
    ----------
    office : office identifier forwarded to ``cwms.get_clobs``.
    clob_id_like : optional id filter forwarded to ``cwms.get_clobs``.
    columns : optional subset of columns to keep (unknown names are ignored).
    sort_by : optional column names to sort on (unknown names are ignored).
    ascending : sort direction used when ``sort_by`` matches any columns.
    limit : optional maximum number of rows to return.

    Returns
    -------
    pd.DataFrame of matching clobs (possibly empty). Each row is also echoed
    to the logger.
    """
    logging.info(f"Listing clobs for office: {office!r}...")
    result = cwms.get_clobs(office_id=office, clob_id_like=clob_id_like)

    # Accept either a DataFrame or a JSON/dict-like response
    if isinstance(result, pd.DataFrame):
        df = result.copy()
    else:
        # Expecting normal clob return structure
        data = getattr(result, "json", None)
        if callable(data):
            data = result.json()
        df = pd.DataFrame((data or {}).get("clobs", []))

    # Allow column filtering (names not present in the frame are dropped)
    if columns:
        keep = [c for c in columns if c in df.columns]
        if keep:
            df = df[keep]

    # Stable sort so rows with equal keys keep their original order
    if sort_by:
        by = [c for c in sort_by if c in df.columns]
        if by:
            df = df.sort_values(by=by, ascending=ascending, kind="stable")

    # Optional limit
    if limit is not None:
        df = df.head(limit)

    logging.info(f"Found {len(df):,} clob(s)")
    # List the clobs in the logger. Use .get() so that a `columns` filter
    # that excluded 'id' does not raise KeyError here.
    for _, row in df.iterrows():
        logging.info(
            f"clob ID: {row.get('id')}, Description: {row.get('description')}"
        )
    return df


def upload_cmd(
    input_file: str,
    clob_id: str,
    description: str,
    overwrite: bool,
    dry_run: bool,
    office: str,
    api_root: str,
    api_key: str,
):
    """Upload the contents of a text file as a CLOB.

    Reads ``input_file`` as UTF-8 text, builds the CLOB payload, and POSTs it
    via ``cwms.store_clobs``. With ``dry_run`` the payload summary is logged
    and nothing is sent. Exits 1 on read or upload failure.
    """
    cwms.init_session(api_root=api_root, api_key=get_api_key(api_key, ""))
    try:
        file_size = os.path.getsize(input_file)
        # CLOBs are text; read explicitly as UTF-8 so behavior does not
        # depend on the platform's default locale encoding.
        with open(input_file, "r", encoding="utf-8") as f:
            file_data = f.read()
        logging.info(f"Read file: {input_file} ({file_size} bytes)")
    except Exception as e:
        logging.error(f"Failed to read file: {e}")
        sys.exit(1)

    clob_id_up = clob_id.upper()
    logging.debug(f"Office={office} clobID={clob_id_up}")

    clob = {
        "office-id": office,
        "id": clob_id_up,
        # Serialize structured descriptions; pass plain strings through as-is
        "description": (
            json.dumps(description)
            if isinstance(description, (dict, list))
            else description
        ),
        "value": file_data,
    }
    params = {"fail-if-exists": not overwrite}

    if dry_run:
        logging.info(f"DRY RUN: would POST {api_root}clobs with params={params}")
        logging.info(
            json.dumps(
                {
                    "url": f"{api_root}clobs",
                    "params": params,
                    # Don't dump the full value; just report its size
                    "clob": {**clob, "value": f'<{len(clob["value"])} chars>'},
                },
                indent=2,
            )
        )
        return

    try:
        cwms.store_clobs(clob, fail_if_exists=not overwrite)
        logging.info(f"Uploaded clob: {clob_id_up}")
        # IDs with / can't be used directly in the path
        # TODO: check for other disallowed characters
        if has_invalid_chars(clob_id_up):
            logging.info(f"View: {api_root}clobs/ignored?clob-id={clob_id_up}&office={office}")
        else:
            logging.info(f"View: {api_root}clobs/{clob_id_up}?office={office}")
    except requests.HTTPError as e:
        # Prefer the server's response body when one is available
        detail = getattr(e.response, "text", "") or str(e)
        logging.error(f"Failed to upload (HTTP): {detail}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Failed to upload: {e}")
        sys.exit(1)


def download_cmd(
    clob_id: str, dest: str, office: str, api_root: str, api_key: str, dry_run: bool
):
    """Download a CLOB's text value to ``dest``.

    With ``dry_run`` only logs the request that would be made. Creates the
    destination directory if needed. Exits 1 on retrieval or write failure.
    """
    if dry_run:
        logging.info(
            f"DRY RUN: would GET {api_root} clob with clob-id={clob_id} office={office}."
        )
        return
    cwms.init_session(api_root=api_root, api_key=get_api_key(api_key, ""))
    bid = clob_id.upper()
    logging.debug(f"Office={office} clobID={bid}")

    try:
        clob = cwms.get_clob(office_id=office, clob_id=bid)
        os.makedirs(os.path.dirname(dest) or ".", exist_ok=True)
        # Keep the raw response visible at debug level only; the previous
        # sys.stderr.write() was leftover debug output that polluted stderr
        # on every download.
        logging.debug(repr(clob.json))
        # Write as UTF-8 text so output does not depend on the platform's
        # default locale encoding.
        with open(dest, "wt", encoding="utf-8") as f:
            f.write(clob.json["value"])

        logging.info(f"Downloaded clob to: {dest}")
    except requests.HTTPError as e:
        # Prefer the server's response body when one is available
        detail = getattr(e.response, "text", "") or str(e)
        logging.error(f"Failed to download (HTTP): {detail}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Failed to download: {e}")
        sys.exit(1)


def delete_cmd(clob_id: str, office: str, api_root: str, api_key: str, dry_run: bool):
    """Delete a CLOB by id for an office.

    With ``dry_run`` only logs the request that would be made. Exits 1 on
    failure. The id is upper-cased to match the upload/download commands.
    """
    if dry_run:
        logging.info(
            f"DRY RUN: would DELETE {api_root} clob with clob-id={clob_id} office={office}"
        )
        return
    # Resolve the key the same way the other clob commands do (env fallback)
    cwms.init_session(api_root=api_root, api_key=get_api_key(api_key, ""))
    # Upper-case for consistency with upload_cmd/download_cmd, which store
    # and fetch ids upper-cased.
    cid = clob_id.upper()
    try:
        cwms.delete_clob(office_id=office, clob_id=cid)
        logging.info(f"Deleted clob: {cid} for office: {office}")
    except requests.HTTPError as e:
        # Prefer the server's response body when one is available
        detail = getattr(e.response, "text", "") or str(e)
        logging.error(f"Failed to delete (HTTP): {detail}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Failed to delete: {e}")
        sys.exit(1)


def update_cmd(
    input_file: str,
    clob_id: str,
    description: str,
    ignore_nulls: bool,
    dry_run: bool,
    office: str,
    api_root: str,
    api_key: str,
):
    """Update an existing CLOB's description and/or value.

    Either ``input_file`` (new text value) or ``description`` may be omitted;
    only the provided fields are sent. With ``dry_run`` only logs the request
    that would be made. Exits 1 on read or update failure.
    """
    if dry_run:
        logging.info(
            f"DRY RUN: would PATCH {api_root} clob with clob-id={clob_id} office={office}"
        )
        return
    file_data = None
    if input_file:
        try:
            file_size = os.path.getsize(input_file)
            # CLOBs are text; read explicitly as UTF-8 so behavior does not
            # depend on the platform's default locale encoding.
            with open(input_file, "r", encoding="utf-8") as f:
                file_data = f.read()
            logging.info(f"Read file: {input_file} ({file_size} bytes)")
        except Exception as e:
            logging.error(f"Failed to read file: {e}")
            sys.exit(1)
    # Setup minimum required payload
    clob = {"office-id": office, "id": clob_id.upper()}
    if description:
        clob["description"] = description

    if file_data:
        clob["value"] = file_data
    # Resolve the key the same way the other clob commands do (env fallback)
    cwms.init_session(api_root=api_root, api_key=get_api_key(api_key, ""))
    try:
        cwms.update_clob(clob, clob_id.upper(), ignore_nulls=ignore_nulls)
        logging.info(f"Updated clob: {clob_id.upper()} for office: {office}")
    except requests.HTTPError as e:
        # Prefer the server's response body when one is available
        detail = getattr(e.response, "text", "") or str(e)
        logging.error(f"Failed to update (HTTP): {detail}")
        sys.exit(1)
    except Exception as e:
        logging.error(f"Failed to update: {e}")
        sys.exit(1)


def list_cmd(
    clob_id_like: str,
    columns: list[str],
    sort_by: list[str],
    desc: bool,
    limit: int,
    to_csv: str,
    office: str,
    api_root: str,
    api_key: str,
):
    """List CLOBs, writing either a CSV file or a console preview.

    Delegates filtering/sorting/limiting to ``list_clobs``. When ``to_csv``
    is set the frame is written there; otherwise a left-aligned table is
    logged to the console.
    """
    cwms.init_session(api_root=api_root, api_key=get_api_key(api_key, None))
    df = list_clobs(
        office=office,
        clob_id_like=clob_id_like,
        columns=columns,
        sort_by=sort_by,
        ascending=not desc,
        limit=limit,
    )
    if to_csv:
        df.to_csv(to_csv, index=False)
        logging.info(f"Wrote {len(df)} rows to {to_csv}")
    else:
        # Friendly console preview
        with pd.option_context("display.max_rows", 500, "display.max_columns", None):
            if df.empty:
                # Guard: .str.len().max() on an empty column is NaN, and
                # ljust(NaN) raises — previously this crashed on no results.
                logging.info("(no clobs)")
            else:
                # Left-align all columns
                def _ljust(col: pd.Series) -> pd.Series:
                    vals = col.astype(str).str.strip()
                    return vals.str.ljust(int(vals.str.len().max()))

                logging.info(
                    "\n" + df.apply(_ljust).to_string(index=False, justify="left")
                )
Loading
Loading