Spaces:
Running
Running
# check.py | |
import os | |
import tempfile | |
import json | |
import numpy as np | |
import cv2 | |
from PIL import Image | |
from pdf2image import convert_from_bytes | |
from fastapi import APIRouter, UploadFile, File, HTTPException | |
from fastapi.responses import JSONResponse, StreamingResponse | |
from google import genai | |
router = APIRouter(prefix="/check", tags=["check"]) | |
# GenAI client | |
GENAI_API_KEY = os.getenv("GENAI_API_KEY") | |
if not GENAI_API_KEY: | |
raise Exception("GENAI_API_KEY not set in environment") | |
client = genai.Client(api_key=GENAI_API_KEY) | |
# Temp storage for results | |
TEMP_FOLDER = tempfile.gettempdir() | |
RESULT_FILE = os.path.join(TEMP_FOLDER, "result_cards.json") | |
def extract_json_from_output(output_str: str): | |
start = output_str.find("{") | |
end = output_str.rfind("}") | |
if start == -1 or end == -1: | |
return None | |
try: | |
return json.loads(output_str[start : end + 1]) | |
except json.JSONDecodeError: | |
return None | |
def parse_all_answers(image_input: Image.Image) -> str: | |
output_format = """ | |
Answer in the following JSON format. Do not write anything else: | |
{ "Answers": { "1": "<…>", …, "15": "<…>" } } | |
""" | |
prompt = f""" | |
You are an assistant that extracts answers from an image of a 15-question sheet. | |
Provide ONLY JSON in this format: | |
{output_format} | |
""" | |
response = client.models.generate_content( | |
model="gemini-2.0-flash", contents=[prompt, image_input] | |
) | |
return response.text | |
def parse_info(image_input: Image.Image) -> str: | |
output_format = """ | |
Answer in the following JSON format. Do not write anything else: | |
{ "Candidate Info": { "Name": "<…>", "Number": "<…>", "Country": "<…>", "Level": "<…>", "Paper": "<…>" } } | |
""" | |
prompt = f""" | |
You are an assistant that extracts candidate info from an image. | |
Provide ONLY JSON in this format: | |
{output_format} | |
""" | |
response = client.models.generate_content( | |
model="gemini-2.0-flash", contents=[prompt, image_input] | |
) | |
return response.text | |
def calculate_result(student_answers: dict, correct_answers: dict) -> dict: | |
student_all = (student_answers or {}).get("Answers", {}) | |
correct_all = (correct_answers or {}).get("Answers", {}) | |
total = 15 | |
marks = 0 | |
detailed = {} | |
for q in map(str, range(1, total + 1)): | |
stud = (student_all.get(q) or "").strip() | |
corr = (correct_all.get(q) or "").strip() | |
ok = stud == corr | |
detailed[q] = {"Student": stud, "Correct": corr, "Result": "Correct" if ok else "Incorrect"} | |
if ok: | |
marks += 1 | |
return {"Total Marks": marks, "Total Questions": total, "Percentage": marks / total * 100, "Detailed Results": detailed} | |
def load_answer_key(pdf_bytes: bytes) -> dict: | |
images = convert_from_bytes(pdf_bytes) | |
last_page = images[-1] | |
resp = parse_all_answers(last_page) | |
return extract_json_from_output(resp) | |
async def process_pdfs( | |
student_pdf: UploadFile = File(..., description="Student sheets PDF"), | |
paper_k_pdf: UploadFile = File(..., description="Answer key PDF for Paper K"), | |
): | |
try: | |
stud_bytes = await student_pdf.read() | |
key_bytes = await paper_k_pdf.read() | |
answer_key = load_answer_key(key_bytes) | |
if answer_key is None: | |
raise HTTPException(400, detail="Could not parse Paper K answer key.") | |
student_pages = convert_from_bytes(stud_bytes) | |
all_results = [] | |
for idx, page in enumerate(student_pages, start=1): | |
# crop candidate-info | |
cv = cv2.cvtColor(np.array(page), cv2.COLOR_RGB2BGR) | |
h, w = cv.shape[:2] | |
mask = np.zeros((h, w), dtype="uint8") | |
top, bottom = int(h * 0.10), int(h * 0.75) | |
cv2.rectangle(mask, (0, top), (w, h - bottom), 255, -1) | |
crop = cv2.bitwise_and(cv, cv, mask=mask) | |
coords = cv2.findNonZero(mask) | |
if coords is None: | |
continue | |
x, y, mw, mh = cv2.boundingRect(coords) | |
cand_img = Image.fromarray(cv2.cvtColor(crop[y : y + mh, x : x + mw], cv2.COLOR_BGR2RGB)) | |
# parse candidate info | |
info_txt = parse_info(cand_img) | |
candidate_info = extract_json_from_output(info_txt) or {} | |
# parse student answers | |
stud_txt = parse_all_answers(page) | |
stud_answers = extract_json_from_output(stud_txt) | |
if stud_answers is None: | |
raise HTTPException(400, detail=f"Failed to parse answers on page {idx}.") | |
# grade | |
result = calculate_result(stud_answers, answer_key) | |
all_results.append( | |
{ | |
"Student Index": idx, | |
"Candidate Info": candidate_info.get("Candidate Info", {}), | |
"Student Answers": stud_answers, | |
"Correct Answer Key": answer_key, | |
"Result": result, | |
} | |
) | |
# write file | |
with open(RESULT_FILE, "w", encoding="utf-8") as f: | |
json.dump({"results": all_results}, f, indent=2) | |
return JSONResponse(content={"results": all_results}) | |
except HTTPException: | |
raise | |
except Exception as e: | |
raise HTTPException(500, detail=str(e)) | |
async def download_results(): | |
if not os.path.exists(RESULT_FILE): | |
raise HTTPException(404, detail="No results available. Run /check/process first.") | |
return StreamingResponse( | |
open(RESULT_FILE, "rb"), | |
media_type="application/json", | |
headers={"Content-Disposition": "attachment; filename=result_cards.json"}, | |
) | |
async def health_check(): | |
return {"status": "healthy"} | |
async def version_check(): | |
return {"version": "1.0.0"} | |