import math
from typing import TYPE_CHECKING, Optional
import numpy as np
from typeguard import typechecked
from mpqp import Language, QCircuit
from mpqp.core.instruction.gates import CRk
from mpqp.core.instruction.measurement import (
BasisMeasure,
ExpectationMeasure,
Observable,
)
from mpqp.execution.connection.aws_connection import get_braket_device
from mpqp.execution.devices import AWSDevice
from mpqp.execution.job import Job, JobStatus, JobType
from mpqp.execution.result import Result, Sample, StateVector
from mpqp.noise.noise_model import NoiseModel
from mpqp.tools.errors import AWSBraketRemoteExecutionError, DeviceJobIncompatibleError
if TYPE_CHECKING:
from braket.circuits import Circuit
from braket.tasks import GateModelQuantumTaskResult, QuantumTask
@typechecked
def apply_noise_to_braket_circuit(
    braket_circuit: "Circuit",
    noises: list[NoiseModel],
    nb_qubits: int,
) -> "Circuit":
    """Apply noise models to a Braket circuit.

    The measurement instructions of ``braket_circuit`` are set aside, a new
    circuit is built from the remaining instructions, every noise model is
    attached to that new circuit with ``apply_gate_noise``, and the
    measurement instructions are finally appended back so that the returned
    circuit still measures the same qubits.

    Args:
        braket_circuit: The Braket circuit to apply noise to.
        noises: A list of noise models to apply to the circuit.
        nb_qubits: The number of qubits in the circuit.

    Returns:
        A new circuit with the noise applied.

    Raises:
        NotImplementedError: If one of the noise models targets the ``CRk``
            gate.
    """
    from braket.circuits import Circuit, Noise
    from braket.circuits.measure import Measure

    # Split measurements from the rest: the noisy circuit is rebuilt from the
    # non-measurement instructions only, measurements are re-added at the end.
    stored_measurements = []
    other_instructions = []
    for instr in braket_circuit.instructions:
        if isinstance(instr.operator, Measure):
            stored_measurements.append(instr)
        else:
            other_instructions.append(instr)

    noisy_circuit = Circuit(other_instructions)
    # Loop invariant: a noise targeting every qubit is passed as `None`.
    all_qubits = set(range(nb_qubits))

    for noise in noises:
        braket_noise = noise.to_other_language(Language.BRAKET)
        if TYPE_CHECKING:
            assert isinstance(braket_noise, Noise)
        if CRk in noise.gates:
            raise NotImplementedError(
                "Cannot simulate noisy circuit with CRk gate due to an error on"
                " AWS Braket side."
            )
        noisy_circuit.apply_gate_noise(
            braket_noise,  # pyright: ignore[reportArgumentType]
            # An empty gate list means "no gate restriction".
            target_gates=(
                [
                    gate.braket_gate
                    for gate in noise.gates
                    if hasattr(gate, "braket_gate")
                ]
                if len(noise.gates) != 0
                else None
            ),
            target_qubits=(
                noise.targets if set(noise.targets) != all_qubits else None
            ),
        )

    # Previously the stored measurements were dropped; put them back after the
    # noise has been applied.
    for measurement in stored_measurements:
        noisy_circuit.add_instruction(measurement)

    return noisy_circuit
@typechecked
def run_braket(job: Job, translation_warning: bool = True) -> Result:
    """Run a job on the AWS Braket device it targets and return its result.

    Jobs whose measure is an ``ExpectationMeasure`` are delegated to
    :func:`run_braket_observable`. Every other job is submitted with
    :func:`submit_job_braket`, then the task result is fetched and converted
    with ``extract_result``.

    Args:
        job: Job to be executed, it MUST be corresponding to a
            :class:`mpqp.execution.devices.AWSDevice`.
        translation_warning: Flag forwarded unchanged to the submission
            functions; presumably controls whether circuit translation
            warnings are emitted.

    Returns:
        The result of the job.

    Raises:
        ValueError: If the job's device is not an ``AWSDevice``.

    Note:
        This function is not meant to be used directly, please use
        :func:`~mpqp.execution.runner.run` instead.
    """
    # TODO : [multi-obs] update this to take into account the case when we have list of Observables
    # TODO : [multi-obs] check if Braket allows for a list of several observables
    if not isinstance(job.device, AWSDevice):
        raise ValueError(
            "`job` must correspond to an `AWSDevice`, but corresponds to a "
            f"{job.device} instead"
        )

    from braket.tasks import GateModelQuantumTaskResult

    # Expectation values have their own execution path.
    if isinstance(job.measure, ExpectationMeasure):
        return run_braket_observable(job, translation_warning)

    # Only the task (second element) is needed here, not its id.
    submission = submit_job_braket(job, translation_warning)
    braket_result = submission[1].result()
    if TYPE_CHECKING:
        assert isinstance(braket_result, GateModelQuantumTaskResult)

    return extract_result(braket_result, job, job.device)
@typechecked
def run_braket_observable(job: Job, translation_warning: bool = True):
    """Compute the expectation value(s) of an ``OBSERVABLE`` job.

    Two strategies exist, selected by ``job.measure.optimize_measurement``:

    - when it is set, the Pauli monomials are grouped (see
      ``get_pauli_grouping``); each group is run once, after the rotation
      circuit given by ``find_qubitwise_rotations``, and the expectation
      value of each monomial is the dot product of its eigenvalues with the
      resulting probability distribution. Observables are then rebuilt as the
      weighted sum of their monomials (TODO: link the documentation section
      about Pauli grouping here once available);
    - otherwise each observable is run one by one on a copy of the
      transpiled circuit, using Braket's ``expectation`` result type.

    Args:
        job: Job to be executed.
        translation_warning: If ``False``, the translation warnings are disabled.

    Returns:
        A result containing the expectation values of the observables. With a
        single observable the value is stored directly, otherwise values and
        errors are stored in dictionaries keyed ``observable_<index>``.

    Raises:
        NotImplementedError: If the job has no measure.
    """
    from copy import deepcopy

    from braket.circuits import Circuit
    from braket.tasks import GateModelQuantumTaskResult

    # Reuse the cached transpiled circuit when there is one.
    base_circuit = job.circuit.transpiled_circuit
    if base_circuit is None:
        base_circuit = job.circuit.to_other_device(job.device, translation_warning)
    assert isinstance(base_circuit, Circuit)

    backend = get_braket_device(
        job.device,  # pyright: ignore[reportArgumentType]
        is_noisy=bool(job.circuit.noises),
    )

    if job.measure is None:
        raise NotImplementedError("job.measure is None")
    assert isinstance(job.measure, ExpectationMeasure)

    results = {}
    errors = {}

    if not job.measure.optimize_measurement:
        # Plain strategy: one Braket run per observable.
        for observable in job.measure.observables:
            circuit_with_obs = deepcopy(base_circuit)
            circuit_with_obs.expectation(  # pyright: ignore[reportAttributeAccessIssue]
                observable=observable.to_other_language(Language.BRAKET),
                target=job.measure.targets,
            )
            job.status = JobStatus.RUNNING
            run_result = backend.run(
                circuit_with_obs, shots=job.measure.shots, inputs=None
            ).result()
            assert isinstance(run_result, GateModelQuantumTaskResult)
            results[f"observable_{len(results)}"] = run_result.values[0].real
            errors[f"observable_{len(errors)}"] = None

        if len(results) == 1:
            return Result(job, results["observable_0"], None, job.measure.shots)
        return Result(job, results, errors, job.measure.shots)

    # Optimized strategy: one run per group of Pauli monomials.
    grouping = job.measure.get_pauli_grouping()
    from mpqp.tools.pauli_grouping import (
        find_qubitwise_rotations,
        pauli_monomial_eigenvalues,
    )

    monomial_expectations = {}
    for group in grouping:
        rotation_circuit = QCircuit(
            find_qubitwise_rotations(group)
        ).to_other_language(Language.BRAKET, translation_warning)
        job.status = JobStatus.RUNNING

        if job.measure.shots == 0:
            # Exact mode: request the state vector and square its amplitudes.
            exact_circuit = deepcopy(base_circuit + rotation_circuit)
            exact_circuit.state_vector()  # pyright: ignore[reportAttributeAccessIssue]
            exact_result = backend.run(exact_circuit, shots=0, inputs=None).result()
            assert isinstance(exact_result, GateModelQuantumTaskResult)
            probabilities = [
                float(np.abs(amplitude) ** 2) for amplitude in exact_result.values[0]
            ]
        else:
            # Sampling mode: read the measured frequency of every basis state.
            sampled_result = backend.run(
                base_circuit + rotation_circuit,
                shots=job.measure.shots,
                inputs=None,
            ).result()
            assert isinstance(sampled_result, GateModelQuantumTaskResult)
            dimension = 2**job.circuit.nb_qubits
            # Number of binary digits needed to index `dimension` states.
            width = len(bin(dimension)) - 3
            measured = sampled_result.measurement_probabilities
            probabilities: list[float] = []
            for index in range(dimension):
                bitstring = format(index, "b").zfill(width)
                # Basis states that were never sampled count as probability 0.
                probabilities.append(
                    measured[bitstring].real if bitstring in measured else 0
                )

        distribution = np.array(probabilities, dtype=np.float64)
        for monomial in group:
            monomial_value: float = np.dot(
                pauli_monomial_eigenvalues(monomial), distribution
            )
            monomial_expectations[monomial.name] = monomial_value

    # Rebuild every observable from the expectation values of its monomials.
    for index, observable in enumerate(job.measure.observables):
        weighted_sum: float = 0
        for monomial in observable.pauli_string.monomials:
            assert isinstance(monomial.coef, (int, float))
            weighted_sum += monomial_expectations[monomial.name] * monomial.coef
        results[f"observable_{index}"] = weighted_sum
        errors[f"observable_{len(errors)}"] = None

    if len(results) == 1:
        return Result(job, results["observable_0"], shots=job.measure.shots)
    return Result(job, results, errors, shots=job.measure.shots)
@typechecked
def submit_job_braket(
    job: Job, translation_warning: bool = True
) -> tuple[str, "QuantumTask"]:
    """Submit the job to its local or remote AWS Braket device.

    The job is validated, the Braket device is retrieved, the circuit is
    transpiled if no transpiled version is cached, and the task is launched
    with the number of shots matching the job type. The job's ``status`` and
    ``id`` are updated along the way.

    Args:
        job: Job to be executed, it MUST be corresponding to a
            :class:`mpqp.execution.devices.AWSDevice`.
        translation_warning: Flag forwarded to the circuit translation when
            the circuit has to be transpiled.

    Returns:
        The task's id and the Task itself.

    Raises:
        ValueError: If the device is not an ``AWSDevice``, if a ``SAMPLE``
            job has no measure, if an ``OBSERVABLE`` job has no
            ``ExpectationMeasure``, or if the job type is not supported for
            noisy circuits.
        DeviceJobIncompatibleError: If a ``STATE_VECTOR`` job targets a
            remote device.
        NotImplementedError: If the job type is not ``STATE_VECTOR``, ``SAMPLE``
            or ``OBSERVABLE``.

    Note:
        This function is not meant to be used directly, please use
        :func:`~mpqp.execution.runner.run` instead.
    """
    if not isinstance(job.device, AWSDevice):
        raise ValueError(
            "`job` must correspond to an `AWSDevice`, but corresponds to a "
            f"{job.device} instead"
        )

    job_type = job.job_type

    if job_type == JobType.STATE_VECTOR and job.device.is_remote():
        raise DeviceJobIncompatibleError(
            "State vector cannot be computed using AWS Braket remote simulators"
            " and devices. Please use the LocalSimulator instead"
        )
    if job_type == JobType.SAMPLE and job.measure is None:
        raise ValueError("`SAMPLE` jobs must have a measure.")
    if job_type == JobType.OBSERVABLE and not isinstance(
        job.measure, ExpectationMeasure
    ):
        raise ValueError("`OBSERVABLE` jobs must have an `ExpectationMeasure`.")

    is_noisy = bool(job.circuit.noises)
    if is_noisy and job_type not in (JobType.SAMPLE, JobType.OBSERVABLE):
        raise ValueError(
            f"Job of type {job.job_type} is not supported for noisy circuits."
        )

    from braket.circuits import Circuit

    backend = get_braket_device(job.device, is_noisy=is_noisy)

    # Reuse the cached transpiled circuit when there is one.
    braket_circuit = job.circuit.transpiled_circuit
    if braket_circuit is None:
        braket_circuit = job.circuit.to_other_device(job.device, translation_warning)
    if TYPE_CHECKING:
        assert isinstance(braket_circuit, Circuit)

    # Each branch prepares the circuit and picks the shot count; the actual
    # launch is shared below.
    if job_type == JobType.STATE_VECTOR:
        braket_circuit.state_vector()  # pyright: ignore[reportAttributeAccessIssue]
        shots = 0
    elif job_type == JobType.SAMPLE:
        if TYPE_CHECKING:
            assert job.measure is not None
        shots = job.measure.shots
    elif job_type == JobType.OBSERVABLE:
        # TODO : [multi-obs] update this to take into account the case when we have list of Observables
        if TYPE_CHECKING:
            assert isinstance(job.measure, ExpectationMeasure)
        braket_circuit.expectation(  # pyright: ignore[reportAttributeAccessIssue]
            observable=job.measure.observables[0].to_other_language(Language.BRAKET),
            target=job.measure.targets,
        )
        shots = job.measure.shots
    else:
        raise NotImplementedError(f"Job of type {job.job_type} not handled.")

    job.status = JobStatus.RUNNING
    task = backend.run(braket_circuit, shots=shots, inputs=None)

    job.id = task.id
    # TODO : [multi-obs] update this to take into account the case when we have list of Observables
    return task.id, task
@typechecked
def get_result_from_aws_task_arn(task_arn: str) -> Result:
    """Retrieve the result of the remote AWS Braket task identified by
    ``task_arn`` and convert it into an mpqp result.

    For a task that is created, queued, running or completed,
    ``task.result()`` is called; for a task that is not finished yet this is
    expected to wait (blocking) until it is done.

    Args:
        task_arn: Arn of the remote aws task.

    Returns:
        The mpqp result built by ``extract_result`` from the Braket result
        and the device the task ran on.

    Raises:
        AWSBraketRemoteExecutionError: When the task failed or was cancelled,
            or when the status of the task is unknown.
    """
    from braket.aws import AwsQuantumTask
    from braket.tasks import GateModelQuantumTaskResult, QuantumTask

    remote_task: QuantumTask = AwsQuantumTask(task_arn)
    # catch an error if the id is not correct (wrong ID, wrong region, ...) ?
    state = remote_task.state()

    if state in ("FAILED", "CANCELLED"):
        raise AWSBraketRemoteExecutionError(f"Job status: {state}")
    if state not in ("CREATED", "QUEUED", "RUNNING", "COMPLETED"):
        raise AWSBraketRemoteExecutionError(
            f"Unknown status {state} for the task {task_arn}"
        )

    braket_result = remote_task.result()
    if TYPE_CHECKING:
        assert isinstance(braket_result, GateModelQuantumTaskResult)

    # The device is recovered from the task metadata, since no job is available.
    arn_of_device = remote_task.metadata()["deviceArn"]
    return extract_result(braket_result, None, AWSDevice.from_arn(arn_of_device))
@typechecked
def estimate_cost_single_job(
    job: Job, hybrid_iterations: int = 1, estimated_time_seconds: int = 3
) -> float:
    """
    Estimates the cost of executing a :class:`~mpqp.execution.job.Job` on a remote AWS Braket device.

    Local devices cost nothing. Remote simulators are billed per minute of
    ``estimated_time_seconds`` (never less than 3 seconds). Remote quantum
    computers are billed a per-task fee plus a per-shot fee, multiplied by
    ``hybrid_iterations``.

    Args:
        job: :class:`~mpqp.execution.job.Job` for which we want to estimate the cost. The job's device must be an :class:`~mpqp.execution.devices.AWSDevice`.
        hybrid_iterations: Number of iteration in a case of a hybrid (quantum-classical) job.
        estimated_time_seconds: Estimated runtime for simulator jobs (in seconds). The minimum duration billing is 3 seconds.

    Returns:
        The estimated price (in USD) for the execution of the job in parameter.

    Raises:
        ValueError: If the job's device is not an ``AWSDevice``, or if it is a
            remote simulator with no known price.
        DeviceJobIncompatibleError: If the job targets a remote quantum
            computer but has no measure.
        NotImplementedError: If the job targets a remote quantum computer
            with no known price.

    Example:
        >>> circuit = QCircuit([H(0), CNOT(0, 1), CNOT(1, 2), BasisMeasure(shots=245)])
        >>> job = generate_job(circuit, AWSDevice.IONQ_ARIA_1)
        >>> estimate_cost_single_job(job, hybrid_iterations=150)
        1147.5
    """
    if not isinstance(job.device, AWSDevice):
        raise ValueError(
            f"This function was expecting a job with an AWSDevice but got a {type(job.device).__name__}."
        )

    # Local execution is free; return a float to honour the annotation.
    if not job.device.is_remote():
        return 0.0

    device_id = job.device.value

    if job.device.is_simulator():
        # Price per minute (USD), keyed by a marker found in the device value.
        simulator_minute_costs = {"sv1": 0.075, "dm1": 0.075, "tn1": 0.275}
        for marker, minute_cost in simulator_minute_costs.items():
            if marker in device_id:
                # Billing never goes below 3 seconds.
                return minute_cost * max(estimated_time_seconds / 60, 3 / 60)
        raise ValueError(
            f"Cost estimation not available for the {job.device.name} simulator."
        )

    if job.measure is None:
        raise DeviceJobIncompatibleError(
            "An AWS remote job on a quantum computer requires to have a measure."
        )

    # (per-task price, per-shot price) in USD, keyed by a marker found in the
    # device value; insertion order is the lookup order.
    qpu_costs = {
        "ionq": (0.3, 0.03),
        "iqm": (0.3, 0.00145),
        "rigetti": (0.3, 0.0009),
        "quera": (0.3, 0.01),
    }
    for marker, (task_cost, shot_cost) in qpu_costs.items():
        if marker in device_id:
            return (task_cost + job.measure.shots * shot_cost) * hybrid_iterations

    raise NotImplementedError(
        f"Cost estimation not implemented yet for {job.device.name} device."
    )