return f”{prefix}_{h}” class MockEHR: def __init__(self): self.orders_queue: checklist[SurgeryOrder] = []
self.patient_docs: dictionary[str, List[ClinicalDocument]]= {} def seed_data(self, n_orders: int = 5):random.seed(7) def make_patient(i: int) -> Affected person: pid = f”PT{i:04d}” plan =random.selection(checklist(InsurancePlan)) return Affected person(patient_id=pid, title=f”Affected person {i}”, dob=”1980-01-01″, member_id=f”M{i:08d}”, plan=plan, ) def docs_for_order(affected person: affected person, surgical procedure: SurgeryType) -> checklist[ClinicalDocument]: base = [
ClinicalDocument(
doc_id=_stable_id(“DOC”, patient.patient_id + “H&P”),
doc_type=DocType.H_AND_P,
created_at=_now_iso(),
content=”H&P: Relevant history, exam findings, and surgical indication.”,
source=”EHR”,
),
ClinicalDocument(
doc_id=_stable_id(“DOC”, patient.patient_id + “NOTE”),
doc_type=DocType.CLINICAL_NOTE,
created_at=_now_iso(),
content=”Clinical note: Symptoms, conservative management attempted, clinician assessment.”,
source=”EHR”,
),
ClinicalDocument(
doc_id=_stable_id(“DOC”, patient.patient_id + “MEDS”),
doc_type=DocType.MED_LIST,
created_at=_now_iso(),
content=”Medication list: Current meds, allergies, contraindications.”,
source=”EHR”,
),
]
Possibly = []
In case of surgical procedure [SurgeryType.KNEE_ARTHROPLASTY, SurgeryType.SPINE_FUSION, SurgeryType.BARIATRIC]:mayy.append(ClinicalDocument(doc_id=_stable_id(“DOC”,affected person.patient_id + “LABS”),doc_type=DocType.LABS,created_at=_now_iso(),content material=”Laboratory: CBC/CMP throughout the previous 30 days.”,supply=”LabSystem”, ) ) [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]:mayy.append(ClinicalDocument(doc_id=_stable_id(“DOC”,affected person.patient_id + “IMG”),doc_type=DocType.IMAGING,created_at=_now_iso(),content material=”Picture processing: MRI/X-ray report back to assist prognosis and severity.”,supply=”Radiology”, ) )closing = Base + [d for d in maybe if random.random() > 0.35]
if random.random() > 0.6: Remaining.append( ClinicalDocument( doc_id=_stable_id(“DOC”,affected person.patient_id + “PRIOR_TX”), doc_type=DocType.PRIOR_TX, created_at=_now_iso(), content material=”Earlier remedy: Tried PT, medicine, injections for greater than 6 weeks.”, supply=”EHR”, ) ) if random.random() > 0.5:closing.append( ClinicalDocument( doc_id=_stable_id(“DOC”,affected person.patient_id + “CONSENT”), doc_type=DocType.CONSENT, created_at=_now_iso(), content material=”Consent: Signed process consent and danger disclosure.”,supply=”EHR”, ) ) vary(1, n_orders + 1): Affected person = make_patient(i) Surgical procedure = random.selection(checklist(SurgeryType)) order = SurgeryOrder( order_id=_stable_id(“ORD”,affected person.patient_id +urgery.worth),affected person=affected person,urgery_type=surgical procedure,scheduled_date=(datetime.utcnow().date() + timedelta(days=random.randint(3, 21))).isoformat(), ordering_provider_npi=str(random.randint(1000000000, 1999999999)), diagnostic_codes=[“M17.11”, “M54.5”] if Surgical procedure != SurgeryType.CATARACT else [“H25.9”]created_at=_now_iso(),) self.orders_queue.append(order) self.patient_docs[patient.patient_id] = docs_for_order(affected person, surgical procedure) def poll_new_surgery_orders(self, max_n: int = 1) -> checklist[SurgeryOrder]: pull = self.orders_queue[:max_n]
self.orders_queue = self.orders_queue[max_n:]
return pull def get_patient_documents(self,patient_id: str) -> checklist[ClinicalDocument]: return checklist(self.patient_docs.get(patient_id, [])) def fetch_Additional_docs(self, patient_id: str, required: Record[DocType]) -> checklist[ClinicalDocument]: technology = []
For the required dt: generated.append( ClinicalDocument( doc_id=_stable_id(“DOC”,patient_id + dt.worth + str(time.time())), doc_type=dt, created_at=_now_iso(), content material=f”Auto-collect doc for {dt.worth}: extracted and formatted per payer coverage.”, supply=”AutoCollector”, ) ) self.patient_docs.setdefault(patient_id, []).prolong(generated) return generated class MockPayerPortal: def __init__(self): self.db: Dict[str, Dict[str, Any]]= {}random.seed(11) def required_docs_policy(self, Plan: InsurancePlan, Surgical procedure: SurgeryType) -> Record[DocType]: base = [DocType.H_AND_P, DocType.CLINICAL_NOTE, DocType.MED_LIST]
In case of surgical procedure [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]: base += [DocType.IMAGING, DocType.LABS, DocType.PRIOR_TX]
For surgical procedure == SurgeryType.BARIATRIC: base += [DocType.LABS, DocType.PRIOR_TX]
When you’ve got plans [InsurancePlan.PAYER_BETA, InsurancePlan.PAYER_GAMMA]: base += [DocType.CONSENT]
returnsorted(checklist(set(base)), key=lambda x: x.worth) def submit(self, pa: PriorAuthRequest) -> PayerResponse: payer_ref = _stable_id(“PAYREF”, pa.request_id + _now_iso()) docs_present = {d.doc_type for d in pa.docs_attached} required = self.required_docs_policy(pa.order.affected person.plan, pa.order.surgery_type) lacking = [d for d in required if d not in docs_present]
self.db[payer_ref] = { “standing”: AuthStatus.SUBMITTED, “order_id”: pa.order.order_id, “plan”: pa.order.affected person.plan, “surgical procedure”: pa.order.surgery_type, “lacking”: lacking, “polls”: 0, “submitted_at”: _now_iso(), “denial_reason”: None, } msg = “Submission acquired. Case queued for evaluate.” If lacking: msg += “Preliminary validation signifies that the doc is incomplete.” return PayerResponse(standing=AuthStatus.SUBMITTED, payer_ref=payer_ref, message=msg) def check_status(self, payer_ref: str) -> PayerResponse: payer_ref is self.db If not: return PayerResponse( standing=AuthStatus.DENIED, payer_ref=payer_ref, message=”Case not discovered (doable payer system error).”, Denial_reason=DenialReason.OTHER, confidence=0.4,) case = self.db[payer_ref]
case[“polls”] If += 1[“status”] == AuthStatus.SUBMITTED and case[“polls”] >= 1: case[“status”] = if AuthStatus.IN_REVIEW[“status”] == AuthStatus.IN_REVIEW and case[“polls”] >= 3: if[“missing”]: case[“status”] = if AuthStatus.DENIED[“denial_reason”] = DenialReason.MISSING_DOCS else: roll = random.random() if roll < 0.10: case["status"] = if AuthStatus.DENIED["denial_reason"] = DenialReason.CODING_ISSUE elif roll < 0.18: ケース["status"] = AuthStatus.DENIED の場合["denial_reason"] = DenialReason.MEDICAL_NECESSITY それ以外の場合: ケース["status"] = AuthStatus.APPROVED の場合["status"] == AuthStatus.DENIED: dr = ケース["denial_reason"] または DenialReason.OTHER が欠落している = case["missing"] if dr == DenialReason.MISSING_DOCS else []
conf = 0.9 if dr != DenialReason.OTHER else 0.55 return PayerResponse( status=AuthStatus.DENIED, payer_ref=payer_ref, message=f"Denied. Reason={dr.value}.", Denial_reason=dr, missing_docs=missing,confidence=conf, ) if case["status"] == AuthStatus.APPROVED: return PayerResponse( status=AuthStatus.APPROVED, payer_ref=payer_ref, message="承認されました。承認が発行されました。",confidence=0.95, ) return PayerResponse( status=case["status"]、payer_ref=payer_ref、message=f"Status={case['status']。価値}。投票数={ケース['polls']}.",confidence=0.9, ) def file_appeal(self, payer_ref: str, accept_text: str,attached_docs: List[ClinicalDocument]) -> PayerResponse: If payer_ref is just not in self.db: return PayerResponse( standing=AuthStatus.DENIED, payer_ref=payer_ref, message=”Dispute failed: Case not discovered.”, Denial_reason=DenialReason.OTHER,confidence=0.4, ) case = self.db[payer_ref]
docs_present = {attached_docs d d.doc_type} Still_missing = [d for d in case[“missing”] if not in docs_present][“missing”] = If Still_missing[“status”] = if AuthStatus.APPEALED[“polls”] = 0 msg = “Your enchantment has been submitted and is awaiting evaluate.” if Still_missing: msg += f” Warning: Not discovered but {‘, ‘.be a part of([d.value for d in still_missing])}.” return PayerResponse(standing=AuthStatus.APPEALED, payer_ref=payer_ref, message=msg, confidence=0.9)


