Source code for phenotypic.tools.funcs_

from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from phenotypic import Image

# this is a dummy variable so annotation's in ImageOperation, MeasureFeatures classes don't cause integrity check to throw an exception
Image: Any

import numpy as np
import time
import inspect
import mmh3
from functools import wraps

from phenotypic.tools.exceptions_ import OperationIntegrityError
from phenotypic.tools.constants_ import VALIDATE_OPS


[docs] def is_binary_mask(arr: np.ndarray): return ( True if (arr.ndim == 2 or arr.ndim == 3) and np.all((arr == 0) | (arr == 1)) else False )
[docs] def timed_execution(func): """ Decorator to measure and print the execution time of a function. """ def wrapper(*args, **kwargs): start_time = time.time() # Record the start time result = func(*args, **kwargs) # Execute the wrapped function end_time = time.time() # Record the end time print( f"Function '{func.__name__}' executed in {end_time - start_time:.4f} seconds" ) return result return wrapper
[docs] def is_static_method(owner_cls: type, method_name: str) -> bool: """ Return True if *method_name* is defined on *owner_cls* (or an ancestor in its MRO) as a staticmethod. """ # Retrieve attribute without invoking the descriptor protocol attr = inspect.getattr_static(owner_cls, method_name) # Python ≥3.2 return isinstance(attr, staticmethod) # True ⇒ @staticmethod
[docs] def murmur3_array_signature(arr: np.ndarray) -> bytes: """ Return a 128‑bit MurmurHash3 digest of *arr*. The rgb is converted to a C‑contiguous view so that ``memoryview`` can safely expose its buffer. If the rgb is already contiguous this is a zero‑copy operation. """ if not arr.flags["C_CONTIGUOUS"]: arr = np.ascontiguousarray(arr) return mmh3.mmh3_x64_128_digest(memoryview(arr))
[docs] def validate_operation_integrity(*targets: str): """ Decorator to ensure that key NumPy arrays on the 'image' argument remain unchanged by an ImageOperation.apply() call. If no targets are specified, defaults to checking: image.rgb, image.gray, image.enh_gray, image.objmap Example Usage: @validate_member_integrity('image.rgb', 'image.objmap') def func(image: Image,...): ... """ def decorator(func): # Step 1: Get the function signature to analyze parameters sig = inspect.signature(func) # Remove all annotations in the signature to avoid circular import issues with Image class params = [p.replace(annotation=inspect._empty) for p in sig.parameters.values()] sig = sig.replace(parameters=params, return_annotation=inspect._empty) # Step 2: Determine which attributes to check for integrity # If targets are explicitly provided, use those if targets: eff_targets = list(targets) else: # Otherwise use default targets, but ensure 'image' parameter exists if "image" not in sig.parameters: raise AttributeError( f"{func.__name__}: no 'image' parameter and no targets given", ) # Default attributes to check on the image object eff_targets = ["image.rgb", "image.gray", "image.enh_gray", "image.objmap"] # Helper function to retrieve a NumPy array from an object by attribute path def _get_array(bound_args, target: str) -> np.ndarray: # Split the target path (e.g., 'image.rgb' -> ['image', 'rgb']) parts = target.split(".") # Get the root object from function arguments obj: Image = bound_args.arguments.get(parts[0]) if obj is None: raise AttributeError( f"{func.__name__}: parameter '{parts[0]}' not found", ) # Navigate through the attribute chain to get the final rgb for attr in parts[1:]: if obj.rgb.isempty() and attr == "rgb": pass else: obj = getattr(obj, attr)[:] # Use [:] to get a view of the rgb # Ensure the result is a NumPy rgb if not isinstance(obj, np.ndarray): raise RuntimeError( f"{func.__name__}: '{target}' is not a NumPy array", ) return obj # The actual wrapper function that will replace the decorated function @wraps(func) def wrapper(*args, **kwargs): # Step 3: Bind the provided arguments to the function signature bound = sig.bind_partial(*args, **kwargs) bound.apply_defaults() # Step 4: Calculate hash values for all target arrays before function execution # This creates a dictionary mapping each target to its hash value if VALIDATE_OPS: pre_hashes = { tgt: murmur3_array_signature(_get_array(bound, tgt)) for tgt in eff_targets } # Step 5: Execute the original function result = func(*args, **kwargs) # Step 6: Verify integrity by comparing hash values after function execution # For each target, calculate a new hash and compare with the original if VALIDATE_OPS: for tgt, old_hash in pre_hashes.items(): parts = tgt.split(".") # Start with the result object returned by the function obj = result # Navigate through the attribute chain on the result object for attr in parts[1:]: if not hasattr(obj, attr): raise AttributeError(f"{obj} has no attribute '{attr}'") obj = getattr(obj, attr)[:] # Ensure the attribute is still a NumPy array if not isinstance(obj, np.ndarray): raise RuntimeError( f"{func.__name__}: '{tgt}' is not a NumPy rgb on result", ) # Calculate new hash and compare with original new_hash = murmur3_array_signature(obj) # If hashes don't match, the array was modified - raise an error if new_hash != old_hash: raise OperationIntegrityError( opname=f"{func.__name__}", component=f"{tgt}", ) # Step 7: Return the original function's result if integrity check passes return result # Step 8: Preserve the original function's metadata on the wrapper wrapper.__name__ = func.__name__ wrapper.__doc__ = func.__doc__ wrapper.__signature__ = sig return wrapper return decorator
[docs] def validate_measure_integrity(*targets: str): """ Decorator to ensure that key NumPy arrays on the 'image' argument are not mutated by an MeasureFeatures.measure() call. If you pass explicit targets, it will honor those—for example: @validate_member_integrity('image.rgb') Otherwise it defaults to checking: image.rgb, image.gray, image.enh_gray, image.objmap """ def decorator(func): sig = inspect.signature(func) # wipe out all annotations in the signature params = [p.replace(annotation=inspect._empty) for p in sig.parameters.values()] sig = sig.replace(parameters=params, return_annotation=inspect._empty) # determine which attributes to check if targets: eff_targets = list(targets) else: # apply only to methods with an 'image' parameter if "image" not in sig.parameters: raise OperationIntegrityError( f"{func.__name__}: no 'image' parameter and no targets given", ) eff_targets = ["image.rgb", "image.gray", "image.enh_gray", "image.objmap"] def _get_array(bound_args, target: str) -> np.ndarray: # e.g. target = 'image.rgb' obj = bound_args.arguments.get(target.split(".")[0]) if obj is None: raise OperationIntegrityError( f"{func.__name__}: cannot find parameter '{target.split('.')[0]}'", ) for attr in target.split(".")[1:]: obj = getattr(obj, attr)[:] if not isinstance(obj, np.ndarray): raise OperationIntegrityError( f"{func.__name__}: '{target}' is not a NumPy array" ) return obj @wraps(func) def wrapper(*args, **kwargs): bound = sig.bind_partial(*args, **kwargs) bound.apply_defaults() # hash each target before the call if VALIDATE_OPS: pre_hashes = { tgt: murmur3_array_signature(_get_array(bound, tgt)) for tgt in eff_targets } # execute the original method result = func(*args, **kwargs) # re-hash and compare if VALIDATE_OPS: for tgt, old in pre_hashes.items(): new = murmur3_array_signature(_get_array(bound, tgt)) if new != old: raise OperationIntegrityError( opname=f"{func.__name__}", component=f"{tgt}" ) return result # preserve metadata wrapper.__name__ = func.__name__ wrapper.__doc__ = func.__doc__ wrapper.__signature__ = sig return wrapper return decorator
[docs] def normalize_rgb_bitdepth(image: np.ndarray) -> np.ndarray: """ Normalize an RGB rgb to [0,1] using bit-depth inference. Rules: - If dtype is integer: use dtype max (e.g. 255 for uint8, 65535 for uint16). - If dtype is float: * If max <= 1 → assume already normalized, return as-is. * If 255 < max <= 65535 → assume 16-bit, divide by 65535. * Else → assume 8-bit, divide by 255. Args: image: RGB rgb. Returns: np.ndarray: float64 normalized image in [0,1]. """ img = np.asarray(image.copy()) if np.issubdtype(img.dtype, np.integer): max_val = np.iinfo(img.dtype).max return img.astype(np.float64)/max_val elif np.issubdtype(img.dtype, np.floating): tol = 1e-7 m = img.max() if m <= 1.0 + tol: return img.astype(np.float32, copy=False) elif 1 < m <= (255 + tol): return (img.astype(np.float32)/255.0).clip(0, 1) elif 255 < m <= (65535.0 + tol): return (img.astype(np.float32)/65535.0).clip(0, 1) else: raise ValueError( f"Invalid range: min={img.min():.02f} max={img.max():.02f}" ) else: raise TypeError(f"Unsupported dtype: {img.dtype}")