Python web scraping tool works on simple sites but fails on sites with anti-bot protections
18:39 05 Apr 2026

I’m building a lead generation tool in Python that extracts publicly available data from websites.

The scraper works as expected on simpler sites, especially when the data is directly accessible in the HTML. However, I’m running into issues on more complex websites that implement anti-bot measures (e.g., CAPTCHAs, rate limiting, or dynamic content loading).

Current behavior:
- Works on static pages with accessible lists of leads
- Fails or gets blocked on sites with stronger protections
- Sometimes returns incomplete or empty responses

What I’ve tried:
- Using `requests` and `BeautifulSoup`
- Adding headers (User-Agent, etc.)
- Introducing delays between requests

What I’m looking for:
- Best practices for making scraping more reliable and robust
- How to handle dynamically loaded content (e.g., JavaScript-rendered pages)
- General approaches to avoid being blocked

Here is my code:

import json
import logging
import re
import time
from typing import Dict, List, Optional
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser

import pandas as pd
import requests
from bs4 import BeautifulSoup

# Root logger: timestamped INFO-level messages for all scraper activity.
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# Desktop-Chrome User-Agent sent with every request (also used for robots.txt checks).
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
# Headers applied to every request unless the caller passes its own.
DEFAULT_HEADERS = {"User-Agent": USER_AGENT, "Accept-Language": "en-US,en;q=0.9"}
# Politeness delay (seconds) slept after each successful fetch.
REQUEST_DELAY_SECONDS = 2
# Number of attempts fetch_page() makes before giving up on a URL.
MAX_RETRIES = 3


def is_scraping_allowed(url: str) -> bool:
    """Consult the site's robots.txt to decide whether *url* may be fetched.

    Best-effort: when robots.txt cannot be retrieved or parsed, scraping is
    optimistically treated as allowed.
    """
    parts = urlparse(url)
    robots_location = f"{parts.scheme}://{parts.netloc}/robots.txt"
    parser = RobotFileParser()
    parser.set_url(robots_location)
    try:
        parser.read()
        verdict = parser.can_fetch(USER_AGENT, url)
    except Exception as exc:
        logging.warning("robots.txt not available or failed (%s): %s", robots_location, exc)
        return True
    logging.info("robots.txt check for %s: %s", url, verdict)
    return verdict


def fetch_page(url: str, headers: Optional[dict] = None) -> Optional[str]:
    """Fetch *url* and return its HTML body, or None after exhausting retries.

    Fixes over the previous version:
    - ``headers`` is correctly annotated as ``Optional[dict]``.
    - Exponential backoff between retries (2s, 4s, 8s, ...) instead of a
      fixed 2-second delay — plays better with rate limiters.
    - Permanent client errors (4xx other than 429 Too Many Requests) are not
      retried: a 403/404 will not succeed on a second attempt.
    The politeness delay after each successful fetch is preserved.
    """
    headers = headers or DEFAULT_HEADERS
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            response = requests.get(url, headers=headers, timeout=15)
            response.raise_for_status()
            # Politeness pause so consecutive calls don't hammer the server.
            time.sleep(REQUEST_DELAY_SECONDS)
            return response.text
        except requests.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            logging.warning("Fetch failed on attempt %s for %s: %s", attempt, url, exc)
            # 4xx (except 429) is permanent — retrying only wastes the budget.
            if status is not None and 400 <= status < 500 and status != 429:
                break
            time.sleep(2 ** attempt)
        except requests.RequestException as exc:
            # Network-level failures (timeouts, DNS, connection resets) are
            # transient — back off and retry.
            logging.warning("Fetch failed on attempt %s for %s: %s", attempt, url, exc)
            time.sleep(2 ** attempt)
    logging.error("Giving up on %s", url)
    return None


def normalize_url(base_url: str, link: str) -> str:
    """Resolve *link* against *base_url*; empty and javascript: links yield ""."""
    if not link:
        return ""
    candidate = link.strip()
    return "" if candidate.startswith("javascript:") else urljoin(base_url, candidate)


def extract_emails(text: str) -> list[str]:
    """Return a sorted, deduplicated list of email addresses found in *text*.

    The domain now requires dot-separated labels and cannot end on a dot, so
    sentence punctuation is no longer swallowed: the previous character class
    ``[a-zA-Z0-9-.]+`` greedily turned "a@b.com." into "a@b.com.".
    """
    pattern = r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+(?:\.[a-zA-Z0-9-]+)+"
    return sorted(set(re.findall(pattern, text)))


def extract_phone_numbers(text: str) -> list[str]:
    """Find phone-like digit runs in *text* and return them normalized.

    Matches runs of digits, spaces, parentheses and dashes (optionally
    prefixed with "+") at least eight characters long, then strips everything
    except digits and "+". Output is deduplicated and sorted.
    """
    phone_re = r"(?:\+?\d[\d\s()\-]{6,}\d)"
    normalized = {re.sub(r"[^+\d]", "", hit) for hit in re.findall(phone_re, text)}
    return sorted(normalized)


def extract_social_links(base_url: str, soup: BeautifulSoup) -> Dict[str, str]:
    """Collect the first link found for each known social network.

    Fixes over the previous version:
    - Matching is done on the link's hostname (exact domain or subdomain)
      instead of a raw substring test, so e.g. "netflix.com" or "box.com"
      is no longer mistaken for "x.com"/Twitter.
    - The Twitter branch now keeps the first match, consistent with the
      other networks (it previously overwrote earlier matches).
    """
    domain_to_key = {
        "linkedin.com": "linkedin",
        "twitter.com": "twitter",
        "x.com": "twitter",
        "facebook.com": "facebook",
        "instagram.com": "instagram",
        "youtube.com": "youtube",
    }
    social = {"linkedin": "", "twitter": "", "facebook": "", "instagram": "", "youtube": ""}
    for anchor in soup.find_all("a", href=True):
        href = anchor["href"].strip()
        absolute = normalize_url(base_url, href)
        if not absolute:
            continue
        # Lowercase hostname with any port stripped, e.g. "www.linkedin.com".
        host = (urlparse(absolute).netloc or "").lower().split(":")[0]
        for domain, key in domain_to_key.items():
            # Exact registered domain, or any subdomain of it.
            if (host == domain or host.endswith("." + domain)) and not social[key]:
                social[key] = absolute
                break
    return social


def parse_directory_page(html: str, base_url: str, selector_map: dict) -> List[Dict[str, object]]:
    """Extract one lead record per directory listing found in *html*.

    *selector_map* supplies CSS selectors (with defaults) for each field.
    When an explicit selector yields nothing, the listing's visible text is
    mined with regex fallbacks for emails/phones.  If "detail_link_selector"
    is configured, each listing's detail page is fetched (one network call
    per listing) and used to back-fill still-missing fields.
    """
    soup = BeautifulSoup(html, "lxml")
    companies = []
    list_items = soup.select(selector_map.get("directory_listing_selector", "div.company-card"))
    logging.info("Found %s candidate company records", len(list_items))

    for item in list_items:
        # Flattened text of the listing, used by the regex fallbacks below.
        raw_text = item.get_text(separator=" ", strip=True)
        record = {
            "company_name": "",
            "emails": [],
            "phones": [],
            "website": "",
            "linkedin": "",
            "twitter": "",
            "facebook": "",
            "instagram": "",
            "youtube": "",
            "source_url": base_url,
        }

        name_tag = item.select_one(selector_map.get("company_name_selector", "h2, .company-name"))
        if name_tag:
            record["company_name"] = name_tag.get_text(strip=True)

        # Prefer explicit mailto: anchors; drop any "?subject=..." query part.
        email_tags = item.select(selector_map.get("email_selector", "a[href^=mailto:]"))
        for tag in email_tags:
            href = tag.get("href", "")
            if href.startswith("mailto:"):
                record["emails"].append(href.replace("mailto:", "").split("?")[0])
        if not record["emails"]:
            # Fallback: regex-scan the listing's visible text.
            record["emails"] = extract_emails(raw_text)

        # Same two-tier strategy for phones: tel: anchors first, regex second.
        phone_tags = item.select(selector_map.get("phone_selector", ".phone, a[href^=tel:]") )
        for tag in phone_tags:
            href = tag.get("href", "")
            if href.startswith("tel:"):
                # Keep only digits and a leading "+".
                record["phones"].append(re.sub(r"[^+\d]", "", href.replace("tel:", "")))
        if not record["phones"]:
            record["phones"] = extract_phone_numbers(raw_text)

        website_tag = item.select_one(selector_map.get("website_selector", "a.website, a[href^='http']"))
        if website_tag and website_tag.get("href"):
            record["website"] = normalize_url(base_url, website_tag["href"])

        # Optionally follow the listing's detail page for richer data.
        # NOTE: this issues one fetch_page() call (with its politeness delay)
        # per listing, so large directories crawl slowly.
        page_html = None
        detail_url = None
        detail_selector = selector_map.get("detail_link_selector")
        if detail_selector:
            link_tag = item.select_one(detail_selector)
            if link_tag and link_tag.get("href"):
                detail_url = normalize_url(base_url, link_tag["href"])
                page_html = fetch_page(detail_url)

        # Social links come from the detail page when one was fetched,
        # otherwise from the listing card itself.
        details_soup = BeautifulSoup(page_html, "lxml") if page_html else item
        social_data = extract_social_links(base_url, details_soup)
        record.update(social_data)

        # Back-fill any still-missing fields from the detail page's content.
        if detail_selector and page_html:
            detail_text = details_soup.get_text(separator=" ", strip=True)
            if not record["emails"]:
                record["emails"] = extract_emails(detail_text)
            if not record["phones"]:
                record["phones"] = extract_phone_numbers(detail_text)
            if not record["website"]:
                link = details_soup.select_one(selector_map.get("detail_website_selector", "a[href^='http']"))
                if link and link.get("href"):
                    record["website"] = normalize_url(base_url, link["href"])

        # Deduplicate and give the output a stable ordering.
        record["emails"] = sorted(set(record["emails"]))
        record["phones"] = sorted(set(record["phones"]))
        companies.append(record)
    return companies


def find_next_page(soup: BeautifulSoup, base_url: str, pagination_selector: str) -> Optional[str]:
    """Locate the pagination link and resolve it to an absolute URL.

    Returns None when no selector is configured or no usable "next" link
    exists on the page.
    """
    if not pagination_selector:
        return None
    link = soup.select_one(pagination_selector)
    href = link.get("href") if link else None
    return normalize_url(base_url, href) if href else None


def deduplicate_records(records: List[Dict[str, object]]) -> List[Dict[str, object]]:
    """Merge records that share the same (company_name, website) identity.

    The first occurrence wins for scalar fields; emails and phones are
    unioned and re-sorted, and blank social links are back-filled from
    later duplicates.
    """
    merged: Dict[tuple, Dict[str, object]] = {}
    social_fields = ("linkedin", "twitter", "facebook", "instagram", "youtube")
    for rec in records:
        identity = (
            rec.get("company_name", "").lower().strip(),
            rec.get("website", "").lower().strip(),
        )
        kept = merged.get(identity)
        if kept is None:
            merged[identity] = rec
            continue
        kept["emails"] = sorted(set(kept["emails"]) | set(rec["emails"]))
        kept["phones"] = sorted(set(kept["phones"]) | set(rec["phones"]))
        for field in social_fields:
            if not kept.get(field) and rec.get(field):
                kept[field] = rec[field]
    return list(merged.values())


def save_to_csv(records: List[Dict[str, object]], filename: str) -> None:
    """Write lead records to *filename* as CSV; warn and skip when empty."""
    if not records:
        logging.warning("No records to save to %s", filename)
        return

    def as_row(rec: Dict[str, object]) -> Dict[str, str]:
        # Flatten list-valued fields into "; "-joined strings for spreadsheets.
        return {
            "Company Name": rec.get("company_name", ""),
            "Emails": "; ".join(rec.get("emails", [])),
            "Phones": "; ".join(rec.get("phones", [])),
            "Website": rec.get("website", ""),
            "LinkedIn": rec.get("linkedin", ""),
            "Twitter": rec.get("twitter", ""),
            "Facebook": rec.get("facebook", ""),
            "Instagram": rec.get("instagram", ""),
            "YouTube": rec.get("youtube", ""),
            "Source URL": rec.get("source_url", ""),
        }

    frame = pd.DataFrame([as_row(rec) for rec in records])
    frame.to_csv(filename, index=False)
    logging.info("Saved %s records to %s", len(records), filename)


def save_to_json(records: List[Dict[str, object]], filename: str) -> None:
    """Serialize *records* to pretty-printed UTF-8 JSON at *filename*."""
    payload = json.dumps(records, indent=2, ensure_ascii=False)
    with open(filename, "w", encoding="utf-8") as handle:
        handle.write(payload)
    logging.info("Saved %s records to %s", len(records), filename)


def scrape_directory(start_url: str, selector_map: dict, max_pages: int = 5) -> List[Dict[str, object]]:
    """Crawl a paginated directory starting at *start_url*.

    Honors robots.txt (raises RuntimeError when scraping is disallowed),
    walks up to *max_pages* pages by following the configured pagination
    selector, and returns the deduplicated lead records.
    """
    if not is_scraping_allowed(start_url):
        raise RuntimeError(f"Scraping blocked by robots.txt for {start_url}")

    collected: List[Dict[str, object]] = []
    url = start_url
    pages_done = 0

    while url and pages_done < max_pages:
        html = fetch_page(url)
        if not html:
            # Fetch failed after retries — stop rather than loop forever.
            break

        soup = BeautifulSoup(html, "lxml")
        page_records = parse_directory_page(html, url, selector_map)
        collected.extend(page_records)
        url = find_next_page(soup, url, selector_map.get("pagination_selector", "a.next"))
        pages_done += 1
        logging.info("Completed page %s: %s records", pages_done, len(page_records))

    return deduplicate_records(collected)


def main() -> None:
    """Demo entry point: scrape one directory with a sample selector config."""
    # CSS selectors describing where each field lives on the target site;
    # these must be adapted per directory being scraped.
    sample_config = {
        "directory_listing_selector": "div.listing-card",
        "company_name_selector": "h3.listing-title",
        "email_selector": "a[href^=mailto:]",
        "phone_selector": "a[href^=tel:]",
        "website_selector": "a.website-link",
        "detail_link_selector": "a.details-link",
        "detail_website_selector": "a.website-link",
        "pagination_selector": "a.next-page",
    }


    # This is just to test the code
    # NOTE(review): LinkedIn's terms of service prohibit automated scraping,
    # and its robots.txt presumably disallows this path — scrape_directory()
    # would then raise RuntimeError here. Replace with a directory site you
    # are permitted to scrape before running for real.
    start_url = "https://www.linkedin.com/feed/"

    try:
        records = scrape_directory(start_url, sample_config, max_pages=3)
        save_to_csv(records, "crm_leads.csv")
        save_to_json(records, "crm_leads.json")
    except Exception as exc:
        # Broad catch is acceptable at the top-level entry point: failures
        # are logged instead of crashing the process.
        logging.error("Scraping aborted: %s", exc)


# Run the demo scrape only when executed as a script, not on import.
if __name__ == "__main__":
    main()
web-scraping