# navi-1 / tools / instagram_engine.py
import asyncio
import json
import logging
import random
import re
from typing import Optional, Dict, Any
from urllib.parse import quote, urljoin

from playwright.async_api import async_playwright, Page
import playwright_stealth

# Module logger named "InstagramBrowser"; the root logger is configured at INFO level.
logger = logging.getLogger("InstagramBrowser")
logging.basicConfig(level=logging.INFO)

class InstagramBrowser:
    """Scrape a public Instagram profile page with headless Chromium.

    The class drives Playwright's async API: it opens the profile page, reads
    the display name, biography and header counters, and collects links to
    recent posts. Extraction is best-effort: a field that cannot be read is
    left as ``None`` and a warning is logged.
    """

    _BASE_URL = "https://www.instagram.com"

    # Selectors whose visibility is treated as "a login wall is blocking the page".
    _LOGIN_SELECTORS = (
        "text='Log in'",
        "div[role='dialog']",
        "form[action='/accounts/login/']",
    )

    # Header counter text such as "1,234 followers", "1.2M following", "1 post".
    # The optional K/M/B suffix covers abbreviated counts.
    _COUNT_PATTERN = re.compile(
        r"(\d[\d,.]*[KMB]?)\s*(followers?|following|posts?)",
        re.IGNORECASE,
    )

    def __init__(self, proxy: Optional[Dict[str, str]] = None):
        """
        Initialize the InstagramBrowser with optional proxy configuration.

        :param proxy: Dict with 'server' and optionally 'username', 'password'.
            The proxy is only used when a 'server' key is present.
        """
        self.proxy = proxy
        self.browser_context_params = {}
        if proxy and "server" in proxy:
            self.browser_context_params["proxy"] = {
                "server": proxy["server"],
                "username": proxy.get("username"),
                "password": proxy.get("password"),
            }

    async def _human_delay(self, min_sec: float = 1.0, max_sec: float = 3.0):
        """Sleep for a random duration between *min_sec* and *max_sec* seconds."""
        await asyncio.sleep(random.uniform(min_sec, max_sec))

    async def _is_login_wall_present(self, page: Page) -> bool:
        """Return True if any known login-wall element is visible on *page*."""
        for selector in self._LOGIN_SELECTORS:
            try:
                # ``.first`` avoids the strict-mode error Playwright raises when
                # a selector matches several elements, which would otherwise
                # hide a real login wall.
                if await page.locator(selector).first.is_visible():
                    return True
            except Exception as exc:
                logger.debug("Login wall probe failed for %r: %s", selector, exc)
        return False

    async def navigate_to_profile(self, page: Page, username: str):
        """Open the profile page of *username* and warn if a login wall appears.

        :param page: Playwright page used for navigation.
        :param username: Instagram handle; it is percent-encoded so it always
            stays a single URL path segment.
        :raises Exception: re-raises whatever the navigation raised.
        """
        url = f"{self._BASE_URL}/{quote(str(username), safe='')}/"
        logger.info("Navigating to %s", url)
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=60000)
            await self._human_delay()
        except Exception as e:
            logger.error("Navigation failed: %s", e)
            raise

        if await self._is_login_wall_present(page):
            logger.warning("Login wall detected.")

    async def _extract_full_name(self, page: Page) -> Optional[str]:
        """Return the text of the first ``<h2>`` inside the header, if any."""
        try:
            name_locator = page.locator("xpath=//header//h2")
            if await name_locator.count() > 0:
                return await name_locator.first.inner_text()
        except Exception as e:
            logger.warning("Failed to extract full_name: %s", e)
        return None

    async def _extract_biography(self, page: Page, full_name: Optional[str]) -> Optional[str]:
        """Return the first header text that is neither the name nor a counter."""
        try:
            texts = await page.locator("header div, header span").all_inner_texts()
        except Exception as e:
            logger.warning("Failed to extract biography: %s", e)
            return None

        for text in texts:
            candidate = text.strip()
            if not candidate or candidate == full_name:
                continue
            lowered = candidate.lower()
            # Skip the follower / following / post counters.
            if any(word in lowered for word in ("follower", "following", "post")):
                continue
            return candidate
        return None

    @classmethod
    def _parse_count(cls, text: str) -> Dict[str, str]:
        """Parse one counter text into ``{field: value}``; empty dict if no match."""
        match = cls._COUNT_PATTERN.search(" ".join(text.split()))
        if not match:
            return {}
        value = match.group(1).replace(",", "")
        label = match.group(2).lower()
        if label.startswith("follower"):
            field = "follower_count"
        elif label == "following":
            field = "following_count"
        else:
            field = "post_count"
        return {field: value}

    async def _extract_counts(self, page: Page) -> Dict[str, str]:
        """Return the counters found in the header links and spans."""
        counts: Dict[str, str] = {}
        try:
            texts = await page.locator("header a, header span").all_inner_texts()
        except Exception as e:
            logger.warning("Failed to extract counts: %s", e)
            return counts

        for text in texts:
            counts.update(self._parse_count(text))
        return counts

    async def get_profile_data(self, page: Page) -> Dict[str, Any]:
        """Extract profile information from an already loaded profile page.

        :param page: Playwright page showing the profile.
        :return: Dict with 'full_name', 'biography', 'follower_count',
            'following_count' and 'post_count'. Counts are strings with the
            thousands separators removed; fields that were not found are None.
        """
        data = {
            "full_name": None,
            "biography": None,
            "follower_count": None,
            "following_count": None,
            "post_count": None,
        }
        data["full_name"] = await self._extract_full_name(page)
        data["biography"] = await self._extract_biography(page, data["full_name"])
        data.update(await self._extract_counts(page))
        return data

    async def get_recent_posts(self, page: Page, limit: int = 5) -> list:
        """Collect up to *limit* post links from the loaded profile page.

        :param page: Playwright page showing the profile.
        :param limit: Maximum number of post links to inspect.
        :return: List of dicts with 'post_url', 'caption', 'like_count' and
            'comment_count'; only 'post_url' is populated. On error the posts
            collected so far are returned.
        """
        posts = []
        try:
            # Scroll to trigger lazy loading
            for _ in range(2):
                await page.evaluate("window.scrollBy(0, 800)")
                await asyncio.sleep(1)

            # Find all post links that contain '/p/' in their href
            post_links = page.locator("a[href*='/p/']")
            count = await post_links.count()

            for i in range(min(count, limit)):
                href = await post_links.nth(i).get_attribute("href")
                if not href:
                    continue
                posts.append(
                    {
                        # urljoin handles both relative and absolute hrefs.
                        "post_url": urljoin(f"{self._BASE_URL}/", href),
                        "caption": None,
                        "like_count": None,
                        "comment_count": None,
                    }
                )
        except Exception as e:
            logger.error("Error scraping posts: %s", e)

        return posts

    async def _apply_stealth(self, page: Page) -> None:
        """Apply playwright_stealth to *page* when the async helper exists.

        Only ``stealth_async`` is used: a synchronous helper cannot be awaited
        on an async page. Any failure is logged and scraping continues.
        """
        stealth_async = getattr(playwright_stealth, "stealth_async", None)
        if stealth_async is None:
            logger.warning("Stealth module failed to provide stealth_async. Proceeding without stealth.")
            return
        try:
            await stealth_async(page)
        except Exception as stealth_err:
            logger.warning("Stealth application failed: %s", stealth_err)

    async def run_scrape(self, username: str, limit: int = 5) -> str:
        """Main entry point for the scraping process.

        :param username: Instagram handle to scrape.
        :param limit: Maximum number of recent posts to collect.
        :return: JSON string. On success it holds 'username', 'profile',
            'recent_posts' and status 'success'; on any failure it holds
            'username', status 'error' and the error 'message'.
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                try:
                    context = await browser.new_context(**self.browser_context_params)
                    page = await context.new_page()

                    await self._apply_stealth(page)
                    await self.navigate_to_profile(page, username)

                    profile_data = await self.get_profile_data(page)
                    recent_posts = await self.get_recent_posts(page, limit=limit)
                finally:
                    # Close the browser even when a scraping step fails.
                    await browser.close()

                result = {
                    "username": username,
                    "profile": profile_data,
                    "recent_posts": recent_posts,
                    "status": "success",
                }
                return json.dumps(result, indent=2)

        except Exception as e:
            logger.error("Scrape failed for %s: %s", username, e)
            error_msg = {"username": username, "status": "error", "message": str(e)}
            return json.dumps(error_msg, indent=2)

# Module-level tool descriptor: identifier, short summary, and a
# JSON-Schema-style description of the arguments read by execute().
name = "instagram_engine"
description = "Core engine for Instagram browser automation."
parameters = {
    "type": "object",
    "properties": {
        # "scrape" is the only action value listed.
        "action": {
            "type": "string",
            "enum": ["scrape"],
        },
        "username": {
            "type": "string",
        },
        "proxy": {
            "type": "object",
            "description": "Proxy config",
        },
        "limit": {
            "type": "integer",
            "description": "Number of posts to scrape",
        },
    },
    "required": ["action", "username"],
}

async def execute(params: dict) -> str:
    """Validate *params* and run the requested action.

    :param params: Dict with 'action' (must be "scrape"), 'username', and
        optionally 'proxy' and 'limit'. A missing or null 'limit' defaults
        to 5; numeric strings are accepted.
    :return: JSON string. Validation failures return ``{"error": ...}``;
        otherwise the result of ``InstagramBrowser.run_scrape`` is returned.
    """
    if params.get("action") != "scrape":
        return json.dumps({"error": "Unsupported action"}, indent=2)

    username = params.get("username")
    if not username:
        return json.dumps({"error": "Username is required"}, indent=2)

    # An explicit null or a non-integer limit would otherwise be passed
    # through and silently yield zero posts.
    raw_limit = params.get("limit")
    if raw_limit is None:
        limit = 5
    else:
        try:
            limit = int(raw_limit)
        except (TypeError, ValueError, OverflowError):
            return json.dumps({"error": "Limit must be an integer"}, indent=2)

    engine = InstagramBrowser(proxy=params.get("proxy"))
    return await engine.run_scrape(username, limit=limit)