Spaces:
Running
Running
"""Module to search and list emails from Gmail.""" | |
import base64 | |
from datetime import datetime, timedelta | |
import pandas as pd | |
from langchain_core.documents import Document | |
from venv import logger | |
from models.mails import build_gmail_service | |
from models.chroma import vectorstore | |
# OAuth scope string for read-only Gmail access.
# NOTE(review): not referenced elsewhere in the visible code; presumably
# consumed by the authentication setup -- confirm before removing.
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly']

# Regular-expression source (kept as a plain string, not compiled) that matches
# e-mail-address-shaped tokens: local part, '@', domain, and a TLD of 2+ letters.
EMAIL_PATTERN = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'

# Gmail API client shared by the functions in this module. It is created once,
# at import time, by build_gmail_service().
service = build_gmail_service()
def search_emails(query):
    """Return every message stub that matches a Gmail search query.

    The Gmail ``users.messages.list`` endpoint is called repeatedly, following
    ``nextPageToken`` until the API stops returning one, and the ``messages``
    entries of each page are accumulated in order.

    Args:
        query: Gmail search expression passed as the ``q`` parameter.

    Returns:
        A list with the ``messages`` entries of all result pages (empty when
        nothing matched).
    """
    found = []
    request_args = {'userId': 'me', 'q': query}
    while True:
        page = service.users().messages().list(**request_args).execute()
        if 'messages' in page:
            found += page['messages']
        if 'nextPageToken' not in page:
            return found
        # Ask for the following page on the next iteration.
        request_args['pageToken'] = page['nextPageToken']
def _decode_body_data(data):
    """Decode one base64url-encoded body string into text.

    Missing ``=`` padding is restored before decoding, and bytes that are not
    valid UTF-8 are replaced instead of raising, so a single odd part cannot
    abort the whole batch.
    """
    padded = data + '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def _iter_text_parts(part):
    """Yield the decoded text of every ``text/*`` leaf at or below *part*."""
    subparts = part.get('parts')
    if subparts:
        # Container part (multipart/*): the content lives in its children.
        for subpart in subparts:
            yield from _iter_text_parts(subpart)
        return
    data = part.get('body', {}).get('data')
    # Parts without inline data (e.g. bodies stored as attachments) and
    # non-text parts are skipped; a part with no declared type is treated
    # as text.
    if data and part.get('mimeType', 'text/plain').startswith('text/'):
        yield _decode_body_data(data)


def _extract_metadata(msg):
    """Build the metadata dict (from/to/subject/cc/date) for one message."""
    wanted = {'from', 'to', 'subject', 'cc'}
    metadata = {}
    for header in msg['payload'].get('headers', []):
        # Header names are compared case-insensitively ("Cc" vs "CC").
        key = header['name'].lower()
        if key in wanted:
            metadata[key] = header['value']
    # internalDate is divided by 1000 before being handed to fromtimestamp(),
    # i.e. it is treated as epoch milliseconds; the result is formatted in the
    # local timezone.
    metadata['date'] = datetime.fromtimestamp(
        int(msg['internalDate']) / 1000).strftime("%d/%m/%Y %H:%M:%S")
    return metadata


def list_emails(messages, limit=50):
    """Fetch messages from Gmail and add them to the vector store.

    Each message is downloaded in ``full`` format, its textual body parts are
    decoded individually (each part is its own base64url string, so they must
    not be concatenated before decoding) and joined, and the result is stored
    as a ``Document`` whose id is the Gmail message id.

    Args:
        messages: Sequence of dicts with an ``'id'`` key, as returned by
            ``search_emails``.
        limit: Maximum number of messages to process, taken from the start of
            ``messages``. ``None`` processes all of them. Defaults to 50.

    Returns:
        Whatever ``vectorstore.add_documents`` returns for the added
        documents, or an empty list when there is nothing to add.
    """
    selected = messages[:limit]
    if not selected:
        # Nothing to fetch; avoid calling the vector store with empty input.
        return []

    ids = []
    documents = []
    for message in selected:
        msg = service.users().messages().get(
            userId='me', id=message['id'], format='full').execute()
        ids.append(msg['id'])
        documents.append(Document(
            page_content=''.join(_iter_text_parts(msg['payload'])),
            metadata=_extract_metadata(msg),
        ))
    return vectorstore.add_documents(documents=documents, ids=ids)
def collect(query=None):
    """Search Gmail and add the matching emails to the collection.

    Args:
        query: Gmail search expression. When omitted, defaults to
            ``after:YYYY/MM/DD`` for the date 21 days before the moment of
            the call. The default is computed on every call rather than once
            at import time, so it does not go stale in a long-running process.

    Returns:
        A status string ``"<n> emails added to the collection."`` when the
        search found messages, otherwise ``None``.
    """
    if query is None:
        query = (datetime.today() - timedelta(days=21)).strftime('after:%Y/%m/%d')

    emails = search_emails(query)
    if not emails:
        logger.info("No emails found for query %r.", query)
        return None

    logger.info("Found %d emails for query %r.", len(emails), query)
    return f"{len(list_emails(emails))} emails added to the collection."
def get_documents(path='collection_data.csv'):
    """Export the contents of the vector store to a CSV file.

    The ids and document texts returned by ``vectorstore.get()`` become the
    ``ids`` and ``documents`` columns; every metadata key becomes its own
    column. Entries without metadata contribute empty cells.

    Args:
        path: Destination of the CSV file. Defaults to
            ``'collection_data.csv'`` in the current working directory.

    Returns:
        None. The result is written to ``path``.
    """
    data = vectorstore.get()
    base = pd.DataFrame({
        'ids': data['ids'],
        'documents': data['documents'],
    })
    # Expand the per-document metadata dicts into columns; a missing (None)
    # metadata entry is treated as an empty dict.
    metadata = pd.DataFrame(
        [meta or {} for meta in data['metadatas']],
        index=base.index,
    )
    pd.concat([base, metadata], axis=1).to_csv(path, index=False)