#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#

import asyncio
import codecs  # Reasoning
import docx  # Microsoft Word
import gradio as gr
import httpx
import json
import os
import pandas as pd  # Microsoft Excel
import pdfplumber  # PDF
import pytesseract  # OCR
import random
import requests
import threading
import uuid
import zipfile  # Microsoft Word
import io

from PIL import Image  # OCR
from pathlib import Path
from pptx import Presentation  # Microsoft PowerPoint
from openpyxl import load_workbook  # Microsoft Excel

# ============================
# System Setup
# ============================

# Install Tesseract OCR and dependencies for text extraction from images.
# The command is a fixed string (no user input is interpolated into it).
os.system(
    "apt-get update -q -y && "
    "apt-get install -q -y tesseract-ocr "
    "tesseract-ocr-eng tesseract-ocr-ind "
    "libleptonica-dev libtesseract-dev"
)

# ============================
# HF Secrets Setup
# ============================

# Initial welcome messages
JARVIS_INIT = json.loads(os.getenv("HELLO", "[]"))

# Deep Search
DEEP_SEARCH_PROVIDER_HOST = os.getenv("DEEP_SEARCH_PROVIDER_HOST")
DEEP_SEARCH_PROVIDER_KEY = os.getenv('DEEP_SEARCH_PROVIDER_KEY')
DEEP_SEARCH_INSTRUCTIONS = os.getenv("DEEP_SEARCH_INSTRUCTIONS")

# Servers and instructions
INTERNAL_AI_GET_SERVER = os.getenv("INTERNAL_AI_GET_SERVER")
INTERNAL_AI_INSTRUCTIONS = os.getenv("INTERNAL_TRAINING_DATA")

# System instructions mapping
SYSTEM_PROMPT_MAPPING = json.loads(os.getenv("SYSTEM_PROMPT_MAPPING", "{}"))
SYSTEM_PROMPT_DEFAULT = os.getenv("DEFAULT_SYSTEM")

# List of available servers
LINUX_SERVER_HOSTS = [h for h in json.loads(os.getenv("LINUX_SERVER_HOST", "[]")) if h]

# List of available keys
LINUX_SERVER_PROVIDER_KEYS = [k for k in json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]")) if k]
LINUX_SERVER_PROVIDER_KEYS_MARKED = set()
LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS = {}

# Server errors codes
LINUX_SERVER_ERRORS = set(map(int, filter(None, os.getenv("LINUX_SERVER_ERROR", "").split(","))))

# Personal UI
AI_TYPES = {f"AI_TYPE_{i}": os.getenv(f"AI_TYPE_{i}") for i in range(1, 10)}
RESPONSES = {f"RESPONSE_{i}": os.getenv(f"RESPONSE_{i}") for i in range(1, 11)}

# Model mapping
MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}"))
MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}"))
MODEL_CHOICES = list(MODEL_MAPPING.values())

# Default model config and key for fallback
DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}"))
DEFAULT_MODEL_KEY = next(iter(MODEL_MAPPING), None)

# HTML codes (SEO, etc.)
META_TAGS = os.getenv("META_TAGS")

# Allowed file extensions
ALLOWED_EXTENSIONS = json.loads(os.getenv("ALLOWED_EXTENSIONS", "[]"))

# Resolution (dots per inch) used when rasterising PDF pages for OCR.
PDF_OCR_RESOLUTION = 300

# Seconds a repeatedly failing provider key stays marked before being retried.
MARK_RESET_SECONDS = 300


# ============================
# Session Management
# ============================

class SessionWithID(requests.Session):
    """
    Custom session object that holds a unique session ID and async control flags.
    Used to track individual user sessions and allow cancellation of ongoing requests.
    """

    def __init__(self):
        super().__init__()
        self.session_id = str(uuid.uuid4())  # Unique ID per session
        self.stop_event = asyncio.Event()  # Async event to signal stop requests
        self.cancel_token = {"cancelled": False}  # Flag to indicate cancellation


def create_session():
    """
    Create and return a new SessionWithID object.
    Called when a new user session starts or chat is reset.
    """
    return SessionWithID()


def ensure_stop_event(sess):
    """
    Ensure that the session object has stop_event and cancel_token attributes.
    Useful when restoring or reusing sessions.
    """
    if not hasattr(sess, "stop_event"):
        sess.stop_event = asyncio.Event()
    if not hasattr(sess, "cancel_token"):
        sess.cancel_token = {"cancelled": False}


def marked_item(item, marked, attempts):
    """
    Record a failure for a provider key or host.

    The item is added to ``marked`` immediately and its counter in
    ``attempts`` is incremented. Once the counter reaches 3, a timer is
    scheduled that removes the item from both collections after 5 minutes
    so that it can be retried.

    NOTE(review): nothing in this file reads ``marked`` when choosing a
    provider, so marking currently has no effect on selection - confirm
    whether filtering was intended.
    """
    marked.add(item)
    attempts[item] = attempts.get(item, 0) + 1
    if attempts[item] >= 3:
        def remove():
            marked.discard(item)
            attempts.pop(item, None)

        timer = threading.Timer(MARK_RESET_SECONDS, remove)
        # Daemon timer: a pending reset must not keep the process alive at shutdown.
        timer.daemon = True
        timer.start()


def get_model_key(display):
    """
    Get the internal model key (identifier) from the display name.
    Returns default model key if not found.
    """
    return next((k for k, v in MODEL_MAPPING.items() if v == display), DEFAULT_MODEL_KEY)


# ============================
# File Content Extraction Utilities
# ============================

def _ocr_image_bytes(data):
    """Run OCR on raw image bytes and return the recognised text ("" if blank)."""
    text = pytesseract.image_to_string(Image.open(io.BytesIO(data)))
    return text if text.strip() else ""


def extract_pdf_content(fp):
    """
    Extract text content from PDF file.
    Includes OCR on embedded images to capture text within images.
    Also extracts tables as tab-separated text.

    Returns the collected text, stripped. If reading fails, an
    "[Error reading PDF ...]" line is appended instead of raising.
    """
    parts = []
    try:
        with pdfplumber.open(fp) as pdf:
            for page in pdf.pages:
                # Extract text from page
                parts.append((page.extract_text() or "") + "\n")

                # OCR on images if any
                if page.images:
                    rendered = page.to_image(resolution=PDF_OCR_RESOLUTION).original
                    # Image boxes are expressed in PDF points relative to the
                    # page box, while the rendered bitmap is in pixels, so the
                    # boxes must be shifted and scaled before cropping.
                    scale = rendered.size[0] / float(page.width)
                    left, upper = float(page.bbox[0]), float(page.bbox[1])
                    for img in page.images:
                        box = (
                            int(round((float(img["x0"]) - left) * scale)),
                            int(round((float(img["top"]) - upper) * scale)),
                            int(round((float(img["x1"]) - left) * scale)),
                            int(round((float(img["bottom"]) - upper) * scale)),
                        )
                        try:
                            ocr_text = pytesseract.image_to_string(rendered.crop(box))
                        except Exception:
                            # Best effort: one unreadable image must not abort
                            # extraction of the rest of the document.
                            continue
                        if ocr_text.strip():
                            parts.append(ocr_text + "\n")

                # Extract tables as TSV
                for table in page.extract_tables():
                    for row in table:
                        cells = [str(cell) for cell in row if cell is not None]
                        if cells:
                            parts.append("\t".join(cells) + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading PDF {fp}: {e}]")
    return "".join(parts).strip()


def extract_docx_content(fp):
    """
    Extract text from Microsoft Word files.
    Also performs OCR on embedded images inside the Microsoft Word archive.

    Returns the collected text, stripped. If reading fails, an
    "[Error reading Microsoft Word ...]" line is appended instead of raising.
    """
    parts = []
    try:
        document = docx.Document(fp)

        # Extract paragraphs
        parts.extend(para.text + "\n" for para in document.paragraphs)

        # Extract tables
        for table in document.tables:
            for row in table.rows:
                parts.append("\t".join(cell.text for cell in row.cells) + "\n")

        # OCR on embedded images inside Microsoft Word
        with zipfile.ZipFile(fp) as archive:
            for member in archive.namelist():
                if not member.startswith("word/media/"):
                    continue
                data = archive.read(member)
                try:
                    ocr_text = _ocr_image_bytes(data)
                except Exception:
                    # Ignore images that can't be processed
                    continue
                if ocr_text:
                    parts.append(ocr_text + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading Microsoft Word {fp}: {e}]")
    return "".join(parts).strip()


def extract_excel_content(fp):
    """
    Extract content from Microsoft Excel files.
    Converts sheets to CSV text.
    Attempts OCR on embedded images if present.

    Returns the collected text, stripped. If reading the sheets fails, an
    "[Error reading Microsoft Excel ...]" line is appended instead of raising.
    """
    parts = []
    try:
        # Extract all sheets as CSV text
        sheets = pd.read_excel(fp, sheet_name=None)
        for name, df in sheets.items():
            parts.append(f"Sheet: {name}\n")
            parts.append(df.to_csv(index=False) + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading Microsoft Excel {fp}: {e}]")
        return "".join(parts).strip()

    # Image OCR is best effort and must not turn a successful sheet
    # extraction into an error (e.g. formats openpyxl cannot open).
    try:
        workbook = load_workbook(fp, data_only=True)
        worksheets = workbook.worksheets
    except Exception:
        worksheets = []

    for sheet in worksheets:
        # openpyxl keeps embedded images on each worksheet, not on the workbook.
        for image in getattr(sheet, "_images", []):
            try:
                ocr_text = _ocr_image_bytes(image._data())
            except Exception:
                # Ignore images that can't be processed
                continue
            if ocr_text:
                parts.append(ocr_text + "\n")
    return "".join(parts).strip()


def extract_pptx_content(fp):
    """
    Extract text content from Microsoft PowerPoint presentation slides.
    Includes text from shapes and tables.
    Performs OCR on embedded images.

    Returns the collected text, stripped. If reading fails, an
    "[Error reading Microsoft PowerPoint ...]" line is appended instead of raising.
    """
    parts = []
    try:
        presentation = Presentation(fp)
        for slide in presentation.slides:
            for shape in slide.shapes:
                # Extract text from shapes
                if hasattr(shape, "text") and shape.text:
                    parts.append(shape.text + "\n")

                # OCR on images inside shapes (13 is the picture shape type)
                if shape.shape_type == 13 and hasattr(shape, "image") and shape.image:
                    try:
                        ocr_text = _ocr_image_bytes(shape.image.blob)
                    except Exception:
                        # Ignore images that can't be processed
                        ocr_text = ""
                    if ocr_text:
                        parts.append(ocr_text + "\n")

            # Extract tables
            for shape in slide.shapes:
                if shape.has_table:
                    for row in shape.table.rows:
                        parts.append("\t".join(cell.text for cell in row.cells) + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading Microsoft PowerPoint {fp}: {e}]")
    return "".join(parts).strip()


def extract_file_content(fp):
    """
    Determine file type by extension and extract text content accordingly.
    For unknown types, attempts to read as plain text.
    """
    ext = Path(fp).suffix.lower()
    if ext == ".pdf":
        return extract_pdf_content(fp)
    if ext in (".doc", ".docx"):
        return extract_docx_content(fp)
    if ext in (".xlsx", ".xls"):
        return extract_excel_content(fp)
    if ext in (".ppt", ".pptx"):
        return extract_pptx_content(fp)
    try:
        return Path(fp).read_text(encoding="utf-8").strip()
    except Exception as e:
        return f"\n[Error reading file {fp}: {e}]"


# ============================
# AI Server Communication
# ============================

def _decode_reasoning(text):
    """
    Resolve literal backslash escapes (such as a two-character "\\n") in a
    reasoning chunk without corrupting non-ASCII characters.

    Falls back to the raw text when the chunk is not a valid escape
    sequence, for example when it ends with a lone backslash.
    """
    try:
        # backslashreplace turns every non-Latin-1 character into an escape,
        # so the unicode_escape decoder restores it instead of mangling it.
        return codecs.decode(text.encode("latin-1", "backslashreplace"), "unicode_escape")
    except (UnicodeEncodeError, UnicodeDecodeError):
        return text


def _parse_stream_events(data):
    """
    Convert one "data:" payload of the stream into ("reasoning" | "content", text) tuples.

    Malformed JSON or unexpected structures yield an empty list.
    """
    try:
        parsed = json.loads(data)
    except json.JSONDecodeError:
        return []
    if not isinstance(parsed, dict):
        return []
    choices = parsed.get("choices")
    if not isinstance(choices, list):
        return []

    events = []
    for choice in choices:
        delta = choice.get("delta", {}) if isinstance(choice, dict) else None
        if not isinstance(delta, dict):
            continue
        # Stream reasoning text separately for UI
        reasoning = delta.get("reasoning")
        if isinstance(reasoning, str) and reasoning:
            events.append(("reasoning", _decode_reasoning(reasoning)))
        # Stream main content text
        content = delta.get("content")
        if content:
            events.append(("content", content))
    return events


async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_event, cancel_token):
    """
    Async generator that streams AI responses from a backend server.

    Tries the request with a 5 second and then a 10 second timeout. A
    status code listed in LINUX_SERVER_ERRORS marks the key and stops; so
    does exhausting both attempts without output. Yields
    ("reasoning", text) and ("content", text) tuples.

    Once any chunk has been yielded the request is never retried, because
    a second attempt would stream the answer again from the beginning.
    """
    payload = {"model": model, "messages": msgs, "session_id": sid, "stream": True, **cfg}
    headers = {"Authorization": f"Bearer {key}"}
    emitted = False

    for timeout in [5, 10]:
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                async with client.stream("POST", host, json=payload, headers=headers) as response:
                    if response.status_code in LINUX_SERVER_ERRORS:
                        marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
                        return
                    async for line in response.aiter_lines():
                        if stop_event.is_set() or cancel_token["cancelled"]:
                            return
                        if not line.startswith("data: "):
                            continue
                        data = line[6:]
                        # RESPONSE_10 holds the end-of-stream sentinel.
                        if data.strip() == RESPONSES["RESPONSE_10"]:
                            return
                        for event in _parse_stream_events(data):
                            emitted = True
                            yield event
        except Exception:
            # Network or other errors: deliberately broad so that any provider
            # failure falls through to the next timeout or the next provider.
            if emitted:
                return
            continue
        if emitted:
            # The stream ended without the sentinel but did produce output.
            return

    marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
    return


def _history_to_messages(history):
    """Flatten (user, assistant) turns into chat messages in conversation order."""
    messages = []
    for user_text, assistant_text in history:
        messages.append({"role": "user", "content": user_text})
        if assistant_text:
            messages.append({"role": "assistant", "content": assistant_text})
    return messages


async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt, deep_search):
    """
    Core async function to interact with AI model.

    Prepares system instructions, optionally integrates deep search
    results, then appends ``history`` (prior completed turns only) and
    ``user_input``. Tries every host/key pair in random order and yields
    the streamed ("reasoning" | "content", text) tuples of the first pair
    that produces output. Yields RESPONSE_3 when no provider is configured
    and RESPONSE_2 when none of them answered.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False

    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])  # No providers available
        return

    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())

    model_key = get_model_key(model_display)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    # Guarded so that an empty model list does not raise IndexError.
    is_primary = bool(MODEL_CHOICES) and model_display == MODEL_CHOICES[0]
    msgs = []

    if deep_search and is_primary:
        # Primary model with deep search: prepend deep search instructions and results
        msgs.append({"role": "system", "content": DEEP_SEARCH_INSTRUCTIONS})
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "query": user_input,
                    "topic": "general",
                    "search_depth": "basic",
                    "chunks_per_source": 5,
                    "max_results": 5,
                    "time_range": None,
                    "days": 7,
                    "include_answer": True,
                    "include_raw_content": False,
                    "include_images": False,
                    "include_image_descriptions": False,
                    "include_domains": [],
                    "exclude_domains": []
                }
                r = await client.post(
                    DEEP_SEARCH_PROVIDER_HOST,
                    headers={"Authorization": f"Bearer {DEEP_SEARCH_PROVIDER_KEY}"},
                    json=payload,
                )
                sr_json = r.json()
                msgs.append({"role": "system", "content": json.dumps(sr_json)})
        except Exception:
            # Deep search is optional: on any failure answer without results.
            pass
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    elif is_primary:
        # For primary model without deep search, use internal instructions
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    else:
        # For other models, use custom or default instructions
        msgs.append({"role": "system", "content": custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)})

    # Append conversation history as alternating user and assistant messages
    msgs.extend(_history_to_messages(history))

    # Append current user input
    msgs.append({"role": "user", "content": user_input})

    # Shuffle provider hosts and keys for load balancing and fallback
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)

    # Try each host-key pair until a successful response is received
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                return
            got_responses = True
            yield chunk
        if got_responses:
            return

    # If no response from any provider, yield fallback message
    yield ("content", RESPONSES["RESPONSE_2"])


# ============================
# Gradio Interaction Handlers
# ============================

async def respond_async(multi, history, model_display, sess, custom_prompt, deep_search):
    """
    Main async handler for user input submission.

    Supports text + file uploads (multi-modal input). Extracts file content
    and prepends it to the user text. Streams AI responses back to the UI by
    rewriting the last history entry, and stops when the session's
    stop_event is set.

    Yields (history, textbox update, session) tuples.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False

    # Extract text and files from multimodal input (either may be missing or None)
    text_input = (multi.get("text") or "").strip()
    file_inputs = multi.get("files") or []

    # If no input, reset UI state and return
    if not text_input and not file_inputs:
        yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
        return

    # Build the prompt: extracted file contents first, then the user text
    pieces = []
    for f in file_inputs:
        # Support dict or direct file path
        fp = f.get("data", f.get("name", "")) if isinstance(f, dict) else f
        pieces.append(f"{Path(fp).name}\n\n{extract_file_content(fp)}\n\n")
    if text_input:
        pieces.append(text_input)
    inp = "".join(pieces)

    # Snapshot the completed turns before adding the placeholder entry, so the
    # model is not sent the current prompt twice plus the placeholder reply.
    prior_turns = list(history)

    # Append user input to chat history with placeholder response
    history.append([inp, RESPONSES["RESPONSE_8"]])
    yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess

    queue = asyncio.Queue()

    # Background async task to fetch streamed AI responses
    async def background():
        reasoning = ""
        responses = ""
        content_started = False
        try:
            async for typ, chunk in chat_with_model_async(prior_turns, inp, model_display, sess, custom_prompt, deep_search):
                if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                    break
                if typ == "reasoning":
                    # Reasoning is ignored once the answer itself has started
                    if content_started:
                        continue
                    reasoning += chunk
                    await queue.put(("reasoning", reasoning))
                elif typ == "content":
                    if not content_started:
                        content_started = True
                        responses = chunk
                        await queue.put(("reasoning", ""))  # Clear reasoning on content start
                        await queue.put(("replace", responses))
                    else:
                        responses += chunk
                        await queue.put(("append", responses))
        finally:
            # Always signal completion, even on error or cancellation;
            # otherwise the consumer below would wait on the queue forever.
            queue.put_nowait(None)
        return responses

    bg_task = asyncio.create_task(background())
    stop_task = asyncio.create_task(sess.stop_event.wait())
    pending_tasks = {bg_task, stop_task}

    try:
        finished = False
        while not finished:
            queue_task = asyncio.create_task(queue.get())
            pending_tasks.add(queue_task)
            done, _ = await asyncio.wait({stop_task, queue_task}, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                pending_tasks.discard(task)
                if task is stop_task:
                    # User requested stop, cancel background task and update UI
                    sess.cancel_token["cancelled"] = True
                    bg_task.cancel()
                    try:
                        await bg_task
                    except asyncio.CancelledError:
                        pass
                    history[-1][1] = RESPONSES["RESPONSE_1"]
                    yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
                    return
                result = task.result()
                if result is None:
                    # Sentinel from the background task: streaming is over
                    finished = True
                    break
                _, text = result
                # Update last message content in history with streamed text
                history[-1][1] = text
                yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
    finally:
        for task in pending_tasks:
            task.cancel()
        await asyncio.gather(*pending_tasks, return_exceptions=True)

    yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess


def change_model(new):
    """
    Handler to change selected AI model.
    Resets chat history and session.
    Updates system instructions and deep search checkbox visibility accordingly.
    """
    # Deep search is only offered for the first (primary) model.
    visible = bool(MODEL_CHOICES) and new == MODEL_CHOICES[0]
    default_prompt = SYSTEM_PROMPT_MAPPING.get(get_model_key(new), SYSTEM_PROMPT_DEFAULT)
    return [], create_session(), new, default_prompt, False, gr.update(visible=visible)


def stop_response(history, sess):
    """
    Handler to stop ongoing AI response generation.
    Sets cancellation flags and updates last message to cancellation notice.
    Returns the history, an empty textbox value and a fresh session.
    """
    ensure_stop_event(sess)
    sess.stop_event.set()
    sess.cancel_token["cancelled"] = True
    if history:
        history[-1][1] = RESPONSES["RESPONSE_1"]
    return history, None, create_session()


# ============================
# Gradio UI Setup
# ============================

with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as jarvis:
    user_history = gr.State([])
    user_session = gr.State(create_session())
    selected_model = gr.State(MODEL_CHOICES[0] if MODEL_CHOICES else "")
    J_A_R_V_I_S = gr.State("")

    # Chatbot UI
    chatbot = gr.Chatbot(label=AI_TYPES["AI_TYPE_1"], show_copy_button=True, scale=1, elem_id=AI_TYPES["AI_TYPE_2"], examples=JARVIS_INIT)

    # Deep search
    deep_search = gr.Checkbox(label=AI_TYPES["AI_TYPE_8"], value=False, info=AI_TYPES["AI_TYPE_9"], visible=True)

    # User's input
    msg = gr.MultimodalTextbox(show_label=False, placeholder=RESPONSES["RESPONSE_5"], interactive=True, file_count="single", file_types=ALLOWED_EXTENSIONS)

    # Sidebar to select AI models
    with gr.Sidebar(open=False):
        model_radio = gr.Radio(show_label=False, choices=MODEL_CHOICES, value=MODEL_CHOICES[0] if MODEL_CHOICES else None)

    # Models change
    model_radio.change(fn=change_model, inputs=[model_radio], outputs=[user_history, user_session, selected_model, J_A_R_V_I_S, deep_search, deep_search])

    # Initial welcome messages
    def on_example_select(evt: gr.SelectData):
        """Forward the selected example into the input box."""
        return evt.value

    chatbot.example_select(fn=on_example_select, inputs=[], outputs=[msg]).then(fn=respond_async, inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search], outputs=[chatbot, msg, user_session])

    # Clear chat
    def clear_chat(history, sess, prompt, model):
        """Reset the chat display, session and stored history; keep prompt and model."""
        return [], create_session(), prompt, model, []

    deep_search.change(fn=clear_chat, inputs=[user_history, user_session, J_A_R_V_I_S, selected_model], outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, user_history])
    chatbot.clear(fn=clear_chat, inputs=[user_history, user_session, J_A_R_V_I_S, selected_model], outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, user_history])

    # Submit message
    msg.submit(fn=respond_async, inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search], outputs=[chatbot, msg, user_session], api_name=INTERNAL_AI_GET_SERVER)

    # Stop message
    msg.stop(fn=stop_response, inputs=[user_history, user_session], outputs=[chatbot, msg, user_session])

# Launch
jarvis.queue(default_concurrency_limit=2).launch(max_file_size="1mb")