"""Status types for Nexus Jobs."""fromdatetimeimportdatetimefromenumimportEnumfromtypingimportAny,Dict,NamedTupleimportpandasaspdfromqnexus.models.utilsimporttruncate_to_2dp
class JobStatus(NamedTuple):
    """A job's status, paired with an optional human-readable message.

    Beyond the status and message, an instance may also carry:

    * Detailed error information.
    * Timestamps recording when each status was last entered.
    * The job's position in the queue.
    * The job's cost.
    """

    status: JobStatusEnum
    message: str = ""
    error_detail: str | None = None

    # Each ``*_time`` field is the time at which that status was last entered.
    completed_time: datetime | None = None
    queued_time: datetime | None = None
    submitted_time: datetime | None = None
    running_time: datetime | None = None
    cancelled_time: datetime | None = None
    error_time: datetime | None = None

    queue_position: int | None = None
    cost: float | None = None

    @classmethod
    def from_dict(cls, dic: Dict[str, Any]) -> "JobStatus":
        """Build a ``JobStatus`` from a JSON-serializable dictionary.

        Args:
            dic: Mapping that must contain ``"status"`` (the *name* of a
                ``JobStatusEnum`` member) and ``"message"``. It may also
                contain ``"error_detail"``, ``"queue_position"``, ``"cost"``
                and any of the ``"*_time"`` keys, whose non-``None`` values
                are parsed with ``datetime.fromisoformat``.

        Returns:
            The populated ``JobStatus``; optional keys that are absent
            default to ``None``.

        Raises:
            ValueError: If ``"status"`` or ``"message"`` is missing, or if
                the status does not equal the name of any ``JobStatusEnum``
                member. Errors raised by ``datetime.fromisoformat`` for a
                malformed timestamp are not caught here.
        """
        # Created up front so both failure paths raise the same error object.
        bad_format = ValueError(f"Dictionary invalid format for JobStatus: {dic}")
        if not ("message" in dic and "status" in dic):
            raise bad_format

        # The serialized status is compared against enum member names.
        name_matches = (
            member for member in JobStatusEnum if dic["status"] == member.name
        )
        try:
            status = next(name_matches)
        except StopIteration as err:
            raise bad_format from err

        error_detail = dic.get("error_detail")

        def _parse_time(key: str) -> datetime | None:
            """Return the ISO-format timestamp under ``key``, or ``None``."""
            raw = dic.get(key)
            if raw is None:
                return None
            return datetime.fromisoformat(raw)

        # Parsed in field-declaration order.
        timestamps = {
            field: _parse_time(field)
            for field in (
                "completed_time",
                "queued_time",
                "submitted_time",
                "running_time",
                "cancelled_time",
                "error_time",
            )
        }

        queue_position = dic.get("queue_position")
        # NOTE(review): truncate_to_2dp is a project helper not visible here;
        # presumably it truncates to two decimal places and accepts None.
        cost: float | None = truncate_to_2dp(dic.get("cost"))

        return cls(
            status=status,
            message=dic["message"],
            error_detail=error_detail,
            queue_position=queue_position,
            cost=cost,
            **timestamps,
        )

    def df(self) -> pd.DataFrame:
        """Return this status as a pandas DataFrame with one column per field."""
        # from_dict(orient="index") puts the fields on the index; transposing
        # turns them into columns.
        by_field = pd.DataFrame.from_dict(self._asdict(), orient="index")
        return by_field.transpose()


# Job statuses grouped as "waiting": queued, submitted, or running.
WAITING_STATUS = {
    JobStatusEnum.QUEUED,
    JobStatusEnum.SUBMITTED,
    JobStatusEnum.RUNNING,
}