"""Client API for quotas in Nexus."""fromtypingimportLiteralfromqnexus.clientimportget_nexus_clientfromqnexus.exceptionsimportResourceFetchFailedfromqnexus.modelsimportQuotafromqnexus.models.referencesimportDataframableListQuotaName=Literal["compilation","simulation","jupyterhub","database_usage"]_quota_map={"compilation":"total_time_taken","simulation":"total_time_taken","jupyterhub":"total_time_taken","database_usage":"megabytes_used",}NO_QUOTA_SET="No quota set for user"
def get_all() -> DataframableList[Quota]:
    """Fetch every recognised quota for the user, together with its usage.

    Records whose name has no entry in ``_quota_map`` (or that lack the
    ``quota``/``name`` keys altogether) are left out of the result.

    Returns:
        One ``Quota`` per recognised record. ``usage`` falls back to 0 when
        the record reports nothing under the mapped key, and ``quota`` is
        ``NO_QUOTA_SET`` when the configured value is missing or falsy.

    Raises:
        ResourceFetchFailed: If the response status code is not 200.
    """
    response = get_nexus_client().get(
        "/api/quotas/v1beta",
        params={"entity_type": "user", "include_usage": True},
    )
    if response.status_code != 200:
        raise ResourceFetchFailed(
            message=response.text, status_code=response.status_code
        )

    quotas: DataframableList[Quota] = DataframableList([])
    for entry in response.json():
        try:
            usage_key = _quota_map[entry["quota"]["name"]]
        except KeyError:
            # Quota names without a mapping are not reported.
            continue

        record = entry["quota"]
        details = record["details"]
        limit = details.get(usage_key)
        quotas.append(
            Quota(
                name=record["name"],
                description=details["description"],
                usage=record["usage"].get(usage_key, 0),
                # Any falsy limit (absent, None, 0) is shown as "not set".
                quota=limit or NO_QUOTA_SET,
            )
        )
    return quotas
def get(name: QuotaName) -> Quota:
    """Get specific quota details by name.

    Args:
        name: Name of the quota to look up.

    Returns:
        The ``Quota`` built from the first record in the response. ``usage``
        falls back to 0 when the record reports nothing under the mapped key,
        and ``quota`` is ``NO_QUOTA_SET`` when the configured value is missing
        or falsy.

    Raises:
        ResourceFetchFailed: If the response status code is not 200, or if
            the response contains no quota record at all.
        KeyError: If the returned record's name has no entry in
            ``_quota_map`` or the record lacks an expected key.
    """
    res = get_nexus_client().get(
        "/api/quotas/v1beta",
        params={"entity_type": "user", "name": name, "include_usage": True},
    )
    if res.status_code != 200:
        raise ResourceFetchFailed(message=res.text, status_code=res.status_code)

    records = res.json()
    if not records:
        # An empty result would otherwise surface as a bare IndexError that
        # says nothing about which quota was requested.
        raise ResourceFetchFailed(
            message=f"No quota named '{name}' was returned for the user",
            status_code=res.status_code,
        )

    quota = records[0]["quota"]
    quota_key = _quota_map[quota["name"]]
    quota_value = quota["details"].get(quota_key)
    return Quota(
        name=quota["name"],
        description=quota["details"]["description"],
        # Default to 0 (not None) so a single lookup reports missing usage
        # the same way get_all() does.
        usage=quota["usage"].get(quota_key, 0),
        # Any falsy limit (absent, None, 0) is shown as "not set".
        quota=quota_value if quota_value else NO_QUOTA_SET,
    )
def check_quota(name: QuotaName) -> bool:
    """Check whether the current user has available quota.

    Args:
        name: Name of the quota to check.

    Returns:
        ``True`` when the quota guard endpoint answers with status 200,
        ``False`` for any other status code.
    """
    response = get_nexus_client().get(
        "/api/quotas/v1beta/guard",
        params={"name": name},
    )
    return response.status_code == 200