#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#

import asyncio
import codecs
import io
import json
import logging
import os
import random
import threading
import uuid
import zipfile
from pathlib import Path

import docx
import gradio as gr
import httpx
import pandas as pd
import pdfplumber
import pytesseract
import requests
from openpyxl import load_workbook
from PIL import Image
from pptx import Presentation

logger = logging.getLogger(__name__)

# Install OCR tools and dependencies
os.system("apt-get update -q -y && apt-get install -q -y tesseract-ocr tesseract-ocr-eng tesseract-ocr-ind libleptonica-dev libtesseract-dev")

# ============================
# Environments
# ============================

# Initial welcome messages
JARVIS_INIT = json.loads(os.getenv("HELLO", "[]"))

# Deep Search
DEEP_SEARCH_PROVIDER_HOST = os.getenv("DEEP_SEARCH_PROVIDER_HOST")
DEEP_SEARCH_PROVIDER_KEY = os.getenv('DEEP_SEARCH_PROVIDER_KEY')
DEEP_SEARCH_INSTRUCTIONS = os.getenv("DEEP_SEARCH_INSTRUCTIONS")

# AI servers, keys and instructions
INTERNAL_AI_GET_SERVER = os.getenv("INTERNAL_AI_GET_SERVER")
INTERNAL_AI_INSTRUCTIONS = os.getenv("INTERNAL_TRAINING_DATA")

# System instructions mapping for various models and default instructions
SYSTEM_PROMPT_MAPPING = json.loads(os.getenv("SYSTEM_PROMPT_MAPPING", "{}"))
SYSTEM_PROMPT_DEFAULT = os.getenv("DEFAULT_SYSTEM")

# List of available servers and keys
LINUX_SERVER_HOSTS = [h for h in json.loads(os.getenv("LINUX_SERVER_HOST", "[]")) if h]
LINUX_SERVER_PROVIDER_KEYS = [k for k in json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]")) if k]

# Sets and dicts to track problematic keys and their retry attempts
LINUX_SERVER_PROVIDER_KEYS_MARKED = set()
LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS = {}

# HTTP error codes considered as server errors for marking keys
LINUX_SERVER_ERRORS = set(map(int, filter(None, os.getenv("LINUX_SERVER_ERROR", "").split(","))))

# UI labels and responses from environment variables for easy localization/customization
AI_TYPES = {f"AI_TYPE_{i}": os.getenv(f"AI_TYPE_{i}") for i in range(1, 10)}
RESPONSES = {f"RESPONSE_{i}": os.getenv(f"RESPONSE_{i}") for i in range(1, 11)}

# Model mapping and configuration loaded from environment variables
MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}"))
MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}"))
MODEL_CHOICES = list(MODEL_MAPPING.values())

# Default model config and key for fallback
DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}"))
DEFAULT_MODEL_KEY = next(iter(MODEL_MAPPING), None)

# Display name of the primary model (first configured choice), or None when
# no model is configured. Guarded so an empty MODEL_MAPPING does not raise
# IndexError wherever the primary model is compared against.
PRIMARY_MODEL_CHOICE = MODEL_CHOICES[0] if MODEL_CHOICES else None

# Meta tags for HTML head (SEO, etc.)
META_TAGS = os.getenv("META_TAGS")

# Allowed file extensions for upload (e.g., .pdf, .docx, etc.)
ALLOWED_EXTENSIONS = json.loads(os.getenv("ALLOWED_EXTENSIONS", "[]"))

# ============================
# Session Management
# ============================

class SessionWithID(requests.Session):
    """
    Custom session object that holds a unique session ID and async control flags.
    Used to track individual user sessions and allow cancellation of ongoing requests.
    """
    def __init__(self):
        super().__init__()
        self.session_id = str(uuid.uuid4())  # Unique ID per session
        self.stop_event = asyncio.Event()  # Async event to signal stop requests
        self.cancel_token = {"cancelled": False}  # Flag to indicate cancellation


def create_session():
    """
    Create and return a new SessionWithID object.
    Called when a new user session starts or chat is reset.
    """
    return SessionWithID()


def ensure_stop_event(sess):
    """
    Ensure that the session object has stop_event and cancel_token attributes.
    Useful when restoring or reusing sessions.

    Args:
        sess: Session-like object; missing attributes are added in place.
    """
    if not hasattr(sess, "stop_event"):
        sess.stop_event = asyncio.Event()
    if not hasattr(sess, "cancel_token"):
        sess.cancel_token = {"cancelled": False}


def marked_item(item, marked, attempts):
    """
    Record a failure for a provider key or host.

    The item is added to `marked` immediately and its counter in `attempts`
    is incremented. Once the counter reaches 3, a timer is scheduled that
    removes the item from both containers after 5 minutes so it can be retried.

    Args:
        item: Hashable identifier (key or host) that failed.
        marked: Set of currently marked items; mutated in place.
        attempts: Dict mapping item -> failure count; mutated in place.

    NOTE(review): in the code visible in this file the `marked` set is only
    written to; candidate selection does not consult it. Confirm whether
    marked keys were meant to be skipped.
    """
    marked.add(item)
    attempts[item] = attempts.get(item, 0) + 1
    if attempts[item] >= 3:
        def remove():
            marked.discard(item)
            attempts.pop(item, None)

        timer = threading.Timer(300, remove)
        # Daemon thread: a pending unmark must not keep the interpreter alive
        # for up to five minutes at shutdown.
        timer.daemon = True
        timer.start()


def get_model_key(display):
    """
    Get the internal model key (identifier) from the display name.
    Returns default model key if not found.
    """
    return next((k for k, v in MODEL_MAPPING.items() if v == display), DEFAULT_MODEL_KEY)

# ============================
# File Content Extraction Utilities
# ============================

def _ocr_image_bytes(data):
    """Run OCR on raw image bytes and return the recognized text (may be blank)."""
    return pytesseract.image_to_string(Image.open(io.BytesIO(data)))


def extract_pdf_content(fp):
    """
    Extract text content from PDF file.
    Includes OCR on embedded images to capture text within images.
    Also extracts tables as tab-separated text.

    Args:
        fp: Path to the PDF file.

    Returns:
        The extracted text, stripped. If reading fails, whatever was extracted
        so far followed by an "[Error reading PDF ...]" marker.
    """
    parts = []
    try:
        with pdfplumber.open(fp) as pdf:
            for page in pdf.pages:
                # Extract text from page
                parts.append((page.extract_text() or "") + "\n")

                # OCR on images if any
                if page.images:
                    rendered = page.to_image(resolution=300).original
                    # Image boxes are in page coordinates (points) while the
                    # render is in pixels, so boxes must be scaled before
                    # cropping or the wrong region is sent to OCR.
                    x_off, y_off = float(page.bbox[0]), float(page.bbox[1])
                    x_scale = rendered.width / float(page.width)
                    y_scale = rendered.height / float(page.height)
                    for img in page.images:
                        bbox = (
                            round((float(img["x0"]) - x_off) * x_scale),
                            round((float(img["top"]) - y_off) * y_scale),
                            round((float(img["x1"]) - x_off) * x_scale),
                            round((float(img["bottom"]) - y_off) * y_scale),
                        )
                        ocr_text = pytesseract.image_to_string(rendered.crop(bbox))
                        if ocr_text.strip():
                            parts.append(ocr_text + "\n")

                # Extract tables as TSV
                for table in page.extract_tables():
                    for row in table:
                        cells = [str(cell) for cell in row if cell is not None]
                        if cells:
                            parts.append("\t".join(cells) + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading PDF {fp}: {e}]")
    return "".join(parts).strip()


def extract_docx_content(fp):
    """
    Extract text from Microsoft Word files.
    Also performs OCR on embedded images inside the Microsoft Word archive.

    Args:
        fp: Path to the Word document.

    Returns:
        The extracted text, stripped. If reading fails, whatever was extracted
        so far followed by an "[Error reading Microsoft Word ...]" marker.
    """
    parts = []
    try:
        doc = docx.Document(fp)

        # Extract paragraphs
        parts.extend(para.text + "\n" for para in doc.paragraphs)

        # Extract tables
        for table in doc.tables:
            for row in table.rows:
                parts.append("\t".join(cell.text for cell in row.cells) + "\n")

        # OCR on embedded images inside Microsoft Word
        with zipfile.ZipFile(fp) as z:
            for member in z.namelist():
                if not member.startswith("word/media/"):
                    continue
                try:
                    ocr_text = _ocr_image_bytes(z.read(member))
                except Exception:
                    # Ignore images that can't be processed
                    continue
                if ocr_text.strip():
                    parts.append(ocr_text + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading Microsoft Word {fp}: {e}]")
    return "".join(parts).strip()


def extract_excel_content(fp):
    """
    Extract content from Microsoft Excel files.
    Converts sheets to CSV text.
    Attempts OCR on embedded images if present.

    Args:
        fp: Path to the Excel workbook.

    Returns:
        The extracted text, stripped. If reading fails, whatever was extracted
        so far followed by an "[Error reading Microsoft Excel ...]" marker.
    """
    parts = []
    try:
        # Extract all sheets as CSV text
        sheets = pd.read_excel(fp, sheet_name=None)
        for name, df in sheets.items():
            parts.append(f"Sheet: {name}\n")
            parts.append(df.to_csv(index=False) + "\n")

        # Load workbook to access images. openpyxl keeps loaded images on each
        # worksheet rather than on the workbook, so iterate the sheets.
        # NOTE(review): `_images` / `_data()` are private openpyxl attributes;
        # confirm against the pinned openpyxl version.
        wb = load_workbook(fp, data_only=True)
        for ws in wb.worksheets:
            for image in getattr(ws, "_images", ()):
                try:
                    ocr_text = _ocr_image_bytes(image._data())
                except Exception:
                    # Ignore images that can't be processed
                    continue
                if ocr_text.strip():
                    parts.append(ocr_text + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading Microsoft Excel {fp}: {e}]")
    return "".join(parts).strip()


def extract_pptx_content(fp):
    """
    Extract text content from Microsoft PowerPoint presentation slides.
    Includes text from shapes and tables.
    Performs OCR on embedded images.

    Args:
        fp: Path to the presentation.

    Returns:
        The extracted text, stripped. If reading fails, whatever was extracted
        so far followed by an "[Error reading Microsoft PowerPoint ...]" marker.
    """
    parts = []
    try:
        prs = Presentation(fp)
        for slide in prs.slides:
            for shape in slide.shapes:
                # Extract text from shapes
                if hasattr(shape, "text") and shape.text:
                    parts.append(shape.text + "\n")

                # OCR on images inside shapes (shape_type 13 is compared
                # literally, as in the original code)
                if shape.shape_type == 13 and hasattr(shape, "image") and shape.image:
                    try:
                        ocr_text = _ocr_image_bytes(shape.image.blob)
                    except Exception:
                        # Ignore images that can't be processed
                        continue
                    if ocr_text.strip():
                        parts.append(ocr_text + "\n")

            # Extract tables
            for shape in slide.shapes:
                if shape.has_table:
                    for row in shape.table.rows:
                        parts.append("\t".join(cell.text for cell in row.cells) + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading Microsoft PowerPoint {fp}: {e}]")
    return "".join(parts).strip()


def extract_file_content(fp):
    """
    Determine file type by extension and extract text content accordingly.
    For unknown types, attempts to read as plain text.

    Args:
        fp: Path to the uploaded file.

    Returns:
        Extracted text, or an "[Error reading file ...]" marker when a
        plain-text read fails.
    """
    ext = Path(fp).suffix.lower()
    if ext == ".pdf":
        return extract_pdf_content(fp)
    if ext in (".doc", ".docx"):
        return extract_docx_content(fp)
    if ext in (".xlsx", ".xls"):
        return extract_excel_content(fp)
    if ext in (".ppt", ".pptx"):
        return extract_pptx_content(fp)
    try:
        return Path(fp).read_text(encoding="utf-8").strip()
    except Exception as e:
        return f"\n[Error reading file {fp}: {e}]"

# ============================
# Server Communication
# ============================

def _unescape_reasoning(text):
    """
    Resolve backslash escape sequences in `text` without corrupting non-ASCII.

    Encoding with latin-1 + backslashreplace before decoding with
    unicode_escape keeps every code point intact; encoding as UTF-8 first
    would turn multi-byte characters into mojibake. Text with a malformed
    escape (e.g. a trailing backslash) is returned unchanged.
    """
    try:
        return text.encode("latin-1", "backslashreplace").decode("unicode_escape")
    except UnicodeDecodeError:
        return text


def _iter_delta_chunks(event):
    """Yield ("reasoning", text) / ("content", text) pairs from one decoded stream event."""
    if not isinstance(event, dict):
        return
    choices = event.get("choices")
    if not isinstance(choices, list):
        return
    for choice in choices:
        delta = choice.get("delta", {}) if isinstance(choice, dict) else None
        if not isinstance(delta, dict):
            continue
        # Stream reasoning text separately for UI
        reasoning = delta.get("reasoning")
        if reasoning and isinstance(reasoning, str):
            yield ("reasoning", _unescape_reasoning(reasoning))
        # Stream main content text
        content = delta.get("content")
        if content:
            yield ("content", content)


async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_event, cancel_token):
    """
    Async generator that streams AI responses from a backend server.
    Implements retry logic and marks failing keys to avoid repeated failures.
    Streams reasoning and content separately for richer UI updates.

    Args:
        host: URL the request is POSTed to.
        key: Bearer token sent in the Authorization header.
        model: Model identifier placed in the request body.
        msgs: List of chat messages placed in the request body.
        cfg: Extra request-body fields; these override the base fields.
        sid: Session id placed in the request body.
        stop_event: Event checked between lines; when set, streaming ends.
        cancel_token: Dict whose "cancelled" flag also ends streaming.

    Yields:
        ("reasoning", text) or ("content", text) tuples.

    The request is attempted with timeouts of 5 and then 10 seconds. A status
    code listed in LINUX_SERVER_ERRORS, or exhausting both attempts without
    output, marks the key via marked_item().
    """
    payload = {"model": model, "messages": msgs, "session_id": sid, "stream": True, **cfg}
    headers = {"Authorization": f"Bearer {key}"}
    emitted = False

    for timeout in [5, 10]:
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                async with client.stream("POST", host, json=payload, headers=headers) as response:
                    if response.status_code in LINUX_SERVER_ERRORS:
                        marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
                        return
                    async for line in response.aiter_lines():
                        if stop_event.is_set() or cancel_token["cancelled"]:
                            return
                        if not line.startswith("data: "):
                            continue
                        data = line[6:]
                        # RESPONSE_10 is the configured end-of-stream sentinel
                        if data.strip() == RESPONSES["RESPONSE_10"]:
                            return
                        try:
                            event = json.loads(data)
                        except ValueError:
                            # Ignore malformed JSON
                            continue
                        for chunk in _iter_delta_chunks(event):
                            emitted = True
                            yield chunk
        except Exception:
            # Network or other errors: fall through to the retry decision.
            # The key is deliberately not logged.
            logger.debug("Streaming request to %s failed", host, exc_info=True)
        if emitted:
            # Re-sending the request after output has already been streamed
            # would yield the same text a second time, so stop here.
            return

    marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
    return


async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt, deep_search):
    """
    Core async function to interact with AI model.
    Prepares message history, system instructions, and optionally integrates deep search results.
    Tries multiple backend hosts and keys with fallback.
    Yields streamed responses for UI updates.

    Args:
        history: Sequence of (user_text, assistant_text) pairs for earlier
            turns; it should not include the turn for `user_input`.
        user_input: The current user message.
        model_display: Display name of the selected model.
        sess: Session object carrying session_id, stop_event and cancel_token.
        custom_prompt: Optional system prompt used for non-primary models.
        deep_search: Whether to query the deep search provider first
            (only honoured for the primary model).

    Yields:
        ("reasoning", text) or ("content", text) tuples.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False

    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])  # No providers available
        return

    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())

    model_key = get_model_key(model_display)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    is_primary = model_display == PRIMARY_MODEL_CHOICE
    msgs = []

    if deep_search and is_primary:
        # Deep search enabled on the primary model: prepend deep search
        # instructions and results
        msgs.append({"role": "system", "content": DEEP_SEARCH_INSTRUCTIONS})
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "query": user_input,
                    "topic": "general",
                    "search_depth": "basic",
                    "chunks_per_source": 5,
                    "max_results": 5,
                    "time_range": None,
                    "days": 7,
                    "include_answer": True,
                    "include_raw_content": False,
                    "include_images": False,
                    "include_image_descriptions": False,
                    "include_domains": [],
                    "exclude_domains": []
                }
                r = await client.post(
                    DEEP_SEARCH_PROVIDER_HOST,
                    headers={"Authorization": f"Bearer {DEEP_SEARCH_PROVIDER_KEY}"},
                    json=payload,
                )
                sr_json = r.json()
                msgs.append({"role": "system", "content": json.dumps(sr_json)})
        except Exception:
            # Fail silently if deep search fails
            pass
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    elif is_primary:
        # For primary model without deep search, use internal instructions
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    else:
        # For other models, use default instructions
        msgs.append({"role": "system", "content": custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)})

    # Append conversation history, interleaving each user message with its
    # assistant reply so the turns stay in chronological order
    for user_text, assistant_text in history:
        msgs.append({"role": "user", "content": user_text})
        if assistant_text:
            msgs.append({"role": "assistant", "content": assistant_text})

    # Append current user input
    msgs.append({"role": "user", "content": user_input})

    # Shuffle provider hosts and keys for load balancing and fallback
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)

    # Try each host-key pair until a successful response is received
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                return
            got_responses = True
            yield chunk
        if got_responses:
            return

    # If no response from any provider, yield fallback message
    yield ("content", RESPONSES["RESPONSE_2"])

# ============================
# Gradio Interaction Handlers
# ============================

async def respond_async(multi, history, model_display, sess, custom_prompt, deep_search):
    """
    Main async handler for user input submission.
    Supports text + file uploads (multi-modal input).
    Extracts file content and appends to user input.
    Streams AI responses back to UI, updating chat history live.
    Allows stopping response generation gracefully.

    Args:
        multi: Multimodal textbox value; a mapping with "text" and "files".
        history: Chat history as a list of [user, assistant] pairs; mutated in place.
        model_display: Display name of the selected model.
        sess: Session object carrying stop_event and cancel_token.
        custom_prompt: Optional system prompt for non-primary models.
        deep_search: Whether deep search is enabled.

    Yields:
        (history, textbox update, session) tuples for the bound outputs.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False

    # Extract text and files from multimodal input (tolerating None values)
    text = (multi.get("text") or "").strip()
    files = multi.get("files") or []

    # If no input, reset UI state and return
    if not text and not files:
        yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
        return

    # Initialize input with extracted file contents
    inp = ""
    for f in files:
        # Support dict or direct file path
        fp = f.get("data", f.get("name", "")) if isinstance(f, dict) else f
        inp += f"{Path(fp).name}\n\n{extract_file_content(fp)}\n\n"

    # Append user text input if any
    if text:
        inp += text

    # Snapshot earlier turns before adding the placeholder row; otherwise the
    # model would receive the current input twice plus the placeholder text
    # as an assistant message.
    prior_turns = list(history)

    # Append user input to chat history with placeholder response
    history.append([inp, RESPONSES["RESPONSE_8"]])
    yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess

    queue = asyncio.Queue()

    # Background async task to fetch streamed AI responses
    async def background():
        reasoning = ""
        responses = ""
        content_started = False
        try:
            async for typ, chunk in chat_with_model_async(prior_turns, inp, model_display, sess, custom_prompt, deep_search):
                if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                    break
                if typ == "reasoning":
                    # Reasoning is ignored once content has started
                    if content_started:
                        continue
                    reasoning += chunk
                    await queue.put(("reasoning", reasoning))
                elif typ == "content":
                    if not content_started:
                        content_started = True
                        responses = chunk
                        await queue.put(("reasoning", ""))  # Clear reasoning on content start
                        await queue.put(("replace", responses))
                    else:
                        responses += chunk
                        await queue.put(("append", responses))
        finally:
            # Always signal completion, even on error or cancellation, so the
            # consumer loop below cannot wait forever.
            queue.put_nowait(None)
        return responses

    bg_task = asyncio.create_task(background())
    stop_task = asyncio.create_task(sess.stop_event.wait())
    get_task = None

    try:
        while True:
            get_task = asyncio.create_task(queue.get())
            done, _ = await asyncio.wait({stop_task, get_task}, return_when=asyncio.FIRST_COMPLETED)
            if stop_task in done:
                # User requested stop, cancel background task and update UI
                sess.cancel_token["cancelled"] = True
                bg_task.cancel()
                history[-1][1] = RESPONSES["RESPONSE_1"]
                yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
                return
            item = get_task.result()
            if item is None:
                break
            _action, streamed_text = item
            # Update last message content in history with streamed text
            history[-1][1] = streamed_text
            yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
    finally:
        stop_task.cancel()
        if get_task is not None and not get_task.done():
            get_task.cancel()

    # Await full response to ensure completion
    try:
        await bg_task
    except Exception:
        logger.exception("Response generation failed")
        # Show the fallback message only if nothing was streamed yet
        if history[-1][1] == RESPONSES["RESPONSE_8"]:
            history[-1][1] = RESPONSES["RESPONSE_2"]
    yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess


def change_model(new):
    """
    Handler to change selected AI model.
    Resets chat history and session.
    Updates system instructions and deep search checkbox visibility accordingly.

    Args:
        new: Display name of the newly selected model.

    Returns:
        A 6-tuple: empty history, a fresh session, the selected model, its
        default prompt, False (deep search unchecked), and a visibility
        update that shows the checkbox only for the primary model.
    """
    visible = new == PRIMARY_MODEL_CHOICE
    default_prompt = SYSTEM_PROMPT_MAPPING.get(get_model_key(new), SYSTEM_PROMPT_DEFAULT)
    return [], create_session(), new, default_prompt, False, gr.update(visible=visible)


def stop_response(history, sess):
    """
    Handler to stop ongoing AI response generation.
    Sets cancellation flags and updates last message to cancellation notice.

    Args:
        history: Chat history as a list of [user, assistant] pairs.
        sess: Session whose stop_event / cancel_token are set.

    Returns:
        (history, None, new session) for the bound outputs.
    """
    ensure_stop_event(sess)
    sess.stop_event.set()
    sess.cancel_token["cancelled"] = True
    if history:
        history[-1][1] = RESPONSES["RESPONSE_1"]
    return history, None, create_session()

# ============================
# Gradio UI Setup
# ============================

with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as jarvis:
    # States to keep chat history, user session, selected model and custom instructions
    user_history = gr.State([])
    user_session = gr.State(create_session())
    selected_model = gr.State(MODEL_CHOICES[0] if MODEL_CHOICES else "")
    J_A_R_V_I_S = gr.State("")

    # Chatbot UI component with initial welcome messages loaded from env variable
    chatbot = gr.Chatbot(label=AI_TYPES["AI_TYPE_1"], show_copy_button=True, scale=1, elem_id=AI_TYPES["AI_TYPE_2"], examples=JARVIS_INIT)

    # Checkbox to enable/disable deep search feature
    deep_search = gr.Checkbox(label=AI_TYPES["AI_TYPE_8"], value=False, info=AI_TYPES["AI_TYPE_9"], visible=True)

    # Multimodal Textbox (supports text input + file upload; limited to a
    # single file and restricted file types)
    msg = gr.MultimodalTextbox(show_label=False, placeholder=RESPONSES["RESPONSE_5"], interactive=True, file_count="single", file_types=ALLOWED_EXTENSIONS)

    # Sidebar with radio buttons to select AI model
    with gr.Sidebar(open=False):
        model_radio = gr.Radio(show_label=False, choices=MODEL_CHOICES, value=PRIMARY_MODEL_CHOICE)

    # When model changes, reset chat and update prompt/deep search checkbox visibility
    model_radio.change(
        fn=change_model,
        inputs=[model_radio],
        outputs=[user_history, user_session, selected_model, J_A_R_V_I_S, deep_search, deep_search],
    )

    # When initial welcome messages selected from chatbot, populate input box and trigger response
    def on_example_select(evt: gr.SelectData):
        return evt.value

    chatbot.example_select(fn=on_example_select, inputs=[], outputs=[msg]).then(
        fn=respond_async,
        inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search],
        outputs=[chatbot, msg, user_session]
    )

    # Clear chat, resets history, session and states
    def clear_chat(history, sess, prompt, model):
        return [], create_session(), prompt, model, []

    deep_search.change(
        fn=clear_chat,
        inputs=[user_history, user_session, J_A_R_V_I_S, selected_model],
        outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, user_history],
    )
    chatbot.clear(
        fn=clear_chat,
        inputs=[user_history, user_session, J_A_R_V_I_S, selected_model],
        outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, user_history],
    )

    # Submit message triggers async AI response generation
    msg.submit(
        fn=respond_async,
        inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search],
        outputs=[chatbot, msg, user_session],
        api_name=INTERNAL_AI_GET_SERVER,
    )

    # Stop button cancels ongoing response generation
    msg.stop(fn=stop_response, inputs=[user_history, user_session], outputs=[chatbot, msg, user_session])

# Launch
jarvis.queue(default_concurrency_limit=2).launch(max_file_size="1mb")