Quét giá của đối thủ cạnh tranh, danh sách sản phẩm và các trang tính năng. Lưu trữ dữ liệu lịch sử và tạo báo cáo so sánh.
Kiến trúc
Competitor Sites ──> CAPTCHA Solver ──> Data Extractors
│
SQLite Store
│
Dashboard Report
Mô hình dữ liệu
# models.py
import sqlite3
from datetime import datetime
from dataclasses import dataclass
from typing import Optional
@dataclass
class CompetitorData:
competitor: str
metric: str
value: str
numeric_value: Optional[float] = None
url: str = ""
scraped_at: str = ""
def __post_init__(self):
if not self.scraped_at:
self.scraped_at = datetime.now().isoformat()
class CompetitorDB:
def __init__(self, path="competitor_data.db"):
self.conn = sqlite3.connect(path)
self._init()
def _init(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
competitor TEXT,
metric TEXT,
value TEXT,
numeric_value REAL,
url TEXT,
scraped_at TEXT
)
""")
self.conn.commit()
def save(self, data: CompetitorData):
self.conn.execute(
"""INSERT INTO metrics
(competitor, metric, value, numeric_value, url, scraped_at)
VALUES (?, ?, ?, ?, ?, ?)""",
(data.competitor, data.metric, data.value,
data.numeric_value, data.url, data.scraped_at),
)
self.conn.commit()
def get_history(self, competitor, metric, limit=30):
cursor = self.conn.execute(
"""SELECT value, numeric_value, scraped_at
FROM metrics
WHERE competitor = ? AND metric = ?
ORDER BY scraped_at DESC LIMIT ?""",
(competitor, metric, limit),
)
return cursor.fetchall()
def latest_comparison(self, metric):
cursor = self.conn.execute(
"""SELECT competitor, value, numeric_value, MAX(scraped_at) as latest
FROM metrics WHERE metric = ?
GROUP BY competitor ORDER BY numeric_value""",
(metric,),
)
return cursor.fetchall()
Trình giải CAPTCHA
# solver.py
import requests
import time
import re
import os
class CaptchaSolver:
def __init__(self):
self.api_key = os.environ["CAPTCHAAI_API_KEY"]
def solve_if_needed(self, session, url, html):
if "data-sitekey" not in html:
return html
match = re.search(r'data-sitekey="([^"]+)"', html)
if not match:
return html
sitekey = match.group(1)
resp = requests.post("https://ocr.captchaai.com/in.php", data={
"key": self.api_key,
"method": "userrecaptcha",
"googlekey": sitekey,
"pageurl": url,
"json": 1,
}, timeout=30)
task_id = resp.json()["request"]
time.sleep(15)
for _ in range(24):
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": self.api_key, "action": "get",
"id": task_id, "json": 1,
}, timeout=15)
data = resp.json()
if data.get("status") == 1:
post_resp = session.post(url, data={
"g-recaptcha-response": data["request"],
}, timeout=30)
return post_resp.text
if data["request"] != "CAPCHA_NOT_READY":
raise RuntimeError(data["request"])
time.sleep(5)
raise TimeoutError("CAPTCHA solve timeout")
Máy cạp của đối thủ cạnh tranh
# scraper.py
import requests
import re
from bs4 import BeautifulSoup
from solver import CaptchaSolver
from models import CompetitorData
class CompetitorScraper:
def __init__(self):
self.solver = CaptchaSolver()
self.session = requests.Session()
self.session.headers["User-Agent"] = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 Chrome/125.0.0.0 Safari/537.36"
)
def scrape_pricing(self, competitor_name, url, plan_selector, price_selector):
html = self._fetch(url)
soup = BeautifulSoup(html, "html.parser")
plans = soup.select(plan_selector)
data = []
for plan in plans:
name_el = plan.select_one("h3, h2, .plan-name")
price_el = plan.select_one(price_selector)
if not name_el or not price_el:
continue
price_text = price_el.get_text(strip=True)
match = re.search(r'[\d,.]+', price_text)
numeric = float(match.group().replace(",", "")) if match else None
data.append(CompetitorData(
competitor=competitor_name,
metric=f"price_{name_el.get_text(strip=True).lower().replace(' ', '_')}",
value=price_text,
numeric_value=numeric,
url=url,
))
return data
def scrape_features(self, competitor_name, url, feature_list_selector):
html = self._fetch(url)
soup = BeautifulSoup(html, "html.parser")
features = soup.select(f"{feature_list_selector} li")
return [
CompetitorData(
competitor=competitor_name,
metric="feature",
value=f.get_text(strip=True),
url=url,
)
for f in features if f.get_text(strip=True)
]
def scrape_product_count(self, competitor_name, url, count_selector):
html = self._fetch(url)
soup = BeautifulSoup(html, "html.parser")
el = soup.select_one(count_selector)
if el:
text = el.get_text(strip=True)
match = re.search(r'[\d,]+', text)
if match:
count = int(match.group().replace(",", ""))
return CompetitorData(
competitor=competitor_name,
metric="product_count",
value=text,
numeric_value=count,
url=url,
)
return None
def _fetch(self, url):
resp = self.session.get(url, timeout=20)
return self.solver.solve_if_needed(self.session, url, resp.text)
Trình tạo báo cáo
# report.py
from models import CompetitorDB
def generate_report(db: CompetitorDB, metrics):
lines = ["=" * 60, "Competitor Analysis Report", "=" * 60, ""]
for metric in metrics:
results = db.latest_comparison(metric)
if not results:
continue
lines.append(f"--- {metric.replace('_', ' ').title()} ---")
for comp, value, numeric, ts in results:
marker = ""
if numeric is not None:
marker = f" (${numeric:,.2f})" if "price" in metric else f" ({numeric:,.0f})"
lines.append(f" {comp}: {value}{marker}")
lines.append("")
return "\n".join(lines)
def generate_trend(db: CompetitorDB, competitor, metric, periods=10):
history = db.get_history(competitor, metric, limit=periods)
if not history:
return f"No data for {competitor} — {metric}"
lines = [f"Trend: {competitor} — {metric}", "-" * 40]
for value, numeric, ts in reversed(history):
date = ts[:10]
lines.append(f" {date}: {value}")
return "\n".join(lines)
Á hậu chính
# main.py
import time
from models import CompetitorDB
from scraper import CompetitorScraper
from report import generate_report
COMPETITORS = [
{
"name": "Competitor A",
"pricing_url": "https://competitor-a.example.com/pricing",
"plan_selector": ".pricing-plan",
"price_selector": ".price",
},
{
"name": "Competitor B",
"pricing_url": "https://competitor-b.example.com/pricing",
"plan_selector": ".plan-card",
"price_selector": ".plan-price",
},
]
def main():
db = CompetitorDB()
scraper = CompetitorScraper()
for comp in COMPETITORS:
print(f"Scraping {comp['name']}...")
try:
pricing = scraper.scrape_pricing(
comp["name"], comp["pricing_url"],
comp["plan_selector"], comp["price_selector"],
)
for p in pricing:
db.save(p)
print(f" {p.metric}: {p.value}")
except Exception as e:
print(f" Error: {e}")
time.sleep(5)
# Generate report
metrics = ["price_basic", "price_pro", "price_enterprise", "product_count"]
report = generate_report(db, metrics)
print(report)
with open("competitor_report.txt", "w") as f:
f.write(report)
if __name__ == "__main__":
main()
Khắc phục sự cố
| Vấn đề | Nguyên nhân | Cách xử lý |
|---|---|---|
| Giá không được trích xuất | Bộ chọn không khớp | Kiểm tra HTML trang và cập nhật bộ chọn cho mỗi đối thủ cạnh tranh |
| Thiếu dữ liệu lịch sử | Lần chạy đầu tiên | Dữ liệu tích lũy; chạy hàng ngày để biết xu hướng |
| CAPTCHA trên trang định giá | Phát hiện bot | Thêm độ trễ và sử dụng cookie phiên |
| Báo cáo hiển thị dữ liệu cũ | Mục tương tự được chèn lại | Sử dụng latest_comparison để nhóm theo ngày MAX |
Câu hỏi thường gặp
Làm cách nào để hình dung xu hướng?
Xuất dữ liệu từ SQLite và vẽ đồ thị bằng matplotlib hoặc chuyển đầu ra CSV vào Google Trang tính để lập biểu đồ tích hợp.
Tôi có thể theo dõi các số liệu không liên quan đến giá cả không?
Vâng. Sử dụng scrape_features cho danh sách tính năng hoặc scrape_product_count cho kích thước danh mục. Thêm các công cụ dọn dẹp tùy chỉnh cho bất kỳ số liệu nào.
Làm cách nào để nhận thông báo về thay đổi giá?
So sánh giá được thu thập ngày hôm nay với giá trị được lưu trữ của ngày hôm qua và gửi thông báo (Slack/email) khi chênh lệch vượt quá ngưỡng.
Hướng dẫn liên quan
Theo dõi đối thủ cạnh tranh trên quy mô lớn —bắt đầu với CaptchaAI.