DevOps & Scaling

Công nhân giải quyết CAPTCHA tự động mở rộng quy mô

Nhóm công nhân tĩnh sẽ lãng phí tiền trong thời gian yên tĩnh và tạo ra tắc nghẽn trong thời gian cao điểm. Tự động điều chỉnh quy mô phù hợp với số lượng công nhân với nhu cầu thực tế, tối ưu hóa cả chi phí và thông lượng.


Tín hiệu chia tỷ lệ

tín hiệu Tăng quy mô khi Thu nhỏ quy mô khi
Độ sâu hàng đợi > 20 nhiệm vụ đang chờ xử lý < 5 nhiệm vụ đang chờ xử lý
Sử dụng công nhân > 80% bận < 20% bận
Giải quyết độ trễ P95 > 60 giây P95 < 20 giây
Tỷ lệ lỗi > 5% (cần nhân viên mới) Ổn định < 1%
Cân bằng N/A Số dư < $1 (dừng chia tỷ lệ)

Trình chia tỷ lệ tự động dựa trên luồng

Chia tỷ lệ các luồng công nhân trong một quy trình duy nhất:

import os
import time
import threading
import requests
import json
import redis

class AutoScalingPool:
    """Dynamically scale CaptchaAI worker threads."""

    def __init__(self, api_key, redis_url="redis://localhost:6379"):
        self.api_key = api_key
        self.redis = redis.from_url(redis_url)
        self.base = "https://ocr.captchaai.com"
        self.queue_key = "captcha:tasks"
        self.results_key = "captcha:results"

        self.min_workers = 2
        self.max_workers = 20
        self.workers = []
        self.active_count = 0
        self.lock = threading.Lock()
        self.running = True

    def start(self):
        """Start the pool with minimum workers."""
        for _ in range(self.min_workers):
            self._add_worker()

        # Start scaler in background
        scaler = threading.Thread(target=self._scaling_loop, daemon=True)
        scaler.start()
        print(f"Pool started with {self.min_workers} workers")

    def _add_worker(self):
        """Add a worker thread."""
        if len(self.workers) >= self.max_workers:
            return
        t = threading.Thread(target=self._worker_loop, daemon=True)
        t.start()
        self.workers.append(t)

    def _remove_worker(self):
        """Signal one worker to stop (lazy removal)."""
        if len(self.workers) <= self.min_workers:
            return
        self.workers.pop()  # Thread will exit on next idle cycle

    def _worker_loop(self):
        """Worker loop: fetch and process tasks."""
        while self.running and threading.current_thread() in self.workers:
            result = self.redis.blpop(self.queue_key, timeout=10)
            if result is None:
                continue

            _, raw = result
            task = json.loads(raw)
            task_id = task["id"]

            with self.lock:
                self.active_count += 1

            try:
                token = self._solve(task["method"], task["params"])
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "success", "token": token,
                }))
            except Exception as e:
                self.redis.hset(self.results_key, task_id, json.dumps({
                    "status": "error", "error": str(e),
                }))
            finally:
                with self.lock:
                    self.active_count -= 1

    def _scaling_loop(self):
        """Periodically adjust worker count."""
        while self.running:
            time.sleep(10)

            queue_depth = self.redis.llen(self.queue_key)
            current = len(self.workers)
            utilization = (
                self.active_count / current * 100 if current > 0 else 0
            )

            # Scale up: queue growing and workers busy
            if queue_depth > 20 and utilization > 70:
                new_count = min(current + 2, self.max_workers)
                while len(self.workers) < new_count:
                    self._add_worker()
                print(f"Scaled up: {current} → {len(self.workers)} workers")

            # Scale down: queue empty and workers idle
            elif queue_depth < 5 and utilization < 20:
                target = max(current - 1, self.min_workers)
                while len(self.workers) > target:
                    self._remove_worker()
                if len(self.workers) < current:
                    print(f"Scaled down: {current} → {len(self.workers)} workers")

    def _solve(self, method, params, timeout=120):
        data = {"key": self.api_key, "method": method, "json": 1}
        data.update(params)

        resp = requests.post(
            f"{self.base}/in.php", data=data, timeout=30,
        )
        result = resp.json()

        if result.get("status") != 1:
            raise RuntimeError(result.get("request"))

        captcha_id = result["request"]
        start = time.time()

        while time.time() - start < timeout:
            time.sleep(5)
            resp = requests.get(f"{self.base}/res.php", params={
                "key": self.api_key,
                "action": "get",
                "id": captcha_id,
                "json": 1,
            }, timeout=15)
            data = resp.json()
            if data["request"] != "CAPCHA_NOT_READY":
                if data.get("status") == 1:
                    return data["request"]
                raise RuntimeError(data["request"])

        raise TimeoutError("Solve timeout")

    def stats(self):
        return {
            "workers": len(self.workers),
            "active": self.active_count,
            "queue": self.redis.llen(self.queue_key),
        }

# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()

# Monitor
while True:
    print(pool.stats())
    time.sleep(30)

Auto-Scaler dựa trên quy trình

Quy trình công nhân quy mô để cách ly CPU:

import multiprocessing
import time
import redis
import os

class ProcessScaler:
    """Scale worker processes based on queue depth."""

    def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
        self.worker_fn = worker_fn
        self.redis = redis.from_url(redis_url)
        self.processes = []
        self.min_workers = 2
        self.max_workers = 16

    def run(self, check_interval=15):
        """Run the scaler loop."""
        # Start minimum workers
        for _ in range(self.min_workers):
            self._spawn()

        while True:
            time.sleep(check_interval)
            self._cleanup_dead()

            queue_depth = self.redis.llen("captcha:tasks")
            current = len(self.processes)

            # Scale up
            if queue_depth > current * 5 and current < self.max_workers:
                to_add = min(
                    max(1, queue_depth // 10),
                    self.max_workers - current,
                )
                for _ in range(to_add):
                    self._spawn()
                print(f"Scaled up to {len(self.processes)} workers")

            # Scale down
            elif queue_depth < 3 and current > self.min_workers:
                to_remove = min(2, current - self.min_workers)
                for _ in range(to_remove):
                    p = self.processes.pop()
                    p.terminate()
                print(f"Scaled down to {len(self.processes)} workers")

    def _spawn(self):
        p = multiprocessing.Process(target=self.worker_fn)
        p.start()
        self.processes.append(p)

    def _cleanup_dead(self):
        self.processes = [p for p in self.processes if p.is_alive()]
        # Ensure minimum
        while len(self.processes) < self.min_workers:
            self._spawn()

Chia tỷ lệ nhận thức cân bằng

Dừng mở rộng quy mô khi tiền sắp hết:

def check_balance(api_key, min_balance=2.0):
    """Check if balance is sufficient for scaling."""
    resp = requests.get("https://ocr.captchaai.com/res.php", params={
        "key": api_key,
        "action": "getbalance",
        "json": 1,
    }, timeout=15)
    balance = float(resp.json()["request"])

    if balance < min_balance:
        print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
        return False
    return True

Tích hợp vào vòng lặp chia tỷ lệ:

# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
    if check_balance(self.api_key, min_balance=2.0):
        # Scale up
        ...
    else:
        print("Scaling paused — low balance")

So sánh chiến lược mở rộng quy mô

Chiến lược Tốt nhất cho Độ trễ Độ phức tạp
Nhóm chủ đề I/O-bound (lệnh gọi API) Thấp Thấp
** Nhóm quy trình ** Tiền xử lý giới hạn CPU Trung bình Trung bình
Kubernetes HPA Triển khai trên nền tảng đám mây Cao hơn Cao
KEDA Mở rộng quy mô theo hướng sự kiện Trung bình Trung bình

Khắc phục sự cố

Vấn đề Nguyên nhân Cách xử lý
Công nhân tiếp tục mở rộng quy mô Hàng đợi không bao giờ cạn Kiểm tra xem công nhân có đang thực sự xử lý không
Giảm quy mô quá tích cực Ngưỡng thấp Tăng độ trễ thu nhỏ lên 30 giây+
Quá trình zombie Các tiến trình chưa được dọn dẹp Sử dụng _cleanup_dead() thường xuyên
Cân bằng cạn kiệt nhanh chóng Quá nhiều công nhân Thêm kiểm tra số dư vào logic chia tỷ lệ

Câu hỏi thường gặp

Tỷ lệ công nhân trên hàng đợi phù hợp là bao nhiêu?

Nhắm tới 1 công nhân cho 5-10 nhiệm vụ được xếp hàng đợi. Mỗi nhân viên xử lý ~3-6 CAPTCHA mỗi phút tùy thuộc vào loại.

Tôi nên sử dụng chủ đề hoặc quy trình?

Các luồng để gọi API thuần túy (CaptchaAI là I/O-bound). Xử lý khi bạn cũng thực hiện tiền xử lý hình ảnh hoặc tính toán nặng cùng với việc giải.

Tôi nên mở rộng quy mô nhanh đến mức nào?

Tăng quy mô nhanh chóng (cứ sau 10-15 giây kiểm tra), giảm quy mô từ từ (đợi 30-60 giây tải thấp). Điều này ngăn chặn sự va chạm giữa các trạng thái.


Hướng dẫn liên quan


Quy mô thông minh —lấy khóa CaptchaAI của bạnHôm nay.

Os comentários estão desativados para este artigo.

Postagens relacionadas

DevOps & Scaling Xây dựng giải quyết CAPTCHA theo sự kiện bằng AWS SNS và CaptchaAI
Hướng dẫn Dev Ops xây dựng giải pháp giải quyết tình huống CAPTCHA bằng AWS SNS và Captcha AI, với các quyết định về kiến ​​trúc, các cân nhắc vận hành và mô hì...

Hướng dẫn Dev Ops xây dựng giải pháp giải quyết tình huống CAPTCHA bằng AWS SNS và Captcha AI, với các quyết đ...

Apr 25, 2026
Tutorials Xây dựng hàng đợi giải CAPTCHA bằng Python với CaptchaAI
Hướng dẫn từng bước để Xây dựng hàng đợi giải mã CAPTCHA bằng Python với Captcha AI, với các ví dụ có thể sử dụng lại trực tiếp và quy trình làm việc Captcha AI...

Hướng dẫn từng bước để Xây dựng hàng đợi giải mã CAPTCHA bằng Python với Captcha AI, với các ví dụ có thể sử d...

May 01, 2026
DevOps & Scaling Triển khai Blue-Green cho cơ sở hạ tầng giải CAPTCHA
Hướng dẫn Dev Ops về Triển khai Blue-Green cho cơ sở hạ tầng giải nén CAPTCHA, với các quyết định về kiến ​​trúc, các cân nhắc vận hành và mô hình tự động hóa c...

Hướng dẫn Dev Ops về Triển khai Blue-Green cho cơ sở hạ tầng giải nén CAPTCHA, với các quyết định về kiến ​​tr...

Apr 28, 2026