from __future__ import annotations
import numpy as np
import pandas as pd
from joblib import delayed, Parallel
import matplotlib.pyplot as plt
from matplotlib.figure import Figure
from .abc_ import SetAnalyzer
[docs]
class TukeyOutlierRemover(SetAnalyzer):
"""Analyzer for removing outliers using Tukey's fence method.
This class removes outliers from measurement data by applying Tukey's fence test
within groups. The method calculates the interquartile range (IQR) and removes values
that fall outside Q1 - k*IQR or Q3 + k*IQR, where k is a tunable multiplier
(typically 1.5 for outliers or 3.0 for extreme outliers).
Args:
on: Name of measurement column to test for outliers (e.g., 'Shape_Area', 'Intensity_IntegratedIntensity').
groupby: List of column names to group by (e.g., ['ImageName', 'Metadata_Plate']).
k: IQR multiplier for fence calculation. Default is 1.5 (standard outliers).
Use 3.0 for extreme outliers only.
num_workers: Number of parallel workers. Default is 1.
Attributes:
groupby: List of column names to group by.
on: Column to test for outliers.
k: IQR multiplier used for fence calculation.
num_workers: Number of parallel workers. Default is 1.
Examples:
.. dropdown:: Remove outliers and visualize results
.. code-block:: python
import pandas as pd
import numpy as np
from phenotypic.analysis import TukeyOutlierRemover
# Create sample data with some outliers
np.random.seed(42)
data = pd.DataFrame({
'ImageName': ['img1'] * 50 + ['img2'] * 50,
'Area': np.concatenate([
np.random.normal(200, 30, 48),
[500, 550], # outliers in img1
np.random.normal(180, 25, 48),
[50, 600] # outliers in img2
])
})
# Initialize detector
detector = TukeyOutlierRemover(
on='Area',
groupby=['ImageName'],
k=1.5
)
# Remove outliers
filtered_data = detector.analyze(data)
# Check how many were removed
print(f"Original: {len(data)}, Filtered: {len(filtered_data)}")
# Visualize removed outliers
fig = detector.show()
"""
[docs]
def __init__(
self, on: str, groupby: list[str], k: float = 1.5, num_workers: int = 1
):
"""Initialize TukeyOutlierRemover with test parameters.
Args:
on: Column name for grouping/aggregation operations.
groupby: List of column names to group by.
measurement_col: Name of measurement column to test for outliers.
k: IQR multiplier for fence calculation. Default is 1.5.
agg_func: Aggregation function. Default is 'mean'.
num_workers: Number of workers. Default is 1.
Raises:
ValueError: If k is not positive.
"""
super().__init__(on=on, groupby=groupby, agg_func=None, num_workers=num_workers)
if k <= 0:
raise ValueError(f"k must be positive, got {k}")
self.k = k
self._original_data: pd.DataFrame = pd.DataFrame()
[docs]
def analyze(self, data: pd.DataFrame) -> pd.DataFrame:
"""Remove outliers from data using Tukey's fence method.
This method processes the input DataFrame by grouping according to specified
columns and removing outliers within each group independently. Outliers are
identified using the IQR method and filtered out. The original data is stored
internally for visualization purposes.
Args:
data: DataFrame containing measurement data. Must include all columns
specified in self.groupby and self.on.
Returns:
DataFrame with outliers removed. Contains only the original columns
(no additional outlier flag columns).
Raises:
KeyError: If required columns are missing from input DataFrame.
ValueError: If data is empty or malformed.
Examples:
.. dropdown:: Analyze and filter outliers from measurement data
.. code-block:: python
import pandas as pd
import numpy as np
from phenotypic.analysis import TukeyOutlierRemover
# Create sample data
np.random.seed(42)
data = pd.DataFrame({
'ImageName': ['img1'] * 100,
'Area': np.concatenate([
np.random.normal(200, 30, 98),
[500, 50] # outliers
])
})
# Remove outliers
detector = TukeyOutlierRemover(
on='Area',
groupby=['ImageName'],
k=1.5
)
filtered_data = detector.analyze(data)
# Check results
print(f"Original: {len(data)} rows, Filtered: {len(filtered_data)} rows")
print(f"Removed {len(data) - len(filtered_data)} outliers")
Notes:
- Stores original data in self._original_data for visualization
- Stores filtered results in self._latest_measurements for retrieval
- Groups are processed independently with their own fences
- NaN values in measurement column are preserved in output
"""
# Validate input
if data is None or len(data) == 0:
raise ValueError("Input data cannot be empty")
# Store original data for visualization
self._original_data = data.copy()
# Check required columns
required_cols = set(self.groupby + [self.on])
missing_cols = required_cols - set(data.columns)
if missing_cols:
raise KeyError(f"Missing required columns: {missing_cols}")
# Prepare configuration for _apply2group_func
config = {"k": self.k, "on": self.on}
# Apply outlier removal to each group
# Create groups
grouped = data.groupby(by=self.groupby, as_index=True)
if self.n_jobs == 1:
results = []
for key, group in grouped:
results.append(self.__class__._apply2group_func(key, group, **config))
else:
results = Parallel(n_jobs=self.n_jobs)(
delayed(self.__class__._apply2group_func)(key, group, **config)
for key, group in grouped
)
# Concatenate all group results
self._latest_measurements = pd.concat(results, ignore_index=True)
return self._latest_measurements
[docs]
def show(
self,
figsize: tuple[int, int] | None = None,
max_groups: int = 20,
collapsed: bool = True,
criteria: dict[str, any] | None = None,
**kwargs,
) -> (plt.Figure, plt.Axes):
"""Visualize outlier detection results.
Creates a visualization showing the distribution of values with outliers highlighted
and fence boundaries displayed. Can display as individual subplots or as a collapsed
stacked view with all groups in a single plot. Outlier flags are computed dynamically
for visualization only.
Args:
figsize: Figure size as (width, height). If None, automatically determined
based on number of groups and mode.
max_groups: Maximum number of groups to display. If there are more groups,
only the first max_groups will be shown. Default is 20.
collapsed: If True, show all groups stacked vertically in a single plot.
If False, show each group in its own subplot. Default is False.
criteria: Optional dictionary specifying filtering criteria for data selection.
When provided, only groups matching the criteria will be displayed.
Format: {'column_name': value} or {'column_name': [value1, value2]}.
Default is None (show all groups).
**kwargs: Additional matplotlib parameters to customize the plot. Common options include:
- dpi: Figure resolution (default 100)
- facecolor: Figure background color
- edgecolor: Figure edge color
- grid_alpha: Alpha value for grid lines (default 0.3)
- grid_axis: Which axis to apply grid to ('both', 'x', 'y')
- legend_loc: Legend location (default 'best')
- legend_fontsize: Font size for legend (default 8)
- marker_alpha: Alpha value for scatter plot markers
- line_width: Line width for box plots and fence lines
Returns:
Tuple of (Figure, Axes) containing the visualization.
Raises:
ValueError: If analyze() has not been called yet (no results to display).
KeyError: If criteria references columns not present in the data.
Examples:
.. dropdown:: Visualize outlier detection with multiple grouping options
.. code-block:: python
import pandas as pd
import numpy as np
from phenotypic.analysis import TukeyOutlierRemover
# Create sample data with multiple grouping columns
np.random.seed(42)
data = pd.DataFrame({
'ImageName': ['img1', 'img2'] * 50,
'Plate': ['P1'] * 50 + ['P2'] * 50,
'Area': np.concatenate([
np.random.normal(200, 30, 48), [500, 550],
np.random.normal(180, 25, 48), [50, 600]
])
})
# Remove outliers and visualize all groups
detector = TukeyOutlierRemover(
on='Area',
groupby=['Plate', 'ImageName'],
k=1.5
)
results = detector.analyze(data)
fig, axes = detector.show(figsize=(12, 5))
# Visualize only specific plate
fig, axes = detector.show(criteria={'Plate': 'P1'})
# Visualize specific images across plates using collapsed view
fig, ax = detector.show(criteria={'ImageName': 'img1'}, collapsed=True)
Notes:
Individual mode (collapsed=False):
- Each group gets its own subplot with box plot
- Outliers shown in red, normal values in blue
- Horizontal lines show fence boundaries
Collapsed mode (collapsed=True):
- All groups stacked vertically in single plot
- Each group shown as horizontal line with median marker
- Vertical bars show fence boundaries
- Normal points as circles, outliers as diamonds
- More compact for comparing many groups
Filtering with criteria:
- Only groups matching all criteria are displayed
- Useful for focusing on specific plates, conditions, or subsets
- Can be combined with both individual and collapsed modes
"""
if self._original_data.empty:
raise ValueError("No results to display. Call analyze() first.")
# Use original data for visualization and dynamically compute outlier flags
data = self._original_data.copy()
# Apply filtering if criteria provided
if criteria is not None:
data = self._filter_by(df=data, criteria=criteria, copy=False)
if data.empty:
raise ValueError("No data matches the specified criteria")
# Get unique groups
if len(self.groupby) == 1:
groups = data[self.groupby[0]].unique()
group_col = self.groupby[0]
else:
# Create a combined group identifier for multiple groupby columns
data["_group_key"] = data[self.groupby].astype(str).agg(" | ".join, axis=1)
groups = data["_group_key"].unique()
group_col = "_group_key"
# Limit number of groups if needed
if len(groups) > max_groups:
groups = groups[:max_groups]
print(
f"Warning: Displaying only first {max_groups} of {len(data[group_col].unique())} groups"
)
# Branch based on visualization mode
if collapsed:
return self._show_collapsed(data, groups, group_col, figsize, **kwargs)
else:
return self._show_individual(data, groups, group_col, figsize, **kwargs)
def _show_individual(
self,
data: pd.DataFrame,
groups,
group_col: str,
figsize: tuple[int, int] | None,
**kwargs,
) -> (plt.Figure, plt.Axes):
"""Create individual subplots for each group."""
# Extract figure-level kwargs
fig_kwargs = {
k: v for k, v in kwargs.items() if k in ("dpi", "facecolor", "edgecolor")
}
grid_alpha = kwargs.get("grid_alpha", 0.3)
grid_axis = kwargs.get("grid_axis", "y")
legend_fontsize = kwargs.get("legend_fontsize", 8)
# Calculate layout
n_groups = len(groups)
n_cols = min(3, n_groups)
n_rows = (n_groups + n_cols - 1) // n_cols
# Set figure size
if figsize is None:
figsize = (5 * n_cols, 4 * n_rows)
fig, axes = plt.subplots(
n_rows, n_cols, figsize=figsize, squeeze=False, **fig_kwargs
)
axes = axes.flatten()
total_outliers = 0
total_count = 0
# Plot each group
for idx, group_name in enumerate(groups):
ax = axes[idx]
group_data = data[data[group_col] == group_name].copy()
# Dynamically compute outlier flags for this group
values = group_data[self.on].values
q1 = np.percentile(values, 25)
q3 = np.percentile(values, 75)
iqr = q3 - q1
lower_fence = q1 - (iqr * self.k)
upper_fence = q3 + (iqr * self.k)
is_outlier = (values < lower_fence) | (values > upper_fence)
group_data["_is_outlier"] = is_outlier
# Separate inliers and outliers
inliers = group_data[~group_data["_is_outlier"]]
outliers = group_data[group_data["_is_outlier"]]
total_outliers += len(outliers)
total_count += len(group_data)
# Create x-coordinates for scatter plot
x_inliers = np.random.normal(1, 0.04, len(inliers))
x_outliers = np.random.normal(1, 0.04, len(outliers))
# Plot inliers
if len(inliers) > 0:
ax.scatter(
x_inliers,
inliers[self.on].values,
alpha=0.6,
s=40,
c="#2E86AB",
label="Normal",
zorder=3,
)
# Plot outliers
if len(outliers) > 0:
ax.scatter(
x_outliers,
outliers[self.on].values,
alpha=0.8,
s=50,
c="#E63946",
marker="D",
label="Outlier",
zorder=4,
)
# Create box plot
bp = ax.boxplot(
[values],
positions=[1],
widths=0.3,
patch_artist=True,
showfliers=False,
boxprops=dict(facecolor="lightgray", alpha=0.3),
medianprops=dict(color="black", linewidth=2),
)
# Add fence lines
ax.axhline(
y=lower_fence,
color="#F4A261",
linestyle="--",
linewidth=1.5,
label="Lower Fence",
zorder=2,
)
ax.axhline(
y=upper_fence,
color="#F4A261",
linestyle="--",
linewidth=1.5,
label="Upper Fence",
zorder=2,
)
# Formatting
ax.set_title(
f"{group_name}\n({len(outliers)} outliers / {len(group_data)} total)",
fontsize=10,
fontweight="bold",
)
ax.set_ylabel(self.on, fontsize=9)
ax.set_xticks([])
ax.grid(True, alpha=grid_alpha, axis=grid_axis)
# Add legend only to first subplot
if idx == 0:
handles, labels = ax.get_legend_handles_labels()
# Remove duplicate labels
by_label = dict(zip(labels, handles))
ax.legend(
by_label.values(),
by_label.keys(),
loc="best",
fontsize=legend_fontsize,
framealpha=0.9,
)
# Hide unused subplots
for idx in range(n_groups, len(axes)):
axes[idx].set_visible(False)
# Overall title
outlier_pct = 100 * total_outliers / total_count if total_count > 0 else 0
fig.suptitle(
f"Tukey Outlier Detection (k={self.k})\n"
f"{total_outliers} outliers detected ({outlier_pct:.1f}% of {total_count} measurements)",
fontsize=14,
fontweight="bold",
y=0.995,
)
plt.tight_layout()
return fig, axes
def _show_collapsed(
self,
data: pd.DataFrame,
groups,
group_col: str,
figsize: tuple[int, int] | None,
**kwargs,
) -> (plt.Figure, plt.Axes):
"""Create collapsed stacked view with all groups in single plot."""
# Extract figure-level kwargs
fig_kwargs = {
k: v for k, v in kwargs.items() if k in ("dpi", "facecolor", "edgecolor")
}
grid_alpha = kwargs.get("grid_alpha", 0.2)
legend_fontsize = kwargs.get("legend_fontsize", 9)
n_groups = len(groups)
# Set figure size
if figsize is None:
figsize = (10, max(6, 0.5 * n_groups + 2))
fig, ax = plt.subplots(1, 1, figsize=figsize, **fig_kwargs)
total_outliers = 0
total_count = 0
added_labels = set()
# Process each group and plot stacked vertically
for idx, group_name in enumerate(groups):
y_pos = n_groups - idx # Stack from top to bottom
group_data = data[data[group_col] == group_name].copy()
# Dynamically compute outlier flags for this group
values = group_data[self.on].values
q1 = np.percentile(values, 25)
q3 = np.percentile(values, 75)
iqr = q3 - q1
median = np.percentile(values, 50)
lower_fence = q1 - (iqr * self.k)
upper_fence = q3 + (iqr * self.k)
is_outlier = (values < lower_fence) | (values > upper_fence)
group_data["_is_outlier"] = is_outlier
# Separate inliers and outliers
inliers = group_data[~group_data["_is_outlier"]]
outliers = group_data[group_data["_is_outlier"]]
total_outliers += len(outliers)
total_count += len(group_data)
# Draw horizontal line for full data range
data_min = values.min()
data_max = values.max()
ax.hlines(
y_pos,
data_min,
data_max,
colors="lightgray",
linewidth=1.5,
alpha=0.6,
zorder=1,
)
# Add vertical tick marks for fences and mean
tick_height = 0.15 # Height of tick marks
# Lower fence tick
lbl = "Fences"
if lbl not in added_labels:
added_labels.add(lbl)
else:
lbl = None
ax.plot(
[lower_fence, lower_fence],
[y_pos - tick_height, y_pos + tick_height],
color="#F4A261",
linewidth=2.5,
linestyle="-",
label=lbl,
zorder=3,
)
# Upper fence tick
ax.plot(
[upper_fence, upper_fence],
[y_pos - tick_height, y_pos + tick_height],
color="#F4A261",
linewidth=2.5,
linestyle="-",
zorder=3,
)
# Median marker
lbl = "Median"
if lbl not in added_labels:
added_labels.add(lbl)
else:
lbl = None
ax.plot(
[median, median],
[y_pos - tick_height, y_pos + tick_height],
color="black",
linewidth=2.5,
linestyle="-",
label=lbl,
zorder=3,
)
# Create y-coordinates with jitter for scatter plot
y_inliers = np.random.normal(y_pos, 0.06, len(inliers))
y_outliers = np.random.normal(y_pos, 0.06, len(outliers))
# Plot inliers
if len(inliers) > 0:
lbl = "Normal"
if lbl not in added_labels:
added_labels.add(lbl)
else:
lbl = None
ax.scatter(
inliers[self.on].values,
y_inliers,
alpha=0.6,
s=30,
c="#2E86AB",
label=lbl,
zorder=4,
)
# Plot outliers
if len(outliers) > 0:
lbl = "Outlier"
if lbl not in added_labels:
added_labels.add(lbl)
else:
lbl = None
ax.scatter(
outliers[self.on].values,
y_outliers,
alpha=0.8,
s=35,
c="#E63946",
marker="D",
label=lbl,
zorder=5,
)
# Formatting
ax.set_yticks(range(1, n_groups + 1))
ax.set_yticklabels(groups[::-1]) # Reverse to match top-to-bottom order
ax.set_xlabel(self.on, fontsize=11, fontweight="bold")
ax.set_ylabel("Group", fontsize=11, fontweight="bold")
ax.grid(True, alpha=grid_alpha, axis="x")
ax.set_ylim(0.5, n_groups + 0.5)
# Add legend
ax.legend(loc="best", fontsize=legend_fontsize, framealpha=0.9)
# Overall title
outlier_pct = 100 * total_outliers / total_count if total_count > 0 else 0
fig.suptitle(
f"Tukey Outlier Detection (k={self.k})\n"
f"{total_outliers} outliers detected ({outlier_pct:.1f}% of {total_count} measurements)",
fontsize=13,
fontweight="bold",
)
plt.tight_layout()
return fig, ax
[docs]
def results(self) -> pd.DataFrame:
"""Return the filtered results (outliers removed).
Returns the DataFrame with outliers removed from the most recent call to analyze().
Returns:
DataFrame with outliers filtered out. Contains only the original columns
without additional outlier flag columns. If analyze() has not been called,
returns an empty DataFrame.
Examples:
.. dropdown:: Retrieve filtered results after analysis
.. code-block:: python
detector = TukeyOutlierRemover(
on='Area',
groupby=['ImageName']
)
filtered_data = detector.analyze(data)
results_copy = detector.results() # Same as filtered_data
assert results_copy.equals(filtered_data)
# Check how many rows were removed
num_removed = len(data) - len(filtered_data)
print(f"Removed {num_removed} outliers")
Notes:
- Returns the DataFrame stored in self._latest_measurements
- Contains only inliers (outliers have been removed)
- Use this method to retrieve results after calling analyze()
"""
return self._latest_measurements
@staticmethod
def _apply2group_func(key, group: pd.DataFrame, on: str, k: float) -> pd.DataFrame:
"""
Applies Tukey's outlier removal on a DataFrame group.
This static method filters out rows in the DataFrame group where the values of a specific
column (`on`) are considered outliers. Outliers are determined by the provided multiplier
for the IQR. Rows with values outside the range defined by the lower and upper thresholds
(calculated using the IQR method) are excluded.
Args:
key: The group key (not used but required for joblib).
group: A group of DataFrame rows to which the IQR-based filtering is applied.
on: The column in the DataFrame on which the IQR thresholding is computed.
k: The factor by which the IQR is multiplied to determine the
threshold for identifying outlier rows in the DataFrame group.
Returns:
Filtered DataFrame containing rows that fall within the calculated IQR thresholds.
"""
values = group[on]
q1 = np.percentile(values, 25)
q3 = np.percentile(values, 75)
iqr = q3 - q1
lower_fence = q1 - (iqr * k)
upper_fence = q3 + (iqr * k)
return group[(values >= lower_fence) & (values <= upper_fence)]