Spaces:
Paused
Paused
from unsloth import FastLanguageModel | |
from peft import PeftModel | |
# Load the base model with FastLanguageModel | |
model, tokenizer = FastLanguageModel.from_pretrained( | |
model_name="unsloth/Llama-3.2-3B-Instruct", | |
max_seq_length=2048, | |
dtype=None, | |
load_in_4bit=True | |
) | |
base_model_name = "unsloth/Llama-3.2-3B-Instruct" | |
adapter_path = "jaspersands/model" # Path to LoRA adapter on Hugging Face | |
model = PeftModel.from_pretrained(model, adapter_path) | |
# Code for processing a query | |
import pandas as pd | |
from unsloth.chat_templates import get_chat_template | |
from sklearn.feature_extraction.text import TfidfVectorizer | |
from sklearn.metrics.pairwise import cosine_similarity | |
from sentence_transformers import SentenceTransformer, util | |
import nltk | |
# Ensure you have NLTK stopwords downloaded | |
nltk.download("stopwords") | |
from nltk.corpus import stopwords | |
# Step 1: Load the CSV file | |
file_path = 'Clean Missouri Data.csv' | |
df = pd.read_csv(file_path, encoding='MacRoman') | |
# Step 2: Define a function to search relevant policies based on the user's query using cosine similarity | |
def search_relevant_policies(query, df, top_n=10, max_chars = 40000): | |
# Convert policies into a TF-IDF matrix | |
tfidf = TfidfVectorizer(stop_words='english') | |
tfidf_matrix = tfidf.fit_transform(df['Content']) | |
# Get the query as a TF-IDF vector | |
query_vector = tfidf.transform([query]) | |
# Calculate cosine similarity between query and policies | |
cosine_sim = cosine_similarity(query_vector, tfidf_matrix).flatten() | |
# Get the top N relevant policies | |
top_indices = cosine_sim.argsort()[-top_n:][::-1] | |
relevant_policies = df.iloc[top_indices] | |
top_indices = cosine_sim.argsort()[-top_n:][::-1] | |
relevant_policies = df.iloc[top_indices].copy() | |
# Ensure total text is capped at max_chars | |
char_count = 0 | |
valid_indices = [] | |
for idx, row in relevant_policies.iterrows(): | |
content_length = len(row["Content"]) | |
# If adding this content exceeds max_chars, stop adding any further policies | |
if char_count + content_length > max_chars: | |
break | |
# Otherwise, keep this policy | |
char_count += content_length | |
valid_indices.append(idx) | |
# Filter the dataframe to only include valid rows | |
truncated_policies = relevant_policies.loc[valid_indices] | |
return truncated_policies | |
def get_content_after_query(response_text, query): | |
# Find the position of the query within the response text | |
query_position = response_text.lower().find(query.lower()) | |
if query_position != -1: | |
# Return the content after the query position | |
res = response_text[query_position + len(query):].strip() | |
return res[11:] | |
else: | |
# If the query is not found, return the full response text as a fallback | |
return response_text.strip() | |
def process_query(query,tokenizer): | |
relevant_policies = search_relevant_policies(query, df) | |
# Step 5: Combine the relevant policies with the user's query for the model | |
formatted_policies = [] | |
for index, row in relevant_policies.iterrows(): | |
# formatted_policy = f"Title: {row['Title']}\nTerritory: {row['Territory']}\nType: {row['Type']}\nYear: {row['Year']}\nCategory: {row['Category']}\nFrom: {row['From']}\nTo: {row['To']}\nContent: {row['Content']}\nLink: {row['Link to Content']}\n" | |
# formatted_policies.append(formatted_policy) | |
formatted_policies.append(row['Content']) | |
relevant_policy_text = "\n\n".join(formatted_policies) | |
# Messages with relevant policies for the model | |
messages_with_relevant_policies = [ | |
{"role": "system", "content": relevant_policy_text}, | |
{"role": "user", "content": query}, | |
] | |
# Step 6: Apply chat template and tokenize | |
tokenizer = get_chat_template( | |
tokenizer, | |
chat_template="llama-3.1", | |
) | |
inputs = tokenizer.apply_chat_template( | |
messages_with_relevant_policies, | |
tokenize=True, | |
add_generation_prompt=True, | |
return_tensors="pt" | |
).to("cuda") | |
FastLanguageModel.for_inference(model) | |
outputs = model.generate(input_ids=inputs, max_new_tokens=512, use_cache=True, temperature=0.7, min_p=0.1) | |
# Step 7: Decode the output | |
generated_response = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0] | |
response = get_content_after_query(generated_response, query) | |
# Step 8: Rank the top 10 policies using SBERT for the final link | |
# Load SBERT model | |
model_sbert = SentenceTransformer('all-MiniLM-L6-v2') # You can choose another SBERT model if desired | |
# Encode the generated response using SBERT | |
response_embedding = model_sbert.encode(generated_response, convert_to_tensor=True) | |
# Encode each policy in the top 10 list | |
policy_embeddings = model_sbert.encode(relevant_policies['Content'].tolist(), convert_to_tensor=True) | |
# Calculate cosine similarities between the generated response and each policy embedding | |
cosine_similarities = util.cos_sim(response_embedding, policy_embeddings).flatten() | |
# Identify the policy with the highest SBERT cosine similarity score | |
most_relevant_index = cosine_similarities.argmax().item() | |
most_relevant_link = relevant_policies.iloc[most_relevant_index]['Link to Content'] | |
# Print the link to the most relevant source | |
return { | |
"response": response, | |
"most_relevant_link": most_relevant_link | |
} | |
# Load Google Sheets to store results | |
import json | |
from google.oauth2.service_account import Credentials | |
import gspread | |
import pandas as pd | |
# Load the service account JSON | |
json_file_path = "fostercare-449201-75a303a8c238.json" # Load the credentials for the service account | |
with open(json_file_path, 'r') as file: | |
service_account_data = json.load(file) | |
# Authenticate using the loaded service account data | |
scopes = ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive"] | |
creds = Credentials.from_service_account_info(service_account_data, scopes=scopes) | |
client = gspread.authorize(creds) | |
# Open the shared Google Sheet by name | |
spreadsheet = client.open("Foster Care RA Responses").sheet1 | |
# Link to Google Sheet | |
# https://docs.google.com/spreadsheets/d/15iEcxmTgkgfcxzDGnq3i_nP1hiAXgb3RplHgqAMEyHA/edit?usp=sharing | |
# Code to set up Gradio GUI | |
import gradio as gr | |
def greet(query): | |
result_1 = process_query(query, tokenizer) | |
content_after_query_1 = result_1["response"] | |
result_2 = process_query(query, tokenizer) | |
content_after_query_2 = result_2["response"] | |
return [content_after_query_1, content_after_query_2] | |
def choose_preference(name, output1, output2, preference, query): | |
if not name: | |
return "Please enter your name before submitting." | |
if preference == "Output 1": | |
new_row = [query, output1, output2, name] | |
spreadsheet.append_row(new_row) | |
return f"You preferred: Output 1 - {output1}" | |
elif preference == "Output 2": | |
new_row = [query, output2, output1, name] | |
spreadsheet.append_row(new_row) | |
return f"You preferred: Output 2 - {output2}" | |
else: | |
return "No preference selected." | |
# Define the interface | |
with gr.Blocks() as demo: | |
# Name input | |
name_input = gr.Textbox(label="Enter your name") | |
# Input for query | |
query_input = gr.Textbox(label="Enter your query") | |
# Outputs | |
output_1 = gr.Textbox(label="Output 1", interactive=False) | |
output_2 = gr.Textbox(label="Output 2", interactive=False) | |
# Preference selection | |
preference = gr.Radio(["Output 1", "Output 2"], label="Choose your preferred output") | |
preference_result = gr.Textbox(label="Your Preference", interactive=False) | |
# Buttons | |
generate_button = gr.Button("Generate Outputs") | |
submit_button = gr.Button("Submit Preference") | |
# Link actions to buttons | |
generate_button.click(greet, inputs=query_input, outputs=[output_1, output_2]) | |
submit_button.click(choose_preference, inputs=[name_input, output_1, output_2, preference, query_input], outputs=preference_result) | |
demo.launch() |