import deepchem as dc
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from deepchem.feat import ConvMolFeaturizer
from deepchem.data import NumpyDataset
from deepchem.models import GraphConvModel
import tensorflow as tf
import logging
from typing import List, Optional, Union, Tuple
import optuna
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error, f1_score, accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
# Set up logging
# Module-wide logger: INFO level, timestamped "time - LEVEL - message" format.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Monkey-Patching for Keras Compatibility ---
from deepchem.models.optimizers import Adam
def _create_tf_optimizer(self, global_step):
    """Build a tf.keras Adam optimizer from this DeepChem optimizer's settings.

    Compatibility shim intended to replace deepchem's
    ``Adam._create_tf_optimizer``. Uses a fixed learning rate when one is
    configured, otherwise falls back to an exponential-decay schedule built
    from the instance's decay settings.
    """
    try:
        if self.learning_rate is not None:
            lr = self.learning_rate
        else:
            # No fixed rate configured: derive a schedule from decay settings.
            lr = tf.keras.optimizers.schedules.ExponentialDecay(
                initial_learning_rate=self.initial_learning_rate,
                decay_steps=self.decay_steps,
                decay_rate=self.decay_rate,
                staircase=self.staircase,
            )
        return tf.keras.optimizers.Adam(
            learning_rate=lr,
            beta_1=self.beta1,
            beta_2=self.beta2,
            epsilon=self.epsilon,
        )
    except Exception as e:
        logger.error(f"Error creating optimizer: {e}")
        raise
# Monkey-patch DeepChem's Adam so optimizer creation goes through the compatibility shim.
Adam._create_tf_optimizer = _create_tf_optimizer
from deepchem.models.keras_model import KerasModel
def _create_inputs(self, example_inputs):
try:
self._ensure_built()
keras_model = getattr(self.model, 'model', self.model)
if hasattr(keras_model, 'inputs') and keras_model.inputs is not None:
self._input_shapes = [t.shape for t in keras_model.inputs]
self._input_dtypes = [t.dtype.name for t in keras_model.inputs]
else:
if isinstance(example_inputs, (list, tuple)):
self._input_shapes = [np.shape(x) for x in example_inputs]
self._input_dtypes = [x.dtype.name for x in example_inputs]
else:
self._input_shapes = [np.shape(example_inputs)]
self._input_dtypes = [example_inputs.dtype.name]
self._inputs_built = True
except Exception as e:
logger.error(f"Error in _create_inputs: {e}")
raise
# Monkey-patch DeepChem's KerasModel so input introspection uses the compatibility shim.
KerasModel._create_inputs = _create_inputs
# --- Enhanced Helper Function ---
def smiles_to_dataset(smiles_list: List[str], labels: Optional[Union[List, np.ndarray]] = None,
                      featurizer=None) -> Tuple[NumpyDataset, Optional[np.ndarray]]:
    """Featurize SMILES strings into a DeepChem NumpyDataset.

    Molecules that fail featurization are dropped with a warning; labels (when
    given) are filtered in lockstep so they stay aligned with the surviving
    molecules.

    Args:
        smiles_list: SMILES strings to featurize; must be non-empty strings.
        labels: optional labels, same length as ``smiles_list``.
        featurizer: featurizer to use. Defaults to a fresh ``ConvMolFeaturizer``
            per call. (The previous signature used ``featurizer=ConvMolFeaturizer()``,
            a mutable default evaluated once at import time and shared across
            all calls — the classic mutable-default-argument pitfall.)

    Returns:
        ``(dataset, filtered_labels)`` where ``filtered_labels`` is ``None``
        when no labels were supplied.

    Raises:
        ValueError: on empty/non-string input, a SMILES/label length mismatch,
            or when no molecule featurizes successfully.
    """
    try:
        if featurizer is None:
            # Build per call instead of sharing one instance across all callers.
            featurizer = ConvMolFeaturizer()
        if not smiles_list or not all(isinstance(s, str) for s in smiles_list):
            raise ValueError("SMILES list must contain valid strings.")
        if labels is not None:
            if len(smiles_list) != len(labels):
                raise ValueError("SMILES and labels lists must have the same length.")
            labels = np.array(labels)
        mols = featurizer.featurize(smiles_list)
        valid_mols = []
        valid_labels = []
        for i, mol in enumerate(mols):
            # A successful featurization yields an object exposing atom_features.
            if mol is not None and hasattr(mol, 'atom_features'):
                valid_mols.append(mol)
                if labels is not None:
                    valid_labels.append(labels[i])
            else:
                logger.warning(f"SMILES at index {i} failed to featurize: {smiles_list[i]}")
        if not valid_mols:
            raise ValueError("No valid SMILES strings were featurized.")
        X = np.array(valid_mols, dtype=object)
        if labels is not None:
            y = np.array(valid_labels)
            dataset = NumpyDataset(X=X, y=y)
            logger.info(f"Created dataset with {len(valid_mols)} valid molecules out of {len(smiles_list)}.")
            return dataset, y
        dataset = NumpyDataset(X=X)
        logger.info(f"Created dataset with {len(valid_mols)} valid molecules out of {len(smiles_list)}.")
        return dataset, None
    except Exception as e:
        logger.error(f"Error in smiles_to_dataset: {e}")
        raise
# --- New SMILES List with 6 Molecules ---
# NOTE(review): identities of these molecules are not established anywhere in
# this file — confirm with the data owner before reporting results.
smiles_list_6 = [
    "COc1cc(/C=C/C(=O)CC(=O)/C=C/c2ccc(c(c2)OC)O)ccc1O",
    "COC1=CC(\C=C\C(=O)CC(=O)\C=C\C2=CC=C(O)C(OC)=C2)=CC=C1O",
    "COC1=CC=C(\C=C\C(=O)CC(=O)\C=C\C2=CC=C(OC)C(OC)=C2)C=C1OC",
    "COC1=CC(CNC(=O)CCCC\C=C/C(C)C)=CC=C1O",
    "CCCCCCCCC(=O)NCC1=CC=C(O)C(OC)=C1",
    "CN(C)C1=CC2=C(C=C1)N=C3C=CC(=[N+](C)C)C=C3S2.[Cl-]"
]
train_smiles = smiles_list_6 # Use the 6 SMILES for training
train_class_labels = [1, 0, 1, 0, 1, 1] # Example labels for 6 SMILES
train_reg_labels = [7.2, 6.9, 6.4, 6.3, 6.2, 6.1] # Example Regression labels
# NOTE(review): validation and test splits reuse the training molecules, so
# the "validation" metrics below measure training fit, not generalization.
valid_smiles = smiles_list_6 # Use the same 6 SMILES for validation (for this example)
valid_class_labels = [1, 0, 1, 0, 1, 1] # Example validation labels
valid_reg_labels = [7.2, 6.9, 6.4, 6.3, 6.2, 6.1] # Example validation regression labels
test_smiles = smiles_list_6 # Use the same 6 SMILES for testing (for this example)
featurizer = ConvMolFeaturizer()
# Create Datasets
# Build classification, regression, and label-free test datasets; any
# featurization failure is logged and aborts the script.
try:
    train_dataset_class, train_class_labels_filtered = smiles_to_dataset(train_smiles, train_class_labels, featurizer)
    train_dataset_reg, train_reg_labels_filtered = smiles_to_dataset(train_smiles, train_reg_labels, featurizer)
    valid_dataset_class, valid_class_labels_filtered = smiles_to_dataset(valid_smiles, valid_class_labels, featurizer)
    valid_dataset_reg, valid_reg_labels_filtered = smiles_to_dataset(valid_smiles, valid_reg_labels, featurizer)
    test_dataset, _ = smiles_to_dataset(test_smiles, None, featurizer)
except Exception as e:
    logger.error(f"Failed to create datasets: {e}")
    raise
# --- Classification Model (Unchanged) ---
def train_and_predict_class(train_dataset, valid_dataset, test_dataset):
    """Train a GraphConvModel classifier and predict on all three splits.

    Returns:
        (model, train_pred, valid_pred, test_pred) — the fitted model plus
        probability predictions for each dataset in order.
    """
    try:
        clf = GraphConvModel(
            n_tasks=1,
            mode='classification',
            dropout=0.2,
            batch_normalize=False,
            model_dir='graphconv_model_classification_expanded',
            graph_conv_layers=[64, 64],
            dense_layer_size=128,
            batch_size=50,
        )
        # Rename the underlying Keras model only when the attribute exists.
        if hasattr(clf.model, 'name'):
            clf.model.name = 'graph_conv_classification_model_expanded'
        logger.info("Training classification model...")
        clf.fit(train_dataset, nb_epoch=50)
        predictions = tuple(clf.predict(ds) for ds in (train_dataset, valid_dataset, test_dataset))
        return (clf,) + predictions
    except Exception as e:
        logger.error(f"Error in classification training/prediction: {e}")
        raise
# --- Regression Model with Optuna Hyperparameter Tuning ---
def objective(trial):
    """Optuna objective function to maximize R^2 for regression."""
    try:
        # Sample this trial's hyperparameters.
        num_layers = trial.suggest_int('n_layers', 1, 3)  # Number of graph conv layers
        conv_sizes = []
        for layer_idx in range(num_layers):
            conv_sizes.append(trial.suggest_categorical('graph_conv_size_' + str(layer_idx), [32, 64, 128]))
        dense_size = trial.suggest_categorical('dense_layer_size', [64, 128, 256])
        drop_rate = trial.suggest_float('dropout', 0.0, 0.5)
        bsize = trial.suggest_categorical('batch_size', [32, 50, 64])
        lr = trial.suggest_float('learning_rate', 1e-5, 1e-3, log=True)
        # Build and fit a regression model with the sampled configuration.
        reg_model = GraphConvModel(
            n_tasks=1,
            mode='regression',
            dropout=drop_rate,
            batch_normalize=False,
            model_dir=f'graphconv_model_regression_trial_{trial.number}',
            graph_conv_layers=conv_sizes,
            dense_layer_size=dense_size,
            batch_size=bsize,
            learning_rate=lr,
        )
        if hasattr(reg_model.model, 'name'):
            reg_model.model.name = f'graph_conv_regression_model_trial_{trial.number}'
        logger.info(f"Training regression model with trial {trial.number}...")
        reg_model.fit(train_dataset_reg, nb_epoch=100, deterministic=False)
        # Score on the validation split; Optuna maximizes this value.
        preds = reg_model.predict(valid_dataset_reg)
        return r2_score(valid_reg_labels_filtered, preds.flatten())
    except Exception as e:
        logger.error(f"Error in Optuna trial {trial.number}: {e}")
        return float('-inf')  # Failed trials rank last under 'maximize'.
def train_and_predict_reg_with_best_params(train_dataset, valid_dataset, test_dataset, best_params):
    """Train final regression model with best hyperparameters.

    Args:
        train_dataset: dataset to train the final model on.
        valid_dataset: dataset for validation predictions.
        test_dataset: dataset for test predictions.
        best_params: ``study.best_params`` from the Optuna search; must contain
            'n_layers', per-layer 'graph_conv_size_<i>', 'dense_layer_size',
            'batch_size', 'dropout' and 'learning_rate'.

    Returns:
        (model, train_pred, valid_pred, test_pred).
    """
    try:
        model = GraphConvModel(
            n_tasks=1,
            mode='regression',
            dropout=best_params['dropout'],
            batch_normalize=False,
            model_dir='graphconv_model_regression_expanded',
            graph_conv_layers=[best_params[f'graph_conv_size_{i}'] for i in range(best_params['n_layers'])],
            dense_layer_size=best_params['dense_layer_size'],
            batch_size=best_params['batch_size'],
            learning_rate=best_params['learning_rate']  # Use best learning rate
        )
        if hasattr(model.model, 'name'):
            model.model.name = 'graph_conv_regression_model_expanded'
        logger.info("Training final regression model with best parameters...")
        # BUG FIX: previously this line fit on the module-level global
        # `train_dataset_reg`, silently ignoring the `train_dataset` argument.
        model.fit(train_dataset, nb_epoch=100)
        train_pred = model.predict(train_dataset)
        valid_pred = model.predict(valid_dataset)
        test_pred = model.predict(test_dataset)
        return model, train_pred, valid_pred, test_pred
    except Exception as e:
        logger.error(f"Error in regression training/prediction with best params: {e}")
        raise
# --- Evaluation Functions ---
def evaluate_classification(true_labels, pred_probs):
    """Convert class probabilities to hard labels and score them.

    ``pred_probs`` is argmax-ed over axis 2 (classes) and flattened before
    scoring — assumes DeepChem's (samples, tasks, classes) layout; confirm
    against the model's predict output.

    Returns:
        (accuracy, precision, recall, f1) as floats.
    """
    try:
        hard_labels = np.argmax(pred_probs, axis=2).flatten()
        return (
            accuracy_score(true_labels, hard_labels),
            precision_score(true_labels, hard_labels, zero_division=0),
            recall_score(true_labels, hard_labels, zero_division=0),
            f1_score(true_labels, hard_labels, zero_division=0),
        )
    except Exception as e:
        logger.error(f"Error in classification evaluation: {e}")
        raise
def evaluate_regression(true_labels, pred_values):
    """Score regression predictions against true labels.

    Returns:
        (mae, mse, r2) computed on the flattened predictions.
    """
    try:
        flat_preds = pred_values.flatten()
        return (
            mean_absolute_error(true_labels, flat_preds),
            mean_squared_error(true_labels, flat_preds),
            r2_score(true_labels, flat_preds),
        )
    except Exception as e:
        logger.error(f"Error in regression evaluation: {e}")
        raise
# --- Main Execution ---
def main():
    """Run the full pipeline: classification, Optuna search, final regression, reports."""
    # Classification (unchanged)
    clf_model, clf_train_pred, clf_valid_pred, clf_test_pred = train_and_predict_class(
        train_dataset_class, valid_dataset_class, test_dataset)
    # Regression with Optuna tuning
    study = optuna.create_study(direction='maximize')
    logger.info("Starting Optuna hyperparameter optimization for regression...")
    study.optimize(objective, n_trials=50)
    logger.info(f"Best trial: {study.best_trial.number}")
    logger.info(f"Best R^2: {study.best_value}")
    logger.info(f"Best parameters: {study.best_params}")
    # Retrain a final regression model using the best configuration found.
    final_model, reg_train_pred, reg_valid_pred, reg_test_pred = train_and_predict_reg_with_best_params(
        train_dataset_reg, valid_dataset_reg, test_dataset, study.best_params)
    # Print Predictions
    print("Training Classification Predictions (Probabilities):", clf_train_pred)
    print("Validation Classification Predictions (Probabilities):", clf_valid_pred)
    print("Test Classification Predictions (Probabilities):", clf_test_pred)
    print("Training Regression Predictions:", reg_train_pred)
    print("Validation Regression Predictions:", reg_valid_pred)
    print("Test Regression Predictions:", reg_test_pred)
    # Evaluate Performance
    train_class_acc, train_class_prec, train_class_rec, train_class_f1 = evaluate_classification(
        train_class_labels_filtered, clf_train_pred)
    valid_class_acc, valid_class_prec, valid_class_rec, valid_class_f1 = evaluate_classification(
        valid_class_labels_filtered, clf_valid_pred)
    train_reg_mae, train_reg_mse, train_reg_r2 = evaluate_regression(
        train_reg_labels_filtered, reg_train_pred)
    valid_reg_mae, valid_reg_mse, valid_reg_r2 = evaluate_regression(
        valid_reg_labels_filtered, reg_valid_pred)
    print(f"--- Classification Metrics ---")
    print(f"Training Accuracy: {train_class_acc:.4f}, Precision: {train_class_prec:.4f}, Recall: {train_class_rec:.4f}, F1 Score: {train_class_f1:.4f}")
    print(f"Validation Accuracy: {valid_class_acc:.4f}, Precision: {valid_class_prec:.4f}, Recall: {valid_class_rec:.4f}, F1 Score: {valid_class_f1:.4f}")
    print(f"--- Regression Metrics ---")
    print(f"Training MAE: {train_reg_mae:.4f}, MSE: {train_reg_mse:.4f}, R^2: {train_reg_r2:.4f}")
    print(f"Validation MAE: {valid_reg_mae:.4f}, MSE: {valid_reg_mse:.4f}, R^2: {valid_reg_r2:.4f}")
# Script entry point: run the pipeline; log any failure, then re-raise so the
# process exits non-zero.
if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logger.error(f"Main execution failed: {e}")
        raise