Tutorials

Xây dựng hàng đợi giải CAPTCHA bằng Python với CaptchaAI

Khi quét trên quy mô lớn, bạn cần giải CAPTCHA nhiều lần một lần. Hệ thống xếp hàng tách riêng việc gửi CAPTCHA khỏi quá trình truy xuất kết quả, cho phép giải quyết song song hàng trăm tác vụ.


Tại sao sử dụng hàng đợi?

Giải quyết CAPTCHA một yêu cầu sẽ lãng phí thời gian chờ đợi. Hệ thống xếp hàng:

  • Gửi tất cả CAPTCHA ngay lập tức
  • Thăm dò nhiều ID nhiệm vụ song song
  • Thử lại không thành công sẽ tự động giải quyết
  • Kiểm soát đồng thời để tôn trọng giới hạn tốc độ API
  • Cung cấp theo dõi tiến độ và gọi lại

Hàng đợi luồng cơ bản

import time
import threading
import requests
from queue import Queue, Empty

API_KEY = "YOUR_API_KEY"

class CaptchaQueue:
    """Thread-based CAPTCHA solving queue."""

    def __init__(self, api_key, max_workers=10):
        self.api_key = api_key
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.max_workers = max_workers
        self.workers = []

    def submit(self, method, callback=None, **params):
        """Add a CAPTCHA task to the queue."""
        task = {
            "method": method,
            "params": params,
            "callback": callback,
        }
        self.task_queue.put(task)

    def start(self):
        """Start worker threads."""
        for _ in range(self.max_workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self.workers.append(t)

    def wait(self):
        """Wait for all tasks to complete."""
        self.task_queue.join()

    def get_results(self):
        """Get all available results."""
        results = []
        while not self.result_queue.empty():
            try:
                results.append(self.result_queue.get_nowait())
            except Empty:
                break
        return results

    def _worker(self):
        while True:
            try:
                task = self.task_queue.get(timeout=1)
            except Empty:
                continue

            try:
                result = self._solve(task["method"], **task["params"])
                entry = {"status": "solved", "result": result, "task": task}
                self.result_queue.put(entry)
                if task["callback"]:
                    task["callback"](result)
            except Exception as e:
                entry = {"status": "error", "error": str(e), "task": task}
                self.result_queue.put(entry)
            finally:
                self.task_queue.task_done()

    def _solve(self, method, **params):
        submit = requests.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }, timeout=30).json()

        if submit.get("status") != 1:
            raise Exception(f"Submit error: {submit.get('request')}")

        task_id = submit["request"]
        for _ in range(30):
            time.sleep(5)
            result = requests.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }, timeout=30).json()
            if result.get("status") == 1:
                return result["request"]
            if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
                raise Exception("CAPTCHA unsolvable")
        raise TimeoutError("Solve timed out")

# Usage
queue = CaptchaQueue(API_KEY, max_workers=5)
queue.start()

# Submit multiple CAPTCHAs
urls_and_sitekeys = [
    ("https://example.com/page1", "SITEKEY_1"),
    ("https://example.com/page2", "SITEKEY_2"),
    ("https://example.com/page3", "SITEKEY_3"),
]

for url, sitekey in urls_and_sitekeys:
    queue.submit("userrecaptcha", googlekey=sitekey, pageurl=url)

queue.wait()
results = queue.get_results()
print(f"Solved {len(results)} CAPTCHAs")
for r in results:
    print(f"  {r['status']}: {r.get('result', r.get('error', ''))[:50]}")

Hàng đợi không đồng bộ với asyncio

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"

class AsyncCaptchaQueue:
    """Async CAPTCHA solving queue with concurrency control."""

    def __init__(self, api_key, max_concurrent=10):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results = []

    async def solve_batch(self, tasks):
        """Solve a batch of CAPTCHA tasks concurrently."""
        coros = [self._solve_task(task) for task in tasks]
        self.results = await asyncio.gather(*coros, return_exceptions=True)
        return self.results

    async def _solve_task(self, task):
        async with self.semaphore:
            return await self._solve(task["method"], **task["params"])

    async def _solve(self, method, **params):
        async with aiohttp.ClientSession() as session:
            # Submit
            async with session.post("https://ocr.captchaai.com/in.php", data={
                "key": self.api_key, "method": method, "json": 1, **params,
            }) as resp:
                data = await resp.json(content_type=None)
                if data.get("status") != 1:
                    raise Exception(f"Submit error: {data.get('request')}")
                task_id = data["request"]

            # Poll
            for _ in range(30):
                await asyncio.sleep(5)
                async with session.get("https://ocr.captchaai.com/res.php", params={
                    "key": self.api_key, "action": "get", "id": task_id, "json": 1,
                }) as resp:
                    result = await resp.json(content_type=None)
                    if result.get("status") == 1:
                        return result["request"]
                    if result.get("request") == "ERROR_CAPTCHA_UNSOLVABLE":
                        raise Exception("CAPTCHA unsolvable")

            raise TimeoutError("Solve timed out")

# Usage
async def main():
    queue = AsyncCaptchaQueue(API_KEY, max_concurrent=5)

    tasks = [
        {"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
        for i in range(10)
    ]

    results = await queue.solve_batch(tasks)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i}: ERROR — {result}")
        else:
            print(f"Task {i}: {result[:50]}...")

asyncio.run(main())

Mô hình nhà sản xuất-người tiêu dùng

Đối với khối lượng công việc quét liên tục trong đó các trang được phát hiện động:

import asyncio
import aiohttp

API_KEY = "YOUR_API_KEY"

class ProducerConsumerQueue:
    """Continuous CAPTCHA solving with producer-consumer pattern."""

    def __init__(self, api_key, queue_size=100, num_consumers=5):
        self.api_key = api_key
        self.queue = asyncio.Queue(maxsize=queue_size)
        self.num_consumers = num_consumers
        self.solved_count = 0
        self.error_count = 0
        self.running = True

    async def produce(self, tasks):
        """Producer: feed CAPTCHA tasks into the queue."""
        for task in tasks:
            await self.queue.put(task)
        # Signal consumers to stop
        for _ in range(self.num_consumers):
            await self.queue.put(None)

    async def consume(self, result_handler):
        """Consumer: solve CAPTCHAs and call result handler."""
        async with aiohttp.ClientSession() as session:
            while True:
                task = await self.queue.get()
                if task is None:
                    self.queue.task_done()
                    break

                try:
                    result = await self._solve(session, task["method"], **task["params"])
                    self.solved_count += 1
                    if result_handler:
                        await result_handler(task, result)
                except Exception as e:
                    self.error_count += 1
                    print(f"Error: {e}")
                finally:
                    self.queue.task_done()

    async def run(self, tasks, result_handler=None):
        """Run the producer-consumer pipeline."""
        # Start producer
        producer = asyncio.create_task(self.produce(tasks))

        # Start consumers
        consumers = [
            asyncio.create_task(self.consume(result_handler))
            for _ in range(self.num_consumers)
        ]

        # Wait for everything to finish
        await producer
        await asyncio.gather(*consumers)

        print(f"Complete: {self.solved_count} solved, {self.error_count} errors")

    async def _solve(self, session, method, **params):
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(f"Submit: {data.get('request')}")
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
        raise TimeoutError("Timed out")

# Usage
async def handle_result(task, token):
    url = task["params"]["pageurl"]
    print(f"Solved for {url}: {token[:30]}...")

async def main():
    queue = ProducerConsumerQueue(API_KEY, num_consumers=5)

    tasks = [
        {"method": "userrecaptcha", "params": {"googlekey": f"SITEKEY_{i}", "pageurl": f"https://example.com/page{i}"}}
        for i in range(20)
    ]

    await queue.run(tasks, result_handler=handle_result)

asyncio.run(main())

Hàng đợi ưu tiên

Khi một số CAPTCHA quan trọng hơn những CAPTCHA khác:

import asyncio
from dataclasses import dataclass, field

API_KEY = "YOUR_API_KEY"

@dataclass(order=True)
class PriorityTask:
    priority: int
    task: dict = field(compare=False)

class PriorityCaptchaQueue:
    """CAPTCHA queue with priority levels."""

    def __init__(self, api_key, num_workers=5):
        self.api_key = api_key
        self.queue = asyncio.PriorityQueue()
        self.num_workers = num_workers
        self.results = {}

    async def submit(self, task_id, method, priority=5, **params):
        """Submit with priority (lower number = higher priority)."""
        await self.queue.put(PriorityTask(
            priority=priority,
            task={"id": task_id, "method": method, "params": params},
        ))

    async def process(self):
        """Process all queued tasks by priority."""
        workers = [asyncio.create_task(self._worker()) for _ in range(self.num_workers)]

        # Wait for queue to drain
        await self.queue.join()

        # Cancel workers
        for w in workers:
            w.cancel()

        return self.results

    async def _worker(self):
        import aiohttp
        async with aiohttp.ClientSession() as session:
            while True:
                item = await self.queue.get()
                task = item.task
                try:
                    result = await self._solve(session, task["method"], **task["params"])
                    self.results[task["id"]] = {"status": "solved", "token": result}
                except Exception as e:
                    self.results[task["id"]] = {"status": "error", "error": str(e)}
                finally:
                    self.queue.task_done()

    async def _solve(self, session, method, **params):
        import aiohttp
        async with session.post("https://ocr.captchaai.com/in.php", data={
            "key": self.api_key, "method": method, "json": 1, **params,
        }) as resp:
            data = await resp.json(content_type=None)
            if data.get("status") != 1:
                raise Exception(data.get("request"))
            task_id = data["request"]

        for _ in range(30):
            await asyncio.sleep(5)
            async with session.get("https://ocr.captchaai.com/res.php", params={
                "key": self.api_key, "action": "get", "id": task_id, "json": 1,
            }) as resp:
                result = await resp.json(content_type=None)
                if result.get("status") == 1:
                    return result["request"]
        raise TimeoutError()

# Usage
async def main():
    pq = PriorityCaptchaQueue(API_KEY, num_workers=3)

    # High priority — checkout pages
    await pq.submit("checkout_1", "turnstile", priority=1, sitekey="KEY", pageurl="https://shop.com/checkout")

    # Normal priority — product pages
    for i in range(5):
        await pq.submit(f"product_{i}", "userrecaptcha", priority=5, googlekey="KEY", pageurl=f"https://shop.com/p/{i}")

    # Low priority — info pages
    for i in range(3):
        await pq.submit(f"info_{i}", "userrecaptcha", priority=10, googlekey="KEY", pageurl=f"https://shop.com/info/{i}")

    results = await pq.process()
    for task_id, result in results.items():
        print(f"{task_id}: {result['status']}")

asyncio.run(main())

Giám sát và báo cáo

import time
from dataclasses import dataclass, field

@dataclass
class QueueMetrics:
    submitted: int = 0
    solved: int = 0
    failed: int = 0
    total_solve_time: float = 0.0
    start_time: float = field(default_factory=time.time)

    @property
    def avg_solve_time(self):
        return self.total_solve_time / self.solved if self.solved else 0

    @property
    def success_rate(self):
        total = self.solved + self.failed
        return (self.solved / total * 100) if total else 0

    @property
    def throughput(self):
        elapsed = time.time() - self.start_time
        return self.solved / elapsed * 60 if elapsed > 0 else 0

    def report(self):
        return (
            f"Submitted: {self.submitted} | "
            f"Solved: {self.solved} | "
            f"Failed: {self.failed} | "
            f"Avg time: {self.avg_solve_time:.1f}s | "
            f"Success: {self.success_rate:.1f}% | "
            f"Throughput: {self.throughput:.0f}/min"
        )

Khắc phục sự cố

Triệu chứng nguyên nhân sửa chữa
Hàng đợi tăng lên nhưng nhiệm vụ không hoàn thành Quá nhiều công nhân áp đảo API Giảm max_workers / max_concurrent
ERROR_NO_SLOT_AVAILABLE Đã đạt đến giới hạn đồng thời API Thêm độ trễ giữa các lần gửi
Nhiệm vụ bị mắc kẹt trong hàng đợi Chủ đề công nhân đã chết vì ngoại lệ Gói vòng lặp công nhân trong try/except
Trí nhớ tăng dần theo thời gian Kết quả không được tiêu thụ Gọi get_results() định kỳ
Khối hàng đợi không đồng bộ Thiếu await Đảm bảo tất cả các cuộc gọi không đồng bộ đều được chờ đợi

Câu hỏi thường gặp

Tôi có thể chạy bao nhiêu giải pháp đồng thời?

CaptchaAI xử lý đồng thời ở phía máy chủ. Bắt đầu với 10 công nhân đồng thời và tăng dần dựa trên giới hạn kế hoạch của bạn. Kiểm tra ERROR_NO_SLOT_AVAILABLE để biết khi nào cần tăng ga.

Tôi nên sử dụng luồng hay asyncio?

Sử dụng asyncio cho các dự án mới — nó xử lý việc giải I/O-bound CAPTCHA hiệu quả hơn. Sử dụng phân luồng nếu tích hợp vào mã đồng bộ hiện có.

Làm cách nào để xử lý giới hạn tỷ lệ API?

Sử dụng semaphore (không đồng bộ) hoặc hàng đợi giới hạn (luồng) để giới hạn các yêu cầu đồng thời. Thêm độ trễ ngắn giữa các lần gửi nếu bạn nhấn ERROR_NO_SLOT_AVAILABLE.


Tóm tắt

Hàng đợi giải CAPTCHA tách riêng việc gửi khỏi việc bỏ phiếu, cho phép giải quyết song song vớiCaptchaAI. Chọn phân luồng cho mã đồng bộ, asyncio cho Python hiện đại và nhà sản xuất-người tiêu dùng cho khối lượng công việc liên tục.

bài viết liên quan

Os comentários estão desativados para este artigo.

Postagens relacionadas

DevOps & Scaling Xây dựng giải quyết CAPTCHA theo sự kiện bằng AWS SNS và CaptchaAI
Hướng dẫn Dev Ops xây dựng giải pháp giải quyết tình huống CAPTCHA bằng AWS SNS và Captcha AI, với các quyết định về kiến ​​trúc, các cân nhắc vận hành và mô hì...

Hướng dẫn Dev Ops xây dựng giải pháp giải quyết tình huống CAPTCHA bằng AWS SNS và Captcha AI, với các quyết đ...

Apr 25, 2026
DevOps & Scaling Công nhân giải quyết CAPTCHA tự động mở rộng quy mô
Hướng dẫn Dev Ops về Công nhân giải quyết quy trình tự động mở rộng CAPTCHA, với các quyết định về kiến ​​trúc, các cân nhắc vận hành và mô hình tự động hóa cho...

Hướng dẫn Dev Ops về Công nhân giải quyết quy trình tự động mở rộng CAPTCHA, với các quyết định về kiến ​​trúc...

Apr 24, 2026
DevOps & Scaling Triển khai Blue-Green cho cơ sở hạ tầng giải CAPTCHA
Hướng dẫn Dev Ops về Triển khai Blue-Green cho cơ sở hạ tầng giải nén CAPTCHA, với các quyết định về kiến ​​trúc, các cân nhắc vận hành và mô hình tự động hóa c...

Hướng dẫn Dev Ops về Triển khai Blue-Green cho cơ sở hạ tầng giải nén CAPTCHA, với các quyết định về kiến ​​tr...

Apr 28, 2026