# Step 1: Imports
import pandas as pd
import numpy as np
import joblib
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
                             confusion_matrix, ConfusionMatrixDisplay, roc_curve, auc)
# Step 2: Load data
# Replace 'your_file.csv' with the actual path to your CSV file
try:
df = pd.read_csv('your_file.csv')
print("Data loaded successfully.")
except FileNotFoundError:
print("Error: your_file.csv not found. Please make sure the file is in the correct directory.")
# Create a dummy DataFrame for demonstration if the file is not found
# This dummy data structure matches the provided sample headers
data = {
'Index': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
'Name': [f'Item {i}' for i in range(1, 21)],
'Description': [f'Description {i}' for i in range(1, 21)],
'Brand': [f'Brand {i % 5}' for i in range(1, 21)],
'Category': [f'Category {i % 3}' for i in range(1, 21)],
'Price': [173, 797, 554, 477, 701, 250, 800, 300, 600, 900, 150, 750, 500, 400, 650, 850, 200, 580, 720, 950],
'Currency': ['USD'] * 20,
'Stock': [539, 733, 863, 579, 603, 100, 200, 300, 400, 500, 600, 700, 800, 900, 150, 250, 350, 450, 550, 650],
'EAN': [f'{i:013d}' for i in range(1, 21)],
'Color': [f'Color {i % 7}' for i in range(1, 21)],
'Size': [f'Size {i % 4}' for i in range(1, 21)],
'Availability': ['in_stock', 'out_of_stock', 'limited_stock'] * 6 + ['in_stock', 'out_of_stock'],
'Internal ID': list(range(1, 21))
}
df = pd.DataFrame(data)
print("Using dummy data for demonstration.")
target_variable = 'Price'
model_type = 'classification' # As specified by the user
# Step 3: Advanced Preprocessing (on df)
# --- Advanced Preprocessing Steps (if any) ---
# Apply a log transformation to the target variable 'Price'.
# This is typically done for skewed numeric targets in regression; in that case the
# inverse transformation (np.expm1) must be applied to the predictions.
# For classification, we log-transform first and *then* bin the target.
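# For reference, a minimal sketch of that round trip for the regression case
# (illustrative only, not executed here):
# y_log = np.log1p(df[target_variable])   # forward transform
# y_back = np.expm1(y_log)                # inverse transform recovers the raw prices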
classification_possible = False # Flag to check if we can proceed with classification
if pd.api.types.is_numeric_dtype(df[target_variable]):
print(f"Target variable '{target_variable}' is numeric.")
# Apply log1p transformation as requested
df[target_variable] = np.log1p(df[target_variable])
print(f"Applied log1p transformation to target variable '{target_variable}'.")
# Since the model type is classification, we need to convert the numeric target
# into a categorical one, typically by binning.
# We will bin the *log-transformed* price.
binned_target_variable = f'{target_variable}_Category'
n_bins = 5 # Define number of bins
try:
# Use qcut for quantile-based binning (attempts to create bins with equal number of samples)
# duplicates='drop' handles cases where there are fewer unique values than bins
df[binned_target_variable], bins = pd.qcut(df[target_variable], q=n_bins, labels=False, retbins=True, duplicates='drop')
if df[binned_target_variable].nunique() < 2:
print(f"Warning: Binning resulted in fewer than 2 unique categories ({df[binned_target_variable].nunique()}). Cannot perform classification.")
classification_possible = False
else:
print(f"Binned log-transformed '{target_variable}' into {df[binned_target_variable].nunique()} categories based on quantiles.")
print(f"Bin edges (log scale): {bins}")
# Convert bin labels to strings to ensure they are treated as categories
df[binned_target_variable] = df[binned_target_variable].astype(str)
classification_possible = True
except Exception as e:
print(f"Warning: Could not bin '{target_variable}' for classification ({e}). Classification model cannot be trained.")
classification_possible = False
# If binning fails, we cannot proceed with classification as requested.
else:
print(f"Warning: Target variable '{target_variable}' is not numeric ({df[target_variable].dtype}). Log transformation and binning skipped.")
print(f"Classification model cannot be trained as requested with a non-numeric target.")
classification_possible = False
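# If equal-width rather than equal-frequency bins were preferred, pd.cut would be the
# alternative (a sketch, not used here):
# df[binned_target_variable] = pd.cut(df[target_variable], bins=n_bins, labels=False).astype(str)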
# Outlier detection method 'z_score' was selected, but handling is 'none'.
# Code for detection could be added here if desired for reporting, but no removal is performed.
print("\nOutlier detection method 'z_score' was selected, but handling is 'none'. No outliers were removed.")
# Example (not executed for 'handling'='none'):
# from scipy.stats import zscore
# numeric_cols_for_outliers = df.select_dtypes(include=np.number).columns.tolist()
# # Exclude the target variable(s) from outlier detection in features
# exclude_cols = [target_variable]
# if classification_possible: exclude_cols.append(binned_target_variable)
# numeric_cols_for_outliers = [col for col in numeric_cols_for_outliers if col not in exclude_cols]
#
# for col in numeric_cols_for_outliers:
# # Calculate Z-scores, handling potential NaNs
# col_data = df[col].dropna()
# if not col_data.empty:
# z_scores = np.abs(zscore(col_data))
# outliers_indices = col_data.index[z_scores > 3] # Example threshold
# if not outliers_indices.empty:
# print(f"Detected {len(outliers_indices)} potential outliers in column '{col}' (Z-score > 3).")
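# An IQR-based rule is a common alternative to the Z-score method (also illustrative
# only; no rows are removed):
# q1, q3 = df[col].quantile([0.25, 0.75])
# iqr = q3 - q1
# outlier_mask = (df[col] < q1 - 1.5 * iqr) | (df[col] > q3 + 1.5 * iqr)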
# Step 4: Final Definition of X and y
# Define X and y using the potentially modified 'df'.
# If binning was successful, the target is the binned column.
if classification_possible:
y = df[binned_target_variable].copy()
# Drop the original numeric target and the new binned target from features
# Also drop 'Index', 'EAN', 'Internal ID' as they are typically identifiers, not features
cols_to_drop_from_features = [target_variable, binned_target_variable, 'Index', 'EAN', 'Internal ID']
# Ensure columns exist before dropping
cols_to_drop_from_features = [col for col in cols_to_drop_from_features if col in df.columns]
X = df.drop(columns=cols_to_drop_from_features).copy()
print(f"\nUsing '{binned_target_variable}' as the target variable for classification.")
print(f"Features (X) shape: {X.shape}")
print(f"Target (y) shape: {y.shape}")
print(f"Target distribution:\n{y.value_counts()}")
else:
# If classification is not possible, set X and y to None
X = None
y = None
print("\nCannot proceed with model training due to issues with target variable setup.")
if classification_possible:
# Step 5: Column Preprocessing Setup (ColumnTransformer on final X)
# Identify feature types from the final X DataFrame (after advanced preprocessing)
numeric_features = X.select_dtypes(include=np.number).columns.tolist()
categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
# Specified text features for TF-IDF/CountVectorizer - None specified in requirements
text_features_for_transformer = []
# Ensure text features are not also in numeric/categorical lists for default processing
numeric_features = [f for f in numeric_features if f not in text_features_for_transformer]
categorical_features = [f for f in categorical_features if f not in text_features_for_transformer]
transformers_list = []
if len(numeric_features) > 0:
numeric_pipeline = Pipeline([('imputer', SimpleImputer(strategy='mean')), ('scaler', StandardScaler())])
transformers_list.append(('num', numeric_pipeline, numeric_features))
print(f"\nNumeric features identified for preprocessing: {numeric_features}")
if len(categorical_features) > 0:
        # Note: OneHotEncoder's sparse_output argument requires scikit-learn >= 1.2 (older versions use sparse=False).
        categorical_pipeline = Pipeline([('imputer', SimpleImputer(strategy='most_frequent')), ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False))])
transformers_list.append(('cat', categorical_pipeline, categorical_features))
print(f"Categorical features identified for preprocessing: {categorical_features}")
if len(text_features_for_transformer) > 0:
        # Add text processing if text features were specified. Note that within a
        # ColumnTransformer, TfidfVectorizer expects a single column name (a string,
        # not a list), so one transformer is added per text column. Example:
        # from sklearn.feature_extraction.text import TfidfVectorizer
        # for text_col in text_features_for_transformer:
        #     transformers_list.append((f'text_{text_col}',
        #                               TfidfVectorizer(stop_words='english'), text_col))
print(f"Text features identified but no vectorizer specified in requirements: {text_features_for_transformer}")
if not transformers_list:
print("Warning: No features identified for preprocessing via ColumnTransformer.")
# If no features are identified, the preprocessor might be Identity or passthrough
# For safety, let's handle this case, though unlikely with sample data
preprocessor = 'passthrough' # Or raise an error if X is empty
print("Using 'passthrough' for preprocessor as no features require transformation.")
else:
        # remainder='drop' removes only columns not assigned to a transformer above; here
        # all object columns (including 'Name' and 'Description') are one-hot encoded as
        # categoricals, so drop them from X or route them to a text vectorizer if that is
        # not intended.
preprocessor = ColumnTransformer(transformers=transformers_list, remainder='drop')
print("ColumnTransformer created.")
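    # Once fitted in Step 7, the expanded output column names can be inspected via
    # get_feature_names_out (available in scikit-learn >= 1.0; a sketch, not executed here):
    # print(preprocessor.get_feature_names_out())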
# Step 6: Split data
# Use the binned target 'y' for stratification
print(f"\nSplitting data with test_size=0.2 and stratifying on '{binned_target_variable}'.")
    # Stratification requires at least 2 samples per class and a test set no smaller
    # than the number of classes; if it fails, fall back to an unstratified split.
try:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)
print("Data split successfully with stratification.")
except ValueError as e:
print(f"Warning: Could not stratify the split ({e}). Splitting data without stratification.")
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f"X_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"y_test shape: {y_test.shape}")
# Step 7: Apply Preprocessor
print("\nApplying preprocessor to training and testing data...")
if preprocessor != 'passthrough':
X_train_processed = preprocessor.fit_transform(X_train)
X_test_processed = preprocessor.transform(X_test)
print("Preprocessing complete.")
print(f"Processed X_train shape: {X_train_processed.shape}")
print(f"Processed X_test shape: {X_test_processed.shape}")
else:
# If preprocessor is passthrough, use original data (assuming it's already numeric/handled)
# Note: This branch is unlikely with the default ColumnTransformer setup unless X was empty
X_train_processed = X_train
X_test_processed = X_test
print("Preprocessor is 'passthrough'. Using original data splits.")
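    # Note: fitting the preprocessor once on X_train and reusing the transformed arrays
    # keeps the steps explicit, but the cross-validation inside the hyperparameter search
    # below then sees data scaled on the full training set. Wrapping both stages in one
    # Pipeline avoids that (a sketch, not used here to preserve the step-by-step flow):
    # full_pipeline = Pipeline([('preprocess', preprocessor),
    #                           ('model', RandomForestClassifier(random_state=42))])
    # full_pipeline.fit(X_train, y_train)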
# Step 8: Initialize Model
print("\nInitializing RandomForestClassifier model.")
# No XGBoost selected, using RandomForestClassifier as requested
model = RandomForestClassifier(random_state=42)
# Step 9: Hyperparameter Tuning (using RandomizedSearchCV)
print("\nStarting Hyperparameter Tuning using RandomizedSearchCV...")
# Define parameter distribution for RandomizedSearchCV
# Using a small, sensible distribution for demonstration
param_distributions = {
'n_estimators': [50, 100, 200],
'max_depth': [None, 10, 20],
'min_samples_split': [2, 5],
'min_samples_leaf': [1, 2],
'criterion': ['gini', 'entropy']
}
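    # The lists above define a small fixed grid; RandomizedSearchCV also accepts
    # scipy.stats distributions so that values are sampled from ranges instead
    # (a sketch, assuming scipy is installed):
    # from scipy.stats import randint
    # param_distributions = {'n_estimators': randint(50, 301),
    #                        'max_depth': [None, 10, 20],
    #                        'min_samples_split': randint(2, 11),
    #                        'min_samples_leaf': randint(1, 5),
    #                        'criterion': ['gini', 'entropy']}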
# Instantiate RandomizedSearchCV
# Using 'accuracy' as the scoring metric for classification
# n_iter controls the number of parameter settings that are sampled
# cv=3 is a reasonable default for cross-validation folds
random_search = RandomizedSearchCV(
estimator=model,
param_distributions=param_distributions,
n_iter=10, # Number of parameter settings that are sampled. Adjust as needed.
cv=3, # Number of folds in cross-validation
scoring='accuracy', # Metric to evaluate for classification
n_jobs=-1, # Use all available CPU cores
random_state=42,
verbose=1
)
# Fit RandomizedSearchCV to the training data
random_search.fit(X_train_processed, y_train)
# Print the best parameters found
print(f"\nBest parameters found by RandomizedSearchCV: {random_search.best_params_}")
# The main model for prediction and evaluation is now the best estimator from the search
model = random_search.best_estimator_
print("Model updated to the best estimator found by tuning.")
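    # The full tuning history is available in cv_results_ for a deeper look (a sketch):
    # results_df = pd.DataFrame(random_search.cv_results_)
    # print(results_df[['params', 'mean_test_score', 'std_test_score']]
    #       .sort_values('mean_test_score', ascending=False).head())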
# Step 10: Train Model (Already done by random_search.fit)
# If tuning was skipped, you would call model.fit(X_train_processed, y_train) here.
# Since tuning is enabled, the best_estimator_ is already trained on the full X_train_processed, y_train.
print("\nModel training complete (via RandomizedSearchCV).")
# Step 11: Predictions
print("Making predictions on the test set.")
y_pred = model.predict(X_test_processed)
# Step 12: Inverse Transform (Skip for classification)
# Define y_pred_final and y_test_final
# Inverse transform is not applicable here as the target was binned into categories.
y_pred_final = y_pred
y_test_final = y_test
print("Inverse transform skipped as target was binned for classification.")
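    # For comparison, a regression model trained on the log1p-transformed target would
    # need the inverse transform at this point (a sketch for that case only):
    # y_pred_final = np.expm1(y_pred)
    # y_test_final = np.expm1(y_test)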
# Step 13: Evaluation Metrics
print("\n--- Evaluation Metrics ---")
# For multi-class classification, use average='weighted' for precision, recall, f1-score
# zero_division=0 handles cases where a class has no predicted samples
print(f"Accuracy: {accuracy_score(y_test_final, y_pred_final):.4f}")
print(f"Precision: {precision_score(y_test_final, y_pred_final, average='weighted', zero_division=0):.4f}")
    print(f"Recall: {recall_score(y_test_final, y_pred_final, average='weighted', zero_division=0):.4f}")
    print(f"F1 Score: {f1_score(y_test_final, y_pred_final, average='weighted', zero_division=0):.4f}")
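    # A per-class breakdown is also available via classification_report (commented out):
    # from sklearn.metrics import classification_report
    # print(classification_report(y_test_final, y_pred_final, zero_division=0))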
# ROC AUC Score (requires predict_proba and is typically for binary or OVR/OVO for multi-class)
if hasattr(model, "predict_proba"):
try:
y_pred_proba = model.predict_proba(X_test_processed)
# Check if it's binary classification (2 classes)
if len(model.classes_) == 2:
                # roc_auc_score expects the positive-class probabilities for a binary
                # target. predict_proba columns follow the order of model.classes_, so
                # index 1 is taken as the positive class here; verify this matches the
                # intended label convention.
roc_auc = roc_auc_score(y_test_final, y_pred_proba[:, 1]) # Assumes positive class is at index 1
print(f"ROC AUC Score: {roc_auc:.4f}")
else:
                # For multi-class targets, roc_auc_score supports one-vs-rest ('ovr')
                # averaging. The string bin labels are passed directly; the probability
                # columns follow model.classes_, which keeps labels and scores aligned.
roc_auc = roc_auc_score(y_test_final, y_pred_proba, multi_class='ovr', average='weighted')
print(f"ROC AUC Score (weighted OVR): {roc_auc:.4f}")
except Exception as e:
print(f"Could not calculate ROC AUC Score: {e}")
print("This can happen with multi-class targets or issues with predict_proba.")
else:
print("ROC AUC Score cannot be calculated as the model does not have a 'predict_proba' method.")
# Step 14: Visualizations
print("\n--- Visualizations ---")
# Confusion Matrix
try:
# Ensure labels for the confusion matrix display are in the order of model.classes_
# This ensures the matrix rows/cols match the class predictions/actuals
labels = model.classes_ if hasattr(model, 'classes_') else sorted(y_test_final.unique())
cm = confusion_matrix(y_test_final, y_pred_final, labels=labels)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
disp.plot(cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.show()
except Exception as e:
print(f"Could not plot Confusion Matrix: {e}")
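    # Feature importances from the random forest can be plotted in the same spirit
    # (a sketch; assumes the fitted preprocessor provides get_feature_names_out,
    # scikit-learn >= 1.0):
    # importances = pd.Series(model.feature_importances_,
    #                         index=preprocessor.get_feature_names_out())
    # importances.sort_values(ascending=False).head(15).plot(kind='barh')
    # plt.title('Top 15 Feature Importances')
    # plt.show()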
# ROC Curve
if hasattr(model, "predict_proba"):
try:
y_pred_proba_roc = model.predict_proba(X_test_processed)
# Check if it's binary classification
if len(model.classes_) == 2:
# roc_curve is for binary classification
# Need probabilities of the positive class (usually the one at index 1 in model.classes_)
# And the corresponding positive label value
positive_class_label = model.classes_[1] # Assume class at index 1 is the positive class for ROC plot
fpr, tpr, _ = roc_curve(y_test_final, y_pred_proba_roc[:, 1], pos_label=positive_class_label) # Specify positive label
roc_auc_val = auc(fpr, tpr)
plt.figure()
plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc_val:.2f})')
plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('Receiver Operating Characteristic (ROC) Curve (Binary)')
plt.legend(loc="lower right")
plt.show()
else:
print("ROC curve plot for multi-class scenario requires more specific setup (e.g., one-vs-rest plot). This basic plot is for binary classification.")
# Optional: Plot OVR ROC curves for multi-class if needed
# from sklearn.preprocessing import label_binarize
# y_test_bin = label_binarize(y_test_final, classes=model.classes_)
# n_classes = y_test_bin.shape[1]
# plt.figure(figsize=(8, 6))
# for i in range(n_classes):
# fpr, tpr, _ = roc_curve(y_test_bin[:, i], y_pred_proba_roc[:, i])
# roc_auc_val = auc(fpr, tpr)
# plt.plot(fpr, tpr, lw=2, label=f'ROC curve class {model.classes_[i]} (area = {roc_auc_val:.2f})')
# plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
# plt.xlabel('False Positive Rate')
# plt.ylabel('True Positive Rate')
# plt.title('Receiver Operating Characteristic (ROC) Curve (One-vs-Rest)')
# plt.legend(loc="lower right")
# plt.show()
except Exception as e:
print(f"Could not plot ROC Curve: {e}")
print("Ensure y_test_final and y_pred_proba_roc are compatible for roc_curve.")
else:
print("ROC Curve cannot be plotted as the model does not have a 'predict_proba' method.")
# Step 15: Save Model
model_filename = 'trained_model.joblib'
try:
joblib.dump(model, model_filename)
print(f"\nModel successfully saved to {model_filename}")
print(f"To load the model later, use: loaded_model = joblib.load('{model_filename}')")
except Exception as e:
print(f"Could not save the model: {e}")
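    # The fitted preprocessor should be persisted alongside the model so new raw data
    # can be transformed identically at inference time (a sketch; new_raw_df is a
    # hypothetical DataFrame with the same columns as X):
    # joblib.dump(preprocessor, 'preprocessor.joblib')
    # loaded_pre = joblib.load('preprocessor.joblib')
    # loaded_model = joblib.load('trained_model.joblib')
    # predictions = loaded_model.predict(loaded_pre.transform(new_raw_df))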
else:
print("\nSkipping model training, evaluation, and saving due to issues with target variable setup for classification.")