sanggusti's picture
Grok Engineering, fingers crossed
9690499
import requests
import json
import base64
import hashlib
import secrets
from typing import Optional, Dict, Any
from urllib.parse import urlencode
class MeldRxAPI:
def __init__(self, client_id: str, client_secret: str, workspace_id: str, redirect_uri: str):
self.base_url = "https://app.meldrx.com"
self.api_base_url = f"{self.base_url}/api"
self.fhir_base_url = f"{self.api_base_url}/fhir/{workspace_id}"
self.mips_base_url = f"{self.base_url}/mms-api"
self.token_url = f"{self.base_url}/connect/token"
self.authorize_url = f"{self.base_url}/connect/authorize"
self.client_id = client_id
self.client_secret = client_secret
self.workspace_id = workspace_id
self.redirect_uri = redirect_uri
self.access_token = None
self.code_verifier = None
self.session = requests.Session()
def _generate_code_verifier(self) -> str:
self.code_verifier = secrets.token_urlsafe(32) # 43 characters
return self.code_verifier
def _generate_code_challenge(self, code_verifier: str) -> str:
sha256_hash = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(sha256_hash).decode('utf-8').rstrip('=')
return code_challenge
def authenticate(self) -> bool:
payload = {
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
response = self.session.post(self.token_url, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
self.access_token = token_data.get("access_token")
if not self.access_token:
raise ValueError("No access token received.")
return True
except requests.RequestException as e:
print(f"Authentication failed: {e}")
return False
except ValueError as e:
print(f"Authentication error: {e}")
return False
def _get_headers(self) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
if self.access_token:
headers["Authorization"] = f"Bearer {self.access_token}"
return headers
def get_patients(self):
# This should return the FHIR Bundle as per your sample output
headers = {"Authorization": f"Bearer {self.access_token}"}
response = requests.get(
f"https://app.meldrx.com/api/fhir/{self.workspace_id}/Patient",
headers=headers
)
return response.json() if response.status_code == 200 else None
def get_authorization_url(self, scope: str = "patient/*.read openid profile", state: str = "random_state") -> str:
code_verifier = self._generate_code_verifier()
code_challenge = self._generate_code_challenge(code_verifier)
params = {
"response_type": "code",
"client_id": self.client_id,
"redirect_uri": self.redirect_uri,
"scope": scope,
"state": state,
"aud": self.fhir_base_url,
"code_challenge": code_challenge,
"code_challenge_method": "S256"
}
query_string = "&".join(f"{k}={v}" for k, v in params.items())
print(f"Generated Authorization URL: {self.authorize_url}?{query_string}") # Debugging
return f"{self.authorize_url}?{query_string}"
def authenticate_with_code(self, auth_code: str) -> bool:
if not self.code_verifier:
print("Error: Code verifier not set. Generate an authorization URL first.")
return False
payload = {
"grant_type": "authorization_code",
"code": auth_code,
"redirect_uri": self.redirect_uri,
"client_id": self.client_id,
"code_verifier": self.code_verifier
}
if self.client_secret:
payload["client_secret"] = self.client_secret
headers = {"Content-Type": "application/x-www-form-urlencoded"}
print(f"Token Request Payload: {payload}") # Debugging
try:
response = self.session.post(self.token_url, data=payload, headers=headers)
response.raise_for_status()
token_data = response.json()
print(f"Token Response: {token_data}") # Debugging
self.access_token = token_data.get("access_token")
if not self.access_token:
raise ValueError("No access token received.")
return True
except requests.RequestException as e:
print(f"Token Request Failed: {e}")
print(f"Response Status: {e.response.status_code if e.response else 'No response'}")
print(f"Response Text: {e.response.text if e.response else 'No response'}")
return False
except ValueError as e:
print(f"Authentication error: {e}")
return False
def create_virtual_workspace(self, snapshot: str = "patient-prefetch",
patient_id: str = "AutoPopulatedIfNotManuallySet",
hook: str = "patient-view") -> bool:
"""
Create a virtual workspace in the specified workspace (FHIR API).
Args:
snapshot (str): The snapshot type (default: "patient-prefetch").
patient_id (str): The patient ID (default: "AutoPopulatedIfNotManuallySet").
hook (str): The hook type (default: "patient-view").
Returns:
bool: True if the virtual workspace is created successfully, False otherwise.
"""
url = f"{self.fhir_base_url}/$virtual-workspace"
if not self.access_token and not self.authenticate():
print("Cannot proceed without authentication.")
return False
payload = {"snapshot": snapshot, "patientId": patient_id, "hook": hook}
try:
response = self.session.post(url, data=json.dumps(payload), headers=self._get_headers())
response.raise_for_status()
return True
except requests.RequestException as e:
print(f"Failed to create virtual workspace: {e}")
return False
def get_mips_patients(self) -> Optional[Dict[str, Any]]:
"""
Retrieve a list of patients from the MIPS API.
Returns:
Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
"""
url = f"{self.mips_base_url}/Patient"
if not self.access_token and not self.authenticate():
print("Cannot proceed without authentication.")
return None
try:
response = self.session.get(url, headers=self._get_headers())
response.raise_for_status()
return response.json() if response.text else {}
except requests.RequestException as e:
print(f"Failed to retrieve MIPS patients: {e}")
return None
def get_mips_patient_by_id(self, patient_id: str) -> Optional[Dict[str, Any]]:
"""
Retrieve patient information by ID from the MIPS API.
Args:
patient_id (str): The ID of the patient to retrieve.
Returns:
Optional[Dict[str, Any]]: Patient data as a dictionary if successful, None otherwise.
"""
url = f"{self.mips_base_url}/Patient/{patient_id}"
if not self.access_token and not self.authenticate():
print("Cannot proceed without authentication.")
return None
try:
response = self.session.get(url, headers=self._get_headers())
response.raise_for_status()
return response.json() if response.text else {}
except requests.RequestException as e:
print(f"Failed to retrieve patient {patient_id}: {e}")
return None
def get_mips_encounters(self, patient_id: str = None) -> Optional[Dict[str, Any]]:
"""
Retrieve encounters from the MIPS API, optionally filtered by patient ID.
Args:
patient_id (str, optional): The ID of the patient to filter encounters by.
Returns:
Optional[Dict[str, Any]]: Encounter data as a dictionary if successful, None otherwise.
"""
url = f"{self.mips_base_url}/Encounter"
if patient_id:
url += f"?patient={patient_id}"
if not self.access_token and not self.authenticate():
print("Cannot proceed without authentication.")
return None
try:
response = self.session.get(url, headers=self._get_headers())
response.raise_for_status()
return response.json() if response.text else {}
except requests.RequestException as e:
print(f"Failed to retrieve encounters: {e}")
return None
def update_fhir_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
"""
Update patient data in the FHIR API using a PUT request.
Args:
patient_id (str): The ID of the patient to update.
patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.
Returns:
bool: True if the patient is updated successfully, False otherwise.
"""
url = f"{self.fhir_base_url}/Patient/{patient_id}"
if not self.access_token and not self.authenticate():
print("Cannot proceed without authentication.")
return False
try:
response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers())
response.raise_for_status()
return True
except requests.RequestException as e:
print(f"Failed to update FHIR patient {patient_id}: {e}")
return False
def update_mips_patient(self, patient_id: str, patient_data: Dict[str, Any]) -> bool:
"""
Update patient data in the MIPS API using a PUT request.
Args:
patient_id (str): The ID of the patient to update.
patient_data (Dict[str, Any]): The updated patient data in FHIR JSON format.
Returns:
bool: True if the patient is updated successfully, False otherwise.
"""
url = f"{self.mips_base_url}/Patient/{patient_id}"
if not self.access_token and not self.authenticate():
print("Cannot proceed without authentication.")
return False
try:
response = self.session.put(url, data=json.dumps(patient_data), headers=self._get_headers())
response.raise_for_status()
return True
except requests.RequestException as e:
print(f"Failed to update MIPS patient {patient_id}: {e}")
return False
def close(self):
"""Close the session to free up resources."""
self.session.close()
def __enter__(self):
"""Support for context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Support for context manager exit, ensuring session is closed."""
self.close()
# # Example usage with patient update functionality
# if __name__ == "__main__":
# # Replace these with your actual credentials and workspace ID
# CLIENT_ID = "your_client_id"
# CLIENT_SECRET = "your_client_secret"
# WORKSPACE_ID = "your_workspace_id"
# PATIENT_ID = "example_patient_id" # Replace with an actual patient ID
# with MeldRxAPI(client_id=CLIENT_ID, client_secret=CLIENT_SECRET, workspace_id=WORKSPACE_ID) as meldrx:
# # Authenticate
# if meldrx.authenticate():
# print("Authentication successful!")
# # Retrieve specific patient information from MIPS API
# patient_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
# if patient_info is not None:
# print(f"Original Patient {PATIENT_ID} Info:", json.dumps(patient_info, indent=2))
# # Example patient data to update (FHIR Patient resource format)
# updated_patient_data = {
# "resourceType": "Patient",
# "id": PATIENT_ID,
# "name": [{
# "family": "Doe",
# "given": ["John", "Updated"]
# }],
# "gender": "male",
# "birthDate": "1980-01-01"
# }
# # Update patient in FHIR API
# if meldrx.update_fhir_patient(PATIENT_ID, updated_patient_data):
# print(f"Successfully updated patient {PATIENT_ID} in FHIR API")
# updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
# if updated_info:
# print(f"Updated Patient {PATIENT_ID} Info (FHIR):", json.dumps(updated_info, indent=2))
# # Update patient in MIPS API
# if meldrx.update_mips_patient(PATIENT_ID, updated_patient_data):
# print(f"Successfully updated patient {PATIENT_ID} in MIPS API")
# updated_info = meldrx.get_mips_patient_by_id(PATIENT_ID)
# if updated_info:
# print(f"Updated Patient {PATIENT_ID} Info (MIPS):", json.dumps(updated_info, indent=2))
# # Retrieve encounters for the patient from MIPS API
# encounters = meldrx.get_mips_encounters(patient_id=PATIENT_ID)
# if encounters is not None:
# print(f"Encounters for Patient {PATIENT_ID}:", json.dumps(encounters, indent=2))