-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquickstart.py
More file actions
114 lines (99 loc) · 3.79 KB
/
quickstart.py
File metadata and controls
114 lines (99 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import emailcache
from email import message
import os.path
import base64
import time
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
# If modifying these scopes, delete the file token.json.
SCOPES = ["https://www.googleapis.com/auth/gmail.readonly"]
def decode_body(payload):
if 'parts' in payload:
for part in payload['parts']:
if part['mimeType'] in ['text/plain', 'text/html']:
data = part['body'].get('data')
if data:
return base64.urlsafe_b64decode(data).decode('utf-8')
else:
# Recurse if nested parts exist
content = decode_body(part)
if content:
return content
else:
data = payload['body'].get('data')
if data:
return base64.urlsafe_b64decode(data).decode('utf-8')
return None
def clean_body(body : str) -> str:
return " ".join(body.replace('\n', ' ').replace('\r', ' ').replace('\t', ' ').split())
def get_emails():
try:
last_refreshed = emailcache.get_last_refreshed()
except:
last_refreshed = -1
print(last_refreshed)
print(int(time.time()) - 3600 - last_refreshed)
if last_refreshed == -1 or last_refreshed < int(time.time()) - 3600:
get_new_emails()
print("Refreshed emails.")
email_list = []
emails = emailcache.get_all_emails()
for email in emails:
email_list.append({"id" : email[0], "Sender" : email[1], "Subject" : email[2], "Body" : clean_body(email[3][:50])})
return email_list
def get_new_emails():
"""Shows basic usage of the Gmail API.
Lists the user's Gmail labels.
"""
creds = None
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
if os.path.exists("token.json"):
creds = Credentials.from_authorized_user_file("token.json", SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
"C:\\Users\\wumar\\Desktop\\Personal Codes\\SimpleMCPServer\\credentials.json", SCOPES
)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open("token.json", "w") as token:
token.write(creds.to_json())
try:
# Call the Gmail API
service = build("gmail", "v1", credentials=creds)
results = service.users().messages().list(userId="me", includeSpamTrash=False, labelIds=["INBOX"], q="newer_than:1d").execute()
messages = results.get("messages", [])
if not messages:
print("No messages found.")
return
print("Messages:")
emails = []
for message in messages:
msg = service.users().messages().get(userId="me", id=message["id"]).execute()
for header in msg["payload"]["headers"]:
if header["name"] == "From":
sender = header["value"]
if header["name"] == "Subject":
subject = header["value"]
email = (msg["id"], sender, subject, decode_body(msg["payload"]), msg["internalDate"])
emails.append(email)
emailcache.add_to_cache(emails)
emailcache.prune_cache()
except HttpError as error:
# TODO(developer) - Handle errors from gmail API.
print(f"An error occurred: {error}")
def get_email_details(email_id : str) -> dict:
email = emailcache.get_by_id(email_id)
return {"Sender" : email[1], "Subject" : email[2], "Body" : clean_body(email[3])}
def main():
get_emails()
if __name__ == "__main__":
main()