# utils/responseparser.py
import json
from datetime import datetime
from typing import List, Dict, Optional, Union
import base64
import logging

try:
    import lxml.etree as etree
except ImportError:  # lxml is only required for the XML (C-CDA) code path
    etree = None

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class PatientDataExtractor:
    """Extract demographic and clinical fields from patient documents.

    Two input formats are supported:

    * ``"json"`` - a FHIR ``Bundle`` whose entries contain ``Patient`` (and
      optionally ``MedicationRequest`` / ``Encounter`` / ``Condition``)
      resources.
    * ``"xml"`` - a C-CDA document in the ``urn:hl7-org:v3`` namespace. The
      whole document is treated as a single patient.

    Every ``get_*`` accessor reads from the currently selected patient (see
    ``set_patient_by_index``) and returns ``""`` / ``[]`` when the requested
    information is absent.
    """

    def __init__(self, patient_data: Union[str, bytes, dict], format_type: Optional[str] = None):
        """Parse ``patient_data`` and select the first patient.

        Args:
            patient_data: Raw document as ``str`` or ``bytes``, or an
                already-parsed object (a ``dict`` for JSON, an lxml element
                for XML) which is used as-is.
            format_type: ``"xml"`` or ``"json"`` (case-insensitive). When
                omitted, the format is sniffed from the data.

        Raises:
            ValueError: If the format is unsupported or cannot be detected,
                or the JSON is not a FHIR Bundle with an ``entry`` list.
            ImportError: If XML text must be parsed but lxml is unavailable.
        """
        self.format = format_type.lower() if format_type else self._detect_format(patient_data)
        if self.format == "xml":
            self.data = self._parse_xml(patient_data)
            self.ns = {'hl7': 'urn:hl7-org:v3'}
        elif self.format == "json":
            self.data = self._parse_json(patient_data)
        else:
            raise ValueError("Unsupported format. Use 'xml' or 'json'")
        self.patients = self._extract_patients()
        self.current_patient_idx = 0

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _clean_text(text: str) -> str:
        """Strip surrounding whitespace and a leading byte-order mark."""
        return text.strip().lstrip("\ufeff").lstrip()

    @classmethod
    def _parse_xml(cls, patient_data):
        """Return an lxml element for ``patient_data``.

        Non-text input is assumed to be an already-parsed element and is
        returned unchanged.
        """
        if not isinstance(patient_data, (str, bytes)):
            return patient_data
        if etree is None:
            raise ImportError("lxml is required to parse XML (C-CDA) patient data")
        if isinstance(patient_data, str):
            # lxml rejects str input that carries an encoding declaration,
            # so text is always handed over as bytes.
            raw = cls._clean_text(patient_data).encode('utf-8')
        else:
            raw = patient_data
        # Clinical documents come from outside the application: do not expand
        # entities or touch the network while parsing them.
        parser = etree.XMLParser(resolve_entities=False, no_network=True)
        return etree.fromstring(raw, parser=parser)

    @classmethod
    def _parse_json(cls, patient_data):
        """Return the decoded JSON object; non-text input is returned as-is."""
        if isinstance(patient_data, bytes):
            patient_data = patient_data.decode("utf-8-sig")
        if isinstance(patient_data, str):
            return json.loads(cls._clean_text(patient_data))
        return patient_data

    def _detect_format(self, data) -> str:
        """Detect the format of the input data.

        Text starting with ``<`` is XML; text starting with ``{`` or ``[`` is
        JSON. A ``dict`` is JSON and an object exposing ``xpath`` is treated
        as a parsed XML element.

        Raises:
            ValueError: If none of the above applies.
        """
        if isinstance(data, bytes):
            data = data.decode("utf-8-sig", errors="replace")
        if isinstance(data, str):
            text = self._clean_text(data)
            if text.startswith('<'):
                return 'xml'
            if text.startswith(('{', '[')):
                return 'json'
        elif isinstance(data, dict):
            return 'json'
        elif hasattr(data, "xpath"):
            return 'xml'
        raise ValueError("Cannot determine data format")

    def _extract_patients(self) -> List:
        """Extract all patient entries based on format.

        Returns:
            ``[document_root]`` for XML, or the list of ``Patient`` resources
            found in the bundle for JSON (possibly empty).

        Raises:
            ValueError: If the JSON is not a Bundle object with ``entry``.
        """
        if self.format == "xml":
            return [self.data]
        elif self.format == "json":
            bundle = self.data
            if (
                not isinstance(bundle, dict)
                or bundle.get("resourceType") != "Bundle"
                or "entry" not in bundle
            ):
                raise ValueError("Invalid FHIR Bundle format")
            patients = []
            for entry in bundle.get("entry") or []:
                # Bundle entries are not guaranteed to carry a resource.
                resource = entry.get("resource") if isinstance(entry, dict) else None
                if isinstance(resource, dict) and resource.get("resourceType") == "Patient":
                    patients.append(resource)
            return patients

    # ------------------------------------------------------------------
    # Patient selection
    # ------------------------------------------------------------------
    def set_patient_by_index(self, index: int) -> bool:
        """Select the patient at ``index``.

        Returns:
            ``True`` if the index was valid and is now selected, otherwise
            ``False`` (the selection is left unchanged).
        """
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def _get_current_patient(self):
        """Return the currently selected patient resource.

        Raises:
            IndexError: If the input contained no patients.
        """
        try:
            return self.patients[self.current_patient_idx]
        except IndexError:
            raise IndexError("No patient available: the input contains no Patient entries") from None

    # ------------------------------------------------------------------
    # Small shared helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _first(items, default=""):
        """Return ``items[0]``, or ``default`` when ``items`` is empty/None."""
        return items[0] if items else default

    def _xpath_first(self, node, path: str) -> str:
        """Return the first result of ``path`` evaluated on ``node``, or ""."""
        return self._first(node.xpath(path, namespaces=self.ns))

    def _json_name_part(self, patient: dict, key: str) -> str:
        """Return ``given[0]`` or ``family`` from the best matching name.

        The ``official`` name is preferred; if none carries ``key`` the first
        name entry that does is used instead.
        """
        usable = [n for n in patient.get("name") or [] if isinstance(n, dict) and n.get(key)]
        official = [n for n in usable if n.get("use") == "official"]
        chosen = self._first(official or usable, None)
        if chosen is None:
            return ""
        value = chosen[key]
        # "given" is a list of names, "family" is a plain string.
        return value[0] if isinstance(value, list) else value

    def _json_first_address(self, patient: dict) -> dict:
        """Return the patient's first address object, or ``{}``."""
        first = self._first(patient.get("address") or [], {})
        return first if isinstance(first, dict) else {}

    def _current_patient_refs(self) -> set:
        """Return the reference strings that may point at the current patient."""
        patient = self._get_current_patient()
        refs = set()
        patient_id = patient.get("id")
        if patient_id:
            refs.update({patient_id, f"Patient/{patient_id}", f"urn:uuid:{patient_id}"})
        for entry in self.data.get("entry") or []:
            if isinstance(entry, dict) and entry.get("resource") is patient and entry.get("fullUrl"):
                refs.add(entry["fullUrl"])
        return refs

    @staticmethod
    def _references_patient(resource: dict, refs: set) -> bool:
        """Tell whether ``resource`` is attributed to one of ``refs``.

        Resources that cannot be attributed (no subject/patient reference, or
        no known identity for the patient) are kept rather than dropped.
        """
        link = resource.get("subject") or resource.get("patient") or {}
        ref = link.get("reference", "") if isinstance(link, dict) else ""
        if not ref or not refs:
            return True
        # The last path segment also covers absolute ".../Patient/<id>" URLs.
        return ref in refs or ref.rsplit("/", 1)[-1] in refs

    def _iter_resources(self, resource_type: str):
        """Yield bundle resources of ``resource_type`` for the current patient.

        In a single-patient bundle every matching resource is yielded. With
        several patients, resources are filtered by their subject reference
        so each patient only receives their own data.
        """
        refs = self._current_patient_refs() if len(self.patients) > 1 else None
        for entry in self.data.get("entry") or []:
            resource = entry.get("resource") if isinstance(entry, dict) else None
            if not isinstance(resource, dict) or resource.get("resourceType") != resource_type:
                continue
            if refs is not None and not self._references_patient(resource, refs):
                continue
            yield resource

    def _codeable_concept(self, concept) -> Dict[str, str]:
        """Return ``{"description", "code"}`` from a FHIR CodeableConcept.

        The description is ``text`` when present, otherwise the first
        coding's ``display``; the code comes from the first coding.
        """
        concept = concept if isinstance(concept, dict) else {}
        coding = self._first(concept.get("coding"), {})
        return {
            "description": concept.get("text") or coding.get("display", ""),
            "code": coding.get("code", ""),
        }

    # ------------------------------------------------------------------
    # Demographics
    # ------------------------------------------------------------------
    def get_id(self) -> str:
        """Return the patient id (JSON: ``id``, else first identifier value)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, "//hl7:recordTarget/hl7:patientRole/hl7:id/@extension")
        elif self.format == "json":
            patient_id = patient.get("id", "")
            if patient_id:
                return patient_id
            for identifier in patient.get("identifier") or []:
                if identifier.get("value"):
                    return identifier["value"]
            return ""

    def get_first_name(self) -> str:
        """Return the patient's first given name, or ""."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:given/text()"
            )
        elif self.format == "json":
            return self._json_name_part(patient, "given")

    def get_last_name(self) -> str:
        """Return the patient's family name, or ""."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:name/hl7:family/text()"
            )
        elif self.format == "json":
            return self._json_name_part(patient, "family")

    def get_dob(self) -> str:
        """Return the raw date of birth (``YYYYMMDD...`` for XML, ISO for JSON)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:birthTime/@value"
            )
        elif self.format == "json":
            return patient.get("birthDate") or ""

    @staticmethod
    def _parse_birth_date(dob: str) -> Optional[datetime]:
        """Parse ``YYYY-MM-DD...`` or ``YYYYMMDD...``; return ``None`` otherwise."""
        dob = dob.strip()
        if "-" in dob[:8]:
            # ISO date; anything after the day (a time part) is ignored.
            text, fmt = dob[:10], "%Y-%m-%d"
        elif len(dob) >= 8:
            # Compact HL7 timestamp; only the date digits are relevant.
            text, fmt = dob[:8], "%Y%m%d"
        else:
            return None
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            return None

    def get_age(self) -> str:
        """Return the age in whole years as a string, or "" if unknown.

        Partial or unparsable dates of birth yield "".
        """
        dob = self.get_dob()
        if not dob:
            return ""
        birth_date = self._parse_birth_date(dob)
        if birth_date is None:
            return ""
        today = datetime.now()
        # Subtract one when this year's birthday has not happened yet.
        had_birthday = (today.month, today.day) >= (birth_date.month, birth_date.day)
        return str(today.year - birth_date.year - (0 if had_birthday else 1))

    def get_gender(self) -> str:
        """Return the gender (XML: "Male"/"Female"/""; JSON: capitalized code)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            code = self._xpath_first(
                patient,
                "//hl7:recordTarget/hl7:patientRole/hl7:patient/hl7:administrativeGenderCode/@code",
            )
            return {"M": "Male", "F": "Female"}.get(code, "")
        elif self.format == "json":
            return (patient.get("gender") or "").capitalize()

    def get_address_line(self) -> str:
        """Return the first street line of the first address, or ""."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:streetAddressLine/text()"
            )
        elif self.format == "json":
            return self._first(self._json_first_address(patient).get("line"))

    def get_city(self) -> str:
        """Return the city of the first address, or ""."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:city/text()")
        elif self.format == "json":
            return self._json_first_address(patient).get("city", "")

    def get_state(self) -> str:
        """Return the state of the first address, or ""."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:state/text()")
        elif self.format == "json":
            return self._json_first_address(patient).get("state", "")

    def get_zip_code(self) -> str:
        """Return the postal code of the first address, or ""."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, "//hl7:recordTarget/hl7:patientRole/hl7:addr/hl7:postalCode/text()"
            )
        elif self.format == "json":
            return self._json_first_address(patient).get("postalCode", "")

    def get_phone(self) -> str:
        """Return a phone number, or "".

        XML: the first ``telecom`` value containing ``tel:`` (prefix removed).
        JSON: the first ``home`` phone, falling back to the first phone of
        any use when no home phone is listed.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            values = patient.xpath(
                "//hl7:recordTarget/hl7:patientRole/hl7:telecom/@value", namespaces=self.ns
            )
            for value in values:
                if "tel:" in value:
                    return value.replace("tel:", "")
            return ""
        elif self.format == "json":
            phones = [
                t for t in patient.get("telecom") or []
                if isinstance(t, dict) and t.get("system") == "phone"
            ]
            home = [t for t in phones if t.get("use") == "home"]
            return self._first(home or phones, {}).get("value", "")

    # ------------------------------------------------------------------
    # Clinical lists
    # ------------------------------------------------------------------
    def get_medications(self) -> List[Dict[str, str]]:
        """Return medications as ``{"start", "stop", "description", "code"}``.

        XML: ``substanceAdministration`` elements of section ``10160-0``.
        JSON: ``MedicationRequest`` resources of the current patient.
        """
        if self.format == "xml":
            section = self.data.xpath("//hl7:section[hl7:code/@code='10160-0']", namespaces=self.ns)
            if not section:
                return []
            meds = section[0].xpath(".//hl7:substanceAdministration", namespaces=self.ns)
            return [
                {
                    "start": self._xpath_first(med, ".//hl7:effectiveTime/hl7:low/@value"),
                    "stop": self._xpath_first(med, ".//hl7:effectiveTime/hl7:high/@value"),
                    "description": self._xpath_first(med, ".//hl7:manufacturedMaterial/hl7:code/@displayName"),
                    "code": self._xpath_first(med, ".//hl7:manufacturedMaterial/hl7:code/@code"),
                }
                for med in meds
            ]
        elif self.format == "json":
            result = []
            for med in self._iter_resources("MedicationRequest"):
                concept = self._codeable_concept(med.get("medicationCodeableConcept"))
                validity = (med.get("dispenseRequest") or {}).get("validityPeriod") or {}
                result.append({
                    "start": med.get("authoredOn", ""),
                    "stop": validity.get("end", ""),
                    "description": concept["description"],
                    "code": concept["code"],
                })
            return result

    def get_encounters(self) -> List[Dict[str, str]]:
        """Return encounters as ``{"start", "end", "description", "code"}``.

        XML: a single "Patient Care" entry built from the first
        ``documentationOf/serviceEvent``. JSON: ``Encounter`` resources of
        the current patient, described by their first ``type``.
        """
        if self.format == "xml":
            service = self.data.xpath("//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns)
            if not service:
                return []
            return [{
                "start": self._xpath_first(service[0], ".//hl7:effectiveTime/hl7:low/@value"),
                "end": self._xpath_first(service[0], ".//hl7:effectiveTime/hl7:high/@value"),
                "description": "Patient Care",
                "code": "",
            }]
        elif self.format == "json":
            result = []
            for enc in self._iter_resources("Encounter"):
                period = enc.get("period") or {}
                concept = self._codeable_concept(self._first(enc.get("type"), {}))
                result.append({
                    "start": period.get("start", ""),
                    "end": period.get("end", ""),
                    "description": concept["description"],
                    "code": concept["code"],
                })
            return result

    def get_conditions(self) -> List[Dict[str, str]]:
        """Return conditions as ``{"onset", "description", "code"}``.

        XML: observations under section ``11450-4``. JSON: ``Condition``
        resources of the current patient.
        """
        if self.format == "xml":
            section = self.data.xpath("//hl7:section[hl7:code/@code='11450-4']", namespaces=self.ns)
            if not section:
                return []
            observations = section[0].xpath(
                ".//hl7:entry/hl7:act/hl7:entryRelationship/hl7:observation", namespaces=self.ns
            )
            return [
                {
                    "onset": self._xpath_first(obs, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xpath_first(obs, ".//hl7:value/@displayName"),
                    "code": self._xpath_first(obs, ".//hl7:value/@code"),
                }
                for obs in observations
            ]
        elif self.format == "json":
            result = []
            for cond in self._iter_resources("Condition"):
                concept = self._codeable_concept(cond.get("code"))
                result.append({
                    "onset": cond.get("onsetDateTime", ""),
                    "description": concept["description"],
                    "code": concept["code"],
                })
            return result

    # ------------------------------------------------------------------
    # Aggregates
    # ------------------------------------------------------------------
    def get_patient_dict(self) -> Dict[str, str]:
        """Return a dictionary of patient data mapped to discharge form fields.

        Admission/discharge dates come from the last listed encounter and the
        diagnosis from the last listed condition. Fields with no data source
        in this class are returned as empty strings.
        """
        data = self.get_all_patient_data()

        # The last entry is used as "latest"; this relies on document order.
        latest_encounter = data["encounters"][-1] if data["encounters"] else {}
        admission_date = latest_encounter.get("start", "")
        discharge_date = latest_encounter.get("end", "")

        latest_condition = data["conditions"][-1] if data["conditions"] else {}
        diagnosis = latest_condition.get("description", "")

        medications_str = "; ".join(
            m["description"] for m in data["medications"] if m["description"]
        ) or "None specified"

        # NOTE(review): get_all_patient_data() always supplies these keys, so
        # the "Unknown" defaults only apply to keys it does not produce; an
        # empty value stays "". Confirm whether "Unknown" was intended for
        # empty values before changing this.
        return {
            "id": data.get("id", "Unknown"),
            "first_name": data.get("first_name", ""),
            "last_name": data.get("last_name", ""),
            "name_prefix": data.get("name_prefix", ""),
            "dob": data.get("dob", "Unknown"),
            "age": data.get("age", "Unknown"),
            "sex": data.get("gender", "Unknown"),
            "address": data.get("address_line", "Unknown"),
            "city": data.get("city", "Unknown"),
            "state": data.get("state", "Unknown"),
            "zip_code": data.get("zip_code", "Unknown"),
            "phone": data.get("phone", "Unknown"),
            "admission_date": admission_date,
            "discharge_date": discharge_date,
            "diagnosis": diagnosis,
            "medications": medications_str,
            "doctor_first_name": "",  # Could be extracted from Practitioner resource if linked
            "doctor_last_name": "",
            "hospital_name": "",  # Could be extracted from Organization resource if linked
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "middle_initial": "",
            "referral_source": "",
            "admission_method": "",
            "discharge_reason": "",
            "date_of_death": "",
            "procedures": "",
            "preparer_name": "",
            "preparer_job_title": ""
        }

    def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
        """Extract all available data for the current patient."""
        return {
            "id": self.get_id(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "phone": self.get_phone(),
            "medications": self.get_medications(),
            "encounters": self.get_encounters(),
            "conditions": self.get_conditions(),
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return a list of dictionaries for all patients.

        The previously selected patient is restored afterwards, even if
        extraction fails part-way through.
        """
        original_idx = self.current_patient_idx
        all_patients = []
        try:
            for idx, _ in enumerate(self.patients):
                self.current_patient_idx = idx
                all_patients.append(self.get_patient_dict())
        finally:
            self.current_patient_idx = original_idx
        return all_patients