Newer
Older
navi-1 / tools / gmail.py
"""Gmail tool — send, list, read, reply via IMAP/SMTP and an App Password."""

import asyncio
import imaplib
import re
import smtplib
from email import message_from_bytes
from email.header import decode_header as _decode_hdr, Header as _Header
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import html2text as _html2text
from navi.config import settings

# Tool metadata exposed at module level (presumably read by the tool registry
# that loads this module — confirm against the loader).
name = "gmail"

# One-sentence capability summary; the pieces below concatenate into a single
# string, one capability per literal.
description = (
    "Interact with Gmail via IMAP/SMTP: "
    "send emails, "
    "list inbox (paginated), "
    "list all unread messages, "
    "read a specific email by UID (marks it read), "
    "or reply to an email thread."
)

# JSON-Schema-style description of the dict accepted by `execute`.
# Only "action" is required at the schema level; the per-action requirements
# spelled out in the descriptions ('to'/'subject'/'body' for send, 'uid' for
# read/reply, ...) are enforced at runtime by the individual _do_* handlers.
parameters = {
    "type": "object",
    "properties": {
        # Selects which handler `execute` dispatches to.
        "action": {
            "type": "string",
            "enum": ["send", "list", "list_unread", "read", "reply"],
            "description": (
                "'send': send a new email. "
                "'list': list inbox, newest first (use offset for pagination). "
                "'list_unread': list all unread messages. "
                "'read': fetch full message by UID and mark it read. "
                "'reply': reply to a message by UID."
            ),
        },
        "to": {
            "type": "string",
            "description": "Recipient address. Required for 'send'.",
        },
        "subject": {
            "type": "string",
            "description": "Subject line. Required for 'send'.",
        },
        # The body is supplied as HTML; a plain-text alternative is derived
        # from it when the MIME message is built.
        "body": {
            "type": "string",
            "description": "Email body (HTML). Required for 'send' and 'reply'.",
        },
        # UIDs are the identifiers printed in each line of the list output.
        "uid": {
            "type": "string",
            "description": "IMAP message UID from list output. Required for 'read' and 'reply'.",
        },
        # Pagination controls, used by the 'list' action only.
        "max_results": {
            "type": "integer",
            "description": "Max messages to return for 'list' (default 10, max 50).",
        },
        "offset": {
            "type": "integer",
            "description": "Number of messages to skip from newest for 'list' pagination (default 0).",
        },
        # Passed through as the IMAP SEARCH criteria string.
        "query": {
            "type": "string",
            "description": (
                "IMAP search criteria for 'list', e.g. 'FROM \"someone@example.com\"' "
                "or 'SUBJECT \"invoice\"'. Defaults to ALL."
            ),
        },
    },
    "required": ["action"],
}

_IMAP_HOST = "imap.gmail.com"
_IMAP_PORT = 993
_SMTP_HOST = "smtp.gmail.com"
_SMTP_PORT = 587
_MAX_CHARS = 5000


# ─── Credentials ───────────────────────────────────────────────────────────

def _creds() -> tuple[str, str]:
    """Return the configured ``(address, app_password)`` pair.

    Both values are read from ``settings``.

    Raises:
        RuntimeError: if either value is missing or empty; the message
            explains which .env entries to add.
    """
    address = settings.gmail_address
    app_password = settings.gmail_app_password
    if address and app_password:
        return address, app_password
    raise RuntimeError(
        "Gmail not configured. Add to .env:\n"
        "  GMAIL_ADDRESS=you@gmail.com\n"
        "  GMAIL_APP_PASSWORD=xxxx xxxx xxxx xxxx\n"
        "(Generate an App Password at myaccount.google.com/apppasswords)"
    )


def _imap() -> imaplib.IMAP4_SSL:
    """Open an authenticated IMAP connection to Gmail.

    Returns:
        A logged-in ``IMAP4_SSL`` connection; the caller must ``logout()`` it.

    Raises:
        RuntimeError: if credentials are not configured (from ``_creds``).
        imaplib.IMAP4.error / OSError: on login or network/TLS failure.
    """
    import ssl

    addr, pwd = _creds()
    # imaplib's default context does not verify the server certificate;
    # pass a default context so the app password is only sent to a verified
    # host.
    conn = imaplib.IMAP4_SSL(
        _IMAP_HOST, _IMAP_PORT, ssl_context=ssl.create_default_context()
    )
    try:
        conn.login(addr, pwd)
    except Exception:
        # Do not leak the open socket when authentication fails.
        conn.shutdown()
        raise
    return conn


def _smtp() -> smtplib.SMTP:
    """Open an authenticated, STARTTLS-protected SMTP session to Gmail.

    Returns:
        A logged-in ``smtplib.SMTP`` connection, usable as a context manager.

    Raises:
        RuntimeError: if credentials are not configured (from ``_creds``).
        smtplib.SMTPException / OSError: on TLS, login or network failure.
    """
    import ssl

    addr, pwd = _creds()
    conn = smtplib.SMTP(_SMTP_HOST, _SMTP_PORT)
    try:
        # Without an explicit context smtplib does not verify the server
        # certificate; use a default context before sending the password.
        conn.starttls(context=ssl.create_default_context())
        conn.login(addr, pwd)
    except Exception:
        # Do not leak the open socket when TLS setup or login fails.
        conn.close()
        raise
    return conn


# ─── Helpers ───────────────────────────────────────────────────────────────

def _decode(value: str | None) -> str:
    """Decode RFC 2047 encoded email header value."""
    if not value:
        return ""
    parts = _decode_hdr(value)
    out = []
    for raw, charset in parts:
        if isinstance(raw, bytes):
            out.append(raw.decode(charset or "utf-8", errors="replace"))
        else:
            out.append(raw)
    return "".join(out)


def _extract_text(msg) -> str:
    """Walk MIME parts; prefer text/plain, fall back to HTML→text."""
    plain = html = None
    for part in msg.walk():
        ct = part.get_content_type()
        if ct == "text/plain" and plain is None:
            raw = part.get_payload(decode=True)
            if raw:
                plain = raw.decode(part.get_content_charset() or "utf-8", errors="replace")
        elif ct == "text/html" and html is None:
            raw = part.get_payload(decode=True)
            if raw:
                html = raw.decode(part.get_content_charset() or "utf-8", errors="replace")
    if plain:
        return plain.strip()
    if html:
        h = _html2text.HTML2Text()
        h.ignore_images = True
        h.body_width = 0
        return h.handle(html).strip()
    return ""


def _fmt_size(n: int) -> str:
    if n >= 1_048_576: return f"{n / 1_048_576:.1f}MB"
    if n >= 1024:      return f"{n / 1024:.1f}KB"
    return f"{n}B"


def _flags_has_unseen(meta: bytes) -> bool:
    m = re.search(rb"FLAGS \(([^)]*)\)", meta)
    if m:
        return b"\\Seen" not in m.group(1).split()
    return False


def _enc_header(value: str) -> "_Header | str":
    """Encode header value as RFC 2047 if it contains non-ASCII characters."""
    try:
        value.encode("ascii")
        return value
    except UnicodeEncodeError:
        return _Header(value, "utf-8")


def _build_mime(
    to: str, subject: str, body_html: str,
    in_reply_to: str = "", references: str = "",
) -> MIMEMultipart:
    """Build a ``multipart/alternative`` message with plain-text and HTML bodies.

    Args:
        to: Recipient(s) for the To header, optionally with display names.
        subject: Subject line; RFC 2047-encoded when non-ASCII.
        body_html: HTML body; the plain-text alternative is derived from it.
        in_reply_to: Message-ID being replied to; header omitted when empty.
        references: Thread reference chain; header omitted when empty.

    Returns:
        The message without a From header (the caller sets it).
    """
    from email.utils import formataddr, getaddresses

    h = _html2text.HTML2Text()
    h.ignore_images = True
    h.body_width = 0  # 0 disables html2text's hard line wrapping
    body_plain = h.handle(body_html)

    if to.isascii():
        to_header = to
    else:
        # Encode only the display name(s). Encoding the whole value would
        # wrap "<addr>" inside the RFC 2047 word and hide the address.
        try:
            pairs = [(nm, ad) for nm, ad in getaddresses([to]) if ad]
            to_header = (
                ", ".join(formataddr(p, "utf-8") for p in pairs)
                if pairs else _enc_header(to)
            )
        except UnicodeError:
            # Non-ASCII address part: keep the previous whole-value encoding.
            to_header = _enc_header(to)

    msg = MIMEMultipart("alternative")
    msg["To"] = to_header
    msg["Subject"] = _enc_header(subject)
    if in_reply_to:
        msg["In-Reply-To"] = in_reply_to
    if references:
        msg["References"] = references
    # Plain part first, HTML second.
    msg.attach(MIMEText(body_plain, "plain", "utf-8"))
    msg.attach(MIMEText(body_html,  "html",  "utf-8"))
    return msg


# ─── Actions ───────────────────────────────────────────────────────────────

def _do_send(to: str, subject: str, body_html: str) -> str:
    if not (to and subject and body_html):
        return "Error: 'send' requires 'to', 'subject', and 'body'."
    addr, _ = _creds()
    msg = _build_mime(to, subject, body_html)
    msg["From"] = addr
    with _smtp() as smtp:
        smtp.sendmail(addr, [to], msg.as_bytes())
    return f"Sent → {to} | {subject}"


def _do_list(max_results: int, offset: int, query: str | None) -> str:
    """List INBOX messages, newest first, one line per message.

    Args:
        max_results: Page size; clamped to the range 1..50.
        offset: Number of newest messages to skip; negatives become 0.
        query: IMAP SEARCH criteria; ``None``/empty means ``ALL``.

    Returns:
        Lines of ``marker UID | from | subject | date`` (``●`` unread,
        ``○`` read), plus a pagination hint when more messages remain;
        ``"No messages."`` for an empty page; an ``Error: ...`` string when
        the server rejects the search.
    """
    # Clamp to at least 1: zero or negative sizes would produce an empty or
    # wrongly-sliced page.
    max_results = max(1, min(int(max_results), 50))
    offset = max(int(offset), 0)
    criteria = query if query else "ALL"

    imap = _imap()
    try:
        # Read-only select: listing must not change any flags.
        imap.select("INBOX", readonly=True)
        status, data = imap.uid("SEARCH", None, criteria)
        if status != "OK":
            # On NO the payload is the server's error text, not UIDs.
            detail = data[0] if data else b""
            if isinstance(detail, bytes):
                detail = detail.decode("utf-8", errors="replace")
            return f"Error: IMAP search failed ({status}): {detail or 'no detail'}"
        # imaplib yields [None] when the server sent no SEARCH line at all.
        all_uids = (data[0] or b"").split() if data else []
        all_uids.reverse()  # newest first
        total = len(all_uids)
        page = all_uids[offset : offset + max_results]

        if not page:
            return "No messages."

        lines: list[str] = []
        for uid in page:
            _, fd = imap.uid(
                "FETCH", uid,
                "(FLAGS BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])",
            )
            # Skip UIDs whose fetch did not return a (meta, data) tuple.
            if not fd or not isinstance(fd[0], tuple):
                continue
            meta, raw_headers = fd[0]
            parsed = message_from_bytes(raw_headers)
            from_   = _decode(parsed.get("From"))
            subject = _decode(parsed.get("Subject")) or "(no subject)"
            date    = parsed.get("Date", "")
            marker  = "●" if _flags_has_unseen(meta) else "○"
            lines.append(f"{marker} {uid.decode()} | {from_} | {subject} | {date}")

        out = "\n".join(lines)
        next_off = offset + max_results
        if next_off < total:
            out += f"\n\n[{offset + 1}–{min(next_off, total)} of {total} — use offset={next_off} for next page]"
        return out
    finally:
        imap.logout()


def _do_list_unread() -> str:
    """List every unread INBOX message, newest first.

    Returns:
        A header line with the unread count followed by one
        ``● UID | from | subject | date`` line per message;
        ``"No unread messages."`` when there are none; an ``Error: ...``
        string when the server rejects the search.
    """
    imap = _imap()
    try:
        # Read-only select plus BODY.PEEK: listing leaves messages unread.
        imap.select("INBOX", readonly=True)
        status, data = imap.uid("SEARCH", None, "UNSEEN")
        if status != "OK":
            # On NO the payload is the server's error text, not UIDs.
            detail = data[0] if data else b""
            if isinstance(detail, bytes):
                detail = detail.decode("utf-8", errors="replace")
            return f"Error: IMAP search failed ({status}): {detail or 'no detail'}"
        # imaplib yields [None] when the server sent no SEARCH line at all.
        uids = (data[0] or b"").split() if data else []
        uids.reverse()  # newest first

        if not uids:
            return "No unread messages."

        lines = [f"Unread: {len(uids)}"]
        for uid in uids:
            _, fd = imap.uid(
                "FETCH", uid,
                "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT DATE)])",
            )
            # Skip UIDs whose fetch did not return a (meta, data) tuple.
            if not fd or not isinstance(fd[0], tuple):
                continue
            _, raw_headers = fd[0]
            parsed  = message_from_bytes(raw_headers)
            from_   = _decode(parsed.get("From"))
            subject = _decode(parsed.get("Subject")) or "(no subject)"
            date    = parsed.get("Date", "")
            lines.append(f"● {uid.decode()} | {from_} | {subject} | {date}")

        return "\n".join(lines)
    finally:
        imap.logout()


def _do_read(uid: str) -> str:
    if not uid:
        return "Error: 'read' requires 'uid'."

    imap = _imap()
    try:
        imap.select("INBOX")
        _, fd = imap.uid("FETCH", uid.encode(), "(RFC822)")
        if not fd or not isinstance(fd[0], tuple):
            return f"Error: message UID {uid} not found."
        _, raw = fd[0]
        msg = message_from_bytes(raw)

        from_   = _decode(msg.get("From"))
        to_     = _decode(msg.get("To"))
        subject = _decode(msg.get("Subject")) or "(no subject)"
        date    = msg.get("Date", "")

        text = _extract_text(msg)
        tail = ""
        if len(text) > _MAX_CHARS:
            tail = f"\n\n[truncated — {len(text) - _MAX_CHARS} chars omitted]"
            text = text[:_MAX_CHARS]

        attachments = [
            (p.get_filename(), len(p.get_payload(decode=True) or b""))
            for p in msg.walk()
            if p.get_content_disposition() == "attachment" and p.get_filename()
        ]

        imap.uid("STORE", uid.encode(), "+FLAGS", "\\Seen")

        lines = [
            f"From:    {from_}",
            f"To:      {to_}",
            f"Subject: {subject}",
            f"Date:    {date}",
            f"UID:     {uid}",
            "─" * 40,
            text or "(no text content)",
        ]
        if tail:
            lines.append(tail)
        if attachments:
            lines += ["─" * 40, "Attachments:"]
            lines += [f"  • {fn} ({_fmt_size(fs)})" for fn, fs in attachments]

        return "\n".join(lines)
    finally:
        imap.logout()


def _do_reply(uid: str, body_html: str) -> str:
    if not (uid and body_html):
        return "Error: 'reply' requires 'uid' and 'body'."

    imap = _imap()
    try:
        imap.select("INBOX", readonly=True)
        _, fd = imap.uid(
            "FETCH", uid.encode(),
            "(BODY.PEEK[HEADER.FIELDS (FROM SUBJECT MESSAGE-ID REFERENCES)])",
        )
        if not fd or not isinstance(fd[0], tuple):
            return f"Error: message UID {uid} not found."
        _, raw_headers = fd[0]
        orig = message_from_bytes(raw_headers)
    finally:
        imap.logout()

    from_     = _decode(orig.get("From"))
    subject   = _decode(orig.get("Subject")) or ""
    orig_id   = orig.get("Message-ID", "")
    refs      = orig.get("References", "")

    reply_subj = subject if subject.lower().startswith("re:") else f"Re: {subject}"
    ref_chain  = f"{refs} {orig_id}".strip() if refs else orig_id

    addr, _ = _creds()
    msg = _build_mime(from_, reply_subj, body_html, orig_id, ref_chain)
    msg["From"] = addr
    with _smtp() as smtp:
        smtp.sendmail(addr, [from_], msg.as_bytes())

    return f"Reply sent → {from_} | {reply_subj}"


# ─── Entry point ───────────────────────────────────────────────────────────

async def execute(params: dict) -> str:
    """Run one Gmail action and return its textual result.

    The blocking IMAP/SMTP work runs in a worker thread via
    ``asyncio.to_thread``.

    Args:
        params: Tool arguments; ``params["action"]`` selects the handler and
            the remaining keys are passed to it.

    Returns:
        The handler's output, or an ``Error: ...`` string for an unknown
        action, missing configuration, or an IMAP/SMTP/network/argument
        failure.
    """
    action = params.get("action", "")

    def _run() -> str:
        try:
            if action == "send":
                return _do_send(params.get("to", ""), params.get("subject", ""), params.get("body", ""))
            if action == "list":
                return _do_list(params.get("max_results", 10), params.get("offset", 0), params.get("query"))
            if action == "list_unread":
                return _do_list_unread()
            if action == "read":
                return _do_read(params.get("uid", ""))
            if action == "reply":
                return _do_reply(params.get("uid", ""), params.get("body", ""))
            return f"Error: unknown action '{action}'."
        except RuntimeError as e:
            # Configuration problems raised by _creds().
            return f"Error: {e}"
        except (imaplib.IMAP4.error, OSError, ValueError, TypeError, LookupError) as e:
            # Protocol errors (IMAP4.error), network/TLS/SMTP failures (all
            # OSError subclasses) and bad argument values are reported as
            # text rather than escaping the tool call.
            return f"Error: {type(e).__name__}: {e}"

    return await asyncio.to_thread(_run)