# Source code for phenotypic.abc_._base_operation
from __future__ import annotations
import inspect
from typing import TYPE_CHECKING, Callable
import functools, types

if TYPE_CHECKING:
    from phenotypic import Image

import logging
import tracemalloc

# Optional dependency: pympler provides per-object memory summaries.
try:
    from pympler import muppy, summary

    PYMPLER_AVAILABLE = True
except ImportError:
    PYMPLER_AVAILABLE = False

# Optional dependency: psutil provides process-level (RSS) memory info.
try:
    import psutil

    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

from abc import ABC
class BaseOperation(ABC):
    """Root abstract base class for all operations in PhenoTypic.

    BaseOperation is the foundation of PhenoTypic's operation system. It provides
    automatic memory tracking, logging integration, and utilities for parallel
    execution. All operations in PhenoTypic inherit from BaseOperation (either
    directly or through intermediate ABCs like ImageOperation and MeasureFeatures).

    This class is a blueprint for extending the framework: when you create a new
    operation, BaseOperation automatically handles memory profiling and logging so
    you can focus on the algorithm implementation.

    What it provides automatically:

    - **Memory Tracking:** BaseOperation automatically initiates tracemalloc when
      the logger is enabled for INFO level or higher. This enables per-operation
      memory usage monitoring without explicit instrumentation. Three levels of
      memory tracking are available:

      1. Object memory (via pympler if available): Detailed breakdown of memory
         used by Python objects in your operation.
      2. Process memory (via psutil if available): System-level memory usage
         (RSS - resident set size).
      3. Tracemalloc snapshots: Python's built-in memory tracking showing current
         and peak allocations.

    - **Logging Integration:** A logger is created automatically for each operation
      class with the name format: ``module.ClassName``. Subclasses can log messages
      and memory usage without additional setup.
    - **Parallel Execution Support:** The ``_get_matched_operation_args()`` method
      enables serialization of operation state for parallel execution by extracting
      operation attributes that match the ``_operate()`` method's parameters.

    Inheritance hierarchy::

        BaseOperation (this class)
        ├── ImageOperation
        │   ├── ImageEnhancer (preprocessing filters, noise reduction)
        │   ├── ImageCorrector (rotation, alignment, quality fixes)
        │   └── ObjectDetector (colony detection algorithms)
        │
        ├── MeasureFeatures (feature extraction from detected objects)
        │
        └── GridOperation (grid detection and refinement)

    How to subclass BaseOperation:
        When extending BaseOperation, you typically implement one of its subclasses
        (ImageOperation, MeasureFeatures, etc.) which provides the specific interface
        for your operation type. All the memory tracking and logging happens
        automatically in the parent class.

    Example: Creating a custom operation (without image details)::

        from phenotypic.abc_ import BaseOperation
        import logging

        class MyCustomOperation(BaseOperation):
            def __init__(self, param1, param2=5):
                # Always call parent __init__ first
                super().__init__()
                # Store your parameters as attributes
                self.param1 = param1
                self.param2 = param2

            def _operate(self, data):
                # Your algorithm here
                # Logger available as self._logger
                self._logger.info(f"Processing with param1={self.param1}")
                # Log memory usage after expensive operations
                self._log_memory_usage("after processing")
                return result

    Attributes:
        _logger (logging.Logger): Logger instance created automatically with
            the format ``module.ClassName``. Use ``_logger.info()``,
            ``_logger.debug()`` to log messages during operation execution.
        _tracemalloc_started (bool): Internal flag indicating whether tracemalloc
            was started *by this instance*. Set to True automatically if the
            logger is enabled for INFO level or higher and no other code had
            already started tracemalloc.

    Notes:
        - Memory tracking is only enabled if the logger is configured to handle
          INFO level messages or higher. If you want to disable memory tracking,
          set the logger level to WARNING or higher.
        - Tracemalloc is automatically stopped when the operation object is
          deleted (in ``__del__``), even if an exception occurs — but only if
          this instance was the one that started it.
        - The ``_get_matched_operation_args()`` method is used internally by the
          pipeline system for parallel execution. It extracts operation attributes
          that match the ``_operate()`` method signature, enabling operations to be
          serialized and executed in worker processes.
        - On Windows, pympler may not be available, so object memory tracking
          will fall back gracefully. psutil is available on all platforms.

    Examples:
        .. dropdown:: Enabling memory tracking for an operation

            .. code-block:: python

                import logging
                from phenotypic.detect import OtsuDetector

                # Set up logging to see memory usage
                logging.basicConfig(level=logging.INFO)

                # Create detector instance
                detector = OtsuDetector()

                # Apply operation - memory usage is logged automatically
                result = detector.apply(image)

                # Console output shows:
                # INFO: Memory usage after <step>: XX.XX MB (objects), YY.YY MB (process)

        .. dropdown:: Accessing memory information programmatically

            .. code-block:: python

                import logging
                from phenotypic.enhance import GaussianBlur

                # Create custom logger to capture memory messages
                logger = logging.getLogger('phenotypic.enhance.GaussianBlur')
                logger.setLevel(logging.INFO)
                handler = logging.StreamHandler()
                handler.setLevel(logging.INFO)
                logger.addHandler(handler)

                # Use operation
                blur = GaussianBlur(sigma=2)
                enhanced = blur.apply(image)
                # Memory tracking happens automatically during operation

        .. dropdown:: Custom operation with parameter matching for parallel execution

            .. code-block:: python

                from phenotypic.abc_ import ImageOperation
                from phenotypic import Image

                class CustomThreshold(ImageOperation):
                    def __init__(self, threshold_value: int):
                        super().__init__()
                        self.threshold_value = threshold_value

                    @staticmethod
                    def _operate(image: Image, threshold_value: int = 128) -> Image:
                        # Apply threshold algorithm
                        image.enh_gray[:] = image.enh_gray[:] > threshold_value
                        return image

                # When operation is applied via pipeline:
                operation = CustomThreshold(threshold_value=100)
                # _get_matched_operation_args() automatically extracts:
                # {'threshold_value': 100}
                # This enables parallel execution in pipelines
    """

    def __init__(self):
        # Per-class logger named "<module>.<ClassName>".
        self._logger = logging.getLogger(
            f"{self.__class__.__module__}.{self.__class__.__name__}"
        )
        self._tracemalloc_started = False
        # Start tracemalloc automatically if the logger is enabled for INFO.
        # tracemalloc state is process-global, so only take ownership when no
        # one else is already tracing; otherwise this instance's __del__ would
        # stop a trace started by user code or by another operation.
        if self._logger.isEnabledFor(logging.INFO) and not tracemalloc.is_tracing():
            tracemalloc.start()
            self._tracemalloc_started = True
            self._logger.debug("Tracemalloc started for memory logging")

    def _log_memory_usage(
        self,
        step: str,
        include_process: bool = False,
        include_tracemalloc: bool = False,
    ) -> None:
        """Log memory usage if logger is in INFO mode.

        Args:
            step: Human-readable label for the step just completed; appears in
                the log message as "Memory usage after <step>:".
            include_process: Also report process RSS via psutil (if installed).
            include_tracemalloc: Also report tracemalloc current/peak figures.
        """
        if not self._logger.isEnabledFor(logging.INFO):
            return
        log_msg_parts = [f"Memory usage after {step}:"]
        # Object memory using pympler (optional dependency).
        if PYMPLER_AVAILABLE:
            try:
                all_objects = muppy.get_objects()
                mem_summary = summary.summarize(all_objects)
                # Each summary row is (type_name, count, total_size);
                # index 2 is the total size in bytes.
                object_memory = sum(mem[2] for mem in mem_summary)
                log_msg_parts.append(
                    f"{object_memory / 1024 / 1024:.2f} MB (objects)"
                )
            except Exception as e:
                self._logger.debug(f"Failed to get object memory: {e}")
        else:
            log_msg_parts.append("pympler not available")
        # Process memory (RSS) using psutil (optional dependency).
        if include_process and PSUTIL_AVAILABLE:
            try:
                process = psutil.Process()
                process_memory = process.memory_info().rss
                log_msg_parts.append(
                    f"{process_memory / 1024 / 1024:.2f} MB (process)"
                )
            except Exception as e:
                self._logger.debug(f"Failed to get process memory: {e}")
        # Tracemalloc snapshot (current and peak traced allocations).
        if include_tracemalloc:
            try:
                current, peak = tracemalloc.get_traced_memory()
                log_msg_parts.append(
                    f"{current / 1024 / 1024:.2f} MB current, {peak / 1024 / 1024:.2f} MB peak (tracemalloc)"
                )
            except Exception as e:
                self._logger.debug(f"Failed to get tracemalloc memory: {e}")
        self._logger.info(", ".join(log_msg_parts))

    def __del__(self):
        """Automatically stop tracemalloc when the object is deleted."""
        # Only stop tracing if THIS instance started it (see __init__), and
        # only if it is still running — another component may have stopped it.
        if getattr(self, "_tracemalloc_started", False):
            try:
                if tracemalloc.is_tracing():
                    tracemalloc.stop()
                # Only log if we can determine logging is still available
                # (attributes may be gone during interpreter shutdown).
                if hasattr(self, "_logger") and hasattr(self._logger, "isEnabledFor"):
                    self._logger.debug("Tracemalloc stopped automatically")
            except Exception:
                # Ignore errors during cleanup; __del__ must never raise.
                pass

    def _get_matched_operation_args(self) -> dict:
        """Returns a dictionary of matched attributes with the arguments for the _operate method. This aids in parallel execution

        Returns:
            dict: A dictionary of matched attributes with the arguments for the _operate method or blank dict if
            _operate is a staticmethod. This is used for parallel execution of operations.
        """
        # getattr_static bypasses descriptor binding so a staticmethod can be
        # detected (plain getattr would return an ordinary function).
        raw_operate_method = inspect.getattr_static(self.__class__, "_operate")
        if isinstance(raw_operate_method, staticmethod):
            return self._matched_args(raw_operate_method.__func__)
        else:
            # A bound _operate carries its state through self; nothing to match.
            return {}

    def _matched_args(self, func) -> dict:
        """Return a dict of attributes that satisfy *func*'s signature.

        Args:
            func: Plain function whose parameters are matched against this
                instance's (or class's) attributes.

        Returns:
            dict: Parameter name -> value for every resolvable parameter.
            The ``image`` parameter, ``*args``/``**kwargs`` catch-alls, and
            defaulted parameters without a matching attribute are skipped.

        Raises:
            AttributeError: If a required parameter (one without a default)
                has no matching attribute on the instance or class.
        """
        sig = inspect.signature(func)
        matched = {}
        for name, param in sig.parameters.items():
            if (
                name == "image"
            ):  # The image provided by the user is always passed as the first argument.
                continue
            # *args / **kwargs catch-alls have no default and cannot be matched
            # to a single attribute; skip them instead of raising.
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue
            if hasattr(self, name):
                value = getattr(self, name)
                if isinstance(
                    value, types.MethodType
                ):  # transform a bounded method into a pickleable object
                    value = functools.partial(value.__func__, self)
                matched[name] = value
            elif hasattr(self.__class__, name):
                matched[name] = getattr(self.__class__, name)
            elif param.default is not param.empty:
                continue  # default will be used
            else:
                raise AttributeError(
                    f"{self.__class__.__name__} lacks attribute '{name}' "
                    f"required by {func.__qualname__}",
                )
        return matched