"""Sophia / 豌豆 — a persona chatbot built on hybrid (keyword + vector) retrieval.

Pipeline: load dialog examples + corpus + knowledge base -> embed with
BAAI/bge-base-zh -> FAISS L2 index -> retrieve context per query ->
prompt deepseek-llm-7b-chat -> serve through a Gradio Blocks UI.
"""

import json
import random

import faiss
import gradio as gr
import jieba
import jieba.analyse
import numpy as np
import torch
from numpy.linalg import norm

from langchain.docstore.document import Document
from langchain.prompts import PromptTemplate
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.llms import HuggingFacePipeline
from langchain_community.vectorstores import FAISS
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline

# ========= Step 1: load few-shot dialog pairs, chat corpus, and knowledge base =========
with open("dialog.json", "r", encoding="utf-8") as f:
    dialog_data = json.load(f)

with open("corpus.json", "r", encoding="utf-8") as f:
    corpus_texts = json.load(f)

with open("knowledge.json", "r", encoding="utf-8") as f:
    knowledge_texts = json.load(f)

docs = [Document(page_content=text) for text in corpus_texts]
knowledge_docs = [Document(page_content=text) for text in knowledge_texts]
all_docs = docs + knowledge_docs

# ========= Step 2: embed all documents and build a FAISS L2 index =========
embedding_model = SentenceTransformer("BAAI/bge-base-zh")
embeddings = embedding_model.encode(
    corpus_texts + knowledge_texts, show_progress_bar=True
)

dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
# faiss only accepts float32; cast defensively in case encode() returned float64.
index.add(np.asarray(embeddings, dtype="float32"))

# Map FAISS row i -> docstore key "i" -> Document, as the LangChain wrapper expects.
index_to_docstore_id = {i: str(i) for i in range(len(all_docs))}
docstore = {str(i): doc for i, doc in enumerate(all_docs)}

vectorstore = FAISS(
    embedding_function=HuggingFaceEmbeddings(model_name='BAAI/bge-base-zh'),
    index=index,
    docstore=InMemoryDocstore(docstore),
    index_to_docstore_id=index_to_docstore_id,
)
retriever = vectorstore.as_retriever()

# ========= Step 3: load the chat LLM =========
model_name = "deepseek-ai/deepseek-llm-7b-chat"
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    device_map="auto",
    torch_dtype=torch.float16,
    trust_remote_code=True,
)
pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    max_new_tokens=100,
    temperature=0.8,
    top_p=0.9,
    do_sample=True,
    repetition_penalty=1.2,
    return_full_text=False,
    eos_token_id=tokenizer.eos_token_id,
    # NOTE(review): pad_token_id may be None for this tokenizer — confirm, or
    # fall back to eos_token_id if generation warns about missing padding.
    pad_token_id=tokenizer.pad_token_id,
)
llm = HuggingFacePipeline(pipeline=pipe)

# ========= Step 4: prompt template =========
system_prompt = (
    "你是豌豆,一个毒舌但可爱的微信好友。讲话要尖锐、嘴碎、有攻击性、但又让人上头。\n"
    "你会用emoji、口语、网络热词说话,像个懂事但不好惹的姐姐。\n"
    "你不是AI,也不是机器人,就是我微信里的老朋友。跟你对话的都是你的好朋友,但你要对他们刻薄有攻击性\n"
    "记住,你的风格是“毒中带撩”,回答要短、狠、有情绪。\n"
    "千万别客气,别解释太多,更别太官方,懂?"
)

prompt_template = PromptTemplate(
    input_variables=["system", "examples", "context", "question"],
    template="""{system}

👇以下是你平时说话的风格参考(不许忘):
{examples}

👇这是我和你以前的聊天记录,你的语气都在这了:
{context}

现在我问你:
{question}

你要怎么回我?记得口语化、毒舌点、别啰嗦:
""",
)


def extract_keywords(text, topk=5):
    """Return up to *topk* TF-IDF keywords extracted from *text* with jieba."""
    return jieba.analyse.extract_tags(text, topK=topk)


def hybrid_retrieval(query, corpus_docs, faiss_index, embedding_model,
                     k=3, kw_weight=2.0, vec_weight=1.0):
    """Rank documents by a weighted blend of keyword overlap and vector proximity.

    Each document scores ``kw_weight * (# query keywords found as substrings)``
    plus ``vec_weight / (L2 distance to the query embedding)``.  Embeddings are
    read back from the FAISS index via ``reconstruct(i)``, which assumes row i
    of the index corresponds to ``corpus_docs[i]``.

    Returns the top-*k* Documents, best first.
    """
    query_embedding = embedding_model.encode([query])[0]
    keywords = extract_keywords(query, topk=5)

    scored_docs = []
    for i, doc in enumerate(corpus_docs):
        doc_text = doc.page_content
        keyword_score = sum(1 for kw in keywords if kw in doc_text)
        doc_embedding = faiss_index.reconstruct(i)
        # Inverse distance; epsilon avoids division by zero on exact matches.
        vector_score = 1 / (norm(query_embedding - doc_embedding) + 1e-5)
        total_score = kw_weight * keyword_score + vec_weight * vector_score
        scored_docs.append((total_score, doc))

    scored_docs.sort(key=lambda x: x[0], reverse=True)
    return [doc for _, doc in scored_docs[:k]]


def choose_fallback_topic(user_input, knowledge_docs):
    """Pad a very short user message with a random knowledge-base snippet.

    Inputs shorter than 5 characters give retrieval too little signal, so a
    question-bearing knowledge doc (preferring ones containing "?") is appended
    to steer the conversation.  Longer inputs pass through unchanged.
    """
    if len(user_input.strip()) < 5:
        # NOTE(review): this checks the halfwidth "?" only; Chinese text often
        # uses fullwidth "?" — confirm whether both should match.
        candidates = [doc.page_content for doc in knowledge_docs if "?" in doc.page_content]
        if not candidates:
            candidates = [doc.page_content for doc in knowledge_docs]
        if candidates:
            return f"{user_input},{random.choice(candidates)}"
    return user_input


def chat(user_input, history):
    """Gradio callback: retrieve context, prompt the LLM, append to history.

    *history* is a list of ``{"role", "content"}`` dicts (Chatbot
    type="messages").  Only the last 8 turns are kept as conversational
    context.  Returns ``(history, history)`` to update both the Chatbot
    widget and the State.
    """
    history = history or []
    history = history[-8:]

    prompt_question = choose_fallback_topic(user_input, knowledge_docs)

    context_text = "\n".join([
        f"用户:{msg['content']}" if msg['role'] == "user" else f"sophia:{msg['content']}"
        for msg in history
    ])

    retrieved_docs = hybrid_retrieval(
        query=prompt_question,
        corpus_docs=all_docs,
        faiss_index=index,
        embedding_model=embedding_model,
        k=3,
    )
    retrieved_context = "\n".join([doc.page_content for doc in retrieved_docs])

    # First five dialog pairs serve as fixed few-shot style examples.
    example_pairs = dialog_data[:5]
    example_text = "\n".join(
        [f"user:{pair['user']}\nsophia:{pair['sophia']}" for pair in example_pairs]
    )

    prompt = prompt_template.format(
        system=system_prompt,
        examples=example_text,
        context=retrieved_context + "\n" + context_text,
        question=prompt_question,
    )

    try:
        reply = llm.invoke(prompt)
    except Exception as e:
        # UI boundary: surface the failure in-chat instead of crashing Gradio.
        reply = f"勾巴出错了:{str(e)}"

    history.append({"role": "user", "content": user_input})
    history.append({"role": "assistant", "content": reply})
    return history, history


# ========= Step 5: Gradio UI assets =========
background_images = [
    f"https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/family{i}.jpg"
    for i in ["", 1, 2, 3, 4, 5, 6, 7, 8, 9]
]
# Keyframes at 0%..90% for the 10 images; the animation cycles backgrounds.
background_css_rules = "".join([
    f" {i * 10}% {{ background-image: url('{img}'); }}\n"
    for i, img in enumerate(background_images)
])
background_css = f"@keyframes backgroundCycle {{\n{background_css_rules}}}"

avatar_url = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/bean.jpg"
cake_url = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/birthday.jpg"
gift_url = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/gift.jpg"
popup_url = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/srkl.jpg"
popup2_url = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/srkl1.jpg"
music1 = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/FNG.mp3"
music2 = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/PGY.mp3"
bark_sound = "https://huggingface.co./spaces/Ronaldo1111/Sophia/resolve/main/voice.mp3"

# NOTE(review): this template looks truncated — the .replace() calls below
# expect placeholders ({background_css}, {avatar_url}, …) that do not appear
# in the visible text; confirm the full HTML against the original file.
html_template = '''
⏸️音乐
🎵切歌
'''

html_content = html_template.replace("{background_css}", background_css) \
    .replace("{avatar_url}", avatar_url) \
    .replace("{cake_url}", cake_url) \
    .replace("{music1}", music1) \
    .replace("{music2}", music2) \
    .replace("{bark_sound}", bark_sound) \
    .replace("{gift_url}", gift_url) \
    .replace("{popup_url}", popup_url) \
    .replace("{popup2_url}", popup2_url)

# ========= Step 6: Gradio Blocks UI =========
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.HTML(html_content)
    gr.Markdown("## 🌸 Horse and 7 Agent:欢迎进入豌豆的世界 🌸")
    chatbot = gr.Chatbot(label="Pea", type="messages", show_copy_button=True)
    msg = gr.Textbox(label="想对豌豆说啥?", placeholder="小勾巴,你在干嘛?", lines=2)
    state = gr.State([])
    btn = gr.Button("投喂")
    btn.click(chat, inputs=[msg, state], outputs=[chatbot, state])
    msg.submit(chat, inputs=[msg, state], outputs=[chatbot, state])

if __name__ == "__main__":
    demo.launch()