# Source: tonic-discharge-guard / utils/responseparser.py
# Last commit 83a7a3b (unverified), author Tonic: "fix typo , major, fingers crossed"
# utils/responseparser.py
import json
import lxml.etree as etree
from datetime import datetime
from typing import List, Dict, Optional, Union
import base64
import logging
# Configure the root logger when this module is imported: INFO level, with
# timestamp, logger name and level prefixed to each message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class PatientDataExtractor:
    """Extract patient fields from a FHIR Bundle (JSON) or a C-CDA document (XML).

    For JSON input every ``Patient`` resource in the Bundle is collected and one
    of them is "current" (index 0 by default, see ``set_patient_by_index``).
    For XML input the whole document is treated as a single patient.

    NOTE: medications, encounters and conditions are read from the whole
    document/Bundle; they are not filtered by the currently selected patient.
    """

    # XPath prefix shared by every C-CDA demographic lookup.
    _PATIENT_ROLE = "//hl7:recordTarget/hl7:patientRole"

    def __init__(self, patient_data: str, format_type: str = None):
        """Parse ``patient_data`` and collect the patient entries.

        Args:
            patient_data: Raw XML/JSON text (``str`` or ``bytes``), or an
                already-parsed object (``dict`` for JSON, an lxml element for
                XML), which is used as-is.
            format_type: ``"xml"`` or ``"json"`` (case-insensitive). When
                omitted the format is sniffed from ``patient_data``.

        Raises:
            ValueError: If the format is unsupported or cannot be detected, or
                if JSON input is not a FHIR Bundle with an ``entry`` list.
        """
        self.format = format_type.lower() if format_type else self._detect_format(patient_data)
        if self.format == "xml":
            if isinstance(patient_data, (str, bytes)):
                raw = patient_data.encode("utf-8") if isinstance(patient_data, str) else patient_data
                # Patient documents come from outside this process: do not
                # expand entities or fetch anything over the network (XXE).
                parser = etree.XMLParser(resolve_entities=False, no_network=True)
                self.data = etree.fromstring(raw, parser=parser)
            else:
                self.data = patient_data
            self.ns = {"hl7": "urn:hl7-org:v3"}
        elif self.format == "json":
            if isinstance(patient_data, (str, bytes)):
                self.data = json.loads(patient_data)
            else:
                self.data = patient_data
        else:
            raise ValueError("Unsupported format. Use 'xml' or 'json'")
        self.patients = self._extract_patients()
        self.current_patient_idx = 0

    def _detect_format(self, data: str) -> str:
        """Return ``"xml"`` or ``"json"`` based on the shape of ``data``.

        Text is classified by its first non-whitespace character; parsed
        objects by their type (dict/list -> json, anything exposing
        ``xpath`` -> xml).

        Raises:
            ValueError: If the format cannot be determined.
        """
        if isinstance(data, (dict, list)):
            return "json"
        if isinstance(data, bytes):
            # Only the first significant byte matters for sniffing.
            data = data.lstrip()[:1].decode("ascii", errors="replace")
        if isinstance(data, str):
            stripped = data.strip()
            if stripped.startswith("<"):
                return "xml"
            if stripped.startswith(("{", "[")):
                return "json"
        elif hasattr(data, "xpath"):
            return "xml"
        raise ValueError("Cannot determine data format")

    def _extract_patients(self) -> List:
        """Return the list of patient entries for the parsed document.

        XML yields a single-element list holding the document root; JSON
        yields every ``Patient`` resource of the Bundle.

        Raises:
            ValueError: If JSON data is not a Bundle object with ``entry``.
        """
        if self.format == "xml":
            return [self.data]
        if (
            not isinstance(self.data, dict)
            or self.data.get("resourceType") != "Bundle"
            or "entry" not in self.data
        ):
            raise ValueError("Invalid FHIR Bundle format")
        return self._resources_of_type("Patient")

    def _resources_of_type(self, resource_type: str) -> List[Dict]:
        """Return every Bundle resource whose ``resourceType`` matches.

        Entries without a ``resource`` object are skipped instead of raising.
        """
        matches = []
        for entry in self.data.get("entry") or []:
            resource = entry.get("resource") if isinstance(entry, dict) else None
            if isinstance(resource, dict) and resource.get("resourceType") == resource_type:
                matches.append(resource)
        return matches

    def _xpath_first(self, node, expression: str) -> str:
        """Evaluate ``expression`` on ``node`` and return the first hit or ``""``."""
        hits = node.xpath(expression, namespaces=self.ns)
        return hits[0] if hits else ""

    @staticmethod
    def _describe_concept(concept) -> tuple:
        """Return ``(description, code)`` for a FHIR CodeableConcept-like dict.

        The description is ``text`` when present, otherwise the first coding's
        ``display``. Missing or empty ``coding`` lists yield an empty code.
        """
        if not isinstance(concept, dict):
            return "", ""
        codings = [c for c in concept.get("coding") or [] if isinstance(c, dict)]
        first = codings[0] if codings else {}
        description = concept.get("text") or first.get("display", "")
        return description, first.get("code", "")

    def _preferred_name_part(self, key: str):
        """Return ``name[key]`` from the official name, else from any name.

        Name entries where ``key`` is missing or empty are ignored. Returns
        ``None`` when no entry carries the key.
        """
        names = [
            name
            for name in self._get_current_patient().get("name") or []
            if isinstance(name, dict) and name.get(key)
        ]
        official = [name for name in names if name.get("use") == "official"]
        candidates = official or names
        return candidates[0][key] if candidates else None

    def _first_address(self) -> Dict:
        """Return the current JSON patient's first address, or ``{}``."""
        addresses = self._get_current_patient().get("address") or []
        if addresses and isinstance(addresses[0], dict):
            return addresses[0]
        return {}

    def set_patient_by_index(self, index: int) -> bool:
        """Select the current patient; return ``False`` if ``index`` is out of range."""
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def _get_current_patient(self):
        """Return the currently selected patient entry.

        Raises:
            IndexError: If the document contained no patient entries.
        """
        return self.patients[self.current_patient_idx]

    def get_id(self) -> str:
        """Return the patient id (JSON: ``id``, else the first identifier value)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(patient, self._PATIENT_ROLE + "/hl7:id/@extension")
        patient_id = patient.get("id", "")
        if patient_id:
            return patient_id
        for identifier in patient.get("identifier") or []:
            if isinstance(identifier, dict) and identifier.get("value"):
                return identifier["value"]
        return ""

    def get_first_name(self) -> str:
        """Return the first given name, preferring the ``official`` name entry."""
        if self.format == "xml":
            return self._xpath_first(
                self._get_current_patient(),
                self._PATIENT_ROLE + "/hl7:patient/hl7:name/hl7:given/text()",
            )
        given = self._preferred_name_part("given")
        return given[0] if given else ""

    def get_last_name(self) -> str:
        """Return the family name, preferring the ``official`` name entry."""
        if self.format == "xml":
            return self._xpath_first(
                self._get_current_patient(),
                self._PATIENT_ROLE + "/hl7:patient/hl7:name/hl7:family/text()",
            )
        return self._preferred_name_part("family") or ""

    def get_dob(self) -> str:
        """Return the birth date exactly as stored in the document.

        XML returns the ``birthTime/@value`` attribute; JSON returns
        ``birthDate``. No reformatting is applied.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xpath_first(
                patient, self._PATIENT_ROLE + "/hl7:patient/hl7:birthTime/@value"
            )
        return patient.get("birthDate", "") or ""

    def get_age(self) -> str:
        """Return the age in whole years as a string, or ``""`` if unknown.

        Accepts dashed dates (``YYYY-MM-DD``, optionally followed by a time)
        and compact dates (``YYYYMMDD`` followed by anything). Partial or
        unparseable dates yield ``""``.
        """
        dob = self.get_dob()
        if not dob:
            return ""
        # A dash right after the year marks the dashed form. Checking only that
        # position keeps compact timestamps with a "-0500" zone suffix working.
        if dob[4:5] == "-":
            date_text, date_format = dob[:10], "%Y-%m-%d"
        else:
            date_text, date_format = dob[:8], "%Y%m%d"
        try:
            birth_date = datetime.strptime(date_text, date_format)
        except ValueError:
            return ""
        today = datetime.now()
        # Subtract one year when this year's birthday has not happened yet.
        birthday_pending = (today.month, today.day) < (birth_date.month, birth_date.day)
        return str(today.year - birth_date.year - birthday_pending)

    def get_gender(self) -> str:
        """Return the gender (XML: ``Male``/``Female``/``""``; JSON: capitalized value)."""
        patient = self._get_current_patient()
        if self.format == "xml":
            code = self._xpath_first(
                patient,
                self._PATIENT_ROLE + "/hl7:patient/hl7:administrativeGenderCode/@code",
            )
            return {"M": "Male", "F": "Female"}.get(code, "")
        return (patient.get("gender") or "").capitalize()

    def get_address_line(self) -> str:
        """Return the first street line of the first address, or ``""``."""
        if self.format == "xml":
            return self._xpath_first(
                self._get_current_patient(),
                self._PATIENT_ROLE + "/hl7:addr/hl7:streetAddressLine/text()",
            )
        lines = self._first_address().get("line") or []
        return lines[0] if lines else ""

    def get_city(self) -> str:
        """Return the city of the first address, or ``""``."""
        if self.format == "xml":
            return self._xpath_first(
                self._get_current_patient(), self._PATIENT_ROLE + "/hl7:addr/hl7:city/text()"
            )
        return self._first_address().get("city", "")

    def get_state(self) -> str:
        """Return the state of the first address, or ``""``."""
        if self.format == "xml":
            return self._xpath_first(
                self._get_current_patient(), self._PATIENT_ROLE + "/hl7:addr/hl7:state/text()"
            )
        return self._first_address().get("state", "")

    def get_zip_code(self) -> str:
        """Return the postal code of the first address, or ``""``."""
        if self.format == "xml":
            return self._xpath_first(
                self._get_current_patient(),
                self._PATIENT_ROLE + "/hl7:addr/hl7:postalCode/text()",
            )
        return self._first_address().get("postalCode", "")

    def get_phone(self) -> str:
        """Return a phone number, or ``""`` when none is recorded.

        XML: the first ``telecom`` value containing ``tel:``, with that prefix
        removed. JSON: the first home phone with a value, otherwise the first
        phone of any use with a value.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            values = patient.xpath(
                self._PATIENT_ROLE + "/hl7:telecom/@value", namespaces=self.ns
            )
            for value in values:
                # Skip e-mail/fax style entries that precede the phone number.
                if "tel:" in value:
                    return value.replace("tel:", "")
            return ""
        phones = [
            telecom
            for telecom in patient.get("telecom") or []
            if isinstance(telecom, dict)
            and telecom.get("system") == "phone"
            and telecom.get("value")
        ]
        home_phones = [telecom for telecom in phones if telecom.get("use") == "home"]
        preferred = home_phones or phones
        return preferred[0]["value"] if preferred else ""

    def get_medications(self) -> List[Dict[str, str]]:
        """Return medications as dicts with ``start``, ``stop``, ``description``, ``code``.

        XML reads ``substanceAdministration`` elements of the section coded
        ``10160-0``; JSON reads every ``MedicationRequest`` in the Bundle.
        """
        if self.format == "xml":
            sections = self.data.xpath(
                "//hl7:section[hl7:code/@code='10160-0']", namespaces=self.ns
            )
            if not sections:
                return []
            material_code = ".//hl7:manufacturedMaterial/hl7:code"
            return [
                {
                    "start": self._xpath_first(med, ".//hl7:effectiveTime/hl7:low/@value"),
                    "stop": self._xpath_first(med, ".//hl7:effectiveTime/hl7:high/@value"),
                    "description": self._xpath_first(med, material_code + "/@displayName"),
                    "code": self._xpath_first(med, material_code + "/@code"),
                }
                for med in sections[0].xpath(".//hl7:substanceAdministration", namespaces=self.ns)
            ]
        result = []
        for med in self._resources_of_type("MedicationRequest"):
            validity = (med.get("dispenseRequest") or {}).get("validityPeriod") or {}
            description, code = self._describe_concept(med.get("medicationCodeableConcept"))
            result.append(
                {
                    "start": med.get("authoredOn", ""),
                    "stop": validity.get("end", ""),
                    "description": description,
                    "code": code,
                }
            )
        return result

    def get_encounters(self) -> List[Dict[str, str]]:
        """Return encounters as dicts with ``start``, ``end``, ``description``, ``code``.

        XML yields at most one entry built from the first
        ``documentationOf/serviceEvent``; JSON yields one entry per
        ``Encounter`` resource, described by its first ``type``.
        """
        if self.format == "xml":
            services = self.data.xpath(
                "//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns
            )
            if not services:
                return []
            service = services[0]
            return [
                {
                    "start": self._xpath_first(service, ".//hl7:effectiveTime/hl7:low/@value"),
                    "end": self._xpath_first(service, ".//hl7:effectiveTime/hl7:high/@value"),
                    "description": "Patient Care",
                    "code": "",
                }
            ]
        result = []
        for enc in self._resources_of_type("Encounter"):
            period = enc.get("period") or {}
            types = enc.get("type") or []
            description, code = self._describe_concept(types[0] if types else None)
            result.append(
                {
                    "start": period.get("start", ""),
                    "end": period.get("end", ""),
                    "description": description,
                    "code": code,
                }
            )
        return result

    def get_conditions(self) -> List[Dict[str, str]]:
        """Return conditions as dicts with ``onset``, ``description``, ``code``.

        XML reads problem observations of the section coded ``11450-4``; JSON
        reads every ``Condition`` resource in the Bundle.
        """
        if self.format == "xml":
            sections = self.data.xpath(
                "//hl7:section[hl7:code/@code='11450-4']", namespaces=self.ns
            )
            if not sections:
                return []
            observations = sections[0].xpath(
                ".//hl7:entry/hl7:act/hl7:entryRelationship/hl7:observation",
                namespaces=self.ns,
            )
            return [
                {
                    "onset": self._xpath_first(obs, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xpath_first(obs, ".//hl7:value/@displayName"),
                    "code": self._xpath_first(obs, ".//hl7:value/@code"),
                }
                for obs in observations
            ]
        result = []
        for cond in self._resources_of_type("Condition"):
            description, code = self._describe_concept(cond.get("code"))
            result.append(
                {
                    "onset": cond.get("onsetDateTime", ""),
                    "description": description,
                    "code": code,
                }
            )
        return result

    def get_patient_dict(self) -> Dict[str, str]:
        """Return the current patient's data mapped to discharge form fields.

        Admission/discharge dates come from the last encounter in document
        order and the diagnosis from the last condition; no date sorting is
        performed. Medication descriptions are joined with ``"; "``.
        """
        data = self.get_all_patient_data()
        encounters = data["encounters"]
        conditions = data["conditions"]
        latest_encounter = encounters[-1] if encounters else {}
        latest_condition = conditions[-1] if conditions else {}
        medication_names = [m["description"] for m in data["medications"] if m["description"]]
        medications_str = "; ".join(medication_names) or "None specified"
        # NOTE: get_all_patient_data() always supplies the keys read below, so
        # the "Unknown" defaults only apply if that mapping changes; a missing
        # value currently arrives as "". "name_prefix" is never supplied and is
        # therefore always "".
        return {
            "id": data.get("id", "Unknown"),
            "first_name": data.get("first_name", ""),
            "last_name": data.get("last_name", ""),
            "name_prefix": data.get("name_prefix", ""),
            "dob": data.get("dob", "Unknown"),
            "age": data.get("age", "Unknown"),
            "sex": data.get("gender", "Unknown"),
            "address": data.get("address_line", "Unknown"),
            "city": data.get("city", "Unknown"),
            "state": data.get("state", "Unknown"),
            "zip_code": data.get("zip_code", "Unknown"),
            "phone": data.get("phone", "Unknown"),
            "admission_date": latest_encounter.get("start", ""),
            "discharge_date": latest_encounter.get("end", ""),
            "diagnosis": latest_condition.get("description", ""),
            "medications": medications_str,
            # The remaining form fields are not extracted and stay blank.
            "doctor_first_name": "",
            "doctor_last_name": "",
            "hospital_name": "",
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "middle_initial": "",
            "referral_source": "",
            "admission_method": "",
            "discharge_reason": "",
            "date_of_death": "",
            "procedures": "",
            "preparer_name": "",
            "preparer_job_title": "",
        }

    def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
        """Return every extracted field for the current patient in one dict."""
        return {
            "id": self.get_id(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "phone": self.get_phone(),
            "medications": self.get_medications(),
            "encounters": self.get_encounters(),
            "conditions": self.get_conditions(),
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return ``get_patient_dict()`` for every patient, in document order.

        The previously selected patient is restored afterwards, even if
        extraction of one patient raises.
        """
        original_idx = self.current_patient_idx
        all_patients = []
        try:
            for index, _ in enumerate(self.patients):
                self.current_patient_idx = index
                all_patients.append(self.get_patient_dict())
        finally:
            self.current_patient_idx = original_idx
        return all_patients