import base64
import json
import logging
import os
import time
from datetime import datetime

import openai
import requests
from datasets import load_dataset
from tenacity import retry, wait_exponential, stop_after_attempt


logger = logging.getLogger('benchmark')
model_name = 'chatgpt-4o-latest'
temperature = 0.2
log_filename = None


def setup_logging(filename):
    """Configure the benchmark logger to append one JSON entry per line to `filename`."""
    logger.setLevel(logging.INFO)

    # Clear any existing handlers so repeated calls do not produce duplicate entries.
    logger.handlers = []

    handler = logging.FileHandler(filename)
    handler.setFormatter(logging.Formatter('%(message)s'))
    logger.addHandler(handler)

    return logger


def encode_image(image_path):
    """Encode a local image file to a base64 string."""
    try:
        with open(image_path, "rb") as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')
    except Exception as e:
        print(f"Error encoding image {image_path}: {str(e)}")
        return None


def encode_image_url(image_url):
    """Download an image from a URL and encode it to a base64 string."""
    try:
        # A timeout keeps a single unreachable URL from stalling the whole run.
        response = requests.get(image_url, timeout=30)
        response.raise_for_status()
        return base64.b64encode(response.content).decode('utf-8')
    except Exception as e:
        print(f"Error encoding image from URL {image_url}: {str(e)}")
        return None


# Retry transient API failures up to 3 attempts with exponential backoff
# (roughly 4-10 seconds between attempts); if every attempt fails, the error
# propagates to the caller.
@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(3))
def create_multimodal_request(example, client, use_urls=False, shutdown_event=None):
    """
    Create and send a multimodal request for a single dataset example.

    Args:
        example: Dataset example to process
        client: OpenAI client
        use_urls: If True, load images from their source URLs instead of local files
        shutdown_event: Optional threading.Event for graceful shutdown (currently unused)
    """
    prompt = f"""Given the following medical case:
Please answer this multiple choice question:
{example['question']}
Base your answer only on the provided images and case information."""

    content = [{"type": "text", "text": prompt}]

    if use_urls:
        # Normalize the URL field to a flat list (it may be a string or nested lists).
        image_urls = example['image_source_urls']
        if isinstance(image_urls, str):
            image_urls = [image_urls]
        elif image_urls and isinstance(image_urls[0], list):
            image_urls = [url for sublist in image_urls for url in sublist]

        for img_url in image_urls:
            if img_url and isinstance(img_url, str):
                base64_image = encode_image_url(img_url)
                if base64_image:
                    content.append({
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    })
                    print(f"Successfully loaded image from URL: {img_url}")
    else:
        # Normalize the local path field to a flat list (it may be a string or nested lists).
        image_paths = example['images']
        if isinstance(image_paths, str):
            image_paths = [image_paths]
        elif image_paths and isinstance(image_paths[0], list):
            image_paths = [path for sublist in image_paths for path in sublist]

        for img_path in image_paths:
            if img_path and isinstance(img_path, str):
                # Paths are resolved relative to the local figures/ directory.
                img_path = img_path.replace('figures/', '')
                full_path = os.path.join("figures", img_path)

                if os.path.exists(full_path):
                    base64_image = encode_image(full_path)
                    if base64_image:
                        content.append({
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{base64_image}"
                            }
                        })
                        print(f"Successfully loaded image: {full_path}")
                else:
                    print(f"Image file not found: {full_path}")

    # content[0] is the text prompt; if nothing else was appended, no image was attached.
    if len(content) == 1:
        print(f"No images found for question {example.get('question_id', 'unknown')}")
        log_entry = {
            "question_id": example.get('question_id', 'unknown'),
            "timestamp": datetime.now().isoformat(),
            "model": model_name,
            "temperature": temperature,
            "status": "skipped",
            "reason": "no_images",
            "input": {
                "question": example['question'],
                "explanation": example.get('explanation', ''),
                "image_paths": example.get('image_source_urls' if use_urls else 'images')
            }
        }
        logger.info(json.dumps(log_entry))
        return None

    messages = [
        {"role": "system", "content": "You are a medical imaging expert. Provide only the letter corresponding to your answer choice (A/B/C/D/E/F)."},
        {"role": "user", "content": content}
    ]

    try:
        start_time = time.time()

        response = client.chat.completions.create(
            model=model_name,
            messages=messages,
            max_tokens=50,
            temperature=temperature
        )
        duration = time.time() - start_time

        log_entry = {
            "question_id": example.get('question_id', 'unknown'),
            "timestamp": datetime.now().isoformat(),
            "model": model_name,
            "temperature": temperature,
            "duration": round(duration, 2),
            "usage": {
                "prompt_tokens": response.usage.prompt_tokens,
                "completion_tokens": response.usage.completion_tokens,
                "total_tokens": response.usage.total_tokens
            },
            "model_answer": response.choices[0].message.content,
            "correct_answer": example['answer'],
            "input": {
                "messages": messages,
                "question": example['question'],
                "explanation": example.get('explanation', ''),
                "image_source": "url" if use_urls else "local",
                "images": example.get('image_source_urls' if use_urls else 'images')
            }
        }
        logger.info(json.dumps(log_entry))
        return response
    except Exception as e:
        log_entry = {
            "question_id": example.get('question_id', 'unknown'),
            "timestamp": datetime.now().isoformat(),
            "model": model_name,
            "temperature": temperature,
            "status": "error",
            "error": str(e),
            "input": {
                "messages": messages,
                "question": example['question'],
                "explanation": example.get('explanation', ''),
                "image_source": "url" if use_urls else "local",
                "images": example.get('image_source_urls' if use_urls else 'images')
            }
        }
        logger.info(json.dumps(log_entry))
        print(f"Error processing question {example.get('question_id', 'unknown')}: {str(e)}")
        raise


def main():
    import signal
    import threading
    import argparse

    parser = argparse.ArgumentParser(description='Run medical image analysis benchmark')
    parser.add_argument('--use-urls', action='store_true', help='Use image URLs instead of local files')
    parser.add_argument('--model', type=str, default='chatgpt-4o-latest', help='Model name to use')
    parser.add_argument('--temperature', type=float, default=0.2, help='Temperature for model inference')
    parser.add_argument('--log-prefix', type=str, help='Prefix for log filename (default: model name)')
    parser.add_argument('--max-cases', type=int, default=None, help='Maximum number of cases to process (default: all)')
    args = parser.parse_args()

    global model_name, temperature, log_filename
    model_name = args.model
    temperature = args.temperature
    log_prefix = args.log_prefix if args.log_prefix is not None else args.model
    log_filename = f"{log_prefix}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

    setup_logging(log_filename)

    # Allow Ctrl-C / SIGTERM to finish the current example before exiting.
    shutdown_event = threading.Event()

    def signal_handler(signum, frame):
        print("\nShutdown signal received. Completing current task...")
        shutdown_event.set()

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
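
    # Illustrative shape of one metadata.jsonl record, inferred from the fields this
    # script reads (the actual dataset may include additional fields):
    #   {"question_id": "...", "question": "...", "answer": "...",
    #    "explanation": "...", "images": ["figures/..."], "image_source_urls": ["https://..."]}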
    dataset = load_dataset("json", data_files="chestagentbench/metadata.jsonl")
    train_dataset = dataset["train"]

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY environment variable is not set.")

    kwargs = {}
    if base_url := os.getenv("OPENAI_BASE_URL"):
        kwargs["base_url"] = base_url

    client = openai.OpenAI(api_key=api_key, **kwargs)

    total_examples = len(train_dataset)
    processed = 0
    skipped = 0

    print(f"Beginning benchmark evaluation for model {model_name}")
    print(f"Using {'image URLs' if args.use_urls else 'local files'} for images")
    print(f"Temperature: {temperature}")

    dataset_to_process = train_dataset
    if args.max_cases is not None:
        dataset_to_process = train_dataset.select(range(min(args.max_cases, len(train_dataset))))
        total_examples = len(dataset_to_process)
        print(f"Processing {total_examples} cases (limited by --max-cases argument)")

    for example in dataset_to_process:
        if shutdown_event.is_set():
            print("\nGraceful shutdown initiated. Saving progress...")
            break

        processed += 1

        response = create_multimodal_request(example, client, args.use_urls, shutdown_event)

        if response is None:
            skipped += 1
            print(f"Skipped question: {example.get('question_id', 'unknown')}")
            continue

        print(f"Progress: {processed}/{total_examples}")
        print(f"Question ID: {example.get('question_id', 'unknown')}")
        print(f"Model Answer: {response.choices[0].message.content}")
        print(f"Correct Answer: {example['answer']}\n")
print(f"\nBenchmark Summary:") |
|
print(f"Total Examples Processed: {processed}") |
|
print(f"Total Examples Skipped: {skipped}") |
|
|
|
|
|
if os.path.exists(log_filename) and os.path.getsize(log_filename) > 0: |
|
print(f"\nLog file saved to: {os.path.abspath(log_filename)}") |
|
else: |
|
print(f"\nWarning: Log file could not be verified at: {os.path.abspath(log_filename)}") |
|
print("Please check directory permissions and available disk space.") |
|
|
|
if __name__ == "__main__": |
|
main() |
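
# Example invocation (illustrative; the script name "benchmark.py" below is a
# placeholder, substitute whatever this file is actually named):
#
#   export OPENAI_API_KEY=...         # required; OPENAI_BASE_URL is optional
#   python benchmark.py --model chatgpt-4o-latest --temperature 0.2 --max-cases 10
#   python benchmark.py --use-urls    # load images from their source URLs instead of figures/
#
# Each example produces one JSON line in <log-prefix>_<timestamp>.json, so results
# can be scored afterwards by comparing "model_answer" with "correct_answer".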