20 changes: 12 additions & 8 deletions examples/external_aerodynamics/data_sources.py
@@ -23,6 +23,7 @@
import pyvista as pv
import vtk
import zarr
from zarr.storage import LocalStore

from physicsnemo_curator.etl.data_sources import DataSource
from physicsnemo_curator.etl.processing_config import ProcessingConfig
@@ -232,20 +233,21 @@ def _write_zarr(
output_path: Path where the .zarr directory should be written
"""
# Create store
zarr_store = zarr.DirectoryStore(output_path)
root = zarr.group(store=zarr_store)
zarr_store = LocalStore(output_path)
root = zarr.open_group(store=zarr_store, mode="w")

# Write metadata as attributes
data.metadata.zarr_format = int(zarr.__version__.split(".")[0])  # major version only; the schema field is Optional[int]
root.attrs.update(asdict(data.metadata))

# Write required arrays
for field in ["stl_coordinates", "stl_centers", "stl_faces", "stl_areas"]:
array_info = getattr(data, field)
root.create_dataset(
field,
root.create_array(
name=field,
data=array_info.data,
chunks=array_info.chunks,
compressor=array_info.compressor,
compressors=array_info.compressor if array_info.compressor else None,
)

# Write optional arrays if present
@@ -259,11 +261,13 @@
]:
array_info = getattr(data, field)
if array_info is not None:
root.create_dataset(
field,
root.create_array(
name=field,
data=array_info.data,
chunks=array_info.chunks,
compressor=array_info.compressor,
compressors=(
array_info.compressor if array_info.compressor else None
),
)

def should_skip(self, filename: str) -> bool:
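For orientation, here is a minimal, self-contained sketch of the Zarr v3 write pattern this file now uses; the store path, array name, shape, and codec settings below are illustrative, not taken from the PR:

```python
# Sketch of the Zarr v3 write pattern adopted above (illustrative values).
import numpy as np
import zarr
from zarr.storage import LocalStore

store = LocalStore("example_output.zarr")      # replaces zarr.DirectoryStore
root = zarr.open_group(store=store, mode="w")  # replaces zarr.group(store=...)

root.create_array(
    name="stl_coordinates",
    data=np.zeros((100, 3), dtype=np.float32),
    chunks=(50, 3),
    # `compressors` (plural) supersedes Zarr 2's `compressor` keyword;
    # passing None instead writes the array uncompressed.
    compressors=zarr.codecs.BloscCodec(cname="zstd", clevel=3),
)
```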
7 changes: 4 additions & 3 deletions examples/external_aerodynamics/data_transformations.py
@@ -19,7 +19,7 @@
from typing import Callable, Optional

import numpy as np
from numcodecs import Blosc
import zarr

from examples.external_aerodynamics.external_aero_geometry_data_processors import (
default_geometry_processing_for_external_aerodynamics,
@@ -234,10 +234,11 @@ def __init__(
chunk_size_mb: float = 1.0, # Default 1MB chunk size
):
super().__init__(cfg)
self.compressor = Blosc(
# Zarr 3 codec configuration
self.compressor = zarr.codecs.BloscCodec(
cname=compression_method,
clevel=compression_level,
shuffle=Blosc.SHUFFLE,
shuffle=zarr.codecs.BloscShuffle.shuffle,
)
self.chunk_size_mb = chunk_size_mb

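As a quick reference, the old and new codec constructions line up roughly as follows (a sketch with assumed settings, not the exact values every caller passes):

```python
import zarr

# Zarr 2 / numcodecs style (removed above):
#   from numcodecs import Blosc
#   compressor = Blosc(cname="zstd", clevel=5, shuffle=Blosc.SHUFFLE)

# Zarr 3 equivalent used by this PR:
compressor = zarr.codecs.BloscCodec(
    cname="zstd",                              # compression algorithm
    clevel=5,                                  # compression level
    shuffle=zarr.codecs.BloscShuffle.shuffle,  # byte-shuffle filter
)
```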
8 changes: 6 additions & 2 deletions examples/external_aerodynamics/schemas.py
@@ -17,10 +17,10 @@
from dataclasses import dataclass
from typing import Optional

import numcodecs
import numpy as np
import pyvista as pv
import vtk
import zarr

from .constants import ModelType

@@ -54,6 +54,9 @@ class ExternalAerodynamicsMetadata:
decimation_reduction: Optional[float] = None
decimation_algo: Optional[str] = None

# Zarr format version
zarr_format: Optional[int] = None


@dataclass
class ExternalAerodynamicsExtractedDataInMemory:
@@ -94,11 +97,12 @@ class PreparedZarrArrayInfo:

Version history:
- 1.0: Initial version with compression and chunking info
- 2.0: Updated to use Zarr 3 codecs
"""

data: np.ndarray
chunks: tuple[int, ...]
compressor: numcodecs.abc.Codec
compressor: zarr.abc.codec.Codec  # Zarr 3 codec ABC (e.g. BloscCodec); the bare module is not a valid annotation


@dataclass(frozen=True)
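A hypothetical construction of the updated dataclass, to show what the new `compressor` field expects; the shape and codec settings are assumptions for illustration:

```python
import numpy as np
import zarr

from examples.external_aerodynamics.schemas import PreparedZarrArrayInfo

# PreparedZarrArrayInfo now carries a Zarr 3 codec instead of a
# numcodecs.abc.Codec (illustrative values).
info = PreparedZarrArrayInfo(
    data=np.zeros((1000, 3), dtype=np.float32),
    chunks=(500, 3),
    compressor=zarr.codecs.BloscCodec(cname="zstd", clevel=3),
)
```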
12 changes: 6 additions & 6 deletions examples/tutorials/etl_hdf5_to_zarr/h5_to_zarr_transformation.py
@@ -17,7 +17,7 @@
from typing import Any, Dict

import numpy as np
from numcodecs import Blosc
import zarr

from physicsnemo_curator.etl.data_transformations import DataTransformation
from physicsnemo_curator.etl.processing_config import ProcessingConfig
@@ -40,11 +40,11 @@ def __init__(
self.chunk_size = chunk_size
self.compression_level = compression_level

# Set up compression
self.compressor = Blosc(
cname="zstd", # zstd compression algorithm
clevel=compression_level,
shuffle=Blosc.SHUFFLE,
# Set up Zarr 3 compression codec
self.compressor = zarr.codecs.BloscCodec(
cname="zstd",
clevel=self.compression_level,
shuffle=zarr.codecs.BloscShuffle.shuffle,
)

def transform(self, data: Dict[str, Any]) -> Dict[str, Any]:
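The transformation's output format is easiest to see with one concrete entry; the sketch below assumes the per-array dict layout this tutorial uses, with illustrative sizes:

```python
import numpy as np
import zarr

compressor = zarr.codecs.BloscCodec(
    cname="zstd", clevel=3, shuffle=zarr.codecs.BloscShuffle.shuffle
)

num_points = 1200                    # illustrative
chunk_points = min(500, num_points)  # chunks never exceed the configured size

# One per-array payload, as consumed by the Zarr data source:
entry = {
    "data": np.linspace(250.0, 350.0, num_points).astype(np.float32),
    "chunks": (chunk_points,),
    "compressor": compressor,
    "dtype": np.float32,
}
```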
109 changes: 56 additions & 53 deletions examples/tutorials/etl_hdf5_to_zarr/hdf5_to_zarr.ipynb
@@ -661,14 +661,14 @@
},
{
"cell_type": "code",
"execution_count": 23,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import Any, Dict\n",
"\n",
"import numpy as np # noqa: F811\n",
"from numcodecs import Blosc\n",
"import zarr # noqa: F811\n",
"\n",
"from physicsnemo_curator.etl.data_transformations import DataTransformation\n",
"from physicsnemo_curator.etl.processing_config import ProcessingConfig\n",
@@ -677,7 +677,9 @@
"class H5ToZarrTransformation(DataTransformation):\n",
" \"\"\"Transform HDF5 data into Zarr-optimized format.\"\"\"\n",
"\n",
" def __init__(self, cfg: ProcessingConfig, chunk_size: int = 500, compression_level: int = 3):\n",
" def __init__(\n",
" self, cfg: ProcessingConfig, chunk_size: int = 500, compression_level: int = 3\n",
" ):\n",
" \"\"\"Initialize the transformation.\n",
"\n",
" Args:\n",
@@ -689,11 +691,11 @@
" self.chunk_size = chunk_size\n",
" self.compression_level = compression_level\n",
"\n",
" # Set up compression\n",
" self.compressor = Blosc(\n",
" cname='zstd', # zstd compression algorithm\n",
" clevel=compression_level,\n",
" shuffle=Blosc.SHUFFLE\n",
" # Set up Zarr 3 compression codec\n",
" self.compressor = zarr.codecs.BloscCodec(\n",
" cname='zstd',\n",
" clevel=self.compression_level,\n",
" shuffle=zarr.codecs.BloscShuffle.shuffle,\n",
" )\n",
"\n",
" def transform(self, data: Dict[str, Any]) -> Dict[str, Any]:\n",
@@ -708,66 +710,66 @@
" self.logger.info(f\"Transforming {data['filename']} for Zarr storage\")\n",
"\n",
" # Get the number of points to determine chunking\n",
" num_points = len(data['temperature'])\n",
" num_points = len(data[\"temperature\"])\n",
"\n",
" # Calculate optimal chunks (don't exceed chunk_size)\n",
" chunk_points = min(self.chunk_size, num_points)\n",
"\n",
" # Prepare arrays that will be written to Zarr stores\n",
" zarr_data = {\n",
" 'temperature': {},\n",
" 'velocity': {},\n",
" 'coordinates': {},\n",
" 'velocity_magnitude': {},\n",
" \"temperature\": {},\n",
" \"velocity\": {},\n",
" \"coordinates\": {},\n",
" \"velocity_magnitude\": {},\n",
" }\n",
"\n",
" # Temperature field (1D array)\n",
" zarr_data['temperature'] = {\n",
" 'data': data['temperature'].astype(np.float32),\n",
" 'chunks': (chunk_points,),\n",
" 'compressor': self.compressor,\n",
" 'dtype': np.float32\n",
" zarr_data[\"temperature\"] = {\n",
" \"data\": data[\"temperature\"].astype(np.float32),\n",
" \"chunks\": (chunk_points,),\n",
" \"compressor\": self.compressor,\n",
" \"dtype\": np.float32,\n",
" }\n",
"\n",
" # Velocity field (2D array: points x 3 components)\n",
" zarr_data['velocity'] = {\n",
" 'data': data['velocity'].astype(np.float32),\n",
" 'chunks': (chunk_points, 3),\n",
" 'compressor': self.compressor,\n",
" 'dtype': np.float32\n",
" zarr_data[\"velocity\"] = {\n",
" \"data\": data[\"velocity\"].astype(np.float32),\n",
" \"chunks\": (chunk_points, 3),\n",
" \"compressor\": self.compressor,\n",
" \"dtype\": np.float32,\n",
" }\n",
"\n",
" # Coordinates (2D array: points x 3 dimensions)\n",
" zarr_data['coordinates'] = {\n",
" 'data': data['coordinates'].astype(np.float32),\n",
" 'chunks': (chunk_points, 3),\n",
" 'compressor': self.compressor,\n",
" 'dtype': np.float32\n",
" zarr_data[\"coordinates\"] = {\n",
" \"data\": data[\"coordinates\"].astype(np.float32),\n",
" \"chunks\": (chunk_points, 3),\n",
" \"compressor\": self.compressor,\n",
" \"dtype\": np.float32,\n",
" }\n",
"\n",
" # Add some computed metadata useful for Zarr to existing metadata\n",
" metadata = data['metadata']\n",
" metadata['num_points'] = num_points\n",
" metadata['chunk_size'] = chunk_points\n",
" metadata['compression'] = 'zstd'\n",
" metadata['compression_level'] = self.compression_level\n",
" metadata = data[\"metadata\"]\n",
" metadata[\"num_points\"] = num_points\n",
" metadata[\"chunk_size\"] = chunk_points\n",
" metadata[\"compression\"] = \"zstd\"\n",
" metadata[\"compression_level\"] = self.compression_level\n",
"\n",
" # Also add some simple derived fields\n",
" # Temperature statistics\n",
" metadata['temperature_min'] = float(np.min(data['temperature']))\n",
" metadata['temperature_max'] = float(np.max(data['temperature']))\n",
" metadata['temperature_mean'] = float(np.mean(data['temperature']))\n",
" metadata[\"temperature_min\"] = float(np.min(data[\"temperature\"]))\n",
" metadata[\"temperature_max\"] = float(np.max(data[\"temperature\"]))\n",
" metadata[\"temperature_mean\"] = float(np.mean(data[\"temperature\"]))\n",
"\n",
" # Velocity magnitude\n",
" velocity_magnitude = np.linalg.norm(data['velocity'], axis=1)\n",
" zarr_data['velocity_magnitude'] = {\n",
" 'data': velocity_magnitude.astype(np.float32),\n",
" 'chunks': (chunk_points,),\n",
" 'compressor': self.compressor,\n",
" 'dtype': np.float32\n",
" velocity_magnitude = np.linalg.norm(data[\"velocity\"], axis=1)\n",
" zarr_data[\"velocity_magnitude\"] = {\n",
" \"data\": velocity_magnitude.astype(np.float32),\n",
" \"chunks\": (chunk_points,),\n",
" \"compressor\": self.compressor,\n",
" \"dtype\": np.float32,\n",
" }\n",
" metadata['velocity_max'] = float(np.max(velocity_magnitude))\n",
" zarr_data['metadata'] = metadata\n",
" metadata[\"velocity_max\"] = float(np.max(velocity_magnitude))\n",
" zarr_data[\"metadata\"] = metadata\n",
"\n",
" return zarr_data"
]
@@ -797,14 +799,15 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"from typing import Any, Dict, List\n",
"\n",
"import zarr\n",
"import zarr # noqa: F811\n",
"from zarr.storage import LocalStore\n",
"\n",
"from physicsnemo_curator.etl.data_sources import DataSource\n",
"from physicsnemo_curator.etl.processing_config import ProcessingConfig\n",
@@ -849,8 +852,8 @@
"\n",
" # Create Zarr store\n",
" self.logger.info(f\"Creating Zarr store: {store_path}\")\n",
" store = zarr.DirectoryStore(store_path)\n",
" root = zarr.group(store=store)\n",
" store = LocalStore(store_path)\n",
" root = zarr.open_group(store=store, mode=\"w\")\n",
"\n",
" # Store metadata as root attributes\n",
" if \"metadata\" in data:\n",
@@ -863,16 +866,16 @@
"\n",
" # Write all arrays from the transformation\n",
" for array_name, array_info in data.items():\n",
" root.create_dataset(\n",
" array_name,\n",
" root.create_array(\n",
" name=array_name,\n",
" data=array_info[\"data\"],\n",
" chunks=array_info[\"chunks\"],\n",
" compressor=array_info[\"compressor\"],\n",
" dtype=array_info[\"dtype\"]\n",
" compressors=[array_info[\"compressor\"]] if array_info[\"compressor\"] else None,\n",
" dtype=array_info[\"dtype\"],\n",
" )\n",
"\n",
" # Add some store-level metadata\n",
" root.attrs[\"zarr_format\"] = 2\n",
" root.attrs[\"zarr_format\"] = 3\n",
" root.attrs[\"created_by\"] = \"physicsnemo-curator-tutorial\"\n",
"\n",
" # Something weird is happening here.\n",
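After the notebook's write step, a quick read-back confirms the Zarr 3 store round-trips; this is a sketch, and the path is whatever the data source actually wrote (illustrative here):

```python
import zarr

root = zarr.open_group("example_output.zarr", mode="r")
print(root.attrs["created_by"])    # "physicsnemo-curator-tutorial"
print(root["temperature"].shape)   # (num_points,)
print(root["velocity"].chunks)     # e.g. (500, 3)
```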
15 changes: 9 additions & 6 deletions examples/tutorials/etl_hdf5_to_zarr/zarr_data_source.py
@@ -18,6 +18,7 @@
from typing import Any, Dict, List

import zarr
from zarr.storage import LocalStore

from physicsnemo_curator.etl.data_sources import DataSource
from physicsnemo_curator.etl.processing_config import ProcessingConfig
@@ -62,8 +63,8 @@ def write(self, data: Dict[str, Any], filename: str) -> None:

# Create Zarr store
self.logger.info(f"Creating Zarr store: {store_path}")
store = zarr.DirectoryStore(store_path)
root = zarr.group(store=store)
store = LocalStore(store_path)
root = zarr.open_group(store=store, mode="w")

# Store metadata as root attributes
if "metadata" in data:
@@ -76,16 +77,18 @@

# Write all arrays from the transformation
for array_name, array_info in data.items():
root.create_dataset(
array_name,
root.create_array(
name=array_name,
data=array_info["data"],
chunks=array_info["chunks"],
compressor=array_info["compressor"],
compressors=(
array_info["compressor"] if array_info["compressor"] else None
),
dtype=array_info["dtype"],
)

# Add some store-level metadata
root.attrs["zarr_format"] = 2
root.attrs["zarr_format"] = 3
root.attrs["created_by"] = "physicsnemo-curator-tutorial"

# Something weird is happening here.
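One behavioral note on the new keyword: in zarr-python 3, `compressors` defaults to `"auto"`, so explicitly passing `None` (as this hunk does when no codec is configured) stores the array uncompressed rather than with the library's default codec. A minimal illustration, with assumed paths and shapes:

```python
import numpy as np
import zarr

root = zarr.open_group("scratch.zarr", mode="w")  # illustrative path

# Explicit codec: compressed with Blosc.
root.create_array(name="a", data=np.ones(10), compressors=zarr.codecs.BloscCodec())

# Explicit None: stored uncompressed (distinct from omitting the argument,
# which falls back to zarr's default compressor).
root.create_array(name="b", data=np.ones(10), compressors=None)
```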
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -16,7 +16,7 @@ dependencies = [
"pyvista>=0.44.2",
"vtk>=9.3.1",
"hydra-core>=1.3",
"zarr~=2.18",
"zarr>=3.1.2",
"numcodecs>=0.13.1",
"tqdm>=4.67.1",
]
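Because the dependency floor jumps a major version, a defensive runtime check (a sketch, not part of this PR) can surface stale environments early:

```python
import zarr

# Fail fast if an old Zarr 2 environment is still installed.
major = int(zarr.__version__.split(".")[0])
if major < 3:
    raise RuntimeError(
        f"zarr>=3 is required for the v3 store/codec APIs; found {zarr.__version__}"
    )
```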