import asyncio
import json
import random
import logging
import re
from typing import Optional, Dict, Any
from playwright.async_api import async_playwright, Page
import playwright_stealth
# Module-wide logger used by every scraper component in this file.
logger = logging.getLogger("InstagramBrowser")
# Install default root logging at INFO so the scraper's progress messages show.
logging.basicConfig(level=logging.INFO)
class InstagramBrowser:
    """Playwright-based scraper for a single Instagram profile page.

    The class launches a headless Chromium instance, opens the profile page,
    extracts a handful of header fields plus links to recent posts, and
    returns everything as a JSON string (see :meth:`run_scrape`).
    """

    _BASE_URL = "https://www.instagram.com"

    # "<number>[K|M|B] <label>", e.g. "1,234 followers", "1.2M followers",
    # "1 post". Compiled once instead of on every loop iteration.
    _COUNT_PATTERN = re.compile(
        r"(\d[\d,.]*[kmb]?)\s*(followers?|following|posts?)",
        re.IGNORECASE,
    )

    def __init__(self, proxy: Optional[Dict[str, str]] = None):
        """
        Initialize the InstagramBrowser with optional proxy configuration.

        :param proxy: Dict with 'server', 'username', 'password'. The proxy is
            only applied when a 'server' key is present.
        """
        self.proxy = proxy
        # Keyword arguments forwarded verbatim to ``browser.new_context``.
        self.browser_context_params = {}
        if proxy and "server" in proxy:
            self.browser_context_params["proxy"] = {
                "server": proxy["server"],
                "username": proxy.get("username"),
                "password": proxy.get("password"),
            }

    @classmethod
    def _parse_counts(cls, text: str) -> Dict[str, str]:
        """Extract follower/following/post counts from a piece of header text.

        :param text: Raw element text; whitespace runs are collapsed first.
        :return: Mapping with any of the keys 'follower_count',
            'following_count', 'post_count'. Values are strings with thousands
            separators removed (an abbreviation suffix such as 'M' is kept).
            When a label occurs more than once, the first occurrence wins.
        """
        found: Dict[str, str] = {}
        normalized = " ".join(text.split())
        for match in cls._COUNT_PATTERN.finditer(normalized):
            value = match.group(1).replace(",", "")
            label = match.group(2).lower()
            if label.startswith("follower"):
                key = "follower_count"
            elif label == "following":
                key = "following_count"
            else:
                key = "post_count"
            found.setdefault(key, value)
        return found

    @classmethod
    def _absolute_post_url(cls, href: str) -> str:
        """Turn a post link's href into an absolute instagram.com URL.

        Already-absolute hrefs are returned unchanged; relative ones are
        joined to the site root with exactly one separating slash.
        """
        if href.startswith(("http://", "https://")):
            return href
        return f"{cls._BASE_URL}/{href.lstrip('/')}"

    async def _human_delay(self, min_sec: float = 1.0, max_sec: float = 3.0):
        """Sleep for a random duration in [min_sec, max_sec] to mimic a human."""
        delay = random.uniform(min_sec, max_sec)
        await asyncio.sleep(delay)

    async def _is_login_wall_present(self, page: "Page") -> bool:
        """Return True if any known login wall / popup element is visible.

        :param page: The Playwright page to inspect.
        """
        # Common Instagram login selectors
        login_selectors = [
            "text='Log in'",
            "div[role='dialog']",
            "form[action='/accounts/login/']",
        ]
        for selector in login_selectors:
            try:
                # ``.first`` avoids a strict-mode error when the selector
                # matches several elements, which previously got swallowed
                # and made the wall look absent.
                if await page.locator(selector).first.is_visible():
                    return True
            except Exception as e:
                # Best-effort probe: a failing selector must not abort the scrape.
                logger.debug("Login wall probe %r failed: %s", selector, e)
        return False

    async def navigate_to_profile(self, page: "Page", username: str):
        """Open the profile page of ``username`` and warn if a login wall shows.

        :param page: The Playwright page to drive.
        :param username: Instagram handle, inserted into the profile URL as-is.
        :raises Exception: Whatever ``page.goto`` raises is logged and re-raised.
        """
        url = f"{self._BASE_URL}/{username}/"
        logger.info(f"Navigating to {url}")
        try:
            await page.goto(url, wait_until="domcontentloaded", timeout=60000)
            await self._human_delay()
        except Exception as e:
            logger.error(f"Navigation failed: {e}")
            raise  # bare raise keeps the original traceback
        if await self._is_login_wall_present(page):
            logger.warning("Login wall detected.")

    async def get_profile_data(self, page: "Page") -> Dict[str, Any]:
        """Extract profile information from the page header.

        Every field is extracted best-effort: a failure is logged and leaves
        that field as ``None`` without affecting the others.

        :param page: A page already showing the profile.
        :return: Dict with keys 'full_name', 'biography', 'follower_count',
            'following_count', 'post_count'; counts are strings.
        """
        data = {
            "full_name": None,
            "biography": None,
            "follower_count": None,
            "following_count": None,
            "post_count": None,
        }

        # Full Name
        try:
            name_locator = page.locator("xpath=//header//h2")
            if await name_locator.count() > 0:
                data["full_name"] = (await name_locator.first.inner_text()).strip()
        except Exception as e:
            logger.warning(f"Failed to extract full_name: {e}")

        # Biography: first non-empty header text that is neither the name
        # nor one of the count labels.
        try:
            bio_elements = page.locator("header div, header span")
            for i in range(await bio_elements.count()):
                text = (await bio_elements.nth(i).inner_text()).strip()
                if not text or text == data["full_name"]:
                    continue
                lowered = text.lower()
                if any(word in lowered for word in ("follower", "following", "post")):
                    continue
                data["biography"] = text
                break
        except Exception as e:
            logger.warning(f"Failed to extract biography: {e}")

        # Counts (Followers, Following, Posts)
        try:
            count_elements = page.locator("header a, header span")
            for i in range(await count_elements.count()):
                text = await count_elements.nth(i).inner_text()
                # Later elements overwrite earlier ones, as before.
                data.update(self._parse_counts(text))
        except Exception as e:
            logger.warning(f"Failed to extract counts: {e}")

        return data

    async def get_recent_posts(self, page: "Page", limit: int = 5) -> list:
        """Collect links to the most recent posts shown on the profile.

        :param page: A page already showing the profile.
        :param limit: Maximum number of posts to return.
        :return: List of dicts with 'post_url' plus 'caption', 'like_count'
            and 'comment_count' (the latter three are always ``None`` here).
            Errors are logged and whatever was collected so far is returned.
        """
        posts = []
        try:
            if limit <= 0:
                return posts
            # Scroll to trigger lazy loading
            for _ in range(2):
                await page.evaluate("window.scrollBy(0, 800)")
                await asyncio.sleep(1)
            # Find all post links that contain '/p/' in their href
            post_links = page.locator("a[href*='/p/']")
            total = await post_links.count()
            seen = set()
            for i in range(total):
                if len(posts) >= limit:
                    break
                href = await post_links.nth(i).get_attribute("href")
                if not href:
                    continue
                full_url = self._absolute_post_url(href)
                # The same post can be linked more than once; report it once.
                if full_url in seen:
                    continue
                seen.add(full_url)
                posts.append(
                    {
                        "post_url": full_url,
                        "caption": None,
                        "like_count": None,
                        "comment_count": None,
                    }
                )
        except Exception as e:
            logger.error(f"Error scraping posts: {e}")
        return posts

    async def _apply_stealth(self, page: "Page") -> None:
        """Apply playwright_stealth to ``page`` if an async helper exists.

        Failures are logged and never abort the scrape.
        """
        stealth_async = getattr(playwright_stealth, "stealth_async", None)
        if stealth_async is None:
            # NOTE(review): stealth_sync is not used as a fallback; it appears
            # to target the synchronous Playwright API, and awaiting its
            # return value fails. Confirm against the installed version.
            logger.warning("Stealth module failed to provide stealth_async. Proceeding without stealth.")
            return
        try:
            await stealth_async(page)
        except Exception as stealth_err:
            logger.warning(f"Stealth application failed: {stealth_err}")

    async def run_scrape(self, username: str, limit: int = 5) -> str:
        """Main entry point for the scraping process.

        :param username: Instagram handle to scrape.
        :param limit: Maximum number of recent posts to collect.
        :return: JSON string. On success it holds 'username', 'profile',
            'recent_posts' and 'status': 'success'; on any failure it holds
            'username', 'status': 'error' and 'message'. Never raises.
        """
        try:
            async with async_playwright() as p:
                browser = await p.chromium.launch(headless=True)
                try:
                    context = await browser.new_context(**self.browser_context_params)
                    page = await context.new_page()
                    await self._apply_stealth(page)
                    await self.navigate_to_profile(page, username)
                    profile_data = await self.get_profile_data(page)
                    recent_posts = await self.get_recent_posts(page, limit=limit)
                finally:
                    # Close the browser on the error path too.
                    await browser.close()
            result = {
                "username": username,
                "profile": profile_data,
                "recent_posts": recent_posts,
                "status": "success",
            }
            return json.dumps(result, indent=2)
        except Exception as e:
            logger.error("Scrape failed for %s: %s", username, e)
            error_msg = {"username": username, "status": "error", "message": str(e)}
            return json.dumps(error_msg, indent=2)
# Tool metadata: identifier and human-readable summary of this module.
name = "instagram_engine"
description = "Core engine for Instagram browser automation."

# JSON-Schema-style description of the dict accepted by ``execute``.
parameters = {
    "type": "object",
    "properties": {
        # Only scraping is supported.
        "action": {
            "type": "string",
            "enum": ["scrape"],
        },
        "username": {
            "type": "string",
        },
        "proxy": {
            "type": "object",
            "description": "Proxy config",
        },
        "limit": {
            "type": "integer",
            "description": "Number of posts to scrape",
        },
    },
    "required": [
        "action",
        "username",
    ],
}
async def execute(params: dict) -> str:
    """Validate a tool request and run the Instagram scrape.

    :param params: Request dict with 'action' (must be "scrape"), 'username',
        and optionally 'proxy' (dict) and 'limit' (non-negative integer,
        default 5).
    :return: JSON string. Invalid requests yield ``{"error": "..."}``;
        otherwise the result of ``InstagramBrowser.run_scrape``.
    """
    params = params or {}  # tolerate a missing/None request body
    action = params.get("action")
    username = params.get("username")
    proxy = params.get("proxy")
    limit = params.get("limit")

    if action != "scrape":
        return json.dumps({"error": "Unsupported action"}, indent=2)
    if not username:
        return json.dumps({"error": "Username is required"}, indent=2)

    # A non-dict proxy would otherwise raise an uncaught TypeError inside the
    # InstagramBrowser constructor.
    if proxy is not None and not isinstance(proxy, dict):
        return json.dumps({"error": "Proxy must be an object"}, indent=2)

    # Coerce the limit up front; a non-integer limit used to surface only as
    # a logged error and an empty post list deep inside the scraper.
    if limit is None:
        limit = 5
    try:
        limit = int(limit)
    except (TypeError, ValueError):
        limit = -1
    if limit < 0:
        return json.dumps({"error": "Limit must be a non-negative integer"}, indent=2)

    engine = InstagramBrowser(proxy=proxy)
    return await engine.run_scrape(username, limit=limit)