# In [None]:
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import json
import yaml
from typing import Dict, List, Tuple, Optional
import warnings
import time
from collections import defaultdict
import shutil
warnings.filterwarnings('ignore')

# Deep Learning Libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
from sklearn.model_selection import train_test_split, KFold, StratifiedKFold
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
import albumentations as A
from albumentations.pytorch import ToTensorV2

# YOLOv5 imports: install the package on demand when it is not available.
try:
    import yolov5
    print("YOLOv5 already installed")
except ImportError:
    # Local imports: only needed on this fallback path.
    import subprocess
    import sys

    print("Installing YOLOv5...")
    # Run pip through the interpreter executing this script; a bare ``pip``
    # found on PATH may install into a different environment. A failed
    # install is not fatal here: the import below raises ImportError.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "yolov5"],
        check=False,
    )
    import yolov5

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Report the runtime environment once at start-up.
print("Environment Setup Complete!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"CUDA device: {torch.cuda.get_device_name()}")

class TurtleDatasetHandler:
    """Locate image/label pairs of a YOLO-format dataset and build K-fold splits.

    The dataset root is expected to contain an ``images`` directory holding
    ``*.jpg`` files and a ``labels`` directory holding one ``<stem>.txt``
    file per image.
    """

    def __init__(self, data_path: str, num_classes: int = 30):
        """Store the dataset root and class count; no disk access happens here."""
        self.data_path = Path(data_path)
        self.num_classes = num_classes
        self.images: List[str] = []
        self.labels: List[str] = []

    def prepare_dataset(self) -> Tuple[List[str], List[str]]:
        """Collect every ``*.jpg`` image that has a matching label file.

        If either ``images`` or ``labels`` is missing, both directories are
        created (empty) and the result is two empty lists.

        Returns:
            ``(images, labels)``: parallel lists of path strings, ordered by
            image path. They are also stored on ``self.images`` and
            ``self.labels``.
        """
        # Assuming YOLO format dataset structure
        images_path = self.data_path / "images"
        labels_path = self.data_path / "labels"

        if not images_path.exists() or not labels_path.exists():
            print("Creating sample dataset structure...")
            self._create_sample_structure()

        # Rebuild the lists from scratch so a repeated call does not append
        # duplicates. Sort the directory listing because glob order depends
        # on the filesystem, which would make the seeded K-fold splits differ
        # between machines.
        images: List[str] = []
        labels: List[str] = []
        for img_file in sorted(images_path.glob("*.jpg")):
            label_file = labels_path / f"{img_file.stem}.txt"
            if label_file.exists():
                images.append(str(img_file))
                labels.append(str(label_file))

        self.images = images
        self.labels = labels

        print(f"Found {len(self.images)} images with labels")
        return self.images, self.labels

    def _create_sample_structure(self) -> None:
        """Create empty ``images`` and ``labels`` directories under the root."""
        os.makedirs(self.data_path / "images", exist_ok=True)
        os.makedirs(self.data_path / "labels", exist_ok=True)
        print("Sample dataset structure created")

    def create_kfold_splits(self, k_folds: int = 5):
        """Split ``self.images`` into ``k_folds`` shuffled train/val partitions.

        Uses ``KFold(shuffle=True, random_state=42)``, so the partition is
        reproducible for a given image list.

        Returns:
            A list with one dict per fold: ``'fold'`` (1-based index),
            ``'train'`` and ``'val'`` (lists of image path strings).
        """
        kfold = KFold(n_splits=k_folds, shuffle=True, random_state=42)
        indices = np.arange(len(self.images))

        return [
            {
                'fold': fold_number,
                'train': [self.images[i] for i in train_idx],
                'val': [self.images[i] for i in val_idx],
            }
            for fold_number, (train_idx, val_idx)
            in enumerate(kfold.split(indices), start=1)
        ]

class DirectionalWeightYOLOv5:
    """YOLOv5 wrapper with per-pattern loss-gain presets and per-fold helpers.

    Bundles model loading, dataset-config generation, training, validation
    and metric extraction for one cross-validation fold.

    NOTE(review): ``directional_weight_map`` is stored but not read anywhere
    in this class, and the ``'anchor'`` entry of the weight presets is not
    forwarded to training. Confirm whether either is meant to be used.
    """

    def __init__(self, model_size='s', num_classes=30, directional_weight_map=0.75):
        """Store the configuration and load the model.

        Args:
            model_size: Suffix appended to ``'yolov5'`` to form the model name.
            num_classes: Number of classes written to the model and configs.
            directional_weight_map: Stored on the instance only.
        """
        self.model_size = model_size
        self.num_classes = num_classes
        self.directional_weight_map = directional_weight_map
        self.model = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.training_results = []

        # Training parameters from paper
        self.training_params = {
            'epochs': 100,
            'batch_size': 16,
            'img_size': 640,
            'lr0': 0.01,
            'degrees': 10,
            'translate': 0.1,
            'scale': 0.5,
            'shear': 2,
            'momentum': 0.937,
            'weight_decay': 0.01,
            'workers': 8,
            'project': 'runs/train',
            'name': 'exp',
            'exist_ok': True
        }

        self.initialize_model()

    def initialize_model(self):
        """Load the pretrained ``yolov5<model_size>`` model into ``self.model``."""
        model_name = f'yolov5{self.model_size}'
        self.model = yolov5.load(model_name, pretrained=True)
        # NOTE(review): this only overwrites the ``nc`` attribute; whether the
        # detection head is rebuilt for the new class count is not visible
        # here -- confirm against the yolov5 package.
        self.model.model.nc = self.num_classes  # Set number of classes
        print(f"Initialized YOLOv5{self.model_size} with {self.num_classes} classes")

    def create_yaml_config(self, train_images, val_images, fold_num):
        """Write the dataset YAML for one fold and return its path.

        The fold's images are referenced through ``fold_<n>/train.txt`` and
        ``fold_<n>/val.txt``, each listing one absolute image path per line.
        No image is copied. This assumes the trainer resolves a ``*.txt``
        entry as a list of image paths and finds labels by swapping the
        ``images`` path component for ``labels`` -- TODO confirm for the
        installed yolov5 version.

        Args:
            train_images: Iterable of training image paths (may be empty/None).
            val_images: Iterable of validation image paths (may be empty/None).
            fold_num: Fold number used in the directory and file names.

        Returns:
            Path string of the written ``turtle_config_fold_<n>.yaml``.
        """
        fold_dir = Path(f'fold_{fold_num}')

        # The per-split directory skeleton is still created so anything that
        # relied on these directories existing keeps working.
        for split_name in ('train', 'val'):
            for sub_dir in ('images', 'labels'):
                (fold_dir / split_name / sub_dir).mkdir(parents=True, exist_ok=True)

        self._write_image_list(fold_dir / 'train.txt', train_images)
        self._write_image_list(fold_dir / 'val.txt', val_images)

        config = {
            'path': str(fold_dir.absolute()),
            'train': 'train.txt',
            'val': 'val.txt',
            'nc': self.num_classes,
            'names': [f'turtle_{i}' for i in range(self.num_classes)]
        }

        config_path = f'turtle_config_fold_{fold_num}.yaml'
        with open(config_path, 'w') as f:
            yaml.dump(config, f)

        return config_path

    @staticmethod
    def _write_image_list(list_path, image_paths):
        """Write one absolute image path per line to ``list_path`` (overwrites)."""
        lines = [f'{Path(p).resolve()}\n' for p in (image_paths or ())]
        Path(list_path).write_text(''.join(lines))

    def calculate_directional_weights(self, pattern_type='plastron'):
        """Return the loss-gain preset for ``pattern_type``.

        Each preset maps ``'box'``, ``'cls'``, ``'obj'`` and ``'anchor'`` to a
        float. Unknown pattern types fall back to the ``'standard'`` preset
        (all 1.0).
        """
        weight_configs = {
            'plastron': {
                'box': 1.2,      # Bounding box loss weight
                'cls': 1.3,      # Classification loss weight
                'obj': 1.0,      # Objectness loss weight
                'anchor': 1.1    # Anchor matching weight
            },
            'nasal': {
                'box': 1.0,
                'cls': 1.0,
                'obj': 1.1,
                'anchor': 1.0
            },
            'infraorbital': {
                'box': 1.1,
                'cls': 1.1,
                'obj': 1.0,
                'anchor': 1.05
            },
            'standard': {
                'box': 1.0,
                'cls': 1.0,
                'obj': 1.0,
                'anchor': 1.0
            }
        }

        return weight_configs.get(pattern_type, weight_configs['standard'])

    def train_fold(self, config_path, fold_num, pattern_type='plastron'):
        """Train on one fold and record the outcome in ``self.training_results``.

        Args:
            config_path: Dataset YAML path passed as ``data``.
            fold_num: Fold number; used in the output project directory.
            pattern_type: Preset name for the box/cls/obj loss gains.

        Returns:
            Whatever ``self.model.train`` returns, or ``None`` when training
            raised. The exception is printed and not re-raised, so one failed
            fold does not abort the experiment.
        """
        print(f"\n{'='*50}")
        print(f"Training Fold {fold_num} - {pattern_type}")
        print(f"{'='*50}")

        # Apply directional weights
        weights = self.calculate_directional_weights(pattern_type)
        params = self.training_params

        # Every tunable value comes from ``training_params`` so that changing
        # the dict actually changes the run.
        train_args = {
            'data': config_path,
            'epochs': params['epochs'],
            'batch_size': params['batch_size'],
            'imgsz': params['img_size'],
            'lr0': params['lr0'],
            'momentum': params['momentum'],
            'weight_decay': params['weight_decay'],
            'degrees': params['degrees'],
            'translate': params['translate'],
            'scale': params['scale'],
            'shear': params['shear'],
            'project': f"{params['project']}/fold_{fold_num}",
            'name': f'{pattern_type}_exp',
            'exist_ok': params['exist_ok'],
            'workers': params['workers'],
            'device': self.device,
            # Apply directional weights
            'box': weights['box'],
            'cls': weights['cls'],
            'obj': weights['obj']
        }

        start_time = time.time()
        results = None
        try:
            # NOTE(review): confirm that the object returned by ``yolov5.load``
            # exposes a keyword-driven ``train`` API; ``torch.nn.Module.train``
            # only accepts ``mode`` and would raise here.
            results = self.model.train(**train_args)
        except Exception as e:
            print(f"Training error: {e}")
        training_time = time.time() - start_time

        # One record per attempt; ``results`` stays None for a failed run.
        self.training_results.append({
            'fold': fold_num,
            'pattern_type': pattern_type,
            'training_time': training_time,
            'results': results,
            'weights': weights
        })

        return results

    def validate_fold(self, config_path, fold_num, pattern_type='plastron'):
        """Validate the fold's best checkpoint, or the in-memory model.

        Looks for ``<project>/fold_<n>/<pattern_type>_exp/weights/best.pt``
        (the location ``train_fold`` trains into); when that file is missing,
        ``self.model`` is validated instead.

        Returns:
            The value returned by the model's ``val`` call, or ``None`` if
            loading or validation raised (the error is printed).
        """
        print(f"\nValidating Fold {fold_num}...")
        params = self.training_params

        try:
            # Load the best model from training
            model_path = (
                f"{params['project']}/fold_{fold_num}/"
                f"{pattern_type}_exp/weights/best.pt"
            )

            if os.path.exists(model_path):
                val_model = yolov5.load(model_path)
            else:
                # Use current model if no trained model found
                val_model = self.model

            # NOTE(review): same API assumption as in train_fold -- confirm
            # the loaded object provides ``val``.
            return val_model.val(
                data=config_path,
                batch_size=params['batch_size'],
                imgsz=params['img_size'],
                conf_thres=0.001,
                iou_thres=0.6
            )

        except Exception as e:
            print(f"Validation error: {e}")
            return None

    def extract_metrics(self, results):
        """Turn a validation result object into a flat metrics dict.

        Reads ``results.maps``, ``results.mp`` and ``results.mr`` when present.
        Every metric defaults to 0.0, which is also what is returned for
        ``results is None``.

        Returns:
            Dict with keys ``mAP50``, ``mAP50-95``, ``precision``, ``recall``,
            ``f1_score`` and ``accuracy``.
        """
        metrics = {
            'mAP50': 0.0,
            'mAP50-95': 0.0,
            'precision': 0.0,
            'recall': 0.0,
            'f1_score': 0.0,
            'accuracy': 0.0
        }

        if results is None:
            return metrics

        try:
            # NOTE(review): ``maps`` is read as first element -> mAP50 and
            # mean -> mAP50-95; verify this matches what the validator
            # actually stores in ``maps``.
            if hasattr(results, 'maps') and len(results.maps) > 0:
                metrics['mAP50'] = float(results.maps[0])
                metrics['mAP50-95'] = float(np.mean(results.maps))

            if hasattr(results, 'mp'):
                metrics['precision'] = float(results.mp)

            if hasattr(results, 'mr'):
                metrics['recall'] = float(results.mr)

            precision = metrics['precision']
            recall = metrics['recall']

            # F1 is left at 0.0 unless both terms are positive (avoids 0/0).
            if precision > 0 and recall > 0:
                metrics['f1_score'] = 2 * (precision * recall) / (precision + recall)

            # Approximate accuracy from precision and recall
            metrics['accuracy'] = (precision + recall) / 2

        except Exception as e:
            # Values extracted before the failure are kept.
            print(f"Error extracting metrics: {e}")

        return metrics

class PerformanceAnalyzer:
    """Analyze and compare model performance across folds.

    Per-fold metric dicts are stored under their model type in
    ``self.results``. The statistics, comparison, plot and report all cover
    the metrics listed in ``SUMMARY_METRICS``.
    """

    # Metrics summarised by every method below.
    SUMMARY_METRICS = ('precision', 'recall', 'accuracy', 'mAP50-95')

    def __init__(self):
        self.results = defaultdict(list)
        self.comparison_data = []

    def add_fold_results(self, fold_num, metrics, model_type='DirectionalYOLO'):
        """Append one fold's metrics (a dict) under ``model_type``."""
        self.results[model_type].append({
            'fold': fold_num,
            **metrics
        })

    def calculate_statistics(self, model_type='DirectionalYOLO'):
        """Calculate mean and standard deviation for each summary metric.

        A metric missing from a fold record counts as 0.0, the same value
        used for failed folds.

        Returns:
            ``{'<metric>_mean': ..., '<metric>_std': ...}`` for every summary
            metric, or ``{}`` when no folds are recorded for ``model_type``.
        """
        # ``.get`` instead of indexing: indexing a defaultdict would insert
        # an empty entry for an unknown model type, which would then show up
        # in plots and reports.
        fold_records = self.results.get(model_type)
        if not fold_records:
            return {}

        stats = {}
        for metric in self.SUMMARY_METRICS:
            values = [record.get(metric, 0.0) for record in fold_records]
            stats[f'{metric}_mean'] = np.mean(values)
            stats[f'{metric}_std'] = np.std(values)

        return stats

    def compare_models(self, baseline_type='non Directional YOLO', enhanced_type='DirectionalYOLO'):
        """Compare baseline vs enhanced model on the mean of each metric.

        Returns:
            Dict keyed by metric; each value holds ``baseline``, ``enhanced``,
            ``improvement`` (enhanced minus baseline) and ``improvement_pct``
            (relative to the baseline, 0 when the baseline mean is not
            positive).
        """
        baseline_stats = self.calculate_statistics(baseline_type)
        enhanced_stats = self.calculate_statistics(enhanced_type)

        comparison = {}
        for metric in self.SUMMARY_METRICS:
            baseline_mean = baseline_stats.get(f'{metric}_mean', 0)
            enhanced_mean = enhanced_stats.get(f'{metric}_mean', 0)

            improvement = enhanced_mean - baseline_mean
            # Guard against division by zero for an empty/zero baseline.
            improvement_pct = (improvement / baseline_mean * 100) if baseline_mean > 0 else 0

            comparison[metric] = {
                'baseline': baseline_mean,
                'enhanced': enhanced_mean,
                'improvement': improvement,
                'improvement_pct': improvement_pct
            }

        return comparison

    def plot_results(self, save_path='performance_analysis.png'):
        """Draw one box plot per summary metric, save it and show it.

        Args:
            save_path: Output image path for the 2x2 figure.
        """
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        axes = axes.flatten()

        model_types = list(self.results.keys())
        colors = ['lightblue', 'lightcoral', 'lightgreen']

        for ax, metric in zip(axes, self.SUMMARY_METRICS):
            # One list of per-fold values per model type.
            data_for_plot = [
                [record.get(metric, 0.0) for record in self.results[model_type]]
                for model_type in model_types
            ]

            # Tick labels are set separately rather than through the
            # ``labels`` keyword of ``boxplot``, which newer Matplotlib
            # releases deprecate.
            bp = ax.boxplot(data_for_plot, patch_artist=True)
            ax.set_xticklabels(model_types)

            # Customize colors (zip stops at the shorter sequence).
            for patch, color in zip(bp['boxes'], colors):
                patch.set_facecolor(color)

            ax.set_title(f'{metric.upper()} Comparison')
            ax.set_ylabel(metric.upper())
            ax.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        # Release the figure so repeated calls do not accumulate open figures.
        plt.close(fig)

    def generate_report(self):
        """Generate a text report: per-model mean ± std, plus a comparison.

        The comparison section is added when at least two model types are
        recorded; the first recorded type is the baseline and the second the
        enhanced model.
        """
        parts = [
            "\n" + "="*80 + "\n",
            "KHORAT TURTLE IDENTIFICATION - PERFORMANCE REPORT (YOLOv5)\n",
            "="*80 + "\n",
        ]

        model_types = list(self.results.keys())

        for model_type in model_types:
            stats = self.calculate_statistics(model_type)

            parts.append(f"\n{model_type} Results:\n")
            parts.append("-" * 40 + "\n")

            for metric in self.SUMMARY_METRICS:
                mean_val = stats.get(f'{metric}_mean', 0)
                std_val = stats.get(f'{metric}_std', 0)
                parts.append(f"{metric.upper():<12}: {mean_val:.4f} ± {std_val:.4f}\n")

        # Add comparison if multiple models
        if len(model_types) >= 2:
            baseline_type, enhanced_type = model_types[0], model_types[1]
            comparison = self.compare_models(baseline_type, enhanced_type)

            parts.append(f"\nModel Comparison ({enhanced_type} vs {baseline_type}):\n")
            parts.append("-" * 50 + "\n")

            for metric, comp in comparison.items():
                parts.append(
                    f"{metric.upper():<12}: {comp['improvement']:+.4f} ({comp['improvement_pct']:+.2f}%)\n"
                )

        parts.append("\n" + "="*80 + "\n")

        return "".join(parts)

def run_kfold_experiment(data_path='./turtle_dataset', k_folds=5):
    """Run the complete K-fold cross-validation experiment.

    Trains and validates two configurations on every fold: a baseline with
    the ``'standard'`` loss gains and the directional variant with the
    ``'plastron'`` gains. Each fold's metrics are collected in a
    ``PerformanceAnalyzer``.

    Args:
        data_path: Dataset root containing ``images`` and ``labels``.
        k_folds: Number of cross-validation folds.

    Returns:
        The ``PerformanceAnalyzer`` holding one record per model type and
        fold. A fold that raises is recorded with all-zero metrics.

    Raises:
        ValueError: If fewer labelled images than folds were found.
    """
    print("Starting K-Fold Cross-Validation Experiment")
    print("="*60)

    # Initialize components
    dataset_handler = TurtleDatasetHandler(data_path)
    analyzer = PerformanceAnalyzer()

    # Prepare dataset
    images, _labels = dataset_handler.prepare_dataset()

    # Fail early with an actionable message; splitting would raise the same
    # exception type with a less specific one.
    if len(images) < k_folds:
        raise ValueError(
            f"Need at least {k_folds} labelled images for {k_folds}-fold "
            f"cross-validation, found {len(images)} in '{data_path}'"
        )

    # Create K-fold splits
    splits = dataset_handler.create_kfold_splits(k_folds)

    # Run experiments for both standard and directional YOLOv5
    model_configs = [
        {'name': 'non Directional YOLO', 'use_directional': False},
        {'name': 'DirectionalYOLO', 'use_directional': True}
    ]

    for config in model_configs:
        print(f"\nRunning {config['name']} experiment...")

        # The pattern type selects the loss gains for training and names the
        # run directory, so validation must use the same value to find the
        # checkpoint that was just trained.
        pattern_type = 'plastron' if config['use_directional'] else 'standard'

        for split in splits:
            fold_num = split['fold']

            # Fresh model per fold so no weights carry over between folds.
            model = DirectionalWeightYOLOv5(model_size='n', num_classes=30)

            # Create config for this fold
            config_path = model.create_yaml_config(
                split['train'], split['val'], fold_num
            )

            try:
                model.train_fold(config_path, fold_num, pattern_type)
                val_results = model.validate_fold(config_path, fold_num, pattern_type)

                metrics = model.extract_metrics(val_results)
                analyzer.add_fold_results(fold_num, metrics, config['name'])

                print(f"Fold {fold_num} completed - mAP50: {metrics['mAP50']:.4f}")

            except Exception as e:
                print(f"Error in fold {fold_num}: {e}")
                # Record a failed fold with the same keys a successful fold
                # produces, so downstream statistics see a uniform schema.
                default_metrics = {
                    'mAP50': 0.0, 'mAP50-95': 0.0, 'precision': 0.0,
                    'recall': 0.0, 'f1_score': 0.0, 'accuracy': 0.0
                }
                analyzer.add_fold_results(fold_num, default_metrics, config['name'])

    return analyzer


if __name__ == "__main__":
    # Script entry point: run the experiment, print the report, export a CSV.

    analyzer = run_kfold_experiment(data_path='./turtle_dataset', k_folds=5)

    report = analyzer.generate_report()
    print(report)

    # Save detailed results to CSV: one row per (model type, fold). Each row
    # is a copy, so the analyzer's stored records are not modified by the
    # export.
    all_results = [
        {**result, 'model_type': model_type}
        for model_type, results in analyzer.results.items()
        for result in results
    ]

    df_results = pd.DataFrame(all_results)
    df_results.to_csv('turtle_kfold_results.csv', index=False)