Source code for optimeo.analysis

# Copyright (c) 2025 Colin BOUSIGE
# Contact: colin.bousige@cnrs.fr
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the MIT License as published by
# the Free Software Foundation, either version 3 of the License, or
# any later version. 

"""
The analysis module provides tools for data analysis and regression modeling.
The main workhorse is the `DataAnalysis` class, which allows for encoding categorical variables, performing regression analysis, and visualizing results.

It supports both linear regression using the `statsmodels` package and machine learning models from `sklearn`.
The class also provides methods for plotting Q-Q plots, box plots, histograms, and scatter plots.
It includes functionality for bootstrap resampling to estimate the variability of model coefficients.
The `DataAnalysis` class is designed to be flexible and extensible, allowing users to customize the regression analysis process.

You can see an example notebook [here](../examples/MLanalysis.ipynb).

"""


import numpy as np
import pandas as pd
import statsmodels.formula.api as smf
from scipy import stats
from scipy.stats import t
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from statsmodels.graphics.gofplots import qqplot
# import ML models and scaling functions
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression, RidgeCV, ElasticNetCV
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import root_mean_squared_error, r2_score
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import make_pipeline
import seaborn as sns


[docs] class DataAnalysis: """ This class is used to analyze the data and perform regression analysis. Example ------- .. code-block:: python import pandas as pd from optimeo.analysis import DataAnalysis data = pd.read_csv('dataML.csv') factors = data.columns[:-1].tolist() response = data.columns[-1] analysis = DataAnalysis(data, factors, response) analysis.model_type = "ElasticNetCV" analysis.compute_ML_model() Parameters ---------- data : pd.DataFrame The input data. factors : list The list of factor variables. response : str The response variable. split_size : float, optional The proportion of the dataset to include in the test split. Default is `0.2`. model_type : str, optional The type of machine learning model to use. Default is None. Must be one of the following: `"ElasticNetCV"`, `"RidgeCV"`, `"LinearRegression"`, `"RandomForest"`, `"GaussianProcess"`, `"GradientBoosting"`. Attributes ---------- data : pd.DataFrame The input data. factors : list The list of factor variables. response : str The response variable. encoders : dict The encoders for categorical variables. dtypes : pd.Series The data types of the columns. linear_model : object The linear model object. equation : str The equation for the linear model, in the form `response ~ var1 + var2 + var1:var2`. model : object The machine learning model object. model_type : str The type of machine learning model to use. split_size : float The proportion of the dataset to include in the test split. Methods ------- encode_data() Encode categorical variables in input data. plot_qq() Plot a Q-Q plot of residuals. plot_boxplot() Plot response distribution as boxplot. plot_histogram() Plot response distribution as histogram. plot_scatter_response() Plot scatter response against factors. plot_corr() Plot factor correlation matrix. plot_pairplot_seaborn() Generate pairplot with seaborn. plot_pairplot_plotly() Generate pairplot with plotly. write_equation(order=1, quadratic=[]) Build formula string for statsmodels. compute_linear_model(order=1, quadratic=[]) Fit linear model with statsmodels. plot_linear_model() Visualize linear model effects. compute_ML_model(**kwargs) Fit machine-learning model. plot_ML_model(features_in_log=False) Plot ML model response surfaces. """ def __init__(self, data: pd.DataFrame, factors: list, response: str, split_size=.2, model_type=None): self._dtypes = None self._encoders = {} self._linear_model = None self._model = None self._equation = '' self._data = data self._factors = factors self._response = response self._model_type = model_type self._split_size = split_size self.encode_data() @property def data(self): """The input `pandas.DataFrame`.""" return self._data @data.setter def data(self, value): if not isinstance(value, pd.DataFrame): raise ValueError("Data must be a pandas DataFrame.") self._data = value
[docs] def encode_data(self): """ Called during initialization: encodes categorical variables in the data if there are any. Uses `LabelEncoder()` from `sklearn` to convert categorical variables to numerical values. Also drops rows with NaN values. """ self._dtypes = self._data.dtypes for factor in self._factors: if self._dtypes[factor] == 'object': le = LabelEncoder() self._encoders[factor] = le self._data[factor] = le.fit_transform([str(d) for d in self._data[factor]]) self.data = self.data[self._factors + [self._response]] self.data = self.data.dropna(axis=0, how='any')
@property def factors(self): """The list of names of the columns of the `data` DataFrame that contain factor variables.""" return self._factors @factors.setter def factors(self, value): if not isinstance(value, list): raise ValueError("Factors must be a list.") self._factors = value @property def encoders(self): """The list of encoders for categorical variables.""" return self._encoders @encoders.setter def encoders(self, value): if not isinstance(value, dict): raise ValueError("Encoders must be a dictionary.") self._encoders = value @property def dtypes(self): """Get the data types of the columns.""" return self._dtypes @dtypes.setter def dtypes(self, value): self._dtypes = value @property def response(self): """The name of the column of the `data` DataFrame that contain the response variable.""" return self._response @response.setter def response(self, value): if not isinstance(value, str): raise ValueError("Response must be a string.") self._response = value @property def linear_model(self): """Get the linear model.""" return self._linear_model @linear_model.setter def linear_model(self, value): self._linear_model = value @property def equation(self): """The equation for the linear model, in the form ``response ~ var1 + var2 + var1:var2``. See statsmodels formula examples: https://www.statsmodels.org/dev/examples/notebooks/generated/formulas.html """ return self._equation @equation.setter def equation(self, value): if not isinstance(value, str): raise ValueError("Equation must be a string.") self._equation = value @property def model_type(self): """The type of machine learning model to use. Default is None. Must be one of the following: `"ElasticNetCV"`, `"RidgeCV"`, `"LinearRegression"`, `"RandomForest"`, `"GaussianProcess"`, `"GradientBoosting"`.""" return self._model_type @model_type.setter def model_type(self, value): if value is not None and value not in ["ElasticNetCV", "RidgeCV", "LinearRegression", "RandomForest", "GaussianProcess", "GradientBoosting"]: raise ValueError("Model must be one of the following: " "ElasticNetCV, RidgeCV, LinearRegression, " "RandomForest, GaussianProcess, GradientBoosting.") self._model_type = value @property def model(self): """The machine learning model object.""" return self._model @model.setter def model(self, value): self._model = value @property def split_size(self): """The proportion of the dataset to include in the test split. Default is `0.2`.""" return self._split_size @split_size.setter def split_size(self, value): if not isinstance(value, (int, float)): raise ValueError("Split size must be a number.") self._split_size = value def __str__(self): """Return a string representation of the DataAnalysis object.""" return (f"DataAnalysis(data={self._data.shape}, " f"factors={self._factors}, " f"response={self._response}, " f"model_type={self._model_type}, " f"split_size={self._split_size}, " f"encoders={self._encoders})" ) def __repr__(self): """Return a string representation of the DataAnalysis object.""" return self.__str__()
[docs] def plot_qq(self): """ Plot a Q-Q plot for the response variable. Returns ------- fig : plotly.graph_objs.Figure The Q-Q plot figure. """ qqplot_data = pd.Series(self._data[self._response], copy=True) qqplot_data = qqplot(qqplot_data, line='s').gca().lines fig = go.Figure() fig.add_trace({ 'type': 'scatter', 'x': qqplot_data[0].get_xdata(), 'y': qqplot_data[0].get_ydata(), 'mode': 'markers', }) fig.add_trace({ 'type': 'scatter', 'x': qqplot_data[1].get_xdata(), 'y': qqplot_data[1].get_ydata(), 'mode': 'lines', 'line': { 'color': 'red' } }) fig.update_layout( title='Q-Q Plot', xaxis_title='Theoretical Quantiles', yaxis_title='Sample Quantiles', plot_bgcolor="white", # White background showlegend=False, margin=dict(l=10, r=10, t=50, b=50), xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), height=300, ) return fig
[docs] def plot_boxplot(self): """ Plot a boxplot for the response variable. Returns ------- fig : plotly.graph_objs.Figure The boxplot figure. """ fig = px.box(self._data, y=self._response, points="all", title='Box Plot') fig.update_layout( plot_bgcolor="white", # White background legend=dict(bgcolor='rgba(0,0,0,0)'), margin=dict(l=10, r=10, t=50, b=50), xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), height=300, ) return fig
[docs] def plot_histogram(self): """ Plot a histogram for the response variable. Returns ------- fig : plotly.graph_objs.Figure The histogram figure. """ fig = px.histogram(self._data, x=self._response, title='Histogram') fig.update_layout( plot_bgcolor="white", # White background legend=dict(bgcolor='rgba(0,0,0,0)'), margin=dict(l=10, r=10, t=50, b=50), xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), height=300, ) return fig
[docs] def plot_scatter_response(self): """ Plot a scatter plot for the response variable. Returns ------- fig : plotly.graph_objs.Figure The scatter plot figure. """ fig = px.scatter(x=np.arange(1, len(self._data[self._response]) + 1), y=self._data[self._response], labels={'x': 'Measurement number', 'y': self._response}, title='Scatter Plot') fig.update_layout( plot_bgcolor="white", # White background legend=dict(bgcolor='rgba(0,0,0,0)'), margin=dict(l=10, r=10, t=50, b=50), xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), height=300, ) return fig
[docs] def plot_pairplot_seaborn(self): """ Plot a pairplot for the data using seaborn. Returns ------- fig : seaborn.axisgrid.PairGrid The pairplot figure. """ fig = sns.pairplot( self._data, kind="reg", diag_kind="kde", plot_kws={"scatter_kws": {"alpha": 0.1}}, ) return fig
[docs] def plot_pairplot_plotly(self): """ Plot a pairplot for the data using plotly. Returns ------- fig : plotly.graph_objs.Figure The plotly figure. """ # Get the column names columns = self._data.columns n_cols = len(columns) # Create subplot grid with partially shared axes fig = make_subplots( rows=n_cols, cols=n_cols, shared_xaxes=True, shared_yaxes=True, vertical_spacing=0.05, horizontal_spacing=0.05 ) # Add scatter plots and regression lines to each subplot for i in range(n_cols): for j in range(n_cols): x_data = self._data[columns[j]] y_data = self._data[columns[i]] if i == j: # Diagonal: Add KDE plot # Calculate KDE data = self._data[columns[i]].dropna() if len(data) > 1: # Ensure enough data points kde_x = np.linspace(data.min(), data.max(), 100) kde = stats.gaussian_kde(data) kde_y = kde(kde_x) # Scale to match y-axis (needed for top left corner) kde_y = kde_y / kde_y.max() * np.max(y_data.dropna()) # Add KDE plot to diagonal fig.add_trace( go.Scatter( x=kde_x, y=kde_y, mode='lines', fill='tozeroy', line=dict(color='rgba(29, 81, 189, 1)'), showlegend=False ), row=i+1, col=j+1 ) # Ensure the y-axis is independent for diagonal plots # don't know why, doesn't work with top left corner fig.update_yaxes( matches=None, # Ensure independent y-axis showticklabels=True, # Show y-axis values row=i+1, col=j+1 ) else: # Off-diagonal: Add scatter plot with regression line # Add scatter plot fig.add_trace( go.Scatter( x=x_data, y=y_data, mode='markers', marker=dict( color='rgba(29, 81, 189, 0.5)', size=5 ), showlegend=False ), row=i+1, col=j+1 ) # Calculate and add regression line if len(x_data.dropna()) > 1 and len(y_data.dropna()) > 1: # Drop NaN values for regression calculation valid_indices = x_data.notna() & y_data.notna() x_clean = x_data[valid_indices] y_clean = y_data[valid_indices] ytype = y_clean.dtype xtype = x_clean.dtype unique_x = x_clean.unique() if len(x_clean) > 1 and len(unique_x)>1 and ytype != 'object' and xtype != 'object': # Calculate regression parameters slope, intercept, r_value, p_value, std_err = stats.linregress(x_clean, y_clean) x_range = np.linspace(x_clean.min(), x_clean.max(), 100) y_pred = slope * x_range + intercept # Standard error of estimate y_fit = slope * x_clean + intercept residuals = y_clean - y_fit dof = len(x_clean) - 2 residual_std_error = np.sqrt(np.sum(residuals**2) / dof) mean_x = np.mean(x_clean) t_val = t.ppf(0.975, dof) # 95% confidence se_line = residual_std_error * np.sqrt(1/len(x_clean) + (x_range - mean_x)**2 / np.sum((x_clean - mean_x)**2)) y_upper = y_pred + t_val * se_line y_lower = y_pred - t_val * se_line # Add regression line fig.add_trace( go.Scatter( x=x_range, y=y_pred, mode='lines', line=dict(color='red', width=2), showlegend=False ), row=i+1, col=j+1 ) # Add confidence interval area fig.add_trace( go.Scatter( x=np.concatenate([x_range, x_range[::-1]]), y=np.concatenate([y_upper, y_lower[::-1]]), fill='toself', fillcolor='rgba(255, 0, 0, 0.2)', line=dict(color='rgba(255, 0, 0, 0)'), showlegend=False ), row=i+1, col=j+1 ) # Update layout and axis properties fig.update_layout( margin=dict(l=10, r=10, t=50, b=50), title="Pair Plot", height=600, width=600, showlegend=False, plot_bgcolor="white" ) # Update all axes properties first (hiding tick labels by default) fig.update_xaxes( showgrid=True, gridcolor="lightgray", zeroline=False, showline=True, linewidth=1, linecolor="black", mirror=True, showticklabels=False ) fig.update_yaxes( showgrid=True, gridcolor="lightgray", zeroline=False, showline=True, linewidth=1, linecolor="black", mirror=True, showticklabels=False ) # Show tick labels and titles only for bottom row and leftmost column for i, col_name in enumerate(columns): # Bottom row: show x-axis titles and tick labels fig.update_xaxes( title_text=col_name, showticklabels=True, row=n_cols, col=i+1 ) # Leftmost column: show y-axis titles and tick labels fig.update_yaxes( title_text=col_name, showticklabels=True, row=i+1, col=1 ) return fig
[docs] def plot_corr(self): """ Plot a correlation matrix for the data. Returns ------- fig : seaborn.axisgrid.PairGrid The pairplot figure. """ corr_matrix = self._data.corr(method='pearson') mask = np.triu(np.ones_like(corr_matrix, dtype=bool), k=1) # Apply mask - replace upper triangle with NaN values corr_matrix_lower = corr_matrix.copy() corr_matrix_lower.values[mask] = np.nan fig = px.imshow( corr_matrix_lower, text_auto='.2f', # Display correlation values color_continuous_scale='RdBu_r', # Red to Blue color scale (reversed) zmin=-1, # Minimum correlation value zmax=1, # Maximum correlation value aspect="auto", # Keep aspect ratio adaptive title="Pearson Correlation Heatmap" ) # Customize hover template fig.update_traces( hovertemplate='<b>%{y} vs %{x}</b><br>Correlation: %{z}<extra></extra>' ) # Improve layout fig.update_layout( coloraxis_colorbar=dict( title="Correlation", tickvals=[-1, -0.5, 0, 0.5, 1], ticktext=["-1", "-0.5", "0", "0.5", "1"], ), plot_bgcolor="white", # White background legend=dict(bgcolor='rgba(0,0,0,0)'), margin=dict(l=10, r=10, t=50, b=50), xaxis=dict( showgrid=False, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=False, linewidth=1, tickangle=-45, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=False, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=False, linewidth=1, linecolor="black", # Black border mirror=True ), height=600, width=600 ) return fig
[docs] def write_equation(self, order=1, quadratic=[]): """ Write R-style equation for multivariate fitting procedure using the statsmodels package. Parameters ---------- order : int, optional The order of the polynomial. Default is 1. quadratic : list, optional The list of quadratic factors. Default is an empty list. Returns ------- str The R-style equation. """ myfactors = self._factors.copy() if self._dtypes is not None: if not isinstance(myfactors, list): myfactors = myfactors.tolist() # Convert to list to allow mutable operations for i in range(len(self._factors)): if self._dtypes[self._factors[i]] == 'object': myfactors[i] = f'C({myfactors[i]})' eqn = f'{self._response} ~ {myfactors[0]} ' for factor in myfactors[1:]: eqn += f'+ {factor} ' if order > 1: for i in range(len(myfactors)): for j in range(i + 1, len(myfactors)): eqn += f'+ {myfactors[i]}:{myfactors[j]} ' if order > 2: for i in range(len(myfactors)): for j in range(i + 1, len(myfactors)): for k in range(j + 1, len(myfactors)): eqn += f'+ {myfactors[i]}:{myfactors[j]}:{myfactors[k]} ' if order > 3: for i in range(len(myfactors)): for j in range(i + 1, len(myfactors)): for k in range(j + 1, len(myfactors)): for l in range(k + 1, len(myfactors)): eqn += f'+ {myfactors[i]}:{myfactors[j]}:{myfactors[k]}:{myfactors[l]} ' if len(quadratic) > 0: for factor in quadratic: eqn += f'+ I({factor}**2) ' self._equation = eqn return self._equation
[docs] def compute_linear_model(self, order=1, quadratic=[]): """ Compute the linear model using the statsmodels package. Parameters ---------- order : int, optional The order of the polynomial. Default is 1. The parameter is not used if the equation is already set. quadratic : list, optional The list of quadratic factors. Default is an empty list. The parameter is not used if the equation is already set. Returns ------- statsmodels.regression.linear_model.RegressionResultsWrapper The fitted linear model. """ if self._equation == '': eqn = self.write_equation(order=order, quadratic=quadratic) else: eqn = self._equation model = smf.ols(formula=eqn, data=self._data) self._linear_model = model.fit() return self._linear_model
[docs] def plot_linear_model(self): """ Plot the linear model using plotly. Returns ------- fig : list The list of plotly figures. """ if self._linear_model is None: raise Warning("Linear model has not been computed yet.") fig = [None] * 2 fig[0] = go.Figure() # Fig[0]: actual vs predicted line fig[0].add_trace(go.Scatter(x=self._data[self._response], y=self._linear_model.predict(), mode='markers', marker=dict(size=12), name='Actual vs Predicted')) # Add 1:1 line fig[0].add_shape(type="line", x0=min(self._data[self._response]), y0=min(self._data[self._response]), x1=max(self._data[self._response]), y1=max(self._data[self._response]), line=dict(color="Gray", width=1, dash="dash")) fig[0].update_layout( plot_bgcolor="white", # White background legend=dict(bgcolor='rgba(0,0,0,0)'), xaxis_title=f'Actual {self._response}', yaxis_title=f'Predicted {self._response}', height=500, # Adjust height as needed margin=dict(l=10, r=10, t=50, b=50), xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ) ) # # # # # # # # # # # # # # # # # # # # # # Fig[1]: slope values for each factor # # # # # # # # # # # # # # # # # # # # # # Plot: Slope values for each factor res = self._linear_model.params.rename_axis('terms').reset_index()[1:] res.columns = ['terms', 'slope'] error = self._linear_model.bse.rename_axis('terms').reset_index()[1:] error.columns = ['terms', 'error'] res['error'] = error['error'] res['pvalue'] = [self._linear_model.pvalues[res['terms']].iloc[i] for i in range(len(res))] # Sort by p-values res = res.sort_values(by='pvalue', ascending=False) # Prepare colors and labels colors = ['red' if x < 0 else 'green' for x in res['slope']] res['slope'] = res['slope'].abs() fig[1] = go.Figure() # Add bar plot fig[1].add_trace(go.Bar( y=[term.replace('I(', '').replace('C(', '').replace(')', '').replace(' ** ', '^') for term in res['terms']], x=res['slope'], error_x=dict(type='data', array=res['error']), marker_color=colors, orientation='h', name='Slope', showlegend=False # Hide the legend entry for the bar trace )) # Update layout for log scale and labels fig[1].update_layout( xaxis_title='Magnitude of effect', xaxis_type="log", height=500, # Adjust height as needed plot_bgcolor="white", # White background legend=dict( orientation="h", # Horizontal orientation y=1.1 # Place legend on top ), margin=dict(l=10, r=150, t=50, b=50), # Increase right margin for annotations xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ) ) # Add legend for Negative and Positive fig[1].add_trace(go.Scatter( x=[None], y=[None], mode='markers', marker=dict(size=10, color='red'), name='Negative' )) fig[1].add_trace(go.Scatter( x=[None], y=[None], mode='markers', marker=dict(size=10, color='green'), name='Positive' )) # Add p-values as annotations outside the plot for i, p in enumerate(res['slope']): fig[1].add_annotation( x=1.025, # Place annotations outside the plot y=i, text=f"p = {self._linear_model.pvalues[res['terms'].iloc[i]]:.2g}", showarrow=False, xref="paper", # Use paper coordinates for x xanchor='left' ) return fig
[docs] def compute_ML_model(self, **kwargs): """ Compute the machine learning model. Parameters ---------- kwargs : dict Additional keyword arguments for the model. Overrides default parameters. Default parameters by model type -------------------------------- - ElasticNetCV: - l1_ratio : list, default=[0.1, 0.5, 0.7, 0.9, 0.95, 0.99, 1.0]. List of L1 ratios to try. - cv : int, default=5. Cross-validation folds. - max_iter : int, default=1000. Maximum iterations. - RidgeCV: - alphas : list, default=[0.1, 1.0, 10.0]. List of alpha values to try. - cv : int, default=5. Cross-validation folds. - LinearRegression: - fit_intercept : bool, default=True. Whether to calculate the intercept. - RandomForest: - n_estimators : int, default=100. Number of trees in the forest. - max_depth : int or None, default=None. Maximum depth of trees. - min_samples_split : int, default=2. Minimum samples required to split a node. - random_state : int, default=42. Random seed for reproducibility. - GaussianProcess: - kernel : kernel object, default=None. Kernel for the Gaussian Process. - alpha : float, default=1e-10. Value added to diagonal of kernel matrix. - normalize_y : bool, default=True. Normalize target values. - random_state : int, default=42. Random seed for reproducibility. - GradientBoosting: - n_estimators : int, default=100. Number of boosting stages. - learning_rate : float, default=0.1. Learning rate. - max_depth : int, default=3. Maximum depth of trees. - random_state : int, default=42. Random seed for reproducibility. Returns ------- object The fitted machine learning model. """ if self._model_type is None: raise ValueError("Model must be provided.") X = self._data[self._factors] y = self._data[self._response] if self._split_size > 0: X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=self._split_size, random_state=42) else: X_train, X_test, y_train, y_test = X, X, y, y # Default parameters for each model type default_params = { "ElasticNetCV": {"l1_ratio": [0.1, 0.5, 0.7, 0.9, 0.95, 0.99, 1.0], "cv": 5, "max_iter": 1000}, "RidgeCV": {"alphas": [0.1, 1.0, 10.0], "cv": 5}, "LinearRegression": {"fit_intercept": True}, "RandomForest": {"n_estimators": 100, "max_depth": None, "min_samples_split": 2, "random_state": 42}, "GaussianProcess": {"kernel": None, "alpha": 1e-10, "normalize_y": True, "random_state": 42}, "GradientBoosting": {"n_estimators": 100, "learning_rate": 0.1, "max_depth": 3, "random_state": 42} } # Get default parameters for the selected model model_defaults = default_params.get(self._model_type, {}) # Override defaults with any provided kwargs model_params = {**model_defaults, **kwargs} if self._model_type == "ElasticNetCV": self._model = make_pipeline(StandardScaler(), ElasticNetCV(**model_params)) elif self._model_type == "RidgeCV": self._model = make_pipeline(StandardScaler(), RidgeCV(**model_params)) elif self._model_type == "LinearRegression": self._model = make_pipeline(StandardScaler(), LinearRegression(**model_params)) elif self._model_type == "RandomForest": self._model = make_pipeline(StandardScaler(), RandomForestRegressor(**model_params)) elif self._model_type == "GaussianProcess": self._model = make_pipeline(StandardScaler(), GaussianProcessRegressor(**model_params)) elif self._model_type == "GradientBoosting": self._model = make_pipeline(StandardScaler(), GradientBoostingRegressor(**model_params)) # Fit the model self._model.fit(X_train, y_train) return self._model
[docs] def plot_ML_model(self, features_in_log=False): """ Plot the machine learning model using plotly. Parameters ---------- features_in_log : bool, optional Whether to plot the feature importances in log scale. Default is False. Returns ------- fig : list The list of plotly figures. """ coef_names = self._data[self._factors].columns X = self._data[self._factors] y = self._data[self._response] if self._split_size > 0: X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=self._split_size) else: X_train, X_test, y_train, y_test = X, X, y, y bootstrap_coefs = bootstrap_coefficients( self._model[1], X, y, n_bootstrap=100, random_state=42) # Calculate mean and standard deviation of coefficients mean_coefs = np.mean(bootstrap_coefs, axis=0) std_coefs = np.std(bootstrap_coefs, axis=0) # make the fig fig = [None] * 2 fig[0] = go.Figure() # Add actual vs predicted line fig[0].add_trace(go.Scatter(x=y_train, y=self._model.predict(X_train), mode='markers', marker=dict(size=12, color='royalblue'), name='Training')) if self._split_size > 0: fig[0].add_trace(go.Scatter(x=y_test, y=self._model.predict(X_test), mode='markers', marker=dict(size=12, color='orange'), name='Validation')) # Add 1:1 line fig[0].add_shape(type="line", x0=min(y_train), y0=min(y_train), x1=max(y_train), y1=max(y_train), line=dict(color="Gray", width=1, dash="dash")) fig[0].update_layout( plot_bgcolor="white", # White background legend=dict(bgcolor='rgba(0,0,0,0)'), xaxis_title=f'Actual {self._response}', yaxis_title=f'Predicted {self._response}', height=500, # Adjust height as needed margin=dict(l=10, r=10, t=120, b=50), xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ) ) # add the score in title fig[0].update_layout( title={ 'text': f"Predicted vs Actual for {str(self._model[1])}<br>R²_test = {r2_score(y_test, self._model.predict(X_test)):.4f} - R²_train = {r2_score(y_train, self._model.predict(X_train)):.4f}<br>RMSE_test = {root_mean_squared_error(y_test, self._model.predict(X_test)):.4f} - RMSE_train = {root_mean_squared_error(y_train, self._model.predict(X_train)):.4f}", 'x': 0.45, 'xanchor': 'center' } ) # make feature importance plot if self._model_type != "GaussianProcess": fig[1] = go.Figure() pos_coefs = mean_coefs[mean_coefs > 0] pos_coefs_names = coef_names[mean_coefs > 0] neg_coefs = mean_coefs[mean_coefs < 0] neg_coefs_names = coef_names[mean_coefs < 0] # Add bars for positive mean coefficients fig[1].add_trace(go.Bar( y=[f"{pos_coefs_names[i]}" for i in range(len(pos_coefs))], x=mean_coefs[mean_coefs > 0], error_x=dict(type='data', array=std_coefs[mean_coefs > 0], visible=True), orientation='h', marker_color='royalblue', name='Positive' )) fig[1].add_trace(go.Bar( y=[f"{neg_coefs_names[i]}" for i in range(len(neg_coefs))], x=-mean_coefs[mean_coefs < 0], error_x=dict(type='data', array=std_coefs[mean_coefs < 0], visible=True), orientation='h', marker_color='orange', name='Negative' )) # Update layout fig[1].update_layout( title={ 'text': f"Features importance for {str(self._model[1])}", 'x': 0.5, 'xanchor': 'center' }, xaxis_title="Coefficient Value", yaxis_title="Features", barmode='relative', margin=dict(l=150) ) if features_in_log: fig[1].update_xaxes(type="log") else: fig[1].update_xaxes(type="linear") fig[1].update_layout( plot_bgcolor="white", # White background legend=dict(bgcolor='rgba(0,0,0,0)'), height=500, # Adjust height as needed margin=dict(l=10, r=10, t=120, b=50), xaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ), yaxis=dict( showgrid=True, # Enable grid gridcolor="lightgray", # Light gray grid lines zeroline=False, zerolinecolor="black", # Black zero line showline=True, linewidth=1, linecolor="black", # Black border mirror=True ) ) else: print('GaussianProcess does not have coefficients or feature importances') return fig
[docs] def predict(self, X=None, model='all'): """ Predict using the machine learning model and the linear model, if they are trained. Use the encoders to encode the data. If X is not provided, use the original data. If the model has not been trained, raise a warning. Parameters ---------- X : pd.DataFrame, optional The input features. Default is None, which uses the original data. Returns ------- pd.DataFrame The predicted values with a column indicating the model used. """ if X is None: X = self._data[self._factors].copy() else: X = X.copy() # Encode the input data using the same encoders for factor in self._factors: if factor in self._encoders: X[factor] = self._encoders[factor].transform(X[factor].astype(str)) predML = None predlin = None if self._model is not None: # Check if the model is fitted if not hasattr(self._model, 'predict'): raise Warning("Model has not been trained yet.") predML = self._model.predict(X) predML = pd.DataFrame(predML, columns=['prediction']) predML['model'] = self._model_type if self._linear_model is not None: # Check if the linear model is fitted if not hasattr(self._linear_model, 'predict'): raise Warning("Linear model has not been trained yet.") predlin = self._linear_model.predict(X) predlin = pd.DataFrame(predlin, columns=['prediction']) predlin['model'] = 'Linear Model' if predML is not None and predlin is not None and model == 'all': # Combine the predictions on top of each other pred = pd.concat([predML, predlin], axis=0) pred = pred.reset_index(drop=True) return pred elif predML is not None and model == 'ML': return predML elif predlin is not None and model == 'linear': return predlin else: raise Warning("No model has been trained yet.")
[docs] def bootstrap_coefficients(mod, X, y, n_bootstrap=100, random_state=None): """ Perform bootstrap resampling to estimate the variability of model coefficients. Parameters ---------- mod : object The machine learning model. X : pd.DataFrame The input features. y : pd.Series The target variable. n_bootstrap : int, optional The number of bootstrap samples. Default is 100. random_state : int, optional The seed for the random number generator. Returns ------- results : np.ndarray The bootstrapped coefficients. """ np.random.seed(random_state) n_samples = X.shape[0] results = [] for _ in range(n_bootstrap): # Resample the data indices = np.random.choice(np.arange(n_samples), size=n_samples, replace=True) X_resample, y_resample = X.values[indices], y.values[indices] # Fit the model if isinstance(mod, RidgeCV): model = RidgeCV(alphas=np.logspace(-3, 3, 10)).fit(X_resample, y_resample) results.append(model.coef_) elif isinstance(mod, ElasticNetCV): model = ElasticNetCV(alphas=np.logspace(-3, 3, 10), l1_ratio=0.5).fit(X_resample, y_resample) results.append(model.coef_) elif isinstance(mod, LinearRegression): model = LinearRegression().fit(X_resample, y_resample) results.append(model.coef_) elif isinstance(mod, RandomForestRegressor): model = RandomForestRegressor().fit(X_resample, y_resample) results.append(model.feature_importances_) elif isinstance(mod, GradientBoostingRegressor): model = GradientBoostingRegressor().fit(X_resample, y_resample) results.append(model.feature_importances_) elif isinstance(mod, GaussianProcessRegressor): # Gaussian Process does not have coefficients or feature importances model = GaussianProcessRegressor().fit(X_resample, y_resample) results.append(np.zeros(X.shape[1])) # Placeholder for Gaussian Process else: raise ValueError(f"Unsupported model type: {mod}") return np.array(results)