File size: 9,936 Bytes
1bc76b5
 
 
 
 
 
 
ca09c52
 
a241f5a
1bc76b5
6f39808
1bc76b5
 
6f39808
1bc76b5
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
 
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
 
 
 
 
 
 
 
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
 
 
 
6f39808
1bc76b5
 
 
 
 
6f39808
1bc76b5
 
 
 
 
 
 
6f39808
1bc76b5
 
 
 
6f39808
1bc76b5
 
6f39808
1bc76b5
 
 
 
6f39808
1bc76b5
 
6f39808
1bc76b5
 
6f39808
 
 
 
 
 
 
 
 
1bc76b5
6f39808
1bc76b5
 
6f39808
 
 
 
 
 
1bc76b5
 
 
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
 
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
6f39808
1bc76b5
 
6f39808
1bc76b5
 
 
 
 
 
6f39808
1bc76b5
 
 
 
6f39808
 
 
 
ca09c52
1bc76b5
 
 
6f39808
ca09c52
 
dc38c9a
 
6f39808
dc38c9a
ca09c52
6f39808
dc38c9a
 
6f39808
ca09c52
dc38c9a
 
ca09c52
 
dc38c9a
ca09c52
 
dc38c9a
ca09c52
 
6f39808
dc38c9a
6f39808
1bc76b5
6f39808
ca09c52
1bc76b5
 
 
 
 
 
6f39808
a241f5a
6f39808
1bc76b5
 
 
 
 
6f39808
1bc76b5
6f39808
1bc76b5
 
 
6f39808
1bc76b5
 
 
 
 
6f39808
ca09c52
1bc76b5
a241f5a
6f39808
a241f5a
6f39808
1bc76b5
 
 
 
 
6f39808
1bc76b5
6f39808
1bc76b5
 
6f39808
1bc76b5
 
 
 
6f39808
1bc76b5
 
6f39808
 
 
 
1bc76b5
 
 
 
 
 
 
6f39808
1bc76b5
 
 
 
 
 
 
 
6f39808
1bc76b5
 
 
6f39808
1bc76b5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import cosine_similarity
import random
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any, Optional
from prompts import CATEGORY_SUGGESTION_PROMPT, TEXT_CLASSIFICATION_PROMPT


class BaseClassifier:
    """Common interface shared by every text classifier in this module."""

    def __init__(self):
        pass

    def classify(self, texts, categories=None):
        """
        Assign a category to each text (abstract; subclasses override this).

        Args:
            texts (list): Text strings to classify
            categories (list, optional): Candidate category names. When omitted,
                the concrete classifier is expected to come up with its own

        Returns:
            list: One result per text carrying the category, a confidence score
                and an explanation

        Raises:
            NotImplementedError: Always, because the base class has no
                classification strategy of its own
        """
        raise NotImplementedError("Subclasses must implement this method")

    def _generate_default_categories(self, texts, num_clusters=5):
        """
        Produce placeholder category names ("Category 1", "Category 2", ...).

        Args:
            texts (list): Text strings (accepted for interface symmetry; the
                current implementation does not inspect them)
            num_clusters (int): How many category names to produce

        Returns:
            list: ``num_clusters`` generic category names, numbered from 1
        """
        # Names are purely positional; no clustering of the texts happens here.
        labels = []
        for number in range(1, num_clusters + 1):
            labels.append("Category " + str(number))
        return labels


class TFIDFClassifier(BaseClassifier):
    """Classifier using TF-IDF and clustering for fast classification

    Texts are vectorized with TF-IDF, grouped with K-Means, and each text is
    labelled with the category whose position in the category list equals its
    cluster index.
    """

    def __init__(self):
        super().__init__()
        self.vectorizer = TfidfVectorizer(
            max_features=1000, stop_words="english", ngram_range=(1, 2)
        )
        # The attributes below are populated by classify().
        self.model = None
        self.feature_names = None
        self.categories = None
        self.centroids = None

    def classify(self, texts, categories=None):
        """
        Classify texts using TF-IDF and clustering

        Args:
            texts (list): List of text strings to classify
            categories (list, optional): List of category names. If omitted or
                empty, up to five generic "Category N" names are generated

        Returns:
            list: One dict per text with "category", "confidence" and
                "explanation" keys. Empty when ``texts`` is empty.
        """
        # Nothing to vectorize or cluster; the vectorizer and K-Means would
        # both raise on an empty corpus.
        if len(texts) == 0:
            return []

        # Vectorize the texts
        X = self.vectorizer.fit_transform(texts)
        self.feature_names = self.vectorizer.get_feature_names_out()

        # Auto-detect categories if not provided
        if not categories:
            num_clusters = min(5, len(texts))  # Don't create more clusters than texts
            self.categories = self._generate_default_categories(texts, num_clusters)
        else:
            self.categories = categories
            # K-Means cannot fit more clusters than samples, so with fewer
            # texts than categories only the leading categories are used.
            num_clusters = min(len(categories), len(texts))

        # Cluster the texts
        # NOTE(review): cluster indices carry no meaning of their own, so the
        # pairing of cluster i with categories[i] is arbitrary when the caller
        # supplies category names.
        self.model = KMeans(n_clusters=num_clusters, random_state=42)
        clusters = self.model.fit_predict(X)
        self.centroids = self.model.cluster_centers_

        # Distances to every centroid drive the confidence score
        distances = self._calculate_distances(X)

        return [
            {
                "category": self.categories[cluster_idx],
                "confidence": self._calculate_confidence(distances[i]),
                "explanation": self._generate_explanation(X[i], cluster_idx),
            }
            for i, cluster_idx in enumerate(clusters)
        ]

    def _calculate_distances(self, X):
        """
        Calculate Euclidean distances from each point to each centroid

        Args:
            X: Sparse document-term matrix (must support ``toarray()``)

        Returns:
            numpy.ndarray: Array of shape (n_texts, n_centroids)
        """
        dense = X.toarray()
        # One centroid at a time keeps the temporary at (n_texts, n_features)
        # instead of broadcasting to (n_texts, n_centroids, n_features).
        return np.stack(
            [np.linalg.norm(dense - centroid, axis=1) for centroid in self.centroids],
            axis=1,
        )

    def _calculate_confidence(self, distances):
        """
        Convert one text's centroid distances to a confidence score

        The score compares the nearest centroid's distance with the mean
        distance to all centroids: a text sitting on its centroid scores 100,
        and the score approaches 50 as the nearest centroid becomes no closer
        than the others.

        Args:
            distances: 1-D array-like of distances from one text to every
                centroid

        Returns:
            float | int: Confidence in the range (50, 100], rounded to one
                decimal, or 70 when all distances are equal (including the
                single-centroid case)
        """
        distances = np.asarray(distances, dtype=float)
        min_dist = distances.min()
        max_dist = distances.max()

        if max_dist == min_dist:
            return 70  # Default mid-range confidence when all distances are equal

        # max > min >= 0 guarantees a positive mean, and min <= mean keeps the
        # ratio within [0, 1). (Min-max normalising and then taking the minimum
        # would always give 0, i.e. a constant confidence of 100.)
        ratio = min_dist / distances.mean()
        return round(float(100 - 50 * ratio), 1)

    def _generate_explanation(self, text_vector, cluster_idx):
        """
        Generate an explanation for the classification

        Args:
            text_vector: Single-row sparse TF-IDF vector for the text
            cluster_idx: Index of the assigned cluster (currently unused; kept
                for interface stability)

        Returns:
            str: Sentence listing up to five of the text's highest-weighted
                terms, or a fixed message when the text has no non-zero terms
        """
        # Get indices of top features for this text, highest weight first
        text_array = text_vector.toarray()[0]
        top_indices = text_array.argsort()[-5:][::-1]

        # Keep only terms that actually occur in the text
        top_features = [self.feature_names[i] for i in top_indices if text_array[i] > 0]

        if not top_features:
            return "No significant features identified for this classification."

        return f"Classification based on key terms: {', '.join(top_features)}"


class LLMClassifier(BaseClassifier):
    """Classifier using a Large Language Model for more accurate but slower classification"""

    def __init__(self, client, model="gpt-3.5-turbo"):
        """
        Args:
            client: Object exposing ``chat.completions.create(...)``
            model (str): Model name passed to every completion request
        """
        super().__init__()
        self.client = client
        self.model = model

    def classify(
        self, texts: List[str], categories: Optional[List[str]] = None
    ) -> List[Dict[str, Any]]:
        """
        Classify texts using an LLM with parallel processing

        Args:
            texts: Text strings to classify
            categories: Candidate category names. If omitted or empty, the LLM
                is asked to suggest them from a sample of the texts

        Returns:
            One dict per text, in input order, with "category", "confidence"
            and "explanation" keys. A text whose classification raised gets
            the first category, confidence 50 and the error message as
            explanation. Empty when ``texts`` is empty.
        """
        # Avoid a pointless category-suggestion request for an empty batch
        if len(texts) == 0:
            return []

        if not categories:
            # First, use LLM to generate appropriate categories
            categories = self._suggest_categories(texts)

        # Pre-sized so results can be stored by original position as the
        # futures complete out of order
        results = [None] * len(texts)

        with ThreadPoolExecutor(max_workers=10) as executor:
            future_to_index = {
                executor.submit(self._classify_text, text, categories): idx
                for idx, text in enumerate(texts)
            }

            for future in as_completed(future_to_index):
                original_idx = future_to_index[future]
                try:
                    results[original_idx] = future.result()
                except Exception as e:
                    print(f"Error processing text: {str(e)}")
                    results[original_idx] = {
                        "category": categories[0],
                        "confidence": 50,
                        "explanation": f"Error during classification: {str(e)}",
                    }

        return results

    def _suggest_categories(self, texts: List[str], sample_size: int = 20) -> List[str]:
        """
        Use LLM to suggest appropriate categories for the dataset

        Args:
            texts: Text strings to derive categories from
            sample_size: Maximum number of texts included in the prompt

        Returns:
            Category names parsed from the comma-separated LLM reply (blank
            and duplicate entries dropped), or the generic default categories
            when the request fails or yields no usable name
        """
        # Take a sample of texts to avoid token limitations
        if len(texts) > sample_size:
            # list() because random.sample only accepts true sequences
            sample_texts = random.sample(list(texts), sample_size)
        else:
            sample_texts = texts

        prompt = CATEGORY_SUGGESTION_PROMPT.format("\n---\n".join(sample_texts))

        try:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.2,
                max_tokens=100,
            )

            # Parse response to get categories
            categories_text = response.choices[0].message.content.strip()
            stripped = (cat.strip() for cat in categories_text.split(","))
            # dict.fromkeys de-duplicates while preserving the LLM's order
            categories = list(dict.fromkeys(cat for cat in stripped if cat))

            if not categories:
                raise ValueError("LLM returned no usable category names")

            return categories
        except Exception as e:
            # Fallback to default categories on error
            print(f"Error suggesting categories: {str(e)}")
            return self._generate_default_categories(texts)

    @staticmethod
    def _parse_json_object(response_text: str) -> Optional[Dict[str, Any]]:
        """Return the JSON object found in ``response_text``, or None.

        Tries the whole reply first, then the outermost ``{...}`` span so
        replies wrapped in code fences or surrounding prose still parse.
        """
        candidates = [response_text]
        start = response_text.find("{")
        end = response_text.rfind("}")
        if 0 <= start < end:
            candidates.append(response_text[start : end + 1])

        for candidate in candidates:
            try:
                parsed = json.loads(candidate)
            except json.JSONDecodeError:
                continue
            if isinstance(parsed, dict):
                return parsed
        return None

    def _classify_text(self, text: str, categories: List[str]) -> Dict[str, Any]:
        """
        Use LLM to classify a single text

        Args:
            text: The text to classify
            categories: Allowed category names

        Returns:
            Dict with "category" (always one of ``categories``), "confidence"
            (a number in 0-100) and "explanation"

        Raises:
            ValueError: If the reply is a JSON object lacking a required field
            Exception: Whatever the client raises for a failed request
        """
        prompt = TEXT_CLASSIFICATION_PROMPT.format(
            categories=", ".join(categories), text=text
        )

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0,
            max_tokens=200,
        )
        response_text = response.choices[0].message.content.strip()

        result = self._parse_json_object(response_text)
        if result is None:
            # No JSON object in the reply: fall back to the first category
            # name mentioned anywhere in the text (case-insensitive).
            lowered = response_text.lower()
            category = next(
                (cat for cat in categories if cat.lower() in lowered),
                categories[0],
            )
            return {
                "category": category,
                "confidence": 50,
                "explanation": "Classification based on language model analysis. (Note: Structured response parsing failed)",
            }

        # Ensure all required fields are present
        if not all(k in result for k in ("category", "confidence", "explanation")):
            raise ValueError("Missing required fields in LLM response")

        # Validate category is in the list, tolerating case/whitespace drift;
        # default to the first category if nothing matches
        category = result["category"]
        if category not in categories:
            wanted = str(category).strip().casefold()
            category = next(
                (cat for cat in categories if cat.strip().casefold() == wanted),
                categories[0],
            )
        result["category"] = category

        # Validate confidence is a number between 0 and 100
        try:
            confidence = float(result["confidence"])
        except (TypeError, ValueError, OverflowError):
            confidence = 50
        else:
            # NaN fails this comparison as well and is replaced
            if not 0 <= confidence <= 100:
                confidence = 50
        result["confidence"] = confidence

        return result