Amarthya7's picture
Update mediSync/models/multimodal_fusion.py
5cdf85c verified
import logging
from .image_analyzer import XRayImageAnalyzer
from .text_analyzer import MedicalReportAnalyzer
class MultimodalFusion:
"""
A class for fusing insights from image analysis and text analysis of medical data.
This fusion approach combines the strengths of both modalities:
- Images provide visual evidence of abnormalities
- Text reports provide context, history and radiologist interpretations
The combined analysis provides a more comprehensive understanding than either modality alone.
"""
def __init__(self, image_model=None, text_model=None, device=None):
"""
Initialize the multimodal fusion module with image and text analyzers.
Args:
image_model (str, optional): Model to use for image analysis
text_model (str, optional): Model to use for text analysis
device (str, optional): Device to run models on ('cuda' or 'cpu')
"""
self.logger = logging.getLogger(__name__)
# Determine device
if device is None:
import torch
self.device = "cuda" if torch.cuda.is_available() else "cpu"
else:
self.device = device
self.logger.info(f"Using device: {self.device}")
# Initialize image analyzer
try:
self.image_analyzer = XRayImageAnalyzer(
model_name=image_model
if image_model
else "codewithdark/vit-chest-xray",
device=self.device,
)
self.logger.info("Successfully initialized image analyzer")
except Exception as e:
self.logger.error(f"Failed to initialize image analyzer: {e}")
self.image_analyzer = None
# Initialize text analyzer
try:
self.text_analyzer = MedicalReportAnalyzer(
classifier_model=text_model if text_model else "medicalai/ClinicalBERT",
device=self.device,
)
self.logger.info("Successfully initialized text analyzer")
except Exception as e:
self.logger.error(f"Failed to initialize text analyzer: {e}")
self.text_analyzer = None
def analyze_image(self, image_path):
"""
Analyze a medical image.
Args:
image_path (str): Path to the medical image
Returns:
dict: Image analysis results
"""
if not self.image_analyzer:
self.logger.warning("Image analyzer not available")
return {"error": "Image analyzer not available"}
try:
return self.image_analyzer.analyze(image_path)
except Exception as e:
self.logger.error(f"Error analyzing image: {e}")
return {"error": str(e)}
def analyze_text(self, text):
"""
Analyze medical report text.
Args:
text (str): Medical report text
Returns:
dict: Text analysis results
"""
if not self.text_analyzer:
self.logger.warning("Text analyzer not available")
return {"error": "Text analyzer not available"}
try:
return self.text_analyzer.analyze(text)
except Exception as e:
self.logger.error(f"Error analyzing text: {e}")
return {"error": str(e)}
def _calculate_agreement_score(self, image_results, text_results):
"""
Calculate agreement score between image and text analyses.
Args:
image_results (dict): Results from image analysis
text_results (dict): Results from text analysis
Returns:
float: Agreement score (0-1, where 1 is perfect agreement)
"""
try:
# Default to neutral agreement
agreement = 0.5
# Check if image detected abnormality
image_abnormal = image_results.get("has_abnormality", False)
# Check text severity
text_severity = text_results.get("severity", {}).get("level", "Unknown")
text_abnormal = text_severity not in ["Normal", "Unknown"]
# Basic agreement check
if image_abnormal == text_abnormal:
agreement += 0.25
else:
agreement -= 0.25
# Check if specific findings match
image_finding = image_results.get("primary_finding", "").lower()
# Extract problem entities from text
problems = text_results.get("entities", {}).get("problem", [])
problem_text = " ".join(problems).lower()
# Check for common keywords in both
common_conditions = [
"pneumonia",
"effusion",
"nodule",
"mass",
"cardiomegaly",
"opacity",
"fracture",
"tumor",
"edema",
]
matching_conditions = 0
total_mentioned = 0
for condition in common_conditions:
in_image = condition in image_finding
in_text = condition in problem_text
if in_image or in_text:
total_mentioned += 1
if in_image and in_text:
matching_conditions += 1
agreement += 0.05 # Boost agreement for each matching condition
# Calculate condition match ratio if any conditions were mentioned
if total_mentioned > 0:
match_ratio = matching_conditions / total_mentioned
agreement += match_ratio * 0.2
# Normalize agreement to 0-1 range
agreement = max(0, min(1, agreement))
return agreement
except Exception as e:
self.logger.error(f"Error calculating agreement score: {e}")
return 0.5 # Return neutral agreement on error
def _get_confidence_weighted_finding(self, image_results, text_results, agreement):
"""
Get the most confident finding weighted by modality confidence.
Args:
image_results (dict): Results from image analysis
text_results (dict): Results from text analysis
agreement (float): Agreement score between modalities
Returns:
str: Most confident finding
"""
try:
image_finding = image_results.get("primary_finding", "")
image_confidence = image_results.get("confidence", 0.5)
# For text, use the most severe problem as primary finding
problems = text_results.get("entities", {}).get("problem", [])
text_confidence = text_results.get("severity", {}).get("confidence", 0.5)
if not problems:
# No problems identified in text
if image_confidence > 0.7:
return image_finding
else:
return "No significant findings"
# Simple confidence-weighted selection
if image_confidence > text_confidence + 0.2:
return image_finding
elif problems and text_confidence > image_confidence + 0.2:
return (
problems[0]
if isinstance(problems, list) and problems
else "Unknown finding"
)
else:
# Similar confidence, check agreement
if agreement > 0.7:
# High agreement, try to find the specific condition mentioned in both
for problem in problems:
if problem.lower() in image_finding.lower():
return problem
# Default to image finding if high confidence
if image_confidence > 0.6:
return image_finding
elif problems:
return problems[0]
else:
return image_finding
else:
# Low agreement, include both perspectives
if image_finding and problems:
return f"{image_finding} (image) / {problems[0]} (report)"
elif image_finding:
return image_finding
elif problems:
return problems[0]
else:
return "Findings unclear - review recommended"
except Exception as e:
self.logger.error(f"Error getting weighted finding: {e}")
return "Unable to determine primary finding"
def _merge_followup_recommendations(self, image_results, text_results):
"""
Merge follow-up recommendations from both modalities.
Args:
image_results (dict): Results from image analysis
text_results (dict): Results from text analysis
Returns:
list: Combined follow-up recommendations
"""
try:
# Get text-based recommendations
text_recommendations = text_results.get("followup_recommendations", [])
# Create image-based recommendations based on findings
image_recommendations = []
if image_results.get("has_abnormality", False):
primary = image_results.get("primary_finding", "")
confidence = image_results.get("confidence", 0)
if (
"nodule" in primary.lower()
or "mass" in primary.lower()
or "tumor" in primary.lower()
):
image_recommendations.append(
f"Follow-up imaging recommended to further evaluate {primary}."
)
elif "pneumonia" in primary.lower():
image_recommendations.append(
"Clinical correlation and follow-up imaging recommended."
)
elif confidence > 0.8:
image_recommendations.append(
f"Consider follow-up imaging to monitor {primary}."
)
elif confidence > 0.5:
image_recommendations.append(
"Consider clinical correlation and potential follow-up."
)
# Combine recommendations, removing duplicates
all_recommendations = text_recommendations + image_recommendations
# Remove near-duplicates (similar recommendations)
unique_recommendations = []
for rec in all_recommendations:
if not any(
self._is_similar_recommendation(rec, existing)
for existing in unique_recommendations
):
unique_recommendations.append(rec)
return unique_recommendations
except Exception as e:
self.logger.error(f"Error merging follow-up recommendations: {e}")
return ["Follow-up recommended based on findings."]
def _is_similar_recommendation(self, rec1, rec2):
"""Check if two recommendations are semantically similar."""
# Convert to lowercase for comparison
rec1_lower = rec1.lower()
rec2_lower = rec2.lower()
# Check for significant overlap
words1 = set(rec1_lower.split())
words2 = set(rec2_lower.split())
# Calculate Jaccard similarity
intersection = words1.intersection(words2)
union = words1.union(words2)
similarity = len(intersection) / len(union) if union else 0
# Consider similar if more than 60% overlap
return similarity > 0.6
def _get_final_severity(self, image_results, text_results, agreement):
"""
Determine final severity based on both modalities.
Args:
image_results (dict): Results from image analysis
text_results (dict): Results from text analysis
agreement (float): Agreement score between modalities
Returns:
dict: Final severity assessment
"""
try:
# Get text-based severity
text_severity = text_results.get("severity", {})
text_level = text_severity.get("level", "Unknown")
text_score = text_severity.get("score", 0)
text_confidence = text_severity.get("confidence", 0.5)
# Convert image findings to severity
image_abnormal = image_results.get("has_abnormality", False)
image_confidence = image_results.get("confidence", 0.5)
# Default severity mapping from image
image_severity = "Normal" if not image_abnormal else "Moderate"
image_score = 0 if not image_abnormal else 2.0
# Adjust image severity based on specific findings
primary_finding = image_results.get("primary_finding", "").lower()
# Map certain conditions to severity levels
severity_mapping = {
"pneumonia": ("Moderate", 2.5),
"pneumothorax": ("Severe", 3.0),
"effusion": ("Moderate", 2.0),
"pulmonary edema": ("Moderate", 2.5),
"nodule": ("Mild", 1.5),
"mass": ("Moderate", 2.5),
"tumor": ("Severe", 3.0),
"cardiomegaly": ("Mild", 1.5),
"fracture": ("Moderate", 2.0),
"consolidation": ("Moderate", 2.0),
}
# Check if any key terms are in the primary finding
for key, (severity, score) in severity_mapping.items():
if key in primary_finding:
image_severity = severity
image_score = score
break
# Weight based on confidence and agreement
if agreement > 0.7:
# High agreement - weight equally
final_score = (image_score + text_score) / 2
else:
# Lower agreement - weight by confidence
total_confidence = image_confidence + text_confidence
if total_confidence > 0:
image_weight = image_confidence / total_confidence
text_weight = text_confidence / total_confidence
final_score = (image_score * image_weight) + (
text_score * text_weight
)
else:
final_score = (image_score + text_score) / 2
# Map score to severity level
severity_levels = {
0: "Normal",
1: "Mild",
2: "Moderate",
3: "Severe",
4: "Critical",
}
# Round to nearest level
level_index = round(min(4, max(0, final_score)))
final_level = severity_levels[level_index]
return {
"level": final_level,
"score": round(final_score, 1),
"confidence": round((image_confidence + text_confidence) / 2, 2),
}
except Exception as e:
self.logger.error(f"Error determining final severity: {e}")
return {"level": "Unknown", "score": 0, "confidence": 0}
def fuse_analyses(self, image_results, text_results):
"""
Fuse the results from image and text analyses.
Args:
image_results (dict): Results from image analysis
text_results (dict): Results from text analysis
Returns:
dict: Fused analysis results
"""
try:
# Calculate agreement between modalities
agreement = self._calculate_agreement_score(image_results, text_results)
self.logger.info(f"Agreement score between modalities: {agreement:.2f}")
# Get confidence-weighted primary finding
primary_finding = self._get_confidence_weighted_finding(
image_results, text_results, agreement
)
# Merge follow-up recommendations
followup = self._merge_followup_recommendations(image_results, text_results)
# Get final severity assessment
severity = self._get_final_severity(image_results, text_results, agreement)
# Create comprehensive findings list
findings = []
# Add text-extracted findings
text_findings = text_results.get("findings", [])
if text_findings:
findings.extend(text_findings)
# Add primary image finding if not already included
image_finding = image_results.get("primary_finding", "")
if image_finding and not any(
image_finding.lower() in f.lower() for f in findings
):
findings.append(f"Image finding: {image_finding}")
# Create fused result
fused_result = {
"agreement_score": round(agreement, 2),
"primary_finding": primary_finding,
"severity": severity,
"findings": findings,
"followup_recommendations": followup,
"modality_results": {"image": image_results, "text": text_results},
}
return fused_result
except Exception as e:
self.logger.error(f"Error fusing analyses: {e}")
return {
"error": str(e),
"modality_results": {"image": image_results, "text": text_results},
}
def analyze(self, image_path, report_text):
"""
Perform multimodal analysis of medical image and report.
Args:
image_path (str): Path to the medical image
report_text (str): Medical report text
Returns:
dict: Fused analysis results
"""
try:
# Analyze image
image_results = self.analyze_image(image_path)
# Analyze text
text_results = self.analyze_text(report_text)
# Fuse the analyses
return self.fuse_analyses(image_results, text_results)
except Exception as e:
self.logger.error(f"Error in multimodal analysis: {e}")
return {"error": str(e)}
def get_explanation(self, fused_results):
"""
Generate a human-readable explanation of the fused analysis.
Args:
fused_results (dict): Results from the fused analysis
Returns:
str: A text explanation of the fused analysis
"""
try:
explanation = []
# Add overview section
primary_finding = fused_results.get("primary_finding", "Unknown")
severity = fused_results.get("severity", {}).get("level", "Unknown")
explanation.append("# Medical Analysis Summary\n")
explanation.append("## Overview\n")
explanation.append(f"Primary finding: **{primary_finding}**\n")
explanation.append(f"Severity level: **{severity}**\n")
# Add agreement information
agreement = fused_results.get("agreement_score", 0)
agreement_text = (
"High" if agreement > 0.7 else "Moderate" if agreement > 0.4 else "Low"
)
explanation.append(
f"Image and text analysis agreement: **{agreement_text}** ({agreement:.0%})\n"
)
# Add findings section
explanation.append("\n## Detailed Findings\n")
findings = fused_results.get("findings", [])
if findings:
for finding in findings:
explanation.append(f"- {finding}\n")
else:
explanation.append("No specific findings detailed.\n")
# Add follow-up section
explanation.append("\n## Recommended Follow-up\n")
followups = fused_results.get("followup_recommendations", [])
if followups:
for followup in followups:
explanation.append(f"- {followup}\n")
else:
explanation.append("No specific follow-up recommendations provided.\n")
# Add confidence note
confidence = fused_results.get("severity", {}).get("confidence", 0)
explanation.append(
f"\n*Note: This analysis has a confidence level of {confidence:.0%}. "
f"Please consult with healthcare professionals for official diagnosis.*"
)
return "\n".join(explanation)
except Exception as e:
self.logger.error(f"Error generating explanation: {e}")
return "Error generating analysis explanation."
# Example usage
if __name__ == "__main__":
# Set up logging
logging.basicConfig(level=logging.INFO)
# Test on sample data if available
import os
fusion = MultimodalFusion()
# Sample text report
sample_report = """
CHEST X-RAY EXAMINATION
CLINICAL HISTORY: 55-year-old male with cough and fever.
FINDINGS: The heart size is at the upper limits of normal. The lungs are clear without focal consolidation,
effusion, or pneumothorax. There is mild prominence of the pulmonary vasculature. No pleural effusion is seen.
There is a small nodular opacity noted in the right lower lobe measuring approximately 8mm, which is suspicious
and warrants further investigation. The mediastinum is unremarkable. The visualized bony structures show no acute abnormalities.
IMPRESSION:
1. Mild cardiomegaly.
2. 8mm nodular opacity in the right lower lobe, recommend follow-up CT for further evaluation.
3. No acute pulmonary parenchymal abnormality.
RECOMMENDATIONS: Follow-up chest CT to further characterize the nodular opacity in the right lower lobe.
"""
# Check if sample data directory exists and contains images
sample_dir = "../data/sample"
if os.path.exists(sample_dir) and os.listdir(sample_dir):
sample_image = os.path.join(sample_dir, os.listdir(sample_dir)[0])
print(f"Analyzing sample image: {sample_image}")
# Perform multimodal analysis
fused_results = fusion.analyze(sample_image, sample_report)
explanation = fusion.get_explanation(fused_results)
print("\nFused Analysis Results:")
print(explanation)
else:
print("No sample images found. Only analyzing text report.")
# Analyze just the text
text_results = fusion.analyze_text(sample_report)
print("\nText Analysis Results:")
print(
f"Severity: {text_results['severity']['level']} (Score: {text_results['severity']['score']})"
)
print("\nKey Findings:")
for finding in text_results["findings"]:
print(f"- {finding}")
print("\nEntities:")
for category, items in text_results["entities"].items():
if items:
print(f"- {category.capitalize()}: {', '.join(items)}")
print("\nFollow-up Recommendations:")
for rec in text_results["followup_recommendations"]:
print(f"- {rec}")