File size: 3,480 Bytes
fc223bf
 
 
 
adff8c4
 
 
0895159
 
fc223bf
0895159
fc223bf
 
 
 
 
 
 
93f1414
fc223bf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
adff8c4
4b51872
fc223bf
 
 
 
 
 
 
 
 
 
 
 
 
 
adff8c4
 
 
 
 
 
fc223bf
 
adff8c4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc223bf
 
 
 
 
adff8c4
0895159
adff8c4
0895159
 
 
 
 
 
 
 
adff8c4
fc223bf
 
 
adff8c4
fc223bf
 
 
 
 
 
adff8c4
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import re
import requests
from uuid import uuid4
import json
from typing import Any, AsyncGenerator, Dict, Generator
import os 
from dotenv import load_dotenv

# Load environment variables from a .env file at import time so that the
# os.getenv() lookups in this module (e.g. the "v2" endpoint) can see them.
load_dotenv()

class v2:
    """Streaming client for the chat endpoint named by the ``v2`` env variable.

    A prompt is POSTed as JSON and the reply is read line by line; lines that
    start with ``data:`` are parsed as JSON events. Events of type ``answer``
    carry the full answer text so far, which is turned into incremental deltas.
    """

    # Inline citation markers such as ``[[3]]`` that are removed from answers.
    _CITATION_RE = re.compile(r'\[\[\d+\]\]')
    # A span wrapped in *single* asterisks. The lookarounds keep ``**bold**``
    # (and any other run of asterisks) from being matched as an empty span.
    _ITALIC_RE = re.compile(r'(?<!\*)\*(?!\*)(.+?)(?<!\*)\*(?!\*)')

    def __init__(
        self,
        timeout: int = 100,
        proxies: dict = None,
    ):
        """Create the HTTP session and read the endpoint from the environment.

        Args:
            timeout: Timeout passed to every ``session.post`` call.
            proxies: Proxy mapping assigned to the session; ``None`` (the
                default) means an empty mapping.
        """
        self.session = requests.Session()
        # May be None when the "v2" variable is not set; ask() reports that.
        self.chat_endpoint = os.getenv("v2")
        self.stream_chunk_size = 64
        self.timeout = timeout
        self.last_response = {}
        self.headers = {
            "accept": "*/*",
            "accept-encoding": "gzip, deflate, br, zstd",
            "accept-language": "en-US,en;q=0.9,en-IN;q=0.8",
            "content-type": "application/json",
        }

        self.session.headers.update(self.headers)
        # A fresh dict per instance: a ``{}`` default argument would be one
        # shared object aliased into every session created without proxies.
        self.session.proxies = proxies if proxies is not None else {}

    def ask(
        self,
        prompt: str,
        stream: bool = False,
        raw: bool = False,
    ) -> Generator[Dict[str, Any], None, None]:
        """Send ``prompt`` and yield the answer incrementally.

        Args:
            prompt: Text sent as the ``query`` field of the payload.
            stream: Unused; the method is always a generator.
            raw: When true, yield the decoded ``data:`` line itself (a str)
                instead of a ``{"text": delta}`` dict.

        Yields:
            ``{"text": delta}`` for each newly received piece of the answer,
            or the raw line when ``raw`` is true.

        Raises:
            requests.exceptions.MissingSchema: If no endpoint is configured.
            Exception: If the server answers with a non-OK status.
        """
        if not self.chat_endpoint:
            # Same exception type requests itself raises for a None/empty URL,
            # but with a message that points at the actual cause.
            raise requests.exceptions.MissingSchema(
                'Chat endpoint is not configured: set the "v2" environment variable'
            )

        self.session.headers.update(self.headers)
        payload = {
            "query": prompt,
            "search_uuid": uuid4().hex,
            "lang": "",
            "agent_lang": "en",
            "search_options": {
                "langcode": "en-US"
            },
            "search_video": True,
            "contexts_from": "google"
        }

        response = self.session.post(
            self.chat_endpoint, json=payload, stream=True, timeout=self.timeout
        )
        try:
            if not response.ok:
                raise Exception(
                    f"Failed to generate response - ({response.status_code}, {response.reason}) - {response.text}"
                )

            streaming_text = ""
            for line in response.iter_lines(decode_unicode=True):
                # iter_lines() hands back bytes when the response declares no
                # encoding, even with decode_unicode=True.
                if isinstance(line, bytes):
                    line = line.decode("utf-8", errors="replace")
                if not line.startswith('data:'):
                    continue
                try:
                    data = json.loads(line[5:].strip())
                except json.JSONDecodeError:
                    # Best effort: ignore events that are not valid JSON.
                    continue
                if not isinstance(data, dict) or data.get('type') != 'answer':
                    continue
                body = data.get('data')
                if not isinstance(body, dict):
                    continue
                new_text = body.get('text')
                # Each event repeats the whole answer so far; only a longer
                # text carries something new.
                if not isinstance(new_text, str) or len(new_text) <= len(streaming_text):
                    continue
                delta = new_text[len(streaming_text):]
                streaming_text = new_text
                self.last_response.update(text=streaming_text)
                yield line if raw else {"text": delta}
        finally:
            # Release the streamed connection even if the consumer stops
            # iterating early or an error is raised.
            response.close()

    def chat(
        self,
        prompt: str,
        stream: bool = False,
    ) -> Generator[str, None, None]:
        """Yield the answer to ``prompt`` one formatted line at a time.

        Args:
            prompt: Text forwarded to :meth:`ask`.
            stream: Unused; the method is always a generator.

        Yields:
            Each completed line of the answer, formatted and followed by a
            blank line (``"\\n\\n"``), and finally the literal ``"[DONE]"``.
        """
        buffer = ""
        for response in self.ask(prompt, True):
            buffer += self.get_message(response)
            # Everything before the last newline is complete; the tail stays
            # buffered until more text (or the end of the stream) arrives.
            *complete, buffer = buffer.split('\n')
            for line in complete:
                yield self._render_line(line)
        if buffer:
            yield self._render_line(buffer)
        yield "[DONE]"

    def _render_line(self, line: str) -> str:
        """Return one finished line, citation-free, formatted and terminated."""
        # A citation marker can be split across two deltas and so survive
        # get_message(); strip again now that the line is whole.
        return self.format_text(self._CITATION_RE.sub('', line)) + '\n\n'

    def get_message(self, response: dict) -> str:
        """Extract the text of a response dict with ``[[n]]`` citations removed.

        Args:
            response: A dict as yielded by :meth:`ask` (non-raw mode).

        Returns:
            The ``"text"`` value without citation markers, or ``""`` when the
            key is absent.

        Raises:
            TypeError: If ``response`` is not a dict.
        """
        # Explicit raise instead of ``assert``: asserts vanish under ``-O``.
        if not isinstance(response, dict):
            raise TypeError("Response should be of dict data-type only")

        if "text" not in response:
            return ""
        return self._CITATION_RE.sub('', response["text"])

    def format_text(self, text: str) -> str:
        """Convert ``*text*`` to ``<i>text</i>``.

        Only spans delimited by single asterisks are converted; ``**text**``
        is left untouched rather than producing empty ``<i></i>`` tags.
        """
        return self._ITALIC_RE.sub(r'<i>\1</i>', text)