|
import json |
|
import os |
|
import tempfile |
|
from datetime import datetime |
|
import traceback |
|
import logging |
|
from huggingface_hub import InferenceClient |
|
from urllib.parse import urlparse, parse_qs |
|
from utils.meldrx import MeldRxAPI |
|
import logging |
|
from old.extractcode import extract_code_from_url |
|
|
|
|
|
# Configure the root logger at import time with an INFO threshold.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger, named after this module, used by the code below.
logger = logging.getLogger(__name__)
|
|
|
class CallbackManager:
    """Authenticate against MeldRx with an authorization code and fetch patient data.

    Wraps a ``MeldRxAPI`` client. The access token obtained through
    ``handle_callback`` is cached on disk at ``TOKEN_PATH`` and reloaded by
    the constructor.
    """

    # Single source of truth for the on-disk token cache location
    # (read in __init__, written in handle_callback).
    TOKEN_PATH = "/tmp/access_token.txt"

    def __init__(self, redirect_uri: str, client_secret: str = None):
        """Build the MeldRx client and restore a cached access token if present.

        Args:
            redirect_uri: Redirect URI passed through to ``MeldRxAPI``.
            client_secret: Optional client secret passed through to ``MeldRxAPI``.

        Raises:
            ValueError: If the ``APPID`` or ``WORKSPACE_URL`` environment
                variable is unset or empty.
        """
        client_id = os.getenv("APPID")
        if not client_id:
            raise ValueError("APPID environment variable not set.")
        workspace_id = os.getenv("WORKSPACE_URL")
        if not workspace_id:
            raise ValueError("WORKSPACE_URL environment variable not set.")

        self.api = MeldRxAPI(client_id, client_secret, workspace_id, redirect_uri)
        self.auth_code = None
        self.access_token = None

        # Only adopt a cached token that is non-blank; an empty cache file
        # must not overwrite the client's token with "".
        cached_token = self._load_cached_token()
        if cached_token:
            self.access_token = cached_token
            self.api.access_token = cached_token

    def _load_cached_token(self):
        """Return the token stored at ``TOKEN_PATH``, or None if missing or blank."""
        try:
            with open(self.TOKEN_PATH, "r", encoding="utf-8") as token_file:
                return token_file.read().strip() or None
        except FileNotFoundError:
            return None

    def _store_token(self, token: str) -> None:
        """Write ``token`` to ``TOKEN_PATH``, creating the file as owner-only.

        The 0o600 mode applies when the file is created; an already existing
        file keeps its current permissions.
        """
        fd = os.open(self.TOKEN_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w", encoding="utf-8") as token_file:
            token_file.write(token)

    def handle_callback(self, callback_url: str) -> str:
        """Authenticate using the code found in ``callback_url`` and cache the token.

        Returns:
            A plain-text status message: missing code, success (with the
            first 10 characters of the token), or failure.
        """
        self.auth_code = extract_code_from_url(callback_url)
        if not self.auth_code:
            return "No authentication code found in URL."
        if not self.api.authenticate_with_code(self.auth_code):
            return "Authentication failed. Please check the authorization code."

        self.access_token = self.api.access_token
        self._store_token(self.access_token)
        return f"Authentication successful! Access Token: {self.access_token[:10]}... (truncated)"

    def get_auth_url(self) -> str:
        """Return the authorization URL produced by the MeldRx client."""
        return self.api.get_authorization_url()

    def set_auth_code(self, code: str) -> str:
        """Authenticate with an explicitly supplied authorization code.

        Unlike ``handle_callback``, this does not write the token to disk.

        Returns:
            An HTML-styled status message for success (with the first 10
            characters of the token) or failure.
        """
        self.auth_code = code
        if not self.api.authenticate_with_code(code):
            return "<span style='color:#FF4500;'>Authentication failed. Please check the code.</span>"

        self.access_token = self.api.access_token
        return (
            f"<span style='color:#00FF7F;'>Authentication successful!</span> Access Token: {self.access_token[:10]}... (truncated)"
        )

    @staticmethod
    def _mock_patient_bundle() -> dict:
        """Return the hard-coded two-patient bundle used when the client has no ``get_patients``."""
        return {
            "resourceType": "Bundle",
            "type": "searchset",
            "total": 2,
            "link": [],
            "entry": [
                {
                    "resource": {
                        "resourceType": "Patient",
                        "id": "patient1",
                        "name": [
                            {
                                "use": "official",
                                "family": "Smith",
                                "given": ["John"],
                            }
                        ],
                        "gender": "male",
                        "birthDate": "1970-01-01",
                        "address": [
                            {"city": "Boston", "state": "MA", "postalCode": "02108"}
                        ],
                    }
                },
                {
                    "resource": {
                        "resourceType": "Patient",
                        "id": "patient2",
                        "name": [
                            {
                                "use": "official",
                                "family": "Johnson",
                                "given": ["Jane"],
                            }
                        ],
                        "gender": "female",
                        "birthDate": "1985-05-15",
                        "address": [
                            {
                                "city": "Cambridge",
                                "state": "MA",
                                "postalCode": "02139",
                            }
                        ],
                    }
                },
            ],
        }

    def get_patient_data(self) -> str:
        """Fetch patient data from MeldRx.

        Returns:
            A JSON string (indent 2) of the patients returned by the client,
            or of the mock bundle when the client exposes no ``get_patients``;
            otherwise an HTML-styled message for the not-authenticated,
            empty-result, failed (None result) or exception cases.
        """
        try:
            if not self.access_token:
                logger.warning("Not authenticated when getting patient data")
                return "<span style='color:#FF8C00;'>Not authenticated. Please provide a valid authorization code first.</span>"

            get_patients = getattr(self.api, "get_patients", None)
            if get_patients is None:
                logger.info("Using mock patient data (no API connection)")
                return json.dumps(self._mock_patient_bundle(), indent=2)

            logger.info("Calling Meldrx API to get patients")
            patients = get_patients()
            if patients is None:
                return "<span style='color:#DC143C;'>Failed to retrieve patient data.</span>"
            if not patients:
                # Distinguish an empty-but-valid response from a failed call.
                return "<span style='color:#FFFF00;'>No patient data returned.</span>"
            return json.dumps(patients, indent=2)
        except Exception as exc:
            # Boundary handler: callers receive a display string, so log the
            # traceback here instead of propagating.
            logger.exception("Error in get_patient_data: %s", exc)
            return f"<span style='color:#FF6347;'>Error retrieving patient data: {exc}</span>"

    def get_patient_documents(self, patient_id: str = None):
        """Fetch patient documents from MeldRx.

        Currently returns a fixed list of two sample documents; ``patient_id``
        is accepted but not used.

        Returns:
            An HTML-styled "not authenticated" string when no access token is
            set, otherwise a list of document dicts with the keys ``doc_id``,
            ``type``, ``date``, ``author`` and ``content``.
        """
        if not self.access_token:
            return "<span style='color:#FF8C00;'>Not authenticated. Please provide a valid authorization code first.</span>"

        return [
            {
                "doc_id": "doc123",
                "type": "clinical_note",
                "date": "2023-01-16",
                "author": "Dr. Sample Doctor",
                "content": "Patient presented with symptoms of respiratory distress...",
            },
            {
                "doc_id": "doc124",
                "type": "lab_result",
                "date": "2023-01-17",
                "author": "Lab System",
                "content": "CBC results: WBC 7.5, RBC 4.2, Hgb 14.1...",
            },
        ]
|
|
|
|
|
|
|
def extract_auth_code_from_url(redirected_url):
    """Pull the ``code`` query parameter out of a redirect URL.

    Returns:
        A ``(code, error)`` pair: ``(code, None)`` when the parameter is
        present (first value if repeated), otherwise ``(None, message)``
        describing why no code could be returned.
    """
    try:
        params = parse_qs(urlparse(redirected_url).query)
        codes = params.get("code")
        if codes is None:
            return None, "Authorization code not found in URL."
        return codes[0], None
    except Exception as exc:
        return None, f"Error parsing URL: {exc}"
|
|