"""Gmail tool — send, list, read, reply via IMAP/SMTP and an App Password."""
import asyncio
import imaplib
import re
import smtplib
from email import message_from_bytes
from email.header import decode_header as _decode_hdr, Header as _Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import html2text as _html2text
from navi.config import settings
name = "gmail"

description = (
    "Interact with Gmail via IMAP/SMTP: send emails, list inbox (paginated), "
    "list all unread messages, read a specific email by UID (marks it read), "
    "or reply to an email thread."
)

# The two long help texts live outside the schema literal so that every
# property below stays short enough to scan.
_ACTION_HELP = (
    "'send': send a new email. "
    "'list': list inbox, newest first (use offset for pagination). "
    "'list_unread': list all unread messages. "
    "'read': fetch full message by UID and mark it read. "
    "'reply': reply to a message by UID."
)
_QUERY_HELP = (
    "IMAP search criteria for 'list', e.g. 'FROM \"someone@example.com\"' "
    "or 'SUBJECT \"invoice\"'. Defaults to ALL."
)

# JSON-Schema style description of the ``params`` dict read by ``execute``.
# Only "action" is mandatory at schema level; the per-action requirements are
# checked by the individual ``_do_*`` functions.
parameters = {
    "type": "object",
    "properties": {
        "action": {
            "type": "string",
            "enum": ["send", "list", "list_unread", "read", "reply"],
            "description": _ACTION_HELP,
        },
        "to": {
            "type": "string",
            "description": "Recipient address. Required for 'send'.",
        },
        "subject": {
            "type": "string",
            "description": "Subject line. Required for 'send'.",
        },
        "body": {
            "type": "string",
            "description": "Email body (HTML). Required for 'send' and 'reply'.",
        },
        "uid": {
            "type": "string",
            "description": "IMAP message UID from list output. Required for 'read' and 'reply'.",
        },
        "max_results": {
            "type": "integer",
            "description": "Max messages to return for 'list' (default 10, max 50).",
        },
        "offset": {
            "type": "integer",
            "description": "Number of messages to skip from newest for 'list' pagination (default 0).",
        },
        "query": {
            "type": "string",
            "description": _QUERY_HELP,
        },
    },
    "required": ["action"],
}
# IMAP endpoint; _imap() connects to it with imaplib.IMAP4_SSL.
_IMAP_HOST = "imap.gmail.com"
_IMAP_PORT = 993
# SMTP endpoint; _smtp() connects in plain text and then upgrades via STARTTLS.
_SMTP_HOST = "smtp.gmail.com"
_SMTP_PORT = 587
# Maximum number of body characters the 'read' action returns before truncating.
_MAX_CHARS = 5000
# ─── Credentials ───────────────────────────────────────────────────────────
def _creds() -> tuple[str, str]:
    """Return ``(address, app_password)`` as read from ``settings``.

    Raises:
        RuntimeError: if either value is missing or empty. The message lists
            the two ``.env`` entries that need to be set.
    """
    address, app_password = settings.gmail_address, settings.gmail_app_password
    if address and app_password:
        return address, app_password
    raise RuntimeError(
        "Gmail not configured. Add to .env:\n"
        "  GMAIL_ADDRESS=you@gmail.com\n"
        "  GMAIL_APP_PASSWORD=xxxx xxxx xxxx xxxx\n"
        "(Generate an App Password at myaccount.google.com/apppasswords)"
    )
def _imap() -> imaplib.IMAP4_SSL:
    """Open an authenticated IMAP-over-TLS connection.

    Returns:
        A logged-in ``imaplib.IMAP4_SSL``; the caller is responsible for
        calling ``logout()`` on it.

    Raises:
        RuntimeError: from ``_creds()`` when credentials are not configured.
        imaplib.IMAP4.error: when the server rejects the login.
        OSError: on network or TLS failures.
    """
    import ssl

    addr, pwd = _creds()
    # Pass an explicit verifying context: the library default has historically
    # not validated the server certificate or hostname, which would expose the
    # app password to a man-in-the-middle.
    conn = imaplib.IMAP4_SSL(
        _IMAP_HOST, _IMAP_PORT, ssl_context=ssl.create_default_context()
    )
    try:
        conn.login(addr, pwd)
    except Exception:
        # Do not leak the open socket when authentication fails.
        conn.shutdown()
        raise
    return conn
def _smtp() -> smtplib.SMTP:
    """Open an authenticated SMTP connection upgraded with STARTTLS.

    Returns:
        A logged-in ``smtplib.SMTP``; callers use it as a context manager so
        the session is closed when they are done.

    Raises:
        RuntimeError: from ``_creds()`` when credentials are not configured.
        smtplib.SMTPException: when STARTTLS or login is rejected.
        OSError: on network or TLS failures.
    """
    import ssl

    addr, pwd = _creds()
    conn = smtplib.SMTP(_SMTP_HOST, _SMTP_PORT)
    try:
        # Explicit verifying context: without one, STARTTLS has historically
        # used a context that does not validate the server certificate, and
        # the app password is sent right after the upgrade.
        conn.starttls(context=ssl.create_default_context())
        conn.login(addr, pwd)
    except Exception:
        # Do not leak the open socket when the handshake or login fails.
        conn.close()
        raise
    return conn
# ─── Helpers ───────────────────────────────────────────────────────────────
def _decode(value: str | None) -> str:
"""Decode RFC 2047 encoded email header value."""
if not value:
return ""
parts = _decode_hdr(value)
out = []
for raw, charset in parts:
if isinstance(raw, bytes):
out.append(raw.decode(charset or "utf-8", errors="replace"))
else:
out.append(raw)
return "".join(out)
def _extract_text(msg) -> str:
"""Walk MIME parts; prefer text/plain, fall back to HTML→text."""
plain = html = None
for part in msg.walk():
ct = part.get_content_type()
if ct == "text/plain" and plain is None:
raw = part.get_payload(decode=True)
if raw:
plain = raw.decode(part.get_content_charset() or "utf-8", errors="replace")
elif ct == "text/html" and html is None:
raw = part.get_payload(decode=True)
if raw:
html = raw.decode(part.get_content_charset() or "utf-8", errors="replace")
if plain:
return plain.strip()
if html:
h = _html2text.HTML2Text()
h.ignore_images = True
h.body_width = 0
return h.handle(html).strip()
return ""
def _fmt_size(n: int) -> str:
    """Format a byte count as ``B``, ``KB`` or ``MB`` (binary units, one decimal)."""
    # Largest unit first so the first threshold that fits wins.
    for unit_bytes, suffix in ((1_048_576, "MB"), (1024, "KB")):
        if n >= unit_bytes:
            return f"{n / unit_bytes:.1f}{suffix}"
    return f"{n}B"
def _flags_has_unseen(meta: bytes) -> bool:
    """Report whether a FETCH response line describes an unread message.

    Returns ``True`` only when a ``FLAGS (...)`` list is present and does not
    contain ``\\Seen``; a response without any FLAGS list yields ``False``.
    """
    found = re.search(rb"FLAGS \(([^)]*)\)", meta)
    if found is None:
        return False
    return all(flag != b"\\Seen" for flag in found.group(1).split())
def _enc_header(value: str) -> "_Header | str":
    """Return ``value`` unchanged if it is pure ASCII, else wrap it in a UTF-8
    ``Header`` so it is emitted as an RFC 2047 encoded-word."""
    if value.isascii():
        return value
    return _Header(value, "utf-8")
def _build_mime(
    to: str, subject: str, body_html: str,
    in_reply_to: str = "", references: str = "",
) -> MIMEMultipart:
    """Build a ``multipart/alternative`` message with plain-text and HTML bodies.

    The plain-text alternative is derived from ``body_html`` with html2text.
    The ``From`` header is not set here; callers add it.

    Args:
        to: Recipient, either a bare address or ``Name <address>``.
        subject: Subject line.
        body_html: HTML body.
        in_reply_to: Value for ``In-Reply-To``; omitted when empty.
        references: Value for ``References``; omitted when empty.

    Returns:
        The assembled message.
    """
    from email.utils import formataddr, getaddresses

    h = _html2text.HTML2Text()
    h.ignore_images = True
    h.body_width = 0  # 0 disables html2text's hard line wrapping
    body_plain = h.handle(body_html)

    # To/Subject are single-line values: replace stray CR/LF so an embedded
    # newline cannot break (or be rejected as an attempt to split) the header.
    to = " ".join(to.splitlines())
    subject = " ".join(subject.splitlines())

    # Encode only the display name of each recipient. RFC 2047-encoding the
    # whole "Name <addr>" string would bury the address inside the
    # encoded-word and leave the To header without a usable address.
    try:
        recipients = [
            formataddr((display, address))
            for display, address in getaddresses([to])
            if address
        ]
    except UnicodeError:
        # Non-ASCII address part: formataddr refuses it, keep the old behavior.
        recipients = []

    msg = MIMEMultipart("alternative")
    msg["To"] = ", ".join(recipients) if recipients else _enc_header(to)
    msg["Subject"] = _enc_header(subject)
    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
    if references:
        msg["References"] = references
    msg.attach(MIMEText(body_plain, "plain", "utf-8"))
    msg.attach(MIMEText(body_html, "html", "utf-8"))
    return msg
# ─── Actions ───────────────────────────────────────────────────────────────
def _do_send(to: str, subject: str, body_html: str) -> str:
    """Send a new email and return a one-line confirmation.

    Returns an ``Error: ...`` string, without connecting, when any of the
    three arguments is empty.
    """
    if not all((to, subject, body_html)):
        return "Error: 'send' requires 'to', 'subject', and 'body'."

    sender, _ = _creds()
    message = _build_mime(to, subject, body_html)
    message["From"] = sender
    with _smtp() as server:
        server.sendmail(sender, [to], message.as_bytes())
    return f"Sent → {to} | {subject}"
def _do_list(max_results: int, offset: int, query: str | None) -> str:
    """List INBOX messages matching ``query``, newest first, one page at a time.

    Args:
        max_results: Page size; clamped to the range 1..50.
        offset: Number of messages to skip from the newest; negatives become 0.
        query: IMAP SEARCH criteria; ``None`` or ``""`` means ``ALL``.

    Returns:
        One line per message in the form ``marker UID | From | Subject | Date``
        (``●`` unread, ``○`` read), followed by a paging hint when more
        messages remain; ``"No messages."`` for an empty page; or an
        ``Error: ...`` string when the server does not accept the search.
    """
    # Clamp the lower bound too: a negative page size would make the slice
    # below return "everything except the last N" instead of a page.
    max_results = max(1, min(int(max_results), 50))
    offset = max(int(offset), 0)
    criteria = query if query else "ALL"
    imap = _imap()
    try:
        # readonly=True so listing never changes message flags.
        imap.select("INBOX", readonly=True)
        status, data = imap.uid("SEARCH", None, criteria)
        if status != "OK":
            # On a non-OK reply the payload is the server's message, not UIDs.
            return f"Error: IMAP SEARCH failed for criteria {criteria!r} ({status})."
        # data[0] is None when the server sent no SEARCH line at all.
        all_uids = (data[0] or b"").split()
        all_uids.reverse()  # newest first
        total = len(all_uids)
        page = all_uids[offset : offset + max_results]
        if not page:
            return "No messages."
        lines: list[str] = []
        for uid in page:
            _, fd = imap.uid(
                "FETCH", uid,
                "(FLAGS BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])",
            )
            # Skip UIDs for which the server returned no (meta, literal) pair.
            if not fd or not isinstance(fd[0], tuple):
                continue
            meta, raw_headers = fd[0]
            parsed = message_from_bytes(raw_headers)
            from_ = _decode(parsed.get("From"))
            subject = _decode(parsed.get("Subject")) or "(no subject)"
            date = parsed.get("Date", "")
            marker = "●" if _flags_has_unseen(meta) else "○"
            lines.append(f"{marker} {uid.decode()} | {from_} | {subject} | {date}")
        out = "\n".join(lines)
        next_off = offset + max_results
        if next_off < total:
            out += f"\n\n[{offset + 1}–{min(next_off, total)} of {total} — use offset={next_off} for next page]"
        return out
    finally:
        imap.logout()
def _do_list_unread() -> str:
    """List every INBOX message matching ``UNSEEN``, in reverse SEARCH order.

    Returns:
        ``"No unread messages."`` when the search is empty; otherwise an
        ``Unread: N`` header followed by one ``● UID | From | Subject | Date``
        line per message that could be fetched.
    """
    conn = _imap()
    try:
        # Read-only select plus BODY.PEEK: listing must not mark anything read.
        conn.select("INBOX", readonly=True)
        _, found = conn.uid("SEARCH", None, "UNSEEN")
        unread = found[0].split()
        unread.reverse()
        if not unread:
            return "No unread messages."

        rows = [f"Unread: {len(unread)}"]
        for msg_uid in unread:
            _, fetched = conn.uid(
                "FETCH", msg_uid,
                "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])",
            )
            first = fetched[0] if fetched else None
            if not isinstance(first, tuple):
                continue
            _, header_bytes = first
            headers = message_from_bytes(header_bytes)
            sender = _decode(headers.get("From"))
            subject = _decode(headers.get("Subject")) or "(no subject)"
            sent = headers.get("Date", "")
            rows.append(f"● {msg_uid.decode()} | {sender} | {subject} | {sent}")
        return "\n".join(rows)
    finally:
        conn.logout()
def _do_read(uid: str) -> str:
    """Fetch one INBOX message by UID, mark it read, and render it as text.

    Args:
        uid: Numeric IMAP UID as shown by the list actions.

    Returns:
        Header summary, body text (truncated to ``_MAX_CHARS`` characters with
        a note of how much was cut) and an attachment list; or an
        ``Error: ...`` string when the UID is missing, malformed or not found.
    """
    uid = str(uid or "").strip()
    if not uid:
        return "Error: 'read' requires 'uid'."
    # Accept a single numeric UID only. The value goes straight into FETCH and
    # STORE, where a sequence set such as "1:*" would download and mark the
    # whole mailbox as read.
    if not (uid.isascii() and uid.isdigit()):
        return f"Error: invalid UID {uid!r} — expected the numeric UID shown in list output."
    imap = _imap()
    try:
        imap.select("INBOX")  # read-write: the \Seen flag is stored below
        _, fd = imap.uid("FETCH", uid.encode(), "(RFC822)")
        if not fd or not isinstance(fd[0], tuple):
            return f"Error: message UID {uid} not found."
        _, raw = fd[0]
        msg = message_from_bytes(raw)
        from_ = _decode(msg.get("From"))
        to_ = _decode(msg.get("To"))
        subject = _decode(msg.get("Subject")) or "(no subject)"
        date = msg.get("Date", "")
        text = _extract_text(msg)
        tail = ""
        if len(text) > _MAX_CHARS:
            tail = f"\n\n[truncated — {len(text) - _MAX_CHARS} chars omitted]"
            text = text[:_MAX_CHARS]
        # (filename, decoded size in bytes) for every named attachment part.
        attachments = [
            (p.get_filename(), len(p.get_payload(decode=True) or b""))
            for p in msg.walk()
            if p.get_content_disposition() == "attachment" and p.get_filename()
        ]
        imap.uid("STORE", uid.encode(), "+FLAGS", "\\Seen")
        lines = [
            f"From: {from_}",
            f"To: {to_}",
            f"Subject: {subject}",
            f"Date: {date}",
            f"UID: {uid}",
            "─" * 40,
            text or "(no text content)",
        ]
        if tail:
            lines.append(tail)
        if attachments:
            lines += ["─" * 40, "Attachments:"]
            lines += [f"  • {fn} ({_fmt_size(fs)})" for fn, fs in attachments]
        return "\n".join(lines)
    finally:
        imap.logout()
def _do_reply(uid: str, body_html: str) -> str:
    """Reply to the INBOX message with the given UID.

    The reply goes to the original ``Reply-To`` address when present,
    otherwise to ``From``. The subject gets a ``Re:`` prefix unless it already
    has one, and ``In-Reply-To`` / ``References`` are set from the original
    message's ``Message-ID`` and ``References``. The original message is
    fetched with a read-only select and ``BODY.PEEK``.

    Args:
        uid: Numeric IMAP UID as shown by the list actions.
        body_html: HTML body of the reply.

    Returns:
        A one-line confirmation, or an ``Error: ...`` string when an argument
        is missing or malformed, the UID is not found, or the original message
        has no address to reply to.
    """
    from email.utils import getaddresses

    def _unfold(value) -> str:
        # Header values may arrive folded across lines; drop the line breaks.
        return "".join(str(value).splitlines()).strip()

    uid = str(uid or "").strip()
    if not (uid and body_html):
        return "Error: 'reply' requires 'uid' and 'body'."
    # Single numeric UID only: the value is passed verbatim to FETCH.
    if not (uid.isascii() and uid.isdigit()):
        return f"Error: invalid UID {uid!r} — expected the numeric UID shown in list output."

    imap = _imap()
    try:
        imap.select("INBOX", readonly=True)
        _, fd = imap.uid(
            "FETCH", uid.encode(),
            "(BODY.PEEK[HEADER.FIELDS (FROM REPLY-TO SUBJECT MESSAGE-ID REFERENCES)])",
        )
        if not fd or not isinstance(fd[0], tuple):
            return f"Error: message UID {uid} not found."
        _, raw_headers = fd[0]
        orig = message_from_bytes(raw_headers)
    finally:
        imap.logout()

    # Keep the recipient header exactly as the sender encoded it. Decoding it
    # and re-encoding the whole "Name <addr>" string would hide the address
    # inside an RFC 2047 encoded-word whenever the name is non-ASCII.
    raw_to = _unfold(orig.get("Reply-To") or orig.get("From") or "")
    envelope_to = [address for _, address in getaddresses([raw_to]) if address]
    if not envelope_to:
        return f"Error: message UID {uid} has no sender address to reply to."
    display_to = _decode(raw_to)  # human-readable form for the result string

    subject = _unfold(_decode(orig.get("Subject")))
    orig_id = _unfold(orig.get("Message-ID", ""))
    refs = _unfold(orig.get("References", ""))
    reply_subj = subject if subject.lower().startswith("re:") else f"Re: {subject}"
    ref_chain = f"{refs} {orig_id}".strip() if refs else orig_id

    addr, _ = _creds()
    msg = _build_mime(raw_to, reply_subj, body_html, orig_id, ref_chain)
    msg["From"] = addr
    with _smtp() as smtp:
        smtp.sendmail(addr, envelope_to, msg.as_bytes())
    return f"Reply sent → {display_to} | {reply_subj}"
# ─── Entry point ───────────────────────────────────────────────────────────
async def execute(params: dict) -> str:
    """Run one Gmail action described by ``params`` and return its text result.

    The blocking IMAP/SMTP work runs in a worker thread via
    ``asyncio.to_thread`` so the event loop is not blocked.

    Args:
        params: Tool arguments; ``action`` selects the operation and the other
            keys are read as described in ``parameters``.

    Returns:
        The action's output, or an ``Error: ...`` string for an unknown
        action, missing configuration, IMAP/SMTP/network failures, or
        arguments of the wrong type.
    """
    action = params.get("action", "")

    def _uid() -> str:
        # Callers may pass the UID as a JSON number; the actions expect text.
        value = params.get("uid", "")
        return "" if value is None else str(value)

    def _run() -> str:
        try:
            if action == "send":
                return _do_send(params.get("to", ""), params.get("subject", ""), params.get("body", ""))
            if action == "list":
                return _do_list(params.get("max_results", 10), params.get("offset", 0), params.get("query"))
            if action == "list_unread":
                return _do_list_unread()
            if action == "read":
                return _do_read(_uid())
            if action == "reply":
                return _do_reply(_uid(), params.get("body", ""))
            return f"Error: unknown action '{action}'."
        except RuntimeError as e:
            return f"Error: {e}"
        except (imaplib.IMAP4.error, OSError, ValueError, TypeError) as e:
            # Protocol errors (bad login, rejected query), network/TLS/SMTP
            # failures (all OSError subclasses) and malformed arguments are
            # reported the same way as configuration errors instead of
            # escaping the tool as exceptions.
            return f"Error: {type(e).__name__}: {e}"

    return await asyncio.to_thread(_run)