import json
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional

import numpy as np
from sklearn.cluster import KMeans
from sklearn.feature_extraction.text import TfidfVectorizer

from prompts import CATEGORY_SUGGESTION_PROMPT, TEXT_CLASSIFICATION_PROMPT


class BaseClassifier:
    """Base class for text classifiers."""

    def __init__(self):
        pass

    def classify(self, texts, categories=None):
        """
        Classify a list of texts into categories.

        Args:
            texts (list): List of text strings to classify
            categories (list, optional): List of category names. If None,
                categories will be auto-detected

        Returns:
            list: List of classification results with categories,
                confidence scores, and explanations
        """
        raise NotImplementedError("Subclasses must implement this method")

    def _generate_default_categories(self, texts, num_clusters=5):
        """
        Generate default categories based on text clustering.

        Args:
            texts (list): List of text strings
            num_clusters (int): Number of clusters to generate

        Returns:
            list: List of category names
        """
        # Simple implementation - in a real system this would be more sophisticated
        default_categories = [f"Category {i + 1}" for i in range(num_clusters)]
        return default_categories


class TFIDFClassifier(BaseClassifier):
    """Classifier using TF-IDF and clustering for fast classification."""

    def __init__(self):
        super().__init__()
        self.vectorizer = TfidfVectorizer(
            max_features=1000, stop_words="english", ngram_range=(1, 2)
        )
        self.model = None
        self.feature_names = None
        self.categories = None
        self.centroids = None

    def classify(self, texts, categories=None):
        """Classify texts using TF-IDF and K-means clustering."""
        # Vectorize the texts
        X = self.vectorizer.fit_transform(texts)
        self.feature_names = self.vectorizer.get_feature_names_out()

        # Auto-detect categories if not provided
        if not categories:
            num_clusters = min(5, len(texts))  # Don't create more clusters than texts
            self.categories = self._generate_default_categories(texts, num_clusters)
        else:
            self.categories = categories
            num_clusters = len(categories)

        # Cluster the texts. K-means is unsupervised, so when explicit
        # categories are passed, clusters are mapped to category names by
        # index, not by meaning.
        self.model = KMeans(n_clusters=num_clusters, random_state=42)
        clusters = self.model.fit_predict(X)
        self.centroids = self.model.cluster_centers_

        # Calculate distances to centroids for confidence
        distances = self._calculate_distances(X)

        # Prepare results
        results = []
        for i, text in enumerate(texts):
            cluster_idx = clusters[i]

            # Calculate confidence from the spread of centroid distances
            confidence = self._calculate_confidence(distances[i])

            # Create explanation
            explanation = self._generate_explanation(X[i], cluster_idx)

            results.append(
                {
                    "category": self.categories[cluster_idx],
                    "confidence": confidence,
                    "explanation": explanation,
                }
            )

        return results

    def _calculate_distances(self, X):
        """Calculate Euclidean distances from each point to each centroid."""
        return np.sqrt(
            (
                (X.toarray()[:, np.newaxis, :] - self.centroids[np.newaxis, :, :]) ** 2
            ).sum(axis=2)
        )

    def _calculate_confidence(self, distances):
        """Convert a point's centroid distances to a confidence score (0-100)."""
        min_dist = np.min(distances)
        max_dist = np.max(distances)

        if max_dist == min_dist:
            return 70.0  # Default mid-range confidence when all distances are equal

        # Normalize distances to [0, 1]; the assigned (closest) cluster maps to 0
        normalized_dist = (distances - min_dist) / (max_dist - min_dist)

        # Use the margin to the second-closest centroid: a clear winner
        # yields high confidence, a near-tie yields low confidence
        margin = np.partition(normalized_dist, 1)[1]

        # Scale to the 50-100 range (TF-IDF is never 100% confident)
        confidence = 50 + margin * 50
        return round(float(confidence), 1)

    def _generate_explanation(self, text_vector, cluster_idx):
        """Generate an explanation for the classification."""
        # Get indices of the top features for this text
        text_array = text_vector.toarray()[0]
        top_indices = text_array.argsort()[-5:][::-1]

        # Get the feature names for these indices, ignoring zero-weight terms
        top_features = [
            self.feature_names[i] for i in top_indices if text_array[i] > 0
        ]

        if not top_features:
            return "No significant features identified for this classification."

        return f"Classification based on key terms: {', '.join(top_features)}"
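
# Usage sketch (hypothetical texts): when explicit categories are supplied,
# TFIDFClassifier still clusters unsupervised and maps cluster 0 to the first
# category name, cluster 1 to the second, and so on -- the provided names are
# labels of convenience, not guaranteed semantic matches.
#
#   clf = TFIDFClassifier()
#   results = clf.classify(
#       ["battery died after a week", "great value for the price"],
#       categories=["Complaint", "Praise"],
#   )
#   print(results[0]["category"], results[0]["confidence"])
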

class LLMClassifier(BaseClassifier):
    """Classifier using a Large Language Model for more accurate but slower classification."""

    def __init__(self, client, model="gpt-3.5-turbo"):
        super().__init__()
        self.client = client
        self.model = model

    def classify(
        self, texts: List[str], categories: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """Classify texts using an LLM, processing them in parallel."""
        if not categories:
            # First, use the LLM to generate appropriate categories
            categories = self._suggest_categories(texts)

        # Process texts in parallel
        with ThreadPoolExecutor(max_workers=10) as executor:
            # Submit all tasks with their original indices
            future_to_index = {
                executor.submit(self._classify_text, text, categories): idx
                for idx, text in enumerate(texts)
            }

            # Initialize results list with None values
            results = [None] * len(texts)

            # Collect results as they complete
            for future in as_completed(future_to_index):
                original_idx = future_to_index[future]
                try:
                    results[original_idx] = future.result()
                except Exception as e:
                    print(f"Error processing text: {str(e)}")
                    results[original_idx] = {
                        "category": categories[0],
                        "confidence": 50,
                        "explanation": f"Error during classification: {str(e)}",
                    }

        return results

    def _suggest_categories(self, texts: List[str], sample_size: int = 20) -> List[str]:
        """Use the LLM to suggest appropriate categories for the dataset."""
        # Take a sample of texts to stay within token limits
        if len(texts) > sample_size:
            sample_texts = random.sample(texts, sample_size)
        else:
            sample_texts = texts

        prompt = CATEGORY_SUGGESTION_PROMPT.format("\n---\n".join(sample_texts))

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=100,
            )
            # Parse the comma-separated response into category names
            categories_text = response.choices[0].message.content.strip()
            categories = [cat.strip() for cat in categories_text.split(",")]
            return categories
        except Exception as e:
            # Fall back to default categories on error
            print(f"Error suggesting categories: {str(e)}")
            return self._generate_default_categories(texts)
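
    # The templates imported from prompts.py are not shown in this module.
    # Judging from how they are used, they are assumed to look roughly like
    # the sketch below (illustrative only, not the actual prompt text):
    #
    #   CATEGORY_SUGGESTION_PROMPT = (
    #       "Suggest a short comma-separated list of category names "
    #       "for these texts:\n{}"
    #   )
    #   TEXT_CLASSIFICATION_PROMPT = (
    #       "Classify the text into one of these categories: {categories}.\n"
    #       'Respond with JSON: {{"category": "...", "confidence": 0-100, '
    #       '"explanation": "..."}}\nText: {text}'
    #   )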

    def _classify_text(self, text: str, categories: List[str]) -> Dict[str, Any]:
        """Use the LLM to classify a single text."""
        prompt = TEXT_CLASSIFICATION_PROMPT.format(
            categories=", ".join(categories), text=text
        )

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0,
                max_tokens=200,
            )

            # Parse the JSON response
            response_text = response.choices[0].message.content.strip()
            result = json.loads(response_text)

            # Ensure all required fields are present
            if not all(k in result for k in ["category", "confidence", "explanation"]):
                raise ValueError("Missing required fields in LLM response")

            # Default to the first category if the LLM returned an unknown one
            if result["category"] not in categories:
                result["category"] = categories[0]

            # Validate that confidence is a number between 0 and 100
            try:
                result["confidence"] = float(result["confidence"])
                if not 0 <= result["confidence"] <= 100:
                    result["confidence"] = 50
            except (TypeError, ValueError):
                result["confidence"] = 50

            return result
        except json.JSONDecodeError:
            # Fall back to simple keyword matching if JSON parsing fails
            category = categories[0]  # Default
            for cat in categories:
                if cat.lower() in response_text.lower():
                    category = cat
                    break

            return {
                "category": category,
                "confidence": 50,
                "explanation": "Classification based on language model analysis. "
                "(Note: structured response parsing failed)",
            }
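
# Minimal usage sketch. The OpenAI client in the commented lines is an
# assumption; any client exposing a chat.completions.create(...) interface
# should work. The sample texts are illustrative only.
if __name__ == "__main__":
    sample_texts = [
        "My package arrived two weeks late.",
        "The new dashboard makes reporting much easier.",
        "How do I reset my password?",
    ]

    # Fast, local classification with auto-detected placeholder labels
    for res in TFIDFClassifier().classify(sample_texts):
        print(res["category"], res["confidence"], res["explanation"])

    # LLM-based classification; uncomment if an API key is configured.
    # from openai import OpenAI
    # llm = LLMClassifier(client=OpenAI())
    # for res in llm.classify(sample_texts, categories=["Shipping", "Product", "Account"]):
    #     print(res["category"], res["confidence"], res["explanation"])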