From 4befd984b28a6858bbb2313082eef1d05947739b Mon Sep 17 00:00:00 2001
From: Amrit Ghimire
Date: Wed, 9 Jul 2025 18:06:41 +0545
Subject: [PATCH 01/12] Add CLI support to move, remove and copy files in storage using Studio

This adds support for the following commands:

```
usage: datachain storage cp [-h] [-v] [-q] [--recursive] [--team TEAM] source_path destination_path
```

```
usage: datachain storage mv [-h] [-v] [-q] [--recursive] [--team TEAM] path new_path
```

```
usage: datachain storage rm [-h] [-v] [-q] [--recursive] [--team TEAM] path
```

Please check the documentation for more details.

I am not sure about the `storage` CLI command name, as it seems too long. At the same time, we already have `cp`, which does something different. Also, should we fall back to local credentials and handle the operation locally if Studio auth is not available?
---
 docs/commands/storage/cp.md            | 168 ++++++++++++++++++
 docs/commands/storage/mv.md            |  89 ++++++++++
 docs/commands/storage/rm.md            |  97 ++++++++++
 mkdocs.yml                             |   4 +
 src/datachain/cli/__init__.py          |   2 +
 src/datachain/cli/commands/storages.py | 236 +++++++++++++++++++++++++
 src/datachain/cli/parser/__init__.py   |   3 +-
 src/datachain/cli/parser/studio.py     | 117 ++++++++++++
 src/datachain/remote/studio.py         | 127 ++++++++++++-
 9 files changed, 841 insertions(+), 2 deletions(-)
 create mode 100644 docs/commands/storage/cp.md
 create mode 100644 docs/commands/storage/mv.md
 create mode 100644 docs/commands/storage/rm.md
 create mode 100644 src/datachain/cli/commands/storages.py

diff --git a/docs/commands/storage/cp.md b/docs/commands/storage/cp.md
new file mode 100644
index 000000000..311332c47
--- /dev/null
+++ b/docs/commands/storage/cp.md
@@ -0,0 +1,168 @@
+# storage cp
+
+Copy files and directories between local and/or remote storage using Studio.
+
+## Synopsis
+
+```usage
+usage: datachain storage cp [-h] [-v] [-q] [--recursive] [--team TEAM] source_path destination_path
+```
+
+## Description
+
+This command copies files and directories between local and/or remote storage using the credentials configured in Studio. The command automatically determines the operation type based on the source and destination protocols, supporting four different copy scenarios.
+
+## Arguments
+
+* `source_path` - Path to the source file or directory to copy
+* `destination_path` - Path to the destination file or directory to copy to
+
+## Options
+
+* `--recursive` - Copy directories recursively (required for copying directories)
+* `--team TEAM` - Team name to copy storage contents to (default: from config)
+* `-h`, `--help` - Show the help message and exit
+* `-v`, `--verbose` - Be verbose
+* `-q`, `--quiet` - Be quiet
+
+## Copy Operations
+
+The command automatically determines the operation type based on the source and destination protocols:
+
+### 1. Local to Local (local path → local path)
+**Operation**: Direct local file system copy
+- Uses the local filesystem's native copy operation
+- Fastest operation as no network transfer is involved
+- Supports both files and directories
+
+**Example**:
+```bash
+datachain storage cp /path/to/local/file.txt /path/to/destination/file.txt
+```
+
+### 2. 
Local to Remote (local path → `s3://`, `gs://`, `az://`) +**Operation**: Upload to cloud storage +- Uploads local files/directories to remote storage +- Uses presigned URLs for secure uploads +- Supports S3 multipart form data for large files +- Requires `--recursive` flag for directories + +**Examples**: +```bash +# Upload single file +datachain storage cp /path/to/file.txt s3://my-bucket/data/file.txt + +# Upload directory recursively +datachain storage cp --recursive /path/to/directory s3://my-bucket/data/ +``` + +### 3. Remote to Local (`s3://`, `gs://`, `az://` → local path) +**Operation**: Download from cloud storage +- Downloads remote files/directories to local storage +- Uses presigned download URLs +- Automatically extracts filename if destination is a directory +- Creates destination directory if it doesn't exist + +**Examples**: +```bash +# Download single file +datachain storage cp s3://my-bucket/data/file.txt /path/to/local/file.txt + +# Download to directory (filename preserved) +datachain storage cp s3://my-bucket/data/file.txt /path/to/directory/ +``` + +### 4. Remote to Remote (`s3://` → `s3://`, `gs://` → `gs://`, etc.) +**Operation**: Copy within cloud storage +- Copies files between locations in the same bucket +- Cannot copy between different buckets (same limitation as `mv`) +- Uses Studio's internal copy operation +- Requires `--recursive` flag for directories + +**Examples**: +```bash +# Copy within same bucket +datachain storage cp s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt + +# Copy directory recursively +datachain storage cp --recursive s3://my-bucket/data/images s3://my-bucket/backup/images +``` + +## Supported Storage Protocols + +The command supports the following storage protocols: +- **Local file system**: Direct paths (e.g., `/path/to/directory` or `./relative/path`) +- **AWS S3**: `s3://bucket-name/path` +- **Google Cloud Storage**: `gs://bucket-name/path` +- **Azure Blob Storage**: `az://container-name/path` + +## Examples + +### Local to Remote Operations + +1. Upload a single file: +```bash +datachain storage cp /path/to/image.jpg s3://my-bucket/images/image.jpg +``` + +2. Upload a directory recursively: +```bash +datachain storage cp --recursive /path/to/dataset s3://my-bucket/datasets/ +``` + +3. Upload to a different team's storage: +```bash +datachain storage cp --team other-team /path/to/file.txt s3://my-bucket/data/file.txt +``` + +### Remote to Local Operations + +4. Download a file: +```bash +datachain storage cp s3://my-bucket/data/file.txt /path/to/local/file.txt +``` + +### Remote to Remote Operations + +6. Copy within the same bucket: +```bash +datachain storage cp s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt +``` + +7. 
Copy directory with verbose output: +```bash +datachain storage cp -v --recursive s3://my-bucket/datasets/raw s3://my-bucket/datasets/processed +``` + +## Limitations and Edge Cases + +### Bucket Restrictions +- **Cannot copy between different buckets**: Remote-to-remote copies must be within the same bucket +- **Cross-bucket operations**: Use local as intermediate step for cross-bucket copies + +### Directory Operations +- **Recursive flag required**: Copying directories requires the `--recursive` flag +- **Directory structure preservation**: Directory structure is preserved during copy operations +- **Empty directories**: Empty directories may not be copied in some scenarios + +### File Operations +- **File overwrites**: Existing files may be overwritten without confirmation +- **Relative vs absolute paths**: Both relative and absolute paths are supported +- **Directory creation**: Destination directories are created automatically when needed + +### Error Handling +- **File not found**: Missing source files result in operation failure +- **Permission errors**: Insufficient permissions cause operation failure +- **Network issues**: Network problems are reported with appropriate error messages + + +### Team Configuration +- **Default team**: If no team is specified, uses the team from your configuration +- **Team-specific storage**: Each team has its own storage namespace + +## Notes + +* Use the `--verbose` flag to get detailed information about the copy operation +* The `--quiet` flag suppresses output except for errors +* This command operates through Studio, so you must be authenticated with `datachain auth login` before using it +* For cross-bucket copies, consider using local storage as an intermediate step diff --git a/docs/commands/storage/mv.md b/docs/commands/storage/mv.md new file mode 100644 index 000000000..9f3a22ce6 --- /dev/null +++ b/docs/commands/storage/mv.md @@ -0,0 +1,89 @@ +# storage mv + +Move files and directories in Storages using Studio. + +## Synopsis + +```usage +usage: datachain storage mv [-h] [-v] [-q] [--recursive] [--team TEAM] path new_path +``` + +## Description + +This command moves files and directories within storage using the credentials configured in Studio.. The move operation is performed within the same bucket - you cannot move files between different buckets. The command supports both individual files and directories, with the `--recursive` flag required for moving directories. + +## Arguments + +* `path` - Path to the storage file or directory to move +* `new_path` - New path where the file or directory should be moved to + +## Options + +* `--recursive` - Move directories recursively (required for moving directories) +* `--team TEAM` - Team name to move storage contents from (default: from config) +* `-h`, `--help` - Show the help message and exit +* `-v`, `--verbose` - Be verbose +* `-q`, `--quiet` - Be quiet + +## Examples + +1. Move a single file: +```bash +datachain storage mv s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt +``` + +2. Move a directory recursively: +```bash +datachain storage mv --recursive s3://my-bucket/data/images s3://my-bucket/archive/images +``` + +3. Move a file to a different team's storage: +```bash +datachain storage mv --team other-team s3://my-bucket/data/file.txt s3://my-bucket/backup/file.txt +``` + +4. Move a file with verbose output: +```bash +datachain storage mv -v s3://my-bucket/data/file.txt s3://my-bucket/processed/file.txt +``` + +5. 
Move a directory to a subdirectory: +```bash +datachain storage mv --recursive s3://my-bucket/datasets/raw s3://my-bucket/datasets/processed/raw +``` + +## Supported Storage Protocols + +The command supports the following storage protocols: +- **AWS S3**: `s3://bucket-name/path` +- **Google Cloud Storage**: `gs://bucket-name/path` +- **Azure Blob Storage**: `az://container-name/path` +- **Local file system**: `file:///path/to/directory` + +## Limitations and Edge Cases + +### Bucket Restrictions +- **Cannot move between different buckets**: The source and destination must be in the same bucket. Attempting to move between different buckets will result in an error: "Cannot move between different buckets" + +### Directory Operations +- **Recursive flag required**: Moving directories requires the `--recursive` flag. Without it, the operation will fail +- **Directory structure preservation**: When moving directories, the internal structure is preserved + +### Path Handling +- **Relative vs absolute paths**: Both relative and absolute paths within the bucket are supported + +### Error Handling +- **File not found**: If the source file or directory doesn't exist, the operation will fail +- **Permission errors**: Insufficient permissions will result in operation failure +- **Storage service errors**: Network issues or storage service problems will be reported with appropriate error messages + +### Team Configuration +- **Default team**: If no team is specified, the command uses the team from your configuration +- **Team-specific storage**: Each team has its own storage namespace, so moving between teams is not supported + +## Notes + +* Moving large directories may take time depending on the number of files and network conditions +* Use the `--verbose` flag to get detailed information about the move operation +* The `--quiet` flag suppresses output except for errors +* This command operates through Studio, so you must be authenticated with `datachain auth login` before using it diff --git a/docs/commands/storage/rm.md b/docs/commands/storage/rm.md new file mode 100644 index 000000000..64d444efe --- /dev/null +++ b/docs/commands/storage/rm.md @@ -0,0 +1,97 @@ +# storage rm + +Delete files and directories in Storages using Studio. + +## Synopsis + +```usage +usage: datachain storage rm [-h] [-v] [-q] [--recursive] [--team TEAM] path +``` + +## Description + +This command deletes files and directories within storage using the credentials configured in Studio. The command supports both individual files and directories, with the `--recursive` flag required for deleting directories. This is a destructive operation that permanently removes files and cannot be undone. + +## Arguments + +* `path` - Path to the storage file or directory to delete + +## Options + +* `--recursive` - Delete directories recursively (required for deleting directories) +* `--team TEAM` - Team name to delete storage contents from (default: from config) +* `-h`, `--help` - Show the help message and exit +* `-v`, `--verbose` - Be verbose +* `-q`, `--quiet` - Be quiet + +## Examples + +1. Delete a single file: +```bash +datachain storage rm s3://my-bucket/data/file.txt +``` + +2. Delete a directory recursively: +```bash +datachain storage rm --recursive s3://my-bucket/data/images +``` + +3. Delete a file from a different team's storage: +```bash +datachain storage rm --team other-team s3://my-bucket/data/file.txt +``` + +4. Delete a file with verbose output: +```bash +datachain storage rm -v s3://my-bucket/data/file.txt +``` + +5. 
Delete a directory quietly (suppress output): +```bash +datachain storage rm -q --recursive s3://my-bucket/temp-data +``` + +6. Delete a specific subdirectory: +```bash +datachain storage rm --recursive s3://my-bucket/datasets/raw/old-version +``` + +## Supported Storage Protocols + +The command supports the following storage protocols: +- **AWS S3**: `s3://bucket-name/path` +- **Google Cloud Storage**: `gs://bucket-name/path` +- **Azure Blob Storage**: `az://container-name/path` + +## Limitations and Edge Cases + +### Directory Operations +- **Recursive flag required**: Deleting directories requires the `--recursive` flag. Without it, the operation will fail +- **Directory structure**: When deleting directories, all files and subdirectories within the directory are removed + +### File Operations +- **Non-existent files**: Attempting to delete a non-existent file will result in an error +- **Relative vs absolute paths**: Both relative and absolute paths within the bucket are supported + +### Error Handling +- **File not found**: If the source file or directory doesn't exist, the operation will fail +- **Permission errors**: Insufficient permissions will result in operation failure +- **Storage service errors**: Network issues or storage service problems will be reported with appropriate error messages +- **Directory not empty**: Attempting to delete a non-empty directory without `--recursive` will fail + +### Team Configuration +- **Default team**: If no team is specified, the command uses the team from your configuration +- **Team-specific storage**: Each team has its own storage namespace, so deleting from other teams requires explicit team specification + +### Safety Considerations +- **Permanent deletion**: This operation permanently removes files and cannot be undone +- **Batch operations**: Large directories may contain many files and deletion may take time + +## Notes + +* The delete operation is performed through Studio using the configured credentials +* Moving large directories may take time depending on the number of files and network conditions +* Use the `--verbose` flag to get detailed information about the delete operation +* The `--quiet` flag suppresses output except for errors +* This command operates through Studio, so you must be authenticated with `datachain auth login` before using it +* **Warning**: This is a destructive operation. 
Always double-check the path before executing the command diff --git a/mkdocs.yml b/mkdocs.yml index 6a4480639..4b1ea58a6 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -98,6 +98,10 @@ nav: - cancel: commands/job/cancel.md - ls: commands/job/ls.md - clusters: commands/job/clusters.md + - storage: + - rm: commands/storage/rm.md + - mv: commands/storage/mv.md + - cp: commands/storage/cp.md - 📚 User Guide: - Overview: guide/index.md - 📡 Interacting with remote storage: guide/remotes.md diff --git a/src/datachain/cli/__init__.py b/src/datachain/cli/__init__.py index accf0b436..d0f46eadf 100644 --- a/src/datachain/cli/__init__.py +++ b/src/datachain/cli/__init__.py @@ -78,6 +78,7 @@ def main(argv: Optional[list[str]] = None) -> int: def handle_command(args, catalog, client_config) -> int: """Handle the different CLI commands.""" + from datachain.cli.commands.storages import process_storage_command from datachain.studio import process_auth_cli_args, process_jobs_args command_handlers = { @@ -96,6 +97,7 @@ def handle_command(args, catalog, client_config) -> int: "gc": lambda: garbage_collect(catalog), "auth": lambda: process_auth_cli_args(args), "job": lambda: process_jobs_args(args), + "storage": lambda: process_storage_command(args), } handler = command_handlers.get(args.command) diff --git a/src/datachain/cli/commands/storages.py b/src/datachain/cli/commands/storages.py new file mode 100644 index 000000000..1ee04b692 --- /dev/null +++ b/src/datachain/cli/commands/storages.py @@ -0,0 +1,236 @@ +import mimetypes +import os.path +import sys +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +import requests + +from datachain.config import Config +from datachain.error import DataChainError +from datachain.remote.studio import StudioClient + +if TYPE_CHECKING: + from argparse import Namespace + + from fsspec import AbstractFileSystem + + +def process_storage_command(args: "Namespace"): + if args.cmd is None: + print( + f"Use 'datachain {args.command} --help' to see available options", + file=sys.stderr, + ) + return 1 + + if args.cmd == "rm": + return rm_storage(args) + if args.cmd == "mv": + return mv_storage(args) + if args.cmd == "cp": + return cp_storage(args) + raise DataChainError(f"Unknown command '{args.cmd}'.") + + +def _get_studio_client(args: "Namespace"): + token = Config().read().get("studio", {}).get("token") + if not token: + raise DataChainError( + "Not logged in to Studio. Log in with 'datachain auth login'." 
+ ) + return StudioClient(team=args.team) + + +def rm_storage(args: "Namespace"): + client = _get_studio_client(args) + + response = client.delete_storage_file( + args.path, + recursive=args.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + print(f"Deleted {args.path}") + + +def mv_storage(args: "Namespace"): + client = _get_studio_client(args) + + response = client.move_storage_file( + args.path, + args.new_path, + recursive=args.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + print(f"Moved {args.path} to {args.new_path}") + + +def cp_storage(args: "Namespace"): + from datachain.client.fsspec import Client + + source_cls = Client.get_implementation(args.source_path) + destination_cls = Client.get_implementation(args.destination_path) + + # Determine operation based on source and destination protocols + if source_cls.protocol == "file" and destination_cls.protocol == "file": + source_fs = source_cls.create_fs() + source_fs.cp_file( + args.source_path, + args.destination_path, + ) + elif source_cls.protocol == "file": + source_fs = source_cls.create_fs() + _upload_to_storage(args, source_fs) + elif destination_cls.protocol == "file": + destination_fs = destination_cls.create_fs() + _download_from_storage(args, destination_fs) + else: + _copy_inside_storage(args) + + +def _upload_to_storage(args: "Namespace", local_fs: "AbstractFileSystem"): + from datachain.client.fsspec import Client + + studio_client = _get_studio_client(args) + + is_dir = local_fs.isdir(args.source_path) + if is_dir and not args.recursive: + raise DataChainError("Cannot copy directory without --recursive") + + client = Client.get_implementation(args.destination_path) + _, subpath = client.split_url(args.destination_path) + + if is_dir: + file_paths = { + os.path.join(subpath, os.path.relpath(path, args.source_path)): path + for path in local_fs.find(args.source_path) + } + else: + destination_path = ( + os.path.join(subpath, os.path.basename(args.source_path)) + if args.destination_path.endswith(("/", "\\")) or not subpath + else subpath + ) + file_paths = {destination_path: args.source_path} + + response = studio_client.batch_presigned_urls( + args.destination_path, + {dest: mimetypes.guess_type(src)[0] for dest, src in file_paths.items()}, + ) + if not response.ok: + raise DataChainError(response.message) + + urls = response.data.get("urls", {}) + headers = response.data.get("headers", {}) + + # Upload each file using the presigned URLs + + for dest_path, source_path in file_paths.items(): + if dest_path not in urls: + raise DataChainError(f"No presigned URL found for {dest_path}") + + upload_url = urls[dest_path]["url"] + if "fields" in urls[dest_path]: + # S3 storage - use multipart form data upload + + # Create form data + form_data = dict(urls[dest_path]["fields"]) + + # Add Content-Type if it's required by the policy + content_type = mimetypes.guess_type(source_path)[0] + if content_type: + form_data["Content-Type"] = content_type + + # Add file content + file_content = local_fs.open(source_path, "rb").read() + form_data["file"] = ( + os.path.basename(source_path), + file_content, + content_type, + ) + + # Upload using POST with form data + upload_response = requests.post(upload_url, files=form_data, timeout=3600) + else: + # Read the file content + with local_fs.open(source_path, "rb") as f: + file_content = f.read() + + # Upload the file using the presigned URL + upload_response = requests.request( + response.data.get("method", "PUT"), + 
upload_url, + data=file_content, + headers={ + **headers, + "Content-Type": mimetypes.guess_type(source_path)[0], + }, + timeout=3600, + ) + + if upload_response.status_code >= 400: + raise DataChainError( + f"Failed to upload {source_path} to {dest_path}. " + f"Status: {upload_response.status_code}, " + f"Response: {upload_response.text}" + ) + + print(f"Uploaded {source_path} to {dest_path}") + + uploads = [ + { + "path": dst, + "size": local_fs.info(src).get("size", 0), + } + for dst, src in file_paths.items() + ] + studio_client.save_upload_log(args.destination_path, uploads) + + print(f"Successfully uploaded {len(file_paths)} file(s)") + + +def _download_from_storage(args: "Namespace", local_fs: "AbstractFileSystem"): + studio_client = _get_studio_client(args) + response = studio_client.download_url(args.source_path) + if not response.ok: + raise DataChainError(response.message) + + url = response.data.get("url") + if not url: + raise DataChainError("No download URL found") + + # Extract filename from URL if destination is a directory + if local_fs.isdir(args.destination_path) or args.destination_path.endswith( + ("/", "\\") + ): + # Parse the URL to get the filename + parsed_url = urlparse(url) + filename = os.path.basename(parsed_url.path) + + local_fs.makedirs(args.destination_path, exist_ok=True) + destination_path = os.path.join(args.destination_path, filename) + else: + destination_path = args.destination_path + + with local_fs.open(destination_path, "wb") as f: + f.write(requests.get(url, timeout=3600).content) + + print(f"Downloaded to {destination_path}") + + +def _copy_inside_storage(args: "Namespace"): + client = _get_studio_client(args) + + response = client.copy_storage_file( + args.source_path, + args.destination_path, + recursive=args.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + print(f"Copied {args.source_path} to {args.destination_path}") diff --git a/src/datachain/cli/parser/__init__.py b/src/datachain/cli/parser/__init__.py index a0680cb15..201e2e2fd 100644 --- a/src/datachain/cli/parser/__init__.py +++ b/src/datachain/cli/parser/__init__.py @@ -6,7 +6,7 @@ from datachain.cli.utils import BooleanOptionalAction, KeyValueArgs from .job import add_jobs_parser -from .studio import add_auth_parser +from .studio import add_auth_parser, add_storage_parser from .utils import ( FIND_COLUMNS, CustomHelpFormatter, @@ -137,6 +137,7 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915 add_update_arg(parse_clone) add_auth_parser(subp, parent_parser) + add_storage_parser(subp, parent_parser) add_jobs_parser(subp, parent_parser) datasets_parser = subp.add_parser( diff --git a/src/datachain/cli/parser/studio.py b/src/datachain/cli/parser/studio.py index b9776b721..0a10a49b6 100644 --- a/src/datachain/cli/parser/studio.py +++ b/src/datachain/cli/parser/studio.py @@ -128,3 +128,120 @@ def add_auth_parser(subparsers, parent_parser) -> None: help=auth_token_help, formatter_class=CustomHelpFormatter, ) + + +def add_storage_parser(subparsers, parent_parser) -> None: + storage_help = "Manage storage" + storage_description = "Manage storage through Studio" + + storage_parser = subparsers.add_parser( + "storage", + parents=[parent_parser], + description=storage_description, + help=storage_help, + formatter_class=CustomHelpFormatter, + ) + + storage_subparser = storage_parser.add_subparsers( + dest="cmd", + help="Use `datachain storage CMD --help` to display command-specific help", + ) + + storage_delete_help = "Delete storage contents" + 
storage_delete_description = "Delete storage files and directories through Studio" + + storage_delete_parser = storage_subparser.add_parser( + "rm", + parents=[parent_parser], + description=storage_delete_description, + help=storage_delete_help, + formatter_class=CustomHelpFormatter, + ) + + storage_delete_parser.add_argument( + "path", + action="store", + help="Path to the storage file or directory to delete", + ) + + storage_delete_parser.add_argument( + "--recursive", + action="store_true", + help="Delete recursively", + ) + + storage_delete_parser.add_argument( + "--team", + action="store", + help="Team name to delete storage contents from", + ) + + storage_move_help = "Move storage contents" + storage_move_description = "Move storage files and directories through Studio" + + storage_move_parser = storage_subparser.add_parser( + "mv", + parents=[parent_parser], + description=storage_move_description, + help=storage_move_help, + formatter_class=CustomHelpFormatter, + ) + + storage_move_parser.add_argument( + "path", + action="store", + help="Path to the storage file or directory to move", + ) + + storage_move_parser.add_argument( + "new_path", + action="store", + help="New path to the storage file or directory to move", + ) + + storage_move_parser.add_argument( + "--recursive", + action="store_true", + help="Move recursively", + ) + + storage_move_parser.add_argument( + "--team", + action="store", + help="Team name to move storage contents from", + ) + + storage_cp_help = "Copy storage contents" + storage_cp_description = "Copy storage files and directories through Studio" + + storage_cp_parser = storage_subparser.add_parser( + "cp", + parents=[parent_parser], + description=storage_cp_description, + help=storage_cp_help, + formatter_class=CustomHelpFormatter, + ) + + storage_cp_parser.add_argument( + "source_path", + action="store", + help="Path to the source file or directory to upload", + ) + + storage_cp_parser.add_argument( + "destination_path", + action="store", + help="Path to the destination file or directory to upload", + ) + + storage_cp_parser.add_argument( + "--recursive", + action="store_true", + help="Upload recursively", + ) + + storage_cp_parser.add_argument( + "--team", + action="store", + help="Team name to upload storage contents to", + ) diff --git a/src/datachain/remote/studio.py b/src/datachain/remote/studio.py index 1824ab988..72c62f347 100644 --- a/src/datachain/remote/studio.py +++ b/src/datachain/remote/studio.py @@ -11,11 +11,12 @@ Optional, TypeVar, ) -from urllib.parse import urlparse, urlunparse +from urllib.parse import urlencode, urlparse, urlunparse import websockets from requests.exceptions import HTTPError, Timeout +from datachain.client.fsspec import Client from datachain.config import Config from datachain.dataset import DatasetRecord from datachain.error import DataChainError @@ -29,9 +30,13 @@ DatasetExportStatus = Optional[dict[str, Any]] DatasetExportSignedUrls = Optional[list[str]] FileUploadData = Optional[dict[str, Any]] + JobData = Optional[dict[str, Any]] JobListData = dict[str, Any] ClusterListData = dict[str, Any] + +PresignedUrlData = Optional[dict[str, Any]] + logger = logging.getLogger("datachain") DATASET_ROWS_CHUNK_SIZE = 8192 @@ -465,3 +470,123 @@ def cancel_job( def get_clusters(self) -> Response[ClusterListData]: return self._send_request("datachain/clusters", {}, method="GET") + + # Storage commands + def delete_storage_file( + self, path: str, recursive: bool = False + ) -> Response[FileUploadData]: + client = 
Client.get_implementation(path) + remote = client.protocol + bucket, subpath = client.split_url(path) + + data = { + "bucket": bucket, + "recursive": recursive, + "remote": remote, + "team": self.team, + "paths": subpath, + } + + url = f"datachain/storages/files?{urlencode(data)}" + + return self._send_request(url, data, method="DELETE") + + def move_storage_file( + self, path: str, new_path: str, recursive: bool = False + ) -> Response[FileUploadData]: + client = Client.get_implementation(path) + remote = client.protocol + + bucket, subpath = client.split_url(path) + new_bucket, new_subpath = client.split_url(new_path) + if bucket != new_bucket: + raise DataChainError("Cannot move between different buckets") + + data = { + "bucket": bucket, + "newPath": new_subpath, + "oldPath": subpath, + "recursive": recursive, + "remote": remote, + "team": self.team, + } + + return self._send_request("datachain/storages/files/mv", data, method="POST") + + def copy_storage_file( + self, path: str, new_path: str, recursive: bool = False + ) -> Response[FileUploadData]: + client = Client.get_implementation(path) + remote = client.protocol + + bucket, subpath = client.split_url(path) + new_bucket, new_subpath = client.split_url(new_path) + if bucket != new_bucket: + raise DataChainError("Cannot copy between different buckets") + + data = { + "bucket": bucket, + "newPath": new_subpath, + "oldPath": subpath, + "recursive": recursive, + "remote": remote, + "team": self.team, + } + + return self._send_request("datachain/storages/files/cp", data, method="POST") + + def batch_presigned_urls( + self, destination_path: str, paths: dict[str, str] + ) -> Response[PresignedUrlData]: + remote = urlparse(os.fspath(destination_path)).scheme + client = Client.get_implementation(destination_path) + remote = client.protocol + bucket, _ = client.split_url(destination_path) + + data = { + "bucket": bucket, + "paths": paths, + "remote": remote, + "team": self.team, + } + return self._send_request( + "datachain/storages/batch-presigned-urls", data, method="POST" + ) + + def download_url(self, path: str) -> Response[FileUploadData]: + remote = urlparse(os.fspath(path)).scheme + client = Client.get_implementation(path) + remote = client.protocol + bucket, subpath = client.split_url(path) + + data = { + "bucket": bucket, + "remote": remote, + "filepath": subpath, + "team": self.team, + } + return self._send_request( + "datachain/storages/files/download", data, method="GET" + ) + + def save_upload_log( + self, path: str, logs: list[dict[str, Any]] + ) -> Response[FileUploadData]: + remote = urlparse(os.fspath(path)).scheme + client = Client.get_implementation(path) + remote = client.protocol + bucket, _ = client.split_url(path) + + data = { + "remote": remote, + "team": self.team, + "bucket": bucket, + "uploaded_paths": logs, + "failed_paths": {}, + "modified_paths": [], + "failed_modified_paths": {}, + } + + return self._send_request( + "datachain/storages/activity-logs", data, method="POST" + ) From 9812fc9eeb4b6f7d4c490363aa366817793a2490 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Thu, 10 Jul 2025 19:46:36 +0545 Subject: [PATCH 02/12] Make things much simpler --- docs/commands/storage/mv.md | 2 +- docs/commands/storage/rm.md | 2 +- src/datachain/cli/commands/storages.py | 178 ++----------------- src/datachain/cli/parser/studio.py | 8 +- src/datachain/remote/storages.py | 227 +++++++++++++++++++++++++ src/datachain/remote/studio.py | 3 +- 6 files changed, 245 insertions(+), 175 deletions(-) create mode 100644 
src/datachain/remote/storages.py diff --git a/docs/commands/storage/mv.md b/docs/commands/storage/mv.md index 9f3a22ce6..03f85de77 100644 --- a/docs/commands/storage/mv.md +++ b/docs/commands/storage/mv.md @@ -1,6 +1,6 @@ # storage mv -Move files and directories in Storages using Studio. +Move files and directories in storage using Studio. ## Synopsis diff --git a/docs/commands/storage/rm.md b/docs/commands/storage/rm.md index 64d444efe..b1fde5bae 100644 --- a/docs/commands/storage/rm.md +++ b/docs/commands/storage/rm.md @@ -1,6 +1,6 @@ # storage rm -Delete files and directories in Storages using Studio. +Delete files and directories in storage using Studio. ## Synopsis diff --git a/src/datachain/cli/commands/storages.py b/src/datachain/cli/commands/storages.py index 1ee04b692..d6b4b4e25 100644 --- a/src/datachain/cli/commands/storages.py +++ b/src/datachain/cli/commands/storages.py @@ -1,20 +1,17 @@ -import mimetypes -import os.path import sys from typing import TYPE_CHECKING -from urllib.parse import urlparse -import requests - -from datachain.config import Config from datachain.error import DataChainError -from datachain.remote.studio import StudioClient +from datachain.remote.storages import ( + copy_inside_storage, + download_from_storage, + get_studio_client, + upload_to_storage, +) if TYPE_CHECKING: from argparse import Namespace - from fsspec import AbstractFileSystem - def process_storage_command(args: "Namespace"): if args.cmd is None: @@ -33,17 +30,8 @@ def process_storage_command(args: "Namespace"): raise DataChainError(f"Unknown command '{args.cmd}'.") -def _get_studio_client(args: "Namespace"): - token = Config().read().get("studio", {}).get("token") - if not token: - raise DataChainError( - "Not logged in to Studio. Log in with 'datachain auth login'." 
- ) - return StudioClient(team=args.team) - - def rm_storage(args: "Namespace"): - client = _get_studio_client(args) + client = get_studio_client(args) response = client.delete_storage_file( args.path, @@ -56,7 +44,7 @@ def rm_storage(args: "Namespace"): def mv_storage(args: "Namespace"): - client = _get_studio_client(args) + client = get_studio_client(args) response = client.move_storage_file( args.path, @@ -84,153 +72,9 @@ def cp_storage(args: "Namespace"): ) elif source_cls.protocol == "file": source_fs = source_cls.create_fs() - _upload_to_storage(args, source_fs) + upload_to_storage(args, source_fs) elif destination_cls.protocol == "file": destination_fs = destination_cls.create_fs() - _download_from_storage(args, destination_fs) - else: - _copy_inside_storage(args) - - -def _upload_to_storage(args: "Namespace", local_fs: "AbstractFileSystem"): - from datachain.client.fsspec import Client - - studio_client = _get_studio_client(args) - - is_dir = local_fs.isdir(args.source_path) - if is_dir and not args.recursive: - raise DataChainError("Cannot copy directory without --recursive") - - client = Client.get_implementation(args.destination_path) - _, subpath = client.split_url(args.destination_path) - - if is_dir: - file_paths = { - os.path.join(subpath, os.path.relpath(path, args.source_path)): path - for path in local_fs.find(args.source_path) - } + download_from_storage(args, destination_fs) else: - destination_path = ( - os.path.join(subpath, os.path.basename(args.source_path)) - if args.destination_path.endswith(("/", "\\")) or not subpath - else subpath - ) - file_paths = {destination_path: args.source_path} - - response = studio_client.batch_presigned_urls( - args.destination_path, - {dest: mimetypes.guess_type(src)[0] for dest, src in file_paths.items()}, - ) - if not response.ok: - raise DataChainError(response.message) - - urls = response.data.get("urls", {}) - headers = response.data.get("headers", {}) - - # Upload each file using the presigned URLs - - for dest_path, source_path in file_paths.items(): - if dest_path not in urls: - raise DataChainError(f"No presigned URL found for {dest_path}") - - upload_url = urls[dest_path]["url"] - if "fields" in urls[dest_path]: - # S3 storage - use multipart form data upload - - # Create form data - form_data = dict(urls[dest_path]["fields"]) - - # Add Content-Type if it's required by the policy - content_type = mimetypes.guess_type(source_path)[0] - if content_type: - form_data["Content-Type"] = content_type - - # Add file content - file_content = local_fs.open(source_path, "rb").read() - form_data["file"] = ( - os.path.basename(source_path), - file_content, - content_type, - ) - - # Upload using POST with form data - upload_response = requests.post(upload_url, files=form_data, timeout=3600) - else: - # Read the file content - with local_fs.open(source_path, "rb") as f: - file_content = f.read() - - # Upload the file using the presigned URL - upload_response = requests.request( - response.data.get("method", "PUT"), - upload_url, - data=file_content, - headers={ - **headers, - "Content-Type": mimetypes.guess_type(source_path)[0], - }, - timeout=3600, - ) - - if upload_response.status_code >= 400: - raise DataChainError( - f"Failed to upload {source_path} to {dest_path}. 
" - f"Status: {upload_response.status_code}, " - f"Response: {upload_response.text}" - ) - - print(f"Uploaded {source_path} to {dest_path}") - - uploads = [ - { - "path": dst, - "size": local_fs.info(src).get("size", 0), - } - for dst, src in file_paths.items() - ] - studio_client.save_upload_log(args.destination_path, uploads) - - print(f"Successfully uploaded {len(file_paths)} file(s)") - - -def _download_from_storage(args: "Namespace", local_fs: "AbstractFileSystem"): - studio_client = _get_studio_client(args) - response = studio_client.download_url(args.source_path) - if not response.ok: - raise DataChainError(response.message) - - url = response.data.get("url") - if not url: - raise DataChainError("No download URL found") - - # Extract filename from URL if destination is a directory - if local_fs.isdir(args.destination_path) or args.destination_path.endswith( - ("/", "\\") - ): - # Parse the URL to get the filename - parsed_url = urlparse(url) - filename = os.path.basename(parsed_url.path) - - local_fs.makedirs(args.destination_path, exist_ok=True) - destination_path = os.path.join(args.destination_path, filename) - else: - destination_path = args.destination_path - - with local_fs.open(destination_path, "wb") as f: - f.write(requests.get(url, timeout=3600).content) - - print(f"Downloaded to {destination_path}") - - -def _copy_inside_storage(args: "Namespace"): - client = _get_studio_client(args) - - response = client.copy_storage_file( - args.source_path, - args.destination_path, - recursive=args.recursive, - ) - if not response.ok: - raise DataChainError(response.message) - - print(f"Copied {args.source_path} to {args.destination_path}") + copy_inside_storage(args) diff --git a/src/datachain/cli/parser/studio.py b/src/datachain/cli/parser/studio.py index 0a10a49b6..2cd080ee5 100644 --- a/src/datachain/cli/parser/studio.py +++ b/src/datachain/cli/parser/studio.py @@ -225,23 +225,23 @@ def add_storage_parser(subparsers, parent_parser) -> None: storage_cp_parser.add_argument( "source_path", action="store", - help="Path to the source file or directory to upload", + help="Path to the source file or directory to copy", ) storage_cp_parser.add_argument( "destination_path", action="store", - help="Path to the destination file or directory to upload", + help="Path to the destination file or directory to copy", ) storage_cp_parser.add_argument( "--recursive", action="store_true", - help="Upload recursively", + help="Copy recursively", ) storage_cp_parser.add_argument( "--team", action="store", - help="Team name to upload storage contents to", + help="Team name to copy storage contents to", ) diff --git a/src/datachain/remote/storages.py b/src/datachain/remote/storages.py new file mode 100644 index 000000000..e05aa478c --- /dev/null +++ b/src/datachain/remote/storages.py @@ -0,0 +1,227 @@ +import mimetypes +import os.path +from typing import TYPE_CHECKING +from urllib.parse import urlparse + +import requests + +from datachain.error import DataChainError + +if TYPE_CHECKING: + from argparse import Namespace + + from fsspec import AbstractFileSystem + + +def get_studio_client(args: "Namespace"): + from datachain.config import Config + from datachain.remote.studio import StudioClient + + if Config().read().get("studio", {}).get("token"): + return StudioClient(team=args.team) + + raise DataChainError("Not logged in to Studio. 
Log in with 'datachain auth login'.") + + +def upload_to_storage(args: "Namespace", local_fs: "AbstractFileSystem"): + studio_client = get_studio_client(args) + + is_dir = _validate_upload_args(args, local_fs) + file_paths = _build_file_paths(args, local_fs, is_dir) + response = _get_presigned_urls(studio_client, args.destination_path, file_paths) + + for dest_path, source_path in file_paths.items(): + _upload_single_file( + dest_path, + source_path, + response, + local_fs, + ) + + _save_upload_log(studio_client, args.destination_path, file_paths, local_fs) + print(f"Successfully uploaded {len(file_paths)} file(s)") + + +def download_from_storage(args: "Namespace", local_fs: "AbstractFileSystem"): + studio_client = get_studio_client(args) + response = studio_client.download_url(args.source_path) + if not response.ok: + raise DataChainError(response.message) + + url = response.data.get("url") + if not url: + raise DataChainError("No download URL found") + + # Extract filename from URL if destination is a directory + if local_fs.isdir(args.destination_path) or args.destination_path.endswith( + ("/", "\\") + ): + # Parse the URL to get the filename + parsed_url = urlparse(url) + filename = os.path.basename(parsed_url.path) + + local_fs.makedirs(args.destination_path, exist_ok=True) + destination_path = os.path.join(args.destination_path, filename) + else: + destination_path = args.destination_path + + # Stream download to avoid loading entire file into memory + with requests.get(url, timeout=3600, stream=True) as download_response: + download_response.raise_for_status() + print("Downloading file", end="") + with local_fs.open(destination_path, "wb") as f: + for chunk in download_response.iter_content(chunk_size=8192): + if chunk: # Filter out keep-alive chunks + f.write(chunk) + print(".", end="") + print() + + print(f"Downloaded to {destination_path}") + + +def copy_inside_storage(args: "Namespace"): + client = get_studio_client(args) + + response = client.copy_storage_file( + args.source_path, + args.destination_path, + recursive=args.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + print(f"Copied {args.source_path} to {args.destination_path}") + + +def _validate_upload_args(args: "Namespace", local_fs: "AbstractFileSystem"): + """Validate upload arguments and raise appropriate errors.""" + is_dir = local_fs.isdir(args.source_path) + if is_dir and not args.recursive: + raise DataChainError("Cannot copy directory without --recursive") + return is_dir + + +def _build_file_paths(args: "Namespace", local_fs: "AbstractFileSystem", is_dir: bool): + """Build mapping of destination paths to source paths.""" + from datachain.client.fsspec import Client + + client = Client.get_implementation(args.destination_path) + _, subpath = client.split_url(args.destination_path) + + if is_dir: + return { + os.path.join(subpath, os.path.relpath(path, args.source_path)): path + for path in local_fs.find(args.source_path) + } + + destination_path = ( + os.path.join(subpath, os.path.basename(args.source_path)) + if args.destination_path.endswith(("/", "\\")) or not subpath + else subpath + ) + return {destination_path: args.source_path} + + +def _get_presigned_urls(studio_client, destination_path: str, file_paths: dict): + """Get presigned URLs for file uploads.""" + response = studio_client.batch_presigned_urls( + destination_path, + {dest: mimetypes.guess_type(src)[0] for dest, src in file_paths.items()}, + ) + if not response.ok: + raise DataChainError(response.message) + + 
return response.data + + +def _upload_file_s3( + upload_url: str, url_data: dict, source_path: str, local_fs: "AbstractFileSystem" +): + """Upload file using S3 multipart form data.""" + form_data = dict(url_data["fields"]) + content_type = mimetypes.guess_type(source_path)[0] + form_data["Content-Type"] = content_type + + file_content = local_fs.open(source_path, "rb").read() + form_data["file"] = ( + os.path.basename(source_path), + file_content, + content_type, + ) + + return requests.post(upload_url, files=form_data, timeout=3600) + + +def _upload_file_direct( + upload_url: str, + method: str, + headers: dict, + source_path: str, + local_fs: "AbstractFileSystem", +): + """Upload file using direct HTTP request.""" + with local_fs.open(source_path, "rb") as f: + file_content = f.read() + + return requests.request( + method, + upload_url, + data=file_content, + headers={ + **headers, + "Content-Type": mimetypes.guess_type(source_path)[0], + }, + timeout=3600, + ) + + +def _upload_single_file( + dest_path: str, + source_path: str, + response: dict, + local_fs: "AbstractFileSystem", +): + """Upload a single file using the appropriate method.""" + urls = response.get("urls", {}) + headers = response.get("headers", {}) + method = response.get("method", "PUT") + + if dest_path not in urls: + raise DataChainError(f"No presigned URL found for {dest_path}") + + upload_url = urls[dest_path]["url"] + + if "fields" in urls[dest_path]: + upload_response = _upload_file_s3( + upload_url, urls[dest_path], source_path, local_fs + ) + else: + upload_response = _upload_file_direct( + upload_url, method, headers, source_path, local_fs + ) + + if upload_response.status_code >= 400: + raise DataChainError( + f"Failed to upload {source_path} to {dest_path}. " + f"Status: {upload_response.status_code}, " + f"Response: {upload_response.text}" + ) + + print(f"Uploaded {source_path} to {dest_path}") + + +def _save_upload_log( + studio_client, + destination_path: str, + file_paths: dict, + local_fs: "AbstractFileSystem", +): + """Save upload log to studio.""" + uploads = [ + { + "path": dst, + "size": local_fs.info(src).get("size", 0), + } + for dst, src in file_paths.items() + ] + studio_client.save_upload_log(destination_path, uploads) diff --git a/src/datachain/remote/studio.py b/src/datachain/remote/studio.py index 72c62f347..5afa1145c 100644 --- a/src/datachain/remote/studio.py +++ b/src/datachain/remote/studio.py @@ -476,13 +476,12 @@ def delete_storage_file( self, path: str, recursive: bool = False ) -> Response[FileUploadData]: client = Client.get_implementation(path) - remote = client.protocol bucket, subpath = client.split_url(path) data = { "bucket": bucket, "recursive": recursive, - "remote": remote, + "remote": client.protocol, "team": self.team, "paths": subpath, } From 4f9b6aecf4e6b51c28b3e4d4ad66d37b9eb7f26d Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Thu, 10 Jul 2025 19:51:40 +0545 Subject: [PATCH 03/12] Fix mypy --- src/datachain/remote/storages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datachain/remote/storages.py b/src/datachain/remote/storages.py index e05aa478c..22498ab24 100644 --- a/src/datachain/remote/storages.py +++ b/src/datachain/remote/storages.py @@ -140,7 +140,7 @@ def _upload_file_s3( """Upload file using S3 multipart form data.""" form_data = dict(url_data["fields"]) content_type = mimetypes.guess_type(source_path)[0] - form_data["Content-Type"] = content_type + form_data["Content-Type"] = str(content_type) file_content = 
local_fs.open(source_path, "rb").read() form_data["file"] = ( From 56a39b13eb00257b2da9ef73790f596a09e467fa Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Fri, 11 Jul 2025 09:32:20 +0545 Subject: [PATCH 04/12] Address comments --- docs/commands/storage/rm.md | 2 +- src/datachain/remote/storages.py | 37 +++++++++++++++----------------- src/datachain/remote/studio.py | 3 --- 3 files changed, 18 insertions(+), 24 deletions(-) diff --git a/docs/commands/storage/rm.md b/docs/commands/storage/rm.md index b1fde5bae..e76bbf472 100644 --- a/docs/commands/storage/rm.md +++ b/docs/commands/storage/rm.md @@ -90,7 +90,7 @@ The command supports the following storage protocols: ## Notes * The delete operation is performed through Studio using the configured credentials -* Moving large directories may take time depending on the number of files and network conditions +* Deleting large directories may take time depending on the number of files and network conditions * Use the `--verbose` flag to get detailed information about the delete operation * The `--quiet` flag suppresses output except for errors * This command operates through Studio, so you must be authenticated with `datachain auth login` before using it diff --git a/src/datachain/remote/storages.py b/src/datachain/remote/storages.py index 22498ab24..b718a74ef 100644 --- a/src/datachain/remote/storages.py +++ b/src/datachain/remote/storages.py @@ -142,14 +142,13 @@ def _upload_file_s3( content_type = mimetypes.guess_type(source_path)[0] form_data["Content-Type"] = str(content_type) - file_content = local_fs.open(source_path, "rb").read() - form_data["file"] = ( - os.path.basename(source_path), - file_content, - content_type, - ) - - return requests.post(upload_url, files=form_data, timeout=3600) + with local_fs.open(source_path, "rb") as f: + form_data["file"] = ( + os.path.basename(source_path), + f, + content_type, + ) + return requests.post(upload_url, files=form_data, timeout=3600) def _upload_file_direct( @@ -161,18 +160,16 @@ def _upload_file_direct( ): """Upload file using direct HTTP request.""" with local_fs.open(source_path, "rb") as f: - file_content = f.read() - - return requests.request( - method, - upload_url, - data=file_content, - headers={ - **headers, - "Content-Type": mimetypes.guess_type(source_path)[0], - }, - timeout=3600, - ) + return requests.request( + method, + upload_url, + data=f, + headers={ + **headers, + "Content-Type": mimetypes.guess_type(source_path)[0], + }, + timeout=3600, + ) def _upload_single_file( diff --git a/src/datachain/remote/studio.py b/src/datachain/remote/studio.py index 5afa1145c..2b6176b2a 100644 --- a/src/datachain/remote/studio.py +++ b/src/datachain/remote/studio.py @@ -537,7 +537,6 @@ def copy_storage_file( def batch_presigned_urls( self, destination_path: str, paths: dict[str, str] ) -> Response[PresignedUrlData]: - remote = urlparse(os.fspath(destination_path)).scheme client = Client.get_implementation(destination_path) remote = client.protocol bucket, _ = client.split_url(destination_path) @@ -553,7 +552,6 @@ def batch_presigned_urls( ) def download_url(self, path: str) -> Response[FileUploadData]: - remote = urlparse(os.fspath(path)).scheme client = Client.get_implementation(path) remote = client.protocol bucket, subpath = client.split_url(path) @@ -571,7 +569,6 @@ def download_url(self, path: str) -> Response[FileUploadData]: def save_upload_log( self, path: str, logs: list[dict[str, Any]] ) -> Response[FileUploadData]: - remote = urlparse(os.fspath(path)).scheme client = 
Client.get_implementation(path) remote = client.protocol bucket, _ = client.split_url(path) From 022034b8d2b2cfa3b06cf9a4ba649c6f78dc5e73 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Fri, 11 Jul 2025 09:40:15 +0545 Subject: [PATCH 05/12] Fix lint --- src/datachain/remote/storages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datachain/remote/storages.py b/src/datachain/remote/storages.py index b718a74ef..cf6dd37f3 100644 --- a/src/datachain/remote/storages.py +++ b/src/datachain/remote/storages.py @@ -166,7 +166,7 @@ def _upload_file_direct( data=f, headers={ **headers, - "Content-Type": mimetypes.guess_type(source_path)[0], + "Content-Type": str(mimetypes.guess_type(source_path)[0]), }, timeout=3600, ) From 24f3d15b2375e5bf007347de800b12afb1d0bd1b Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Fri, 11 Jul 2025 22:05:27 +0545 Subject: [PATCH 06/12] Add tests --- src/datachain/remote/storages.py | 10 +- tests/func/test_storage_commands.py | 200 ++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+), 3 deletions(-) create mode 100644 tests/func/test_storage_commands.py diff --git a/src/datachain/remote/storages.py b/src/datachain/remote/storages.py index cf6dd37f3..f961c0c46 100644 --- a/src/datachain/remote/storages.py +++ b/src/datachain/remote/storages.py @@ -12,6 +12,8 @@ from fsspec import AbstractFileSystem + from datachain.remote.studio import StudioClient + def get_studio_client(args: "Namespace"): from datachain.config import Config @@ -122,11 +124,13 @@ def _build_file_paths(args: "Namespace", local_fs: "AbstractFileSystem", is_dir: return {destination_path: args.source_path} -def _get_presigned_urls(studio_client, destination_path: str, file_paths: dict): +def _get_presigned_urls( + studio_client: "StudioClient", destination_path: str, file_paths: dict +): """Get presigned URLs for file uploads.""" response = studio_client.batch_presigned_urls( destination_path, - {dest: mimetypes.guess_type(src)[0] for dest, src in file_paths.items()}, + {dest: str(mimetypes.guess_type(src)[0]) for dest, src in file_paths.items()}, ) if not response.ok: raise DataChainError(response.message) @@ -208,7 +212,7 @@ def _upload_single_file( def _save_upload_log( - studio_client, + studio_client: "StudioClient", destination_path: str, file_paths: dict, local_fs: "AbstractFileSystem", diff --git a/tests/func/test_storage_commands.py b/tests/func/test_storage_commands.py new file mode 100644 index 000000000..cad0bdc1a --- /dev/null +++ b/tests/func/test_storage_commands.py @@ -0,0 +1,200 @@ +import pytest + +from datachain.cli import main +from datachain.utils import STUDIO_URL + + +@pytest.mark.parametrize( + "command, recursive, team", + [ + ("storage rm s3://my-bucket/data/content", False, None), + ("storage rm s3://my-bucket/data/content --recursive", True, None), + ("storage rm s3://my-bucket/data/content --team new_team", False, "new_team"), + ( + "storage rm s3://my-bucket/data/content --team new_team --recursive", + True, + "new_team", + ), + ], +) +def test_rm_storage(requests_mock, capsys, studio_token, command, recursive, team): + team_name = team or "team_name" # default to team_name if not provided + url = f"{STUDIO_URL}/api/datachain/storages/files?bucket=my-bucket&remote=s3" + url += f"&recursive={recursive}&team={team_name}&paths=data/content" + + requests_mock.delete( + url, + json={"ok": True, "data": {"deleted": True}, "message": "", "status": 200}, + status_code=200, + ) + + result = main(command.split()) + assert result == 0 + out, _ = 
capsys.readouterr() + assert "Deleted s3://my-bucket/data/content" in out + + assert requests_mock.called + + +@pytest.mark.parametrize( + "command, recursive, team", + [ + ( + "s3://my-bucket/data/content2", + False, + None, + ), + ( + "s3://my-bucket/data/content2 --recursive", + True, + None, + ), + ( + "s3://my-bucket/data/content2 --team new_team", + False, + "new_team", + ), + ( + "s3://my-bucket/data/content2 --team new_team --recursive", + True, + "new_team", + ), + ], +) +def test_mv_storage(requests_mock, capsys, studio_token, command, recursive, team): + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/files/mv", + json={"ok": True, "data": {"moved": True}, "message": "", "status": 200}, + status_code=200, + ) + + result = main(["storage", "mv", "s3://my-bucket/data/content", *command.split()]) + assert result == 0 + out, _ = capsys.readouterr() + assert "Moved s3://my-bucket/data/content to s3://my-bucket/data/content2" in out + + assert requests_mock.called + assert requests_mock.last_request.json() == { + "bucket": "my-bucket", + "newPath": "data/content2", + "oldPath": "data/content", + "recursive": recursive, + "remote": "s3", + "team": team or "team_name", + "team_name": team or "team_name", + } + + +def test_cp_storage_local_to_local(studio_token, tmp_dir): + (tmp_dir / "path1").mkdir(parents=True, exist_ok=True) + (tmp_dir / "path1" / "file1.txt").write_text("file1") + (tmp_dir / "path2").mkdir(parents=True, exist_ok=True) + + result = main( + [ + "storage", + "cp", + str(tmp_dir / "path1" / "file1.txt"), + str(tmp_dir / "path2" / "file1.txt"), + ] + ) + assert result == 0 + + assert (tmp_dir / "path2" / "file1.txt").read_text() == "file1" + + +def test_cp_storage_local_to_s3(requests_mock, capsys, studio_token, tmp_dir): + (tmp_dir / "path1").mkdir(parents=True, exist_ok=True) + (tmp_dir / "path1" / "file1.txt").write_text("file1") + + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/batch-presigned-urls", + json={ + "urls": { + "data/content": { + "url": "https://example.com/upload", + "fields": {"key": "value"}, + } + }, + "headers": {}, + "method": "POST", + }, + ) + requests_mock.post("https://example.com/upload", status_code=200) + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/activity-logs", + json={"success": True}, + ) + + result = main( + [ + "storage", + "cp", + str(tmp_dir / "path1" / "file1.txt"), + "s3://my-bucket/data/content", + ] + ) + assert result == 0 + + history = requests_mock.request_history + assert len(history) == 3 + assert history[0].url == f"{STUDIO_URL}/api/datachain/storages/batch-presigned-urls" + assert history[1].url == "https://example.com/upload" + assert history[2].url == f"{STUDIO_URL}/api/datachain/storages/activity-logs" + + assert history[0].json() == { + "bucket": "my-bucket", + "paths": {"data/content": "text/plain"}, + "remote": "s3", + "team": "team_name", + "team_name": "team_name", + } + + +def test_cp_remote_to_local(requests_mock, capsys, studio_token, tmp_dir): + requests_mock.get( + f"{STUDIO_URL}/api/datachain/storages/files/download?bucket=my-bucket&remote=s3&filepath=data%2Fcontent&team=team_name&team_name=team_name", + json={ + "url": "https://example.com/download", + }, + ) + requests_mock.get( + "https://example.com/download", + content=b"file1", + ) + + result = main( + ["storage", "cp", "s3://my-bucket/data/content", str(tmp_dir / "file1.txt")] + ) + assert result == 0 + assert (tmp_dir / "file1.txt").read_text() == "file1" + + history = requests_mock.request_history + assert 
len(history) == 2 + assert history[1].url == "https://example.com/download" + + +def test_cp_s3_to_s3(requests_mock, capsys, studio_token, tmp_dir): + requests_mock.post( + f"{STUDIO_URL}/api/datachain/storages/files/cp", + json={"copied": ["data/content"]}, + status_code=200, + ) + + result = main( + ["storage", "cp", "s3://my-bucket/data/content", "s3://my-bucket/data/content2"] + ) + assert result == 0 + + history = requests_mock.request_history + assert len(history) == 1 + assert history[0].url == f"{STUDIO_URL}/api/datachain/storages/files/cp" + assert history[0].json() == { + "bucket": "my-bucket", + "newPath": "data/content2", + "oldPath": "data/content", + "recursive": False, + "remote": "s3", + "team": "team_name", + "team_name": "team_name", + } From fb21a3dab68f430e2a21d0136c15da6009a72bd4 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 16 Jul 2025 13:55:36 +0545 Subject: [PATCH 07/12] Merge with top level cp --- docs/commands/{storage => }/cp.md | 115 +++++++++++---------- docs/commands/{storage => }/mv.md | 21 ++-- docs/commands/{storage => }/rm.md | 22 ++-- mkdocs.yml | 7 +- src/datachain/cli/__init__.py | 41 +++++--- src/datachain/cli/commands/storages.py | 21 +--- src/datachain/cli/parser/__init__.py | 33 ------ src/datachain/cli/parser/studio.py | 137 +++++++++++++------------ 8 files changed, 186 insertions(+), 211 deletions(-) rename docs/commands/{storage => }/cp.md (52%) rename docs/commands/{storage => }/mv.md (70%) rename docs/commands/{storage => }/rm.md (80%) diff --git a/docs/commands/storage/cp.md b/docs/commands/cp.md similarity index 52% rename from docs/commands/storage/cp.md rename to docs/commands/cp.md index 311332c47..df96f5898 100644 --- a/docs/commands/storage/cp.md +++ b/docs/commands/cp.md @@ -1,16 +1,16 @@ -# storage cp +# cp -Copy files and directories between local and/or remote storage using Studio. +Copy storage files and directories between cloud and local storage. ## Synopsis ```usage -usage: datachain storage cp [-h] [-v] [-q] [--recursive] [--team TEAM] source_path destination_path +usage: datachain cp [-h] [-v] [-q] [-r] [--team TEAM] [--local] [--anon] [--update] [--no-glob] [--force] source_path destination_path ``` ## Description -This command copies files and directories between local and/or remote storage using the credentials configured in Studio. The command automatically determines the operation type based on the source and destination protocols, supporting four different copy scenarios. +This command copies files and directories between local and/or remote storage. The command can operate through Studio (default) or directly with local storage access. ## Arguments @@ -19,119 +19,128 @@ This command copies files and directories between local and/or remote storage us ## Options -* `--recursive` - Copy directories recursively (required for copying directories) -* `--team TEAM` - Team name to copy storage contents to (default: from config) +* `-r`, `-R`, `--recursive` - Copy directories recursively +* `--team TEAM` - Team name to copy storage contents to +* `--local` - Copy data files from the cloud locally without Studio (Default: False) +* `--anon` - Use anonymous access to storage (available only with --local) +* `--update` - Update cached list of files for the sources (available only with --local) +* `--no-glob` - Do not expand globs (such as * or ?) 
(available only with --local) +* `--force` - Force creating files even if they already exist (available only with --local) * `-h`, `--help` - Show the help message and exit * `-v`, `--verbose` - Be verbose * `-q`, `--quiet` - Be quiet ## Copy Operations +The command supports two main modes of operation: + +### Studio Mode (Default) +When using Studio mode (default), the command copies files and directories through Studio using the configured credentials. This mode automatically determines the operation type based on the source and destination protocols, supporting four different copy scenarios. + +### Local Mode +When using `--local` flag, the command operates directly with local storage access, bypassing Studio. This mode supports additional options like `--anon`, `--update`, `--no-glob`, and `--force`. + +## Supported Storage Protocols + +The command supports the following storage protocols: +- **Local file system**: Direct paths (e.g., `/path/to/directory` or `./relative/path`) +- **AWS S3**: `s3://bucket-name/path` +- **Google Cloud Storage**: `gs://bucket-name/path` +- **Azure Blob Storage**: `az://container-name/path` + +## Examples + +### Studio Mode Examples + The command automatically determines the operation type based on the source and destination protocols: -### 1. Local to Local (local path → local path) +#### 1. Local to Local (local path → local path) **Operation**: Direct local file system copy - Uses the local filesystem's native copy operation - Fastest operation as no network transfer is involved - Supports both files and directories -**Example**: ```bash -datachain storage cp /path/to/local/file.txt /path/to/destination/file.txt +datachain cp /path/to/local/file.txt /path/to/destination/file.txt ``` -### 2. Local to Remote (local path → `s3://`, `gs://`, `az://`) +#### 2. Local to Remote (local path → `s3://`, `gs://`, `az://`) **Operation**: Upload to cloud storage - Uploads local files/directories to remote storage - Uses presigned URLs for secure uploads - Supports S3 multipart form data for large files - Requires `--recursive` flag for directories -**Examples**: ```bash # Upload single file -datachain storage cp /path/to/file.txt s3://my-bucket/data/file.txt +datachain cp /path/to/file.txt s3://my-bucket/data/file.txt # Upload directory recursively -datachain storage cp --recursive /path/to/directory s3://my-bucket/data/ +datachain cp -r /path/to/directory s3://my-bucket/data/ ``` -### 3. Remote to Local (`s3://`, `gs://`, `az://` → local path) +#### 3. Remote to Local (`s3://`, `gs://`, `az://` → local path) **Operation**: Download from cloud storage - Downloads remote files/directories to local storage - Uses presigned download URLs - Automatically extracts filename if destination is a directory - Creates destination directory if it doesn't exist -**Examples**: ```bash # Download single file -datachain storage cp s3://my-bucket/data/file.txt /path/to/local/file.txt +datachain cp s3://my-bucket/data/file.txt /path/to/local/file.txt # Download to directory (filename preserved) -datachain storage cp s3://my-bucket/data/file.txt /path/to/directory/ +datachain cp s3://my-bucket/data/file.txt /path/to/directory/ ``` -### 4. Remote to Remote (`s3://` → `s3://`, `gs://` → `gs://`, etc.) +#### 4. Remote to Remote (`s3://` → `s3://`, `gs://` → `gs://`, etc.) 
**Operation**: Copy within cloud storage - Copies files between locations in the same bucket - Cannot copy between different buckets (same limitation as `mv`) - Uses Studio's internal copy operation - Requires `--recursive` flag for directories -**Examples**: ```bash # Copy within same bucket -datachain storage cp s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt +datachain cp s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt # Copy directory recursively -datachain storage cp --recursive s3://my-bucket/data/images s3://my-bucket/backup/images +datachain cp -r s3://my-bucket/data/images s3://my-bucket/backup/images ``` -## Supported Storage Protocols - -The command supports the following storage protocols: -- **Local file system**: Direct paths (e.g., `/path/to/directory` or `./relative/path`) -- **AWS S3**: `s3://bucket-name/path` -- **Google Cloud Storage**: `gs://bucket-name/path` -- **Azure Blob Storage**: `az://container-name/path` +### Additional Studio Mode Examples -## Examples - -### Local to Remote Operations - -1. Upload a single file: +1. Copy with specific team: ```bash -datachain storage cp /path/to/image.jpg s3://my-bucket/images/image.jpg +datachain cp --team other-team /path/to/file.txt s3://my-bucket/data/file.txt ``` -2. Upload a directory recursively: +2. Copy with verbose output: ```bash -datachain storage cp --recursive /path/to/dataset s3://my-bucket/datasets/ +datachain cp -v -r s3://my-bucket/datasets/raw s3://my-bucket/datasets/processed ``` -3. Upload to a different team's storage: +### Local Mode Examples + +3. Copy files locally without Studio: ```bash -datachain storage cp --team other-team /path/to/file.txt s3://my-bucket/data/file.txt +datachain cp --local /path/to/source /path/to/destination ``` -### Remote to Local Operations - -4. Download a file: +4. Copy with anonymous access: ```bash -datachain storage cp s3://my-bucket/data/file.txt /path/to/local/file.txt +datachain cp --local --anon s3://public-bucket/data /path/to/local/ ``` -### Remote to Remote Operations - -6. Copy within the same bucket: +5. Copy with force overwrite: ```bash -datachain storage cp s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt +datachain cp --local --force s3://my-bucket/data /path/to/local/ ``` -7. Copy directory with verbose output: +6. 
Copy with update and no glob expansion: ```bash -datachain storage cp -v --recursive s3://my-bucket/datasets/raw s3://my-bucket/datasets/processed +datachain cp --local --update --no-glob s3://my-bucket/data/*.txt /path/to/local/ ``` ## Limitations and Edge Cases @@ -145,17 +154,12 @@ datachain storage cp -v --recursive s3://my-bucket/datasets/raw s3://my-bucket/d - **Directory structure preservation**: Directory structure is preserved during copy operations - **Empty directories**: Empty directories may not be copied in some scenarios -### File Operations -- **File overwrites**: Existing files may be overwritten without confirmation -- **Relative vs absolute paths**: Both relative and absolute paths are supported -- **Directory creation**: Destination directories are created automatically when needed ### Error Handling - **File not found**: Missing source files result in operation failure - **Permission errors**: Insufficient permissions cause operation failure - **Network issues**: Network problems are reported with appropriate error messages - ### Team Configuration - **Default team**: If no team is specified, uses the team from your configuration - **Team-specific storage**: Each team has its own storage namespace @@ -164,5 +168,8 @@ datachain storage cp -v --recursive s3://my-bucket/datasets/raw s3://my-bucket/d * Use the `--verbose` flag to get detailed information about the copy operation * The `--quiet` flag suppresses output except for errors -* This command operates through Studio, so you must be authenticated with `datachain auth login` before using it +* When using Studio mode, you must be authenticated with `datachain auth login` before using it +* The `--local` mode bypasses Studio and operates directly with storage providers +* Use `--recursive` flag when copying directories +* The `--force` flag is only available in local mode and will overwrite existing files * For cross-bucket copies, consider using local storage as an intermediate step diff --git a/docs/commands/storage/mv.md b/docs/commands/mv.md similarity index 70% rename from docs/commands/storage/mv.md rename to docs/commands/mv.md index 03f85de77..1f70d9be4 100644 --- a/docs/commands/storage/mv.md +++ b/docs/commands/mv.md @@ -1,16 +1,16 @@ -# storage mv +# mv -Move files and directories in storage using Studio. +Move storage files and directories through Studio. ## Synopsis ```usage -usage: datachain storage mv [-h] [-v] [-q] [--recursive] [--team TEAM] path new_path +usage: datachain mv [-h] [-v] [-q] [--recursive] [--team TEAM] path new_path ``` ## Description -This command moves files and directories within storage using the credentials configured in Studio.. The move operation is performed within the same bucket - you cannot move files between different buckets. The command supports both individual files and directories, with the `--recursive` flag required for moving directories. +This command moves files and directories within storage using the credentials configured in Studio. The move operation is performed within the same bucket - you cannot move files between different buckets. The command supports both individual files and directories, with the `--recursive` flag required for moving directories. ## Arguments @@ -29,27 +29,27 @@ This command moves files and directories within storage using the credentials co 1. Move a single file: ```bash -datachain storage mv s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt +datachain mv s3://my-bucket/data/file.txt s3://my-bucket/archive/file.txt ``` 2. 
Move a directory recursively: ```bash -datachain storage mv --recursive s3://my-bucket/data/images s3://my-bucket/archive/images +datachain mv --recursive s3://my-bucket/data/images s3://my-bucket/archive/images ``` 3. Move a file to a different team's storage: ```bash -datachain storage mv --team other-team s3://my-bucket/data/file.txt s3://my-bucket/backup/file.txt +datachain mv --team other-team s3://my-bucket/data/file.txt s3://my-bucket/backup/file.txt ``` 4. Move a file with verbose output: ```bash -datachain storage mv -v s3://my-bucket/data/file.txt s3://my-bucket/processed/file.txt +datachain mv -v s3://my-bucket/data/file.txt s3://my-bucket/processed/file.txt ``` 5. Move a directory to a subdirectory: ```bash -datachain storage mv --recursive s3://my-bucket/datasets/raw s3://my-bucket/datasets/processed/raw +datachain mv --recursive s3://my-bucket/datasets/raw s3://my-bucket/datasets/processed/raw ``` ## Supported Storage Protocols @@ -58,7 +58,6 @@ The command supports the following storage protocols: - **AWS S3**: `s3://bucket-name/path` - **Google Cloud Storage**: `gs://bucket-name/path` - **Azure Blob Storage**: `az://container-name/path` -- **Local file system**: `file:///path/to/directory` ## Limitations and Edge Cases @@ -69,8 +68,6 @@ The command supports the following storage protocols: - **Recursive flag required**: Moving directories requires the `--recursive` flag. Without it, the operation will fail - **Directory structure preservation**: When moving directories, the internal structure is preserved -### Path Handling -- **Relative vs absolute paths**: Both relative and absolute paths within the bucket are supported ### Error Handling - **File not found**: If the source file or directory doesn't exist, the operation will fail diff --git a/docs/commands/storage/rm.md b/docs/commands/rm.md similarity index 80% rename from docs/commands/storage/rm.md rename to docs/commands/rm.md index e76bbf472..3869eb89f 100644 --- a/docs/commands/storage/rm.md +++ b/docs/commands/rm.md @@ -1,11 +1,11 @@ -# storage rm +# rm -Delete files and directories in storage using Studio. +Delete storage files and directories through Studio. ## Synopsis ```usage -usage: datachain storage rm [-h] [-v] [-q] [--recursive] [--team TEAM] path +usage: datachain rm [-h] [-v] [-q] [--recursive] [--team TEAM] path ``` ## Description @@ -28,32 +28,32 @@ This command deletes files and directories within storage using the credentials 1. Delete a single file: ```bash -datachain storage rm s3://my-bucket/data/file.txt +datachain rm s3://my-bucket/data/file.txt ``` 2. Delete a directory recursively: ```bash -datachain storage rm --recursive s3://my-bucket/data/images +datachain rm --recursive s3://my-bucket/data/images ``` 3. Delete a file from a different team's storage: ```bash -datachain storage rm --team other-team s3://my-bucket/data/file.txt +datachain rm --team other-team s3://my-bucket/data/file.txt ``` 4. Delete a file with verbose output: ```bash -datachain storage rm -v s3://my-bucket/data/file.txt +datachain rm -v s3://my-bucket/data/file.txt ``` 5. Delete a directory quietly (suppress output): ```bash -datachain storage rm -q --recursive s3://my-bucket/temp-data +datachain rm -q --recursive s3://my-bucket/temp-data ``` 6. 
Delete a specific subdirectory: ```bash -datachain storage rm --recursive s3://my-bucket/datasets/raw/old-version +datachain rm --recursive s3://my-bucket/datasets/raw/old-version ``` ## Supported Storage Protocols @@ -69,10 +69,6 @@ The command supports the following storage protocols: - **Recursive flag required**: Deleting directories requires the `--recursive` flag. Without it, the operation will fail - **Directory structure**: When deleting directories, all files and subdirectories within the directory are removed -### File Operations -- **Non-existent files**: Attempting to delete a non-existent file will result in an error -- **Relative vs absolute paths**: Both relative and absolute paths within the bucket are supported - ### Error Handling - **File not found**: If the source file or directory doesn't exist, the operation will fail - **Permission errors**: Insufficient permissions will result in operation failure diff --git a/mkdocs.yml b/mkdocs.yml index 4b1ea58a6..18d8f8525 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -98,10 +98,9 @@ nav: - cancel: commands/job/cancel.md - ls: commands/job/ls.md - clusters: commands/job/clusters.md - - storage: - - rm: commands/storage/rm.md - - mv: commands/storage/mv.md - - cp: commands/storage/cp.md + - rm: commands/rm.md + - mv: commands/mv.md + - cp: commands/cp.md - 📚 User Guide: - Overview: guide/index.md - 📡 Interacting with remote storage: guide/remotes.md diff --git a/src/datachain/cli/__init__.py b/src/datachain/cli/__init__.py index d0f46eadf..249140e80 100644 --- a/src/datachain/cli/__init__.py +++ b/src/datachain/cli/__init__.py @@ -3,8 +3,9 @@ import sys import traceback from multiprocessing import freeze_support -from typing import Optional +from typing import TYPE_CHECKING, Optional +from datachain.cli.commands.storages import cp_storage from datachain.cli.utils import get_logging_level from .commands import ( @@ -24,6 +25,9 @@ logger = logging.getLogger("datachain") +if TYPE_CHECKING: + from datachain.catalog import Catalog + def main(argv: Optional[list[str]] = None) -> int: from datachain.catalog import get_catalog @@ -78,7 +82,7 @@ def main(argv: Optional[list[str]] = None) -> int: def handle_command(args, catalog, client_config) -> int: """Handle the different CLI commands.""" - from datachain.cli.commands.storages import process_storage_command + from datachain.cli.commands.storages import mv_storage, rm_storage from datachain.studio import process_auth_cli_args, process_jobs_args command_handlers = { @@ -97,7 +101,8 @@ def handle_command(args, catalog, client_config) -> int: "gc": lambda: garbage_collect(catalog), "auth": lambda: process_auth_cli_args(args), "job": lambda: process_jobs_args(args), - "storage": lambda: process_storage_command(args), + "mv": lambda: mv_storage(args), + "rm": lambda: rm_storage(args), } handler = command_handlers.get(args.command) @@ -110,15 +115,27 @@ def handle_command(args, catalog, client_config) -> int: return 1 -def handle_cp_command(args, catalog): - catalog.cp( - args.sources, - args.output, - force=bool(args.force), - update=bool(args.update), - recursive=bool(args.recursive), - no_glob=args.no_glob, - ) +def handle_cp_command(args, catalog: "Catalog"): + from datachain.config import Config + + config = Config().read().get("studio", {}) + token = config.get("token") + if not token: + local = True + else: + local = args.local + + if local: + return catalog.cp( + [args.source_path], + args.destination_path, + force=bool(args.force), + update=bool(args.update), + 
recursive=bool(args.recursive), + no_glob=args.no_glob, + ) + + return cp_storage(args) def handle_clone_command(args, catalog): diff --git a/src/datachain/cli/commands/storages.py b/src/datachain/cli/commands/storages.py index d6b4b4e25..8b222113a 100644 --- a/src/datachain/cli/commands/storages.py +++ b/src/datachain/cli/commands/storages.py @@ -1,4 +1,3 @@ -import sys from typing import TYPE_CHECKING from datachain.error import DataChainError @@ -13,23 +12,6 @@ from argparse import Namespace -def process_storage_command(args: "Namespace"): - if args.cmd is None: - print( - f"Use 'datachain {args.command} --help' to see available options", - file=sys.stderr, - ) - return 1 - - if args.cmd == "rm": - return rm_storage(args) - if args.cmd == "mv": - return mv_storage(args) - if args.cmd == "cp": - return cp_storage(args) - raise DataChainError(f"Unknown command '{args.cmd}'.") - - def rm_storage(args: "Namespace"): client = get_studio_client(args) @@ -66,9 +48,10 @@ def cp_storage(args: "Namespace"): # Determine operation based on source and destination protocols if source_cls.protocol == "file" and destination_cls.protocol == "file": source_fs = source_cls.create_fs() - source_fs.cp_file( + source_fs.copy( args.source_path, args.destination_path, + recursive=args.recursive, ) elif source_cls.protocol == "file": source_fs = source_cls.create_fs() diff --git a/src/datachain/cli/parser/__init__.py b/src/datachain/cli/parser/__init__.py index 201e2e2fd..32fc808db 100644 --- a/src/datachain/cli/parser/__init__.py +++ b/src/datachain/cli/parser/__init__.py @@ -62,39 +62,6 @@ def get_parser() -> ArgumentParser: # noqa: PLR0915 dest="command", help=f"Use `{parser.prog} command --help` for command-specific help", ) - parse_cp = subp.add_parser( - "cp", - parents=[parent_parser], - description="Copy data files from the cloud.", - formatter_class=CustomHelpFormatter, - ) - add_sources_arg(parse_cp).complete = shtab.DIR # type: ignore[attr-defined] - parse_cp.add_argument( - "output", type=str, help="Path to a directory or file to put data to" - ) - parse_cp.add_argument( - "-f", - "--force", - default=False, - action="store_true", - help="Force creating files even if they already exist", - ) - parse_cp.add_argument( - "-r", - "-R", - "--recursive", - default=False, - action="store_true", - help="Copy directories recursively", - ) - parse_cp.add_argument( - "--no-glob", - default=False, - action="store_true", - help="Do not expand globs (such as * or ?)", - ) - add_anon_arg(parse_cp) - add_update_arg(parse_cp) parse_clone = subp.add_parser( "clone", diff --git a/src/datachain/cli/parser/studio.py b/src/datachain/cli/parser/studio.py index 2cd080ee5..0d7c8d52d 100644 --- a/src/datachain/cli/parser/studio.py +++ b/src/datachain/cli/parser/studio.py @@ -1,3 +1,5 @@ +import shtab + from datachain.cli.parser.utils import CustomHelpFormatter @@ -131,117 +133,124 @@ def add_auth_parser(subparsers, parent_parser) -> None: def add_storage_parser(subparsers, parent_parser) -> None: - storage_help = "Manage storage" - storage_description = "Manage storage through Studio" - - storage_parser = subparsers.add_parser( - "storage", - parents=[parent_parser], - description=storage_description, - help=storage_help, - formatter_class=CustomHelpFormatter, - ) - - storage_subparser = storage_parser.add_subparsers( - dest="cmd", - help="Use `datachain storage CMD --help` to display command-specific help", + storage_cp_help = "Copy storage contents" + storage_cp_description = ( + "Copy storage files and directories between 
cloud and local storage" ) - storage_delete_help = "Delete storage contents" - storage_delete_description = "Delete storage files and directories through Studio" - - storage_delete_parser = storage_subparser.add_parser( - "rm", + storage_cp_parser = subparsers.add_parser( + "cp", parents=[parent_parser], - description=storage_delete_description, - help=storage_delete_help, + description=storage_cp_description, + help=storage_cp_help, formatter_class=CustomHelpFormatter, ) - storage_delete_parser.add_argument( - "path", + storage_cp_parser.add_argument( + "source_path", action="store", - help="Path to the storage file or directory to delete", + help="Path to the source file or directory to copy", + ).complete = shtab.DIR # type: ignore[attr-defined] + + storage_cp_parser.add_argument( + "destination_path", + action="store", + help="Path to the destination file or directory to copy", ) - storage_delete_parser.add_argument( + storage_cp_parser.add_argument( + "-r", + "-R", "--recursive", action="store_true", - help="Delete recursively", + help="Copy directories recursively", ) - storage_delete_parser.add_argument( + storage_cp_parser.add_argument( "--team", action="store", - help="Team name to delete storage contents from", + help="Team name to copy storage contents to", + ) + + storage_cp_parser.add_argument( + "--local", + default=False, + action="store_true", + help="Copy data files from the cloud locally without Studio (Default: False)", ) - storage_move_help = "Move storage contents" - storage_move_description = "Move storage files and directories through Studio" + storage_cp_parser.add_argument( + "--anon", + action="store_true", + help="Use anonymous access to storage", + ) - storage_move_parser = storage_subparser.add_parser( + storage_cp_parser.add_argument( + "--update", + action="store_true", + help="Update cached list of files for the source for local approach", + ) + + storage_cp_parser.add_argument( + "--no-glob", + default=False, + action="store_true", + help="Do not expand globs (such as * or ?) 
for local approach", + ) + + storage_cp_parser.add_argument( + "--force", + action="store_true", + help="Force creating files even if they already exist", + ) + + mv_parser = subparsers.add_parser( "mv", parents=[parent_parser], - description=storage_move_description, - help=storage_move_help, + description="Move storage files and directories through Studio", + help="Move storage files and directories through Studio", formatter_class=CustomHelpFormatter, ) - - storage_move_parser.add_argument( + mv_parser.add_argument( "path", action="store", help="Path to the storage file or directory to move", ) - - storage_move_parser.add_argument( + mv_parser.add_argument( "new_path", action="store", help="New path to the storage file or directory to move", ) - - storage_move_parser.add_argument( + mv_parser.add_argument( "--recursive", action="store_true", help="Move recursively", ) - - storage_move_parser.add_argument( + mv_parser.add_argument( "--team", action="store", help="Team name to move storage contents from", ) - storage_cp_help = "Copy storage contents" - storage_cp_description = "Copy storage files and directories through Studio" - - storage_cp_parser = storage_subparser.add_parser( - "cp", + rm_parser = subparsers.add_parser( + "rm", parents=[parent_parser], - description=storage_cp_description, - help=storage_cp_help, + description="Delete storage files and directories through Studio", + help="Delete storage files and directories through Studio", formatter_class=CustomHelpFormatter, ) - - storage_cp_parser.add_argument( - "source_path", - action="store", - help="Path to the source file or directory to copy", - ) - - storage_cp_parser.add_argument( - "destination_path", + rm_parser.add_argument( + "path", action="store", - help="Path to the destination file or directory to copy", + help="Path to the storage file or directory to delete", ) - - storage_cp_parser.add_argument( + rm_parser.add_argument( "--recursive", action="store_true", - help="Copy recursively", + help="Delete recursively", ) - - storage_cp_parser.add_argument( + rm_parser.add_argument( "--team", action="store", - help="Team name to copy storage contents to", + help="Team name to delete storage contents from", ) From 840b8b727dd4959433f0332c4f406866174cb6a2 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire <16842655+amritghimire@users.noreply.github.com> Date: Wed, 16 Jul 2025 19:50:11 +0545 Subject: [PATCH 08/12] Update src/datachain/cli/__init__.py Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com> --- src/datachain/cli/__init__.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/datachain/cli/__init__.py b/src/datachain/cli/__init__.py index 249140e80..11e44b57b 100644 --- a/src/datachain/cli/__init__.py +++ b/src/datachain/cli/__init__.py @@ -120,11 +120,7 @@ def handle_cp_command(args, catalog: "Catalog"): config = Config().read().get("studio", {}) token = config.get("token") - if not token: - local = True - else: - local = args.local - + local = True if not token else args.local if local: return catalog.cp( [args.source_path], From d822bd2e30cadbee822ee4c85f9ba01a05e846a6 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Fri, 18 Jul 2025 09:43:45 +0545 Subject: [PATCH 09/12] Update test --- tests/func/test_storage_commands.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/tests/func/test_storage_commands.py b/tests/func/test_storage_commands.py index cad0bdc1a..93acf7abe 100644 --- a/tests/func/test_storage_commands.py +++ 
b/tests/func/test_storage_commands.py @@ -7,11 +7,11 @@ @pytest.mark.parametrize( "command, recursive, team", [ - ("storage rm s3://my-bucket/data/content", False, None), - ("storage rm s3://my-bucket/data/content --recursive", True, None), - ("storage rm s3://my-bucket/data/content --team new_team", False, "new_team"), + ("rm s3://my-bucket/data/content", False, None), + ("rm s3://my-bucket/data/content --recursive", True, None), + ("rm s3://my-bucket/data/content --team new_team", False, "new_team"), ( - "storage rm s3://my-bucket/data/content --team new_team --recursive", + "rm s3://my-bucket/data/content --team new_team --recursive", True, "new_team", ), @@ -68,7 +68,7 @@ def test_mv_storage(requests_mock, capsys, studio_token, command, recursive, tea status_code=200, ) - result = main(["storage", "mv", "s3://my-bucket/data/content", *command.split()]) + result = main(["mv", "s3://my-bucket/data/content", *command.split()]) assert result == 0 out, _ = capsys.readouterr() assert "Moved s3://my-bucket/data/content to s3://my-bucket/data/content2" in out @@ -92,7 +92,6 @@ def test_cp_storage_local_to_local(studio_token, tmp_dir): result = main( [ - "storage", "cp", str(tmp_dir / "path1" / "file1.txt"), str(tmp_dir / "path2" / "file1.txt"), @@ -128,7 +127,6 @@ def test_cp_storage_local_to_s3(requests_mock, capsys, studio_token, tmp_dir): result = main( [ - "storage", "cp", str(tmp_dir / "path1" / "file1.txt"), "s3://my-bucket/data/content", @@ -163,9 +161,7 @@ def test_cp_remote_to_local(requests_mock, capsys, studio_token, tmp_dir): content=b"file1", ) - result = main( - ["storage", "cp", "s3://my-bucket/data/content", str(tmp_dir / "file1.txt")] - ) + result = main(["cp", "s3://my-bucket/data/content", str(tmp_dir / "file1.txt")]) assert result == 0 assert (tmp_dir / "file1.txt").read_text() == "file1" From 882d6b427f2bd68d3af5ef17001d16addaa7c632 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Fri, 18 Jul 2025 09:53:26 +0545 Subject: [PATCH 10/12] Storage cp test fix --- tests/func/test_storage_commands.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/func/test_storage_commands.py b/tests/func/test_storage_commands.py index 93acf7abe..a1a90076d 100644 --- a/tests/func/test_storage_commands.py +++ b/tests/func/test_storage_commands.py @@ -177,9 +177,7 @@ def test_cp_s3_to_s3(requests_mock, capsys, studio_token, tmp_dir): status_code=200, ) - result = main( - ["storage", "cp", "s3://my-bucket/data/content", "s3://my-bucket/data/content2"] - ) + result = main(["cp", "s3://my-bucket/data/content", "s3://my-bucket/data/content2"]) assert result == 0 history = requests_mock.request_history From 2fe7cc31f6aaf9ded79302b0cdc8dd351601e6ef Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Tue, 22 Jul 2025 19:24:40 +0545 Subject: [PATCH 11/12] Reword something --- docs/commands/cp.md | 31 +++++------------------------- src/datachain/cli/parser/studio.py | 2 +- 2 files changed, 6 insertions(+), 27 deletions(-) diff --git a/docs/commands/cp.md b/docs/commands/cp.md index df96f5898..06a75d5be 100644 --- a/docs/commands/cp.md +++ b/docs/commands/cp.md @@ -5,7 +5,10 @@ Copy storage files and directories between cloud and local storage. 
## Synopsis ```usage -usage: datachain cp [-h] [-v] [-q] [-r] [--team TEAM] [--local] [--anon] [--update] [--no-glob] [--force] source_path destination_path +usage: datachain cp [-h] [-v] [-q] [-r] [--team TEAM] + [--local] [--anon] [--update] + [--no-glob] [--force] + source_path destination_path ``` ## Description @@ -143,33 +146,9 @@ datachain cp --local --force s3://my-bucket/data /path/to/local/ datachain cp --local --update --no-glob s3://my-bucket/data/*.txt /path/to/local/ ``` -## Limitations and Edge Cases - -### Bucket Restrictions +## Limitations - **Cannot copy between different buckets**: Remote-to-remote copies must be within the same bucket -- **Cross-bucket operations**: Use local as intermediate step for cross-bucket copies - -### Directory Operations -- **Recursive flag required**: Copying directories requires the `--recursive` flag -- **Directory structure preservation**: Directory structure is preserved during copy operations -- **Empty directories**: Empty directories may not be copied in some scenarios - - -### Error Handling -- **File not found**: Missing source files result in operation failure -- **Permission errors**: Insufficient permissions cause operation failure -- **Network issues**: Network problems are reported with appropriate error messages - -### Team Configuration -- **Default team**: If no team is specified, uses the team from your configuration -- **Team-specific storage**: Each team has its own storage namespace ## Notes - -* Use the `--verbose` flag to get detailed information about the copy operation -* The `--quiet` flag suppresses output except for errors * When using Studio mode, you must be authenticated with `datachain auth login` before using it * The `--local` mode bypasses Studio and operates directly with storage providers -* Use `--recursive` flag when copying directories -* The `--force` flag is only available in local mode and will overwrite existing files -* For cross-bucket copies, consider using local storage as an intermediate step diff --git a/src/datachain/cli/parser/studio.py b/src/datachain/cli/parser/studio.py index 0d7c8d52d..248205c6d 100644 --- a/src/datachain/cli/parser/studio.py +++ b/src/datachain/cli/parser/studio.py @@ -156,7 +156,7 @@ def add_storage_parser(subparsers, parent_parser) -> None: "destination_path", action="store", help="Path to the destination file or directory to copy", - ) + ).complete = shtab.DIR # type: ignore[attr-defined] storage_cp_parser.add_argument( "-r", From 023fae6677acef076029777bb79453dc39d76736 Mon Sep 17 00:00:00 2001 From: Amrit Ghimire Date: Wed, 23 Jul 2025 17:51:14 +0545 Subject: [PATCH 12/12] Change all approach --- src/datachain/cli/__init__.py | 45 +++++++---- .../cli/commands/storage/__init__.py | 10 +++ src/datachain/cli/commands/storage/base.py | 81 +++++++++++++++++++ src/datachain/cli/commands/storage/local.py | 70 ++++++++++++++++ src/datachain/cli/commands/storage/studio.py | 56 +++++++++++++ src/datachain/cli/commands/storage/utils.py | 38 +++++++++ src/datachain/cli/commands/storages.py | 63 --------------- src/datachain/cli/parser/studio.py | 17 +++- src/datachain/remote/storages.py | 53 +----------- 9 files changed, 303 insertions(+), 130 deletions(-) create mode 100644 src/datachain/cli/commands/storage/__init__.py create mode 100644 src/datachain/cli/commands/storage/base.py create mode 100644 src/datachain/cli/commands/storage/local.py create mode 100644 src/datachain/cli/commands/storage/studio.py create mode 100644 src/datachain/cli/commands/storage/utils.py delete 
mode 100644 src/datachain/cli/commands/storages.py diff --git a/src/datachain/cli/__init__.py b/src/datachain/cli/__init__.py index 11e44b57b..aed037ae6 100644 --- a/src/datachain/cli/__init__.py +++ b/src/datachain/cli/__init__.py @@ -5,7 +5,6 @@ from multiprocessing import freeze_support from typing import TYPE_CHECKING, Optional -from datachain.cli.commands.storages import cp_storage from datachain.cli.utils import get_logging_level from .commands import ( @@ -26,6 +25,8 @@ logger = logging.getLogger("datachain") if TYPE_CHECKING: + from argparse import Namespace + from datachain.catalog import Catalog @@ -82,7 +83,6 @@ def main(argv: Optional[list[str]] = None) -> int: def handle_command(args, catalog, client_config) -> int: """Handle the different CLI commands.""" - from datachain.cli.commands.storages import mv_storage, rm_storage from datachain.studio import process_auth_cli_args, process_jobs_args command_handlers = { @@ -101,8 +101,8 @@ def handle_command(args, catalog, client_config) -> int: "gc": lambda: garbage_collect(catalog), "auth": lambda: process_auth_cli_args(args), "job": lambda: process_jobs_args(args), - "mv": lambda: mv_storage(args), - "rm": lambda: rm_storage(args), + "mv": lambda: handle_mv_command(args, catalog), + "rm": lambda: handle_rm_command(args, catalog), } handler = command_handlers.get(args.command) @@ -115,23 +115,36 @@ def handle_command(args, catalog, client_config) -> int: return 1 -def handle_cp_command(args, catalog: "Catalog"): +def _get_storage_implementation(args: "Namespace", catalog: "Catalog"): + from datachain.cli.commands.storage import ( + LocalStorageImplementation, + StudioStorageImplementation, + ) from datachain.config import Config config = Config().read().get("studio", {}) token = config.get("token") - local = True if not token else args.local - if local: - return catalog.cp( - [args.source_path], - args.destination_path, - force=bool(args.force), - update=bool(args.update), - recursive=bool(args.recursive), - no_glob=args.no_glob, - ) + studio = False if not token else args.studio_cloud_auth + return ( + StudioStorageImplementation(args, catalog) + if studio + else LocalStorageImplementation(args, catalog) + ) + + +def handle_cp_command(args, catalog): + storage_implementation = _get_storage_implementation(args, catalog) + return storage_implementation.cp() + + +def handle_mv_command(args, catalog): + storage_implementation = _get_storage_implementation(args, catalog) + return storage_implementation.mv() + - return cp_storage(args) +def handle_rm_command(args, catalog): + storage_implementation = _get_storage_implementation(args, catalog) + return storage_implementation.rm() def handle_clone_command(args, catalog): diff --git a/src/datachain/cli/commands/storage/__init__.py b/src/datachain/cli/commands/storage/__init__.py new file mode 100644 index 000000000..7a3986ae7 --- /dev/null +++ b/src/datachain/cli/commands/storage/__init__.py @@ -0,0 +1,10 @@ +from .local import LocalStorageImplementation +from .studio import StudioStorageImplementation +from .utils import build_file_paths, validate_upload_args + +__all__ = [ + "LocalStorageImplementation", + "StudioStorageImplementation", + "build_file_paths", + "validate_upload_args", +] diff --git a/src/datachain/cli/commands/storage/base.py b/src/datachain/cli/commands/storage/base.py new file mode 100644 index 000000000..9e73944ab --- /dev/null +++ b/src/datachain/cli/commands/storage/base.py @@ -0,0 +1,81 @@ +from typing import TYPE_CHECKING, Optional + +from datachain.error import 
DataChainError + +if TYPE_CHECKING: + from argparse import Namespace + + from fsspec import AbstractFileSystem + + from datachain.catalog import Catalog + from datachain.client.fsspec import Client + from datachain.remote.studio import StudioClient + + +class StorageImplementation: + def __init__(self, args: "Namespace", catalog: "Catalog"): + self.args = args + self.catalog = catalog + + def rm(self): + raise NotImplementedError("Remove is not implemented") + + def mv(self): + raise NotImplementedError("Move is not implemented") + + def cp(self): + from datachain.client.fsspec import Client + + source_cls = Client.get_implementation(self.args.source_path) + destination_cls = Client.get_implementation(self.args.destination_path) + + if source_cls.protocol == "file" and destination_cls.protocol == "file": + self.copy_local_to_local(source_cls) + elif source_cls.protocol == "file": + self.upload_to_remote(source_cls, destination_cls) + elif destination_cls.protocol == "file": + self.download_from_remote(destination_cls) + else: + self.copy_remote_to_remote(source_cls) + + def copy_local_to_local(self, source_cls: "Client"): + source_fs = source_cls.create_fs() + source_fs.copy( + self.args.source_path, + self.args.destination_path, + recursive=self.args.recursive, + ) + print(f"Copied {self.args.source_path} to {self.args.destination_path}") + + def upload_to_remote(self, source_cls: "Client", destination_cls: "Client"): + raise NotImplementedError("Upload to remote is not implemented") + + def download_from_remote(self, destination_cls: "Client"): + raise NotImplementedError("Download from remote is not implemented") + + def copy_remote_to_remote(self, source_cls: "Client"): + raise NotImplementedError("Copy remote to remote is not implemented") + + def save_upload_log( + self, + studio_client: Optional["StudioClient"], + destination_path: str, + file_paths: dict, + local_fs: "AbstractFileSystem", + ): + from datachain.remote.storages import get_studio_client + + try: + if studio_client is None: + studio_client = get_studio_client(self.args) + except DataChainError: + return + + uploads = [ + { + "path": dst, + "size": local_fs.info(src).get("size", 0), + } + for dst, src in file_paths.items() + ] + studio_client.save_upload_log(destination_path, uploads) diff --git a/src/datachain/cli/commands/storage/local.py b/src/datachain/cli/commands/storage/local.py new file mode 100644 index 000000000..a59543624 --- /dev/null +++ b/src/datachain/cli/commands/storage/local.py @@ -0,0 +1,70 @@ +from typing import TYPE_CHECKING + +from datachain.cli.commands.storage.base import StorageImplementation +from datachain.cli.commands.storage.utils import build_file_paths, validate_upload_args + +if TYPE_CHECKING: + from datachain.client.fsspec import Client + + +class LocalStorageImplementation(StorageImplementation): + def upload_to_remote(self, source_cls: "Client", destination_cls: "Client"): + from tqdm import tqdm + + source_fs = source_cls.create_fs() + destination_fs = destination_cls.create_fs() + is_dir = validate_upload_args(self.args, source_fs) + file_paths = build_file_paths(self.args, source_fs, is_dir) + destination_bucket, _ = destination_cls.parse_url(self.args.destination_path) + + for dest_path, source_path in file_paths.items(): + file_size = source_fs.info(source_path)["size"] + print(f"Uploading {source_path} to {dest_path}") + + with tqdm(total=file_size, unit="B", unit_scale=True) as pbar: + with source_fs.open(source_path, "rb") as source_file: + with destination_fs.open( + 
f"{destination_bucket}/{dest_path}", "wb" + ) as dest_file: + while True: + chunk = source_file.read(8192) + if not chunk: + break + dest_file.write(chunk) + pbar.update(len(chunk)) + + self.save_upload_log(None, self.args.destination_path, file_paths, source_fs) + + def download_from_remote(self, destination_cls: "Client"): + self.catalog.cp( + [self.args.source_path], + self.args.destination_path, + force=bool(self.args.force), + update=bool(self.args.update), + recursive=bool(self.args.recursive), + no_glob=self.args.no_glob, + ) + + def copy_remote_to_remote(self, source_cls: "Client"): + source_fs = source_cls.create_fs() + source_fs.copy( + self.args.source_path, + self.args.destination_path, + recursive=self.args.recursive, + ) + + def rm(self): + from datachain.client.fsspec import Client + + client_cls = Client.get_implementation(self.args.path) + fs = client_cls.create_fs() + fs.rm(self.args.path, recursive=self.args.recursive) + # TODO: Add storage logging. + + def mv(self): + from datachain.client.fsspec import Client + + client_cls = Client.get_implementation(self.args.path) + fs = client_cls.create_fs() + fs.mv(self.args.path, self.args.new_path, recursive=self.args.recursive) + # TODO: Add storage logging. diff --git a/src/datachain/cli/commands/storage/studio.py b/src/datachain/cli/commands/storage/studio.py new file mode 100644 index 000000000..0c3c7b5bf --- /dev/null +++ b/src/datachain/cli/commands/storage/studio.py @@ -0,0 +1,56 @@ +from typing import TYPE_CHECKING + +from datachain.cli.commands.storage.base import StorageImplementation +from datachain.error import DataChainError + +if TYPE_CHECKING: + from datachain.client.fsspec import Client + + +class StudioStorageImplementation(StorageImplementation): + def upload_to_remote(self, source_cls: "Client", destination_cls: "Client"): + from datachain.remote.storages import upload_to_storage + + source_fs = source_cls.create_fs() + file_paths = upload_to_storage(self.args, source_fs) + self.save_upload_log(None, self.args.destination_path, file_paths, source_fs) + + def download_from_remote(self, destination_cls: "Client"): + from datachain.remote.storages import download_from_storage + + destination_fs = destination_cls.create_fs() + download_from_storage(self.args, destination_fs) + + def copy_remote_to_remote(self, source_cls: "Client"): + from datachain.remote.storages import copy_inside_storage + + copy_inside_storage(self.args) + + def rm(self): + from datachain.remote.storages import get_studio_client + + client = get_studio_client(self.args) + + response = client.delete_storage_file( + self.args.path, + recursive=self.args.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + print(f"Deleted {self.args.path}") + + def mv(self): + from datachain.remote.storages import get_studio_client + + client = get_studio_client(self.args) + + response = client.move_storage_file( + self.args.path, + self.args.new_path, + recursive=self.args.recursive, + ) + if not response.ok: + raise DataChainError(response.message) + + print(f"Moved {self.args.path} to {self.args.new_path}") diff --git a/src/datachain/cli/commands/storage/utils.py b/src/datachain/cli/commands/storage/utils.py new file mode 100644 index 000000000..e28e20547 --- /dev/null +++ b/src/datachain/cli/commands/storage/utils.py @@ -0,0 +1,38 @@ +import os.path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from argparse import Namespace + + from fsspec.spec import AbstractFileSystem + +from datachain.error import DataChainError + 
+ +def validate_upload_args(args: "Namespace", local_fs: "AbstractFileSystem"): + """Validate upload arguments and raise appropriate errors.""" + is_dir = local_fs.isdir(args.source_path) + if is_dir and not args.recursive: + raise DataChainError("Cannot copy directory without --recursive") + return is_dir + + +def build_file_paths(args: "Namespace", local_fs: "AbstractFileSystem", is_dir: bool): + """Build mapping of destination paths to source paths.""" + from datachain.client.fsspec import Client + + client = Client.get_implementation(args.destination_path) + _, subpath = client.split_url(args.destination_path) + + if is_dir: + return { + os.path.join(subpath, os.path.relpath(path, args.source_path)): path + for path in local_fs.find(args.source_path) + } + + destination_path = ( + os.path.join(subpath, os.path.basename(args.source_path)) + if args.destination_path.endswith(("/", "\\")) or not subpath + else subpath + ) + return {destination_path: args.source_path} diff --git a/src/datachain/cli/commands/storages.py b/src/datachain/cli/commands/storages.py deleted file mode 100644 index 8b222113a..000000000 --- a/src/datachain/cli/commands/storages.py +++ /dev/null @@ -1,63 +0,0 @@ -from typing import TYPE_CHECKING - -from datachain.error import DataChainError -from datachain.remote.storages import ( - copy_inside_storage, - download_from_storage, - get_studio_client, - upload_to_storage, -) - -if TYPE_CHECKING: - from argparse import Namespace - - -def rm_storage(args: "Namespace"): - client = get_studio_client(args) - - response = client.delete_storage_file( - args.path, - recursive=args.recursive, - ) - if not response.ok: - raise DataChainError(response.message) - - print(f"Deleted {args.path}") - - -def mv_storage(args: "Namespace"): - client = get_studio_client(args) - - response = client.move_storage_file( - args.path, - args.new_path, - recursive=args.recursive, - ) - if not response.ok: - raise DataChainError(response.message) - - print(f"Moved {args.path} to {args.new_path}") - - -def cp_storage(args: "Namespace"): - from datachain.client.fsspec import Client - - source_cls = Client.get_implementation(args.source_path) - destination_cls = Client.get_implementation(args.destination_path) - - # Determine operation based on source and destination protocols - if source_cls.protocol == "file" and destination_cls.protocol == "file": - source_fs = source_cls.create_fs() - source_fs.copy( - args.source_path, - args.destination_path, - recursive=args.recursive, - ) - elif source_cls.protocol == "file": - source_fs = source_cls.create_fs() - upload_to_storage(args, source_fs) - elif destination_cls.protocol == "file": - destination_fs = destination_cls.create_fs() - download_from_storage(args, destination_fs) - else: - copy_inside_storage(args) diff --git a/src/datachain/cli/parser/studio.py b/src/datachain/cli/parser/studio.py index 248205c6d..5bf984f40 100644 --- a/src/datachain/cli/parser/studio.py +++ b/src/datachain/cli/parser/studio.py @@ -173,10 +173,10 @@ def add_storage_parser(subparsers, parent_parser) -> None: ) storage_cp_parser.add_argument( - "--local", + "--studio-cloud-auth", default=False, action="store_true", - help="Copy data files from the cloud locally without Studio (Default: False)", + help="Use credentials from Studio for cloud operations (Default: False)", ) storage_cp_parser.add_argument( @@ -232,6 +232,13 @@ def add_storage_parser(subparsers, parent_parser) -> None: help="Team name to move storage contents from", ) + mv_parser.add_argument( + 
"--studio-cloud-auth", + default=False, + action="store_true", + help="Use credentials from Studio for cloud operations (Default: False)", + ) + rm_parser = subparsers.add_parser( "rm", parents=[parent_parser], @@ -254,3 +261,9 @@ def add_storage_parser(subparsers, parent_parser) -> None: action="store", help="Team name to delete storage contents from", ) + rm_parser.add_argument( + "--studio-cloud-auth", + default=False, + action="store_true", + help="Use credentials from Studio for cloud operations (Default: False)", + ) diff --git a/src/datachain/remote/storages.py b/src/datachain/remote/storages.py index f961c0c46..77c7453a9 100644 --- a/src/datachain/remote/storages.py +++ b/src/datachain/remote/storages.py @@ -5,6 +5,7 @@ import requests +from datachain.cli.commands.storage.utils import build_file_paths, validate_upload_args from datachain.error import DataChainError if TYPE_CHECKING: @@ -28,8 +29,8 @@ def get_studio_client(args: "Namespace"): def upload_to_storage(args: "Namespace", local_fs: "AbstractFileSystem"): studio_client = get_studio_client(args) - is_dir = _validate_upload_args(args, local_fs) - file_paths = _build_file_paths(args, local_fs, is_dir) + is_dir = validate_upload_args(args, local_fs) + file_paths = build_file_paths(args, local_fs, is_dir) response = _get_presigned_urls(studio_client, args.destination_path, file_paths) for dest_path, source_path in file_paths.items(): @@ -40,8 +41,8 @@ def upload_to_storage(args: "Namespace", local_fs: "AbstractFileSystem"): local_fs, ) - _save_upload_log(studio_client, args.destination_path, file_paths, local_fs) print(f"Successfully uploaded {len(file_paths)} file(s)") + return file_paths def download_from_storage(args: "Namespace", local_fs: "AbstractFileSystem"): @@ -95,35 +96,6 @@ def copy_inside_storage(args: "Namespace"): print(f"Copied {args.source_path} to {args.destination_path}") -def _validate_upload_args(args: "Namespace", local_fs: "AbstractFileSystem"): - """Validate upload arguments and raise appropriate errors.""" - is_dir = local_fs.isdir(args.source_path) - if is_dir and not args.recursive: - raise DataChainError("Cannot copy directory without --recursive") - return is_dir - - -def _build_file_paths(args: "Namespace", local_fs: "AbstractFileSystem", is_dir: bool): - """Build mapping of destination paths to source paths.""" - from datachain.client.fsspec import Client - - client = Client.get_implementation(args.destination_path) - _, subpath = client.split_url(args.destination_path) - - if is_dir: - return { - os.path.join(subpath, os.path.relpath(path, args.source_path)): path - for path in local_fs.find(args.source_path) - } - - destination_path = ( - os.path.join(subpath, os.path.basename(args.source_path)) - if args.destination_path.endswith(("/", "\\")) or not subpath - else subpath - ) - return {destination_path: args.source_path} - - def _get_presigned_urls( studio_client: "StudioClient", destination_path: str, file_paths: dict ): @@ -209,20 +181,3 @@ def _upload_single_file( ) print(f"Uploaded {source_path} to {dest_path}") - - -def _save_upload_log( - studio_client: "StudioClient", - destination_path: str, - file_paths: dict, - local_fs: "AbstractFileSystem", -): - """Save upload log to studio.""" - uploads = [ - { - "path": dst, - "size": local_fs.info(src).get("size", 0), - } - for dst, src in file_paths.items() - ] - studio_client.save_upload_log(destination_path, uploads)