Commit 6256ca50 by cbolich

readme update

parent 7de85add
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by PDM, PEP 582 proposal
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static analysis results
.pytype/
# Cython debug symbols
cython_debug/
# Version control directories
.git/
.hg/
.svn/
# Docker files
Dockerfile
docker-compose.yml
# VS Code settings
.vscode/
# Use an official Python runtime as a parent image
FROM python:3.10-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
# Set work directory
WORKDIR /app
# Install system dependencies required by Playwright's browsers
# Using the combined command to install dependencies for all browsers
# See: https://playwright.dev/docs/docker#install-system-dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
# --- Playwright dependencies ---
libnss3 libnspr4 libdbus-1-3 libatk1.0-0 libatk-bridge2.0-0 libcups2 libdrm2 libxkbcommon0 libxcomposite1 libxdamage1 libxfixes3 libxrandr2 libgbm1 libpango-1.0-0 libcairo2 libasound2 \
# --- Other useful packages ---
curl \
# --- Cleanup ---
&& apt-get purge -y --auto-remove -o APT::AutoRemove::RecommendsImportant=false \
&& rm -rf /var/lib/apt/lists/*
# Copy the dependency manifests (requirements.txt and setup.py) into the container at /app
COPY requirements.txt setup.py ./
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install -e . --no-deps
# Install Playwright browsers
# This downloads the browser binaries into the image; --with-deps also installs system
# libraries via apt, which largely overlaps the manual apt-get step above
RUN playwright install --with-deps
# Copy the rest of the application code into the container at /app
COPY . .
# Expose the port the app runs on
EXPOSE 8001
# Define the command to run the application
# Use 0.0.0.0 to make it accessible from outside the container
CMD ["uvicorn", "gmaps_scraper_server.main_api:app", "--host", "0.0.0.0", "--port", "8001"]
# Google Maps Scraper API
A FastAPI service for scraping Google Maps data based on search queries.
Performance is very high, so watch out for Google's rate limiting.
URL parameters can be filled in with variables, for example:
`scrape-get?query=hotels%20in%2098392&max_places=100&lang=en&headless=true`
If you are calling the API from n8n or another automation tool, use the `/scrape-get` endpoint so the results are returned in the HTTP response (a sample n8n workflow with an HTTP Request node is included in this commit).
Installation is simple: copy the files and run `docker compose up -d`.
## API Endpoints
### POST `/scrape`
Main scraping endpoint (recommended for production)
**Parameters:**
- `query` (required): Search query (e.g., "hotels in 98392")
- `max_places` (optional): Maximum number of results to return
- `lang` (optional, default "en"): Language code for results
- `headless` (optional, default true): Run browser in headless mode
### GET `/scrape-get`
Alternative GET endpoint with same functionality
### GET `/`
Health check endpoint
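A quick way to verify the service is up (assuming the default port 8001 from the Docker setup):
```bash
curl http://localhost:8001/
# expected: {"message": "Google Maps Scraper API is running."}
```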
## Example Requests
### POST Example
Note that `/scrape` currently declares its inputs as query parameters (see `main_api.py`), so they are passed in the URL rather than as a JSON body:
```bash
curl -X POST "http://localhost:8001/scrape?query=hotels%20in%2098392&max_places=10&lang=en&headless=true"
```
### GET Example
```bash
curl "http://localhost:8001/scrape-get?query=hotels%20in%2098392&max_places=10&lang=en&headless=true"
```
## Running the Service
### Docker
```bash
docker-compose up --build
```
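Because `docker-compose.yml` publishes container port 8001 to a host port in the 8001-8004 range, it may help to check which host port was actually bound; one way, assuming the Compose v2 CLI:
```bash
docker compose port scraper-api 8001
```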
### Local Development
1. Install the Python dependencies (the Playwright browsers are also needed; see the note after these steps):
```bash
pip install -r requirements.txt
```
2. Run the API:
```bash
uvicorn gmaps_scraper_server.main_api:app --reload --port 8001
```
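The Docker image also runs `playwright install --with-deps`; for a local run, the Playwright browser binaries likely need to be installed once as well. A minimal sketch, assuming only Chromium is required (the scraper launches Chromium):
```bash
python -m playwright install chromium
```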
The API will be available at `http://localhost:8001`
## Notes
- For production use, consider adding authentication
- The scraping process may take several seconds to complete
- The results format depends on the underlying extractor; each place currently may include `name`, `place_id`, `coordinates`, `address`, `rating`, `reviews_count`, `categories`, `website`, `phone`, `thumbnail`, and the source `link`
services:
scraper-api:
build: . # Build the image from the Dockerfile in the current directory
container_name: gmaps_scraper_api_service # Optional: specify a container name
ports:
- "8001-8004:8001" # Map host port 8001 to container port 8001
restart: unless-stopped # Restart policy
volumes:
- .:/app # Mount current directory to /app in container
working_dir: /app # Set working directory to mounted volume
# Optional: Add environment variables if needed for configuration
# environment:
# - HEADLESS_MODE=true
networks:
- shark
cpu_shares: 1024 # Add cpu_shares here if not using Swarm mode
# deploy:
# replicas: 4
# resources:
# limits:
# cpus: '1'
# memory: 2G
networks:
shark:
external: true
# Initialize the gmaps_scraper_server package
import json
import re
def safe_get(data, *keys):
"""
Safely retrieves nested data from a dictionary or list using a sequence of keys/indices.
Returns None if any key/index is not found or if the data structure is invalid.
"""
current = data
for key in keys:
try:
if isinstance(current, list):
if isinstance(key, int) and 0 <= key < len(current):
current = current[key]
else:
# print(f"Index {key} out of bounds or invalid for list.")
return None
elif isinstance(current, dict):
if key in current:
current = current[key]
else:
# print(f"Key {key} not found in dict.")
return None
else:
# print(f"Cannot access key {key} on non-dict/list item: {type(current)}")
return None
except (IndexError, TypeError, KeyError) as e:
# print(f"Error accessing key {key}: {e}")
return None
return current
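# Illustrative usage (hypothetical data): safe_get({"a": [10, 20]}, "a", 1) returns 20,
# while safe_get({"a": [10, 20]}, "a", 5) and safe_get(None, "a") both return None.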
def extract_initial_json(html_content):
"""
Extracts the JSON string assigned to window.APP_INITIALIZATION_STATE from HTML content.
"""
try:
match = re.search(r';window\.APP_INITIALIZATION_STATE\s*=\s*(.*?);window\.APP_FLAGS', html_content, re.DOTALL)
if match:
json_str = match.group(1)
if json_str.strip().startswith(('[', '{')):
return json_str
else:
print("Extracted content doesn't look like valid JSON start.")
return None
else:
print("APP_INITIALIZATION_STATE pattern not found.")
return None
except Exception as e:
print(f"Error extracting JSON string: {e}")
return None
def parse_json_data(json_str):
"""
Parses the extracted JSON string, handling the nested JSON string if present.
Returns the main data blob (list) or None if parsing fails or structure is unexpected.
"""
if not json_str:
return None
try:
initial_data = json.loads(json_str)
# Check the initial heuristic path [3][6]
if isinstance(initial_data, list) and len(initial_data) > 3 and isinstance(initial_data[3], list) and len(initial_data[3]) > 6:
data_blob_or_str = initial_data[3][6]
# Case 1: It's already the list we expect (older format?)
if isinstance(data_blob_or_str, list):
print("Found expected list structure directly at initial_data[3][6].")
return data_blob_or_str
# Case 2: It's the string containing the actual JSON
elif isinstance(data_blob_or_str, str) and data_blob_or_str.startswith(")]}'\n"):
print("Found string at initial_data[3][6], attempting to parse inner JSON.")
try:
json_str_inner = data_blob_or_str.split(")]}'\n", 1)[1]
actual_data = json.loads(json_str_inner)
# Check if the parsed inner data is a list and has the expected sub-structure at index 6
if isinstance(actual_data, list) and len(actual_data) > 6:
potential_data_blob = safe_get(actual_data, 6)
if isinstance(potential_data_blob, list):
print("Returning data blob found at actual_data[6].")
return potential_data_blob # This is the main data structure
else:
print(f"Data at actual_data[6] is not a list, but {type(potential_data_blob)}. Saving inner data for inspection.")
# Save actual_data for debugging
try:
with open("debug_inner_data.json", "w", encoding="utf-8") as f_inner:
json.dump(actual_data, f_inner, indent=2)
print("...Successfully saved debug_inner_data.json")
except Exception as dump_error_inner:
print(f"Error saving inner debug file: {dump_error_inner}")
return None # Structure mismatch within inner data
else:
print(f"Parsed inner JSON is not a list or too short (len <= 6), type: {type(actual_data)}. Saving inner data for inspection.")
# Save actual_data for debugging
try:
with open("debug_inner_data.json", "w", encoding="utf-8") as f_inner:
json.dump(actual_data, f_inner, indent=2)
print("...Successfully saved debug_inner_data.json")
except Exception as dump_error_inner:
print(f"Error saving inner debug file: {dump_error_inner}")
return None # Inner JSON structure not as expected
except json.JSONDecodeError as e_inner:
print(f"Error decoding inner JSON string: {e_inner}")
return None
except Exception as e_inner_general:
print(f"Unexpected error processing inner JSON string: {e_inner_general}")
return None
# Case 3: Data at [3][6] is neither a list nor the expected string
else:
print(f"Parsed JSON structure unexpected at [3][6]. Expected list or prefixed JSON string, got {type(data_blob_or_str)}.")
# Save initial_data for debugging
print("Attempting to save full structure to debug_initial_data.json...")
try:
with open("debug_initial_data.json", "w", encoding="utf-8") as f:
json.dump(initial_data, f, indent=2)
print("...Successfully saved debug_initial_data.json")
except Exception as dump_error:
print(f"Error saving debug file: {dump_error}")
return None # Unexpected structure at [3][6]
# Case 4: Initial path [3][6] itself wasn't valid
else:
print(f"Initial JSON structure not as expected (list[3][6] path not valid). Type: {type(initial_data)}")
# Save initial_data for debugging
print("Attempting to save unexpected structure to debug_initial_data.json...")
try:
with open("debug_initial_data.json", "w", encoding="utf-8") as f:
json.dump(initial_data, f, indent=2)
print("...Successfully saved debug_initial_data.json")
except Exception as dump_error:
print(f"Error saving debug file: {dump_error}")
return None # Initial structure invalid
except json.JSONDecodeError as e:
print(f"Error decoding initial JSON: {e}")
return None
except Exception as e:
print(f"Unexpected error parsing JSON data: {e}")
return None
# --- Field Extraction Functions (Indices relative to the data_blob returned by parse_json_data) ---
def get_main_name(data):
"""Extracts the main name of the place."""
# Index relative to the data_blob returned by parse_json_data
# Confirmed via debug_inner_data.json: data_blob = actual_data[6], name = data_blob[11]
return safe_get(data, 11)
def get_place_id(data):
"""Extracts the Google Place ID."""
return safe_get(data, 10) # Updated index
def get_gps_coordinates(data):
"""Extracts latitude and longitude."""
lat = safe_get(data, 9, 2)
lon = safe_get(data, 9, 3)
if lat is not None and lon is not None:
return {"latitude": lat, "longitude": lon}
return None
def get_complete_address(data):
"""Extracts structured address components and joins them."""
address_parts = safe_get(data, 2) # Updated index
if isinstance(address_parts, list):
formatted = ", ".join(filter(None, address_parts))
return formatted if formatted else None
return None
def get_rating(data):
"""Extracts the average star rating."""
return safe_get(data, 4, 7)
def get_reviews_count(data):
"""Extracts the total number of reviews."""
return safe_get(data, 4, 8)
def get_website(data):
"""Extracts the primary website link."""
# Index based on debug_inner_data.json structure relative to data_blob (actual_data[6])
return safe_get(data, 7, 0)
def _find_phone_recursively(data_structure):
"""
Recursively searches a nested list/dict structure for a list containing
the phone icon URL followed by the phone number string.
"""
if isinstance(data_structure, list):
# Check if this list matches the pattern [icon_url, phone_string, ...]
if len(data_structure) >= 2 and \
isinstance(data_structure[0], str) and "call_googblue" in data_structure[0] and \
isinstance(data_structure[1], str):
# Found the pattern, assume data_structure[1] is the phone number
phone_number_str = data_structure[1]
standardized_number = re.sub(r'\D', '', phone_number_str)
if standardized_number:
# print(f"Debug: Found phone via recursive search: {standardized_number}")
return standardized_number
# If not the target list, recurse into list elements
for item in data_structure:
found_phone = _find_phone_recursively(item)
if found_phone:
return found_phone
elif isinstance(data_structure, dict):
# Recurse into dictionary values
for key, value in data_structure.items():
found_phone = _find_phone_recursively(value)
if found_phone:
return found_phone
# Base case: not a list/dict or pattern not found in this branch
return None
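# Illustrative match (hypothetical values): a nested list such as
# ["https://.../call_googblue_24dp.png", "(360) 555-0100", ...] anywhere in the
# structure would yield the digits-only string "3605550100".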
def get_phone_number(data_blob):
"""
Extracts and standardizes the primary phone number by recursively searching
the data_blob for the phone icon pattern.
"""
# data_blob is the main list structure (e.g., actual_data[6])
found_phone = _find_phone_recursively(data_blob)
if found_phone:
return found_phone
else:
# print("Debug: Phone number pattern not found in data_blob.")
return None
def get_categories(data):
"""Extracts the list of categories/types."""
return safe_get(data, 13)
def get_thumbnail(data):
"""Extracts the main thumbnail image URL."""
# This path might still be relative to the old structure, needs verification
# If data_blob is the list starting at actual_data[6], this index is likely wrong.
# We need to find the thumbnail within the new structure from debug_inner_data.json
# For now, returning None until verified.
# return safe_get(data, 72, 0, 1, 6, 0) # Placeholder index - LIKELY WRONG
# Tentative guess based on debug_inner_data structure (might be in a sublist like [14][0][0][6][0]?)
return safe_get(data, 14, 0, 0, 6, 0) # Tentative guess
# Add more extraction functions here as needed, using the indices
# from omkarcloud/src/extract_data.py as a reference, BUT VERIFYING against debug_inner_data.json
def extract_place_data(html_content):
"""
High-level function to orchestrate extraction from HTML content.
"""
json_str = extract_initial_json(html_content)
if not json_str:
print("Failed to extract JSON string from HTML.")
return None
data_blob = parse_json_data(json_str)
if not data_blob:
print("Failed to parse JSON data or find expected structure.")
return None
# Now extract individual fields using the helper functions
place_details = {
"name": get_main_name(data_blob),
"place_id": get_place_id(data_blob),
"coordinates": get_gps_coordinates(data_blob),
"address": get_complete_address(data_blob),
"rating": get_rating(data_blob),
"reviews_count": get_reviews_count(data_blob),
"categories": get_categories(data_blob),
"website": get_website(data_blob),
"phone": get_phone_number(data_blob), # Needs index verification
"thumbnail": get_thumbnail(data_blob), # Needs index verification
# Add other fields as needed
}
# Filter out None values if desired
place_details = {k: v for k, v in place_details.items() if v is not None}
return place_details if place_details else None
# Example usage (for testing):
if __name__ == '__main__':
# Load sample HTML content from a file (replace 'sample_place.html' with your file)
try:
with open('sample_place.html', 'r', encoding='utf-8') as f:
sample_html = f.read()
extracted_info = extract_place_data(sample_html)
if extracted_info:
print("Extracted Place Data:")
print(json.dumps(extracted_info, indent=2))
else:
print("Could not extract data from the sample HTML.")
except FileNotFoundError:
print("Sample HTML file 'sample_place.html' not found. Cannot run example.")
except Exception as e:
print(f"An error occurred during example execution: {e}")
from fastapi import FastAPI, HTTPException, Query
from typing import Optional, List, Dict, Any
import logging
# Import the scraper function (adjust path if necessary)
try:
from gmaps_scraper_server.scraper import scrape_google_maps
except ImportError:
# Handle case where scraper might be in a different structure later
logging.error("Could not import scrape_google_maps from scraper.py")
    # Define a dummy async function so the API can start but fails clearly when called
    async def scrape_google_maps(*args, **kwargs):
        raise ImportError("Scraper function not available.")
# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
app = FastAPI(
title="Google Maps Scraper API",
description="API to trigger Google Maps scraping based on a query.",
version="0.1.0",
)
@app.post("/scrape", response_model=List[Dict[str, Any]])
async def run_scrape(
query: str = Query(..., description="The search query for Google Maps (e.g., 'restaurants in New York')"),
max_places: Optional[int] = Query(None, description="Maximum number of places to scrape. Scrapes all found if None."),
lang: str = Query("en", description="Language code for Google Maps results (e.g., 'en', 'es')."),
headless: bool = Query(True, description="Run the browser in headless mode (no UI). Set to false for debugging locally.")
):
"""
Triggers the Google Maps scraping process for the given query.
"""
logging.info(f"Received scrape request for query: '{query}', max_places: {max_places}, lang: {lang}, headless: {headless}")
try:
# Run the potentially long-running scraping task
# Note: For production, consider running this in a background task queue (e.g., Celery)
# to avoid blocking the API server for long durations.
        results = await scrape_google_maps(
query=query,
max_places=max_places,
lang=lang,
headless=headless # Pass headless option from API
)
logging.info(f"Scraping finished for query: '{query}'. Found {len(results)} results.")
return results
except ImportError as e:
logging.error(f"ImportError during scraping for query '{query}': {e}")
raise HTTPException(status_code=500, detail="Server configuration error: Scraper not available.")
except Exception as e:
logging.error(f"An error occurred during scraping for query '{query}': {e}", exc_info=True)
# Consider more specific error handling based on scraper exceptions
raise HTTPException(status_code=500, detail=f"An internal error occurred during scraping: {str(e)}")
@app.get("/scrape-get", response_model=List[Dict[str, Any]])
async def run_scrape_get(
query: str = Query(..., description="The search query for Google Maps (e.g., 'restaurants in New York')"),
max_places: Optional[int] = Query(None, description="Maximum number of places to scrape. Scrapes all found if None."),
lang: str = Query("en", description="Language code for Google Maps results (e.g., 'en', 'es')."),
headless: bool = Query(True, description="Run the browser in headless mode (no UI). Set to false for debugging locally.")
):
"""
Triggers the Google Maps scraping process for the given query via GET request.
"""
logging.info(f"Received GET scrape request for query: '{query}', max_places: {max_places}, lang: {lang}, headless: {headless}")
try:
# Run the potentially long-running scraping task
# Note: For production, consider running this in a background task queue (e.g., Celery)
# to avoid blocking the API server for long durations.
        results = await scrape_google_maps(
query=query,
max_places=max_places,
lang=lang,
headless=headless # Pass headless option from API
)
logging.info(f"Scraping finished for query: '{query}'. Found {len(results)} results.")
return results
except ImportError as e:
logging.error(f"ImportError during scraping for query '{query}': {e}")
raise HTTPException(status_code=500, detail="Server configuration error: Scraper not available.")
except Exception as e:
logging.error(f"An error occurred during scraping for query '{query}': {e}", exc_info=True)
# Consider more specific error handling based on scraper exceptions
raise HTTPException(status_code=500, detail=f"An internal error occurred during scraping: {str(e)}")
# Basic root endpoint for health check or info
@app.get("/")
async def read_root():
return {"message": "Google Maps Scraper API is running."}
# Example for running locally (uvicorn main_api:app --reload)
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8001)
import json
import asyncio
import re
from playwright.async_api import async_playwright, TimeoutError as PlaywrightTimeoutError
from urllib.parse import urlencode
# Import the extraction functions from our helper module
from . import extractor
# --- Constants ---
BASE_URL = "https://www.google.com/maps/search/"
DEFAULT_TIMEOUT = 30000 # 30 seconds for navigation and selectors
SCROLL_PAUSE_TIME = 1.5 # Pause between scrolls
MAX_SCROLL_ATTEMPTS_WITHOUT_NEW_LINKS = 5 # Stop scrolling if no new links found after this many scrolls
# --- Helper Functions ---
def create_search_url(query, lang="en", geo_coordinates=None, zoom=None):
"""Creates a Google Maps search URL."""
params = {'q': query, 'hl': lang}
# Note: geo_coordinates and zoom might require different URL structure (/maps/@lat,lng,zoom)
# For simplicity, starting with basic query search
return BASE_URL + "?" + urlencode(params)
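# For reference, create_search_url("hotels in 98392", "en") should produce
# "https://www.google.com/maps/search/?q=hotels+in+98392&hl=en" (urlencode encodes spaces as '+').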
# --- Main Scraping Logic ---
async def scrape_google_maps(query, max_places=None, lang="en", headless=True):
"""
Scrapes Google Maps for places based on a query.
Args:
query (str): The search query (e.g., "restaurants in New York").
max_places (int, optional): Maximum number of places to scrape. Defaults to None (scrape all found).
lang (str, optional): Language code for Google Maps (e.g., 'en', 'es'). Defaults to "en".
headless (bool, optional): Whether to run the browser in headless mode. Defaults to True.
Returns:
list: A list of dictionaries, each containing details for a scraped place.
Returns an empty list if no places are found or an error occurs.
"""
results = []
place_links = set()
scroll_attempts_no_new = 0
    async with async_playwright() as p:
        try:
            browser = await p.chromium.launch(headless=headless)
            context = await browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
java_script_enabled=True,
accept_downloads=False,
# Consider setting viewport, locale, timezone if needed
locale=lang,
)
            page = await context.new_page()
if not page:
await browser.close() # Close browser before raising
raise Exception("Failed to create a new browser page (context.new_page() returned None).")
search_url = create_search_url(query, lang)
print(f"Navigating to search URL: {search_url}")
            await page.goto(search_url, wait_until='domcontentloaded')
            await asyncio.sleep(2)  # Give the page a moment to settle
# --- Handle potential consent forms ---
# This is a common pattern, might need adjustment based on specific consent popups
try:
consent_button_xpath = "//button[.//span[contains(text(), 'Accept all') or contains(text(), 'Reject all')]]"
# Wait briefly for the button to potentially appear
await page.wait_for_selector(consent_button_xpath, state='visible', timeout=5000) # Added await
# Click the "Accept all" or equivalent button if found
# Example: Prioritize "Accept all"
accept_button = await page.query_selector("//button[.//span[contains(text(), 'Accept all')]]") # Added await
if accept_button:
print("Accepting consent form...")
await accept_button.click() # Added await
else:
# Fallback to clicking the first consent button found (might be reject)
print("Clicking first available consent button...")
await page.locator(consent_button_xpath).first.click() # Added await
# Wait for navigation/popup closure
await page.wait_for_load_state('networkidle', timeout=5000) # Added await
except PlaywrightTimeoutError:
print("No consent form detected or timed out waiting.")
except Exception as e:
print(f"Error handling consent form: {e}")
# --- Scrolling and Link Extraction ---
print("Scrolling to load places...")
feed_selector = '[role="feed"]'
try:
await page.wait_for_selector(feed_selector, state='visible', timeout=25000) # Added await
except PlaywrightTimeoutError:
# Check if it's a single result page (maps/place/)
if "/maps/place/" in page.url:
print("Detected single place page.")
place_links.add(page.url)
else:
print(f"Error: Feed element '{feed_selector}' not found. Maybe no results? Taking screenshot.")
await page.screenshot(path='feed_not_found_screenshot.png') # Added await
await browser.close() # Added await
return [] # No results or page structure changed
if await page.locator(feed_selector).count() > 0: # Added await
last_height = await page.evaluate(f'document.querySelector(\'{feed_selector}\').scrollHeight') # Added await
while True:
# Scroll down
await page.evaluate(f'document.querySelector(\'{feed_selector}\').scrollTop = document.querySelector(\'{feed_selector}\').scrollHeight') # Added await
await asyncio.sleep(SCROLL_PAUSE_TIME) # Changed to asyncio.sleep, added await
# Extract links after scroll
current_links_list = await page.locator(f'{feed_selector} a[href*="/maps/place/"]').evaluate_all('elements => elements.map(a => a.href)') # Added await
current_links = set(current_links_list)
new_links_found = len(current_links - place_links) > 0
place_links.update(current_links)
print(f"Found {len(place_links)} unique place links so far...")
if max_places is not None and len(place_links) >= max_places:
print(f"Reached max_places limit ({max_places}).")
place_links = set(list(place_links)[:max_places]) # Trim excess links
break
# Check if scroll height has changed
new_height = await page.evaluate(f'document.querySelector(\'{feed_selector}\').scrollHeight') # Added await
if new_height == last_height:
# Check for the "end of results" marker
end_marker_xpath = "//span[contains(text(), \"You've reached the end of the list.\")]"
if await page.locator(end_marker_xpath).count() > 0: # Added await
print("Reached the end of the results list.")
break
else:
# If height didn't change but end marker isn't there, maybe loading issue?
# Increment no-new-links counter
if not new_links_found:
scroll_attempts_no_new += 1
print(f"Scroll height unchanged and no new links. Attempt {scroll_attempts_no_new}/{MAX_SCROLL_ATTEMPTS_WITHOUT_NEW_LINKS}")
if scroll_attempts_no_new >= MAX_SCROLL_ATTEMPTS_WITHOUT_NEW_LINKS:
print("Stopping scroll due to lack of new links.")
break
else:
scroll_attempts_no_new = 0 # Reset if new links were found this cycle
else:
last_height = new_height
scroll_attempts_no_new = 0 # Reset if scroll height changed
# Optional: Add a hard limit on scrolls to prevent infinite loops
# if scroll_count > MAX_SCROLLS: break
# --- Scraping Individual Places ---
print(f"\nScraping details for {len(place_links)} places...")
count = 0
for link in place_links:
count += 1
print(f"Processing link {count}/{len(place_links)}: {link}") # Keep sync print
try:
await page.goto(link, wait_until='domcontentloaded') # Added await
# Wait a bit for dynamic content if needed, or wait for a specific element
# await page.wait_for_load_state('networkidle', timeout=10000) # Or networkidle if needed
html_content = await page.content() # Added await
place_data = extractor.extract_place_data(html_content)
if place_data:
place_data['link'] = link # Add the source link
results.append(place_data)
# print(json.dumps(place_data, indent=2)) # Optional: print data as it's scraped
else:
print(f" - Failed to extract data for: {link}")
# Optionally save the HTML for debugging
# with open(f"error_page_{count}.html", "w", encoding="utf-8") as f:
# f.write(html_content)
except PlaywrightTimeoutError:
print(f" - Timeout navigating to or processing: {link}")
except Exception as e:
print(f" - Error processing {link}: {e}")
await asyncio.sleep(0.5) # Changed to asyncio.sleep, added await
await browser.close() # Added await
except PlaywrightTimeoutError:
print(f"Timeout error during scraping process.")
except Exception as e:
print(f"An error occurred during scraping: {e}")
import traceback
traceback.print_exc() # Print detailed traceback for debugging
finally:
# Ensure browser is closed if an error occurred mid-process
if 'browser' in locals() and browser.is_connected(): # Check if browser exists and is connected
await browser.close() # Added await
print(f"\nScraping finished. Found details for {len(results)} places.")
return results
# --- Example Usage ---
# (Example usage block removed as this script is now intended to be imported as a module)
{
"nodes": [
{
"parameters": {
"url": "http://100.95.78.54:8001/scrape-get?query=hotels%20in%2098392&max_places=100&lang=en&headless=true",
"options": {}
},
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4.2,
"position": [
220,
0
],
"id": "9738622e-6a34-483f-87e4-7f0cda074bff",
"name": "HTTP Request"
}
],
"connections": {},
"pinData": {},
"meta": {
"instanceId": "bfc265a0402eb6543e6cbf43d37210f6fa8cb72736676656a159075d75879e79"
}
}
playwright
fastapi
uvicorn[standard]
from setuptools import setup, find_packages
setup(
name="gmaps_scraper_server",
version="0.1",
packages=find_packages(),
install_requires=[
"playwright",
"fastapi",
"uvicorn[standard]"
],
)