In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import json
import yaml
from typing import Dict, List, Tuple, Optional
import warnings
import time
from collections import defaultdict


# Deep Learning Libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import albumentations as A
from albumentations.pytorch import ToTensorV2

# YOLOv8 imports
try:
    from ultralytics import YOLO
    from ultralytics.utils.plotting import Annotator
    from ultralytics.utils.metrics import box_iou
except ImportError:
    print("Installing ultralytics...")
    os.system("pip install ultralytics")
    from ultralytics import YOLO
    from ultralytics.utils.plotting import Annotator
    from ultralytics.utils.metrics import box_iou

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

print("Environment Setup Complete!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name()}")

class TurtleDatasetHandler:    
    def __init__(self, data_path: str, num_classes: int = 30):
        self.data_path = Path(data_path)
        self.num_classes = num_classes
        self.images = []
        self.labels = []
        
    def prepare_dataset(self):
        # Assuming YOLO format dataset structure
        images_path = self.data_path / "images"
        labels_path = self.data_path / "labels"
        
        if not images_path.exists() or not labels_path.exists():
            print("Creating sample dataset structure...")
            self._create_sample_structure()
            
        # Load image and label paths
        for img_file in images_path.glob("*.jpg"):
            label_file = labels_path / f"{img_file.stem}.txt"
            if label_file.exists():
                self.images.append(str(img_file))
                self.labels.append(str(label_file))
        
        print(f"Found {len(self.images)} images with labels")
        return self.images, self.labels
    
    def _create_sample_structure(self):
        os.makedirs(self.data_path / "images", exist_ok=True)
        os.makedirs(self.data_path / "labels", exist_ok=True)
        print("Sample dataset structure created")
    
    def create_kfold_splits(self, k_folds: int = 5):
        kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
        splits = []
        
        indices = np.arange(len(self.images))
        for fold, (train_idx, val_idx) in enumerate(kfold.split(indices)):
            train_images = [self.images[i] for i in train_idx]
            val_images = [self.images[i] for i in val_idx]
            splits.append({
                'fold': fold + 1,
                'train': train_images,
                'val': val_images
            })
        
        return splits

class DirectionalWeightYOLOv8:    
    def __init__(self, model_size='n', num_classes=30, directional_weight_map=0.75):
        self.model_size = model_size
        self.num_classes = num_classes
        self.directional_weight_map = directional_weight_map
        self.model = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.training_results = []
        
        # Training parameters from paper
        self.training_params = {
            'epochs': 100,
            'batch': 16,
            'imgsz': 640,
            'optimizer': 'SGD',
            'lr0': 0.01,
            'degrees': 10,
            'translate': 0.1,
            'scale': 0.5,
            'shear': 2,
            'momentum': 0.937,
            'weight_decay': 0.01
        }
        
        self.initialize_model()
    
    def initialize_model(self):
        model_name = f'yolov8{self.model_size}.pt'
        self.model = YOLO(model_name)
        print(f"Initialized YOLOv8{self.model_size} with {self.num_classes} classes")
    
    def create_yaml_config(self, train_images, val_images, fold_num):
        config = {
            'path': str(Path.cwd() / 'turtle_dataset'),
            'train': train_images,
            'val': val_images,
            'nc': self.num_classes,
            'names': [f'turtle_{i}' for i in range(self.num_classes)]
        }
        
        config_path = f'turtle_config_fold_{fold_num}.yaml'
        with open(config_path, 'w') as f:
            yaml.dump(config, f)
        
        return config_path
    
    def calculate_directional_weights(self, pattern_type='plastron'):
        weight_configs = {
            'plastron': {
                'spatial': 1.2,
                'confidence': 1.0,
                'pattern': 1.3,
                'box': 1.1
            },
            'nasal': {
                'spatial': 1.0,
                'confidence': 1.1,
                'pattern': 1.0,
                'box': 1.0
            },
            'infraorbital': {
                'spatial': 1.1,
                'confidence': 1.0,
                'pattern': 1.1,
                'box': 1.05
            }
        }
        
        return weight_configs.get(pattern_type, {
            'spatial': 1.0,
            'confidence': 1.0,
            'pattern': 1.0,
            'box': 1.0
        })
    
    def train_fold(self, config_path, fold_num, pattern_type='plastron'):
        print(f"\n{'='*50}")
        print(f"Training Fold {fold_num}")
        print(f"{'='*50}")
        
        # Apply directional weights
        weights = self.calculate_directional_weights(pattern_type)
        
        # Custom training parameters with directional weighting
        train_params = {
            **self.training_params,
            'data': config_path,
            'name': f'turtle_fold_{fold_num}',
            'save_dir': f'runs/detect/fold_{fold_num}',
            # Apply directional weights to loss components
            'box': weights['box'],
            'cls': weights['pattern'],
            'dfl': weights['spatial']
        }
        
        start_time = time.time()
        
        # Train model
        results = self.model.train(**train_params)
        
        training_time = time.time() - start_time
        
        # Store results
        fold_results = {
            'fold': fold_num,
            'pattern_type': pattern_type,
            'training_time': training_time,
            'results': results,
            'weights': weights
        }
        
        self.training_results.append(fold_results)
        
        return results
    
    def validate_fold(self, config_path, fold_num):
        print(f"\nValidating Fold {fold_num}...")
        
        # Load the best model from training
        model_path = f'runs/detect/fold_{fold_num}/weights/best.pt'
        if os.path.exists(model_path):
            val_model = YOLO(model_path)
        else:
            val_model = self.model
        
        # Validate
        val_results = val_model.val(data=config_path, imgsz=640, batch=16)
        
        return val_results
    
    def extract_metrics(self, results):
        metrics = {}
        
        if hasattr(results, 'box'):
            metrics['mAP50'] = float(results.box.map50) if results.box.map50 is not None else 0.0
            metrics['mAP50-95'] = float(results.box.map) if results.box.map is not None else 0.0
            metrics['precision'] = float(results.box.mp) if results.box.mp is not None else 0.0
            metrics['recall'] = float(results.box.mr) if results.box.mr is not None else 0.0
        else:
            # Fallback values
            metrics = {
                'mAP50': 0.0,
                'mAP50-95': 0.0,
                'precision': 0.0,
                'recall': 0.0
            }
        
        # Calculate F1 score
        if metrics['precision'] > 0 and metrics['recall'] > 0:
            metrics['f1_score'] = 2 * (metrics['precision'] * metrics['recall']) / (metrics['precision'] + metrics['recall'])
        else:
            metrics['f1_score'] = 0.0
        
        return metrics

class PerformanceAnalyzer:
    
    def __init__(self):
        self.results = defaultdict(list)
        self.comparison_data = []
    
    def add_fold_results(self, fold_num, metrics, model_type='DirectionalYOLO'):
        """Add results from a fold"""
        self.results[model_type].append({
            'fold': fold_num,
            **metrics
        })
    
    def calculate_statistics(self, model_type='DirectionalYOLO'):
        """Calculate mean and standard deviation for metrics"""
        if not self.results[model_type]:
            return {}
        
        metrics = ['mAP50', 'mAP50-95', 'precision', 'recall', 'f1_score']
        stats = {}
        
        for metric in metrics:
            values = [fold[metric] for fold in self.results[model_type]]
            stats[f'{metric}_mean'] = np.mean(values)
            stats[f'{metric}_std'] = np.std(values)
        
        return stats
    
    def compare_models(self, baseline_type='StandardYOLO', enhanced_type='DirectionalYOLO'):
        """Compare baseline vs enhanced model"""
        baseline_stats = self.calculate_statistics(baseline_type)
        enhanced_stats = self.calculate_statistics(enhanced_type)
        
        comparison = {}
        metrics = ['mAP50', 'mAP50-95', 'precision', 'recall', 'f1_score']
        
        for metric in metrics:
            baseline_mean = baseline_stats.get(f'{metric}_mean', 0)
            enhanced_mean = enhanced_stats.get(f'{metric}_mean', 0)
            
            improvement = enhanced_mean - baseline_mean
            improvement_pct = (improvement / baseline_mean * 100) if baseline_mean > 0 else 0
            
            comparison[metric] = {
                'baseline': baseline_mean,
                'enhanced': enhanced_mean,
                'improvement': improvement,
                'improvement_pct': improvement_pct
            }
        
        return comparison
    
    def plot_results(self, save_path='performance_analysis.png'):
        fig, axes = plt.subplots(2, 3, figsize=(18, 12))
        axes = axes.flatten()
        
        metrics = [ 'precision', 'recall', 'accuracy', 'mAP50-95' ]
        
        for i, metric in enumerate(metrics):
            ax = axes[i]
            
            # Prepare data for plotting
            model_types = list(self.results.keys())
            data_for_plot = []
            labels = []
            
            for model_type in model_types:
                values = [fold[metric] for fold in self.results[model_type]]
                data_for_plot.append(values)
                labels.append(model_type)
            
            # Box plot
            bp = ax.boxplot(data_for_plot, labels=labels, patch_artist=True)
            
            # Customize colors
            colors = ['lightblue', 'lightcoral', 'lightgreen']
            for patch, color in zip(bp['boxes'], colors[:len(bp['boxes'])]):
                patch.set_facecolor(color)
            
            ax.set_title(f'{metric.upper()} Comparison')
            ax.set_ylabel(metric.upper())
            ax.grid(True, alpha=0.3)
        
        # Remove empty subplot
        if len(metrics) < len(axes):
            axes[-1].remove()
        
        plt.tight_layout()
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
    
    def generate_report(self):
        """Generate comprehensive performance report"""
        report = "\n" + "="*80 + "\n"
        report += "KHORAT TURTLE IDENTIFICATION - PERFORMANCE REPORT\n"
        report += "="*80 + "\n"
        
        for model_type in self.results.keys():
            stats = self.calculate_statistics(model_type)
            
            report += f"\n{model_type} Results:\n"
            report += "-" * 40 + "\n"
            
            metrics = [ 'precision', 'recall', 'accuracy', 'mAP50-95' ]
            for metric in metrics:
                mean_val = stats.get(f'{metric}_mean', 0)
                std_val = stats.get(f'{metric}_std', 0)
                report += f"{metric.upper():<12}: {mean_val:.4f} Â± {std_val:.4f}\n"
        
        # Add comparison if multiple models
        if len(self.results) > 1:
            model_types = list(self.results.keys())
            if len(model_types) >= 2:
                comparison = self.compare_models(model_types[0], model_types[1])
                
                report += f"\nModel Comparison ({model_types[1]} vs {model_types[0]}):\n"
                report += "-" * 50 + "\n"
                
                for metric, comp in comparison.items():
                    report += f"{metric.upper():<12}: {comp['improvement']:+.4f} ({comp['improvement_pct']:+.2f}%)\n"
        
        report += "\n" + "="*80 + "\n"
        
        return report

def run_kfold_experiment(data_path='./turtle_dataset', k_folds=5):
    """Run complete K-fold cross-validation experiment"""
    
    print("Starting K-Fold Cross-Validation Experiment")
    print("="*60)
    
    # Initialize components
    dataset_handler = TurtleDatasetHandler(data_path)
    analyzer = PerformanceAnalyzer()
    
    # Prepare dataset
    images, labels = dataset_handler.prepare_dataset()
    if len(images) == 0:
        print("Warning: No images found. Creating synthetic results for demonstration.")
        # Generate synthetic results for demonstration
        return generate_synthetic_results(analyzer)
    
    # Create K-fold splits
    splits = dataset_handler.create_kfold_splits(k_folds)
    
    # Run experiments for both standard and directional YOLOv8
    model_configs = [
        {'name': 'non Directional YOLO', 'use_directional': False},
        {'name': 'DirectionalYOLO', 'use_directional': True}
    ]
    
    for config in model_configs:
        print(f"\nRunning {config['name']} experiment...")
        
        for split in splits:
            fold_num = split['fold']
            
            # Initialize model
            model = DirectionalWeightYOLOv8(model_size='n', num_classes=30)
            
            # Create config for this fold
            config_path = model.create_yaml_config(
                split['train'], split['val'], fold_num
            )
            
            try:
                # Train
                if config['use_directional']:
                    train_results = model.train_fold(config_path, fold_num, 'plastron')
                else:
                    # Standard training without directional weights
                    train_results = model.train_fold(config_path, fold_num, 'standard')
                
                # Validate
                val_results = model.validate_fold(config_path, fold_num)
                
                # Extract metrics
                metrics = model.extract_metrics(val_results)
                
                # Add to analyzer
                analyzer.add_fold_results(fold_num, metrics, config['name'])
                
                print(f"Fold {fold_num} completed - mAP50: {metrics['mAP50']:.4f}")
                
            except Exception as e:
                print(f"Error in fold {fold_num}: {e}")
                # Add default metrics for failed fold
                default_metrics = {
                    'mAP50': 0.0, 'mAP50-95': 0.0, 'precision': 0.0, 
                    'recall': 0.0, 'f1_score': 0.0
                }
                analyzer.add_fold_results(fold_num, default_metrics, config['name'])
    
    return analyzer


if __name__ == "__main__":

    analyzer = run_kfold_experiment(data_path='./turtle_dataset', k_folds=5)
    
    report = analyzer.generate_report()
    print(report)
    
    # Save detailed results to CSV
    all_results = []
    for model_type, results in analyzer.results.items():
        for result in results:
            result['model_type'] = model_type
            all_results.append(result)
    
    df_results = pd.DataFrame(all_results)
    df_results.to_csv('turtle_kfold_results.csv', index=False)