Source code for pytket.extensions.qiskit.tket_job

# Copyright 2020-2024 Quantinuum
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional, cast

from pytket.backends import ResultHandle, StatusEnum
from pytket.backends.backend import Backend, KwargTypes
from pytket.circuit import Bit, Qubit, UnitID
from pytket.extensions.qiskit.result_convert import (
    _get_header_info,
    backendresult_to_qiskit_resultdata,
)
from qiskit.providers import JobStatus, JobV1  # type: ignore
from qiskit.result import Result  # type: ignore

if TYPE_CHECKING:
    from pytket.extensions.qiskit.tket_backend import TketBackend


[docs] @dataclass class JobInfo: circuit_name: str qbits: list[Qubit] cbits: list[Bit] n_shots: Optional[int]
[docs] class TketJob(JobV1): """TketJob wraps a :py:class:`ResultHandle` list as a :py:class:`qiskit.providers.JobV1`"""
[docs] def __init__( self, backend: "TketBackend", handles: list[ResultHandle], jobinfos: list[JobInfo], final_maps: list[None] | list[dict[UnitID, UnitID]], ): """Initializes the asynchronous job.""" super().__init__(backend, str(handles[0])) self._handles = handles self._jobinfos = jobinfos self._result: Optional[Result] = None self._final_maps = final_maps
@property def _pytket_backend(self) -> Backend: return cast("TketBackend", self._backend)._backend
[docs] def submit(self) -> None: # Circuits have already been submitted before obtaining the job pass
[docs] def result(self, **kwargs: KwargTypes) -> Result: if self._result is not None: return self._result result_list = [] for h, jobinfo, fm in zip(self._handles, self._jobinfos, self._final_maps): tk_result = self._pytket_backend.get_result(h) creg_sizes, clbit_labels = _get_header_info(jobinfo.cbits) # type: ignore qreg_sizes, qubit_labels = _get_header_info(jobinfo.qbits) # type: ignore memory_slots = sum(size for _, size in creg_sizes) result_list.append( { "shots": jobinfo.n_shots, "success": True, "data": backendresult_to_qiskit_resultdata( tk_result, jobinfo.cbits, jobinfo.qbits, fm # type: ignore ), "header": { "name": jobinfo.circuit_name, "creg_sizes": creg_sizes, "memory_slots": memory_slots, "clbit_labels": clbit_labels, "qreg_sizes": qreg_sizes, "qubit_labels": qubit_labels, }, } ) self._pytket_backend.pop_result(h) self._result = Result.from_dict( { "results": result_list, "backend_name": self._backend.name, "backend_version": self._backend.backend_version, "job_id": self._job_id, "qobj_id": ", ".join(str(hand) for hand in self._handles), "success": True, } ) return self._result
[docs] def cancel(self) -> None: for h in self._handles: self._pytket_backend.cancel(h)
[docs] def status(self) -> Any: status_list = [self._pytket_backend.circuit_status(h) for h in self._handles] if any(s.status == StatusEnum.RUNNING for s in status_list): return JobStatus.RUNNING elif any(s.status == StatusEnum.ERROR for s in status_list): return JobStatus.ERROR elif any(s.status == StatusEnum.CANCELLED for s in status_list): return JobStatus.CANCELLED elif all(s.status == StatusEnum.COMPLETED for s in status_list): return JobStatus.DONE else: return JobStatus.INITIALIZING