ai / jarvis.py
hadadrjt's picture
ai: Switch to production code.
acbfb1d
#
# SPDX-FileCopyrightText: Hadad <[email protected]>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio
import codecs
import docx
import gradio as gr
import httpx
import json
import os
import pandas as pd
import pdfplumber
import pytesseract
import random
import requests
import threading
import uuid
import zipfile
import io
from PIL import Image
from pathlib import Path
from pptx import Presentation
from openpyxl import load_workbook
# Install OCR tools and dependencies.
# This runs at import time through the shell: it refreshes the apt package index and
# installs the Tesseract OCR engine, its "eng" and "ind" language data packages, and
# the Leptonica/Tesseract development libraries that pytesseract relies on.
# The exit status returned by os.system is not checked, so a failed install is only
# noticed later when an OCR call fails.
os.system("apt-get update -q -y && apt-get install -q -y tesseract-ocr tesseract-ocr-eng tesseract-ocr-ind libleptonica-dev libtesseract-dev")
# ============================
# Environments
# ============================
# Every setting below is read from an environment variable once, at import time.
# JSON-valued variables fall back to an empty list/dict when unset.

# Example/welcome messages shown in the chatbot (JSON list)
JARVIS_INIT = json.loads(os.getenv("HELLO", "[]"))

# Deep search provider: endpoint, API key and system instructions
DEEP_SEARCH_PROVIDER_HOST = os.getenv("DEEP_SEARCH_PROVIDER_HOST")
DEEP_SEARCH_PROVIDER_KEY = os.getenv("DEEP_SEARCH_PROVIDER_KEY")
DEEP_SEARCH_INSTRUCTIONS = os.getenv("DEEP_SEARCH_INSTRUCTIONS")

# Name of the exposed API endpoint and the system instructions of the primary model
INTERNAL_AI_GET_SERVER = os.getenv("INTERNAL_AI_GET_SERVER")
INTERNAL_AI_INSTRUCTIONS = os.getenv("INTERNAL_TRAINING_DATA")

# Per-model system prompts (JSON object) and the prompt used when a model has none
SYSTEM_PROMPT_MAPPING = json.loads(os.getenv("SYSTEM_PROMPT_MAPPING", "{}"))
SYSTEM_PROMPT_DEFAULT = os.getenv("DEFAULT_SYSTEM")

# Backend hosts and provider keys (JSON lists); empty/falsy entries are dropped
LINUX_SERVER_HOSTS = list(filter(None, json.loads(os.getenv("LINUX_SERVER_HOST", "[]"))))
LINUX_SERVER_PROVIDER_KEYS = list(filter(None, json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]"))))

# Bookkeeping for provider keys that failed: the marked set and a failure counter per key
LINUX_SERVER_PROVIDER_KEYS_MARKED = set()
LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS = {}

# HTTP status codes (comma-separated in the variable) that cause a key to be marked
LINUX_SERVER_ERRORS = {
    int(code) for code in os.getenv("LINUX_SERVER_ERROR", "").split(",") if code
}

# UI labels (AI_TYPE_1..AI_TYPE_9) and canned responses (RESPONSE_1..RESPONSE_10)
AI_TYPES = {name: os.getenv(name) for name in (f"AI_TYPE_{n}" for n in range(1, 10))}
RESPONSES = {name: os.getenv(name) for name in (f"RESPONSE_{n}" for n in range(1, 11))}

# Model key -> display name mapping, per-model request config, and the display names
MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}"))
MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}"))
MODEL_CHOICES = list(MODEL_MAPPING.values())

# Fallback request config and fallback model key (first key of the mapping, or None)
DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}"))
DEFAULT_MODEL_KEY = next(iter(MODEL_MAPPING), None)

# Raw HTML injected into the page <head>
META_TAGS = os.getenv("META_TAGS")

# File extensions accepted by the upload box (JSON list)
ALLOWED_EXTENSIONS = json.loads(os.getenv("ALLOWED_EXTENSIONS", "[]"))
# ============================
# Session Management
# ============================
class SessionWithID(requests.Session):
    """
    A requests.Session that additionally carries per-user chat state.

    Attributes set on construction:
      session_id   - random UUID4 string identifying the session
      stop_event   - asyncio.Event set when the user asks to stop a response
      cancel_token - dict with a single "cancelled" boolean flag
    """

    def __init__(self):
        super().__init__()
        # Identifier sent to the backend with every request of this session
        new_id = uuid.uuid4()
        self.session_id = str(new_id)
        # Cancellation signalling shared between the UI handlers and the streamers
        self.stop_event = asyncio.Event()
        self.cancel_token = dict(cancelled=False)
def create_session():
    """
    Build a fresh SessionWithID.
    Used wherever the chat state is reset (model change, clear, stop) and for the
    initial session state of the UI.
    """
    fresh_session = SessionWithID()
    return fresh_session
def ensure_stop_event(sess):
    """
    Guarantee that `sess` exposes the cancellation attributes used by the handlers.
    A missing `stop_event` becomes a new asyncio.Event and a missing `cancel_token`
    becomes {"cancelled": False}; attributes that already exist are left untouched.
    """
    defaults = (
        ("stop_event", asyncio.Event),
        ("cancel_token", lambda: {"cancelled": False}),
    )
    for attr_name, factory in defaults:
        if hasattr(sess, attr_name):
            continue
        setattr(sess, attr_name, factory())
def marked_item(item, marked, attempts):
    """
    Record a failure for a provider key or host.

    The item is added to `marked` and its counter in `attempts` is incremented on
    every call. Once the counter reaches 3 (and on every later failure) a timer is
    scheduled that, after 5 minutes, removes the item from `marked` and drops its
    counter so it starts from a clean state again.

    Parameters:
      item     - hashable identifier of the failing key/host
      marked   - set collecting the currently marked items (mutated in place)
      attempts - dict mapping item -> number of recorded failures (mutated in place)
    """
    marked.add(item)
    attempts[item] = attempts.get(item, 0) + 1
    if attempts[item] >= 3:
        def remove():
            marked.discard(item)
            attempts.pop(item, None)

        timer = threading.Timer(300, remove)
        # Daemon thread: a pending unmark must not keep the interpreter alive for up
        # to five minutes when the application shuts down.
        timer.daemon = True
        timer.start()
def get_model_key(display):
    """
    Translate a model display name into its internal key.
    Scans MODEL_MAPPING for the first entry whose value equals `display`;
    falls back to DEFAULT_MODEL_KEY when no entry matches.
    """
    for key, label in MODEL_MAPPING.items():
        if label == display:
            return key
    return DEFAULT_MODEL_KEY
# ============================
# File Content Extraction Utilities
# ============================
def extract_pdf_content(fp):
    """
    Extract text content from a PDF file.

    For every page this collects, in order: the page text, OCR text of each embedded
    image, and each table row as tab-separated text.

    Parameters:
      fp - path of the PDF file

    Returns:
      The collected text, stripped. If anything raises, the text gathered so far is
      returned with an "[Error reading PDF ...]" note appended instead of raising.
    """
    resolution = 300
    parts = []
    try:
        with pdfplumber.open(fp) as pdf:
            for page in pdf.pages:
                # Extract text from page
                parts.append((page.extract_text() or "") + "\n")
                # OCR on images if any
                if page.images:
                    rendered = page.to_image(resolution=resolution).original
                    # Image boxes are expressed in PDF points while the rendered
                    # bitmap is in pixels; convert before cropping, otherwise the
                    # crop covers the wrong (much smaller) region of the page.
                    scale = rendered.width / float(page.width)
                    for img in page.images:
                        left = max(0, int(float(img["x0"]) * scale))
                        upper = max(0, int(float(img["top"]) * scale))
                        right = min(rendered.width, int(round(float(img["x1"]) * scale)))
                        lower = min(rendered.height, int(round(float(img["bottom"]) * scale)))
                        if right <= left or lower <= upper:
                            # Image lies outside the rendered page area
                            continue
                        cropped = rendered.crop((left, upper, right, lower))
                        ocr_text = pytesseract.image_to_string(cropped)
                        if ocr_text.strip():
                            parts.append(ocr_text + "\n")
                # Extract tables as TSV
                for table in page.extract_tables():
                    for row in table:
                        cells = [str(cell) for cell in row if cell is not None]
                        if cells:
                            parts.append("\t".join(cells) + "\n")
    except Exception as e:
        parts.append(f"\n[Error reading PDF {fp}: {e}]")
    return "".join(parts).strip()
def extract_docx_content(fp):
    """
    Pull the readable text out of a Microsoft Word document.

    Output order: every paragraph (one per line), every table row (cells joined by
    tabs), then OCR text of each image stored under "word/media/" in the archive.
    Images that cannot be opened or recognised are skipped. Any other failure is
    reported inline as an "[Error reading Microsoft Word ...]" note appended to the
    text gathered so far.
    """
    pieces = []
    try:
        document = docx.Document(fp)
        # Paragraph text
        pieces.extend(paragraph.text + "\n" for paragraph in document.paragraphs)
        # Table rows as tab-separated lines
        for tbl in document.tables:
            for tbl_row in tbl.rows:
                pieces.append("\t".join(cell.text for cell in tbl_row.cells) + "\n")
        # A .docx file is a zip archive; embedded media lives under word/media/
        with zipfile.ZipFile(fp) as archive:
            media_names = [n for n in archive.namelist() if n.startswith("word/media/")]
            for member in media_names:
                raw = archive.read(member)
                try:
                    picture = Image.open(io.BytesIO(raw))
                    recognised = pytesseract.image_to_string(picture)
                    if recognised.strip():
                        pieces.append(recognised + "\n")
                except Exception:
                    # Best effort: media that is not a readable image is ignored
                    pass
    except Exception as e:
        pieces.append(f"\n[Error reading Microsoft Word {fp}: {e}]")
    return "".join(pieces).strip()
def extract_excel_content(fp):
    """
    Extract content from a Microsoft Excel file.

    Every sheet is emitted as "Sheet: <name>" followed by its CSV rendering, then
    OCR is attempted on the images embedded in each worksheet.

    Parameters:
      fp - path of the workbook

    Returns:
      The collected text, stripped. If anything raises, the text gathered so far is
      returned with an "[Error reading Microsoft Excel ...]" note appended.
    """
    content = ""
    try:
        # Extract all sheets as CSV text
        sheets = pd.read_excel(fp, sheet_name=None)
        for name, df in sheets.items():
            content += f"Sheet: {name}\n"
            content += df.to_csv(index=False) + "\n"
        # Load workbook to access images
        wb = load_workbook(fp, data_only=True)
        # openpyxl keeps embedded images on each worksheet, not on the workbook;
        # reading them from the workbook raised AttributeError and appended a
        # spurious error note to every extracted file.
        for ws in wb.worksheets:
            for image in getattr(ws, "_images", None) or []:
                try:
                    pil_img = Image.open(io.BytesIO(image._data()))
                    ocr_text = pytesseract.image_to_string(pil_img)
                    if ocr_text.strip():
                        content += ocr_text + "\n"
                except Exception:
                    # Ignore images that can't be processed
                    pass
    except Exception as e:
        content += f"\n[Error reading Microsoft Excel {fp}: {e}]"
    return content.strip()
def extract_pptx_content(fp):
    """
    Pull the readable text out of a Microsoft PowerPoint presentation.

    For each slide, a first pass over the shapes emits shape text and OCR text of
    picture shapes; a second pass emits table rows as tab-separated lines.
    Pictures that cannot be processed are skipped. Any other failure is reported
    inline as an "[Error reading Microsoft PowerPoint ...]" note appended to the
    text gathered so far.
    """
    picture_shape_type = 13  # shape_type value the code treats as a picture
    pieces = []
    try:
        presentation = Presentation(fp)
        for slide in presentation.slides:
            # Pass 1: plain text and pictures
            for shp in slide.shapes:
                if hasattr(shp, "text") and shp.text:
                    pieces.append(shp.text + "\n")
                if shp.shape_type == picture_shape_type and hasattr(shp, "image") and shp.image:
                    try:
                        picture = Image.open(io.BytesIO(shp.image.blob))
                        recognised = pytesseract.image_to_string(picture)
                        if recognised.strip():
                            pieces.append(recognised + "\n")
                    except Exception:
                        pass
            # Pass 2: tables
            for shp in slide.shapes:
                if not shp.has_table:
                    continue
                for tbl_row in shp.table.rows:
                    pieces.append("\t".join(cell.text for cell in tbl_row.cells) + "\n")
    except Exception as e:
        pieces.append(f"\n[Error reading Microsoft PowerPoint {fp}: {e}]")
    return "".join(pieces).strip()
def extract_file_content(fp):
    """
    Dispatch on the (lower-cased) file extension and return the file's text.
    PDF, Word, Excel and PowerPoint extensions go to their dedicated extractors;
    anything else is read as UTF-8 text and stripped. A failure while reading a
    plain-text file is returned as an "[Error reading file ...]" note.
    """
    suffix = Path(fp).suffix.lower()
    if suffix == ".pdf":
        return extract_pdf_content(fp)
    if suffix in (".doc", ".docx"):
        return extract_docx_content(fp)
    if suffix in (".xlsx", ".xls"):
        return extract_excel_content(fp)
    if suffix in (".ppt", ".pptx"):
        return extract_pptx_content(fp)
    # Unknown type: treat as plain text
    try:
        raw_text = Path(fp).read_text(encoding="utf-8")
        return raw_text.strip()
    except Exception as e:
        return f"\n[Error reading file {fp}: {e}]"
# ============================
# Server Communication
# ============================
def _decode_reasoning(text):
    """
    Decode backslash escape sequences in a reasoning chunk without damaging
    non-ASCII characters.

    Encoding with latin-1/backslashreplace keeps characters up to U+00FF as single
    bytes and turns everything else into \\uXXXX / \\UXXXXXXXX escapes, which the
    unicode_escape codec then restores. A chunk whose escapes are malformed (for
    example one that ends in a lone backslash) is returned unchanged.
    """
    try:
        return codecs.decode(text.encode("latin-1", "backslashreplace"), "unicode_escape")
    except UnicodeDecodeError:
        return text


async def fetch_response_stream_async(host, key, model, msgs, cfg, sid, stop_event, cancel_token):
    """
    Async generator that streams AI responses from a backend server.

    The request is attempted with a 5 second and then a 10 second client timeout.
    Lines starting with "data: " are parsed as JSON; for every choice delta the
    generator yields ("reasoning", text) and/or ("content", text).

    Parameters:
      host         - URL the request is POSTed to
      key          - provider key sent as Bearer token
      model        - model identifier placed in the payload
      msgs         - list of chat messages
      cfg          - extra payload fields merged over the base payload
      sid          - session id placed in the payload
      stop_event   - event whose is_set() aborts streaming
      cancel_token - dict whose "cancelled" flag aborts streaming

    The key is marked via marked_item when the server answers with a status listed
    in LINUX_SERVER_ERRORS, or when no attempt produced any output.
    """
    emitted = False
    for timeout in [5, 10]:
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                async with client.stream(
                    "POST",
                    host,
                    json={**{"model": model, "messages": msgs, "session_id": sid, "stream": True}, **cfg},
                    headers={"Authorization": f"Bearer {key}"}
                ) as response:
                    if response.status_code in LINUX_SERVER_ERRORS:
                        marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
                        return
                    async for line in response.aiter_lines():
                        if stop_event.is_set() or cancel_token["cancelled"]:
                            return
                        if not line or not line.startswith("data: "):
                            continue
                        data = line[6:]
                        # RESPONSE_10 holds the end-of-stream marker
                        if data.strip() == RESPONSES["RESPONSE_10"]:
                            return
                        try:
                            j = json.loads(data)
                            if isinstance(j, dict) and j.get("choices"):
                                for ch in j["choices"]:
                                    delta = ch.get("delta", {})
                                    # Stream reasoning text separately for UI
                                    if "reasoning" in delta and delta["reasoning"]:
                                        decoded = _decode_reasoning(delta["reasoning"])
                                        emitted = True
                                        yield ("reasoning", decoded)
                                    # Stream main content text
                                    if "content" in delta and delta["content"]:
                                        emitted = True
                                        yield ("content", delta["content"])
                        except Exception:
                            # Ignore malformed JSON or unexpected data
                            continue
        except Exception:
            # Network or other errors. Retrying after output was already produced
            # would replay the answer from the start, so only retry a silent failure.
            if emitted:
                return
            continue
        if emitted:
            # The stream ended without the end marker but did deliver output:
            # treat it as complete instead of re-requesting and marking the key.
            return
    marked_item(key, LINUX_SERVER_PROVIDER_KEYS_MARKED, LINUX_SERVER_PROVIDER_KEYS_ATTEMPTS)
    return
async def chat_with_model_async(history, user_input, model_display, sess, custom_prompt, deep_search):
    """
    Core async generator that talks to the AI backends.

    Builds the message list (system instructions, optional deep search results,
    prior turns, current input), then tries shuffled host/key pairs until one of
    them streams at least one chunk.

    Parameters:
      history       - iterable of (user_text, assistant_text) pairs of earlier turns
      user_input    - text of the current user message
      model_display - display name of the selected model
      sess          - session object; stop_event/cancel_token/session_id are
                      created on it when missing
      custom_prompt - system prompt overriding the mapping for non-primary models
      deep_search   - truthy to query the deep search provider (primary model only)

    Yields:
      (type, text) tuples as produced by fetch_response_stream_async, or a single
      ("content", ...) tuple carrying RESPONSE_3 (no providers configured) or
      RESPONSE_2 (no provider produced output).
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False
    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield ("content", RESPONSES["RESPONSE_3"])  # No providers available
        return
    if not hasattr(sess, "session_id") or not sess.session_id:
        sess.session_id = str(uuid.uuid4())
    model_key = get_model_key(model_display)
    cfg = MODEL_CONFIG.get(model_key, DEFAULT_CONFIG)
    msgs = []
    # The first configured model is the primary one; guard against an empty list
    is_primary = bool(MODEL_CHOICES) and model_display == MODEL_CHOICES[0]
    # If deep search enabled and using primary model, prepend deep search instructions and results
    if deep_search and is_primary:
        msgs.append({"role": "system", "content": DEEP_SEARCH_INSTRUCTIONS})
        try:
            async with httpx.AsyncClient() as client:
                payload = {
                    "query": user_input,
                    "topic": "general",
                    "search_depth": "basic",
                    "chunks_per_source": 5,
                    "max_results": 5,
                    "time_range": None,
                    "days": 7,
                    "include_answer": True,
                    "include_raw_content": False,
                    "include_images": False,
                    "include_image_descriptions": False,
                    "include_domains": [],
                    "exclude_domains": []
                }
                r = await client.post(DEEP_SEARCH_PROVIDER_HOST, headers={"Authorization": f"Bearer {DEEP_SEARCH_PROVIDER_KEY}"}, json=payload)
                sr_json = r.json()
                msgs.append({"role": "system", "content": json.dumps(sr_json)})
        except Exception:
            # Deep search is best effort: answer without search results if it fails
            pass
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    elif is_primary:
        # For primary model without deep search, use internal instructions
        msgs.append({"role": "system", "content": INTERNAL_AI_INSTRUCTIONS})
    else:
        # For other models, use the custom prompt or the mapped/default instructions
        msgs.append({"role": "system", "content": custom_prompt or SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)})
    # Append conversation history turn by turn so each assistant reply directly
    # follows the user message it answers (previously all user messages were sent
    # first, followed by all assistant messages).
    for user_text, assistant_text in history:
        msgs.append({"role": "user", "content": user_text})
        if assistant_text:
            msgs.append({"role": "assistant", "content": assistant_text})
    # Append current user input
    msgs.append({"role": "user", "content": user_input})
    # Shuffle provider hosts and keys for load balancing and fallback
    candidates = [(h, k) for h in LINUX_SERVER_HOSTS for k in LINUX_SERVER_PROVIDER_KEYS]
    random.shuffle(candidates)
    # Try each host-key pair until a successful response is received
    for h, k in candidates:
        stream_gen = fetch_response_stream_async(h, k, model_key, msgs, cfg, sess.session_id, sess.stop_event, sess.cancel_token)
        got_responses = False
        async for chunk in stream_gen:
            if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                return
            got_responses = True
            yield chunk
        if got_responses:
            return
    # If no response from any provider, yield fallback message
    yield ("content", RESPONSES["RESPONSE_2"])
# ============================
# Gradio Interaction Handlers
# ============================
async def respond_async(multi, history, model_display, sess, custom_prompt, deep_search):
    """
    Main async handler for user input submission.

    Combines the text of uploaded files with the typed message, appends the turn to
    `history` with a placeholder answer, then streams the model output into that
    last history entry until the stream ends or the user stops it.

    Parameters:
      multi         - multimodal textbox value: dict with "text" and "files"
      history       - list of [user_text, assistant_text] pairs (mutated in place)
      model_display - display name of the selected model
      sess          - session object carrying stop_event / cancel_token
      custom_prompt - custom system prompt forwarded to the model call
      deep_search   - deep search flag forwarded to the model call

    Yields:
      (history, textbox update, sess) tuples for the chatbot, input box and session.
    """
    ensure_stop_event(sess)
    sess.stop_event.clear()
    sess.cancel_token["cancelled"] = False
    # Extract text and files from multimodal input (tolerate explicit None values)
    msg_input = {"text": (multi.get("text") or "").strip(), "files": multi.get("files") or []}
    # If no input, reset UI state and return
    if not msg_input["text"] and not msg_input["files"]:
        yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
        return
    # Initialize input with extracted file contents
    inp = ""
    for f in msg_input["files"]:
        # Support dict or direct file path
        fp = f.get("data", f.get("name", "")) if isinstance(f, dict) else f
        inp += f"{Path(fp).name}\n\n{extract_file_content(fp)}\n\n"
    # Append user text input if any
    if msg_input["text"]:
        inp += msg_input["text"]
    # Snapshot the earlier turns before the placeholder entry is appended: the model
    # call adds the current input itself, so handing it the live history would send
    # the prompt twice together with the placeholder as a fake assistant reply.
    prior_turns = [list(turn) for turn in history]
    # Append user input to chat history with placeholder response
    history.append([inp, RESPONSES["RESPONSE_8"]])
    yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
    queue = asyncio.Queue()

    # Background async task to fetch streamed AI responses
    async def background():
        reasoning = ""
        responses = ""
        content_started = False
        try:
            async for typ, chunk in chat_with_model_async(prior_turns, inp, model_display, sess, custom_prompt, deep_search):
                if sess.stop_event.is_set() or sess.cancel_token["cancelled"]:
                    break
                if typ == "reasoning":
                    # Reasoning is only shown until the first content chunk arrives
                    if content_started:
                        continue
                    reasoning += chunk
                    await queue.put(("reasoning", reasoning))
                elif typ == "content":
                    if not content_started:
                        content_started = True
                        responses = chunk
                        await queue.put(("reasoning", ""))  # Clear reasoning on content start
                        await queue.put(("replace", responses))
                    else:
                        responses += chunk
                        await queue.put(("append", responses))
        finally:
            # Always signal completion, even on error or cancellation; otherwise the
            # consumer loop below would wait on the queue forever.
            queue.put_nowait(None)
        return responses

    bg_task = asyncio.create_task(background())
    stop_task = asyncio.create_task(sess.stop_event.wait())
    try:
        while True:
            get_task = asyncio.create_task(queue.get())
            done, _ = await asyncio.wait({stop_task, get_task}, return_when=asyncio.FIRST_COMPLETED)
            if stop_task in done:
                # User requested stop, cancel background work and update UI
                get_task.cancel()
                sess.cancel_token["cancelled"] = True
                bg_task.cancel()
                history[-1][1] = RESPONSES["RESPONSE_1"]
                yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
                return
            result = get_task.result()
            if result is None:
                # Sentinel: the background task has finished
                break
            _action, text = result
            # Update last message content in history with streamed text
            history[-1][1] = text
            yield history, gr.update(interactive=False, submit_btn=False, stop_btn=True), sess
    finally:
        stop_task.cancel()
    # Await the background task so an error raised inside it surfaces here
    await bg_task
    yield history, gr.update(value="", interactive=True, submit_btn=True, stop_btn=False), sess
def change_model(new):
    """
    React to a new model selection.
    Returns, in order: an empty history, a fresh session, the selected model, the
    system prompt mapped to that model (or the default one), False for the deep
    search checkbox value, and a visibility update that shows the checkbox only
    when the first configured model is selected.
    """
    is_primary = new == MODEL_CHOICES[0]
    model_key = get_model_key(new)
    default_prompt = SYSTEM_PROMPT_MAPPING.get(model_key, SYSTEM_PROMPT_DEFAULT)
    fresh_session = create_session()
    checkbox_update = gr.update(visible=is_primary)
    return [], fresh_session, new, default_prompt, False, checkbox_update
def stop_response(history, sess):
    """
    Abort the response currently being generated for `sess`.
    Raises both cancellation signals on the session, replaces the answer of the
    last history entry (if any) with RESPONSE_1, and returns the history, None for
    the input box and a brand-new session.
    """
    ensure_stop_event(sess)
    # Signal every consumer of this session that it should stop
    sess.cancel_token["cancelled"] = True
    sess.stop_event.set()
    if history:
        last_turn = history[-1]
        last_turn[1] = RESPONSES["RESPONSE_1"]
    replacement_session = create_session()
    return history, None, replacement_session
# ============================
# Gradio UI Setup
# ============================
with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as jarvis:
    # States to keep chat history, user session, selected model and custom instructions
    user_history = gr.State([])
    user_session = gr.State(create_session())
    selected_model = gr.State(MODEL_CHOICES[0] if MODEL_CHOICES else "")
    J_A_R_V_I_S = gr.State("")
    # Chatbot UI component with initial welcome messages loaded from env variable
    chatbot = gr.Chatbot(label=AI_TYPES["AI_TYPE_1"], show_copy_button=True, scale=1, elem_id=AI_TYPES["AI_TYPE_2"], examples=JARVIS_INIT)
    # Checkbox to enable/disable deep search feature
    deep_search = gr.Checkbox(label=AI_TYPES["AI_TYPE_8"], value=False, info=AI_TYPES["AI_TYPE_9"], visible=True)
    # Multimodal textbox (text input + file upload; single file, restricted file types)
    msg = gr.MultimodalTextbox(show_label=False, placeholder=RESPONSES["RESPONSE_5"], interactive=True, file_count="single", file_types=ALLOWED_EXTENSIONS)
    # Sidebar with radio buttons to select AI model
    with gr.Sidebar(open=False):
        # Same empty-list guard as selected_model above: indexing an empty
        # MODEL_CHOICES would raise IndexError while the UI is being built.
        model_radio = gr.Radio(show_label=False, choices=MODEL_CHOICES, value=MODEL_CHOICES[0] if MODEL_CHOICES else None)
    # When model changes, reset chat and update prompt/deep search checkbox.
    # deep_search is listed twice on purpose: change_model returns both its new
    # value (False) and a visibility update for it.
    model_radio.change(fn=change_model, inputs=[model_radio], outputs=[user_history, user_session, selected_model, J_A_R_V_I_S, deep_search, deep_search])

    # When a welcome message is selected in the chatbot, populate the input box and trigger a response
    def on_example_select(evt: gr.SelectData):
        return evt.value

    chatbot.example_select(fn=on_example_select, inputs=[], outputs=[msg]).then(
        fn=respond_async,
        inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search],
        outputs=[chatbot, msg, user_session]
    )

    # Clear chat: empties the chatbot and the stored history and starts a new
    # session, while passing the current prompt and model through unchanged
    def clear_chat(history, sess, prompt, model):
        return [], create_session(), prompt, model, []

    deep_search.change(fn=clear_chat, inputs=[user_history, user_session, J_A_R_V_I_S, selected_model], outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, user_history])
    chatbot.clear(fn=clear_chat, inputs=[user_history, user_session, J_A_R_V_I_S, selected_model], outputs=[chatbot, user_session, J_A_R_V_I_S, selected_model, user_history])
    # Submit message triggers async AI response generation
    msg.submit(fn=respond_async, inputs=[msg, user_history, selected_model, user_session, J_A_R_V_I_S, deep_search], outputs=[chatbot, msg, user_session], api_name=INTERNAL_AI_GET_SERVER)
    # Stop button cancels ongoing response generation
    msg.stop(fn=stop_response, inputs=[user_history, user_session], outputs=[chatbot, msg, user_session])
# Launch: enable the request queue (default concurrency limit 2) and start the
# server with uploads capped at 1 MB
jarvis.queue(default_concurrency_limit=2).launch(max_file_size="1mb")