Spaces:
Sleeping
Sleeping
import difflib
import html
from io import StringIO

import docx2txt
import pdfplumber
import PyPDF4
import streamlit as st
from huggingface_hub import InferenceClient  # Import Hugging Face API
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# ========== CONFIG ==========
# Page-level settings: the page title and a wide layout.
st.set_page_config(
    layout="wide",
    page_title="📑 Contract Analyzer",
)
# ========== FUNCTIONS ==========
# Load the Hugging Face model client from the Hub.
def load_inference_client():
    """Return an ``InferenceClient`` bound to the Zephyr 7B beta model.

    ``InferenceClient`` selects its model through the ``model`` argument.
    ``repo_id`` is the parameter of the legacy ``InferenceApi`` class and is
    not accepted by ``InferenceClient``.
    """
    return InferenceClient(model="HuggingFaceH4/zephyr-7b-beta")  # Zephyr model


# Module-level client used by the model-query helper in this file.
inference_client = load_inference_client()
def extract_text_from_pdf(uploaded_file):
    """Extract the text of a PDF, trying pdfplumber first and PyPDF4 second.

    Args:
        uploaded_file: Object accepted by ``pdfplumber.open`` and
            ``PyPDF4.PdfFileReader`` (presumably a Streamlit upload or other
            binary file-like object -- confirm against callers).

    Returns:
        The page texts joined with newlines, or ``""`` when both parsers
        fail (the failure is reported through ``st.error``).
    """
    try:
        with pdfplumber.open(uploaded_file) as pdf:
            # extract_text() may yield nothing for a page; treat that as empty.
            return "\n".join(page.extract_text() or "" for page in pdf.pages)
    except Exception:
        # Deliberate best-effort: any pdfplumber failure triggers the fallback.
        try:
            # The first parser may have advanced the stream; rewind so the
            # fallback reads from the beginning.
            if hasattr(uploaded_file, "seek"):
                uploaded_file.seek(0)
            reader = PyPDF4.PdfFileReader(uploaded_file)
            return "\n".join(
                reader.getPage(i).extractText() for i in range(reader.numPages)
            )
        except Exception as e:
            st.error(f"Error reading PDF: {e}")
            return ""
def load_text(file):
    """Return the text content of an uploaded ``txt``, ``pdf`` or ``docx`` file.

    Args:
        file: Upload object exposing ``name`` and, for text files,
            ``getvalue()`` returning bytes. A falsy value means "no file".

    Returns:
        The extracted text. ``""`` is returned when no file is given, when
        the extension is unsupported (a warning is shown), or when loading
        raises (the error is shown).
    """
    if not file:
        return ""
    try:
        ext = file.name.split(".")[-1].lower()
        if ext == "txt":
            # "utf-8-sig" decodes plain UTF-8 and also drops a leading BOM,
            # which would otherwise end up in the text as "\ufeff".
            return file.getvalue().decode("utf-8-sig")
        if ext == "pdf":
            return extract_text_from_pdf(file)
        if ext == "docx":
            return docx2txt.process(file)
        st.warning(f"Unsupported file type: {ext}")
        return ""
    except Exception as e:
        st.error(f"Error loading file: {e}")
        return ""
def highlight_diff(text1, text2):
    """Return an HTML word-level diff of two texts.

    Both texts are split on whitespace and compared with ``difflib.Differ``.
    Words only in ``text1`` get a red background, words only in ``text2`` a
    green background, and shared words are emitted unchanged.

    Every word is HTML-escaped, so markup inside the documents is displayed
    as text instead of being interpreted when the result is rendered as HTML.

    Args:
        text1: Original text.
        text2: Revised text.

    Returns:
        An HTML fragment in which every word is followed by a single space.
    """
    pieces = []
    for entry in difflib.Differ().compare(text1.split(), text2.split()):
        marker = entry[:2]
        if marker == "? ":
            # Intraline hint rows ("^", "+", "-" guides) are not document words.
            continue
        token = html.escape(entry[2:])
        if marker == "- ":
            pieces.append(f'<span style="background-color:#ffcccc">{token}</span> ')
        elif marker == "+ ":
            pieces.append(f'<span style="background-color:#ccffcc">{token}</span> ')
        else:
            pieces.append(token + " ")
    return "".join(pieces)
def compute_similarity(text1, text2):
    """Return a similarity score between two texts as a percentage.

    The score is the cosine similarity of the TF-IDF vectors of the two
    texts. If that computation raises, the ``difflib.SequenceMatcher`` ratio
    of the raw strings is used instead.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        A float percentage; ``0.0`` when either text is empty or
        whitespace-only.
    """
    if not text1.strip() or not text2.strip():
        return 0.0
    try:
        # The token pattern accepts word runs of one or more characters.
        tfidf = TfidfVectorizer(token_pattern=r'(?u)\b\w+\b')
        tfidf_matrix = tfidf.fit_transform([text1, text2])
        sim = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
        return float(sim[0][0]) * 100
    except Exception:
        # Best-effort fallback; unlike a bare ``except`` this lets
        # KeyboardInterrupt and SystemExit propagate.
        return difflib.SequenceMatcher(None, text1, text2).ratio() * 100
# Query the Zephyr model hosted on Hugging Face.
def query_zephyr_model(text1, text2, question):
    """Ask the model a question about two contracts.

    Args:
        text1: Text of the first contract.
        text2: Text of the second contract.
        question: Question to answer about the two texts.

    Returns:
        The generated answer as a string, or ``None`` when the request
        fails (the error is reported through ``st.error``).
    """
    prompt = f"Compare the following two contracts and answer the question:\nText 1: {text1}\nText 2: {text2}\nQuestion: {question}"
    try:
        # InferenceClient is not callable; text_generation() returns the
        # generated text directly as a str with the default options.
        return inference_client.text_generation(prompt, max_new_tokens=512)
    except Exception as e:
        st.error(f"Error querying the model: {e}")
        return None
# ========== MAIN ==========
def main():
    """Render the app: upload, display, compare, and question two documents.

    Stops after the upload section, with a warning, until both documents
    have produced non-empty text.
    """
    st.title("📑 Contract Analyzer")
    st.markdown("Upload two contracts, compare them, and ask any question!")

    # Upload documents
    st.header("1. Upload Documents")
    col1, col2 = st.columns(2)
    with col1:
        file1 = st.file_uploader("Upload First Document", type=["txt", "pdf", "docx"], key="file1")
    with col2:
        file2 = st.file_uploader("Upload Second Document", type=["txt", "pdf", "docx"], key="file2")

    text1 = load_text(file1) if file1 else ""
    text2 = load_text(file2) if file2 else ""
    if not (text1 and text2):
        st.warning("Please upload both documents to continue.")
        return

    # Display uploaded texts
    st.header("2. Documents Content")
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("First Document")
        st.text_area("Content of first document:", text1, height=300)
    with col2:
        st.subheader("Second Document")
        st.text_area("Content of second document:", text2, height=300)

    # Compare documents
    st.header("3. Compare Documents")
    if st.button("Compare Documents"):
        sim_score = compute_similarity(text1, text2)
        st.metric("Similarity Score", f"{sim_score:.2f}%")
        diff_html = highlight_diff(text1, text2)
        st.markdown("**Differences Highlighted:**", unsafe_allow_html=True)
        st.markdown(f"<div style='border:1px solid #ccc; padding:10px; max-height:400px; overflow:auto'>{diff_html}</div>", unsafe_allow_html=True)

    # Ask any question
    st.header("4. Ask a Question")
    user_question = st.text_input("Enter your question about the contracts:")
    if user_question and st.button("Analyze Question"):
        # st.columns() returns a list of containers, so unpack the single
        # column before using it as a context manager.
        (answer_col,) = st.columns(1)
        with answer_col:
            st.subheader("Answer from Document")
            with st.spinner("Analyzing..."):
                try:
                    pred = query_zephyr_model(text1, text2, user_question)
                except Exception as e:
                    st.error(f"Failed on Document: {e}")
                else:
                    # None means the query helper already reported an error;
                    # do not render the literal "None" as a success message.
                    if pred is not None:
                        st.success(pred)


# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()