|
import logging
|
|
import re
|
|
|
|
import torch
|
|
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
|
|
|
|
|
|
class MedicalReportAnalyzer:
    """
    Analyze medical text reports using pre-trained NLP models from Hugging Face.

    This analyzer can:
    1. Extract medical entities (conditions, treatments, tests)
    2. Classify report severity
    3. Extract key findings
    4. Identify suggested follow-up actions
    """

    # Numeric severity level -> human-readable label.
    _DEFAULT_SEVERITY_LEVELS = {
        0: "Normal",
        1: "Mild",
        2: "Moderate",
        3: "Severe",
        4: "Critical",
    }

    # Finding term -> severity weight (0-4 scale) used by the keyword scoring.
    _DEFAULT_FINDING_SEVERITY = {
        "pneumonia": 3,
        "fracture": 3,
        "tumor": 4,
        "nodule": 2,
        "mass": 3,
        "edema": 2,
        "effusion": 2,
        "hemorrhage": 3,
        "opacity": 1,
        "atelectasis": 2,
        "pneumothorax": 3,
        "consolidation": 2,
        "cardiomegaly": 2,
    }

    # NER label (lower-cased ``entity_group``) -> output category.
    _ENTITY_CATEGORIES = {
        "problem": "problem",
        "disease": "problem",
        "condition": "problem",
        "diagnosis": "problem",
        "test": "test",
        "procedure": "test",
        "examination": "test",
        "treatment": "treatment",
        "medication": "treatment",
        "drug": "treatment",
        "body_part": "anatomy",
        "anatomy": "anatomy",
        "organ": "anatomy",
    }

    # (keywords, max occurrences counted, score weight per occurrence,
    #  confidence bonus when at least one keyword is present).
    _KEYWORD_RULES = (
        (
            ("severe", "critical", "urgent", "emergency", "immediate attention"),
            2,
            1.5,
            0.1,
        ),
        (("moderate", "concerning", "follow-up", "monitor"), 3, 0.75, 0.05),
        (("mild", "minimal", "slight", "minor"), 3, 0.25, 0.05),
        (
            ("normal", "unremarkable", "no abnormalities", "within normal limits"),
            3,
            -0.75,
            0.1,
        ),
    )

    # Phrases that mark a sentence as reporting a finding.
    _FINDING_MARKERS = (
        "finding",
        "observed",
        "noted",
        "shows",
        "reveals",
        "demonstrates",
        "indicates",
        "evident",
        "apparent",
        "consistent with",
        "suggestive of",
    )

    # Whole-word tokens that mark a sentence as a negated statement.
    _NEGATION_MARKERS = ("no", "not", "none", "negative", "without", "denies")

    # Phrases indicating the report itself already mentions a follow-up.
    _FOLLOWUP_PHRASES = (
        "follow up",
        "follow-up",
        "followup",
        "return",
        "refer",
        "consult",
    )

    def __init__(
        self,
        ner_model="samrawal/bert-base-uncased_medical-ner",
        classifier_model="medicalai/ClinicalBERT",
        device=None,
    ):
        """
        Initialize the text analyzer with specific pre-trained models.

        Model loading is best-effort: a failure is logged and the matching
        attribute is set to ``None`` instead of raising, so the analyzer can
        still be constructed when a model cannot be loaded.

        Args:
            ner_model (str): Model for named entity recognition
            classifier_model (str): Model for text classification
            device (str, optional): Device to run models on ('cuda' or 'cpu').
                When omitted, CUDA is used if ``torch`` reports it available.
        """
        self.logger = logging.getLogger(__name__)

        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device

        self.logger.info("Using device: %s", self.device)

        try:
            self.ner_pipeline = pipeline(
                "token-classification",
                model=ner_model,
                aggregation_strategy="simple",
                # Honour an explicit GPU index ("cuda:1") so the NER model lands
                # on the same device as the classifier.
                device=self._pipeline_device_index(self.device),
            )
            self.logger.info("Successfully loaded NER model: %s", ner_model)
        except Exception as e:
            self.logger.error("Failed to load NER model: %s", e)
            self.ner_pipeline = None

        try:
            self.tokenizer = AutoTokenizer.from_pretrained(classifier_model)
            self.classifier = AutoModelForSequenceClassification.from_pretrained(
                classifier_model
            )
            self.classifier.to(self.device)
            self.classifier.eval()
            self.logger.info(
                "Successfully loaded classifier model: %s", classifier_model
            )
        except Exception as e:
            self.logger.error("Failed to load classifier model: %s", e)
            # Reset both so neither attribute is left undefined or half-loaded.
            self.tokenizer = None
            self.classifier = None

        # Per-instance copies so callers may customise them without touching
        # the class-level defaults.
        self.severity_levels = dict(self._DEFAULT_SEVERITY_LEVELS)
        self.finding_severity = dict(self._DEFAULT_FINDING_SEVERITY)

    @staticmethod
    def _pipeline_device_index(device):
        """Translate a device name into the integer index used for the pipeline.

        Returns ``-1`` (CPU) for anything that is not a CUDA device, ``0`` for
        plain ``"cuda"`` and ``N`` for ``"cuda:N"``.
        """
        name = str(device)
        if not name.startswith("cuda"):
            return -1
        _, _, index = name.partition(":")
        return int(index) if index.isdigit() else 0

    @staticmethod
    def _count_keywords(text_lower, keywords):
        """Count keyword occurrences that start at a word boundary.

        The leading boundary stops "abnormal" from being counted as "normal",
        while still matching inflections such as "mildly" or "severely".
        """
        return sum(
            len(re.findall(r"\b" + re.escape(keyword), text_lower))
            for keyword in keywords
        )

    def extract_entities(self, text, min_score=0.7):
        """
        Extract medical entities from the report text.

        Args:
            text (str): Medical report text
            min_score (float): Entities whose score is not strictly greater
                than this threshold are dropped. Defaults to 0.7.

        Returns:
            dict: Dictionary of entity lists by category ("problem", "test",
                "treatment", "anatomy"). Empty dict when the NER model is
                unavailable or extraction fails.
        """
        if self.ner_pipeline is None:
            self.logger.warning("NER model not available")
            return {}

        try:
            entities = self.ner_pipeline(text)

            grouped_entities = {
                "problem": [],
                "test": [],
                "treatment": [],
                "anatomy": [],
            }

            for entity in entities:
                entity_type = entity.get("entity_group", "").lower()
                category = self._ENTITY_CATEGORIES.get(entity_type)
                if category is None:
                    # Label outside the categories this analyzer reports.
                    continue

                word = entity.get("word", "")
                score = entity.get("score", 0)

                bucket = grouped_entities[category]
                # Skip empty words, low-scoring entities and duplicates.
                if word and score > min_score and word not in bucket:
                    bucket.append(word)

            return grouped_entities

        except Exception as e:
            self.logger.error("Error extracting entities: %s", e)
            return {}

    def assess_severity(self, text):
        """
        Assess the severity level of the medical report.

        The score is computed from keyword and finding-term matches only.

        Args:
            text (str): Medical report text

        Returns:
            dict: Severity assessment with keys "level" (label), "score"
                (0-4, one decimal) and "confidence" (capped at 0.95).
                "Unknown" with zero score/confidence when the classifier is
                unavailable or assessment fails.
        """
        # NOTE(review): the classifier is never consulted below; this gate is
        # kept only for backward compatibility with existing callers.
        if self.classifier is None:
            self.logger.warning("Classifier model not available")
            return {"level": "Unknown", "score": 0.0, "confidence": 0.0}

        try:
            severity_score = 0
            confidence = 0.5

            text_lower = text.lower()

            for keywords, cap, weight, bonus in self._KEYWORD_RULES:
                count = self._count_keywords(text_lower, keywords)
                if count > 0:
                    # Cap repeated mentions so one word cannot dominate.
                    severity_score += min(count, cap) * weight
                    confidence += bonus

            for finding, level in self.finding_severity.items():
                # Substring match on purpose so plurals ("fractures") count.
                if finding in text_lower:
                    severity_score += level * 0.5
                    confidence += 0.05

            # Clamp to the 0-4 scale before mapping to a label.
            severity_score = max(0, min(4, severity_score))
            severity_level = int(round(severity_score))

            severity = self.severity_levels.get(severity_level, "Moderate")

            confidence = min(0.95, confidence)

            return {
                "level": severity,
                "score": round(severity_score, 1),
                "confidence": round(confidence, 2),
            }

        except Exception as e:
            self.logger.error("Error assessing severity: %s", e)
            return {"level": "Unknown", "score": 0.0, "confidence": 0.0}

    def extract_findings(self, text):
        """
        Extract key clinical findings from the report.

        A sentence is kept when it contains a finding marker phrase, or when it
        contains a negation word together with a known finding term.

        Args:
            text (str): Medical report text

        Returns:
            list: List of key findings (stripped sentences); empty on failure
        """
        try:
            findings = []

            for raw_sentence in re.split(r"[.!?]\s+", text):
                # Fragments shorter than three words carry too little content.
                if len(raw_sentence.split()) < 3:
                    continue

                sentence = raw_sentence.strip()
                lowered = sentence.lower()

                keep = any(marker in lowered for marker in self._FINDING_MARKERS)

                if not keep:
                    words = lowered.split()
                    # Negations are matched as whole whitespace-separated tokens.
                    negated = any(
                        marker in words for marker in self._NEGATION_MARKERS
                    )
                    keep = negated and any(
                        term in lowered for term in self.finding_severity
                    )

                if keep:
                    findings.append(sentence)

            return findings

        except Exception as e:
            self.logger.error("Error extracting findings: %s", e)
            return []

    def suggest_followup(self, text, entities, severity):
        """
        Suggest follow-up actions based on report analysis.

        Args:
            text (str): Medical report text
            entities (dict): Extracted entities
            severity (dict): Severity assessment

        Returns:
            list: Suggested follow-up actions; a single fallback message when
                generation fails
        """
        try:
            followups = []

            severity_level = severity.get("level", "Unknown")
            problems = entities.get("problem", [])

            text_lower = text.lower()
            followup_mentioned = any(
                phrase in text_lower for phrase in self._FOLLOWUP_PHRASES
            )

            if severity_level == "Critical":
                followups.append("Immediate specialist consultation recommended.")

            elif severity_level == "Severe":
                followups.append("Prompt follow-up with specialist is recommended.")

                for problem in problems:
                    problem_lower = problem.lower()
                    if "pneumonia" in problem_lower:
                        followups.append(
                            "Consider antibiotic therapy and close monitoring."
                        )
                    elif "fracture" in problem_lower:
                        followups.append(
                            "Orthopedic consultation for treatment planning."
                        )
                    elif "mass" in problem_lower or "tumor" in problem_lower:
                        followups.append(
                            "Further imaging and possible biopsy recommended."
                        )

            elif severity_level == "Moderate":
                followups.append("Follow-up with primary care physician recommended.")
                if not followup_mentioned and problems:
                    followups.append(
                        "Consider additional imaging or tests for further evaluation."
                    )

            elif severity_level == "Mild":
                if problems:
                    followups.append(
                        "Routine follow-up with primary care physician as needed."
                    )
                else:
                    followups.append("No immediate follow-up required.")

            else:
                followups.append(
                    "No specific follow-up indicated based on this report."
                )

            # Safety net: recommend imaging for the first concerning term found
            # when nothing collected so far mentions a follow-up.
            # NOTE(review): the "No ... follow-up" messages above also contain
            # "follow-up" and therefore suppress this recommendation; confirm
            # whether that is intended.
            already_has_followup = "follow-up" in " ".join(followups).lower()
            if not already_has_followup:
                for critical_term in ("mass", "tumor", "nodule", "opacity"):
                    if critical_term in text_lower:
                        followups.append(
                            f"Follow-up imaging recommended to monitor {critical_term}."
                        )
                        break

            return followups

        except Exception as e:
            self.logger.error("Error suggesting follow-up: %s", e)
            return ["Unable to generate follow-up recommendations."]

    def analyze(self, text):
        """
        Perform comprehensive analysis of medical report text.

        Args:
            text (str): Medical report text

        Returns:
            dict: Complete analysis results with keys "entities", "severity",
                "findings" and "followup_recommendations"; on unexpected
                failure, a dict with a single "error" key
        """
        try:
            entities = self.extract_entities(text)
            severity = self.assess_severity(text)
            findings = self.extract_findings(text)
            followups = self.suggest_followup(text, entities, severity)

            return {
                "entities": entities,
                "severity": severity,
                "findings": findings,
                "followup_recommendations": followups,
            }

        except Exception as e:
            self.logger.error("Error analyzing report: %s", e)
            return {"error": str(e)}
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Demo: run the analyzer on a sample chest X-ray report and print results.
    logging.basicConfig(level=logging.INFO)

    analyzer = MedicalReportAnalyzer()

    sample_report = """
    CHEST X-RAY EXAMINATION

    CLINICAL HISTORY: 55-year-old male with cough and fever.

    FINDINGS: The heart size is at the upper limits of normal. The lungs are clear without focal consolidation,
    effusion, or pneumothorax. There is mild prominence of the pulmonary vasculature. No pleural effusion is seen.
    There is a small nodular opacity noted in the right lower lobe measuring approximately 8mm, which is suspicious
    and warrants further investigation. The mediastinum is unremarkable. The visualized bony structures show no acute abnormalities.

    IMPRESSION:
    1. Mild cardiomegaly.
    2. 8mm nodular opacity in the right lower lobe, recommend follow-up CT for further evaluation.
    3. No acute pulmonary parenchymal abnormality.

    RECOMMENDATIONS: Follow-up chest CT to further characterize the nodular opacity in the right lower lobe.
    """

    results = analyzer.analyze(sample_report)

    # analyze() returns {"error": ...} on unexpected failure; exit with that
    # message instead of raising KeyError on the lookups below.
    if "error" in results:
        raise SystemExit(f"Analysis failed: {results['error']}")

    print("\nMedical Report Analysis:")
    print(
        f"\nSeverity: {results['severity']['level']} (Score: {results['severity']['score']})"
    )

    print("\nKey Findings:")
    for finding in results["findings"]:
        print(f"- {finding}")

    print("\nEntities:")
    for category, items in results["entities"].items():
        if items:
            print(f"- {category.capitalize()}: {', '.join(items)}")

    print("\nFollow-up Recommendations:")
    for rec in results["followup_recommendations"]:
        print(f"- {rec}")
|
|
|