# Copyright 2019-2024 Cambridge Quantum Computing
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from collections import Counter, OrderedDict
from collections.abc import Callable, Iterable
from functools import lru_cache
from math import ceil, log2
from typing import cast
import numpy as np
from pytket.backends import Backend
from pytket.backends.backendresult import BackendResult
from pytket.circuit import Bit, CircBox, Circuit, Node, OpType, Qubit
from pytket.passes import DecomposeBoxes, FlattenRegisters
from pytket.utils.outcomearray import OutcomeArray
from pytket.utils.results import CountsDict, StateTuple
ParallelMeasures = list[dict[Qubit, Bit]]
[docs]
def compress_counts(
counts: dict[StateTuple, float], tol: float = 1e-6, round_to_int: bool = False
) -> CountsDict:
"""Filter counts to remove states that have a count value (which can be a
floating-point number) below a tolerance, and optionally round to an
integer.
:param counts: Input counts
:type counts: Dict[StateTuple, float]
:param tol: Value below which counts are pruned. Defaults to 1e-6.
:type tol: float, optional
:param round_to_int: Whether to round each count to an integer. Defaults to False.
:type round_to_int: bool, optional
:return: Filtered counts
:rtype: CountsDict
"""
valprocess: Callable[[float], int | float] = lambda x: (
int(round(x)) if round_to_int else x
)
processed_pairs = (
(key, valprocess(val)) for key, val in counts.items() if val > tol
)
return {key: val for key, val in processed_pairs if val > 0}
@lru_cache(maxsize=128)
def binary_to_int(bintuple: tuple[int]) -> int:
"""Convert a binary tuple to corresponding integer, with most significant bit as
the first element of tuple.
:param bintuple: Binary tuple
:type bintuple: Tuple[int]
:return:
Integer :rtype: int
"""
integer = 0
for index, bitset in enumerate(reversed(bintuple)):
if bitset:
integer |= 1 << index
return integer
@lru_cache(maxsize=128)
def int_to_binary(val: int, dim: int) -> tuple[int, ...]:
"""Convert an integer to corresponding binary tuple, with most significant bit as
the first element of tuple.
:param val: input integer
:type val: int
:param dim: Bit width
:type dim: int
:return: Binary tuple of width dim
:rtype: Tuple[int, ...]
"""
return tuple(map(int, format(val, f"0{dim}b")))
#########################################
### _compute_dot and helper functions ###
###
### With thanks to
### https://math.stackexchange.com/a/3423910
### and especially
### https://gist.github.com/ahwillia/f65bc70cb30206d4eadec857b98c4065
### on which this code is based.
def _unfold(tens: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
"""Unfolds tensor into matrix.
:param tens: Tensor with shape equivalent to dimensions
:type tens: np.ndarray
:param mode: Specifies axis move to front of matrix in unfolding of tensor
:type mode: int
:param dims: Gives shape of tensor passed
:type dims: List[int]
:return: Matrix with shape (dims[mode], prod(dims[/mode]))
:rtype: np.ndarray
"""
if mode == 0:
return tens.reshape(dims[0], -1)
return np.moveaxis(tens, mode, 0).reshape(dims[mode], -1)
def _refold(vec: np.ndarray, mode: int, dims: list[int]) -> np.ndarray:
"""Refolds vector into tensor.
:param vec: Tensor with length equivalent to the product of dimensions given in
dims
:type vec: np.ndarray
:param mode: Axis tensor was unfolded along
:type mode: int
:param dims: Shape of tensor
:type dims: List[int]
:return: Tensor folded from vector with shape equivalent to given dimensions
:rtype: np.ndarray
"""
if mode == 0:
return vec.reshape(dims)
# Reshape and then move dims[mode] back to its
# appropriate spot (undoing the `unfold` operation).
tens = vec.reshape([dims[mode]] + [d for m, d in enumerate(dims) if m != mode])
return np.moveaxis(tens, 0, mode)
def _compute_dot(submatrices: Iterable[np.ndarray], vector: np.ndarray) -> np.ndarray:
"""Multiplies the kronecker product of the given submatrices with given vector.
:param submatrices: Submatrices multiplied
:type submatrices: Iterable[np.ndarray]
:param vector: Vector multplied
:type vector: np.ndarray
:return: Kronecker product of arguments
:rtype: np.ndarray
"""
dims = [A.shape[0] for A in submatrices]
vt = vector.reshape(dims)
for i, A in enumerate(submatrices):
vt = _refold(A @ _unfold(vt, i, dims), i, dims)
return vt.ravel()
def _bayesian_iteration(
submatrices: Iterable[np.ndarray],
measurements: np.ndarray,
t: np.ndarray,
epsilon: float,
) -> np.ndarray:
"""Transforms T corresponds to a Bayesian iteration, used to modfiy
measurements.
:param submatrices: submatrices to be inverted and applied to measurements.
:type submatrices: Iterable[np.ndarray]
:param measurements: Probability distribution over set of states to be amended.
:type measurements: np.ndarray
:param t: Some transform to act on measurements.
:type t: np.ndarray
:param epsilon: A stabilization parameter to define an affine transformation for
application to submatrices, eliminating zero probabilities.
:type epsilon: float
:return: Transformed distribution vector.
:rtype: np.ndarray
"""
# Transform t according to the Bayesian iteration
# The parameter epsilon is a stabilization parameter which defines an affine
# transformation to apply to the submatrices to eliminate zero probabilities. This
# transformation preserves the property that all columns sum to 1
if epsilon == 0:
# avoid copying if we don't need to
As = submatrices
else:
As = [
epsilon / submatrix.shape[0] + (1 - epsilon) * submatrix
for submatrix in submatrices
]
z = _compute_dot(As, t)
if np.isclose(z, 0).any():
raise ZeroDivisionError
return cast(
np.ndarray, t * _compute_dot([A.transpose() for A in As], measurements / z)
)
def _bayesian_iterative_correct(
submatrices: Iterable[np.ndarray],
measurements: np.ndarray,
tol: float = 1e-5,
max_it: int | None = None,
) -> np.ndarray:
"""Finds new states to represent application of inversion of submatrices on
measurements. Converges when update states within tol range of previously
tested states.
:param submatrices: Matrices comprising the pure noise characterisation.
:type submatrices: Iterable[np.ndarray]
:param input_vector: Vector corresponding to some counts distribution.
:type input_vector: np.ndarray
:param tol: tolerance of closeness of found results
:type tol: float
:param max_it: Maximum number of inversions attempted to correct results.
:type max_it: int
"""
# based on method found in https://arxiv.org/abs/1910.00129
vector_size = measurements.size
# uniform initial
true_states = np.full(vector_size, 1 / vector_size)
prev_true = true_states.copy()
converged = False
count = 0
epsilon: float = 0 # stabilization parameter, adjusted dynamically
while not converged:
if max_it:
if count >= max_it:
break
count += 1
try:
true_states = _bayesian_iteration(
submatrices, measurements, true_states, epsilon
)
converged = np.allclose(true_states, prev_true, atol=tol)
prev_true = true_states.copy()
except ZeroDivisionError:
# Shift the stabilization parameter up a bit (always < 0.5).
epsilon = 0.99 * epsilon + 0.01 * 0.5
return true_states
def reduce_matrix(indices_to_remove: list[int], matrix: np.ndarray) -> np.ndarray:
"""Removes indices from indices_to_remove from binary associated to indexing of
matrix, producing a new transition matrix. To do so, it assigns all transition
probabilities as the given state in the remaining indices binary, with the removed
binary in state 0. This is an assumption on the noise made because it is likely
that unmeasured qubits will be in that state.
:param indices_to_remove: Binary index of state matrix is mapping to be removed.
:type indices_to_remove: List[int]
:param matrix: Transition matrix where indices correspond to some binary state.
:type matrix: np.ndarray
:return: Transition matrix with removed entries.
:rtype: np.ndarray
"""
new_n_qubits = int(log2(matrix.shape[0])) - len(indices_to_remove)
if new_n_qubits == 0:
return np.array([])
bin_map = dict()
mat_dim = 1 << new_n_qubits
for index in range(mat_dim):
# get current binary
bina = list(int_to_binary(index, new_n_qubits))
# add 0's to fetch old binary to set values from
for i in sorted(indices_to_remove):
bina.insert(i, 0)
# get index of values
bin_map[index] = binary_to_int(tuple(bina))
new_mat = np.zeros((mat_dim,) * 2, dtype=float)
for i in range(len(new_mat)):
old_row_index = bin_map[i]
for j in range(len(new_mat)):
old_col_index = bin_map[j]
new_mat[i, j] = matrix[old_row_index, old_col_index]
return new_mat
def reduce_matrices(
entries_to_remove: list[tuple[int, int]], matrices: list[np.ndarray]
) -> list[np.ndarray]:
"""Removes some dimensions from some matrices.
:param entries_to_remove: Via indexing, details dimensions to be removed.
:type entries_to_remove: List[Tuple[int, int]]
:param matrices: All matrices to have dimensions removed.
:type matrices: List[np.ndarray]
:return: Matrices with some dimensions removed.
:rtype: List[np.ndarray]
"""
organise: dict[int, list] = dict({k: [] for k in range(len(matrices))})
for unused in entries_to_remove:
# unused[0] is index in matrices
# unused[1] is qubit index in matrix
organise[unused[0]].append(unused[1])
output_matrices = [reduce_matrix(organise[m], matrices[m]) for m in organise]
return [
mat / np.sum(mat, axis=0) for mat in [x for x in output_matrices if len(x) != 0]
]
[docs]
class SpamCorrecter:
"""A class for generating "state preparation and measurement" (SPAM) calibration
experiments for ``pytket`` backends, and correcting counts generated from them.
Supports saving calibrated state to a dictionary format, and restoring from the
dictionary.
"""
[docs]
def __init__(self, qubit_subsets: list[list[Node]], backend: Backend | None = None):
"""Construct a new `SpamCorrecter`.
:param qubit_subsets: A list of lists of correlated Nodes of an `Architecture`.
Qubits within the same list are assumed to only have SPAM errors correlated
with each other. Thus to allow SPAM errors between all qubits you should
provide a single list.
:type qubit_subsets: List[List[Node]]
:param backend: Backend on which the experiments are intended to be run
(optional). If provided, the qubits in `qubit_subsets` must be nodes in the
backend's associated `Architecture`. If not provided, it is assumed that the
experiment will be run on an `Architecture`with the nodes in
`qubit_subsets`, and furthermore that the intended architecture natively
supports X gates.
:raises ValueError: There are repeats in the `qubit_subsets` specification.
"""
self.correlations = qubit_subsets
self.all_qbs = [qb for subset in qubit_subsets for qb in subset]
def to_tuple(inp: list[Node]) -> tuple:
return tuple(inp)
self.subsets_matrix_map = OrderedDict.fromkeys(
sorted(map(to_tuple, self.correlations), key=len, reverse=True)
)
# ordered from largest to smallest via OrderedDict & sorted
self.subset_dimensions = [len(subset) for subset in self.subsets_matrix_map]
if len(self.all_qbs) != len(set(self.all_qbs)):
raise ValueError("Qubit subsets are not mutually disjoint.")
xcirc = Circuit(1).X(0)
if backend is not None:
if backend.backend_info is None:
raise ValueError("No architecture associated with backend.")
nodes = backend.backend_info.nodes
if not all(node in nodes for node in self.all_qbs):
raise ValueError("Nodes do not all belong to architecture.")
backend.default_compilation_pass().apply(xcirc)
FlattenRegisters().apply(xcirc)
self.xbox = CircBox(xcirc)
[docs]
def calibration_circuits(self) -> list[Circuit]:
"""Generate calibration circuits according to the specified correlations.
:return: A list of calibration circuits to be run on the machine. The circuits
should be processed without compilation. Results from these circuits must
be given back to this class (via the `calculate_matrices` method) in the
same order.
:rtype: List[Circuit]
"""
major_state_dimensions = self.subset_dimensions[0]
n_circuits = 1 << major_state_dimensions
# output
self.prepared_circuits = []
self.state_infos = []
# set up base circuit for appending xbox to
base_circuit = Circuit()
c_reg = []
for index, qb in enumerate(self.all_qbs):
base_circuit.add_qubit(qb)
c_bit = Bit(index)
c_reg.append(c_bit)
base_circuit.add_bit(c_bit)
# generate state circuits for given correlations
for major_state_index in range(n_circuits):
state_circuit = base_circuit.copy()
# get bit string corresponding to basis state of biggest subset of qubits
major_state = int_to_binary(major_state_index, major_state_dimensions)
new_state_dicts = {}
# parallelise circuits, run uncorrelated subsets
# characterisation in parallel
for dim, qubits in zip(self.subset_dimensions, self.subsets_matrix_map):
# add state to prepared states
new_state_dicts[qubits] = major_state[:dim]
# find only qubits that are expected to be in 1 state,
# add xbox to given qubits
for flipped_qb in itertools.compress(qubits, major_state[:dim]):
state_circuit.add_circbox(self.xbox, [flipped_qb])
# Decompose boxes, add barriers to preserve circuit, add measures
DecomposeBoxes().apply(state_circuit)
for qb, cb in zip(self.all_qbs, c_reg):
state_circuit.Measure(qb, cb)
# add to returned types
self.prepared_circuits.append(state_circuit)
self.state_infos.append((new_state_dicts, state_circuit.qubit_to_bit_map))
return self.prepared_circuits
[docs]
def calculate_matrices(self, results_list: list[BackendResult]) -> None:
"""Calculate the calibration matrices from the results of running calibration
circuits.
:param results_list: List of results from Backend. Must be in the same order as
the corresponding circuits generated by `calibration_circuits`.
:type counts_list: List[BackendResult]
:raises RuntimeError: Calibration circuits have not been generated yet.
"""
if not self.state_infos:
raise RuntimeError(
"Ensure calibration states/circuits have been calculated first."
)
counter = 0
self.node_index_dict: dict[Node, tuple[int, int]] = dict()
for qbs, dim in zip(self.subsets_matrix_map, self.subset_dimensions):
# for a subset with n qubits, create a 2^n by 2^n matrix
self.subsets_matrix_map[qbs] = np.zeros((1 << dim,) * 2, dtype=float)
for i in range(len(qbs)):
qb = qbs[i]
self.node_index_dict[qb] = (counter, i)
counter += 1
for result, state_info in zip(results_list, self.state_infos):
state_dict = state_info[0]
qb_bit_map = state_info[1]
for qb_sub in self.subsets_matrix_map:
# bits of counts to consider
bits = [qb_bit_map[q] for q in qb_sub]
counts_dict = result.get_counts(cbits=bits)
for measured_state, count in counts_dict.items():
# intended state
prepared_state_index = binary_to_int(state_dict[qb_sub])
# produced state
measured_state_index = binary_to_int(measured_state)
# update characterisation matrix
M = self.subsets_matrix_map[qb_sub]
assert type(M) is np.ndarray
M[measured_state_index, prepared_state_index] += count
# normalise everything
self.characterisation_matrices = [
mat / np.sum(cast(np.ndarray, mat), axis=0)
for mat in self.subsets_matrix_map.values()
]
[docs]
def get_parallel_measure(self, circuit: Circuit) -> ParallelMeasures:
"""For a given circuit, produces and returns a ParallelMeasures object required
for correcting counts results.
:param circuit: Circuit with some Measure operations.
:type circuit: Circuit
:return: A list of dictionaries mapping Qubit to Bit where each separate
dictionary details some set of Measurement operations run in parallel.
:rtype: ParallelMeasures
"""
parallel_measure = [circuit.qubit_to_bit_map]
# implies mid-circuit measurements, or that at least missing
# bits need to be checked for Measure operation
if len(parallel_measure[0]) != len(circuit.bits):
used_bits = set(parallel_measure[0].values())
for mc in circuit.commands_of_type(OpType.Measure):
bit = mc.bits[0]
if bit not in used_bits:
# mid-circuit measure, add as a separate parallel measure
parallel_measure.append({mc.qubits[0]: bit})
return parallel_measure
[docs]
def correct_counts(
self,
result: BackendResult,
parallel_measures: ParallelMeasures,
method: str = "bayesian",
options: dict | None = None,
) -> BackendResult:
"""Modifies count distribution for result, such that the inversion of the pure
noise map represented by characterisation matrices is applied to it.
:param result: BackendResult object to be negated by pure noise object.
:type result: BackendResult
:param parallel_measures: Used to permute corresponding BackendResult object so
counts order matches noise characterisation and to amend characterisation
matrices to correct the right bits. SpamCorrecter.get_parallel_measure
returns the required object for a given circuit.
:type parallel_measures: ParallelMeasures
:raises ValueError: Measured qubit in result not characterised.
:return: A new result object with counts modified to reflect SPAM correction.
:rtype: BackendResult
"""
# the correction process assumes that when passed a list of matrices
# and a distribution to correct, that the j rows of matrix i
# corrects for the i, i+1,...i+j states in the passed distribution
# given information of which bits are measured on which qubits from
# parallel_measures, the following first produces matrices such that
# this condition is true
char_bits_order = []
correction_matrices = []
for mapping in parallel_measures:
# reduce_matrices removes given qubits corresponding entries from
# characterisation matrices
unused_qbs = set(self.all_qbs.copy())
for q in mapping:
# no q duplicates as mapping is dict from qubit to bit
if q not in unused_qbs:
raise ValueError(
f"Measured qubit {q} is not characterised by SpamCorrecter"
)
unused_qbs.remove(q) # type:ignore[arg-type]
char_bits_order.append(mapping[q])
correction_matrices.extend(
reduce_matrices(
[self.node_index_dict[q] for q in unused_qbs],
self.characterisation_matrices,
)
)
# get counts object for returning later
counts = result.get_counts(cbits=char_bits_order)
in_vec = np.zeros(1 << len(char_bits_order), dtype=float)
# turn from counts to probability distribution
for state, count in counts.items():
in_vec[binary_to_int(state)] = count
Ncounts = np.sum(in_vec)
in_vec_norm = in_vec / Ncounts
# with counts and characterisation matrices orders matching,
# correct distribution
if method == "invert":
try:
subinverts = [
np.linalg.inv(submatrix) for submatrix in correction_matrices
]
except np.linalg.LinAlgError:
raise ValueError(
"Unable to invert calibration matrix: please re-run "
"calibration experiments or use an alternative correction method."
)
# assumes that order of rows in flattened subinverts equals
# order of bits in input vector
outvec = _compute_dot(subinverts, in_vec_norm)
# The entries of v will always sum to 1, but they may not all
# be in the range [0,1]. In order to make them genuine
# probabilities (and thus generate meaningful counts),
# we adjust them by setting all negative values to 0 and scaling
# the remainder.
outvec[outvec < 0] = 0
outvec /= sum(outvec)
elif method == "bayesian":
if options is None:
options = {}
tol_val = options.get("tol", 1 / Ncounts)
maxit = options.get("maxiter", None)
outvec = _bayesian_iterative_correct(
correction_matrices, in_vec_norm, tol=tol_val, max_it=maxit
)
else:
valid_methods = ("invert", "bayesian")
raise ValueError("Method must be one of: ", *valid_methods)
outvec *= Ncounts
# counter object with binary from distribution
corrected_counts = {
int_to_binary(index, len(char_bits_order)): Bcount
for index, Bcount in enumerate(outvec)
}
counter = Counter(
{
OutcomeArray.from_readouts([key]): ceil(val)
for key, val in corrected_counts.items()
}
)
# produce and return BackendResult object
return BackendResult(counts=counter, c_bits=char_bits_order)
[docs]
def to_dict(self) -> dict:
"""Get calibration information as a dictionary.
:return: Dictionary output
:rtype: Dict
"""
correlations = []
for subset in self.correlations:
correlations.append([(uid.reg_name, uid.index) for uid in subset])
node_index_hashable = [
((uid.reg_name, uid.index), self.node_index_dict[uid])
for uid in self.node_index_dict
]
char_matrices = [m.tolist() for m in self.characterisation_matrices]
return {
"correlations": correlations,
"node_index_dict": node_index_hashable,
"characterisation_matrices": char_matrices,
}
[docs]
@classmethod
def from_dict(class_obj, d: dict) -> "SpamCorrecter":
"""Build a `SpamCorrecter` instance from a dictionary in the format returned
by `to_dict`.
:return: Dictionary of calibration information.
:rtype: SpamCorrecter
"""
new_inst = class_obj(
[
[Node(*pair)]
for subset_tuple in d["correlations"]
for pair in subset_tuple
]
)
new_inst.node_index_dict = dict(
[
(Node(*pair[0]), (int(pair[1][0]), int(pair[1][1])))
for pair in d["node_index_dict"]
]
)
new_inst.characterisation_matrices = [
np.array(m) for m in d["characterisation_matrices"]
]
return new_inst