diff --git a/tools/gmail.py b/tools/gmail.py index 8998ed0..7574cc1 100644 --- a/tools/gmail.py +++ b/tools/gmail.py @@ -1,21 +1,22 @@ -"""Gmail tool — send, list, read, and reply to emails via Gmail API.""" +"""Gmail tool — send, list, read, reply via IMAP/SMTP and an App Password.""" import asyncio -import base64 +import imaplib +import re +import smtplib +import os +from email import message_from_bytes +from email.header import decode_header as _decode_hdr from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from pathlib import Path -from typing import Any import html2text as _html2text -from google.auth.transport.requests import Request -from google.oauth2.credentials import Credentials -from googleapiclient.discovery import build name = "gmail" description = ( - "Interact with Gmail: send emails, list inbox (paginated), list all unread messages, " - "read a specific email by ID (marks it as read), or reply to an email thread." + "Interact with Gmail via IMAP/SMTP: send emails, list inbox (paginated), " + "list all unread messages, read a specific email by UID (marks it read), " + "or reply to an email thread." ) parameters = { @@ -25,309 +26,326 @@ "type": "string", "enum": ["send", "list", "list_unread", "read", "reply"], "description": ( - "Action to perform. " "'send': send a new email. " - "'list': list inbox messages (paginated). " + "'list': list inbox, newest first (use offset for pagination). " "'list_unread': list all unread messages. " - "'read': fetch and display a specific message by ID (marks it read). " - "'reply': reply to a message thread by ID." + "'read': fetch full message by UID and mark it read. " + "'reply': reply to a message by UID." ), }, "to": { "type": "string", - "description": "Recipient email address. Required for 'send'.", + "description": "Recipient address. Required for 'send'.", }, "subject": { "type": "string", - "description": "Email subject line. Required for 'send'.", + "description": "Subject line. Required for 'send'.", }, "body": { "type": "string", "description": "Email body (HTML). Required for 'send' and 'reply'.", }, - "message_id": { + "uid": { "type": "string", - "description": "Gmail message ID (from list output). Required for 'read' and 'reply'.", + "description": "IMAP message UID from list output. Required for 'read' and 'reply'.", }, "max_results": { "type": "integer", "description": "Max messages to return for 'list' (default 10, max 50).", }, - "page_token": { - "type": "string", - "description": "Page token returned by a previous 'list' call for pagination.", + "offset": { + "type": "integer", + "description": "Number of messages to skip from newest for 'list' pagination (default 0).", }, "query": { "type": "string", - "description": "Gmail search query to filter 'list' results (e.g. 'from:example.com subject:invoice').", + "description": ( + "IMAP search criteria for 'list', e.g. 'FROM \"someone@example.com\"' " + "or 'SUBJECT \"invoice\"'. Defaults to ALL." + ), }, }, "required": ["action"], } -_SCOPES = ["https://www.googleapis.com/auth/gmail.modify"] -_TOKEN_PATH = Path(__file__).parent / "gmail_token.json" +_IMAP_HOST = "imap.gmail.com" +_IMAP_PORT = 993 +_SMTP_HOST = "smtp.gmail.com" +_SMTP_PORT = 587 _MAX_CHARS = 5000 -# ─── Auth ────────────────────────────────────────────────────────────────── +# ─── Credentials ─────────────────────────────────────────────────────────── -def _get_service(): - if not _TOKEN_PATH.exists(): +def _creds() -> tuple[str, str]: + addr = os.environ.get("GMAIL_ADDRESS", "") + pwd = os.environ.get("GMAIL_APP_PASSWORD", "") + if not addr or not pwd: raise RuntimeError( - "Gmail not authorized. Steps:\n" - "1. Place gmail_credentials.json in the tools/ directory\n" - "2. Run: python tools/gmail_auth.py\n" - "3. Complete the browser login flow" + "Gmail not configured. Add to .env:\n" + " GMAIL_ADDRESS=you@gmail.com\n" + " GMAIL_APP_PASSWORD=xxxx xxxx xxxx xxxx\n" + "(Generate an App Password at myaccount.google.com/apppasswords)" ) - creds = Credentials.from_authorized_user_file(str(_TOKEN_PATH), _SCOPES) - if creds.expired and creds.refresh_token: - creds.refresh(Request()) - _TOKEN_PATH.write_text(creds.to_json()) - return build("gmail", "v1", credentials=creds, cache_discovery=False) + return addr, pwd -# ─── MIME helpers ────────────────────────────────────────────────────────── - -def _header(msg: dict, name: str) -> str: - for h in msg.get("payload", {}).get("headers", []): - if h["name"].lower() == name.lower(): - return h["value"] - return "" +def _imap() -> imaplib.IMAP4_SSL: + addr, pwd = _creds() + conn = imaplib.IMAP4_SSL(_IMAP_HOST, _IMAP_PORT) + conn.login(addr, pwd) + return conn -def _find_part(payload: dict, mime_type: str) -> str | None: - """Recursively find the first part with the given MIME type; return decoded text.""" - if payload.get("mimeType") == mime_type: - data = payload.get("body", {}).get("data", "") - if data: - return base64.urlsafe_b64decode(data + "==").decode("utf-8", errors="replace") - for part in payload.get("parts", []): - result = _find_part(part, mime_type) - if result is not None: - return result - return None +def _smtp() -> smtplib.SMTP: + addr, pwd = _creds() + conn = smtplib.SMTP(_SMTP_HOST, _SMTP_PORT) + conn.starttls() + conn.login(addr, pwd) + return conn -def _extract_text(payload: dict) -> str: - """Extract readable text. Prefers text/plain; falls back to HTML→text conversion.""" - plain = _find_part(payload, "text/plain") +# ─── Helpers ─────────────────────────────────────────────────────────────── + +def _decode(value: str | None) -> str: + """Decode RFC 2047 encoded email header value.""" + if not value: + return "" + parts = _decode_hdr(value) + out = [] + for raw, charset in parts: + if isinstance(raw, bytes): + out.append(raw.decode(charset or "utf-8", errors="replace")) + else: + out.append(raw) + return "".join(out) + + +def _extract_text(msg) -> str: + """Walk MIME parts; prefer text/plain, fall back to HTML→text.""" + plain = html = None + for part in msg.walk(): + ct = part.get_content_type() + if ct == "text/plain" and plain is None: + raw = part.get_payload(decode=True) + if raw: + plain = raw.decode(part.get_content_charset() or "utf-8", errors="replace") + elif ct == "text/html" and html is None: + raw = part.get_payload(decode=True) + if raw: + html = raw.decode(part.get_content_charset() or "utf-8", errors="replace") if plain: return plain.strip() - html = _find_part(payload, "text/html") if html: h = _html2text.HTML2Text() - h.ignore_links = False h.ignore_images = True h.body_width = 0 return h.handle(html).strip() return "" -def _list_attachments(payload: dict) -> list[tuple[str, int]]: - result: list[tuple[str, int]] = [] - def _scan(part: dict) -> None: - filename = part.get("filename", "") - if filename: - size = part.get("body", {}).get("size", 0) - result.append((filename, size)) - for p in part.get("parts", []): - _scan(p) - _scan(payload) - return result - - def _fmt_size(n: int) -> str: - if n >= 1024 * 1024: - return f"{n / 1024 / 1024:.1f}MB" - if n >= 1024: - return f"{n / 1024:.1f}KB" + if n >= 1_048_576: return f"{n / 1_048_576:.1f}MB" + if n >= 1024: return f"{n / 1024:.1f}KB" return f"{n}B" +def _flags_has_unseen(meta: bytes) -> bool: + m = re.search(rb"FLAGS \(([^)]*)\)", meta) + if m: + return b"\\Seen" not in m.group(1).split() + return False + + def _build_mime( - to: str, - subject: str, - body_html: str, - in_reply_to: str = "", - references: str = "", + to: str, subject: str, body_html: str, + in_reply_to: str = "", references: str = "", ) -> MIMEMultipart: h = _html2text.HTML2Text() - h.ignore_links = False h.ignore_images = True h.body_width = 0 body_plain = h.handle(body_html) - msg = MIMEMultipart("alternative") msg["To"] = to msg["Subject"] = subject - if in_reply_to: - msg["In-Reply-To"] = in_reply_to - if references: - msg["References"] = references + if in_reply_to: msg["In-Reply-To"] = in_reply_to + if references: msg["References"] = references msg.attach(MIMEText(body_plain, "plain", "utf-8")) - msg.attach(MIMEText(body_html, "html", "utf-8")) + msg.attach(MIMEText(body_html, "html", "utf-8")) return msg # ─── Actions ─────────────────────────────────────────────────────────────── -def _do_send(service: Any, to: str, subject: str, body_html: str) -> str: - if not to: - return "Error: 'to' is required." - if not subject: - return "Error: 'subject' is required." - if not body_html: - return "Error: 'body' is required." - +def _do_send(to: str, subject: str, body_html: str) -> str: + if not (to and subject and body_html): + return "Error: 'send' requires 'to', 'subject', and 'body'." + addr, _ = _creds() msg = _build_mime(to, subject, body_html) - raw = base64.urlsafe_b64encode(msg.as_bytes()).decode() - service.users().messages().send(userId="me", body={"raw": raw}).execute() - return f"Sent to {to} | Subject: {subject}" + msg["From"] = addr + with _smtp() as smtp: + smtp.sendmail(addr, [to], msg.as_bytes()) + return f"Sent → {to} | {subject}" -def _do_list( - service: Any, - max_results: int, - page_token: str | None, - query: str | None, -) -> str: +def _do_list(max_results: int, offset: int, query: str | None) -> str: max_results = min(int(max_results), 50) - kwargs: dict[str, Any] = {"userId": "me", "maxResults": max_results} - if page_token: - kwargs["pageToken"] = page_token - if query: - kwargs["q"] = query + offset = max(int(offset), 0) + criteria = query if query else "ALL" - result = service.users().messages().list(**kwargs).execute() - messages = result.get("messages", []) - next_token = result.get("nextPageToken") + imap = _imap() + try: + imap.select("INBOX", readonly=True) + _, data = imap.uid("SEARCH", None, criteria) + all_uids = data[0].split() + all_uids.reverse() # newest first + total = len(all_uids) + page = all_uids[offset : offset + max_results] - if not messages: - return "No messages found." + if not page: + return "No messages." - lines: list[str] = [] - for m in messages: - full = service.users().messages().get( - userId="me", - id=m["id"], - format="metadata", - metadataHeaders=["From", "Subject", "Date"], - ).execute() - from_hdr = _header(full, "From") - subject = _header(full, "Subject") or "(no subject)" - date = _header(full, "Date") - unread = "●" if "UNREAD" in full.get("labelIds", []) else "○" - lines.append(f"{unread} {m['id']} | {from_hdr} | {subject} | {date}") + lines: list[str] = [] + for uid in page: + _, fd = imap.uid( + "FETCH", uid, + "(FLAGS BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])", + ) + if not fd or not isinstance(fd[0], tuple): + continue + meta, raw_headers = fd[0] + parsed = message_from_bytes(raw_headers) + from_ = _decode(parsed.get("From")) + subject = _decode(parsed.get("Subject")) or "(no subject)" + date = parsed.get("Date", "") + marker = "●" if _flags_has_unseen(meta) else "○" + lines.append(f"{marker} {uid.decode()} | {from_} | {subject} | {date}") - out = "\n".join(lines) - if next_token: - out += f"\n\n[next_page_token: {next_token}]" - return out + out = "\n".join(lines) + next_off = offset + max_results + if next_off < total: + out += f"\n\n[{offset + 1}–{min(next_off, total)} of {total} — use offset={next_off} for next page]" + return out + finally: + imap.logout() -def _do_list_unread(service: Any) -> str: - result = service.users().messages().list( - userId="me", q="is:unread", maxResults=100 - ).execute() - messages = result.get("messages", []) +def _do_list_unread() -> str: + imap = _imap() + try: + imap.select("INBOX", readonly=True) + _, data = imap.uid("SEARCH", None, "UNSEEN") + uids = data[0].split() + uids.reverse() - if not messages: - return "No unread messages." + if not uids: + return "No unread messages." - lines: list[str] = [] - for m in messages: - full = service.users().messages().get( - userId="me", - id=m["id"], - format="metadata", - metadataHeaders=["From", "Subject", "Date"], - ).execute() - from_hdr = _header(full, "From") - subject = _header(full, "Subject") or "(no subject)" - date = _header(full, "Date") - lines.append(f"● {m['id']} | {from_hdr} | {subject} | {date}") + lines = [f"Unread: {len(uids)}"] + for uid in uids: + _, fd = imap.uid( + "FETCH", uid, + "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])", + ) + if not fd or not isinstance(fd[0], tuple): + continue + _, raw_headers = fd[0] + parsed = message_from_bytes(raw_headers) + from_ = _decode(parsed.get("From")) + subject = _decode(parsed.get("Subject")) or "(no subject)" + date = parsed.get("Date", "") + lines.append(f"● {uid.decode()} | {from_} | {subject} | {date}") - header_line = f"Unread: {len(messages)}" + ( - " (showing first 100)" if len(messages) == 100 else "" - ) - return header_line + "\n" + "\n".join(lines) + return "\n".join(lines) + finally: + imap.logout() -def _do_read(service: Any, message_id: str) -> str: - if not message_id: - return "Error: 'message_id' is required." +def _do_read(uid: str) -> str: + if not uid: + return "Error: 'read' requires 'uid'." - msg = service.users().messages().get( - userId="me", id=message_id, format="full" - ).execute() + imap = _imap() + try: + imap.select("INBOX") + _, fd = imap.uid("FETCH", uid.encode(), "(RFC822)") + if not fd or not isinstance(fd[0], tuple): + return f"Error: message UID {uid} not found." + _, raw = fd[0] + msg = message_from_bytes(raw) - from_hdr = _header(msg, "From") - to_hdr = _header(msg, "To") - subject = _header(msg, "Subject") or "(no subject)" - date = _header(msg, "Date") + from_ = _decode(msg.get("From")) + to_ = _decode(msg.get("To")) + subject = _decode(msg.get("Subject")) or "(no subject)" + date = msg.get("Date", "") - text = _extract_text(msg.get("payload", {})) - truncated = "" - if len(text) > _MAX_CHARS: - truncated = f"\n\n[truncated — {len(text) - _MAX_CHARS} chars omitted]" - text = text[:_MAX_CHARS] + text = _extract_text(msg) + tail = "" + if len(text) > _MAX_CHARS: + tail = f"\n\n[truncated — {len(text) - _MAX_CHARS} chars omitted]" + text = text[:_MAX_CHARS] - attachments = _list_attachments(msg.get("payload", {})) + attachments = [ + (p.get_filename(), len(p.get_payload(decode=True) or b"")) + for p in msg.walk() + if p.get_content_disposition() == "attachment" and p.get_filename() + ] - lines = [ - f"From: {from_hdr}", - f"To: {to_hdr}", - f"Subject: {subject}", - f"Date: {date}", - "─" * 40, - text or "(no text content)", - ] - if truncated: - lines.append(truncated) - if attachments: - lines.append("─" * 40) - lines.append("Attachments:") - for fname, fsize in attachments: - lines.append(f" • {fname} ({_fmt_size(fsize)})") + imap.uid("STORE", uid.encode(), "+FLAGS", "\\Seen") - # Mark as read - service.users().messages().modify( - userId="me", id=message_id, body={"removeLabelIds": ["UNREAD"]} - ).execute() + lines = [ + f"From: {from_}", + f"To: {to_}", + f"Subject: {subject}", + f"Date: {date}", + f"UID: {uid}", + "─" * 40, + text or "(no text content)", + ] + if tail: + lines.append(tail) + if attachments: + lines += ["─" * 40, "Attachments:"] + lines += [f" • {fn} ({_fmt_size(fs)})" for fn, fs in attachments] - return "\n".join(lines) + return "\n".join(lines) + finally: + imap.logout() -def _do_reply(service: Any, message_id: str, body_html: str) -> str: - if not message_id: - return "Error: 'message_id' is required." - if not body_html: - return "Error: 'body' is required." +def _do_reply(uid: str, body_html: str) -> str: + if not (uid and body_html): + return "Error: 'reply' requires 'uid' and 'body'." - orig = service.users().messages().get( - userId="me", - id=message_id, - format="metadata", - metadataHeaders=["From", "Subject", "Message-ID", "References"], - ).execute() + imap = _imap() + try: + imap.select("INBOX", readonly=True) + _, fd = imap.uid( + "FETCH", uid.encode(), + "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT MESSAGE-ID REFERENCES)])", + ) + if not fd or not isinstance(fd[0], tuple): + return f"Error: message UID {uid} not found." + _, raw_headers = fd[0] + orig = message_from_bytes(raw_headers) + finally: + imap.logout() - thread_id = orig.get("threadId", "") - from_hdr = _header(orig, "From") - subject = _header(orig, "Subject") or "" - orig_msg_id = _header(orig, "Message-ID") - references = _header(orig, "References") + from_ = _decode(orig.get("From")) + subject = _decode(orig.get("Subject")) or "" + orig_id = orig.get("Message-ID", "") + refs = orig.get("References", "") - reply_subject = subject if subject.lower().startswith("re:") else f"Re: {subject}" - ref_chain = f"{references} {orig_msg_id}".strip() if references else orig_msg_id + reply_subj = subject if subject.lower().startswith("re:") else f"Re: {subject}" + ref_chain = f"{refs} {orig_id}".strip() if refs else orig_id - msg = _build_mime(from_hdr, reply_subject, body_html, orig_msg_id, ref_chain) - raw = base64.urlsafe_b64encode(msg.as_bytes()).decode() - service.users().messages().send( - userId="me", body={"raw": raw, "threadId": thread_id} - ).execute() + addr, _ = _creds() + msg = _build_mime(from_, reply_subj, body_html, orig_id, ref_chain) + msg["From"] = addr + with _smtp() as smtp: + smtp.sendmail(addr, [from_], msg.as_bytes()) - return f"Reply sent to {from_hdr} | Subject: {reply_subject}" + return f"Reply sent → {from_} | {reply_subj}" # ─── Entry point ─────────────────────────────────────────────────────────── @@ -337,34 +355,18 @@ def _run() -> str: try: - service = _get_service() + if action == "send": + return _do_send(params.get("to", ""), params.get("subject", ""), params.get("body", "")) + if action == "list": + return _do_list(params.get("max_results", 10), params.get("offset", 0), params.get("query")) + if action == "list_unread": + return _do_list_unread() + if action == "read": + return _do_read(params.get("uid", "")) + if action == "reply": + return _do_reply(params.get("uid", ""), params.get("body", "")) + return f"Error: unknown action '{action}'." except RuntimeError as e: return f"Error: {e}" - if action == "send": - return _do_send( - service, - to=params.get("to", ""), - subject=params.get("subject", ""), - body_html=params.get("body", ""), - ) - if action == "list": - return _do_list( - service, - max_results=params.get("max_results", 10), - page_token=params.get("page_token"), - query=params.get("query"), - ) - if action == "list_unread": - return _do_list_unread(service) - if action == "read": - return _do_read(service, message_id=params.get("message_id", "")) - if action == "reply": - return _do_reply( - service, - message_id=params.get("message_id", ""), - body_html=params.get("body", ""), - ) - return f"Error: unknown action '{action}'. Valid: send, list, list_unread, read, reply." - return await asyncio.to_thread(_run) diff --git a/tools/gmail_auth.py b/tools/gmail_auth.py deleted file mode 100644 index 450e9f0..0000000 --- a/tools/gmail_auth.py +++ /dev/null @@ -1,33 +0,0 @@ -"""One-time OAuth2 authorization for the Gmail tool. - -Steps: - 1. Go to https://console.cloud.google.com/ - 2. Create a project → Enable Gmail API - 3. Credentials → Create OAuth 2.0 Client ID (Desktop app) - 4. Download the JSON → save as tools/gmail_credentials.json - 5. Run: python tools/gmail_auth.py - 6. Complete the browser flow → token saved to tools/gmail_token.json -""" - -from pathlib import Path -from google_auth_oauthlib.flow import InstalledAppFlow - -_SCOPES = ["https://www.googleapis.com/auth/gmail.modify"] -_CREDS_PATH = Path(__file__).parent / "gmail_credentials.json" -_TOKEN_PATH = Path(__file__).parent / "gmail_token.json" - - -def main() -> None: - if not _CREDS_PATH.exists(): - print(f"Error: credentials file not found at {_CREDS_PATH}") - print("Download it from Google Cloud Console → APIs & Services → Credentials") - return - - flow = InstalledAppFlow.from_client_secrets_file(str(_CREDS_PATH), _SCOPES) - creds = flow.run_local_server(port=0, open_browser=True) - _TOKEN_PATH.write_text(creds.to_json()) - print(f"Authorization complete. Token saved to {_TOKEN_PATH}") - - -if __name__ == "__main__": - main()