|
|
|
import json |
|
import lxml.etree as etree |
|
from datetime import datetime |
|
from typing import List, Dict, Optional, Union |
|
import base64 |
|
import logging |
|
|
|
# Configure the root logger at import time: INFO level, with timestamp,
# logger name and severity prepended to every message.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.INFO)

# Logger named after this module, for use by the code below.
logger = logging.getLogger(__name__)
|
|
|
class PatientDataExtractor:
    """Extract patient fields from a FHIR Bundle (JSON) or a C-CDA document (XML).

    The extractor parses the input once, collects the patient records it
    contains and exposes one ``get_*`` accessor per field. For JSON input a
    Bundle may hold several ``Patient`` resources; use
    :meth:`set_patient_by_index` to choose which one the accessors read.
    For XML input the whole document is treated as a single patient.

    NOTE(review): for JSON input, :meth:`get_medications`,
    :meth:`get_encounters` and :meth:`get_conditions` return every matching
    resource in the Bundle; they are not filtered by the selected patient.
    """

    # Common XPath prefix of the patient demographics in a C-CDA document.
    _PATIENT_ROLE = "//hl7:recordTarget/hl7:patientRole"

    def __init__(self, patient_data: Union[str, Dict], format_type: Optional[str] = None):
        """Parse ``patient_data`` and select the first patient.

        Args:
            patient_data: Raw XML/JSON text, or already-parsed data (a dict
                for JSON, an lxml element for XML) which is used as is.
            format_type: ``"xml"`` or ``"json"`` (case-insensitive). When
                omitted the format is detected from ``patient_data``.

        Raises:
            ValueError: If the format is unsupported or cannot be detected,
                or if JSON input is not a FHIR Bundle with an ``entry`` key.
        """
        self.format = format_type.lower() if format_type else self._detect_format(patient_data)
        if self.format == "xml":
            if isinstance(patient_data, str):
                self.data = etree.fromstring(patient_data.encode('utf-8'))
            else:
                self.data = patient_data
            self.ns = {'hl7': 'urn:hl7-org:v3'}
        elif self.format == "json":
            self.data = json.loads(patient_data) if isinstance(patient_data, str) else patient_data
        else:
            raise ValueError("Unsupported format. Use 'xml' or 'json'")

        self.patients = self._extract_patients()
        self.current_patient_idx = 0

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _detect_format(self, data) -> str:
        """Return ``"xml"`` or ``"json"`` for ``data``.

        Text is classified by its first non-blank character. Already-parsed
        input is classified by type: a dict is JSON, an object exposing an
        ``xpath`` method is treated as a parsed XML element.

        Raises:
            ValueError: If the format cannot be determined.
        """
        if isinstance(data, str):
            stripped = data.strip()
            if stripped.startswith('<'):
                return 'xml'
            if stripped.startswith(('{', '[')):
                return 'json'
        elif isinstance(data, dict):
            return 'json'
        elif hasattr(data, "xpath"):
            return 'xml'
        raise ValueError("Cannot determine data format")

    def _resources_of_type(self, resource_type: str) -> List[Dict]:
        """Return the Bundle resources whose ``resourceType`` matches.

        Entries without a ``resource`` object are skipped instead of
        raising ``KeyError``.
        """
        matches = []
        for entry in self.data.get("entry") or []:
            resource = entry.get("resource") if isinstance(entry, dict) else None
            if isinstance(resource, dict) and resource.get("resourceType") == resource_type:
                matches.append(resource)
        return matches

    def _xml_first(self, node, expression: str) -> str:
        """Return the first result of an XPath query on ``node``, or ``""``."""
        found = node.xpath(expression, namespaces=self.ns)
        return found[0] if found else ""

    @staticmethod
    def _first_address(patient: Dict) -> Dict:
        """Return the first ``address`` entry of a JSON patient, or ``{}``."""
        addresses = patient.get("address") or []
        return addresses[0] if addresses else {}

    @staticmethod
    def _first_coding_code(concept: Dict) -> str:
        """Return the ``code`` of the first ``coding`` entry, or ``""``."""
        codings = concept.get("coding") or []
        return codings[0].get("code", "") if codings else ""

    def _extract_patients(self) -> List:
        """Return the list of patient records found in the parsed data.

        Raises:
            ValueError: If JSON data is not a Bundle containing ``entry``.
        """
        if self.format == "xml":
            # One document is handled as exactly one patient.
            return [self.data]
        if self.format == "json":
            if (
                not isinstance(self.data, dict)
                or self.data.get("resourceType") != "Bundle"
                or "entry" not in self.data
            ):
                raise ValueError("Invalid FHIR Bundle format")
            return self._resources_of_type("Patient")
        return []

    def set_patient_by_index(self, index: int) -> bool:
        """Select the patient at ``index``; return False if out of range."""
        if 0 <= index < len(self.patients):
            self.current_patient_idx = index
            return True
        return False

    def _get_current_patient(self):
        """Return the currently selected patient record.

        Raises:
            IndexError: If the input contained no patient at all.
        """
        return self.patients[self.current_patient_idx]

    # ------------------------------------------------------------------
    # Demographics
    # ------------------------------------------------------------------

    def get_id(self) -> str:
        """Return the patient id, or ``""`` when none is available.

        JSON falls back to the first non-empty ``identifier`` value when the
        resource has no ``id``.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(patient, self._PATIENT_ROLE + "/hl7:id/@extension")
        if self.format == "json":
            patient_id = patient.get("id", "")
            if patient_id:
                return patient_id
            for identifier in patient.get("identifier", []):
                if identifier.get("value"):
                    return identifier["value"]
        return ""

    def get_first_name(self) -> str:
        """Return the first given name (JSON: of an ``official`` name) or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(
                patient, self._PATIENT_ROLE + "/hl7:patient/hl7:name/hl7:given/text()"
            )
        if self.format == "json":
            for name in patient.get("name", []):
                # A truthiness check also skips an empty ``given`` list,
                # which would otherwise raise IndexError.
                if name.get("use") == "official" and name.get("given"):
                    return name["given"][0]
        return ""

    def get_last_name(self) -> str:
        """Return the family name (JSON: of an ``official`` name) or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(
                patient, self._PATIENT_ROLE + "/hl7:patient/hl7:name/hl7:family/text()"
            )
        if self.format == "json":
            for name in patient.get("name", []):
                if name.get("use") == "official" and "family" in name:
                    return name["family"]
        return ""

    def get_dob(self) -> str:
        """Return the date of birth exactly as stored in the source, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(
                patient, self._PATIENT_ROLE + "/hl7:patient/hl7:birthTime/@value"
            )
        if self.format == "json":
            return patient.get("birthDate", "")
        return ""

    def get_age(self) -> str:
        """Return the age in whole years as a string, or ``""`` if unknown.

        Accepts both the dashed ``YYYY-MM-DD...`` form and the compact
        ``YYYYMMDD...`` form; anything after the date part is ignored.
        """
        dob = self.get_dob()
        if not dob:
            return ""
        try:
            # A dash right after the year marks the dashed form. Checking
            # that position (rather than "-" anywhere) keeps compact
            # timestamps with a negative UTC offset on the compact path.
            if dob[4:5] == "-":
                birth_date = datetime.strptime(dob[:10], "%Y-%m-%d")
            else:
                birth_date = datetime.strptime(dob[:8], "%Y%m%d")
        except ValueError:
            return ""
        today = datetime.now()
        # Subtract one year if this year's birthday has not happened yet.
        had_no_birthday_yet = (today.month, today.day) < (birth_date.month, birth_date.day)
        return str(today.year - birth_date.year - had_no_birthday_yet)

    def get_gender(self) -> str:
        """Return the gender label, or ``""`` when missing/unrecognised.

        XML maps the codes ``M``/``F`` to ``Male``/``Female``; JSON returns
        the capitalised ``gender`` value.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            code = self._xml_first(
                patient, self._PATIENT_ROLE + "/hl7:patient/hl7:administrativeGenderCode/@code"
            )
            return {"M": "Male", "F": "Female"}.get(code, "")
        if self.format == "json":
            return (patient.get("gender") or "").capitalize()
        return ""

    def get_address_line(self) -> str:
        """Return the first street line of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(
                patient, self._PATIENT_ROLE + "/hl7:addr/hl7:streetAddressLine/text()"
            )
        if self.format == "json":
            lines = self._first_address(patient).get("line") or []
            return lines[0] if lines else ""
        return ""

    def get_city(self) -> str:
        """Return the city of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(patient, self._PATIENT_ROLE + "/hl7:addr/hl7:city/text()")
        if self.format == "json":
            return self._first_address(patient).get("city", "")
        return ""

    def get_state(self) -> str:
        """Return the state of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(patient, self._PATIENT_ROLE + "/hl7:addr/hl7:state/text()")
        if self.format == "json":
            return self._first_address(patient).get("state", "")
        return ""

    def get_zip_code(self) -> str:
        """Return the postal code of the first address, or ``""``."""
        patient = self._get_current_patient()
        if self.format == "xml":
            return self._xml_first(
                patient, self._PATIENT_ROLE + "/hl7:addr/hl7:postalCode/text()"
            )
        if self.format == "json":
            return self._first_address(patient).get("postalCode", "")
        return ""

    def get_phone(self) -> str:
        """Return a phone number, or ``""`` when none is found.

        XML returns the first ``telecom`` value containing ``tel:`` (with
        that prefix removed); JSON returns the first ``telecom`` entry with
        system ``phone`` and use ``home``.
        """
        patient = self._get_current_patient()
        if self.format == "xml":
            values = patient.xpath(
                self._PATIENT_ROLE + "/hl7:telecom/@value", namespaces=self.ns
            )
            # Scan all entries: the first telecom may be an e-mail address.
            for value in values:
                if "tel:" in value:
                    return value.replace("tel:", "")
            return ""
        if self.format == "json":
            for telecom in patient.get("telecom", []):
                if telecom.get("system") == "phone" and telecom.get("use") == "home":
                    return telecom.get("value", "")
        return ""

    # ------------------------------------------------------------------
    # Clinical lists
    # ------------------------------------------------------------------

    def get_medications(self) -> List[Dict[str, str]]:
        """Return medications as dicts with start/stop/description/code.

        XML reads ``substanceAdministration`` elements of the section whose
        code is ``10160-0``; JSON reads ``MedicationRequest`` resources.
        """
        if self.format == "xml":
            sections = self.data.xpath(
                "//hl7:section[hl7:code/@code='10160-0']", namespaces=self.ns
            )
            if not sections:
                return []
            administrations = sections[0].xpath(
                ".//hl7:substanceAdministration", namespaces=self.ns
            )
            return [
                {
                    "start": self._xml_first(med, ".//hl7:effectiveTime/hl7:low/@value"),
                    "stop": self._xml_first(med, ".//hl7:effectiveTime/hl7:high/@value"),
                    "description": self._xml_first(
                        med, ".//hl7:manufacturedMaterial/hl7:code/@displayName"
                    ),
                    "code": self._xml_first(med, ".//hl7:manufacturedMaterial/hl7:code/@code"),
                }
                for med in administrations
            ]
        if self.format == "json":
            result = []
            for med in self._resources_of_type("MedicationRequest"):
                concept = med.get("medicationCodeableConcept") or {}
                validity = (med.get("dispenseRequest") or {}).get("validityPeriod") or {}
                result.append({
                    "start": med.get("authoredOn", ""),
                    "stop": validity.get("end", ""),
                    "description": concept.get("text", ""),
                    "code": self._first_coding_code(concept),
                })
            return result
        return []

    def get_encounters(self) -> List[Dict[str, str]]:
        """Return encounters as dicts with start/end/description/code.

        XML yields at most one entry, built from the first
        ``documentationOf/serviceEvent`` with the fixed description
        ``"Patient Care"``; JSON reads ``Encounter`` resources.
        """
        if self.format == "xml":
            services = self.data.xpath(
                "//hl7:documentationOf/hl7:serviceEvent", namespaces=self.ns
            )
            if not services:
                return []
            service = services[0]
            return [{
                "start": self._xml_first(service, ".//hl7:effectiveTime/hl7:low/@value"),
                "end": self._xml_first(service, ".//hl7:effectiveTime/hl7:high/@value"),
                "description": "Patient Care",
                "code": "",
            }]
        if self.format == "json":
            result = []
            for enc in self._resources_of_type("Encounter"):
                period = enc.get("period") or {}
                types = enc.get("type") or []
                # An empty ``type`` list must not raise IndexError.
                first_type = types[0] if types else {}
                result.append({
                    "start": period.get("start", ""),
                    "end": period.get("end", ""),
                    "description": first_type.get("text", ""),
                    "code": self._first_coding_code(first_type),
                })
            return result
        return []

    def get_conditions(self) -> List[Dict[str, str]]:
        """Return conditions as dicts with onset/description/code.

        XML reads the observations of the section whose code is
        ``11450-4``; JSON reads ``Condition`` resources.
        """
        if self.format == "xml":
            sections = self.data.xpath(
                "//hl7:section[hl7:code/@code='11450-4']", namespaces=self.ns
            )
            if not sections:
                return []
            observations = sections[0].xpath(
                ".//hl7:entry/hl7:act/hl7:entryRelationship/hl7:observation",
                namespaces=self.ns,
            )
            return [
                {
                    "onset": self._xml_first(obs, ".//hl7:effectiveTime/hl7:low/@value"),
                    "description": self._xml_first(obs, ".//hl7:value/@displayName"),
                    "code": self._xml_first(obs, ".//hl7:value/@code"),
                }
                for obs in observations
            ]
        if self.format == "json":
            result = []
            for cond in self._resources_of_type("Condition"):
                concept = cond.get("code") or {}
                result.append({
                    "onset": cond.get("onsetDateTime", ""),
                    "description": concept.get("text", ""),
                    "code": self._first_coding_code(concept),
                })
            return result
        return []

    # ------------------------------------------------------------------
    # Aggregated views
    # ------------------------------------------------------------------

    def get_patient_dict(self) -> Dict[str, str]:
        """Return a dictionary of patient data mapped to discharge form fields.

        Admission/discharge dates come from the last encounter, the
        diagnosis from the last condition, and medications are joined with
        ``"; "`` (``"None specified"`` when there are none). Fields this
        class cannot extract are present with an empty string.
        """
        data = self.get_all_patient_data()

        latest_encounter = data["encounters"][-1] if data["encounters"] else {}
        latest_condition = data["conditions"][-1] if data["conditions"] else {}
        medications_str = "; ".join(
            m["description"] for m in data["medications"] if m["description"]
        ) or "None specified"

        # NOTE(review): get_all_patient_data() always supplies these keys, so
        # the "Unknown" defaults below only apply to a missing key, never to
        # an empty value. Kept as is to preserve the existing output.
        return {
            "id": data.get("id", "Unknown"),
            "first_name": data.get("first_name", ""),
            "last_name": data.get("last_name", ""),
            "name_prefix": data.get("name_prefix", ""),
            "dob": data.get("dob", "Unknown"),
            "age": data.get("age", "Unknown"),
            "sex": data.get("gender", "Unknown"),
            "address": data.get("address_line", "Unknown"),
            "city": data.get("city", "Unknown"),
            "state": data.get("state", "Unknown"),
            "zip_code": data.get("zip_code", "Unknown"),
            "phone": data.get("phone", "Unknown"),
            "admission_date": latest_encounter.get("start", ""),
            "discharge_date": latest_encounter.get("end", ""),
            "diagnosis": latest_condition.get("description", ""),
            "medications": medications_str,
            "doctor_first_name": "",
            "doctor_last_name": "",
            "hospital_name": "",
            "doctor_address": "",
            "doctor_city": "",
            "doctor_state": "",
            "doctor_zip": "",
            "middle_initial": "",
            "referral_source": "",
            "admission_method": "",
            "discharge_reason": "",
            "date_of_death": "",
            "procedures": "",
            "preparer_name": "",
            "preparer_job_title": "",
        }

    def get_all_patient_data(self) -> Dict[str, Union[str, List, Dict]]:
        """Extract all available data for the current patient."""
        return {
            "id": self.get_id(),
            "first_name": self.get_first_name(),
            "last_name": self.get_last_name(),
            "dob": self.get_dob(),
            "age": self.get_age(),
            "gender": self.get_gender(),
            "address_line": self.get_address_line(),
            "city": self.get_city(),
            "state": self.get_state(),
            "zip_code": self.get_zip_code(),
            "phone": self.get_phone(),
            "medications": self.get_medications(),
            "encounters": self.get_encounters(),
            "conditions": self.get_conditions(),
        }

    def get_all_patients(self) -> List[Dict[str, str]]:
        """Return the discharge-form dictionary of every patient.

        The previously selected patient is restored afterwards, including
        when extraction of one of the patients raises.
        """
        original_idx = self.current_patient_idx
        all_patients = []
        try:
            for idx in range(len(self.patients)):
                self.set_patient_by_index(idx)
                all_patients.append(self.get_patient_dict())
        finally:
            self.set_patient_by_index(original_idx)
        return all_patients