# Source code for qnexus.client.jobs

"""Client API for jobs in Nexus."""

import asyncio
import json
import ssl
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Type, Union, cast, overload
from uuid import UUID

from pytket.backends.status import WAITING_STATUS, StatusEnum
from quantinuum_schemas.models.backend_config import config_name_to_class
from quantinuum_schemas.models.hypertket_config import HyperTketConfig
from websockets.client import connect
from websockets.exceptions import ConnectionClosed

import qnexus.exceptions as qnx_exc
from qnexus.client import get_nexus_client
from qnexus.client.jobs import _compile, _execute
from qnexus.client.nexus_iterator import NexusIterator
from qnexus.client.utils import accept_circuits_for_programs, handle_fetch_errors
from qnexus.config import CONFIG
from qnexus.context import (
    get_active_project,
    merge_project_from_context,
    merge_properties_from_context,
)
from qnexus.models import BackendConfig
from qnexus.models.annotations import Annotations, PropertiesDict
from qnexus.models.filters import (
    CreatorFilter,
    FuzzyNameFilter,
    JobStatusEnum,
    JobStatusFilter,
    JobTypeFilter,
    PaginationFilter,
    ProjectRefFilter,
    PropertiesFilter,
    ScopeFilter,
    ScopeFilterEnum,
    SortFilter,
    SortFilterEnum,
    TimeFilter,
)
from qnexus.models.job_status import JobStatus
from qnexus.models.language import Language
from qnexus.models.references import (
    CircuitRef,
    CompilationResultRef,
    CompileJobRef,
    DataframableList,
    ExecuteJobRef,
    ExecutionProgram,
    ExecutionResult,
    ExecutionResultRef,
    JobRef,
    JobType,
    ProjectRef,
    WasmModuleRef,
)
from qnexus.models.utils import assert_never

# Unix epoch (1970-01-01T00:00:00Z), timezone-aware in UTC.
# NOTE(review): not referenced in this chunk — presumably used elsewhere as a
# default lower bound for time filters; confirm before removing.
EPOCH_START = datetime(1970, 1, 1, tzinfo=timezone.utc)


class RemoteRetryStrategy(str, Enum):
    """Strategy to use when retrying jobs.

    Each strategy defines how the system should approach resolving
    potential conflicts with remote state.

    DEFAULT will only attempt to re-sync status and collect results from
    the third party. Duplicate results will not be saved.

    ALLOW_RESUBMIT will submit the job to the third party again if
    the system has no record of a third party handle.

    FORCE_RESUBMIT will submit the job to the third party again if
    the system has a job handle already but no result.

    FULL_RESTART will act as though the job is entirely fresh and
    re-perform every action.
    """

    DEFAULT = "DEFAULT"
    ALLOW_RESUBMIT = "ALLOW_RESUBMIT"
    FORCE_RESUBMIT = "FORCE_RESUBMIT"
    FULL_RESTART = "FULL_RESTART"
class Params(
    # Each mixin contributes a group of optional filter fields; the combined
    # model is dumped with by_alias/exclude_unset/exclude_none before being
    # sent as query parameters.
    # NOTE(review): base-class order affects MRO and pydantic field
    # resolution — do not reorder.
    CreatorFilter,
    PropertiesFilter,
    PaginationFilter,
    FuzzyNameFilter,
    JobStatusFilter,
    ProjectRefFilter,
    JobTypeFilter,
    ScopeFilter,
    SortFilter,
    TimeFilter,
):
    """Params for filtering jobs"""
@merge_project_from_context
def get_all(
    name_like: str | None = None,
    creator_email: list[str] | None = None,
    project: ProjectRef | None = None,
    properties: PropertiesDict | None = None,
    job_status: list[JobStatusEnum] | None = None,
    job_type: list[JobType] | None = None,
    created_before: datetime | None = None,
    created_after: datetime | None = datetime(day=1, month=1, year=2023),
    modified_before: datetime | None = None,
    modified_after: datetime | None = None,
    sort_filters: list[SortFilterEnum] | None = None,
    page_number: int | None = None,
    page_size: int | None = None,
    scope: ScopeFilterEnum | None = None,
) -> NexusIterator[CompileJobRef | ExecuteJobRef]:
    """Get a NexusIterator over jobs with optional filters."""
    # Fall back to the active project from context when none was supplied.
    project = project or get_active_project(project_required=False)
    project = cast(ProjectRef, project)

    status_filter = (
        JobStatusFilter.convert_status_filters(job_status) if job_status else None
    )

    filter_params = Params(
        name_like=name_like,
        creator_email=creator_email,
        project=project,
        status=status_filter,
        job_type=job_type,
        properties=properties,
        created_before=created_before,
        created_after=created_after,
        modified_before=modified_before,
        modified_after=modified_after,
        sort=SortFilter.convert_sort_filters(sort_filters),
        page_number=page_number,
        page_size=page_size,
        scope=scope,
    ).model_dump(by_alias=True, exclude_unset=True, exclude_none=True)

    return NexusIterator(
        resource_type="Job",
        nexus_url="/api/jobs/v1beta3",
        params=filter_params,
        wrapper_method=_to_jobref,
        nexus_client=get_nexus_client(),
    )
def _to_jobref(data: dict[str, Any]) -> DataframableList[CompileJobRef | ExecuteJobRef]: """Parse a json dictionary into a list of JobRefs.""" job_list: list[CompileJobRef | ExecuteJobRef] = [] for entry in data["data"]: project_id = entry["relationships"]["project"]["data"]["id"] project_details = next( proj for proj in data["included"] if proj["id"] == project_id ) project = ProjectRef( id=project_id, annotations=Annotations.from_dict(project_details["attributes"]), contents_modified=project_details["attributes"]["contents_modified"], archived=project_details["attributes"]["archived"], ) job_type: Type[CompileJobRef] | Type[ExecuteJobRef] match entry["attributes"]["job_type"]: case JobType.COMPILE: job_type = CompileJobRef case JobType.EXECUTE: job_type = ExecuteJobRef case _: assert_never(entry["attributes"]["job_type"]) job_list.append( job_type( id=entry["id"], annotations=Annotations.from_dict(entry["attributes"]), job_type=entry["attributes"]["job_type"], last_status=JobStatus.from_dict(entry["attributes"]["status"]).status, last_message=JobStatus.from_dict(entry["attributes"]["status"]).message, project=project, ) ) return DataframableList(job_list)
def get(
    id: Union[str, UUID, None] = None,
    name_like: str | None = None,
    creator_email: list[str] | None = None,
    project: ProjectRef | None = None,
    properties: PropertiesDict | None = None,
    job_status: list[JobStatusEnum] | None = None,
    job_type: list[JobType] | None = None,
    created_before: datetime | None = None,
    created_after: datetime | None = datetime(day=1, month=1, year=2023),
    modified_before: datetime | None = None,
    modified_after: datetime | None = None,
    sort_filters: list[SortFilterEnum] | None = None,
    page_number: int | None = None,
    page_size: int | None = None,
    scope: ScopeFilterEnum | None = None,
) -> JobRef:
    """
    Get a single job using filters. Throws an exception if the filters do
    not match exactly one object.
    """
    # Without an explicit id, delegate to the filtered listing and require
    # that exactly one job matches.
    if not id:
        return get_all(
            name_like=name_like,
            creator_email=creator_email,
            project=project,
            properties=properties,
            job_status=job_status,
            job_type=job_type,
            created_before=created_before,
            created_after=created_after,
            modified_before=modified_before,
            modified_after=modified_after,
            sort_filters=sort_filters,
            page_number=page_number,
            page_size=page_size,
            scope=scope,
        ).try_unique_match()

    return _fetch_by_id(job_id=id, scope=scope)
def _fetch_by_id(job_id: UUID | str, scope: ScopeFilterEnum | None) -> JobRef: """Utility method for fetching directly by a unique identifier.""" params = Params( scope=scope, ).model_dump(by_alias=True, exclude_unset=True, exclude_none=True) res = get_nexus_client().get(f"/api/jobs/v1beta3/{job_id}", params=params) handle_fetch_errors(res) job_data = res.json() project_id = job_data["data"]["relationships"]["project"]["data"]["id"] project_details = next( proj for proj in job_data["included"] if proj["id"] == project_id ) project = ProjectRef( id=project_id, annotations=Annotations.from_dict(project_details["attributes"]), contents_modified=project_details["attributes"]["contents_modified"], archived=project_details["attributes"]["archived"], ) job_type: Type[CompileJobRef] | Type[ExecuteJobRef] match job_data["data"]["attributes"]["job_type"]: case JobType.COMPILE: job_type = CompileJobRef case JobType.EXECUTE: job_type = ExecuteJobRef case _: assert_never(job_data["attributes"]["job_type"]) backend_config_dict = job_data["data"]["attributes"]["definition"]["backend_config"] backend_config_class = config_name_to_class[backend_config_dict["type"]] backend_config: BackendConfig = backend_config_class( # type: ignore **backend_config_dict ) return job_type( id=job_data["data"]["id"], annotations=Annotations.from_dict(job_data["data"]["attributes"]), job_type=job_data["data"]["attributes"]["job_type"], last_status=JobStatus.from_dict( job_data["data"]["attributes"]["status"] ).status, last_message=JobStatus.from_dict( job_data["data"]["attributes"]["status"] ).message, project=project, backend_config_store=backend_config, )
def wait_for(
    job: JobRef,
    wait_for_status: StatusEnum = StatusEnum.COMPLETED,
    timeout: float | None = 300.0,
) -> JobStatus:
    """Check job status until the job is complete (or a specified status)."""
    listener = listen_job_status(job=job, wait_for_status=wait_for_status)
    job_status = asyncio.run(asyncio.wait_for(listener, timeout=timeout))

    if job_status.status == StatusEnum.ERROR:
        raise qnx_exc.JobError(f"Job errored with detail: {job_status.error_detail}")
    return job_status
def status(job: JobRef) -> JobStatus:
    """Get the status of a job.

    Raises ResourceFetchFailed if the API does not return HTTP 200.
    """
    # FIX: added the leading slash to match every other endpoint path in
    # this module ("/api/jobs/v1beta3/...").
    resp = get_nexus_client().get(f"/api/jobs/v1beta3/{job.id}/attributes/status")
    if resp.status_code != 200:
        raise qnx_exc.ResourceFetchFailed(
            message=resp.text, status_code=resp.status_code
        )

    return JobStatus.from_dict(resp.json())
async def listen_job_status(
    job: JobRef, wait_for_status: StatusEnum = StatusEnum.COMPLETED
) -> JobStatus:
    """Check the Status of a Job via a websocket connection. Will use SSO tokens.

    Returns immediately if the job is already in a non-waiting state (or has
    already reached ``wait_for_status``); otherwise streams status updates
    over a websocket until one of those conditions is met.
    """
    job_status = status(job)
    # Fast path: nothing to wait for — no websocket needed.
    if job_status.status not in WAITING_STATUS or job_status.status == wait_for_status:
        return job_status

    # If we pass True into the websocket connection, it sets a default SSLContext.
    # See: https://websockets.readthedocs.io/en/stable/reference/client.html
    ssl_reconfigured: Union[bool, ssl.SSLContext] = True
    if not CONFIG.httpx_verify:
        # Certificate verification disabled in config: build a permissive
        # context mirroring httpx_verify=False.
        ssl_reconfigured = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ssl_reconfigured.check_hostname = False
        ssl_reconfigured.verify_mode = ssl.CERT_NONE

    extra_headers = {
        # TODO, this cookie will expire frequently
        "Cookie": f"myqos_id={get_nexus_client().auth.cookies.get('myqos_id')}"  # type: ignore
    }

    # `connect` used as an async iterator reconnects automatically each time
    # the loop body finishes with `continue` (e.g. after ConnectionClosed).
    async for websocket in connect(
        f"{CONFIG.websockets_url}/api/jobs/v1beta3/{job.id}/attributes/status/ws",
        ssl=ssl_reconfigured,
        extra_headers=extra_headers,
        # logger=logger,
    ):
        try:
            async for status_json in websocket:
                job_status = JobStatus.from_dict(json.loads(status_json))
                # Stop streaming once a terminal (or requested) status arrives.
                if (
                    job_status.status not in WAITING_STATUS
                    or job_status.status == wait_for_status
                ):
                    break
            # Inner loop ended normally — leave the reconnect loop too.
            break
        except ConnectionClosed:
            # Connection dropped mid-stream: let the outer async-for reconnect.
            continue
        finally:
            try:
                await websocket.close(code=1000, reason="Client closed connection")
            except GeneratorExit:
                # Event loop is tearing down; nothing more to clean up.
                pass
    return job_status
# Typed overloads: a CompileJobRef yields compilation results, an
# ExecuteJobRef yields execution results. Implementation follows below.
@overload
def results(
    job: CompileJobRef, allow_incomplete: bool = False
) -> DataframableList[CompilationResultRef]: ...


@overload
def results(
    job: ExecuteJobRef, allow_incomplete: bool = False
) -> DataframableList[ExecutionResultRef]: ...
def results(
    job: CompileJobRef | ExecuteJobRef,
    allow_incomplete: bool = False,
) -> DataframableList[CompilationResultRef] | DataframableList[ExecutionResultRef]:
    """Get the ResultRefs from a JobRef, if the job is complete.

    To enable fetching results from Jobs with incomplete items, set
    allow_incomplete=True.
    """
    # Dispatch on the concrete ref type, mirroring the typed overloads above.
    if isinstance(job, CompileJobRef):
        return _compile._results(job, allow_incomplete)
    if isinstance(job, ExecuteJobRef):
        return _execute._results(job, allow_incomplete)
    assert_never(job.job_type)
def retry_submission(
    job: JobRef,
    retry_status: list[StatusEnum] | None = None,
    remote_retry_strategy: RemoteRetryStrategy = RemoteRetryStrategy.DEFAULT,
    user_group: str | None = None,
) -> None:
    """Retry a job in Nexus according to status(es) or retry strategy.

    By default, jobs with the ERROR status will be retried.
    """
    payload: dict[str, str | list[str]] = {
        "remote_retry_strategy": remote_retry_strategy,
    }
    if user_group is not None:
        payload["user_group"] = user_group
    if retry_status is not None:
        # Send status names (e.g. "ERROR"), not enum members.
        payload["retry_status"] = [entry.name for entry in retry_status]

    res = get_nexus_client().post(
        f"/api/jobs/v1beta3/{job.id}/rpc/retry",
        json=payload,
    )
    if res.status_code != 202:
        res.raise_for_status()
def cancel(job: JobRef) -> None:
    """Attempt cancellation of a job in Nexus.

    If the job has been submitted to a backend, Nexus will request
    cancellation of the job.
    """
    response = get_nexus_client().post(
        f"/api/jobs/v1beta3/{job.id}/rpc/cancel",
        json={},
    )
    # 202 Accepted is the expected response; anything else is an error.
    if response.status_code != 202:
        response.raise_for_status()
def delete(job: JobRef) -> None:
    """Delete a job in Nexus."""
    response = get_nexus_client().delete(
        f"/api/jobs/v1beta3/{job.id}",
    )
    # 204 No Content is the expected response; anything else is an error.
    if response.status_code != 204:
        response.raise_for_status()
@accept_circuits_for_programs
@merge_properties_from_context
def compile(
    programs: Union[CircuitRef, list[CircuitRef]],
    backend_config: BackendConfig,
    name: str,
    description: str = "",
    project: ProjectRef | None = None,
    properties: PropertiesDict | None = None,
    optimisation_level: int = 2,
    credential_name: str | None = None,
    user_group: str | None = None,
    hypertket_config: HyperTketConfig | None = None,
    timeout: float | None = 300.0,
) -> DataframableList[CircuitRef]:
    """
    Utility method to run a compile job on a program or programs and return a
    DataframableList of the compiled programs.
    """
    # Fall back to the active project from context; a project is required.
    project = project or get_active_project(project_required=True)
    project = cast(ProjectRef, project)

    job_ref = _compile.start_compile_job(
        programs=programs,
        backend_config=backend_config,
        name=name,
        description=description,
        project=project,
        properties=properties,
        optimisation_level=optimisation_level,
        credential_name=credential_name,
        user_group=user_group,
        hypertket_config=hypertket_config,
    )
    # Block until the job reaches a terminal state (raises on error/timeout).
    wait_for(job=job_ref, timeout=timeout)

    return DataframableList(
        [result_ref.get_output() for result_ref in results(job_ref)]
    )
@accept_circuits_for_programs
@merge_properties_from_context
def execute(
    programs: Union[ExecutionProgram, list[ExecutionProgram]],
    n_shots: list[int] | list[None],
    backend_config: BackendConfig,
    name: str,
    description: str = "",
    properties: PropertiesDict | None = None,
    project: ProjectRef | None = None,
    valid_check: bool = True,
    postprocess: bool = False,
    noisy_simulator: bool = True,
    wasm_module: WasmModuleRef | None = None,
    language: Language = Language.AUTO,
    seed: int | None = None,
    credential_name: str | None = None,
    user_group: str | None = None,
    timeout: float | None = 300.0,
) -> list[ExecutionResult]:
    """
    Utility method to run an execute job and return the results. Blocks until
    the results are available. See ``qnexus.start_execute_job`` for a function
    that submits the job and returns immediately, rather than waiting for
    results.
    """
    job_ref = _execute.start_execute_job(
        programs=programs,
        n_shots=n_shots,
        backend_config=backend_config,
        name=name,
        description=description,
        properties=properties,
        project=project,
        valid_check=valid_check,
        postprocess=postprocess,
        noisy_simulator=noisy_simulator,
        wasm_module=wasm_module,
        language=language,
        seed=seed,
        credential_name=credential_name,
        user_group=user_group,
    )
    # Block until the job reaches a terminal state (raises on error/timeout).
    wait_for(job=job_ref, timeout=timeout)

    return [result_ref.download_result() for result_ref in results(job_ref)]