from typing import List
import lightning as pl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchmetrics
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, OneHotEncoder, StandardScaler
from torch.utils.data import DataLoader, TensorDataset, random_split
from torchmetrics.classification import BinaryAccuracy
class FeatureNN(nn.Module):
    """
    Neural network model for predicting the contribution of a single feature.

    The network is a plain multi-layer perceptron: ``Linear -> activation`` for
    the first hidden layer, ``Linear -> activation -> Dropout`` for every
    further hidden layer, and a final ``Linear`` projection to ``output_size``.

    Args:
        input_size (int, optional): Size of the input feature (default is 1).
        output_size (int, optional): Size of the output feature (default is 1).
        hidden_units (List[int], optional): List of hidden layer sizes
            (default is [128, 64, 32]).
        activation (nn.Module, optional): Activation module inserted after every
            hidden linear layer. The same instance is reused at every position.
            ``None`` (the default) creates a fresh ``nn.ReLU()``.
        dropout (float, optional): Dropout probability (default is 0.5).

    Attributes:
        model (nn.Sequential): Sequential neural network model.
    """

    def __init__(
        self,
        input_size: int = 1,
        output_size: int = 1,
        hidden_units: List[int] = None,
        activation: nn.Module = None,
        dropout: float = 0.5,
    ):
        super().__init__()
        # Build the default activation per instance instead of sharing one
        # module object (created at import time) between all networks.
        if activation is None:
            activation = nn.ReLU()
        hidden_units = list(hidden_units) if hidden_units else [128, 64, 32]

        layers = [nn.Linear(input_size, hidden_units[0]), activation]
        # Dropout is only inserted after the second and later hidden layers.
        for in_units, out_units in zip(hidden_units, hidden_units[1:]):
            layers.extend(
                [
                    nn.Linear(in_units, out_units),
                    activation,
                    nn.Dropout(dropout),
                ]
            )
        layers.append(nn.Linear(hidden_units[-1], output_size))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """
        Forward pass of the neural network.

        Args:
            x (torch.Tensor): Input tensor whose last dimension is ``input_size``.

        Returns:
            torch.Tensor: Output tensor whose last dimension is ``output_size``.
        """
        return self.model(x)

    @staticmethod
    def _to_numpy(values):
        """Return tensors as detached CPU numpy arrays; pass anything else through."""
        if isinstance(values, torch.Tensor):
            return values.detach().cpu().numpy()
        return values

    def _draw_curve(self, feature_name, ax=None):
        """Draw the learned function on ``ax`` over the fixed grid [-1, 1] and return ``ax``."""
        if ax is None:
            _, ax = plt.subplots(1, 1)

        # Evaluate in eval mode so dropout does not add noise to the curve,
        # then restore whatever mode the module was in.
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                reference = next(self.parameters())
                # Ensure input is 2D and lives on the same device/dtype as the weights
                x_axis = torch.linspace(-1, 1, 500).unsqueeze(1)
                x_axis = x_axis.to(device=reference.device, dtype=reference.dtype)
                y_axis = self.forward(x_axis)
        finally:
            self.train(was_training)

        ax.plot(
            self._to_numpy(x_axis),
            self._to_numpy(y_axis),
            linestyle="solid",
            linewidth=1,
            color="red",
        )
        # Set the y-axis label to the feature name
        ax.set_ylabel(feature_name)
        return ax

    def plot(self, feature_name, ax=None):
        """
        Plot the model's predictions for a specific feature.

        Args:
            feature_name (str): Name of the feature, used as the y-axis label.
            ax (matplotlib.axes.Axes, optional): Matplotlib axis to plot on.
                A new figure is created when omitted (default is None).

        Returns:
            None
        """
        self._draw_curve(feature_name, ax=ax)

    def plot_data(self, x, y, feature_name, ax=None):
        """
        Plot the model's predictions and actual data points for a specific feature.

        Args:
            x (torch.Tensor): Input data.
            y (torch.Tensor): Actual target data.
            feature_name (str): Name of the feature, used as the y-axis label.
            ax (matplotlib.axes.Axes, optional): Matplotlib axis to plot on.
                A new figure is created when omitted (default is None).

        Returns:
            None
        """
        ax = self._draw_curve(feature_name, ax=ax)
        ax.scatter(
            self._to_numpy(x), self._to_numpy(y), color="gray", s=2, alpha=0.3
        )
class NeuralAdditiveModel(nn.Module):
    """
    Neural Additive Model for combining multiple feature-specific neural networks.

    Every input column is passed through its own ``FeatureNN``; the per-feature
    outputs are summed and a learned bias is added.

    Args:
        input_size (int): Size of the input feature space (number of columns).
        output_size (int): Size of the output feature space.
        hidden_units (List[int], optional): List of hidden layer sizes for
            feature-specific neural networks (default is [64, 32]).
        feature_dropout (float, optional): Dropout probability applied to the
            per-feature outputs before they are summed (default is 0.0).
        hidden_dropout (float, optional): Dropout probability for hidden layers
            (default is 0.3).
        activation (str or nn.Module, optional): Activation function for hidden
            layers: one of "relu", "sigmoid", "tanh" (case-insensitive), or an
            ``nn.Module`` instance (default is "relu").
        out_activation (nn.Module, optional): Activation function for output
            layer; ``nn.Identity()`` is used when omitted (default is None).

    Attributes:
        input_size (int): Size of the input feature space.
        hidden_units (List[int]): List of hidden layer sizes for feature-specific neural networks.
        feature_dropout (nn.Dropout): Dropout layer applied to the per-feature outputs.
        bias (nn.Parameter): Bias parameter for the output layer.
        out_activation (nn.Module): Activation function for the output layer.
        feature_nns (nn.ModuleList): List of feature-specific neural networks.
    """

    def __init__(
        self,
        input_size: int,
        output_size: int,
        hidden_units: List[int] = None,
        feature_dropout: float = 0.0,
        hidden_dropout: float = 0.3,
        activation: str = "relu",
        out_activation=None,
    ):
        super().__init__()
        self.input_size = input_size
        self.hidden_units = list(hidden_units) if hidden_units else [64, 32]
        self.feature_dropout = nn.Dropout(p=feature_dropout)
        self.bias = nn.Parameter(torch.zeros(output_size))

        # Set up the activation function based on the string
        activation_fn = self._get_activation_fn(activation)
        self.out_activation = (
            out_activation if out_activation is not None else nn.Identity()
        )

        # Create feature-specific networks using FeatureNN. The resolved
        # ``self.hidden_units`` is passed on so the documented default is
        # really the one that gets built.
        self.feature_nns = nn.ModuleList(
            [
                FeatureNN(
                    input_size=1,  # Each feature-specific NN takes a single feature as input
                    output_size=output_size,
                    hidden_units=self.hidden_units,
                    activation=activation_fn,
                    dropout=hidden_dropout,
                )
                for _ in range(input_size)
            ]
        )

    def _get_activation_fn(self, activation):
        """
        Get the activation function based on the provided name.

        Args:
            activation (str or nn.Module): Name of the activation function, or
                an already constructed module which is returned unchanged.

        Returns:
            nn.Module: Activation function module.

        Raises:
            ValueError: If the name is not "relu", "sigmoid" or "tanh".
        """
        if isinstance(activation, nn.Module):
            return activation

        factories = {"relu": nn.ReLU, "sigmoid": nn.Sigmoid, "tanh": nn.Tanh}
        factory = factories.get(str(activation).lower())
        if factory is None:
            raise ValueError(f"Unsupported activation function: {activation}")
        return factory()

    def _resolve_feature_names(self, feature_names):
        """Return one label per feature network, generating ``Feature i`` when none are given."""
        count = len(self.feature_nns)
        if feature_names is None:
            return [f"Feature {i}" for i in range(count)]
        names = list(feature_names)
        if len(names) != count:
            raise ValueError(
                f"Expected {count} feature names, got {len(names)}."
            )
        return names

    def forward(self, x):
        """
        Forward pass of the neural network.

        Args:
            x (torch.Tensor): Input tensor of shape (batch, input_size).

        Returns:
            torch.Tensor: Output tensor of shape (batch, output_size).
        """
        feature_outputs = [
            feature_nn(x[:, i : i + 1])
            for i, feature_nn in enumerate(self.feature_nns)
        ]
        # Stack to (batch, n_features, output_size) and reduce over the feature
        # axis only. Concatenating and summing over everything would merge the
        # output units, so they would differ only by the bias.
        stacked = torch.stack(feature_outputs, dim=1)
        stacked = self.feature_dropout(stacked)
        output = stacked.sum(dim=1) + self.bias
        return self.out_activation(output)

    def plot(self, feature_names=None):
        """
        Plot the learned functions for each feature-specific neural network.

        Switches the model to evaluation mode.

        Args:
            feature_names (List[str], optional): Labels for the subplots, one
                per feature network. Defaults to "Feature 0", "Feature 1", ...

        Returns:
            None
        """
        names = self._resolve_feature_names(feature_names)
        self.eval()
        with torch.no_grad():
            # squeeze=False always yields a 2D array of axes, so one and many
            # features share the same code path.
            _, axes = plt.subplots(
                len(self.feature_nns), 1, figsize=(10, 7), squeeze=False
            )
            for component, name, ax in zip(self.feature_nns, names, axes.flat):
                component.plot(name, ax=ax)

    def plot_data(self, x, y, feature_names=None):
        """
        Plot the learned functions and actual data points for each feature-specific neural network.

        Switches the model to evaluation mode.

        Args:
            x (torch.Tensor): Input data of shape (n_samples, input_size).
            y (torch.Tensor): Actual target data.
            feature_names (List[str], optional): Labels for the subplots, one
                per feature network. Defaults to "Feature 0", "Feature 1", ...

        Returns:
            None
        """
        names = self._resolve_feature_names(feature_names)
        if isinstance(x, torch.Tensor):
            x = x.detach().cpu()
        if isinstance(y, torch.Tensor):
            y = y.detach().cpu()

        self.eval()
        with torch.no_grad():
            _, axes = plt.subplots(
                len(self.feature_nns), 1, figsize=(10, 7), squeeze=False
            )
            for i, (component, name, ax) in enumerate(
                zip(self.feature_nns, names, axes.flat)
            ):
                # Column i holds the values of the feature modelled by network i.
                component.plot_data(x[:, i], y, name, ax=ax)
class DownstreamModel(pl.LightningModule):
    """
    PyTorch Lightning module for downstream modeling using a trained topic model.

    The structured columns of the dataset are preprocessed, combined with the
    topic probabilities of the trained topic model, and fed into a
    ``NeuralAdditiveModel``.

    Args:
        trained_topic_model (AbstractModel): Trained topic model. It must expose
            a ``theta`` attribute or a ``get_theta()`` method.
        target_column (str): Name of the target column.
        dataset (AbstractDataset, optional): Dataset object whose ``dataframe``
            holds the structured data. When omitted, ``trained_topic_model.dataset``
            is used (default is None).
        task (str, optional): Type of task, either 'regression' or 'classification' (default is 'regression').
        batch_size (int, optional): Batch size for training (default is 128).
        lr (float, optional): Learning rate for optimization (default is 0.0005).
        hidden_units (List[int], optional): List of hidden layer sizes for the Neural Additive Model (default is None).
        feature_dropout (float, optional): Dropout probability for input features (default is 0.0).
        hidden_dropout (float, optional): Dropout probability for hidden layers (default is 0.3).
        activation (str, optional): Activation function for hidden layers (default is 'relu').
        out_activation (nn.Module, optional): Activation function for output layer (default is None).

    Attributes:
        trained_topic_model (AbstractModel): Trained topic model.
        task (str): Type of task, either 'regression' or 'classification'.
        batch_size (int): Batch size for training.
        lr (float): Learning rate for optimization.
        loss_fn (nn.Module): Loss function for the task.
        structured_data (pd.DataFrame): Structured data used for downstream modeling.
        target_column (str): Name of the target column.
        combined_data (pd.DataFrame): Combined DataFrame containing structured data and topic probabilities.
        model (NeuralAdditiveModel): Neural Additive Model for downstream modeling.
        metric (torchmetrics.Metric): MSE for regression, accuracy for classification.

    Raises:
        ValueError: If ``task`` is neither 'regression' nor 'classification'.
    """

    _SUPPORTED_TASKS = ("regression", "classification")

    def __init__(
        self,
        trained_topic_model,
        target_column,
        dataset=None,
        task="regression",
        batch_size=128,
        lr=0.0005,
        hidden_units: List[int] = None,
        feature_dropout: float = 0.0,
        hidden_dropout: float = 0.3,
        activation: str = "relu",
        out_activation=None,
    ):
        super().__init__()
        # Fail early: an unknown task would otherwise select the classification
        # loss but leave ``self.metric`` undefined.
        if task not in self._SUPPORTED_TASKS:
            raise ValueError(
                f"Unsupported task: {task!r}. Expected one of {self._SUPPORTED_TASKS}."
            )

        self.trained_topic_model = trained_topic_model
        self.task = task
        self.batch_size = batch_size
        self.lr = lr
        self.loss_fn = nn.MSELoss() if task == "regression" else nn.CrossEntropyLoss()

        if dataset is None:
            source = self.trained_topic_model.dataset
            # NOTE(review): the topic model presumably stores a dataset wrapper
            # like the ``dataset`` argument; unwrap its DataFrame when present.
            if not isinstance(source, pd.DataFrame) and hasattr(source, "dataframe"):
                source = source.dataframe
            self.structured_data = source
        else:
            self.structured_data = dataset.dataframe.copy()

        # Drop the text-derived columns and stale predictions if they exist
        self.structured_data = self.structured_data.drop(
            columns=["text", "tokens", "predictions"], errors="ignore"
        )

        self.target_column = target_column

        # Combine topic probabilities with structured data
        self.combined_data = self.prepare_combined_data()

        # Define the NAM architecture here based on the shape of the combined data
        self.model = self.define_nam_model(
            hidden_units=hidden_units,
            feature_dropout=feature_dropout,
            hidden_dropout=hidden_dropout,
            activation=activation,
            out_activation=out_activation,
        )

        # Initialize metrics
        if task == "regression":
            self.metric = torchmetrics.MeanSquaredError()
        else:
            # Determine the number of unique target values
            num_classes = len(np.unique(self.combined_data[self.target_column]))
            if num_classes == 2:
                # Binary classification
                self.metric = BinaryAccuracy()
            else:
                # Multiclass classification
                self.metric = torchmetrics.Accuracy(
                    task="multiclass",
                    num_classes=num_classes,
                )

        # Populated once by ``setup``.
        self.train_dataset = None
        self.val_dataset = None

    def prepare_combined_data(self):
        """
        Prepare combined DataFrame containing structured data and topic probabilities.

        Returns:
            pd.DataFrame: Combined DataFrame with the target column last and
            rows containing missing values removed.

        Raises:
            AttributeError: If the topic model has neither ``theta`` nor ``get_theta``.
        """
        # Preprocess structured data
        preprocessed_structured_data = self.preprocess_structured_data(
            self.structured_data
        )

        # Check if the trained model has attribute 'theta' or method 'get_theta'
        if hasattr(self.trained_topic_model, "theta"):
            # Use the 'theta' attribute to get the topic-document matrix
            topic_document_matrix = self.trained_topic_model.theta
        elif hasattr(self.trained_topic_model, "get_theta"):
            # Call the 'get_theta' method to get the topic-document matrix
            topic_document_matrix = self.trained_topic_model.get_theta()
        else:
            raise AttributeError(
                "The trained model does not have 'theta' attribute or 'get_theta' method."
            )

        # Convert the matrix to a DataFrame; it is used as-is (no transpose), so
        # it is assumed to already be (n_documents, n_topics) -- TODO confirm.
        topic_probabilities = pd.DataFrame(topic_document_matrix)
        topic_probabilities.columns = [
            f"Topic_{i}" for i in range(topic_probabilities.shape[1])
        ]

        # Align both frames on a fresh positional index before concatenating
        preprocessed_structured_data = preprocessed_structured_data.reset_index(
            drop=True
        )
        topic_probabilities = topic_probabilities.reset_index(drop=True)

        # Combine the preprocessed structured data with the topic probabilities
        combined_df = pd.concat(
            [preprocessed_structured_data, topic_probabilities], axis=1
        )

        # Ensure the target column is the last column in the DataFrame
        feature_columns = [
            col for col in combined_df.columns if col != self.target_column
        ]
        combined_df = combined_df[feature_columns + [self.target_column]]

        return combined_df.dropna()

    def preprocess_structured_data(self, data):
        """
        Preprocess structured data.

        Numerical columns (int64/float64) are mean-imputed and standardized;
        object/category columns are mode-imputed and one-hot encoded. Columns
        of any other dtype are dropped by the ``ColumnTransformer``. The target
        column is passed through untouched.

        Args:
            data (pd.DataFrame): Structured data.

        Returns:
            pd.DataFrame: Preprocessed structured data.
        """
        # Make a copy of the data to avoid modifying the original dataframe
        data = data.copy()

        # Exclude the target column from feature processing
        features = data.drop(columns=[self.target_column])

        # Identify categorical and numerical columns
        categorical_cols = features.select_dtypes(
            include=["object", "category"]
        ).columns
        numerical_cols = features.select_dtypes(include=["int64", "float64"]).columns

        transformers = []
        if len(numerical_cols) > 0:
            numerical_transformer = Pipeline(
                [
                    ("imputer", SimpleImputer(strategy="mean")),
                    ("scaler", StandardScaler()),
                ]
            )
            transformers.append(("num", numerical_transformer, numerical_cols))

        if len(categorical_cols) > 0:
            categorical_transformer = Pipeline(
                [
                    ("imputer", SimpleImputer(strategy="most_frequent")),
                    ("onehot", OneHotEncoder(handle_unknown="ignore")),
                ]
            )
            transformers.append(("cat", categorical_transformer, categorical_cols))

        preprocessor = ColumnTransformer(transformers=transformers)

        # Fit and transform the feature data
        preprocessed_features = preprocessor.fit_transform(features)
        # One-hot encoding may produce a sparse matrix; densify it
        if hasattr(preprocessed_features, "toarray"):
            preprocessed_features = preprocessed_features.toarray()

        # Generate feature names for the resulting columns
        feature_names = numerical_cols.tolist()
        if "cat" in preprocessor.named_transformers_:
            feature_names += list(
                preprocessor.named_transformers_["cat"]
                .named_steps["onehot"]
                .get_feature_names_out(categorical_cols)
            )

        # Reconstruct the DataFrame
        preprocessed_data = pd.DataFrame(
            preprocessed_features, columns=feature_names, index=features.index
        )
        preprocessed_data[self.target_column] = data[self.target_column]

        return preprocessed_data

    def define_nam_model(
        self, hidden_units, feature_dropout, hidden_dropout, activation, out_activation
    ):
        """
        Define the Neural Additive Model architecture.

        Args:
            hidden_units (List[int]): List of hidden layer sizes for the Neural Additive Model.
            feature_dropout (float): Dropout probability for input features.
            hidden_dropout (float): Dropout probability for hidden layers.
            activation (str): Activation function for hidden layers.
            out_activation (nn.Module): Activation function for output layer.

        Returns:
            NeuralAdditiveModel: Initialized Neural Additive Model.
        """
        input_size = self.combined_data.shape[1] - 1  # Exclude target column
        if self.task == "regression":
            output_size = 1
        else:
            # One logit per distinct target value
            output_size = len(self.combined_data[self.target_column].unique())

        return NeuralAdditiveModel(
            input_size=input_size,
            output_size=output_size,
            hidden_units=hidden_units,
            feature_dropout=feature_dropout,
            hidden_dropout=hidden_dropout,
            activation=activation,
            out_activation=out_activation,
        )

    def forward(self, x):
        """
        Forward pass of the model.

        Args:
            x (torch.Tensor): Input tensor of shape (batch, n_features).

        Returns:
            torch.Tensor: ``(batch,)`` predictions for regression,
            ``(batch, n_classes)`` logits for classification.
        """
        output = self.model(x)
        if self.task == "regression":
            # Only drop the trailing unit dimension. A bare ``squeeze()`` would
            # also remove the batch dimension for a batch of size one.
            return output.squeeze(-1)
        return output

    def _shared_step(self, batch, stage):
        """Compute the loss and log ``{stage}_loss`` plus the task metric for one batch."""
        x, y = batch
        y_hat = self(x)
        loss = self.loss_fn(y_hat, y)
        log_kwargs = dict(on_step=True, on_epoch=True, prog_bar=True, logger=True)

        if self.task == "classification":
            # For classification, convert logits to class predictions
            preds = torch.argmax(y_hat, dim=1)
            self.log(f"{stage}_acc", self.metric(preds, y.long()), **log_kwargs)
        elif self.task == "regression":
            # For regression, directly use the output for metric calculation
            self.log(f"{stage}_mse", self.metric(y_hat, y), **log_kwargs)

        self.log(f"{stage}_loss", loss, **log_kwargs)
        return loss

    def training_step(self, batch, batch_idx):
        """Run one training batch and return its loss."""
        return self._shared_step(batch, "train")

    def validation_step(self, batch, batch_idx):
        """Run one validation batch; metrics are logged, nothing is returned."""
        self._shared_step(batch, "val")

    def test_step(self, batch, batch_idx):
        """Run one test batch; metrics are logged, nothing is returned."""
        self._shared_step(batch, "test")

    def setup(self, stage=None):
        """
        Setup datasets for training and validation.

        The random 80/20 split is created only once. Lightning calls this hook
        for every stage, and re-splitting would move training rows into the
        validation set.

        Args:
            stage (str, optional): Stage name passed by Lightning (unused).
        """
        if self.train_dataset is not None and self.val_dataset is not None:
            return

        # Split the combined data into features and target
        X = self.combined_data.iloc[:, :-1].values  # Exclude target column
        y = self.combined_data.iloc[:, -1].values

        # Convert to PyTorch tensors
        X_tensor = torch.tensor(X, dtype=torch.float32)
        if self.task == "classification":
            # Use LabelEncoder for classification task; kept on the module so
            # predicted class indices can be mapped back to the original labels
            self.label_encoder = LabelEncoder()
            y = self.label_encoder.fit_transform(y)
            # Ensure labels are long type for classification
            y_tensor = torch.tensor(y, dtype=torch.long)
        else:
            # Keep as float for regression
            y_tensor = torch.tensor(y, dtype=torch.float32)

        dataset = TensorDataset(X_tensor, y_tensor)

        # Train-validation split
        train_size = int(0.8 * len(dataset))
        val_size = len(dataset) - train_size
        train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

        # Assign to use in dataloaders
        self.train_dataset = train_dataset
        self.val_dataset = val_dataset

    def train_dataloader(self):
        """
        DataLoader for training dataset.

        Returns:
            DataLoader: Training DataLoader (shuffled).
        """
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        """
        DataLoader for validation dataset.

        Returns:
            DataLoader: Validation DataLoader.
        """
        return DataLoader(self.val_dataset, batch_size=self.batch_size)

    def get_feature_names(self):
        """
        Get names of input features.

        Returns:
            List[str]: List of feature names.
        """
        # The last column of combined_data is the target; all other columns are features
        return self.combined_data.columns[:-1].tolist()

    def plot_feature_nns(self):
        """
        Plot the learned functions for each feature-specific neural network.

        The model is evaluated in eval mode so dropout does not distort the
        curves; its previous train/eval mode is restored afterwards.
        """
        feature_names = self.get_feature_names()  # Retrieve feature names
        num_features = len(self.model.feature_nns)
        # squeeze=False always returns a 2D array of axes, so the single-feature
        # case needs no special handling.
        _, axs = plt.subplots(
            num_features, 1, figsize=(10, num_features * 2), squeeze=False
        )

        was_training = self.model.training
        self.model.eval()
        try:
            for feature_nn, name, ax in zip(
                self.model.feature_nns, feature_names, axs.flat
            ):
                # Pass the feature name to the plot method
                feature_nn.plot(name, ax=ax)
                ax.set_title(f"Feature: {name}")
        finally:
            self.model.train(was_training)

        plt.tight_layout()
        plt.show()