#
# Copyright (C) Hadad
# All rights reserved.
#
# This code is made available under the Creative Commons
# Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0) License.
# You are free to share and adapt the code for non-commercial purposes, as long as
# you provide appropriate credit, do not use it for commercial purposes, and
# distribute your contributions under the same license.
#
# Contributions can be made by directly submitting pull requests.
#
# For inquiries or permission requests, please contact hadad@linuxmail.org.
#
# License: Creative Commons Attribution-NonCommercial-ShareAlike 4.0
# International (CC BY-NC-SA 4.0)
#

import io
import json
import os
import random
import threading
import time
from pathlib import Path

import docx
import fitz  # PyMuPDF
import gradio as gr
import pandas as pd
import pdfplumber
import pptx
import pytesseract
import requests
from PIL import Image

# All runtime configuration comes from environment variables.
LINUX_SERVER_HOSTS = [host for host in json.loads(os.getenv("LINUX_SERVER_HOST", "[]")) if host]
LINUX_SERVER_PROVIDER_KEYS = [key for key in json.loads(os.getenv("LINUX_SERVER_PROVIDER_KEY", "[]")) if key]

AI_TYPES = {f"AI_TYPE_{i}": os.getenv(f"AI_TYPE_{i}") for i in range(1, 6)}
RESPONSES = {f"RESPONSE_{i}": os.getenv(f"RESPONSE_{i}") for i in range(1, 10)}

MODEL_MAPPING = json.loads(os.getenv("MODEL_MAPPING", "{}"))
MODEL_CONFIG = json.loads(os.getenv("MODEL_CONFIG", "{}"))
MODEL_CHOICES = list(MODEL_MAPPING.values())
DEFAULT_CONFIG = json.loads(os.getenv("DEFAULT_CONFIG", "{}"))
META_TAGS = os.getenv("META_TAGS")

stop_event = threading.Event()
session = requests.Session()

def get_model_key(display_name):
    """Map a model's display name back to its key, defaulting to the first choice."""
    return next((k for k, v in MODEL_MAPPING.items() if v == display_name), MODEL_CHOICES[0])

def extract_text(file_path):
    """Extract plain text from an uploaded file, falling back to OCR for image-only documents."""
    ext = Path(file_path).suffix.lower()
    if ext == ".txt":
        try:
            with open(file_path, "r", encoding="utf-8") as file:
                return file.read()
        except Exception:
            return ""
    elif ext == ".pdf":
        text = []
        try:
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text.append(page.extract_text() or "")
            if not "".join(text).strip():
                # No extractable text layer, so OCR the rendered pages instead.
                text = extract_text_from_pdf_images(file_path)
        except Exception:
            return ""
        return "\n".join(text)
    elif ext in [".doc", ".docx"]:
        try:
            doc = docx.Document(file_path)
            text = "\n".join(para.text for para in doc.paragraphs)
            if not text.strip():
                # No paragraph text, so OCR any embedded images instead.
                text = extract_text_from_doc_images(file_path)
            return text
        except Exception:
            return ""
    elif ext in [".xls", ".xlsx"]:
        try:
            df = pd.read_excel(file_path)
            return df.to_string()
        except Exception:
            return ""
    elif ext in [".ppt", ".pptx"]:
        try:
            prs = pptx.Presentation(file_path)
            text = []
            for slide in prs.slides:
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text.append(shape.text)
            return "\n".join(text)
        except Exception:
            return ""
    return ""

def extract_text_from_pdf_images(pdf_path):
    """OCR every page of a PDF by rasterizing it with PyMuPDF and running Tesseract."""
    text = []
    try:
        doc = fitz.open(pdf_path)
        for page_num in range(len(doc)):
            pix = doc[page_num].get_pixmap()
            img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            text.append(pytesseract.image_to_string(img))
        doc.close()
    except Exception:
        return []
    return text

def extract_text_from_doc_images(doc_path):
    """OCR the images embedded in a .docx file."""
    text = []
    try:
        doc = docx.Document(doc_path)
        for rel in doc.part.rels:
            if "image" in doc.part.rels[rel].target_ref:
                img_data = doc.part.rels[rel].target_part.blob
                img = Image.open(io.BytesIO(img_data))
                text.append(pytesseract.image_to_string(img))
    except Exception:
        # Return a string on failure too, matching the success path
        # (the original returned a list here, which leaked "[]" into prompts).
        return ""
    return "\n".join(text)

def simulate_streaming_response(text):
    """Yield the reply line by line to simulate token streaming in the chat UI."""
    for line in text.splitlines():
        if stop_event.is_set():
            return
        yield line + "\n"
        time.sleep(0.05)

def chat_with_model(history, user_input, selected_model_display):
    if stop_event.is_set():
        yield RESPONSES["RESPONSE_1"]
        return
    if not LINUX_SERVER_PROVIDER_KEYS or not LINUX_SERVER_HOSTS:
        yield RESPONSES["RESPONSE_3"]
        return
    selected_model = get_model_key(selected_model_display)
    model_config = MODEL_CONFIG.get(selected_model, DEFAULT_CONFIG)
    # Build the messages in conversation order. The last history entry is the
    # placeholder for the turn in flight (appended by respond), so skip it and
    # add the current user input exactly once at the end. The original grouped
    # all user turns before all assistant turns, which scrambled the dialogue
    # and sent the new input twice.
    messages = []
    for user_msg, assistant_msg in history[:-1]:
        messages.append({"role": "user", "content": user_msg})
        if assistant_msg:
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": user_input})
    data = {"model": selected_model, "messages": messages, **model_config}
    # Try up to two keys against up to two hosts, in random order; shuffled
    # copies are used so the module-level lists are not mutated on every call.
    api_keys = random.sample(LINUX_SERVER_PROVIDER_KEYS, len(LINUX_SERVER_PROVIDER_KEYS))
    hosts = random.sample(LINUX_SERVER_HOSTS, len(LINUX_SERVER_HOSTS))
    for api_key in api_keys[:2]:
        for host in hosts[:2]:
            if stop_event.is_set():
                yield RESPONSES["RESPONSE_1"]
                return
            try:
                # The timeout is an added safeguard so a dead host cannot hang the request.
                response = session.post(host, json=data, headers={"Authorization": f"Bearer {api_key}"}, timeout=60)
                if stop_event.is_set():
                    yield RESPONSES["RESPONSE_1"]
                    return
                if response.status_code < 400:
                    ai_text = response.json().get("choices", [{}])[0].get("message", {}).get("content", RESPONSES["RESPONSE_2"])
                    yield from simulate_streaming_response(ai_text)
                    return
            except requests.exceptions.RequestException:
                continue
    yield RESPONSES["RESPONSE_3"]

def respond(user_input, file_path, history, selected_model_display):
    file_text = extract_text(file_path) if file_path else ""
    combined_input = f"{user_input}\n\n{file_text}".strip()
    if not combined_input:
        # Nothing to send: keep the (disabled) send button and hide the stop button.
        yield history, gr.update(value=""), gr.update(visible=True, interactive=False), gr.update(visible=False)
        return
    stop_event.clear()
    history.append([combined_input, RESPONSES["RESPONSE_8"]])
    yield history, gr.update(value=""), gr.update(visible=False), gr.update(visible=True)
    ai_response = ""
    for chunk in chat_with_model(history, combined_input, selected_model_display):
        if stop_event.is_set():
            history[-1][1] = RESPONSES["RESPONSE_1"]
            yield history, gr.update(value=""), gr.update(visible=True), gr.update(visible=False)
            return
        ai_response += chunk
        history[-1][1] = ai_response
        yield history, gr.update(value=""), gr.update(visible=False), gr.update(visible=True)
    yield history, gr.update(value=""), gr.update(visible=True), gr.update(visible=False)

def stop_response():
    """Signal the active generation to stop and restore the send/stop buttons."""
    stop_event.set()
    session.close()
    # The click handler targets [send_btn, stop_btn], so two matching updates
    # must be returned; the original returned nothing, which Gradio rejects.
    return gr.update(visible=True), gr.update(visible=False)

def change_model(new_model_display):
    # Clear the visible chat as well as the stored history when switching models.
    return [], [], new_model_display

def check_send_button_enabled(msg, file):
    has_input = bool(msg.strip()) or bool(file)
    return gr.update(visible=has_input, interactive=has_input)

with gr.Blocks(fill_height=True, fill_width=True, title=AI_TYPES["AI_TYPE_4"], head=META_TAGS) as demo:
    user_history = gr.State([])
    selected_model = gr.State(MODEL_CHOICES[0])
    chatbot = gr.Chatbot(label=AI_TYPES["AI_TYPE_1"], show_copy_button=True, show_share_button=False, scale=1, elem_id=AI_TYPES["AI_TYPE_2"])
    model_dropdown = gr.Dropdown(label=AI_TYPES["AI_TYPE_3"], show_label=False, choices=MODEL_CHOICES, value=MODEL_CHOICES[0], interactive=True)
    msg = gr.Textbox(label=RESPONSES["RESPONSE_4"], show_label=False, scale=0, placeholder=RESPONSES["RESPONSE_5"])
    with gr.Row():
        send_btn = gr.Button(RESPONSES["RESPONSE_6"], visible=True, interactive=False)
        stop_btn = gr.Button(RESPONSES["RESPONSE_7"], variant=RESPONSES["RESPONSE_9"], visible=False)
    with gr.Accordion("See more...", open=False):
        file_upload = gr.File(label=AI_TYPES["AI_TYPE_5"], file_count="single", type="filepath")
    # change_model now also clears the chatbot display, so it is wired as an output here.
    model_dropdown.change(fn=change_model, inputs=[model_dropdown], outputs=[chatbot, user_history, selected_model])
    send_btn.click(respond, inputs=[msg, file_upload, user_history, selected_model], outputs=[chatbot, msg, send_btn, stop_btn])
    msg.change(fn=check_send_button_enabled, inputs=[msg, file_upload], outputs=[send_btn])
    stop_btn.click(fn=stop_response, outputs=[send_btn, stop_btn])
    file_upload.change(fn=check_send_button_enabled, inputs=[msg, file_upload], outputs=[send_btn])

demo.launch(show_api=False, max_file_size="1mb")
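
# A minimal sketch of the environment this app expects. The variable names are
# taken from the code above; the example values are illustrative assumptions only:
#
#   LINUX_SERVER_HOST='["https://inference.example.com/v1/chat/completions"]'
#   LINUX_SERVER_PROVIDER_KEY='["sk-example-key"]'
#   MODEL_MAPPING='{"model-key-1": "Model One"}'
#   MODEL_CONFIG='{"model-key-1": {"temperature": 0.7}}'
#   DEFAULT_CONFIG='{"temperature": 0.7}'
#
# AI_TYPE_1 ... AI_TYPE_5 and RESPONSE_1 ... RESPONSE_9 hold UI labels and canned
# status strings, and META_TAGS supplies extra HTML for the page head.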