# Spaces: Sleeping  (page-status text captured with the file; not part of the program)
import difflib
import html
from io import StringIO

import docx2txt
import pdfplumber
import PyPDF4
import streamlit as st
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from predict import run_prediction
# ========== CONFIGURATION ==========
# Page-level settings for the app: wide layout, browser-tab title and icon.
# NOTE(review): the icon "π" looks like a mis-decoded emoji -- confirm the
# intended character before changing it.
st.set_page_config(
    layout="wide",
    page_title="Contract Analysis Suite",
    page_icon="π",
)
# ========== DATA LOADING ==========
# NOTE(review): despite the original "CACHED" heading, nothing here is cached;
# every call re-reads the file from disk.
def load_questions():
    """Load the full-length questions, one per line.

    Returns:
        The non-blank lines of ``data/questions.txt`` with surrounding
        whitespace removed. If the file cannot be read or decoded, an error
        is shown in the Streamlit UI and an empty list is returned.
    """
    try:
        # Explicit encoding so decoding does not depend on the host locale.
        with open('data/questions.txt', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip()]
    except (OSError, UnicodeError) as e:
        st.error(f"Error loading questions: {str(e)}")
        return []
def load_questions_short():
    """Load the short question labels, one per line.

    Returns:
        The non-blank lines of ``data/questions_short.txt`` with surrounding
        whitespace removed. If the file cannot be read or decoded, an error
        is shown in the Streamlit UI and an empty list is returned.
    """
    try:
        # Explicit encoding so decoding does not depend on the host locale.
        with open('data/questions_short.txt', encoding='utf-8') as f:
            return [line.strip() for line in f if line.strip()]
    except (OSError, UnicodeError) as e:
        st.error(f"Error loading short questions: {str(e)}")
        return []
# ========== UTILITY FUNCTIONS ==========
def extract_text_from_pdf(uploaded_file):
    """Extract the text of a PDF using pdfplumber.

    Args:
        uploaded_file: Whatever ``pdfplumber.open`` accepts; the caller in
            this file passes the uploaded file object.

    Returns:
        The text of all pages joined with newlines (pages that yield no text
        contribute an empty string). Returns ``""`` when the result is only
        whitespace, or when extraction fails, in which case the error is
        also shown in the Streamlit UI.
    """
    try:
        with pdfplumber.open(uploaded_file) as pdf:
            text = "\n".join(page.extract_text() or "" for page in pdf.pages)
    except Exception as e:
        # Deliberately broad: any parser failure is reported to the user and
        # the caller falls back to an empty result instead of crashing.
        st.error(f"PDF extraction error: {str(e)}")
        return ""
    return text if text.strip() else ""
def highlight_differences(text1, text2):
    """Build an HTML fragment that highlights word-level differences.

    Both texts are split on whitespace and compared with ``difflib.Differ``.
    Words only in ``text1`` get a red background, words only in ``text2`` a
    green one, and common words are emitted unstyled. Every emitted word is
    followed by a single space.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        The HTML fragment, or ``""`` if either text is empty or ``None``.
    """
    if not text1 or not text2:
        return ""

    backgrounds = {"- ": "#ffcccc", "+ ": "#ccffcc"}
    pieces = []
    for entry in difflib.Differ().compare(text1.split(), text2.split()):
        marker = entry[:2]
        if marker == "? ":
            # "? " lines are Differ's intraline guides (runs of ^, + and -);
            # they are not words from either document, so they are skipped.
            continue
        # The words come from uploaded documents and the result is rendered
        # as raw HTML, so they must be escaped to prevent markup injection.
        word = html.escape(entry[2:])
        colour = backgrounds.get(marker)
        if colour is None:
            pieces.append(word + " ")
        else:
            pieces.append(
                f'<span style="background-color:{colour}">{word}</span> '
            )
    return "".join(pieces)
def calculate_similarity(text1, text2):
    """Return a 0-100 similarity score for two texts.

    The score is the cosine similarity of the two texts' TF-IDF vectors,
    multiplied by 100. If vectorisation raises ``ValueError``, the score
    falls back to ``difflib.SequenceMatcher``'s character-level ratio
    multiplied by 100.

    Args:
        text1: First text.
        text2: Second text.

    Returns:
        The score as a float; ``0.0`` if either text is ``None``, empty or
        whitespace-only.
    """
    # Guard against None as well as blank input.
    if not text1 or not text2 or not text1.strip() or not text2.strip():
        return 0.0
    try:
        # The pattern keeps single-character tokens.
        vectorizer = TfidfVectorizer(token_pattern=r'(?u)\b\w+\b')
        tfidf_matrix = vectorizer.fit_transform([text1, text2])
        similarity = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:2])
        # Convert the array element to a plain Python float.
        return float(similarity[0][0]) * 100
    except ValueError:
        return difflib.SequenceMatcher(None, text1, text2).ratio() * 100
def load_contract(file):
    """Read an uploaded document and return its text.

    The format is chosen from the extension of ``file.name``
    (case-insensitive):

    * ``txt``  - the bytes from ``file.getvalue()`` decoded as UTF-8; a
      leading byte-order mark is dropped.
    * ``pdf``  - ``extract_text_from_pdf``; if that yields nothing, PyPDF4 is
      tried as a fallback.
    * ``docx`` - ``docx2txt.process``.

    Args:
        file: Uploaded file object exposing ``name`` (and ``getvalue()`` for
            text files), or ``None``.

    Returns:
        The stripped text. Returns ``""`` when ``file`` is ``None``, when the
        type is unsupported (a warning is shown), when no text is found, or
        when loading fails (the error is shown in the Streamlit UI).
    """
    if file is None:
        return ""

    ext = file.name.rsplit('.', 1)[-1].lower()
    try:
        if ext == 'txt':
            # "utf-8-sig" decodes plain UTF-8 identically but removes a BOM,
            # which str.strip() would otherwise leave at the start.
            content = file.getvalue().decode("utf-8-sig")
        elif ext == 'pdf':
            content = extract_text_from_pdf(file)
            if not content:
                # Fallback to PyPDF4. Rewind first: the previous attempt may
                # have left the stream position somewhere past the start.
                file.seek(0)
                pdf_reader = PyPDF4.PdfFileReader(file)
                content = '\n'.join(
                    pdf_reader.getPage(i).extractText()
                    for i in range(pdf_reader.numPages)
                )
        elif ext == 'docx':
            content = docx2txt.process(file)
        else:
            st.warning('Unsupported file type')
            return ""
        return content.strip() if content else ""
    except Exception as e:
        # Deliberately broad: the parsers raise assorted exception types and
        # a bad upload should produce a message, not a crashed app.
        st.error(f"Error loading {ext.upper()} file: {str(e)}")
        return ""
# ========== MAIN APP ==========
# Model identifier passed to run_prediction for both documents.
_QA_MODEL_NAME = 'marshmellow77/roberta-base-cuad'


def _render_clause_analysis(column, heading, spinner_text, doc_label,
                            question, contract_text):
    """Run the question against one document and show the answer in ``column``."""
    with column:
        st.subheader(heading)
        with st.spinner(spinner_text):
            try:
                predictions = run_prediction(
                    [question], contract_text, _QA_MODEL_NAME, n_best_size=5
                )
                # NOTE(review): assumes run_prediction returns a mapping keyed
                # by question index as a string -- confirm in predict.py.
                answer = predictions.get('0', 'No answer found')
                st.success(answer if answer else "No relevant clause found")
            except Exception as e:
                st.error(f"Analysis failed for {doc_label}: {str(e)}")


def main():
    """Render the app: upload two documents, compare them, analyse a clause.

    Flow:
        1. Load the full and short question lists; stop with an error if
           either is empty or their lengths differ.
        2. Show two uploaders and a read-only preview of each uploaded
           document; stop until both files are provided.
        3. On "Compare Documents", show a similarity score and a highlighted
           word-level diff.
        4. On "Analyze Both Documents", run the selected question against
           each document and show the answers side by side.
    """
    questions = load_questions()
    questions_short = load_questions_short()
    if not questions or not questions_short or len(questions) != len(questions_short):
        st.error("Failed to load questions or questions mismatch. Please check data files.")
        return

    st.title("π Contract Analysis Suite")
    st.markdown(
        "\nCompare documents and analyze legal clauses using AI-powered question answering.\n"
    )

    # ===== DOCUMENT UPLOAD SECTION =====
    st.header("1. Upload Documents")
    col1, col2 = st.columns(2)
    with col1:
        uploaded_file1 = st.file_uploader(
            "Upload First Document",
            type=["txt", "pdf", "docx"],
            key="file1",
        )
        contract_text1 = load_contract(uploaded_file1) if uploaded_file1 else ""
        doc1_display = st.empty()
    with col2:
        uploaded_file2 = st.file_uploader(
            "Upload Second Document",
            type=["txt", "pdf", "docx"],
            key="file2",
        )
        contract_text2 = load_contract(uploaded_file2) if uploaded_file2 else ""
        doc2_display = st.empty()

    # Update document displays
    if uploaded_file1:
        doc1_display.text_area("Document 1 Content",
                               value=contract_text1,
                               height=200,
                               key="area1")
    if uploaded_file2:
        doc2_display.text_area("Document 2 Content",
                               value=contract_text2,
                               height=200,
                               key="area2")

    if not (uploaded_file1 and uploaded_file2):
        st.warning("Please upload both documents to proceed")
        return

    # ===== DOCUMENT COMPARISON SECTION =====
    st.header("2. Document Comparison")
    with st.expander("Show Document Differences", expanded=True):
        if st.button("Compare Documents"):
            with st.spinner("Analyzing documents..."):
                if not contract_text1.strip() or not contract_text2.strip():
                    st.error("One or both documents appear to be empty or couldn't be read properly")
                    return
                similarity_score = calculate_similarity(contract_text1, contract_text2)
                st.metric("Document Similarity Score", f"{similarity_score:.2f}%")
                if similarity_score < 50:
                    st.warning("Significant differences detected")
                highlighted_diff = highlight_differences(contract_text1, contract_text2)
                st.markdown("**Visual Difference Highlighting:**")
                # NOTE(review): rendered as raw HTML, so highlight_differences
                # must return markup that is safe to display.
                st.markdown(
                    f'<div style="border:1px solid #ddd; padding:10px; max-height:400px; overflow-y:auto;">{highlighted_diff}</div>',
                    unsafe_allow_html=True,
                )

    # ===== QUESTION ANALYSIS SECTION =====
    st.header("3. Clause Analysis")
    try:
        question_selected = st.selectbox(
            'Select a legal question to analyze:',
            questions_short,
            index=0,
            key="question_select",
        )
        # Map the short label back to the full question at the same position.
        question_idx = questions_short.index(question_selected)
        selected_question = questions[question_idx]
    except Exception as e:
        st.error(f"Error selecting question: {str(e)}")
        return

    if st.button("Analyze Both Documents"):
        if not (contract_text1.strip() and contract_text2.strip()):
            st.error("Please ensure both documents have readable content")
            return
        col1, col2 = st.columns(2)
        analyses = (
            (col1, "First Document Analysis", 'Processing first document...',
             "Document 1", contract_text1),
            (col2, "Second Document Analysis", 'Processing second document...',
             "Document 2", contract_text2),
        )
        for column, heading, spinner_text, doc_label, contract_text in analyses:
            _render_clause_analysis(
                column, heading, spinner_text, doc_label,
                selected_question, contract_text,
            )
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()