import os import re import json import time import tempfile from typing import Dict, Any, List, Optional from transformers import AutoTokenizer from sentence_transformers import SentenceTransformer from huggingface_hub import login GEMINI_MODEL = "gemini-2.0-flash" DEFAULT_TEMPERATURE = 0.7 TOKENIZER_MODEL = "answerdotai/ModernBERT-base" SENTENCE_TRANSFORMER_MODEL = "all-MiniLM-L6-v2" hf_token = os.environ.get('HF_TOKEN', None) login(token=hf_token) tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_MODEL) sentence_model = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL) def clean_text(text): text = re.sub(r'\[speaker_\d+\]', '', text) text = re.sub(r'\s+', ' ', text).strip() return text def split_text_by_tokens(text, max_tokens=12000): text = clean_text(text) tokens = tokenizer.encode(text) if len(tokens) <= max_tokens: return [text] split_point = len(tokens) // 2 sentences = re.split(r'(?<=[.!?])\s+', text) first_half = [] second_half = [] current_tokens = 0 for sentence in sentences: sentence_tokens = len(tokenizer.encode(sentence)) if current_tokens + sentence_tokens <= split_point: first_half.append(sentence) current_tokens += sentence_tokens else: second_half.append(sentence) return [" ".join(first_half), " ".join(second_half)] def generate_with_gemini(text, api_key, language, content_type="summary"): from langchain_google_genai import ChatGoogleGenerativeAI os.environ["GOOGLE_API_KEY"] = api_key llm = ChatGoogleGenerativeAI( model=GEMINI_MODEL, temperature=DEFAULT_TEMPERATURE, max_retries=3 ) if content_type == "summary": base_prompt = SUMMARY_PROMPT_TEMPLATE.format(text=text) else: base_prompt = QUIZ_PROMPT_TEMPLATE.format(text=text) language_instruction = f"\nIMPORTANT: Generate ALL content in {language} language." prompt = base_prompt + language_instruction try: messages = [ {"role": "system", "content": "You are a helpful AI assistant that creates high-quality text summaries and quizzes."}, {"role": "user", "content": prompt} ] response = llm.invoke(messages) try: content = response.content json_match = re.search(r'```json\s*([\s\S]*?)\s*```', content) if json_match: json_str = json_match.group(1) else: json_match = re.search(r'(\{[\s\S]*\})', content) if json_match: json_str = json_match.group(1) else: json_str = content # Parse the JSON function_call = json.loads(json_str) return function_call except json.JSONDecodeError: raise Exception("Could not parse JSON from LLM response") except Exception as e: raise Exception(f"Error calling API: {str(e)}") def format_summary_for_display(results, language="English"): output = [] if language == "Uzbek": segment_header = "QISM" key_concepts_header = "ASOSIY TUSHUNCHALAR" summary_header = "QISQACHA MAZMUN" elif language == "Russian": segment_header = "СЕГМЕНТ" key_concepts_header = "КЛЮЧЕВЫЕ ПОНЯТИЯ" summary_header = "КРАТКОЕ СОДЕРЖАНИЕ" else: segment_header = "SEGMENT" key_concepts_header = "KEY CONCEPTS" summary_header = "SUMMARY" segments = results.get("segments", []) for i, segment in enumerate(segments): topic = segment["topic_name"] segment_num = i + 1 output.append(f"\n\n{'='*40}") output.append(f"{segment_header} {segment_num}: {topic}") output.append(f"{'='*40}\n") output.append(f"{key_concepts_header}:") for concept in segment["key_concepts"]: output.append(f"• {concept}") output.append(f"\n{summary_header}:") output.append(segment["summary"]) return "\n".join(output) def format_quiz_for_display(results, language="English"): output = [] if language == "Uzbek": quiz_questions_header = "TEST SAVOLLARI" elif language == "Russian": quiz_questions_header = "ТЕСТОВЫЕ ВОПРОСЫ" else: quiz_questions_header = "QUIZ QUESTIONS" output.append(f"{'='*40}") output.append(f"{quiz_questions_header}") output.append(f"{'='*40}\n") quiz_questions = results.get("quiz_questions", []) for i, q in enumerate(quiz_questions): output.append(f"\n{i+1}. {q['question']}") for j, option in enumerate(q['options']): letter = chr(97 + j).upper() correct_marker = " ✓" if option["correct"] else "" output.append(f" {letter}. {option['text']}{correct_marker}") return "\n".join(output) def analyze_document(text, gemini_api_key, language, content_type="summary"): try: start_time = time.time() text_parts = split_text_by_tokens(text) input_tokens = 0 output_tokens = 0 if content_type == "summary": all_results = {"segments": []} segment_counter = 1 for part in text_parts: actual_prompt = SUMMARY_PROMPT_TEMPLATE.format(text=part) prompt_tokens = len(tokenizer.encode(actual_prompt)) input_tokens += prompt_tokens analysis = generate_with_gemini(part, gemini_api_key, language, "summary") if "segments" in analysis: for segment in analysis["segments"]: segment["segment_number"] = segment_counter all_results["segments"].append(segment) segment_counter += 1 formatted_output = format_summary_for_display(all_results, language) else: # Quiz generation all_results = {"quiz_questions": []} for part in text_parts: actual_prompt = QUIZ_PROMPT_TEMPLATE.format(text=part) prompt_tokens = len(tokenizer.encode(actual_prompt)) input_tokens += prompt_tokens analysis = generate_with_gemini(part, gemini_api_key, language, "quiz") if "quiz_questions" in analysis: remaining_slots = 10 - len(all_results["quiz_questions"]) if remaining_slots > 0: questions_to_add = analysis["quiz_questions"][:remaining_slots] all_results["quiz_questions"].extend(questions_to_add) formatted_output = format_quiz_for_display(all_results, language) end_time = time.time() total_time = end_time - start_time output_tokens = len(tokenizer.encode(formatted_output)) token_info = f"Input tokens: {input_tokens}\nOutput tokens: {output_tokens}\nTotal tokens: {input_tokens + output_tokens}\n" formatted_text = f"Total Processing time: {total_time:.2f}s\n{token_info}\n" + formatted_output json_path = tempfile.mktemp(suffix='.json') with open(json_path, 'w', encoding='utf-8') as json_file: json.dump(all_results, json_file, indent=2) txt_path = tempfile.mktemp(suffix='.txt') with open(txt_path, 'w', encoding='utf-8') as txt_file: txt_file.write(formatted_text) return formatted_text, json_path, txt_path except Exception as e: error_message = f"Error processing document: {str(e)}" return error_message, None, None