Source code for qward.metrics.circuit_performance

"""
Circuit performance metrics implementation for QWARD.

This module provides the CircuitPerformanceMetrics class for analyzing the performance of
quantum circuits based on job execution results. It supports both single job
and multiple job analysis with customizable success criteria.

Supports multiple job types:
- AerJob: Qiskit Aer simulator jobs
- QiskitJob (JobV1): Traditional Qiskit jobs
- RuntimeJobV2: IBM Quantum Runtime V2 primitive jobs (SamplerV2, EstimatorV2)

The performance metrics help evaluate the success rate and statistical
properties of quantum circuit executions.
"""

from typing import Any, Callable, Dict, List, Optional, Union

import numpy as np
from qiskit import QuantumCircuit
from qiskit_aer import AerJob
from qiskit_ibm_runtime import RuntimeJobV2

from qward.metrics.base_metric import MetricCalculator
from qward.metrics.differential_success_rate import (
    compute_dsr_log_ratio,
    compute_dsr_normalized_margin,
    compute_dsr_ratio,
    compute_dsr_with_flags,
)
from qward.metrics.types import MetricsType, MetricsId

# Import schemas for structured data validation
try:
    from qward.schemas.circuit_performance_schema import (
        CircuitPerformanceSchema,
        DSRVariantsSchema,
        StatisticalMetricsSchema,
        SuccessMetricsSchema,
    )

    SCHEMAS_AVAILABLE = True
except ImportError:
    SCHEMAS_AVAILABLE = False

# Type alias for job types - include RuntimeJobV2
JobType = Union[AerJob, RuntimeJobV2]


[docs] class CircuitPerformanceMetrics(MetricCalculator): """ Calculate circuit performance metrics for quantum circuits. This class provides methods for analyzing the performance of quantum circuits based on job execution results. It supports both single job and multiple job analysis with customizable success criteria. Supports multiple job types: - AerJob: Qiskit Aer simulator jobs - QiskitJob (JobV1): Traditional Qiskit jobs - RuntimeJobV2: IBM Quantum Runtime V2 primitive jobs (SamplerV2, EstimatorV2) The performance metrics include: - Success metrics: Success rate, error rate, successful shots analysis - Statistical metrics: Entropy, uniformity, concentration analysis Example: # Basic usage with default success criteria (ground state) performance = CircuitPerformanceMetrics(circuit, job=job) metrics = performance.get_metrics() # With RuntimeJobV2 from SamplerV2 from qiskit_ibm_runtime import SamplerV2 as Sampler sampler = Sampler(backend) job = sampler.run([circuit]) # Returns RuntimeJobV2 performance = CircuitPerformanceMetrics(circuit, job=job) success_metrics = performance.get_success_metrics() # With custom success criteria def custom_success(state): return state == "101" performance = CircuitPerformanceMetrics( circuit, job=job, success_criteria=custom_success ) success_metrics = performance.get_success_metrics() """ def __init__( self, circuit: QuantumCircuit, *, job: Optional[JobType] = None, jobs: Optional[List[JobType]] = None, success_criteria: Optional[Callable[[str], bool]] = None, expected_outcomes: Optional[List[str]] = None, ): """ Initialize a CircuitPerformanceMetrics object. Args: circuit: The quantum circuit to analyze job: A single job that executed the circuit jobs: A list of jobs that executed the circuit (for multiple runs) success_criteria: Function that determines if a measurement result is successful expected_outcomes: Expected bitstrings for DSR histogram contrast calculations """ super().__init__(circuit) self._job = job self._jobs = jobs or [] if job and not self._jobs: self._jobs = [job] self.success_criteria = success_criteria or self._default_success_criteria() self._expected_outcomes = list(expected_outcomes) if expected_outcomes else None def _get_metric_type(self) -> MetricsType: """Get the type of this metric.""" return MetricsType.POST_RUNTIME def _get_metric_id(self) -> MetricsId: """Get the ID of this metric.""" return MetricsId.CIRCUIT_PERFORMANCE
[docs] def is_ready(self) -> bool: """Check if the metric is ready to be calculated.""" return self.circuit is not None and (self._job is not None or len(self._jobs) > 0)
def _ensure_schemas_available(self) -> None: """Ensure Pydantic schemas are available, raise ImportError if not.""" if not SCHEMAS_AVAILABLE: raise ImportError( "Pydantic schemas are not available. Install pydantic to use structured metrics." ) def _default_success_criteria(self) -> Callable[[str], bool]: """ Define the default success criteria for the circuit. By default, considers the ground state (all zeros) as success. Handles measurement results with spaces and classical bit information. Returns: Callable[[str], bool]: Function that takes a measurement result and returns True if successful """ def is_ground_state(result: str) -> bool: # Remove all spaces to get clean bit string clean_result = result.replace(" ", "") # Check if all bits are zero return all(bit == "0" for bit in clean_result) return is_ground_state def _extract_job_id(self, job: JobType) -> str: """Extract job ID from a job object.""" # Try different job ID extraction methods if hasattr(job, "job_id"): return job.job_id() if callable(job.job_id) else job.job_id elif hasattr(job, "id"): return job.id if callable(job.id) else job.id else: return "unknown" def _extract_counts(self, job: JobType) -> Dict[str, int]: """ Extract counts from a job result, handling different job types. Args: job: The job object to extract counts from Returns: Dict[str, int]: Dictionary of measurement outcomes and their counts Raises: ValueError: If counts cannot be extracted from the job result """ try: # Try qBraid direct result extraction qbraid_counts = self._try_qbraid_extraction(job) if qbraid_counts is not None: return qbraid_counts # Try RuntimeJobV2 extraction if isinstance(job, RuntimeJobV2): return self._extract_runtime_v2_counts(job) # Try traditional job extraction return self._extract_traditional_counts(job) except Exception as e: raise ValueError( f"Failed to extract counts from job {self._extract_job_id(job)}: {str(e)}" ) from e def _try_qbraid_extraction(self, job: JobType) -> Optional[Dict[str, int]]: """Try to extract counts using qBraid patterns.""" # Check if this is a qBraid Result object directly if hasattr(job, "data") and hasattr(job.data, "get_counts"): return job.data.get_counts() # Check if this is a qBraid job with result() method if hasattr(job, "result"): try: result = job.result() if hasattr(result, "data") and hasattr(result.data, "get_counts"): return result.data.get_counts() except Exception: pass return None def _extract_runtime_v2_counts(self, job: RuntimeJobV2) -> Dict[str, int]: """Extract counts from RuntimeJobV2 (V2 primitives).""" result = job.result() if len(result) == 0: return {} pub_result = result[0] # Get first (and typically only) PUB result # Try common classical register names first counts = self._try_common_register_names(pub_result) if counts is not None: return counts # Try all available data attributes return self._try_all_data_attributes(pub_result) def _try_common_register_names(self, pub_result) -> Optional[Dict[str, int]]: """Try common classical register names.""" common_names = ["meas", "c", "cr", "classical"] for name in common_names: if hasattr(pub_result.data, name): register_data = getattr(pub_result.data, name) if hasattr(register_data, "get_counts"): return register_data.get_counts() return None def _try_all_data_attributes(self, pub_result) -> Dict[str, int]: """Try all available data attributes to find counts.""" data_attrs = [attr for attr in dir(pub_result.data) if not attr.startswith("_")] for attr in data_attrs: try: register_data = getattr(pub_result.data, attr) if hasattr(register_data, "get_counts"): return register_data.get_counts() except (AttributeError, TypeError): continue # If no classical register found, return empty dict return {} def _extract_traditional_counts(self, job: JobType) -> Dict[str, int]: """Extract counts from traditional job types (V1 primitives, AerJob, etc.).""" result = job.result() if hasattr(result, "get_counts"): return result.get_counts() else: raise ValueError(f"Job result of type {type(result)} does not have get_counts method") # ============================================================================= # Main API Methods # =============================================================================
[docs] def get_metrics(self) -> "CircuitPerformanceSchema": """ Get all performance metrics as a structured, validated schema object. Returns: CircuitPerformanceSchema: Complete validated performance metrics schema Raises: ImportError: If Pydantic schemas are not available ValidationError: If metrics data doesn't match schema constraints """ self._ensure_schemas_available() return CircuitPerformanceSchema( success_metrics=self.get_success_metrics(), statistical_metrics=self.get_statistical_metrics(), dsr_metrics=self.get_dsr_metrics() if self._expected_outcomes else None, )
# ============================================================================= # Success Metrics # =============================================================================
[docs] def get_success_metrics(self) -> "SuccessMetricsSchema": """ Get success rate analysis metrics as a validated schema object. Returns: SuccessMetricsSchema: Validated success metrics schema Raises: ImportError: If Pydantic schemas are not available ValidationError: If metrics data doesn't match schema constraints """ self._ensure_schemas_available() success_dict = self.get_success_metrics_dict() return SuccessMetricsSchema(**success_dict)
[docs] def get_success_metrics_dict(self) -> Dict[str, Any]: """ Get success rate analysis metrics as a dictionary. Returns: Dict[str, Any]: Dictionary containing success rate, error rate, and shot analysis """ if len(self._jobs) > 1: return self._get_multiple_jobs_success_metrics() elif len(self._jobs) == 1: return self._get_single_job_success_metrics(self._jobs[0]) else: raise ValueError("No jobs available to calculate success metrics")
# ============================================================================= # Statistical Metrics # =============================================================================
[docs] def get_statistical_metrics(self) -> "StatisticalMetricsSchema": """ Get statistical analysis metrics as a validated schema object. Returns: StatisticalMetricsSchema: Validated statistical metrics schema Raises: ImportError: If Pydantic schemas are not available ValidationError: If metrics data doesn't match schema constraints """ self._ensure_schemas_available() statistical_dict = self.get_statistical_metrics_dict() return StatisticalMetricsSchema(**statistical_dict)
[docs] def get_statistical_metrics_dict(self) -> Dict[str, Any]: """ Get statistical analysis metrics as a dictionary. Returns: Dict[str, Any]: Dictionary containing entropy, uniformity, and concentration metrics """ if len(self._jobs) > 1: return self._get_multiple_jobs_statistical_metrics() elif len(self._jobs) == 1: return self._get_single_job_statistical_metrics(self._jobs[0]) else: raise ValueError("No jobs available to calculate statistical metrics")
# ============================================================================= # DSR Metrics # =============================================================================
[docs] def get_dsr_metrics(self) -> "DSRVariantsSchema": """ Get Differential Success Rate metrics as a validated schema object. Returns: DSRVariantsSchema: Validated DSR metrics schema Raises: ImportError: If Pydantic schemas are not available ValueError: If expected outcomes are not configured ValidationError: If metrics data doesn't match schema constraints """ self._ensure_schemas_available() dsr_dict = self.get_dsr_metrics_dict() return DSRVariantsSchema(**dsr_dict)
[docs] def get_dsr_metrics_dict(self) -> Dict[str, Any]: """ Get Differential Success Rate metrics as a dictionary. Returns: Dict[str, Any]: Dictionary containing DSR variant and mismatch metrics """ if not self._expected_outcomes: raise ValueError("expected_outcomes is required to calculate DSR metrics") if len(self._jobs) > 1: return self._get_multiple_jobs_dsr_metrics() elif len(self._jobs) == 1: return self._get_single_job_dsr_metrics(self._jobs[0]) else: raise ValueError("No jobs available to calculate DSR metrics")
# ============================================================================= # Single Job Metrics # =============================================================================
[docs] def get_single_job_metrics(self, job: Optional[JobType] = None) -> Dict[str, Any]: """ Calculate circuit performance metrics from a single job result. DEPRECATED: Use get_success_metrics() or get_statistical_metrics() instead. Args: job: Optional job to use for metrics calculation. If None, uses self._job. Returns: Dict[str, Any]: Circuit performance metrics for a single job """ if self._job is None and job is None: raise ValueError("A runtime job is required to calculate circuit performance metrics") job_to_use = job or self._job # Combine all metrics for backward compatibility success_metrics = self._get_single_job_success_metrics(job_to_use) statistical_metrics = self._get_single_job_statistical_metrics(job_to_use) dsr_metrics = ( self._get_single_job_dsr_metrics(job_to_use) if self._expected_outcomes else {} ) # Merge all metrics into a single dictionary combined_metrics = { **success_metrics, **statistical_metrics, **dsr_metrics, } return combined_metrics
[docs] def get_structured_single_job_metrics( self, job: Optional[JobType] = None ) -> "CircuitPerformanceSchema": """ Get single job metrics as a validated schema object. DEPRECATED: Use get_structured_metrics() instead. Returns: CircuitPerformanceSchema: Validated single job metrics """ self._ensure_schemas_available() metrics = self.get_single_job_metrics(job) # Remove fields not in schema schema_data = {k: v for k, v in metrics.items() if k != "average_counts"} return CircuitPerformanceSchema(**schema_data)
# ============================================================================= # Multiple Jobs Metrics # =============================================================================
[docs] def get_multiple_jobs_metrics(self) -> Dict[str, Any]: """ Calculate circuit performance metrics from multiple job results. DEPRECATED: Use get_success_metrics() or get_statistical_metrics() instead. Returns: Dict[str, Any]: Circuit performance metrics for multiple jobs with individual and aggregate data """ if not self._jobs: raise ValueError("Multiple runtime jobs are required to calculate multiple job metrics") # Get metrics for each category success_metrics = self._get_multiple_jobs_success_metrics() statistical_metrics = self._get_multiple_jobs_statistical_metrics() dsr_metrics = self._get_multiple_jobs_dsr_metrics() if self._expected_outcomes else None # Combine individual job metrics (flatten all categories for each job) combined_individual_jobs = [] for i in range(len(self._jobs)): combined_job_metrics = {} # Add success metrics for this job if i < len(success_metrics.get("individual_jobs", [])): combined_job_metrics.update(success_metrics["individual_jobs"][i]) # Add statistical metrics for this job if i < len(statistical_metrics.get("individual_jobs", [])): statistical_job_metrics = statistical_metrics["individual_jobs"][i] # Remove duplicate job_id to avoid conflicts statistical_job_metrics = { k: v for k, v in statistical_job_metrics.items() if k != "job_id" } combined_job_metrics.update(statistical_job_metrics) # Add DSR metrics for this job if dsr_metrics is not None and i < len(dsr_metrics.get("individual_jobs", [])): dsr_job_metrics = dsr_metrics["individual_jobs"][i] # Remove duplicate job_id to avoid conflicts dsr_job_metrics = {k: v for k, v in dsr_job_metrics.items() if k != "job_id"} combined_job_metrics.update(dsr_job_metrics) combined_individual_jobs.append(combined_job_metrics) # Combine aggregate metrics aggregate_metrics = {} metric_categories = [success_metrics, statistical_metrics] if dsr_metrics is not None: metric_categories.append(dsr_metrics) for metrics_dict in metric_categories: for key, value in metrics_dict.items(): if key != "individual_jobs": aggregate_metrics[key] = value return { "aggregate": aggregate_metrics, "individual_jobs": combined_individual_jobs, }
[docs] def get_structured_multiple_jobs_metrics(self) -> "CircuitPerformanceSchema": """ Get multiple jobs metrics as a validated schema object. DEPRECATED: Use get_structured_metrics() instead. Returns: CircuitPerformanceSchema: Validated aggregate metrics """ self._ensure_schemas_available() metrics = self.get_multiple_jobs_metrics() return CircuitPerformanceSchema(**metrics["aggregate"])
# ============================================================================= # Job Management Methods # =============================================================================
[docs] def add_job(self, job: Union[JobType, List[JobType]]) -> None: """ Add one or more jobs to the list of jobs for multiple job metrics. Args: job: A single job or a list of jobs to add """ if isinstance(job, list): for single_job in job: if single_job not in self._jobs: self._jobs.append(single_job) # If this is the first job, also set it as the single job if not self._job: self._job = single_job else: if job not in self._jobs: self._jobs.append(job) # If this is the first job, also set it as the single job if not self._job: self._job = job
[docs] @staticmethod def create_uniform_distribution(num_qubits: int) -> Dict[str, float]: """ Create a uniform distribution over all possible measurement outcomes. Args: num_qubits: Number of qubits in the circuit Returns: Dict[str, float]: Uniform probability distribution """ num_states = 2**num_qubits prob = 1.0 / num_states return {format(i, f"0{num_qubits}b"): prob for i in range(num_states)}
[docs] @staticmethod def create_ground_state_distribution(num_qubits: int) -> Dict[str, float]: """ Create a distribution where only the ground state (|000...⟩) has probability 1. Args: num_qubits: Number of qubits in the circuit Returns: Dict[str, float]: Ground state distribution """ ground_state = "0" * num_qubits return {ground_state: 1.0}
[docs] @staticmethod def create_bell_state_distribution() -> Dict[str, float]: """ Create the expected distribution for a Bell state (|00⟩ + |11⟩)/√2. Returns: Dict[str, float]: Bell state distribution """ return {"00": 0.5, "11": 0.5}
# ============================================================================= # Helper Methods for DSR Metrics # ============================================================================= def _get_single_job_dsr_metrics(self, job: JobType) -> Dict[str, Any]: """Calculate DSR metrics from a single job result.""" if not self._expected_outcomes: raise ValueError("expected_outcomes is required to calculate DSR metrics") counts = self._extract_counts(job) job_id = self._extract_job_id(job) if not counts: return { "job_id": job_id, "dsr_michelson": 0.0, "dsr_ratio": 0.0, "dsr_log_ratio": 0.0, "dsr_normalized_margin": 0.0, "peak_mismatch": True, "expected_outcomes": list(self._expected_outcomes), } dsr_michelson, peak_mismatch = compute_dsr_with_flags(counts, self._expected_outcomes) dsr_ratio = compute_dsr_ratio(counts, self._expected_outcomes) dsr_log_ratio = compute_dsr_log_ratio(counts, self._expected_outcomes) dsr_normalized_margin = compute_dsr_normalized_margin(counts, self._expected_outcomes) return { "job_id": job_id, "dsr_michelson": float(dsr_michelson), "dsr_ratio": float(dsr_ratio), "dsr_log_ratio": float(dsr_log_ratio), "dsr_normalized_margin": float(dsr_normalized_margin), "peak_mismatch": bool(peak_mismatch), "expected_outcomes": list(self._expected_outcomes), } def _get_multiple_jobs_dsr_metrics(self) -> Dict[str, Any]: """Calculate aggregate DSR metrics from multiple job results.""" if not self._jobs: raise ValueError("Multiple runtime jobs are required to calculate DSR metrics") if not self._expected_outcomes: raise ValueError("expected_outcomes is required to calculate DSR metrics") job_metrics = [] dsr_michelson_values = [] dsr_ratio_values = [] dsr_log_ratio_values = [] dsr_normalized_margin_values = [] peak_mismatch_flags = [] for job in self._jobs: single_metrics = self._get_single_job_dsr_metrics(job) job_metrics.append(single_metrics) dsr_michelson_values.append(single_metrics["dsr_michelson"]) dsr_ratio_values.append(single_metrics["dsr_ratio"]) dsr_log_ratio_values.append(single_metrics["dsr_log_ratio"]) dsr_normalized_margin_values.append(single_metrics["dsr_normalized_margin"]) peak_mismatch_flags.append(bool(single_metrics["peak_mismatch"])) if not job_metrics: return { "mean_dsr_michelson": 0.0, "std_dsr_michelson": 0.0, "min_dsr_michelson": 0.0, "max_dsr_michelson": 0.0, "mean_dsr_ratio": 0.0, "std_dsr_ratio": 0.0, "min_dsr_ratio": 0.0, "max_dsr_ratio": 0.0, "mean_dsr_log_ratio": 0.0, "std_dsr_log_ratio": 0.0, "min_dsr_log_ratio": 0.0, "max_dsr_log_ratio": 0.0, "mean_dsr_normalized_margin": 0.0, "std_dsr_normalized_margin": 0.0, "min_dsr_normalized_margin": 0.0, "max_dsr_normalized_margin": 0.0, "peak_mismatch_rate": 0.0, "total_jobs": 0, "expected_outcomes": list(self._expected_outcomes), "individual_jobs": job_metrics, } def _aggregate(values: List[float], metric_name: str) -> Dict[str, float]: values_array = np.array(values) return { f"mean_{metric_name}": float(np.mean(values_array)), f"std_{metric_name}": float(np.std(values_array)) if len(values_array) > 1 else 0.0, f"min_{metric_name}": float(np.min(values_array)), f"max_{metric_name}": float(np.max(values_array)), } metrics = { **_aggregate(dsr_michelson_values, "dsr_michelson"), **_aggregate(dsr_ratio_values, "dsr_ratio"), **_aggregate(dsr_log_ratio_values, "dsr_log_ratio"), **_aggregate(dsr_normalized_margin_values, "dsr_normalized_margin"), "peak_mismatch_rate": float(np.mean(peak_mismatch_flags)), "total_jobs": len(job_metrics), "expected_outcomes": list(self._expected_outcomes), "individual_jobs": job_metrics, } return metrics # ============================================================================= # Helper Methods for Success Metrics # ============================================================================= def _get_single_job_success_metrics(self, job: JobType) -> Dict[str, Any]: """Calculate success metrics from a single job result.""" counts = self._extract_counts(job) job_id = self._extract_job_id(job) if not counts: return { "job_id": job_id, "success_rate": 0.0, "error_rate": 1.0, "total_shots": 0, "successful_shots": 0, } # Calculate basic statistics total_shots = int(sum(counts.values())) successful_shots = sum( count for state, count in counts.items() if self.success_criteria(state) ) # Calculate rates success_rate = successful_shots / total_shots if total_shots > 0 else 0.0 error_rate = 1.0 - success_rate return { "job_id": job_id, "success_rate": float(success_rate), "error_rate": float(error_rate), "total_shots": total_shots, "successful_shots": successful_shots, } def _get_multiple_jobs_success_metrics(self) -> Dict[str, Any]: """Calculate success metrics from multiple job results.""" if not self._jobs: raise ValueError("Multiple runtime jobs are required to calculate success metrics") # Calculate metrics for each job job_metrics = [] success_rates = [] total_shots_list = [] for job in self._jobs: single_metrics = self._get_single_job_success_metrics(job) success_rates.append(single_metrics["success_rate"]) total_shots_list.append(single_metrics["total_shots"]) job_metrics.append(single_metrics) # Calculate aggregate metrics if not success_rates: return { "mean_success_rate": 0.0, "std_success_rate": 0.0, "min_success_rate": 0.0, "max_success_rate": 0.0, "total_trials": 0, "error_rate": 1.0, "individual_jobs": job_metrics, } success_rates_array = np.array(success_rates) mean_success_rate = float(np.mean(success_rates_array)) std_success_rate = ( float(np.std(success_rates_array)) if len(success_rates_array) > 1 else 0.0 ) min_success_rate = float(np.min(success_rates_array)) max_success_rate = float(np.max(success_rates_array)) total_trials = int(sum(total_shots_list)) error_rate = 1.0 - mean_success_rate return { "mean_success_rate": mean_success_rate, "std_success_rate": std_success_rate, "min_success_rate": min_success_rate, "max_success_rate": max_success_rate, "total_trials": total_trials, "error_rate": error_rate, "individual_jobs": job_metrics, } # ============================================================================= # Helper Methods for Statistical Metrics # ============================================================================= def _get_single_job_statistical_metrics(self, job: JobType) -> Dict[str, Any]: """Calculate statistical metrics from a single job result.""" counts = self._extract_counts(job) job_id = self._extract_job_id(job) if not counts: return { "job_id": job_id, "entropy": 0.0, "uniformity": 0.0, "concentration": 1.0, "dominant_outcome_probability": 0.0, "num_unique_outcomes": 0, } total_shots = sum(counts.values()) probabilities = [count / total_shots for count in counts.values()] # Calculate entropy entropy = -sum(p * np.log2(p) for p in probabilities if p > 0) # Calculate uniformity (how close to uniform distribution) num_outcomes = len(counts) max_entropy = np.log2(num_outcomes) if num_outcomes > 1 else 0 uniformity = entropy / max_entropy if max_entropy > 0 else 1.0 # Calculate concentration (opposite of uniformity) concentration = 1.0 - uniformity # Dominant outcome probability dominant_outcome_probability = max(probabilities) if probabilities else 0.0 return { "job_id": job_id, "entropy": float(entropy), "uniformity": float(uniformity), "concentration": float(concentration), "dominant_outcome_probability": float(dominant_outcome_probability), "num_unique_outcomes": num_outcomes, } def _get_multiple_jobs_statistical_metrics(self) -> Dict[str, Any]: """Calculate statistical metrics from multiple job results.""" if not self._jobs: raise ValueError("Multiple runtime jobs are required to calculate statistical metrics") # Calculate metrics for each job job_metrics = [] entropies = [] uniformities = [] concentrations = [] dominant_probs = [] for job in self._jobs: single_metrics = self._get_single_job_statistical_metrics(job) entropies.append(single_metrics["entropy"]) uniformities.append(single_metrics["uniformity"]) concentrations.append(single_metrics["concentration"]) dominant_probs.append(single_metrics["dominant_outcome_probability"]) job_metrics.append(single_metrics) # Calculate aggregate metrics if not entropies: return { "mean_entropy": 0.0, "mean_uniformity": 0.0, "mean_concentration": 1.0, "mean_dominant_probability": 0.0, "std_entropy": 0.0, "individual_jobs": job_metrics, } return { "mean_entropy": float(np.mean(entropies)), "mean_uniformity": float(np.mean(uniformities)), "mean_concentration": float(np.mean(concentrations)), "mean_dominant_probability": float(np.mean(dominant_probs)), "std_entropy": float(np.std(entropies)) if len(entropies) > 1 else 0.0, "std_uniformity": float(np.std(uniformities)) if len(uniformities) > 1 else 0.0, "individual_jobs": job_metrics, }