# Source code for qward.scanner

"""
Scanner class for QWARD.
"""

from typing import Any, Dict, List, Optional, Union
import pandas as pd

from qiskit import QuantumCircuit
from qiskit_aer import AerJob
from qiskit.providers.job import Job as QiskitJob

from qward.metrics.base_metric import MetricCalculator


class Scanner:
    """Analyze quantum circuits via interchangeable metric strategies (Strategy pattern).

    A Scanner holds an optional circuit and job plus a list of metric
    calculators; each calculator is applied to the circuit when metrics
    are computed.
    """

    def __init__(
        self,
        circuit: Optional[QuantumCircuit] = None,
        *,
        job: Optional[Union[AerJob, QiskitJob]] = None,
        strategies: Optional[list] = None,
    ):
        """Create a Scanner.

        Args:
            circuit: Circuit to analyze (may also be supplied later).
            job: Job that executed the circuit, if any.
            strategies: Metric strategy classes and/or instances. Classes are
                instantiated with the circuit; instances must have been built
                with the same circuit as this Scanner.
        """
        self._circuit = circuit
        self._job = job
        self._strategies: List[MetricCalculator] = []
        for entry in strategies or []:
            self._add_strategy_to_list(entry)

    def _add_strategy_to_list(self, strategy):
        """Validate *strategy* and append it to the internal strategy list."""
        if isinstance(strategy, type):
            # A class: instantiate it against this Scanner's circuit.
            self._strategies.append(strategy(self._circuit))
            return
        # An instance: it must not be bound to some other circuit.
        bound_circuit = getattr(strategy, "circuit", None) or getattr(
            strategy, "_circuit", None
        )
        if bound_circuit is not None and bound_circuit is not self._circuit:
            raise ValueError(
                f"Strategy instance {strategy.__class__.__name__} was initialized with a different circuit than the Scanner."
            )
        self._strategies.append(strategy)

    @property
    def circuit(self) -> Optional[QuantumCircuit]:
        """The quantum circuit under analysis, if any."""
        return self._circuit

    @property
    def job(self) -> Optional[Union[AerJob, QiskitJob]]:
        """The job that executed the circuit, if any."""
        return self._job

    @property
    def strategies(self) -> List[MetricCalculator]:
        """The registered metric strategy instances."""
        return self._strategies
def add_strategy(self, strategy) -> None:
    """Register a metric strategy (class or instance) with this scanner.

    Args:
        strategy: The metric strategy to add.
    """
    self._add_strategy_to_list(strategy)

def add(self, strategy, **kwargs) -> "Scanner":
    """Register a strategy and return the scanner for fluent chaining.

    Args:
        strategy: Metric strategy class or instance.
        **kwargs: Extra constructor arguments, valid only when *strategy*
            is a class.

    Returns:
        Scanner: This scanner instance.

    Raises:
        TypeError: If keyword arguments are given with a strategy instance.
    """
    if not isinstance(strategy, type):
        # Instances cannot take constructor kwargs — they already exist.
        if kwargs:
            raise TypeError("Keyword arguments are only supported when strategy is a class.")
        self._add_strategy_to_list(strategy)
        return self
    self._strategies.append(strategy(self._circuit, **kwargs))
    return self
def calculate_metrics(self) -> Dict[str, pd.DataFrame]:
    """Run every registered strategy and collect its metrics.

    Returns:
        Dict[str, pd.DataFrame]: One DataFrame per metric type. For
        CircuitPerformance metrics two entries are produced:
        "CircuitPerformance.individual_jobs" (per-job rows) and
        "CircuitPerformance.aggregate" (cross-job aggregates).
    """
    results: Dict[str, pd.DataFrame] = {}
    for calculator in self.strategies:
        payload = calculator.get_metrics()
        name = calculator.__class__.__name__
        if name == "CircuitPerformanceMetrics":
            # Performance metrics are split into individual/aggregate frames.
            self._process_circuit_performance_metrics(calculator, payload, results)
        else:
            self._process_standard_metrics(name, payload, results)
    return results
def scan(self, include_all_pre_runtime: bool = True) -> "ScanResult":
    """Compute metrics and wrap them in a ScanResult.

    Args:
        include_all_pre_runtime: When True and no strategies have been
            registered, every pre-runtime strategy class is added
            automatically before metrics are calculated.

    Returns:
        ScanResult: Fluent wrapper around the computed metrics.
    """
    if include_all_pre_runtime and not self._strategies:
        # Imported lazily so the defaults module is only loaded on this path.
        from qward.metrics.defaults import get_all_pre_runtime_strategies

        self._strategies.extend(
            strategy_cls(self._circuit)
            for strategy_cls in get_all_pre_runtime_strategies()
        )
    return ScanResult(self.calculate_metrics(), scanner=self)
def _process_circuit_performance_metrics(self, strategy, metric_results, metric_dataframes): """Process CircuitPerformanceMetrics using schema-based API with proper column separation.""" display_name = "CircuitPerformance" # Check if we have multiple jobs jobs_count = len(getattr(strategy, "_jobs", [])) if hasattr(metric_results, "to_flat_dict"): if jobs_count > 1: # For multiple jobs, create separate DataFrames with only relevant columns # Create aggregate DataFrame with only aggregate-relevant fields aggregate_data = self._extract_aggregate_fields(metric_results) aggregate_df = pd.DataFrame([aggregate_data]) metric_dataframes[f"{display_name}.aggregate"] = aggregate_df # Create individual jobs DataFrame with only individual-relevant fields individual_jobs_data = [] for job in getattr(strategy, "_jobs", []): # Create a temporary single-job strategy to get individual metrics temp_strategy = strategy.__class__( strategy.circuit, job=job, success_criteria=strategy.success_criteria, expected_outcomes=getattr(strategy, "_expected_outcomes", None), ) job_metrics = temp_strategy.get_metrics() individual_data = self._extract_individual_fields(job_metrics) individual_jobs_data.append(individual_data) if individual_jobs_data: individual_jobs_df = pd.DataFrame(individual_jobs_data) metric_dataframes[f"{display_name}.individual_jobs"] = individual_jobs_df else: # Fallback: create empty DataFrame with individual columns individual_data = self._extract_individual_fields(metric_results) metric_dataframes[f"{display_name}.individual_jobs"] = pd.DataFrame( [individual_data] ) else: # Single job case - only create individual_jobs DataFrame with individual fields individual_data = self._extract_individual_fields(metric_results) single_job_df = pd.DataFrame([individual_data]) metric_dataframes[f"{display_name}.individual_jobs"] = single_job_df else: # Fallback for any remaining edge cases flattened_metrics = self._flatten_metrics(metric_results) single_job_df = 
pd.DataFrame([flattened_metrics]) metric_dataframes[f"{display_name}.individual_jobs"] = single_job_df def _extract_individual_fields(self, metric_results): """Extract only fields relevant for individual job metrics.""" if hasattr(metric_results, "to_flat_dict"): all_fields = metric_results.to_flat_dict() else: all_fields = self._flatten_metrics(metric_results) # Define individual job relevant fields (exclude aggregate fields) individual_fields = {} for key, value in all_fields.items(): # Include fields that are NOT aggregate-specific if not any( aggregate_prefix in key for aggregate_prefix in [ "mean_", "std_", "min_", "max_", "total_trials", "peak_mismatch_rate", "total_jobs", ] ): individual_fields[key] = value return individual_fields def _extract_aggregate_fields(self, metric_results): """Extract only fields relevant for aggregate metrics.""" if hasattr(metric_results, "to_flat_dict"): all_fields = metric_results.to_flat_dict() else: all_fields = self._flatten_metrics(metric_results) # Define aggregate relevant fields (exclude individual-specific fields) aggregate_fields = {} for key, value in all_fields.items(): # Include aggregate fields OR common fields like error_rate if ( any( aggregate_prefix in key for aggregate_prefix in [ "mean_", "std_", "min_", "max_", "total_trials", "peak_mismatch_rate", "total_jobs", ] ) or "error_rate" in key ): aggregate_fields[key] = value return aggregate_fields def _process_standard_metrics(self, metric_name, metric_results, metric_dataframes): """Process standard metrics using schema-based API.""" # All metrics should return schema objects with to_flat_dict() if hasattr(metric_results, "to_flat_dict"): flattened_metrics = metric_results.to_flat_dict() else: # Fallback for any remaining edge cases flattened_metrics = self._flatten_metrics(metric_results) df = pd.DataFrame([flattened_metrics]) metric_dataframes[metric_name] = df def _flatten_metrics(self, metric_results): """Flatten metric results into a single-level 
dictionary.""" flattened_metrics = {} if isinstance(metric_results, dict): for key, value in metric_results.items(): if isinstance(value, dict): for subkey, subvalue in value.items(): flattened_metrics[f"{key}.{subkey}"] = subvalue else: flattened_metrics[key] = value elif hasattr(metric_results, "model_dump"): # Schema object without to_flat_dict - try model_dump metric_dict = metric_results.model_dump() for key, value in metric_dict.items(): if isinstance(value, dict): for subkey, subvalue in value.items(): flattened_metrics[f"{key}.{subkey}"] = subvalue else: flattened_metrics[key] = value else: # Fallback - treat as single value flattened_metrics = {"value": metric_results} return flattened_metrics
def set_circuit(self, circuit: QuantumCircuit) -> None:
    """Replace the circuit this scanner analyzes.

    Args:
        circuit: The quantum circuit.
    """
    self._circuit = circuit

def set_job(self, job: Union[AerJob, QiskitJob]) -> None:
    """Replace the job associated with this scanner.

    Args:
        job: The job that executed the circuit.
    """
    self._job = job
def display_summary(self, metrics_dict: Optional[Dict[str, pd.DataFrame]] = None) -> None:
    """Print a human-readable summary of the calculated metrics.

    Args:
        metrics_dict: Pre-computed metrics; when None the scanner computes
            them first via calculate_metrics().
    """
    data = self.calculate_metrics() if metrics_dict is None else metrics_dict
    banner = "=" * 60
    print("\n" + banner)
    print("🎯 QWARD ANALYSIS SUMMARY")
    print(banner)
    self._display_circuit_performance_summary(data)
    self._display_aggregate_performance_summary(data)
    self._display_other_metrics_summary(data)
    print(banner)
def _display_circuit_performance_summary(self, metrics_dict: Dict[str, pd.DataFrame]) -> None: """Display circuit performance analysis summary.""" if "CircuitPerformance.individual_jobs" not in metrics_dict: return individual_df = metrics_dict["CircuitPerformance.individual_jobs"] print("\n📋 Circuit Performance Analysis:") print(f" Jobs analyzed: {len(individual_df)}") # Show individual job results for i, row in individual_df.iterrows(): success_rate = row.get("success_metrics.success_rate", 0) total_shots = row.get("success_metrics.total_shots", 0) print(f" Job {i+1}: {success_rate:.1%} success ({total_shots} shots)") def _display_aggregate_performance_summary(self, metrics_dict: Dict[str, pd.DataFrame]) -> None: """Display aggregate performance summary.""" if "CircuitPerformance.aggregate" not in metrics_dict: return aggregate_df = metrics_dict["CircuitPerformance.aggregate"] if aggregate_df.empty: return row = aggregate_df.iloc[0] mean_success = row.get("success_metrics.mean_success_rate", 0) std_success = row.get("success_metrics.std_success_rate", 0) total_trials = row.get("success_metrics.total_trials", 0) print("\n📊 Overall Performance:") print(f" Average success rate: {mean_success:.1%} ± {std_success:.1%}") print(f" Total measurements: {total_trials}") # Performance analysis self._display_performance_analysis(mean_success) def _display_performance_analysis(self, mean_success: float) -> None: """Display performance analysis based on success rate.""" if mean_success >= 0.95: print(" ✅ Excellent performance") elif mean_success >= 0.80: print(" ⚠️ Good performance with some noise impact") elif mean_success >= 0.50: print(" ⚠️ Moderate performance - significant noise detected") else: print(" ❌ Poor performance - high noise or errors") def _display_other_metrics_summary(self, metrics_dict: Dict[str, pd.DataFrame]) -> None: """Display summary of other metrics.""" other_metrics = [k for k in metrics_dict.keys() if not k.startswith("CircuitPerformance")] if not 
other_metrics: return print("\n📈 Additional Metrics:") for metric_name in other_metrics: df = metrics_dict[metric_name] print(f" {metric_name}: {df.shape[1]} metrics calculated") if not df.empty: self._display_metric_preview(df) def _display_metric_preview(self, df: pd.DataFrame) -> None: """Display a preview of the first 5 metrics.""" first_row = df.iloc[0] columns_to_show = list(df.columns)[:5] # First 5 columns print(" Preview (first 5):") for col in columns_to_show: value = first_row[col] formatted_value = self._format_metric_value(value) print(f" {col}: {formatted_value}") # Show "..." if there are more columns if len(df.columns) > 5: remaining = len(df.columns) - 5 print(f" ... and {remaining} more metrics") def _format_metric_value(self, value) -> str: """Format a metric value for display.""" if isinstance(value, float): if 0 < value < 1: return f"{value:.3f}" else: return f"{value:.1f}" else: return str(value)
class ScanResult:
    """Thin wrapper around scanner metrics with fluent post-processing helpers."""

    def __init__(self, metrics: Dict[str, pd.DataFrame], scanner: Scanner):
        # The wrapped metric tables and the scanner that produced them.
        self._metrics = metrics
        self._scanner = scanner

    def __getitem__(self, key):
        """Index into the wrapped metrics mapping."""
        return self._metrics[key]

    def __contains__(self, key):
        """Membership test against the wrapped metrics mapping."""
        return key in self._metrics

    def __iter__(self):
        """Iterate over metric names."""
        return iter(self._metrics)

    def __len__(self):
        """Number of metric tables."""
        return len(self._metrics)
def keys(self):
    """Return the metric names, dict-style."""
    return self._metrics.keys()

def items(self):
    """Return (name, DataFrame) pairs, dict-style."""
    return self._metrics.items()

def values(self):
    """Return the metric DataFrames, dict-style."""
    return self._metrics.values()

def get(self, key, default=None):
    """Return the metric table for *key*, or *default* when absent."""
    return self._metrics.get(key, default)

def to_dict(self) -> Dict[str, pd.DataFrame]:
    """Return a shallow copy of the wrapped metrics dictionary."""
    return dict(self._metrics)

def summary(self) -> "ScanResult":
    """Display the scanner's summary, then return self for chaining."""
    self._scanner.display_summary(self._metrics)
    return self
def visualize(
    self,
    *,
    save: bool = False,
    show: bool = True,
    output_dir: str = "qward/examples/img",
    config=None,
    selections=None,
) -> "ScanResult":
    """Render the wrapped metrics and return self for chaining.

    When *selections* is given, only the selected plots are generated;
    otherwise full dashboards are created.

    Args:
        save: Persist the figures under *output_dir*.
        show: Display the figures interactively.
        output_dir: Directory for saved figures.
        config: Optional PlotConfig; a default one is used when None.
        selections: Optional plot selection specification.
    """
    # Imported lazily so plotting dependencies load only when used.
    from qward.visualization import PlotConfig, Visualizer

    plotter = Visualizer(
        metrics_data=self._metrics,
        config=config or PlotConfig(),
        output_dir=output_dir,
    )
    if selections:
        plotter.generate_plots(selections=selections, save=save, show=show)
    else:
        plotter.create_dashboard(save=save, show=show)
    return self