import json
import logging
import posixpath
from datetime import datetime
from pathlib import Path
from typing import Any, Literal
import h5py
import numpy as np
import pandas as pd
from packaging.version import Version
import phenotypic
logger = logging.getLogger(__name__)
[docs]
class HDF:
"""
Represents an interface to manage HDF5 files with support for single or set image modes,
and ensures safe and compatible file access with retry and error-handling mechanisms.
The class facilitates operations on HDF5 files commonly used for storing phenotypic
data in both single image and image set modes. This class includes utilities to
handle locking errors and ensure compatibility by initializing proper HDF5 modes
while providing safe access methods for writing.
Attributes:
filepath (Path): Path to the HDF5 file on the filesystem.
name (str): Name associated with the HDF5 resource, often used as an identifier.
mode (Literal['single', 'set']): Specifies the mode for the HDF5 file, either
single image or image set.
root_posix (str): The root path for the HDF5 resource, determined by the mode.
home_posix (str): The specific root directory of the HDF5 resource in the file,
derived based on its mode.
set_data_posix (str, optional): The subgroup path for the data entity in image
set mode, if applicable.
SINGLE_IMAGE_ROOT_POSIX (str): Base path for single image mode.
IMAGE_SET_ROOT_POSIX (str): Base path for image set mode.
IMAGE_SET_DATA_POSIX (str): Subgroup marker for image set data.
EXT (set): Set of valid file extensions used to recognize HDF5 files.
IMAGE_MEASUREMENT_SUBGROUP_KEY (str): Key for accessing measurements in an
image's group.
IMAGE_STATUS_SUBGROUP_KEY (str): Key for accessing statuses in an image's group.
"""
if Version(phenotypic.__version__) < Version("0.7.1"):
SINGLE_IMAGE_ROOT_POSIX = f"/phenotypic/"
else:
SINGLE_IMAGE_ROOT_POSIX = f"/phenotypic/images/"
IMAGE_SET_ROOT_POSIX = f"/phenotypic/image_sets/"
IMAGE_SET_DATA_POSIX = "data" # The image and individual measurement group
# measurements and status are stored within in each image's group
IMAGE_MEASUREMENT_SUBGROUP_KEY = "measurements"
IMAGE_STATUS_SUBGROUP_KEY = "status"
PROTECTED_METADATA_SUBGROUP_KEY = "protected_metadata"
PUBLIC_METADATA_SUBGROUP_KEY = "public_metadata"
EXT = {".h5", ".hdf5", ".hdf", ".he5"}
[docs]
def __init__(self, filepath, name: str, mode: Literal["single", "set"]):
"""
Initializes a class instance to manage HDF5 file structures for single or set image
data based on the given filepath, name of the resource, and operational mode.
Attributes:
filepath (Path): Path to the HDF5 file.
name (str): Identifier for the resource within the HDF5 file.
mode (Literal['single', 'set']): Operational mode determining the structure
and organization within the HDF5 file. Must be either 'single' or 'set'.
root_posix (str): Posix path representing the root directory within the
HDF5 file based on the mode.
home_posix (str): Posix path representing the home directory for the resource
within the HDF5 file based on the mode.
set_data_posix (Optional[str]): Posix path for the data subdirectory within
the resource home directory. Only initialized in 'set' mode.
Args:
filepath: Path to the target HDF5 file. Must have an HDF5-compatible extension,
or a ValueError is raised.
name: Name of the resource to be managed in the file. Used to construct the
home directory for the resource within the HDF5 file.
mode: Operational mode. Specifies whether the resource represents a 'single'
or 'set' image data. If the mode is invalid, a ValueError is raised.
Raises:
ValueError: If the filepath does not have an HDF5-compatible extension.
ValueError: If the mode is neither 'single' nor 'set'.
"""
self.filepath = Path(filepath)
if self.filepath.suffix not in self.EXT:
raise ValueError("filepath is not an hdf5 file")
if not self.filepath.exists():
with h5py.File(name=self.filepath, mode="a", libver="latest") as hdf:
pass
self.name = name
self.mode = mode
if mode == "single":
self.root_posix = self.SINGLE_IMAGE_ROOT_POSIX
self.home_posix = posixpath.join(self.SINGLE_IMAGE_ROOT_POSIX, self.name)
elif mode == "set":
self.root_posix = self.IMAGE_SET_ROOT_POSIX
self.home_posix = posixpath.join(self.IMAGE_SET_ROOT_POSIX, self.name)
self.set_data_posix = posixpath.join(
self.home_posix, self.IMAGE_SET_DATA_POSIX
)
else:
raise ValueError(f"Invalid mode {mode}")
[docs]
def safe_writer(self) -> h5py.File:
"""
Returns a writer object that provides safe and controlled write access to an
HDF5 file at the specified filepath or creates it if it doesn't exist. Ensures that the file uses the 'latest'
version of the HDF5 library for compatibility and performance.
Handles HDF5 file locking conflicts by attempting to clear consistency flags
and retrying file opening with exponential backoff.
Returns:
h5py.File: A file writer object with append mode and 'latest' library
version enabled.
Raises:
OSError: If file cannot be opened after all retry attempts.
"""
import time
import subprocess
import os
import logging
logger = logging.getLogger(__name__)
max_retries = 3
retry_delay = 0.5
for attempt in range(max_retries):
try:
return h5py.File(self.filepath, "a", libver="latest")
except OSError as e:
error_msg = str(e).lower()
# Handle various HDF5 locking scenarios
is_lock_error = any(
[
"file is already open for write/swmr write" in error_msg,
"file is already open" in error_msg,
"unable to lock file" in error_msg,
"resource temporarily unavailable" in error_msg,
"file locking disabled" in error_msg,
]
)
if is_lock_error:
logger.warning(
f"HDF5 file access conflict (attempt {attempt + 1}/{max_retries}): {e}"
)
# Try to clear HDF5 consistency flags if h5clear is available
if attempt < max_retries - 1: # Don't try h5clear on last attempt
try:
if os.path.exists(self.filepath):
logger.info(
f"Attempting to clear HDF5 consistency flags for {self.filepath}"
)
result = subprocess.run(
["h5clear", "-s", str(self.filepath)],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
logger.info(
"Successfully cleared HDF5 consistency flags"
)
else:
logger.warning(f"h5clear failed: {result.stderr}")
except (
subprocess.TimeoutExpired,
subprocess.CalledProcessError,
FileNotFoundError,
) as clear_error:
logger.warning(f"Could not run h5clear: {clear_error}")
# Wait before retrying
logger.info(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
# Last attempt failed - provide helpful error message
logger.error(
f"Failed to open HDF5 file after {max_retries} attempts"
)
raise RuntimeError(
f"Failed to open HDF5 file after {max_retries} attempts. "
f"The file {self.filepath} may be locked by another process. "
f"Try manually running: h5clear -s {self.filepath} && h5clear -f {self.filepath}"
) from e
else:
# Different OSError, re-raise immediately
raise
# This should not be reached due to the raise in the loop
raise OSError(f"Unexpected error opening HDF5 file {self.filepath}")
[docs]
def swmr_writer(self) -> h5py.File:
"""
Returns a writer object that provides safe SWMR-compatible write access to an
HDF5 file. Creates the file if it doesn't exist and enables SWMR mode properly.
This method ensures proper SWMR mode initialization by creating the file
with the correct settings from the start, avoiding cache conflicts that
occur when trying to enable SWMR mode after opening.
Returns:
h5py.File: A file writer object with SWMR mode enabled.
Raises:
OSError: If file cannot be opened after all retry attempts.
"""
import time
import subprocess
import os
import logging
logger = logging.getLogger(__name__)
max_retries = 3
retry_delay = 0.5
for attempt in range(max_retries):
try:
# Create/open file with proper SWMR settings
file_handle = h5py.File(self.filepath, "a", libver="latest")
# Enable SWMR mode immediately after opening
try:
file_handle.swmr_mode = True
logger.debug(f"SWMR mode enabled successfully for {self.filepath}")
return file_handle
except Exception as swmr_error:
logger.warning(f"Could not enable SWMR mode: {swmr_error}")
# Return file handle without SWMR mode as fallback
return file_handle
except OSError as e:
error_msg = str(e).lower()
# Handle various HDF5 locking scenarios
is_lock_error = any(
[
"file is already open for write/swmr write" in error_msg,
"file is already open" in error_msg,
"unable to lock file" in error_msg,
"resource temporarily unavailable" in error_msg,
"file locking disabled" in error_msg,
"ring type mismatch" in error_msg,
"pinned entry count" in error_msg,
]
)
if is_lock_error:
logger.warning(
f"HDF5 SWMR file access conflict (attempt {attempt + 1}/{max_retries}): {e}"
)
# Try to clear HDF5 consistency flags if h5clear is available
if attempt < max_retries - 1: # Don't try h5clear on last attempt
try:
if os.path.exists(self.filepath):
logger.info(
f"Attempting to clear HDF5 consistency flags for {self.filepath}"
)
# Clear both status and force flags for SWMR issues
subprocess.run(
["h5clear", "-s", str(self.filepath)],
capture_output=True,
text=True,
timeout=10,
)
subprocess.run(
["h5clear", "-f", str(self.filepath)],
capture_output=True,
text=True,
timeout=10,
)
logger.info("Cleared HDF5 consistency flags")
except (
subprocess.TimeoutExpired,
subprocess.CalledProcessError,
FileNotFoundError,
) as clear_error:
logger.warning(f"Could not run h5clear: {clear_error}")
# Wait before retrying
logger.info(f"Waiting {retry_delay} seconds before retry...")
time.sleep(retry_delay)
retry_delay *= 2 # Exponential backoff
else:
# Last attempt failed - provide helpful error message
logger.error(
f"Failed to open HDF5 file in SWMR mode after {max_retries} attempts"
)
raise RuntimeError(
f"Failed to open HDF5 file in SWMR mode after {max_retries} attempts. "
f"The file {self.filepath} may have cache conflicts. "
f"Try manually running: h5clear -s {self.filepath} && h5clear -f {self.filepath}"
) from e
else:
# Different OSError, re-raise immediately
raise
# This should not be reached due to the raise in the loop
raise OSError(
f"Unexpected error opening HDF5 file in SWMR mode {self.filepath}"
)
[docs]
def strict_writer(self) -> h5py.File:
"""
Provides access to an HDF5 file in read/write mode using the `h5py` library. This
property is used to obtain an `h5py.File` object configured with the latest library version.
Note:
If using SWMR mode, don't forget to enable SWMR mode:
.. dropdown:: Enable SWMR mode
.. code-block:: python
hdf = HDF(filepath)
with hdf.writer as writer:
writer.swmr_mode = True
# rest of your code
Returns:
h5py.File: An HDF5 file object opened in 'r+' mode, enabling reading and writing.
Raises:
OSError: If the file cannot be opened or accessed.
"""
return h5py.File(self.filepath, "r+", libver="latest")
[docs]
def swmr_reader(self) -> h5py.File:
return h5py.File(self.filepath, "r", libver="latest", swmr=True)
[docs]
def reader(self) -> h5py.File:
return h5py.File(self.filepath, "r", libver="latest", swmr=False)
[docs]
@staticmethod
def get_group(handle: h5py.File, posix) -> h5py.Group:
"""
Retrieves or creates a group in an HDF5 file.
This method checks the validity of the provided HDF5 file handle and tries to
retrieve the specified group based on the given posix path. If the group does not
exist and the file is not opened in read-only mode, the group gets created. If the
file is in read-only mode and the group does not exist, an error is raised.
Args:
handle (h5py.File): The HDF5 file handle to operate on.
posix (str): The posix path of the group to retrieve or create in the HDF5 file.
Returns:
h5py.Group: The corresponding h5py group within the HDF5 file.
Raises:
ValueError: If the HDF5 file handle is invalid or no longer valid.
ValueError: If the file handle mode cannot be determined.
KeyError: If the specified group does not exist in read-only mode.
"""
posix = str(posix)
# Check if the handle is valid before accessing it
try:
# Test if handle is still valid by checking if it's open
if not handle.id.valid:
raise ValueError(
"HDF5 file handle is no longer valid (file may have been closed)"
)
except (ValueError, AttributeError) as e:
raise ValueError(f"Invalid HDF5 file handle: {e}")
if posix in handle:
return handle[posix]
else:
# Check if file is opened in read-only mode - with error handling
# Handle both File and Group objects (Groups have a .file attribute)
try:
if isinstance(handle, h5py.Group) and not isinstance(handle, h5py.File):
# For Group objects, access the parent file
file_obj = handle.file
file_mode = file_obj.mode
swmr_mode = file_obj.swmr_mode
else:
# For File objects, access directly
file_mode = handle.mode
swmr_mode = handle.swmr_mode
except (ValueError, AttributeError) as e:
raise ValueError(
f"Cannot determine file mode - HDF5 handle may be invalid: {e}"
)
if file_mode == "r":
raise KeyError(
f"Group '{posix}' not found in HDF5 file opened in read-only mode"
)
if swmr_mode is True:
raise KeyError(
f"Group '{posix}' not found in HDF5 file opened in SWMR mode"
)
else:
# File has write permissions, safe to create group
return handle.create_group(posix)
[docs]
def get_home(self, handle):
"""
Retrieves a specific group from an HDF file corresponding to single image data.
This method is used to fetch a predefined group from an HDF container, where the group
is identified by a constant key related to single image data. The function provides
a static interface allowing invocation without requiring an instance of the class.
Args:
handle: The HDF file handle from which the group should be retrieved.
Returns:
The group corresponding to single image data, retrieved based on the defined
SINGLE_IMAGE_ROOT_POSIX.
Raises:
Appropriate exceptions may be raised by the underlying HDF.get_group() method,
based on the implementation and provided handle or key.
"""
return self.get_group(handle=handle, posix=self.home_posix)
[docs]
def get_root_group(self, handle) -> h5py.Group:
return self.get_group(handle=handle, posix=self.root_posix)
[docs]
def get_data_group(self, handle):
if self.mode != "set":
raise AttributeError("This method is only available for image sets")
return self.get_group(handle, self.set_data_posix)
[docs]
def get_image_group(self, handle, image_name):
if self.mode == "single":
return self.get_home(handle)
elif self.mode == "set":
return self.get_group(
handle, posixpath.join(self.set_data_posix, image_name)
)
else:
raise ValueError(f"Invalid mode {self.mode}")
[docs]
def get_image_measurement_subgroup(self, handle, image_name):
return self.get_group(
handle,
posixpath.join(
self.set_data_posix, image_name, self.IMAGE_MEASUREMENT_SUBGROUP_KEY
),
)
[docs]
def get_status_subgroup(self, handle, image_name):
return self.get_group(
handle,
posixpath.join(
self.set_data_posix, image_name, self.IMAGE_STATUS_SUBGROUP_KEY
),
)
[docs]
@staticmethod
def save_array2hdf5(group, array, name, **kwargs):
"""
Saves a given numpy array to an HDF5 group. If a dataset with the specified
name already exists in the group, it checks if the shapes match. If the
shapes match, it updates the existing dataset; otherwise, it removes the
existing dataset and creates a new one with the specified name. If a dataset
with the given name doesn't exist, it creates a new dataset.
Args:
group: h5py.Group
The HDF5 group in which the dataset will be saved.
array: numpy.ndarray
The data array to be stored in the dataset.
name: str
The name of the dataset within the group.
**kwargs: dict
Additional keyword arguments to pass when creating a new dataset.
"""
if name in group:
dset = group[name]
if dset.shape == array.shape:
dset[...] = array
else:
del group[name]
group.create_dataset(name, data=array, **kwargs)
else:
group.create_dataset(name, data=array, **kwargs)
# =================== PANDAS2HDF FUNCTIONALITY ===================
[docs]
@staticmethod
def assert_swmr_on(g: h5py.Group) -> None:
"""Assert that SWMR mode is enabled on the group's file.
Args:
g: HDF5 group to check.
Raises:
RuntimeError: If SWMR mode is not enabled.
"""
if not g.file.swmr_mode:
raise RuntimeError(
f"SWMR mode is required but not enabled on file {g.file.filename}"
)
[docs]
@staticmethod
def get_uncompressed_sizes_for_group(
group: h5py.Group,
) -> tuple[dict[str, int], int]:
"""Recursively collect the uncompressed (logical) sizes of SWMR-compatible datasets.
This function walks the provided HDF5 group and inspects every dataset without
reading any data. For each dataset that is compatible with SWMR writing rules
(i.e., chunked layout and no variable-length data types), it computes the
uncompressed size in bytes as: dtype.itemsize * number_of_elements.
Notes
- This works regardless of whether datasets are stored compressed on disk; the
reported size is the logical size when uncompressed in memory.
- Variable-length strings (and datasets containing variable-length fields) are
excluded because they are not SWMR-write friendly and their uncompressed size
cannot be determined from metadata alone.
- The operation is safe under SWMR: it only reads object metadata, creates no
new refine, and does not modify the file.
Args:
group: The root h5py.Group to traverse.
Returns:
A tuple (sizes, total_bytes) where:
- sizes: dict mapping absolute dataset paths (e.g., '/grp/ds') to uncompressed
size in bytes.
- total_bytes: sum of all values in sizes.
"""
import numpy as _np
import h5py as _h5py
def _dtype_has_vlen(dt: _np.dtype) -> bool:
"""Return True if dtype is or contains variable-length elements."""
# Direct vlen
if _h5py.check_vlen_dtype(dt) is not None:
return True
# String vlen
if _h5py.check_string_dtype(dt) is not None:
info = _h5py.check_string_dtype(dt)
# info.length is None for variable-length strings
if getattr(info, "length", None) is None:
return True
return False # fixed-length string
# Compound: check fields recursively
if dt.fields:
for _, (subdt, _) in dt.fields.items():
if _dtype_has_vlen(subdt):
return True
return False
sizes: dict[str, int] = {}
def _visitor(obj: _h5py.Dataset | _h5py.Group) -> None:
if isinstance(obj, _h5py.Dataset):
# SWMR-write compatibility: chunked layout required
if obj.chunks is None:
return
# Exclude variable-length dtypes (including vlen strings)
if _dtype_has_vlen(obj.dtype):
return
# Compute logical size without reading data
try:
n_elems = (
int(_np.prod(obj.shape, dtype=_np.int64))
if obj.shape is not None
else 1
)
except Exception:
# Fallback for unusual shapes
n_elems = int(getattr(obj, "size", 0))
itemsize = int(obj.dtype.itemsize)
sizes[obj.name] = itemsize * n_elems
group.visititems(_visitor)
total = int(sum(sizes.values()))
return sizes, total
@staticmethod
def _get_string_dtype(length: int | None = None) -> h5py.special_dtype:
"""Get UTF-8 string dtype for HDF5.
Args:
length: If None, returns variable-length string dtype.
If an integer, returns fixed-length string dtype with that character length.
Returns:
HDF5 string dtype (variable-length or fixed-length).
"""
if length is None:
return h5py.string_dtype("utf-8")
else:
# Convert to native Python int to avoid h5py type issues
return h5py.string_dtype("utf-8", int(length))
@staticmethod
def _pad_or_truncate_string(s: str, fixed_length: int) -> str:
"""Pad or truncate a string to a fixed character length.
Args:
s: Input string.
fixed_length: Target character length.
Returns:
String padded with spaces or truncated to exactly fixed_length characters.
"""
if len(s) < fixed_length:
# Pad with spaces on the right
return s + " " * (fixed_length - len(s))
elif len(s) > fixed_length:
# Truncate to exactly fixed_length characters
return s[:fixed_length]
else:
# Already the right length
return s
@staticmethod
def _apply_fixed_length_to_strings(
str_array: np.ndarray[Any, np.dtype[Any]],
mask: np.ndarray[Any, np.dtype[Any]],
fixed_length: int,
) -> np.ndarray[Any, np.dtype[Any]]:
"""Apply fixed-length padding/truncation to string array.
Args:
str_array: Array of strings.
mask: Mask array (1=valid, 0=missing).
fixed_length: Target character length.
Returns:
Array with strings padded/truncated to fixed_length.
"""
result = str_array.copy()
valid_indices = mask == 1
for i in np.where(valid_indices)[0]:
# Ensure we're working with proper Unicode strings
original_str = str(result[i])
processed_str = HDF._pad_or_truncate_string(original_str, fixed_length)
# Encode to UTF-8 bytes to ensure it fits in fixed-length storage
result[i] = processed_str
return result
@staticmethod
def _trim_trailing_whitespace(s: str) -> str:
"""Trim trailing whitespace from a string.
Args:
s: Input string.
Returns:
String with trailing whitespace removed.
"""
return s.rstrip()
@staticmethod
def _decode_fixed_length_strings(
str_array: np.ndarray[Any, np.dtype[Any]], mask: np.ndarray[Any, np.dtype[Any]]
) -> np.ndarray[Any, np.dtype[Any]]:
"""Decode fixed-length strings and trim trailing whitespace from valid entries.
Args:
str_array: Array of fixed-length strings.
mask: Mask array (1=valid, 0=missing).
Returns:
Object array with trimmed strings and None for missing values.
"""
result = np.empty(len(str_array), dtype=object)
valid_indices = mask == 1
# Set valid entries with trimmed strings
for i in np.where(valid_indices)[0]:
raw_str = str_array[i]
if isinstance(raw_str, bytes):
raw_str = raw_str.decode("utf-8")
result[i] = HDF._trim_trailing_whitespace(str(raw_str))
# Set missing entries to None
result[mask == 0] = None
return result
@staticmethod
def _encode_values_for_hdf5(
values: pd.Series,
*,
string_fixed_length: int | None = None,
) -> tuple[
np.ndarray[Any, np.dtype[Any]], np.ndarray[Any, np.dtype[Any]] | None, str, str
]:
"""Encode pandas Series values for HDF5 storage.
Args:
values: pandas Series to encode.
string_fixed_length: If provided, use fixed-length strings with this character length.
Returns:
Tuple of (encoded_values, mask, values_kind, orig_dtype):
- encoded_values: numpy array ready for HDF5 storage
- mask: optional mask array (uint8, 1=valid, 0=missing) for strings
- values_kind: "numeric_float64", "string_utf8_fixed", or "string_utf8_vlen"
- orig_dtype: string representation of original dtype
"""
orig_dtype = str(values.dtype)
if pd.api.types.is_numeric_dtype(values.dtype) or pd.api.types.is_bool_dtype(
values.dtype
):
# Convert to float64, with NaN for missing values
encoded = values.astype(np.float64).values
mask = None
values_kind = "numeric_float64"
elif pd.api.types.is_object_dtype(values.dtype):
# Check if object dtype contains boolean-like values
non_null_values = values.dropna()
if len(non_null_values) > 0 and all(
isinstance(v, bool | np.bool_) or v in (True, False)
for v in non_null_values
):
# Treat as boolean/numeric data
encoded = values.astype(np.float64).values
mask = None
values_kind = "numeric_float64"
else:
# Treat as string data
str_values = values.astype(str)
mask = (~values.isna()).astype(np.uint8)
# Replace NaN string representations with empty strings
str_array = str_values.values
str_array[np.asarray(values.isna())] = ""
if string_fixed_length is not None:
# Apply fixed-length padding/truncation
str_array = HDF._apply_fixed_length_to_strings(
np.asarray(str_array), np.asarray(mask), string_fixed_length
)
# Create array with explicit UTF-8 encoding for fixed-length
try:
encoded = str_array.astype(
HDF._get_string_dtype(string_fixed_length)
)
except UnicodeEncodeError:
# Fallback: truncate to ASCII-safe characters if Unicode fails
ascii_safe_array = str_array.copy()
for i, s in enumerate(str_array):
if mask[i] == 1:
# Keep only ASCII characters for problematic Unicode
ascii_safe_array[i] = s.encode(
"ascii", "ignore"
).decode("ascii")[:string_fixed_length]
encoded = ascii_safe_array.astype(
HDF._get_string_dtype(string_fixed_length)
)
values_kind = "string_utf8_fixed"
else:
encoded = str_array.astype(HDF._get_string_dtype())
values_kind = "string_utf8_vlen"
elif pd.api.types.is_string_dtype(values.dtype):
# Convert to UTF-8 strings with mask for missing values
str_values = values.astype(str)
mask = (~values.isna()).astype(np.uint8)
# Replace NaN string representations with empty strings
str_array = str_values.values
str_array[np.asarray(values.isna())] = ""
if string_fixed_length is not None:
# Apply fixed-length padding/truncation
str_array = HDF._apply_fixed_length_to_strings(
np.asarray(str_array), np.asarray(mask), string_fixed_length
)
# Create array with explicit UTF-8 encoding for fixed-length
try:
encoded = str_array.astype(
HDF._get_string_dtype(string_fixed_length)
)
except UnicodeEncodeError:
# Fallback: truncate to ASCII-safe characters if Unicode fails
ascii_safe_array = str_array.copy()
for i, s in enumerate(str_array):
if mask[i] == 1:
# Keep only ASCII characters for problematic Unicode
ascii_safe_array[i] = s.encode("ascii", "ignore").decode(
"ascii"
)[:string_fixed_length]
encoded = ascii_safe_array.astype(
HDF._get_string_dtype(string_fixed_length)
)
values_kind = "string_utf8_fixed"
else:
encoded = str_array.astype(HDF._get_string_dtype())
values_kind = "string_utf8_vlen"
else:
raise ValueError(f"Unsupported dtype: {values.dtype}")
# Ensure we return numpy arrays, not ExtensionArrays
encoded_array: np.ndarray[Any, np.dtype[Any]] = np.asarray(encoded)
mask_array: np.ndarray[Any, np.dtype[Any]] | None = (
np.asarray(mask) if mask is not None else None
)
return encoded_array, mask_array, values_kind, orig_dtype
@staticmethod
def _encode_index_for_hdf5(
index: pd.Index,
*,
string_fixed_length: int | None = None,
) -> tuple[
np.ndarray[Any, np.dtype[Any]] | list[np.ndarray[Any, np.dtype[Any]]],
np.ndarray[Any, np.dtype[Any]] | list[np.ndarray[Any, np.dtype[Any]]],
dict[str, Any],
str,
]:
"""Encode pandas Index/MultiIndex for HDF5 storage.
Args:
index: pandas Index or MultiIndex to encode.
string_fixed_length: If provided, use fixed-length strings with this character length.
Returns:
Tuple of (encoded_arrays, mask_arrays, metadata, orig_dtype):
- encoded_arrays: array or list of arrays for MultiIndex levels
- mask_arrays: mask array or list of mask arrays
- metadata: dict with index metadata
- orig_dtype: string representation of original dtype
"""
orig_dtype = (
str(index.dtype) if not isinstance(index, pd.MultiIndex) else "MultiIndex"
)
if isinstance(index, pd.MultiIndex):
# Handle MultiIndex
encoded_arrays = []
mask_arrays = []
for level_values in index.to_frame().values.T:
level_series = pd.Series(level_values)
str_values = level_series.astype(str)
mask = (~level_series.isna()).astype(np.uint8)
# Replace NaN representations
str_array = str_values.values
str_array[np.asarray(level_series.isna())] = ""
if string_fixed_length is not None:
# Apply fixed-length padding/truncation
str_array = HDF._apply_fixed_length_to_strings(
np.asarray(str_array), np.asarray(mask), string_fixed_length
)
encoded_arrays.append(
str_array.astype(HDF._get_string_dtype(string_fixed_length))
)
else:
encoded_arrays.append(str_array.astype(HDF._get_string_dtype()))
mask_arrays.append(np.asarray(mask))
metadata = {
"index_is_multiindex": 1,
"index_levels": index.nlevels,
"index_names": json.dumps(
[str(name) if name is not None else None for name in index.names]
),
}
if string_fixed_length is not None:
metadata["index_kind"] = "string_utf8_fixed"
else:
metadata["index_kind"] = "string_utf8_vlen"
else:
# Handle regular Index
index_series = pd.Series(index)
str_values = index_series.astype(str)
mask = (~index_series.isna()).astype(np.uint8)
# Replace NaN representations
str_array = str_values.values
str_array[np.asarray(index_series.isna())] = ""
if string_fixed_length is not None:
# Apply fixed-length padding/truncation
str_array = HDF._apply_fixed_length_to_strings(
np.asarray(str_array), np.asarray(mask), string_fixed_length
)
encoded_arrays = str_array.astype(
HDF._get_string_dtype(string_fixed_length)
) # type: ignore[assignment]
else:
encoded_arrays = str_array.astype(HDF._get_string_dtype()) # type: ignore[assignment]
mask_arrays = mask # type: ignore[assignment]
metadata = {
"index_is_multiindex": 0,
"index_levels": 1,
"index_names": json.dumps(
[str(index.name) if index.name is not None else None]
),
}
if string_fixed_length is not None:
metadata["index_kind"] = "string_utf8_fixed"
else:
metadata["index_kind"] = "string_utf8_vlen"
return encoded_arrays, mask_arrays, metadata, orig_dtype
@staticmethod
def _decode_values_from_hdf5(
group: h5py.Group,
dataset_name: str = "values",
length: int | None = None,
) -> tuple[np.ndarray[Any, np.dtype[Any]], str]:
"""Decode values from HDF5 storage back to numpy array.
Args:
group: HDF5 group containing the datasets.
dataset_name: Name of the values dataset.
length: Logical length to read (respects preallocated space).
Returns:
Tuple of (decoded_values, values_kind).
"""
values_kind = group.attrs["values_kind"]
if isinstance(values_kind, bytes):
values_kind = values_kind.decode("utf-8")
logical_length = length if length is not None else group.attrs["len"]
if values_kind == "numeric_float64":
values = group[dataset_name][:logical_length]
elif values_kind == "string_utf8_vlen":
values = group[dataset_name][:logical_length]
mask = group[f"{dataset_name}_mask"][:logical_length]
# Convert back to object array with proper NaN handling
result = np.empty(logical_length, dtype=object)
result[mask == 1] = [
s.decode("utf-8") if isinstance(s, bytes) else str(s)
for s in values[mask == 1]
]
result[mask == 0] = None
values = result
elif values_kind == "string_utf8_fixed":
values = group[dataset_name][:logical_length]
mask = group[f"{dataset_name}_mask"][:logical_length]
# Decode fixed-length strings and trim trailing whitespace
values = HDF._decode_fixed_length_strings(values, mask)
else:
raise ValueError(f"Unknown values_kind: {values_kind}")
return values, values_kind
@staticmethod
def _decode_index_from_hdf5(
group: h5py.Group,
index_dataset_name: str = "index",
length: int | None = None,
) -> pd.Index:
"""Decode index from HDF5 storage back to pandas Index/MultiIndex.
Args:
group: HDF5 group containing the index datasets.
index_dataset_name: Base name for index datasets.
length: Logical length to read.
Returns:
Reconstructed pandas Index or MultiIndex.
"""
logical_length = length if length is not None else group.attrs["len"]
is_multiindex = bool(group.attrs["index_is_multiindex"])
index_names_attr = group.attrs["index_names"]
if isinstance(index_names_attr, bytes):
index_names_attr = index_names_attr.decode("utf-8")
index_names = json.loads(index_names_attr)
# Check if index uses fixed-length strings
index_kind = group.attrs.get("index_kind", "string_utf8_vlen")
if isinstance(index_kind, bytes):
index_kind = index_kind.decode("utf-8")
if is_multiindex:
levels = []
for i in range(group.attrs["index_levels"]):
level_data = group[f"{index_dataset_name}/levels/L{i}"][:logical_length]
level_mask = group[f"{index_dataset_name}/levels/L{i}_mask"][
:logical_length
]
if index_kind == "string_utf8_fixed":
# Decode fixed-length strings and trim trailing whitespace
level_values = HDF._decode_fixed_length_strings(
level_data, level_mask
)
else:
# Original variable-length string handling
level_values = np.empty(logical_length, dtype=object)
level_values[level_mask == 1] = [
s.decode("utf-8") if isinstance(s, bytes) else str(s)
for s in level_data[level_mask == 1]
]
level_values[level_mask == 0] = None
levels.append(level_values)
return pd.MultiIndex.from_arrays(levels, names=index_names)
else:
index_data = group[f"{index_dataset_name}/values"][:logical_length]
index_mask = group[f"{index_dataset_name}/index_mask"][:logical_length]
if index_kind == "string_utf8_fixed":
# Decode fixed-length strings and trim trailing whitespace
index_values = HDF._decode_fixed_length_strings(index_data, index_mask)
else:
# Original variable-length string handling
index_values = np.empty(logical_length, dtype=object)
index_values[index_mask == 1] = [
s.decode("utf-8") if isinstance(s, bytes) else str(s)
for s in index_data[index_mask == 1]
]
index_values[index_mask == 0] = None
return pd.Index(index_values, name=index_names[0])
@staticmethod
def _create_resizable_dataset(
group: h5py.Group,
name: str,
dtype: Any,
shape: tuple[int, ...],
maxshape: tuple[int | None, ...],
chunks: tuple[int, ...],
compression: str,
) -> h5py.Dataset:
"""Create a resizable, chunked, compressed dataset."""
return group.create_dataset(
name,
shape=shape,
maxshape=maxshape,
dtype=dtype,
chunks=chunks,
compression=compression,
)
# =================== MAIN PANDAS2HDF FUNCTIONS ===================
[docs]
@staticmethod
def preallocate_series_layout(
group: h5py.Group,
series: pd.Series,
*,
dataset: str = "values",
index_dataset: str = "index",
chunks: tuple[int, ...] = (25,),
compression: str = "gzip",
preallocate: int = 100,
string_fixed_length: int = 100,
) -> None:
"""Preallocate HDF5 layout for a pandas Series without writing data.
Creates resizable, chunked, compressed datasets with initial shape (preallocate,)
and maxshape (None,). Initializes masks to zeros and sets len=0.
Args:
group: HDF5 group to write to.
series: pandas Series to create layout for (used for schema).
dataset: Name for the values dataset.
index_dataset: Name for the index dataset.
chunks: Chunk shape for datasets.
compression: Compression algorithm.
preallocate: Initial allocation size.
string_fixed_length: Character length for fixed-length strings.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
If group.file.swmr_mode is True and datasets don't exist.
ValueError: If series validation fails.
"""
# Prevent object creation under SWMR (SWMR programming model compliance)
if group.file.swmr_mode and dataset not in group:
raise RuntimeError(
"Cannot create new datasets while SWMR mode is enabled. "
"Create all refine before starting SWMR mode."
)
# Encode series for schema information using fixed-length strings
encoded_values, values_mask, values_kind, orig_values_dtype = (
HDF._encode_values_for_hdf5(series, string_fixed_length=string_fixed_length)
)
encoded_index, index_masks, index_metadata, orig_index_dtype = (
HDF._encode_index_for_hdf5(
series.index, string_fixed_length=string_fixed_length
)
)
# Create values dataset
if values_kind == "numeric_float64":
HDF._create_resizable_dataset(
group, dataset, np.float64, (preallocate,), (None,), chunks, compression
)
else: # string_utf8_fixed or string_utf8_vlen
if values_kind == "string_utf8_fixed":
dtype = HDF._get_string_dtype(string_fixed_length)
else:
dtype = HDF._get_string_dtype()
HDF._create_resizable_dataset(
group,
dataset,
dtype,
(preallocate,),
(None,),
chunks,
compression,
)
# Create values mask
mask_dataset = HDF._create_resizable_dataset(
group,
f"{dataset}_mask",
np.uint8,
(preallocate,),
(None,),
chunks,
compression,
)
mask_dataset[:] = 0 # Initialize to all missing
# Create index datasets
index_kind = index_metadata["index_kind"]
if index_kind == "string_utf8_fixed":
index_dtype = HDF._get_string_dtype(string_fixed_length)
else:
index_dtype = HDF._get_string_dtype()
if index_metadata["index_is_multiindex"]:
# Create index group and level datasets
index_group = group.create_group(index_dataset)
levels_group = index_group.create_group("levels")
for i in range(index_metadata["index_levels"]):
HDF._create_resizable_dataset(
levels_group,
f"L{i}",
index_dtype,
(preallocate,),
(None,),
chunks,
compression,
)
mask_dataset = HDF._create_resizable_dataset(
levels_group,
f"L{i}_mask",
np.uint8,
(preallocate,),
(None,),
chunks,
compression,
)
mask_dataset[:] = 0 # Initialize to all missing
else:
# Create index group
index_group = group.create_group(index_dataset)
HDF._create_resizable_dataset(
index_group,
"values",
index_dtype,
(preallocate,),
(None,),
chunks,
compression,
)
mask_dataset = HDF._create_resizable_dataset(
index_group,
"index_mask",
np.uint8,
(preallocate,),
(None,),
chunks,
compression,
)
mask_dataset[:] = 0 # Initialize to all missing
# Set attributes
group.attrs["series_name"] = str(series.name) if series.name is not None else ""
group.attrs["len"] = 0 # Logical length is 0
group.attrs["values_kind"] = values_kind
group.attrs["orig_values_dtype"] = orig_values_dtype
group.attrs["orig_index_dtype"] = orig_index_dtype
group.attrs["created_at_iso"] = datetime.now().isoformat()
group.attrs["version"] = "1.0"
# Set string fixed length attributes when applicable
if values_kind == "string_utf8_fixed":
group.attrs["string_fixed_length"] = string_fixed_length
if index_kind == "string_utf8_fixed":
group.attrs["index_string_fixed_length"] = string_fixed_length
# Add index metadata
for key, value in index_metadata.items():
if isinstance(value, str):
group.attrs[key] = value
else:
group.attrs[key] = value
[docs]
@staticmethod
def save_series_new(
group: h5py.Group,
series: pd.Series,
*,
dataset: str = "values",
index_dataset: str = "index",
chunks: tuple[int, ...] = (25,),
compression: str = "gzip",
preallocate: int = 100,
string_fixed_length: int = 100,
require_swmr: bool = False,
) -> None:
"""Create datasets and write a pandas Series to HDF5.
Creates new datasets or reuses existing preallocated layout.
Writes the first len(series) elements and sets logical length.
Args:
group: HDF5 group to write to.
series: pandas Series to persist.
dataset: Name for the values dataset.
index_dataset: Name for the index dataset.
chunks: Chunk shape for new datasets.
compression: Compression algorithm for new datasets.
preallocate: Initial allocation size for new datasets.
string_fixed_length: Character length for fixed-length strings.
require_swmr: If True, assert SWMR mode is enabled.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If series validation fails.
"""
if len(series) == 0:
raise ValueError("Cannot save empty series")
# Check if layout already exists (preallocated)
if dataset in group and group.attrs.get("len", -1) == 0:
# Use existing preallocated layout
HDF.save_series_update(
group,
series,
start=0,
dataset=dataset,
index_dataset=index_dataset,
require_swmr=require_swmr,
)
return
# Creation path: if require_swmr=True, only permit write (no object creation)
if require_swmr:
HDF.assert_swmr_on(group)
# For SWMR writes, datasets must already exist
if dataset not in group:
raise RuntimeError(
"Datasets must be created before starting SWMR mode. "
"Use preallocate_series_layout() first, then start SWMR."
)
# Use update path
HDF.save_series_update(
group,
series,
start=0,
dataset=dataset,
index_dataset=index_dataset,
require_swmr=require_swmr,
)
return
# Create new layout (require_swmr=False for creation phase)
HDF.preallocate_series_layout(
group,
series,
dataset=dataset,
index_dataset=index_dataset,
chunks=chunks,
compression=compression,
preallocate=max(preallocate, len(series)),
string_fixed_length=string_fixed_length,
)
# Write the data
HDF.save_series_update(
group,
series,
start=0,
dataset=dataset,
index_dataset=index_dataset,
require_swmr=require_swmr,
)
[docs]
@staticmethod
def save_series_update(
group: h5py.Group,
series: pd.Series,
*,
start: int = 0,
dataset: str = "values",
index_dataset: str = "index",
require_swmr: bool = True,
) -> None:
"""Update a pandas Series in HDF5 at specified position.
Overwrites [start:start+len(series)] and updates logical length
to the largest contiguous written extent.
Args:
group: HDF5 group containing existing datasets.
series: pandas Series to write.
start: Starting position for the update.
dataset: Name of the values dataset.
index_dataset: Name of the index dataset.
require_swmr: If True, assert SWMR mode is enabled.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If validation fails or schema mismatch.
"""
if require_swmr:
HDF.assert_swmr_on(group)
if len(series) == 0:
raise ValueError("Cannot update with empty series")
current_len = group.attrs["len"]
end_pos = start + len(series)
# Validate that this is a contiguous update
if start > current_len:
raise ValueError(
f"Non-contiguous update: start={start}, current_len={current_len}"
)
# Get stored schema to determine if fixed-length strings are used
stored_values_kind = group.attrs["values_kind"]
if isinstance(stored_values_kind, bytes):
stored_values_kind = stored_values_kind.decode("utf-8")
stored_index_kind = group.attrs.get("index_kind", "string_utf8_vlen")
if isinstance(stored_index_kind, bytes):
stored_index_kind = stored_index_kind.decode("utf-8")
# Check for SWMR restrictions on variable-length strings
if require_swmr and group.file.swmr_mode:
if stored_values_kind == "string_utf8_vlen":
raise RuntimeError(
"Cannot write to variable-length string datasets under SWMR mode. "
"Variable-length string writes are not allowed in SWMR mode."
)
if stored_index_kind == "string_utf8_vlen":
raise RuntimeError(
"Cannot write to variable-length string index datasets under SWMR mode. "
"Variable-length string writes are not allowed in SWMR mode."
)
# Get fixed-length parameters from stored attributes
string_fixed_length = None
if stored_values_kind == "string_utf8_fixed":
string_fixed_length = group.attrs["string_fixed_length"]
index_string_fixed_length = None
if stored_index_kind == "string_utf8_fixed":
index_string_fixed_length = group.attrs["index_string_fixed_length"]
# Encode data using stored schema
encoded_values, values_mask, values_kind, _ = HDF._encode_values_for_hdf5(
series, string_fixed_length=string_fixed_length
)
encoded_index, index_masks, index_metadata, _ = HDF._encode_index_for_hdf5(
series.index, string_fixed_length=index_string_fixed_length
)
# Validate schema compatibility
if stored_values_kind != values_kind:
raise ValueError(
f"Values kind mismatch: expected {stored_values_kind}, got {values_kind}"
)
expected_multiindex = bool(group.attrs["index_is_multiindex"])
actual_multiindex = bool(index_metadata["index_is_multiindex"])
if expected_multiindex != actual_multiindex:
raise ValueError(
f"Index type mismatch: expected multiindex={expected_multiindex}, got {actual_multiindex}"
)
# Resize datasets if needed
values_dataset = group[dataset]
if end_pos > values_dataset.shape[0]:
values_dataset.resize((end_pos,))
if values_mask is not None:
group[f"{dataset}_mask"].resize((end_pos,))
# Write values
values_dataset[start:end_pos] = encoded_values
if values_mask is not None:
group[f"{dataset}_mask"][start:end_pos] = values_mask
# Write index
if expected_multiindex:
levels_group = group[f"{index_dataset}/levels"]
for i, (level_data, level_mask) in enumerate(
zip(encoded_index, index_masks, strict=False)
):
level_dataset = levels_group[f"L{i}"]
if end_pos > level_dataset.shape[0]:
level_dataset.resize((end_pos,))
levels_group[f"L{i}_mask"].resize((end_pos,))
level_dataset[start:end_pos] = level_data
levels_group[f"L{i}_mask"][start:end_pos] = level_mask
else:
index_group = group[index_dataset]
index_values_dataset = index_group["values"]
if end_pos > index_values_dataset.shape[0]:
index_values_dataset.resize((end_pos,))
index_group["index_mask"].resize((end_pos,))
index_values_dataset[start:end_pos] = encoded_index
index_group["index_mask"][start:end_pos] = index_masks
# Update logical length
group.attrs["len"] = max(current_len, end_pos)
if require_swmr:
group.file.flush()
[docs]
@staticmethod
def save_series_append(
group: h5py.Group,
series: pd.Series,
*,
dataset: str = "values",
index_dataset: str = "index",
require_swmr: bool = True,
) -> None:
"""Append a pandas Series to existing HDF5 datasets.
Appends at the end using current logical length.
Resizes datasets if needed and updates logical length.
Args:
group: HDF5 group containing existing datasets.
series: pandas Series to append.
dataset: Name of the values dataset.
index_dataset: Name of the index dataset.
require_swmr: If True, assert SWMR mode is enabled.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If validation fails or schema mismatch.
"""
if require_swmr:
HDF.assert_swmr_on(group)
current_len = group.attrs["len"]
HDF.save_series_update(
group,
series,
start=current_len,
dataset=dataset,
index_dataset=index_dataset,
require_swmr=require_swmr,
)
[docs]
@staticmethod
def load_series(
group: h5py.Group,
*,
dataset: str = "values",
index_dataset: str = "index",
require_swmr: bool = False,
) -> pd.Series:
"""Load a pandas Series from HDF5 storage.
Reconstructs the Series with original name, index names, order,
and missingness. Respects logical length from attributes.
Args:
group: HDF5 group containing the Series data.
dataset: Name of the values dataset.
index_dataset: Name of the index dataset.
require_swmr: If True, assert SWMR mode is enabled.
Returns:
Reconstructed pandas Series.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If data validation fails.
"""
if require_swmr:
HDF.assert_swmr_on(group)
logical_length = group.attrs["len"]
if logical_length == 0:
# Return empty series with proper schema
series_name = group.attrs["series_name"]
if isinstance(series_name, bytes):
series_name = series_name.decode("utf-8")
series_name = series_name if series_name else None
return pd.Series([], name=series_name, dtype=object)
# Decode values and index
values, _ = HDF._decode_values_from_hdf5(group, dataset, logical_length)
index = HDF._decode_index_from_hdf5(group, index_dataset, logical_length)
# Get series name
series_name = group.attrs["series_name"]
if isinstance(series_name, bytes):
series_name = series_name.decode("utf-8")
series_name = series_name if series_name else None
return pd.Series(values, index=index, name=series_name)
# =================== DATAFRAME FUNCTIONS ===================
@staticmethod
def _convert_categorical_columns(dataframe: pd.DataFrame) -> pd.DataFrame:
"""Convert categorical columns to their non-categorical representation.
This method converts any categorical dtype columns to their underlying
data type representation, preserving the actual values but removing the
categorical encoding.
Args:
dataframe: pandas DataFrame that may contain categorical columns.
Returns:
DataFrame with categorical columns converted to their base dtypes.
"""
df = dataframe.copy()
for col in df.columns:
if pd.api.types.is_categorical_dtype(df[col]):
# Convert categorical to its underlying dtype
# This preserves the actual values but removes categorical encoding
df[col] = df[col].astype(df[col].cat.categories.dtype)
return df
[docs]
@staticmethod
def preallocate_frame_layout(
group: h5py.Group,
dataframe: pd.DataFrame,
*,
chunks: tuple[int, ...] = (25,),
compression: str = "gzip",
preallocate: int = 100,
string_fixed_length: int = 100,
require_swmr: bool = False,
) -> None:
"""Preallocate HDF5 layout for a pandas DataFrame without writing data.
Creates layout for shared index and column series using Series preallocation.
Args:
group: HDF5 group to write to.
dataframe: pandas DataFrame to create layout for.
chunks: Chunk shape for datasets.
compression: Compression algorithm.
preallocate: Initial allocation size.
string_fixed_length: Character length for fixed-length strings.
require_swmr: If True, assert SWMR mode is enabled.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
If group.file.swmr_mode is True and datasets don't exist.
ValueError: If DataFrame validation fails.
"""
# Convert categorical columns to their base dtypes
dataframe = HDF._convert_categorical_columns(dataframe)
if require_swmr:
HDF.assert_swmr_on(group)
# Prevent object creation under SWMR (SWMR programming model compliance)
if group.file.swmr_mode and "index" not in group:
raise RuntimeError(
"Cannot create new groups/datasets while SWMR mode is enabled. "
"Create all refine before starting SWMR mode."
)
if len(dataframe.columns) == 0:
raise ValueError("Cannot preallocate layout for DataFrame with no columns")
# Set frame attributes
group.attrs["column_order"] = json.dumps(list(dataframe.columns))
group.attrs["len"] = 0
# Preallocate index layout
index_group = group.create_group("index")
# Create a dummy series with string values to match the schema
dummy_series = pd.Series(
[], dtype=str, index=dataframe.index[:0], name="__index__"
)
HDF.preallocate_series_layout(
index_group,
dummy_series,
dataset="values",
index_dataset="index",
chunks=chunks,
compression=compression,
preallocate=preallocate,
string_fixed_length=string_fixed_length,
)
# Preallocate column layouts
columns_group = group.create_group("columns")
for col_name in dataframe.columns:
col_group = columns_group.create_group(str(col_name))
# Create dummy series - use actual data to determine schema if available
if len(dataframe) > 0:
# Use first few values to determine the proper schema
col_data = dataframe[col_name]
dummy_col_series = pd.Series(
[col_data.iloc[0]] if not col_data.isna().iloc[0] else [None],
dtype=col_data.dtype,
index=dataframe.index[:1],
name=col_name,
)
else:
# Fallback to dtype for empty dataframe
col_dtype = dataframe[col_name].dtype
dummy_col_series = pd.Series(
[], dtype=col_dtype, index=dataframe.index[:0], name=col_name
)
HDF.preallocate_series_layout(
col_group,
dummy_col_series,
dataset="values",
index_dataset="index",
chunks=chunks,
compression=compression,
preallocate=preallocate,
string_fixed_length=string_fixed_length,
)
[docs]
@staticmethod
def save_frame_new(
group: h5py.Group,
dataframe: pd.DataFrame,
*,
chunks: tuple[int, ...] = (25,),
compression: str = "gzip",
preallocate: int = 100,
string_fixed_length: int = 100,
require_swmr: bool = False,
) -> None:
"""Create datasets and write a pandas DataFrame to HDF5.
Args:
group: HDF5 group to write to.
dataframe: pandas DataFrame to persist.
chunks: Chunk shape for new datasets.
compression: Compression algorithm for new datasets.
preallocate: Initial allocation size for new datasets.
string_fixed_length: Character length for fixed-length strings.
require_swmr: If True, assert SWMR mode is enabled.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If DataFrame validation fails.
"""
# Convert categorical columns to their base dtypes
dataframe = HDF._convert_categorical_columns(dataframe)
if len(dataframe) == 0:
raise ValueError("Cannot save empty DataFrame")
# Check if layout already exists (preallocated)
if "columns" in group and group.attrs.get("len", -1) == 0:
HDF.save_frame_update(group, dataframe, start=0, require_swmr=require_swmr)
return
# Creation path: if require_swmr=True, only permit write (no object creation)
if require_swmr:
HDF.assert_swmr_on(group)
# For SWMR writes, groups/datasets must already exist
if "columns" not in group:
raise RuntimeError(
"Groups/datasets must be created before starting SWMR mode. "
"Use preallocate_frame_layout() first, then start SWMR."
)
# Use update path
HDF.save_frame_update(group, dataframe, start=0, require_swmr=require_swmr)
return
# Create new layout (require_swmr=False for creation phase)
HDF.preallocate_frame_layout(
group,
dataframe,
chunks=chunks,
compression=compression,
preallocate=max(preallocate, len(dataframe)),
string_fixed_length=string_fixed_length,
require_swmr=False,
)
# Write the data
HDF.save_frame_update(group, dataframe, start=0, require_swmr=require_swmr)
[docs]
@staticmethod
def save_frame_update(
group: h5py.Group,
dataframe: pd.DataFrame,
*,
start: int = 0,
require_swmr: bool = True,
) -> None:
"""Update a pandas DataFrame in HDF5 at specified position.
Args:
group: HDF5 group containing existing datasets.
dataframe: pandas DataFrame to write.
start: Starting position for the update.
require_swmr: If True, assert SWMR mode is enabled.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If validation fails or schema mismatch.
"""
# Convert categorical columns to their base dtypes
dataframe = HDF._convert_categorical_columns(dataframe)
if require_swmr:
HDF.assert_swmr_on(group)
if len(dataframe) == 0:
raise ValueError("Cannot update with empty DataFrame")
current_len = group.attrs["len"]
end_pos = start + len(dataframe)
# Validate contiguous update
if start > current_len:
raise ValueError(
f"Non-contiguous update: start={start}, current_len={current_len}"
)
# Validate column order matches
column_order_attr = group.attrs["column_order"]
if isinstance(column_order_attr, bytes):
column_order_attr = column_order_attr.decode("utf-8")
stored_columns = json.loads(column_order_attr)
if list(dataframe.columns) != stored_columns:
raise ValueError(
f"Column order mismatch: expected {stored_columns}, got {list(dataframe.columns)}"
)
# Update index - create a dummy series to represent the actual index
# We need to store the index structure, so we create a dummy series where the
# index is the actual DataFrame index and values are just placeholders
index_series = pd.Series(
["dummy"] * len(dataframe), index=dataframe.index, name="__index__"
)
HDF.save_series_update(
group["index"],
index_series,
start=start,
dataset="values",
index_dataset="index",
require_swmr=require_swmr,
)
# Update each column
columns_group = group["columns"]
for col_name in dataframe.columns:
col_series = dataframe[col_name]
col_series.name = col_name
HDF.save_series_update(
columns_group[str(col_name)],
col_series,
start=start,
dataset="values",
index_dataset="index",
require_swmr=require_swmr,
)
# Update frame length
group.attrs["len"] = max(current_len, end_pos)
if require_swmr:
group.file.flush()
[docs]
@staticmethod
def save_frame_append(
group: h5py.Group,
dataframe: pd.DataFrame,
*,
require_swmr: bool = True,
) -> None:
"""Append a pandas DataFrame to existing HDF5 datasets.
Args:
group: HDF5 group containing existing datasets.
dataframe: pandas DataFrame to append.
require_swmr: If True, assert SWMR mode is enabled.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If validation fails or schema mismatch.
"""
# Convert categorical columns to their base dtypes
dataframe = HDF._convert_categorical_columns(dataframe)
if require_swmr:
HDF.assert_swmr_on(group)
current_len = group.attrs["len"]
HDF.save_frame_update(
group, dataframe, start=current_len, require_swmr=require_swmr
)
[docs]
@staticmethod
def load_frame(
group: h5py.Group,
*,
require_swmr: bool = False,
) -> pd.DataFrame:
"""Load a pandas DataFrame from HDF5 storage.
Args:
group: HDF5 group containing the DataFrame data.
require_swmr: If True, assert SWMR mode is enabled.
Returns:
Reconstructed pandas DataFrame.
Raises:
RuntimeError: If require_swmr=True and SWMR mode not enabled.
ValueError: If data validation fails.
"""
if require_swmr:
HDF.assert_swmr_on(group)
logical_length = group.attrs["len"]
if logical_length == 0:
# Return empty DataFrame with proper schema
column_order_attr = group.attrs["column_order"]
if isinstance(column_order_attr, bytes):
column_order_attr = column_order_attr.decode("utf-8")
column_order = json.loads(column_order_attr)
return pd.DataFrame(columns=column_order)
# Load index
index = HDF._decode_index_from_hdf5(group["index"], "index", logical_length)
# Load columns in order
column_order_attr = group.attrs["column_order"]
if isinstance(column_order_attr, bytes):
column_order_attr = column_order_attr.decode("utf-8")
column_order = json.loads(column_order_attr)
columns_group = group["columns"]
columns_data = {}
for col_name in column_order:
col_series = HDF.load_series(
columns_group[str(col_name)],
dataset="values",
index_dataset="index",
require_swmr=require_swmr,
)
columns_data[col_name] = col_series.values
# Reconstruct DataFrame
dataframe = pd.DataFrame(columns_data, index=index)
result: pd.DataFrame = dataframe[column_order] # Ensure column order
return result
[docs]
@staticmethod
def close_handle(handle: h5py.File | h5py.Group) -> None:
if handle is not None:
handle = handle.file if isinstance(handle, h5py.Group) else handle
try:
if hasattr(handle, "id") and handle.id.valid:
logger.warning("HDF5 file handle may not have been properly closed")
# Force close
handle.close()
else:
logger.debug("HDF5 file handle properly closed")
except (ValueError, AttributeError):
# Handle is closed/invalid - this is expected
logger.debug(
f"hdf5 file handle {handle} was properly closed or invalid"
)