Spaces:
Sleeping
Sleeping
import gradio as gr | |
from langchain_community.document_loaders import UnstructuredMarkdownLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_core.documents import Document | |
from langchain_huggingface import HuggingFaceEmbeddings | |
from langchain_community.vectorstores import FAISS | |
from langchain_community.llms import HuggingFaceHub | |
from langchain.prompts import ChatPromptTemplate | |
from smolagents import CodeAgent, DuckDuckGoSearchTool, HfApiModel, load_tool, tool | |
import smolagents # Added for aliasing | |
from tools.final_answer import FinalAnswerTool | |
from dotenv import load_dotenv | |
import os | |
import base64 | |
import numpy as np | |
from datetime import datetime | |
from skyfield.api import load | |
import matplotlib.pyplot as plt | |
from io import BytesIO | |
from PIL import Image | |
from opentelemetry.sdk.trace import TracerProvider | |
from openinference.instrumentation.smolagents import SmolagentsInstrumentor | |
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | |
from opentelemetry.sdk.trace.export import SimpleSpanProcessor | |
from langfuse import Langfuse | |
# Load environment variables | |
load_dotenv() | |
# Add the alias before instrumentation | |
smolagents.ApiModel = smolagents.HfApiModel | |
LANGFUSE_PUBLIC_KEY = "pk-lf-23dd0190-7c1d-4ac9-be62-9aaf1370ef6d" | |
LF_SECRET_KEY = "sk-lf-f8fe856f-7569-4aec-9a08-dabbac9e83b9" | |
#langfuse = Langfuse( | |
# secret_key="sk-lf-f8fe856f-7569-4aec-9a08-dabbac9e83b9", | |
# public_key="pk-lf-23dd0190-7c1d-4ac9-be62-9aaf1370ef6d", | |
# host="https://cloud.langfuse.com" | |
#) | |
LANGFUSE_AUTH=base64.b64encode(f"{LANGFUSE_PUBLIC_KEY}:{LF_SECRET_KEY}".encode()).decode() | |
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = "https://cloud.langfuse.com/api/public/otel" # EU data region | |
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}" | |
trace_provider = TracerProvider() | |
trace_provider.add_span_processor(SimpleSpanProcessor(OTLPSpanExporter())) | |
SmolagentsInstrumentor().instrument(tracer_provider=trace_provider) | |
DATA_PATH = "" | |
# Specify the path to your file | |
PROMPT_TEMPLATE = """ | |
Ответь на вопрос, используя только следующий контекст: | |
{context} | |
--- | |
Ответь на вопрос на основе приведенного контекста: {question} | |
""" | |
# Global variable for status | |
status_message = "Инициализация..." | |
# Translation dictionaries | |
classification_ru = { | |
'Swallowed': 'проглоченная', | |
'Tiny': 'сверхмалая', | |
'Small': 'малая', | |
'Normal': 'нормальная', | |
'Ideal': 'идеальная', | |
'Big': 'большая' | |
} | |
planet_ru = { | |
'Sun': 'Солнце', | |
'Moon': 'Луна', | |
'Mercury': 'Меркурий', | |
'Venus': 'Венера', | |
'Mars': 'Марс', | |
'Jupiter': 'Юпитер', | |
'Saturn': 'Сатурн' | |
} | |
planet_symbols = { | |
'Sun': '☉', | |
'Moon': '☾', | |
'Mercury': '☿', | |
'Venus': '♀', | |
'Mars': '♂', | |
'Jupiter': '♃', | |
'Saturn': '♄' | |
} | |
def initialize_vectorstore(): | |
"""Initialize the FAISS vector store for document retrieval.""" | |
global status_message | |
try: | |
status_message = "Загрузка и обработка документов..." | |
documents = load_documents() | |
chunks = split_text(documents) | |
status_message = "Создание векторной базы..." | |
vectorstore = save_to_faiss(chunks) | |
status_message = "База данных готова к использованию." | |
return vectorstore | |
except Exception as e: | |
status_message = f"Ошибка инициализации: {str(e)}" | |
raise | |
def load_documents(): | |
"""Load documents from the specified file path.""" | |
file_path = os.path.join(DATA_PATH, "pl250320252.md") | |
if not os.path.exists(file_path): | |
raise FileNotFoundError(f"Файл {file_path} не найден") | |
loader = UnstructuredMarkdownLoader(file_path) | |
return loader.load() | |
def split_text(documents: list[Document]): | |
"""Split documents into chunks for vectorization.""" | |
text_splitter = RecursiveCharacterTextSplitter( | |
chunk_size=900, | |
chunk_overlap=300, | |
length_function=len, | |
add_start_index=True, | |
) | |
return text_splitter.split_documents(documents) | |
def save_to_faiss(chunks: list[Document]): | |
"""Save document chunks to a FAISS vector store.""" | |
embeddings = HuggingFaceEmbeddings( | |
model_name="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2", | |
model_kwargs={'device': 'cpu'}, | |
encode_kwargs={'normalize_embeddings': True} | |
) | |
return FAISS.from_documents(chunks, embeddings) | |
def process_query(query_text: str, vectorstore): | |
"""Process a query using the RAG system.""" | |
if vectorstore is None: | |
return "База данных не инициализирована", [] | |
try: | |
results = vectorstore.similarity_search_with_relevance_scores(query_text, k=3) | |
global status_message | |
status_message += f"\nНайдено {len(results)} результатов" | |
if not results: | |
return "Не найдено результатов.", [] | |
context_text = "\n\n---\n\n".join([ | |
f"Релевантность: {score:.2f}\n{doc.page_content}" | |
for doc, score in results | |
]) | |
prompt_template = ChatPromptTemplate.from_template(PROMPT_TEMPLATE) | |
prompt = prompt_template.format(context=context_text, question=query_text) | |
model = HuggingFaceEndpoint( | |
endpoint_url="https://pflgm2locj2t89co.us-east-1.aws.endpoints.huggingface.cloud/", | |
task="text2text-generation", | |
# huggingfacehub_api_token=os.getenv("HUGGINGFACEHUB_API_TOKEN"), | |
model_kwargs={"temperature": 0.5, "max_length": 512} | |
) | |
response_text = model.invoke(prompt) | |
sources = list(set([doc.metadata.get("source", "") for doc, _ in results])) | |
return response_text, sources | |
except Exception as e: | |
return f"Ошибка обработки запроса: {str(e)}", [] | |
# Function to parse date and time into ISO format | |
def parse_date_time(date_time_str): | |
try: | |
dt = parser.parse(date_time_str) | |
return dt.isoformat() | |
except ValueError: | |
return None | |
# Function to convert longitude to zodiac sign and degrees | |
def lon_to_sign(lon): | |
signs = ["Овен", "Телец", "Близнецы", "Рак", "Лев", "Дева", | |
"Весы", "Скорпион", "Стрелец", "Козерог", "Водолей", "Рыбы"] | |
sign_index = int(lon // 30) | |
sign = signs[sign_index] | |
degrees = int(lon % 30) | |
minutes = int((lon % 1) * 60) | |
return f"{sign} {degrees}°{minutes}'" | |
# Function to calculate PLadder and zone sizes | |
def PLadder_ZSizes(date_time_iso: str): | |
""" | |
Calculate the planetary ladder and zone sizes for a given date and time. | |
Args: | |
date_time_iso (str): Date and time in ISO format (e.g., '2023-10-10T12:00:00') | |
Returns: | |
dict: Contains 'PLadder' (list of planets) and 'ZSizes' (list of zone sizes with classifications) | |
or an error message if unsuccessful | |
""" | |
try: | |
dt = datetime.fromisoformat(date_time_iso) | |
if dt.year < 1900 or dt.year > 2050: | |
return {"error": "Дата вне диапазона. Должна быть между 1900 и 2050 годами."} | |
# Load ephemeris | |
planets = load('de421.bsp') | |
earth = planets['earth'] | |
# Define planet objects | |
planet_objects = { | |
'Sun': planets['sun'], | |
'Moon': planets['moon'], | |
'Mercury': planets['mercury'], | |
'Venus': planets['venus'], | |
'Mars': planets['mars'], | |
'Jupiter': planets['jupiter barycenter'], | |
'Saturn': planets['saturn barycenter'] | |
} | |
# Create time object | |
ts = load.timescale() | |
t = ts.utc(dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second) | |
# Compute ecliptic longitudes | |
longitudes = {} | |
for planet in planet_objects: | |
apparent = earth.at(t).observe(planet_objects[planet]).apparent() | |
_, lon, _ = apparent.ecliptic_latlon() | |
longitudes[planet] = lon.degrees | |
# Sort planets by longitude to form PLadder | |
sorted_planets = sorted(longitudes.items(), key=lambda x: x[1]) | |
PLadder = [p for p, _ in sorted_planets] | |
sorted_lons = [lon for _, lon in sorted_planets] | |
# Calculate zone sizes | |
zone_sizes = [sorted_lons[0]] + [sorted_lons[i+1] - sorted_lons[i] for i in range(6)] + [360 - sorted_lons[6]] | |
# Determine bordering planets for classification | |
bordering = [[PLadder[0]]] + [[PLadder[i-1], PLadder[i]] for i in range(1, 7)] + [[PLadder[6]]] | |
# Classify each zone | |
ZSizes = [] | |
for i, size in enumerate(zone_sizes): | |
bord = bordering[i] | |
if any(p in ['Sun', 'Moon'] for p in bord): | |
X = 7 | |
elif any(p in ['Mercury', 'Venus', 'Mars'] for p in bord): | |
X = 6 | |
else: | |
X = 5 | |
if size <= 1: | |
classification = 'Swallowed' | |
elif size <= X: | |
classification = 'Tiny' | |
elif size <= 40: | |
classification = 'Small' | |
elif size < 60: | |
if 50 <= size <= 52: | |
classification = 'Ideal' | |
else: | |
classification = 'Normal' | |
else: | |
classification = 'Big' | |
# Convert size to degrees and minutes | |
d = int(size) | |
m = int((size - d) * 60) | |
size_str = f"{d}°{m}'" | |
ZSizes.append((size_str, classification)) | |
return {'PLadder': PLadder, 'ZSizes': ZSizes} | |
except ValueError: | |
return {"error": "Неверный формат даты и времени. Используйте ISO формат, например, '2023-10-10T12:00:00'"} | |
except Exception as e: | |
return {"error": f"Ошибка при вычислении: {str(e)}"} | |
def plot_pladder(PLadder): | |
""" | |
Plot the planetary ladder as a right triangle with planet symbols. | |
Args: | |
PLadder (list): List of planet names in order | |
Returns: | |
matplotlib.figure.Figure: The generated plot | |
""" | |
fig, ax = plt.subplots() | |
# Plot triangle with right angle on top: vertices at (0,0), (1.5,3), (3,0) | |
ax.plot([0, 1.5, 3, 0], [0, 3, 0, 0], 'k-') | |
# Draw horizontal lines dividing height into three equal parts | |
ax.plot([0, 3], [1, 1], 'k--') | |
ax.plot([0, 3], [2, 2], 'k--') | |
# Define positions for planets 1 to 7, adjusted to avoid overlap | |
positions = [(0.2, 0.2), (0.2, 1.2), (0.2, 2.2), (1.5, 3.2), (2.8, 2.2), (2.8, 1.2), (2.8, 0.2)] | |
for i, pos in enumerate(positions): | |
symbol = planet_symbols[PLadder[i]] | |
ax.text(pos[0], pos[1], symbol, ha='center', va='center', fontsize=24) # Doubled font size | |
ax.set_xlim(-0.5, 3.5) | |
ax.set_ylim(-0.5, 3.5) | |
ax.set_aspect('equal') | |
ax.axis('off') | |
return fig | |
def chat_interface(query_text): | |
""" | |
Handle user queries, either for planetary ladder or general RAG questions. | |
Args: | |
query_text (str): User's input query | |
Returns: | |
tuple: (text response, plot figure or None) | |
""" | |
global status_message | |
try: | |
vectorstore = initialize_vectorstore() | |
if query_text.startswith("PLadder "): | |
# Extract date and time from query | |
date_time_iso = query_text.split(" ", 1)[1] | |
result = PLadder_ZSizes(date_time_iso) | |
if "error" in result: | |
return result["error"], None | |
PLadder = result["PLadder"] | |
ZSizes = result["ZSizes"] | |
# Translate to Russian | |
PLadder_ru = [planet_ru[p] for p in PLadder] | |
ZSizes_ru = [(size_str, classification_ru[classification]) for size_str, classification in ZSizes] | |
# Prepare queries and get responses | |
responses = [] | |
for i in range(7): | |
planet = PLadder_ru[i] | |
size_str, class_ru = ZSizes_ru[i] | |
query = f"Что значит {planet} на {i+1}-й ступени и {size_str} {class_ru} {i+1}-я зона?" | |
response, _ = process_query(query, vectorstore) | |
responses.append(f"Интерпретация для {i+1}-й ступени и {i+1}-й зоны: {response}") | |
# Query for 8th zone | |
size_str, class_ru = ZSizes_ru[7] | |
query = f"Что значит {size_str} {class_ru} восьмая зона?" | |
response, _ = process_query(query, vectorstore) | |
responses.append(f"Интерпретация для 8-й зоны: {response}") | |
# Generate plot | |
fig = plot_pladder(PLadder) | |
buf = BytesIO() | |
fig.savefig(buf, format='png') # Save figure to buffer as PNG | |
buf.seek(0) | |
img = Image.open(buf) # Convert to PIL image | |
plt.close(fig) # Close the figure to free memory | |
# Compile response text | |
text = "Планетарная лестница: " + ", ".join(PLadder_ru) + "\n" | |
text += "Размеры зон:\n" + "\n".join([f"Зона {i+1}: {size_str} {class_ru}" | |
for i, (size_str, class_ru) in enumerate(ZSizes_ru)]) + "\n\n" | |
text += "\n".join(responses) | |
return text, img | |
else: | |
# Handle regular RAG query | |
response, sources = process_query(query_text, vectorstore) | |
full_response = f"{status_message}\n\nОтвет: {response}\n\nИсточники: {', '.join(sources) if sources else 'Нет источников'}" | |
return full_response, None | |
except Exception as e: | |
return f"Критическая ошибка: {str(e)}", None | |
# Define Gradio Interface | |
#interface = gr.Interface( | |
# fn=chat_interface, | |
# inputs=gr.Textbox(lines=2, placeholder="Введите ваш вопрос здесь..."), | |
# outputs=[gr.Textbox(), gr.Image()], | |
# title="Чат с документами", | |
# description="Задайте вопрос, и я отвечу на основе книги Павла Глобы Планетарная Лестница. " | |
# "Для быстрого запроса трактовки планетарной лестницы используйте формат: PLadder DD-MM-YYYY HH:MM:SS место" | |
#) | |
# UI layout with Gradio Blocks | |
with gr.Blocks() as interface: | |
with gr.Row(): | |
with gr.Column(scale=2): | |
output_text = gr.Textbox(label="Response", lines=10) | |
with gr.Column(scale=1): | |
output_image = gr.Image(label="Planetary Ladder Plot") | |
with gr.Row(): | |
query_text = gr.Textbox(label="Query", placeholder="e.g., PLadder 2023-10-10 12:00") | |
location_lat = gr.Textbox(label="Latitude", placeholder="e.g., 37.7749") | |
location_lon = gr.Textbox(label="Longitude", placeholder="e.g., -122.4194") | |
query_text.submit(chat_interface, | |
inputs=[query_text, location_lat, location_lon], | |
outputs=[output_text, output_image]) | |
if __name__ == "__main__": | |
interface.launch() |