Nhóm công nhân tĩnh sẽ lãng phí tiền trong thời gian yên tĩnh và tạo ra tắc nghẽn trong thời gian cao điểm. Tự động điều chỉnh quy mô phù hợp với số lượng công nhân với nhu cầu thực tế, tối ưu hóa cả chi phí và thông lượng.
Tín hiệu chia tỷ lệ
| tín hiệu | Tăng quy mô khi | Thu nhỏ quy mô khi |
|---|---|---|
| Độ sâu hàng đợi | > 20 nhiệm vụ đang chờ xử lý | < 5 nhiệm vụ đang chờ xử lý |
| Sử dụng công nhân | > 80% bận | < 20% bận |
| Giải quyết độ trễ | P95 > 60 giây | P95 < 20 giây |
| Tỷ lệ lỗi | > 5% (cần nhân viên mới) | Ổn định < 1% |
| Cân bằng | N/A | Số dư < $1 (dừng chia tỷ lệ) |
Trình chia tỷ lệ tự động dựa trên luồng
Chia tỷ lệ các luồng công nhân trong một quy trình duy nhất:
import os
import time
import threading
import requests
import json
import redis
class AutoScalingPool:
"""Dynamically scale CaptchaAI worker threads."""
def __init__(self, api_key, redis_url="redis://localhost:6379"):
self.api_key = api_key
self.redis = redis.from_url(redis_url)
self.base = "https://ocr.captchaai.com"
self.queue_key = "captcha:tasks"
self.results_key = "captcha:results"
self.min_workers = 2
self.max_workers = 20
self.workers = []
self.active_count = 0
self.lock = threading.Lock()
self.running = True
def start(self):
"""Start the pool with minimum workers."""
for _ in range(self.min_workers):
self._add_worker()
# Start scaler in background
scaler = threading.Thread(target=self._scaling_loop, daemon=True)
scaler.start()
print(f"Pool started with {self.min_workers} workers")
def _add_worker(self):
"""Add a worker thread."""
if len(self.workers) >= self.max_workers:
return
t = threading.Thread(target=self._worker_loop, daemon=True)
t.start()
self.workers.append(t)
def _remove_worker(self):
"""Signal one worker to stop (lazy removal)."""
if len(self.workers) <= self.min_workers:
return
self.workers.pop() # Thread will exit on next idle cycle
def _worker_loop(self):
"""Worker loop: fetch and process tasks."""
while self.running and threading.current_thread() in self.workers:
result = self.redis.blpop(self.queue_key, timeout=10)
if result is None:
continue
_, raw = result
task = json.loads(raw)
task_id = task["id"]
with self.lock:
self.active_count += 1
try:
token = self._solve(task["method"], task["params"])
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "success", "token": token,
}))
except Exception as e:
self.redis.hset(self.results_key, task_id, json.dumps({
"status": "error", "error": str(e),
}))
finally:
with self.lock:
self.active_count -= 1
def _scaling_loop(self):
"""Periodically adjust worker count."""
while self.running:
time.sleep(10)
queue_depth = self.redis.llen(self.queue_key)
current = len(self.workers)
utilization = (
self.active_count / current * 100 if current > 0 else 0
)
# Scale up: queue growing and workers busy
if queue_depth > 20 and utilization > 70:
new_count = min(current + 2, self.max_workers)
while len(self.workers) < new_count:
self._add_worker()
print(f"Scaled up: {current} → {len(self.workers)} workers")
# Scale down: queue empty and workers idle
elif queue_depth < 5 and utilization < 20:
target = max(current - 1, self.min_workers)
while len(self.workers) > target:
self._remove_worker()
if len(self.workers) < current:
print(f"Scaled down: {current} → {len(self.workers)} workers")
def _solve(self, method, params, timeout=120):
data = {"key": self.api_key, "method": method, "json": 1}
data.update(params)
resp = requests.post(
f"{self.base}/in.php", data=data, timeout=30,
)
result = resp.json()
if result.get("status") != 1:
raise RuntimeError(result.get("request"))
captcha_id = result["request"]
start = time.time()
while time.time() - start < timeout:
time.sleep(5)
resp = requests.get(f"{self.base}/res.php", params={
"key": self.api_key,
"action": "get",
"id": captcha_id,
"json": 1,
}, timeout=15)
data = resp.json()
if data["request"] != "CAPCHA_NOT_READY":
if data.get("status") == 1:
return data["request"]
raise RuntimeError(data["request"])
raise TimeoutError("Solve timeout")
def stats(self):
return {
"workers": len(self.workers),
"active": self.active_count,
"queue": self.redis.llen(self.queue_key),
}
# Usage
pool = AutoScalingPool(os.environ["CAPTCHAAI_KEY"])
pool.start()
# Monitor
while True:
print(pool.stats())
time.sleep(30)
Auto-Scaler dựa trên quy trình
Quy trình công nhân quy mô để cách ly CPU:
import multiprocessing
import time
import redis
import os
class ProcessScaler:
"""Scale worker processes based on queue depth."""
def __init__(self, worker_fn, redis_url="redis://localhost:6379"):
self.worker_fn = worker_fn
self.redis = redis.from_url(redis_url)
self.processes = []
self.min_workers = 2
self.max_workers = 16
def run(self, check_interval=15):
"""Run the scaler loop."""
# Start minimum workers
for _ in range(self.min_workers):
self._spawn()
while True:
time.sleep(check_interval)
self._cleanup_dead()
queue_depth = self.redis.llen("captcha:tasks")
current = len(self.processes)
# Scale up
if queue_depth > current * 5 and current < self.max_workers:
to_add = min(
max(1, queue_depth // 10),
self.max_workers - current,
)
for _ in range(to_add):
self._spawn()
print(f"Scaled up to {len(self.processes)} workers")
# Scale down
elif queue_depth < 3 and current > self.min_workers:
to_remove = min(2, current - self.min_workers)
for _ in range(to_remove):
p = self.processes.pop()
p.terminate()
print(f"Scaled down to {len(self.processes)} workers")
def _spawn(self):
p = multiprocessing.Process(target=self.worker_fn)
p.start()
self.processes.append(p)
def _cleanup_dead(self):
self.processes = [p for p in self.processes if p.is_alive()]
# Ensure minimum
while len(self.processes) < self.min_workers:
self._spawn()
Chia tỷ lệ nhận thức cân bằng
Dừng mở rộng quy mô khi tiền sắp hết:
def check_balance(api_key, min_balance=2.0):
"""Check if balance is sufficient for scaling."""
resp = requests.get("https://ocr.captchaai.com/res.php", params={
"key": api_key,
"action": "getbalance",
"json": 1,
}, timeout=15)
balance = float(resp.json()["request"])
if balance < min_balance:
print(f"Balance ${balance:.2f} below ${min_balance} — halting scale-up")
return False
return True
Tích hợp vào vòng lặp chia tỷ lệ:
# In _scaling_loop:
if queue_depth > 20 and utilization > 70:
if check_balance(self.api_key, min_balance=2.0):
# Scale up
...
else:
print("Scaling paused — low balance")
So sánh chiến lược mở rộng quy mô
| Chiến lược | Tốt nhất cho | Độ trễ | Độ phức tạp |
|---|---|---|---|
| Nhóm chủ đề | I/O-bound (lệnh gọi API) | Thấp | Thấp |
| ** Nhóm quy trình ** | Tiền xử lý giới hạn CPU | Trung bình | Trung bình |
| Kubernetes HPA | Triển khai trên nền tảng đám mây | Cao hơn | Cao |
| KEDA | Mở rộng quy mô theo hướng sự kiện | Trung bình | Trung bình |
Khắc phục sự cố
| Vấn đề | Nguyên nhân | Cách xử lý |
|---|---|---|
| Công nhân tiếp tục mở rộng quy mô | Hàng đợi không bao giờ cạn | Kiểm tra xem công nhân có đang thực sự xử lý không |
| Giảm quy mô quá tích cực | Ngưỡng thấp | Tăng độ trễ thu nhỏ lên 30 giây+ |
| Quá trình zombie | Các tiến trình chưa được dọn dẹp | Sử dụng _cleanup_dead() thường xuyên |
| Cân bằng cạn kiệt nhanh chóng | Quá nhiều công nhân | Thêm kiểm tra số dư vào logic chia tỷ lệ |
Câu hỏi thường gặp
Tỷ lệ công nhân trên hàng đợi phù hợp là bao nhiêu?
Nhắm tới 1 công nhân cho 5-10 nhiệm vụ được xếp hàng đợi. Mỗi nhân viên xử lý ~3-6 CAPTCHA mỗi phút tùy thuộc vào loại.
Tôi nên sử dụng chủ đề hoặc quy trình?
Các luồng để gọi API thuần túy (CaptchaAI là I/O-bound). Xử lý khi bạn cũng thực hiện tiền xử lý hình ảnh hoặc tính toán nặng cùng với việc giải.
Tôi nên mở rộng quy mô nhanh đến mức nào?
Tăng quy mô nhanh chóng (cứ sau 10-15 giây kiểm tra), giảm quy mô từ từ (đợi 30-60 giây tải thấp). Điều này ngăn chặn sự va chạm giữa các trạng thái.
Hướng dẫn liên quan
Quy mô thông minh —lấy khóa CaptchaAI của bạnHôm nay.