"""Client API for execution in Nexus."""
from typing import Union, cast
from hugr.qsystem.result import QsysResult
from pytket.backends.backendinfo import BackendInfo
from pytket.backends.backendresult import BackendResult
import qnexus.exceptions as qnx_exc
from qnexus.client import circuits as circuit_api
from qnexus.client import get_nexus_client
from qnexus.client import hugr as hugr_api
from qnexus.client import qir as qir_api
from qnexus.client.utils import accept_circuits_for_programs
from qnexus.context import get_active_project, merge_properties_from_context
from qnexus.models import BackendConfig, StoredBackendInfo, to_pytket_backend_info
from qnexus.models.annotations import Annotations, CreateAnnotations, PropertiesDict
from qnexus.models.job_status import JobStatusEnum
from qnexus.models.language import Language
from qnexus.models.references import (
CircuitRef,
DataframableList,
ExecuteJobRef,
ExecutionProgram,
ExecutionResultRef,
GpuDecoderConfigRef,
HUGRRef,
JobType,
ProjectRef,
QIRRef,
QIRResult,
ResultType,
ResultVersions,
WasmModuleRef,
)
from qnexus.models.utils import assert_never
[docs]
@accept_circuits_for_programs
@merge_properties_from_context
def start_execute_job(
programs: Union[ExecutionProgram, list[ExecutionProgram]],
n_shots: list[int] | list[None],
backend_config: BackendConfig,
name: str,
description: str = "",
properties: PropertiesDict | None = None,
project: ProjectRef | None = None,
valid_check: bool = True,
postprocess: bool = False,
noisy_simulator: bool = True,
language: Language = Language.AUTO,
seed: int | None = None,
credential_name: str | None = None,
wasm_module: WasmModuleRef | None = None,
gpu_decoder_config: GpuDecoderConfigRef | None = None,
user_group: str | None = None,
) -> ExecuteJobRef:
"""
Submit an execute job to be run in Nexus. Returns an ``ExecuteJobRef``
object which can be used to check the job's status. See ``qnexus.execute``
for a utility method that waits for the results and returns them.
"""
project = project or get_active_project(project_required=True)
project = cast(ProjectRef, project)
program_ids = (
[str(p.id) for p in programs]
if isinstance(programs, list)
else [str(programs.id)]
)
if len(n_shots) != len(program_ids):
raise ValueError("Number of programs must equal number of n_shots.")
attributes_dict = CreateAnnotations(
name=name,
description=description,
properties=properties,
).model_dump(exclude_none=True)
attributes_dict.update(
{
"job_type": "execute",
"definition": {
"job_definition_type": "execute_job_definition",
"backend_config": backend_config.model_dump(exclude_none=True),
"user_group": user_group,
"valid_check": valid_check,
"postprocess": postprocess,
"noisy_simulator": noisy_simulator,
"language": (
language.value if isinstance(language, Language) else language
),
"seed": seed,
"wasm_module_id": str(wasm_module.id) if wasm_module else None,
"gpu_decoder_config_id": (
str(gpu_decoder_config.id) if gpu_decoder_config else None
),
"credential_name": credential_name,
"items": [
{"program_id": program_id, "n_shots": n_shot}
for program_id, n_shot in zip(program_ids, n_shots)
],
},
}
)
relationships = {
"project": {"data": {"id": str(project.id), "type": "project"}},
}
req_dict = {
"data": {
"attributes": attributes_dict,
"relationships": relationships,
"type": "job",
}
}
resp = get_nexus_client().post(
"/api/jobs/v1beta3",
json=req_dict,
)
if resp.status_code != 202:
raise qnx_exc.ResourceCreateFailed(
message=resp.text, status_code=resp.status_code
)
return ExecuteJobRef(
id=resp.json()["data"]["id"],
annotations=Annotations.from_dict(resp.json()["data"]["attributes"]),
job_type=JobType.EXECUTE,
last_status=JobStatusEnum.SUBMITTED,
last_message="",
project=project,
backend_config_store=backend_config,
)
def _results(
execute_job: ExecuteJobRef,
allow_incomplete: bool = False,
) -> DataframableList[ExecutionResultRef]:
"""Get the results from an execute job."""
resp = get_nexus_client().get(f"/api/jobs/v1beta3/{execute_job.id}")
if resp.status_code != 200:
raise qnx_exc.ResourceFetchFailed(
message=resp.text, status_code=resp.status_code
)
resp_data = resp.json()["data"]
job_status = resp_data["attributes"]["status"]["status"]
if job_status != "COMPLETED" and not allow_incomplete:
raise qnx_exc.ResourceFetchFailed(message=f"Job status: {job_status}")
execute_results: DataframableList[ExecutionResultRef] = DataframableList([])
for item in resp_data["attributes"]["definition"]["items"]:
# Check if item is in a "final" state and append to results
if item["status"]["status"] in (
"CANCELLED",
"ERROR",
"DEPLETED",
"TERMINATED",
"COMPLETED",
):
result_type: ResultType
match item.get("result_type", None):
case ResultType.QSYS:
result_type = ResultType.QSYS
case ResultType.PYTKET:
result_type = ResultType.PYTKET
case None:
continue
case _:
assert_never(item["result_type"])
result_ref = ExecutionResultRef(
id=item["result_id"],
annotations=execute_job.annotations,
project=execute_job.project,
result_type=result_type,
)
execute_results.append(result_ref)
return execute_results
def _fetch_pytket_execution_result(
result_ref: ExecutionResultRef,
) -> tuple[BackendResult, BackendInfo, Union[CircuitRef, QIRRef]]:
"""Get the results for an execute job item."""
assert result_ref.result_type == ResultType.PYTKET, "Incorrect result type"
res = get_nexus_client().get(f"/api/results/v1beta3/{result_ref.id}")
if res.status_code != 200:
raise qnx_exc.ResourceFetchFailed(message=res.text, status_code=res.status_code)
res_dict = res.json()
program_data = res_dict["data"]["relationships"]["program"]["data"]
program_id = program_data["id"]
program_type = program_data["type"]
input_program: Union[CircuitRef | QIRRef]
match program_type:
case "circuit":
input_program = circuit_api._fetch_by_id(program_id, scope=None)
case "qir":
input_program = qir_api._fetch_by_id(program_id, scope=None)
case _:
raise ValueError(f"Unknown program type {type}")
results_data = res_dict["data"]["attributes"]
results_dict = {k: v for k, v in results_data.items() if v != [] and v is not None}
backend_result = BackendResult.from_dict(results_dict)
backend_info_data = next(
data for data in res_dict["included"] if data["type"] == "backend_snapshot"
)
backend_info = to_pytket_backend_info(
StoredBackendInfo(**backend_info_data["attributes"])
)
return (backend_result, backend_info, input_program)
def _fetch_qsys_execution_result(
result_ref: ExecutionResultRef,
version: ResultVersions,
) -> tuple[QsysResult | QIRResult, BackendInfo, HUGRRef | QIRRef]:
"""Get the results of a next-gen Qsys execute job."""
assert result_ref.result_type == ResultType.QSYS, "Incorrect result type"
params = {"version": version.value}
res = get_nexus_client().get(
f"/api/qsys_results/v1beta/{result_ref.id}", params=params
)
if res.status_code != 200:
raise qnx_exc.ResourceFetchFailed(message=res.text, status_code=res.status_code)
res_dict = res.json()
input_program_id = res_dict["data"]["relationships"]["program"]["data"]["id"]
input_program: HUGRRef | QIRRef
result: QsysResult | QIRResult
match res_dict["data"]["relationships"]["program"]["data"]["type"]:
case "hugr":
input_program = hugr_api._fetch_by_id(
input_program_id,
scope=None,
)
result = QsysResult(res_dict["data"]["attributes"]["results"])
case "qir":
input_program = qir_api._fetch_by_id(
input_program_id,
scope=None,
)
if version == ResultVersions.DEFAULT:
result = QIRResult(res_dict["data"]["attributes"]["results"])
else:
result = QsysResult(res_dict["data"]["attributes"]["results"])
backend_info_data = next(
data for data in res_dict["included"] if data["type"] == "backend_snapshot"
)
backend_info = to_pytket_backend_info(
StoredBackendInfo(**backend_info_data["attributes"])
)
return (
result,
backend_info,
input_program,
)