"""Streamlit app: compare two uploaded contracts and run clause QA on both.

The page has three sections: document upload, a word-level diff with an
embedding-based similarity score, and question answering over each document
via the project-local ``run_prediction`` helper.
"""

import difflib
import html
from io import StringIO

import docx2txt
import pdfplumber
import PyPDF4
import streamlit as st
from sentence_transformers import SentenceTransformer, util
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from predict import run_prediction

# ========== CONFIGURATION ==========
st.set_page_config(
    layout="wide",
    page_title="Contract Analysis Suite",
    page_icon="📁",
)

QUESTIONS_PATH = 'data/questions.txt'
QUESTIONS_SHORT_PATH = 'data/questions_short.txt'
QA_MODEL_NAME = 'marshmellow77/roberta-base-cuad'
EMBEDDING_MODEL_NAME = 'all-MiniLM-L6-v2'
SUPPORTED_UPLOAD_TYPES = ["txt", "pdf", "docx"]

# Inline styles for the diff view. NOTE(review): the original markup was not
# recoverable from the source, so these styles are a reconstruction.
_REMOVED_STYLE = "background-color: #ffcccc; color: #000000;"
_ADDED_STYLE = "background-color: #ccffcc; color: #000000;"
_DIFF_PANEL_STYLE = (
    "border: 1px solid #dddddd; padding: 10px; "
    "max-height: 400px; overflow-y: auto;"
)

# Initialize session state variables if they don't exist
if 'comparison_results' not in st.session_state:
    st.session_state.comparison_results = None
if 'analysis_results' not in st.session_state:
    st.session_state.analysis_results = None


# ========== CACHED DATA LOADING ==========
@st.cache_data(show_spinner=False)
def _read_nonblank_lines(path):
    """Return the stripped, non-blank lines of the text file at *path*.

    Raises instead of returning a fallback so that a failed read is never
    stored in the Streamlit cache.
    """
    with open(path, encoding="utf-8") as handle:
        return [line.strip() for line in handle if line.strip()]


@st.cache_resource(show_spinner=False)
def _load_embedding_model():
    """Load the sentence-embedding model once and share it across reruns."""
    return SentenceTransformer(EMBEDDING_MODEL_NAME)


def load_questions():
    """Return the full-length questions, or ``[]`` (with a UI error) on failure."""
    try:
        return _read_nonblank_lines(QUESTIONS_PATH)
    except (OSError, UnicodeError) as e:
        st.error(f"Error loading questions: {str(e)}")
        return []


def load_questions_short():
    """Return the short question labels, or ``[]`` (with a UI error) on failure."""
    try:
        return _read_nonblank_lines(QUESTIONS_SHORT_PATH)
    except (OSError, UnicodeError) as e:
        st.error(f"Error loading short questions: {str(e)}")
        return []


# ========== UTILITY FUNCTIONS ==========
def _rewind(stream):
    """Move *stream* back to its start when it supports seeking."""
    seek = getattr(stream, "seek", None)
    if seek is not None:
        seek(0)


def _extract_page_text(page):
    """Return the text of one pdfplumber page, or ``""`` when it has none."""
    # `extract_text_formatted` is not part of pdfplumber's documented Page API;
    # it is only used when the installed build happens to provide it.
    formatted = getattr(page, "extract_text_formatted", None)
    text = formatted() if callable(formatted) else None
    # extract_text() may yield None/"" for pages without a text layer.
    return text or page.extract_text() or ""


def extract_text_from_pdf(uploaded_file):
    """Extract text from a PDF with pdfplumber.

    Args:
        uploaded_file: A path or binary file-like object accepted by
            ``pdfplumber.open``.

    Returns:
        The page texts joined by blank lines, or ``""`` when nothing could be
        extracted or an error occurred (the error is shown in the UI).
    """
    try:
        _rewind(uploaded_file)
        with pdfplumber.open(uploaded_file) as pdf:
            page_texts = [_extract_page_text(page) for page in pdf.pages]
    except Exception as e:  # UI boundary: report and degrade to "no text".
        st.error(f"PDF extraction error: {str(e)}")
        return ""
    combined = "\n\n".join(text for text in page_texts if text)
    return combined if combined.strip() else ""


def _extract_text_with_pypdf4(file):
    """Fallback PDF extraction using PyPDF4; returns the joined page texts."""
    # The stream was already consumed by pdfplumber, so start from the top.
    _rewind(file)
    reader = PyPDF4.PdfFileReader(file)
    page_texts = (page.extractText() for page in reader.pages)
    return "\n\n".join(text for text in page_texts if text)


def _mark(word, style):
    """Wrap an already-escaped *word* in a highlighting ``<span>``."""
    return f'<span style="{style}">{word}</span>'


def highlight_differences_words(text1, text2):
    """Build word-level diff markup for two texts.

    Both texts are split on whitespace and aligned with
    ``difflib.SequenceMatcher``. Words only present in *text1* are highlighted
    in the first result, words only present in *text2* in the second.

    Args:
        text1: Original text.
        text2: Modified text.

    Returns:
        A ``(html1, html2)`` tuple of HTML fragments. All document text is
        HTML-escaped, so the fragments are safe to render with
        ``unsafe_allow_html=True``.
    """
    words1 = text1.split()
    words2 = text2.split()
    matcher = difflib.SequenceMatcher(None, words1, words2)

    left_parts = []
    right_parts = []
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        left = [html.escape(word) for word in words1[i1:i2]]
        right = [html.escape(word) for word in words2[j1:j2]]
        if tag == "equal":
            left_parts.extend(left)
            right_parts.extend(right)
        else:
            # "delete", "insert" and "replace" all reduce to: mark whatever
            # is on the left as removed and whatever is on the right as added.
            left_parts.extend(_mark(word, _REMOVED_STYLE) for word in left)
            right_parts.extend(_mark(word, _ADDED_STYLE) for word in right)

    return " ".join(left_parts), " ".join(right_parts)


def calculate_similarity(text1, text2):
    """Return the embedding cosine similarity of two texts as a percentage.

    Returns ``0.0`` when either text is blank or when the model fails (the
    failure is shown in the UI).

    NOTE(review): sentence-transformer models typically truncate long inputs,
    so for long contracts the score may reflect only the beginning of each
    document -- confirm against the model's max sequence length.
    """
    if not text1.strip() or not text2.strip():
        return 0.0
    try:
        model = _load_embedding_model()
        embeddings = model.encode([text1, text2], convert_to_tensor=True)
        similarity = util.cos_sim(embeddings[0], embeddings[1])
        return float(similarity.item()) * 100
    except Exception as e:  # UI boundary: report and degrade to 0.
        st.error(f"Similarity calculation error: {e}")
        return 0.0


def load_contract(file):
    """Read an uploaded ``.txt``, ``.pdf`` or ``.docx`` file into plain text.

    Args:
        file: An uploaded file object exposing ``name`` and ``getvalue()``,
            or ``None``.

    Returns:
        The stripped document text, or ``""`` when *file* is ``None``, the
        type is unsupported, no text was found, or reading failed (failures
        are shown in the UI).
    """
    if file is None:
        return ""
    ext = file.name.split('.')[-1].lower()
    try:
        if ext == 'txt':
            content = file.getvalue().decode("utf-8")
        elif ext == 'pdf':
            content = extract_text_from_pdf(file)
            if not content:
                content = _extract_text_with_pypdf4(file)
        elif ext == 'docx':
            content = docx2txt.process(file)
        else:
            st.warning('Unsupported file type')
            return ""
        return content.strip() if content else ""
    except Exception as e:  # UI boundary: report and degrade to "no text".
        st.error(f"Error loading {ext.upper()} file: {str(e)}")
        return ""


# ========== MAIN APP ==========
def _inputs_key(*parts):
    """Return a value that changes whenever any of *parts* changes.

    Stored next to computed results so they are hidden once the inputs they
    were computed from are no longer the current ones.
    """
    return hash(parts)


def _upload_panel(uploader_label, uploader_key, area_label, area_key):
    """Render one uploader plus a preview area; return the extracted text.

    Returns ``None`` when no file has been uploaded yet.
    """
    uploaded = st.file_uploader(
        uploader_label, type=SUPPORTED_UPLOAD_TYPES, key=uploader_key
    )
    if uploaded is None:
        return None
    text = load_contract(uploaded)
    st.text_area(area_label, value=text, height=400, key=area_key)
    return text


def _render_diff_panel(heading, diff_html):
    """Render one side of the diff inside a bordered, scrollable container."""
    st.markdown(heading)
    # The <div> wrapper keeps the fragment an HTML block, so characters such
    # as * or _ in the document are not interpreted as Markdown.
    st.markdown(
        f'<div style="{_DIFF_PANEL_STYLE}">{diff_html}</div>',
        unsafe_allow_html=True,
    )


def _analyze_document(question, contract_text):
    """Run the QA model for one document.

    Returns:
        ``(ok, message)`` where *ok* is ``False`` only when the prediction
        raised, and *message* is the text to display.
    """
    try:
        predictions = run_prediction(
            [question], contract_text, QA_MODEL_NAME, n_best_size=5
        )
        # presumably answers are keyed by the question's index as a string;
        # verify against run_prediction.
        answer = predictions.get('0', 'No answer found')
    except Exception as e:  # UI boundary: surface the failure as a message.
        return False, f"Analysis failed: {str(e)}"
    return True, answer if answer else "No relevant clause found"


def _render_analysis(heading, results, doc_key):
    """Show the stored analysis outcome for one document."""
    st.subheader(heading)
    message = results.get(doc_key, 'No analysis performed yet')
    if results.get(f'{doc_key}_ok', True):
        st.success(message)
    else:
        st.error(message)


def main():
    """Render the page: upload, document comparison, and clause analysis."""
    questions = load_questions()
    questions_short = load_questions_short()
    if not questions or not questions_short or len(questions) != len(questions_short):
        st.error("Failed to load questions or questions mismatch. Please check data files.")
        return

    st.title("📁 Contract Analysis Suite")
    st.markdown(
        "Compare documents and analyze legal clauses using AI-powered question answering."
    )

    st.header("1. Upload Documents")
    col1, col2 = st.columns(2)
    with col1:
        contract_text1 = _upload_panel(
            "Upload First Document", "file1", "Document 1 Content", "area1"
        )
    with col2:
        contract_text2 = _upload_panel(
            "Upload Second Document", "file2", "Document 2 Content", "area2"
        )

    if contract_text1 is None or contract_text2 is None:
        st.warning("Please upload both documents to proceed")
        return

    st.header("2. Document Comparison")
    comparison_key = _inputs_key(contract_text1, contract_text2)
    with st.expander("Show Document Differences", expanded=True):
        if st.button("Compare Documents"):
            if not contract_text1.strip() or not contract_text2.strip():
                st.error("One or both documents appear to be empty or couldn't be read properly")
                return
            with st.spinner("Analyzing documents..."):
                similarity_score = calculate_similarity(contract_text1, contract_text2)
                highlighted_diff1, highlighted_diff2 = highlight_differences_words(
                    contract_text1, contract_text2
                )
            st.session_state.comparison_results = {
                'inputs_key': comparison_key,
                'similarity_score': similarity_score,
                'highlighted_diff1': highlighted_diff1,
                'highlighted_diff2': highlighted_diff2,
            }

        comparison = st.session_state.comparison_results
        # Only show results computed from the documents currently uploaded.
        if comparison and comparison.get('inputs_key') == comparison_key:
            st.metric(
                "Document Similarity Score",
                f"{comparison['similarity_score']:.2f}%",
            )
            if comparison['similarity_score'] < 50:
                st.warning("Significant differences detected")
            st.markdown("**Visual Difference Highlighting:**")
            col1, col2 = st.columns(2)
            with col1:
                _render_diff_panel("### Original Document", comparison["highlighted_diff1"])
            with col2:
                _render_diff_panel("### Modified Document", comparison["highlighted_diff2"])

    st.header("3. Clause Analysis")
    try:
        question_selected = st.selectbox(
            'Select a legal question to analyze:',
            questions_short,
            index=0,
            key="question_select",
        )
        question_idx = questions_short.index(question_selected)
        selected_question = questions[question_idx]
    except (ValueError, IndexError) as e:
        st.error(f"Error selecting question: {str(e)}")
        return

    analysis_key = _inputs_key(selected_question, contract_text1, contract_text2)
    if st.button("Analyze Both Documents"):
        if not (contract_text1.strip() and contract_text2.strip()):
            st.error("Please ensure both documents have readable content")
            return
        with st.spinner('Processing first document...'):
            ok1, message1 = _analyze_document(selected_question, contract_text1)
        with st.spinner('Processing second document...'):
            ok2, message2 = _analyze_document(selected_question, contract_text2)
        st.session_state.analysis_results = {
            'inputs_key': analysis_key,
            'doc1': message1,
            'doc1_ok': ok1,
            'doc2': message2,
            'doc2_ok': ok2,
        }

    analysis = st.session_state.analysis_results
    # Only show results that belong to the current question and documents.
    if analysis and analysis.get('inputs_key') == analysis_key:
        col1, col2 = st.columns(2)
        with col1:
            _render_analysis("First Document Analysis", analysis, 'doc1')
        with col2:
            _render_analysis("Second Document Analysis", analysis, 'doc2')


if __name__ == "__main__":
    main()