mominah committed on
Commit
d062f19
·
verified ·
1 Parent(s): c8e839f

Update check.py

Browse files
Files changed (1) hide show
  1. check.py +89 -173
check.py CHANGED
@@ -1,3 +1,5 @@
 
 
1
  import os
2
  import tempfile
3
  import json
@@ -5,256 +7,170 @@ import numpy as np
5
  import cv2
6
  from PIL import Image
7
  from pdf2image import convert_from_bytes
8
- from fastapi import FastAPI, UploadFile, File, HTTPException
9
  from fastapi.responses import JSONResponse, StreamingResponse
10
- import uvicorn
11
- from fastapi import APIRouter, HTTPException, Path
12
 
 
13
 
14
- # Get API key from environment
15
  GENAI_API_KEY = os.getenv("GENAI_API_KEY")
16
  if not GENAI_API_KEY:
17
  raise Exception("GENAI_API_KEY not set in environment")
18
-
19
- # Import the Google GenAI client libraries.
20
- from google import genai
21
- from google.genai import types
22
-
23
- # Initialize the GenAI client with the API key.
24
  client = genai.Client(api_key=GENAI_API_KEY)
25
 
26
- router = APIRouter(prefix="/check", tags=["check"])
27
-
28
- # Use system temporary directory to store the results file.
29
  TEMP_FOLDER = tempfile.gettempdir()
30
  RESULT_FILE = os.path.join(TEMP_FOLDER, "result_cards.json")
31
 
32
 
33
- ##############################################################
34
- # Preprocessing & Extraction Functions
35
- ##############################################################
36
-
37
  def extract_json_from_output(output_str: str):
38
- """
39
- Extracts a JSON object from a string containing extra text.
40
- """
41
- start = output_str.find('{')
42
- end = output_str.rfind('}')
43
  if start == -1 or end == -1:
44
- print("No JSON block found in the output.")
45
  return None
46
- json_str = output_str[start:end+1]
47
  try:
48
- return json.loads(json_str)
49
- except json.JSONDecodeError as e:
50
- print("Error decoding JSON:", e)
51
  return None
52
 
53
 
54
  def parse_all_answers(image_input: Image.Image) -> str:
55
- """
56
- Extracts answers from an image of a 15-question answer sheet.
57
- Returns the raw JSON string response from the model.
58
- """
59
  output_format = """
60
  Answer in the following JSON format. Do not write anything else:
61
- {
62
- "Answers": {
63
- "1": "<option or text>",
64
- "2": "<option or text>",
65
- "3": "<option or text>",
66
- "4": "<option or text>",
67
- "5": "<option or text>",
68
- "6": "<option or text>",
69
- "7": "<option or text>",
70
- "8": "<option or text>",
71
- "9": "<option or text>",
72
- "10": "<option or text>",
73
- "11": "<free-text answer>",
74
- "12": "<free-text answer>",
75
- "13": "<free-text answer>",
76
- "14": "<free-text answer>",
77
- "15": "<free-text answer>"
78
- }
79
- }
80
  """
81
  prompt = f"""
82
- You are an assistant that extracts answers from an image.
83
- The image is a screenshot of an answer sheet containing 15 questions.
84
- For questions 1 to 10, the answers are multiple-choice selections.
85
- For questions 11 to 15, the answers are free-text responses.
86
- Extract the answer for each question (1 to 15) and provide the result in JSON using the format below:
87
  {output_format}
88
  """
89
  response = client.models.generate_content(
90
- model="gemini-2.0-flash",
91
- contents=[prompt, image_input]
92
  )
93
  return response.text
94
 
95
 
96
  def parse_info(image_input: Image.Image) -> str:
97
- """
98
- Extracts candidate information including name, number, country, level and paper from an image.
99
- Returns the raw JSON string response from the model.
100
- """
101
  output_format = """
102
  Answer in the following JSON format. Do not write anything else:
103
- {
104
- "Candidate Info": {
105
- "Name": "<name>",
106
- "Number": "<number>",
107
- "Country": "<country>",
108
- "Level": "<level>",
109
- "Paper": "<paper>"
110
- }
111
- }
112
  """
113
  prompt = f"""
114
- You are an assistant that extracts candidate information from an image.
115
- The image contains candidate details including name, candidate number, country, level and paper.
116
- Extract the information accurately and provide the result in JSON using the following format:
117
  {output_format}
118
  """
119
  response = client.models.generate_content(
120
- model="gemini-2.0-flash",
121
- contents=[prompt, image_input]
122
  )
123
  return response.text
124
 
125
 
126
  def calculate_result(student_answers: dict, correct_answers: dict) -> dict:
127
- """
128
- Compares student's answers with the correct answers and calculates the score.
129
- Assumes JSON structures with a top-level "Answers" key containing Q1 to Q15.
130
- """
131
- student_all = student_answers.get("Answers", {})
132
- correct_all = correct_answers.get("Answers", {})
133
- total_questions = 15
134
  marks = 0
135
  detailed = {}
136
-
137
- for q in map(str, range(1, total_questions + 1)):
138
- stud_ans = student_all.get(q, "").strip()
139
- corr_ans = correct_all.get(q, "").strip()
140
- if stud_ans == corr_ans:
 
141
  marks += 1
142
- detailed[q] = {"Student": stud_ans, "Correct": corr_ans, "Result": "Correct"}
143
- else:
144
- detailed[q] = {"Student": stud_ans, "Correct": corr_ans, "Result": "Incorrect"}
145
-
146
- percentage = (marks / total_questions) * 100
147
- return {
148
- "Total Marks": marks,
149
- "Total Questions": total_questions,
150
- "Percentage": percentage,
151
- "Detailed Results": detailed
152
- }
153
 
154
 
155
  def load_answer_key(pdf_bytes: bytes) -> dict:
156
- """
157
- Converts a PDF (as bytes) to images, takes the last page, and parses the answers.
158
- Returns the parsed JSON answer key.
159
- """
160
  images = convert_from_bytes(pdf_bytes)
161
- last_page_image = images[-1]
162
- answer_key_response = parse_all_answers(last_page_image)
163
- return extract_json_from_output(answer_key_response)
164
-
165
 
166
- ##############################################################
167
- # FastAPI Endpoints
168
- ##############################################################
169
 
170
- @router.post("/process")
171
  async def process_pdfs(
172
- original_pdf: UploadFile = File(..., description="PDF with all student answer sheets (one page per student)"),
173
- paper_k_pdf: UploadFile = File(..., description="Answer key PDF for Paper K")
174
  ):
175
  try:
176
- # Read file bytes
177
- student_pdf_bytes = await original_pdf.read()
178
- paper_k_bytes = await paper_k_pdf.read()
179
-
180
- # Load the Paper K answer key
181
- answer_key_k = load_answer_key(paper_k_bytes)
182
- if answer_key_k is None:
183
- raise Exception("Failed to parse Paper K answer key.")
184
-
185
- # Convert the student answer PDF to images (each page = one student)
186
- student_images = convert_from_bytes(student_pdf_bytes)
187
  all_results = []
188
-
189
- for idx, page in enumerate(student_images):
190
- # --- Extract Candidate Info Region ---
191
- page_cv = cv2.cvtColor(np.array(page), cv2.COLOR_RGB2BGR)
192
- h, w = page_cv.shape[:2]
193
  mask = np.zeros((h, w), dtype="uint8")
194
  top, bottom = int(h * 0.10), int(h * 0.75)
195
  cv2.rectangle(mask, (0, top), (w, h - bottom), 255, -1)
196
- cropped = cv2.bitwise_and(page_cv, page_cv, mask=mask)
197
  coords = cv2.findNonZero(mask)
198
  if coords is None:
199
  continue
200
  x, y, mw, mh = cv2.boundingRect(coords)
201
- cand_img = Image.fromarray(cv2.cvtColor(cropped[y:y+mh, x:x+mw], cv2.COLOR_BGR2RGB))
202
-
203
- # Extract candidate info
204
- info_resp = parse_info(cand_img)
205
- cand_info = extract_json_from_output(info_resp) or {}
206
-
207
- # Extract student answers
208
- stud_resp = parse_all_answers(page)
209
- stud_answers = extract_json_from_output(stud_resp) or {}
210
-
211
- # Calculate result against Paper K key
212
- result = calculate_result(stud_answers, answer_key_k)
213
-
214
- all_results.append({
215
- "Student Index": idx + 1,
216
- "Candidate Info": cand_info.get("Candidate Info", {}),
217
- "Student Answers": stud_answers,
218
- "Correct Answer Key": answer_key_k,
219
- "Result": result
220
- })
221
-
222
- # Write out JSON file
 
 
 
 
223
  with open(RESULT_FILE, "w", encoding="utf-8") as f:
224
  json.dump({"results": all_results}, f, indent=2)
225
-
226
  return JSONResponse(content={"results": all_results})
227
-
 
 
228
  except Exception as e:
229
- raise HTTPException(status_code=500, detail=str(e))
230
 
231
 
232
- @router.get("/download")
233
  async def download_results():
234
- """
235
- Returns the result JSON file stored in the temporary folder.
236
- """
237
  if not os.path.exists(RESULT_FILE):
238
- raise HTTPException(status_code=404, detail="Result file not found. Please run /process first.")
239
  return StreamingResponse(
240
  open(RESULT_FILE, "rb"),
241
  media_type="application/json",
242
- headers={"Content-Disposition": "attachment; filename=result_cards.json"}
243
  )
244
 
245
 
246
- @router.get("/")
247
- async def root():
248
- return {
249
- "message": "Welcome to the Student Result Card API (Paper K only).",
250
- "usage": (
251
- "POST two PDFs to /process: "
252
- "(1) original answer sheet PDF, "
253
- "(2) Paper K answer-key PDF. "
254
- "Then GET /download to retrieve the graded results."
255
- )
256
- }
257
 
258
 
259
- if __name__ == "__main__":
260
- uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
 
 
1
+ # check.py
2
+
3
  import os
4
  import tempfile
5
  import json
 
7
  import cv2
8
  from PIL import Image
9
  from pdf2image import convert_from_bytes
10
+ from fastapi import APIRouter, UploadFile, File, HTTPException
11
  from fastapi.responses import JSONResponse, StreamingResponse
12
+ from google import genai
 
13
 
# Every endpoint defined in this module is mounted under the /check prefix.
router = APIRouter(prefix="/check", tags=["check"])

# GenAI client. The key is read once at import time so that a missing key
# fails fast at start-up instead of on the first request.
GENAI_API_KEY = os.getenv("GENAI_API_KEY")
if not GENAI_API_KEY:
    # RuntimeError is still an Exception, so existing handlers keep working.
    raise RuntimeError("GENAI_API_KEY not set in environment")
client = genai.Client(api_key=GENAI_API_KEY)

# Temp storage for results: the /process endpoint writes RESULT_FILE and the
# /download endpoint serves it back.
TEMP_FOLDER = tempfile.gettempdir()
RESULT_FILE = os.path.join(TEMP_FOLDER, "result_cards.json")
25
 
26
 
 
 
 
 
def extract_json_from_output(output_str: str):
    """Return the first JSON object embedded in *output_str*, or ``None``.

    Model replies often wrap the JSON in prose or Markdown fences. Each ``{``
    is tried in turn as the start of an object, and the first one that decodes
    cleanly wins. Unlike a first-``{``-to-last-``}`` slice, stray braces in the
    surrounding text do not spoil an otherwise valid payload.

    Args:
        output_str: Raw text returned by the model. Non-string input
            (including ``None``) is treated as "no JSON found".

    Returns:
        The decoded ``dict``, or ``None`` when no JSON object can be decoded.
    """
    if not isinstance(output_str, str):
        return None

    decoder = json.JSONDecoder()
    start = output_str.find("{")
    while start != -1:
        try:
            # raw_decode stops at the end of the object and ignores any tail.
            parsed, _ = decoder.raw_decode(output_str, start)
        except json.JSONDecodeError:
            start = output_str.find("{", start + 1)
        else:
            return parsed
    return None
36
 
37
 
def parse_all_answers(image_input: Image.Image, model: str = "gemini-2.0-flash") -> str:
    """Ask the GenAI model to read the answers off an answer-sheet image.

    Args:
        image_input: Image of one 15-question answer sheet.
        model: Name of the model to query; defaults to the model this module
            has always used.

    Returns:
        The raw text of the model reply, expected to contain a JSON object
        with an ``"Answers"`` mapping. An empty string is returned when the
        reply carries no text, so the declared ``str`` return type always
        holds.
    """
    output_format = """
    Answer in the following JSON format. Do not write anything else:
    { "Answers": { "1": "<…>", …, "15": "<…>" } }
    """
    prompt = f"""
    You are an assistant that extracts answers from an image of a 15-question sheet.
    Provide ONLY JSON in this format:
    {output_format}
    """
    response = client.models.generate_content(
        model=model, contents=[prompt, image_input]
    )
    # Guard against a reply without a text payload (presumably possible when
    # the model returns no usable candidate - TODO confirm against the SDK).
    return response.text or ""
52
 
53
 
def parse_info(image_input: Image.Image, model: str = "gemini-2.0-flash") -> str:
    """Ask the GenAI model to read the candidate details off an image.

    Args:
        image_input: Image showing the candidate-information area of a sheet.
        model: Name of the model to query; defaults to the model this module
            has always used.

    Returns:
        The raw text of the model reply, expected to contain a JSON object
        with a ``"Candidate Info"`` mapping (Name, Number, Country, Level,
        Paper). An empty string is returned when the reply carries no text,
        so the declared ``str`` return type always holds.
    """
    output_format = """
    Answer in the following JSON format. Do not write anything else:
    { "Candidate Info": { "Name": "<…>", "Number": "<…>", "Country": "<…>", "Level": "<…>", "Paper": "<…>" } }
    """
    prompt = f"""
    You are an assistant that extracts candidate info from an image.
    Provide ONLY JSON in this format:
    {output_format}
    """
    response = client.models.generate_content(
        model=model, contents=[prompt, image_input]
    )
    # Guard against a reply without a text payload (presumably possible when
    # the model returns no usable candidate - TODO confirm against the SDK).
    return response.text or ""
68
 
69
 
def _answers_of(payload) -> dict:
    """Return the ``"Answers"`` mapping of *payload*, or ``{}`` if malformed."""
    answers = payload.get("Answers") if isinstance(payload, dict) else None
    return answers if isinstance(answers, dict) else {}


def _normalize_answer(value) -> str:
    """Turn one parsed answer into a stripped string (``None`` becomes ``""``)."""
    # The model may emit numbers or booleans instead of strings; str() keeps
    # them comparable rather than crashing on a missing .strip().
    return "" if value is None else str(value).strip()


def calculate_result(student_answers: dict, correct_answers: dict, total_questions: int = 15) -> dict:
    """Grade a student's answers against the answer key.

    Both inputs are expected to look like ``{"Answers": {"1": ..., "2": ...}}``
    with question numbers as string keys. Missing or malformed input is
    treated as "no answers" rather than raising.

    Args:
        student_answers: Parsed answers for one student.
        correct_answers: Parsed answer key.
        total_questions: Number of questions to grade, numbered from 1.

    Returns:
        A dict with ``"Total Marks"``, ``"Total Questions"``, ``"Percentage"``
        and ``"Detailed Results"`` (per question: the student answer, the
        correct answer, and ``"Correct"``/``"Incorrect"``).
    """
    student_all = _answers_of(student_answers)
    correct_all = _answers_of(correct_answers)

    marks = 0
    detailed = {}
    for q in map(str, range(1, total_questions + 1)):
        stud = _normalize_answer(student_all.get(q))
        corr = _normalize_answer(correct_all.get(q))
        # A blank key entry cannot be matched: otherwise a question missing
        # from both the key and the sheet would be awarded a mark.
        ok = bool(corr) and stud == corr
        detailed[q] = {"Student": stud, "Correct": corr, "Result": "Correct" if ok else "Incorrect"}
        if ok:
            marks += 1

    # Avoid ZeroDivisionError when asked to grade zero questions.
    percentage = marks / total_questions * 100 if total_questions > 0 else 0.0
    return {
        "Total Marks": marks,
        "Total Questions": total_questions,
        "Percentage": percentage,
        "Detailed Results": detailed,
    }
 
 
 
 
 
 
 
 
 
 
84
 
85
 
def load_answer_key(pdf_bytes: bytes) -> "dict | None":
    """Parse the answer key from the last page of an answer-key PDF.

    Args:
        pdf_bytes: Raw bytes of the answer-key PDF.

    Returns:
        The parsed answer-key dict, or ``None`` when the PDF yields no pages
        or the model reply contains no decodable JSON object.
    """
    images = convert_from_bytes(pdf_bytes)
    if not images:
        # A PDF without pages has no key to read; report it the same way as
        # an unparseable reply instead of raising IndexError on images[-1].
        return None
    resp = parse_all_answers(images[-1])
    return extract_json_from_output(resp)
 
91
 
 
 
 
92
 
def _crop_candidate_region(page: Image.Image, top_frac: float = 0.10, bottom_frac: float = 0.75) -> Image.Image:
    """Return the full-width band of *page* used for candidate details.

    The band runs from ``int(height * top_frac)`` down to and including row
    ``height - int(height * bottom_frac)``, i.e. roughly the 10%-25% slice of
    the page with the default fractions.
    """
    width, height = page.size
    top = int(height * top_frac)
    # +1 because the lower row is inclusive; clamp so the crop box never
    # extends past the page.
    lower = min(height, height - int(height * bottom_frac) + 1)
    return page.crop((0, top, width, lower))


def _write_results_atomically(payload: dict) -> None:
    """Write *payload* to RESULT_FILE via a temp file and an atomic rename."""
    # A concurrent download must never observe a half-written file.
    fd, tmp_path = tempfile.mkstemp(dir=TEMP_FOLDER, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        os.replace(tmp_path, RESULT_FILE)
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise


def _grade_student_pages(stud_bytes: bytes, key_bytes: bytes) -> list:
    """Blocking part of /process: parse the key, grade each page, save results.

    Raises:
        HTTPException: 400 when the answer key or any student page cannot be
            parsed into JSON.
    """
    answer_key = load_answer_key(key_bytes)
    if answer_key is None:
        raise HTTPException(400, detail="Could not parse Paper K answer key.")

    all_results = []
    # One PDF page corresponds to one student; indices are 1-based.
    for idx, page in enumerate(convert_from_bytes(stud_bytes), start=1):
        # parse candidate info (best effort: an unparseable reply yields {})
        info_txt = parse_info(_crop_candidate_region(page))
        candidate_info = extract_json_from_output(info_txt) or {}

        # parse student answers (mandatory: grading is impossible without them)
        stud_txt = parse_all_answers(page)
        stud_answers = extract_json_from_output(stud_txt)
        if stud_answers is None:
            raise HTTPException(400, detail=f"Failed to parse answers on page {idx}.")

        all_results.append(
            {
                "Student Index": idx,
                "Candidate Info": candidate_info.get("Candidate Info", {}),
                "Student Answers": stud_answers,
                "Correct Answer Key": answer_key,
                "Result": calculate_result(stud_answers, answer_key),
            }
        )

    _write_results_atomically({"results": all_results})
    return all_results


@router.post("/process", summary="Grade student sheets (Paper K only)")
async def process_pdfs(
    student_pdf: UploadFile = File(..., description="Student sheets PDF"),
    paper_k_pdf: UploadFile = File(..., description="Answer key PDF for Paper K"),
):
    """Grade every page of the student PDF against the Paper K answer key.

    Returns a JSON body ``{"results": [...]}`` with one entry per student
    page, and stores the same payload in RESULT_FILE.

    Raises:
        HTTPException: 400 when the key or a student page cannot be parsed;
            500 for any other failure, with the error text as detail.
    """
    # Local import keeps this block self-contained.
    from fastapi.concurrency import run_in_threadpool

    try:
        stud_bytes = await student_pdf.read()
        key_bytes = await paper_k_pdf.read()
        # PDF rasterisation and the model calls are blocking; run them off
        # the event loop so other requests are not stalled meanwhile.
        all_results = await run_in_threadpool(_grade_student_pages, stud_bytes, key_bytes)
        return JSONResponse(content={"results": all_results})
    except HTTPException:
        # Deliberate 4xx responses must pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(500, detail=str(e)) from e
156
 
157
 
@router.get("/download", summary="Download latest grading results")
async def download_results():
    """Serve the most recent grading results as a JSON attachment.

    Raises:
        HTTPException: 404 when RESULT_FILE does not exist yet.
    """
    # Local import keeps this block self-contained. FileResponse opens and
    # closes the file itself, so no handle is left for the garbage collector.
    from fastapi.responses import FileResponse

    if not os.path.exists(RESULT_FILE):
        raise HTTPException(404, detail="No results available. Run /check/process first.")
    return FileResponse(
        RESULT_FILE,
        media_type="application/json",
        headers={"Content-Disposition": "attachment; filename=result_cards.json"},
    )
167
 
168
 
@router.get("/health", summary="Health check")
async def health_check():
    """Report that the service is up; always answers with a fixed payload."""
    payload = {"status": "healthy"}
    return payload
 
 
 
 
 
 
 
 
172
 
173
 
@router.get("/version", summary="Service version")
async def version_check():
    """Report the service version string."""
    payload = {"version": "1.0.0"}
    return payload