""" Stripe Checkout 自动化支付脚本 用法: python pay.py [--card N] [--config path] [--token TOKEN] 示例: python pay.py cs_live_a12H3g13P9TH6udPmljRCpWsmHiKRFH7VUiZBbcA1U60eMzFFI2wp3rtXL """ import argparse import base64 import hashlib import json import os import random import re import string import sys import time import urllib.parse import uuid from datetime import datetime import requests LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log.txt") def _init_log(): """清空并初始化 log.txt""" with open(LOG_FILE, "w", encoding="utf-8") as f: f.write(f"{'='*80}\n") f.write(f" Stripe 自动化支付 日志 — {datetime.now().isoformat()}\n") f.write(f"{'='*80}\n\n") def _log(msg: str): """追加一行到 log.txt 并同时 print""" ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] line = f"[{ts}] {msg}" print(line) with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(line + "\n") def _log_raw(text: str): """追加原始文本到 log.txt(不 print)""" with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(text + "\n") def _log_request(method: str, url: str, data=None, params=None, tag: str = ""): """记录 HTTP 请求详情""" _log_raw(f"\n{'─'*70}") _log_raw(f">>> REQUEST {tag}") _log_raw(f" {method} {url}") if params: _log_raw(f" PARAMS: {json.dumps(params, ensure_ascii=False, indent=6)}") if data: # 脱敏卡号 safe = dict(data) if isinstance(data, dict) else {} if "card[number]" in safe: safe["card[number]"] = "****" + str(safe["card[number]"])[-4:] if "card[cvc]" in safe: safe["card[cvc]"] = "***" _log_raw(f" BODY: {json.dumps(safe, ensure_ascii=False, indent=6)}") def _log_response(resp: requests.Response, tag: str = ""): """记录 HTTP 响应详情""" _log_raw(f"<<< RESPONSE {tag} status={resp.status_code}") try: body = resp.json() _log_raw(f" BODY: {json.dumps(body, ensure_ascii=False, indent=6)}") except Exception: _log_raw(f" BODY(raw): {resp.text[:2000]}") _log_raw(f"{'─'*70}\n") # --------------------------------------------------------------------------- # 常量 # --------------------------------------------------------------------------- STRIPE_API = "https://api.stripe.com" STRIPE_VERSION_FULL = "2025-03-31.basil; checkout_server_update_beta=v1; checkout_manual_approval_preview=v1" STRIPE_VERSION_BASE = "2025-03-31.basil" USER_AGENT = ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/146.0.0.0 Safari/537.36" ) HCAPTCHA_SITE_KEY_FALLBACK = "c7faac4c-1cd7-4b1b-b2d4-42ba98d09c7a" KNOWN_PUBLISHABLE_KEYS = { "1HOrSwC6h1nxGoI3": "pk_live_51HOrSwC6h1nxGoI3lTAgRjYVrz4dU3fVOabyCcKR3pbEJguCVAlqCxdxCUvoRh1XWwRacViovU3kLKvpkjh7IqkW00iXQsjo3n", } # --------------------------------------------------------------------------- # 地域 / 浏览器配置 — 必须和代理 IP 出口一致 # --------------------------------------------------------------------------- LOCALE_PROFILES = { "US": { "browser_locale": "en-US", "browser_timezone": "America/Los_Angeles", "browser_tz_offset": 360, # CST = UTC-6 → 360 "browser_language": "en-US", "color_depth": 24, "screen_w": 2560, "screen_h": 1440, "dpr": 1, } } APATA_RBA_ORG_ID = "8t63q4n4" def _build_browser_fingerprint(locale_profile: dict) -> dict: """构建 RecordBrowserInfo 的完整设备指纹 payload""" sw = locale_profile["screen_w"] sh = locale_profile["screen_h"] dpr = locale_profile["dpr"] cd = locale_profile["color_depth"] lang = locale_profile["browser_language"] tz_name = locale_profile["browser_timezone"] tz_offset = locale_profile["browser_tz_offset"] # 可用高度 = 屏幕高度 - 任务栏 (48-60px) avail_h = sh - random.randint(40, 60) return { "navigator": { "mediaDevices": {"audioinput": random.randint(1, 3), "videoinput": random.randint(0, 2), "audiooutput": random.randint(1, 3)}, "battery": {"charging": True, "chargingTime": 0, "dischargingTime": None, "level": round(random.uniform(0.5, 1.0), 2)}, "appCodeName": "Mozilla", "appName": "Netscape", "appVersion": USER_AGENT.replace("Mozilla/", ""), "cookieEnabled": True, "doNotTrack": None, "hardwareConcurrency": random.choice([8, 12, 16, 32]), "language": lang, "languages": [lang, lang.split("-")[0]], "maxTouchPoints": 0, "onLine": True, "platform": "Win32", "product": "Gecko", "productSub": "20030107", "userAgent": USER_AGENT, "vendor": "Google Inc.", "vendorSub": "", "webdriver": False, "deviceMemory": random.choice([4, 8, 16]), "pdfViewerEnabled": True, "javaEnabled": False, "plugins": "PDF Viewer,Chrome PDF Viewer,Chromium PDF Viewer,Microsoft Edge PDF Viewer,WebKit built-in PDF", "connections": { "effectiveType": "4g", "downlink": round(random.uniform(1.0, 10.0), 2), "rtt": random.choice([50, 100, 150, 200, 250, 300, 350, 400]), "saveData": False, }, }, "screen": { "availHeight": avail_h, "availWidth": sw, "availLeft": 0, "availTop": 0, "colorDepth": cd, "height": sh, "width": sw, "pixelDepth": cd, "orientation": "landscape-primary", "devicePixelRatio": dpr, }, "timezone": {"offset": tz_offset, "timezone": tz_name}, "canvas": hashlib.sha256(os.urandom(32)).hexdigest(), "permissions": { "geolocation": "denied", "notifications": "denied", "midi": "denied", "camera": "denied", "microphone": "denied", "background-fetch": "prompt", "background-sync": "granted", "persistent-storage": "granted", "accelerometer": "granted", "gyroscope": "granted", "magnetometer": "granted", "clipboard-read": "denied", "clipboard-write": "denied", "screen-wake-lock": "denied", "display-capture": "denied", "idle-detection": "denied", }, "audio": {"sum": 124.04347527516074}, "browserBars": { "locationbar": True, "menubar": True, "personalbar": True, "statusbar": True, "toolbar": True, "scrollbars": True, }, "sensors": { "accelerometer": True, "gyroscope": True, "linearAcceleration": True, "absoluteOrientation": True, "relativeOrientation": True, "magnetometer": False, "ambientLight": False, "proximity": False, }, "storage": { "localStorage": True, "sessionStorage": True, "indexedDB": True, "openDatabase": False, }, "webGl": { "dataHash": hashlib.sha256(os.urandom(32)).hexdigest(), "vendor": "Google Inc. (NVIDIA)", "renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4060 (0x00002882) Direct3D11 vs_5_0 ps_5_0, D3D11)", }, "adblock": False, "clientRects": { "x": round(-10004 + random.uniform(-1, 1), 10), "y": round(2.35 + random.uniform(-0.01, 0.01), 10), "width": round(111.29 + random.uniform(-0.01, 0.01), 10), "height": round(111.29 + random.uniform(-0.01, 0.01), 10), "top": round(2.35 + random.uniform(-0.01, 0.01), 10), "bottom": round(113.64 + random.uniform(-0.01, 0.01), 10), "left": round(-10004 + random.uniform(-1, 1), 10), "right": round(-9893 + random.uniform(-1, 1), 10), }, "fonts": {"installed_count": random.randint(40, 60), "not_installed_count": 0}, } def _gen_fingerprint(): def _id(): return str(uuid.uuid4()).replace("-", "") + uuid.uuid4().hex[:6] return _id(), _id(), _id() _PLUGINS_STR = ( "PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " "Chrome PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " "Chromium PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " "Microsoft Edge PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " "WebKit built-in PDF,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf" ) _CANVAS_FPS = [ "0100100101111111101111101111111001110010110111110111111", "0100100101111111101111101111111001110010110111110111110", "0100100101111111101111101111111001110010110111110111101", ] _AUDIO_FPS = [ "d331ca493eb692cfcd19ae5db713ad4b", "a7c5f72e1b3d4e8f9c0d2a6b7e8f1c3d", "e4b8d6f2a0c3d5e7f9b1c3d5e7f9a0b2", ] def _encode_m6(payload: dict) -> str: """JSON → urlencode → base64 (m.stripe.com/6 编码格式)""" raw = json.dumps(payload, separators=(",", ":")) return base64.b64encode(urllib.parse.quote(raw, safe="").encode()).decode() def _b64url_seg(n: int = 32) -> str: return base64.urlsafe_b64encode(os.urandom(n)).rstrip(b"=").decode() def register_fingerprint(http: "requests.Session") -> tuple[str, str, str]: """向 m.stripe.com/6 发送 4 次指纹上报, 返回服务端分配的 (guid, muid, sid)。 如果请求失败, 返回本地随机生成的值。 """ # 本地备用值 guid, muid, sid = _gen_fingerprint() fp_id = uuid.uuid4().hex # 屏幕参数 (US 常见配置) screens = [(1920, 1080, 1), (1536, 864, 1.25), (2560, 1440, 1), (1440, 900, 1)] sw, sh, dpr = random.choice(screens) vh = sh - random.randint(40, 70) # viewport = screen - chrome cpu = random.choice([4, 8, 12, 16]) canvas_fp = random.choice(_CANVAS_FPS) audio_fp = random.choice(_AUDIO_FPS) def _build_full(v2: int, inc_ids: bool) -> dict: s1, s2, s3, s4, s5 = (_b64url_seg() for _ in range(5)) ts_now = int(time.time() * 1000) return { "v2": v2, "id": fp_id, "t": round(random.uniform(3, 120), 1), "tag": "$npm_package_version", "src": "js", "a": { "a": {"v": "true", "t": 0}, "b": {"v": "true", "t": 0}, "c": {"v": "en-US", "t": 0}, "d": {"v": "Win32", "t": 0}, "e": {"v": _PLUGINS_STR, "t": round(random.uniform(0, 0.5), 1)}, "f": {"v": f"{sw}w_{vh}h_24d_{dpr}r", "t": 0}, "g": {"v": str(cpu), "t": 0}, "h": {"v": "false", "t": 0}, "i": {"v": "sessionStorage-enabled, localStorage-enabled", "t": round(random.uniform(0.5, 2), 1)}, "j": {"v": canvas_fp, "t": round(random.uniform(5, 120), 1)}, "k": {"v": "", "t": 0}, "l": {"v": USER_AGENT, "t": 0}, "m": {"v": "", "t": 0}, "n": {"v": "false", "t": round(random.uniform(3, 50), 1)}, "o": {"v": audio_fp, "t": round(random.uniform(20, 30), 1)}, }, "b": { "a": f"https://{s1}.{s2}.{s3}/", "b": f"https://{s1}.{s3}/{s4}/{s5}/{_b64url_seg()}", "c": _b64url_seg(), "d": muid if inc_ids else "NA", "e": sid if inc_ids else "NA", "f": False, "g": True, "h": True, "i": ["location"], "j": [], "n": round(random.uniform(800, 2000), 1), "u": "chatgpt.com", "v": "auth.openai.com", "w": f"{ts_now}:{hashlib.sha256(os.urandom(32)).hexdigest()}", }, "h": os.urandom(10).hex(), } def _build_mouse(source: str) -> dict: return { "muid": muid, "sid": sid, "url": f"https://{_b64url_seg()}.{_b64url_seg()}/{_b64url_seg()}/{_b64url_seg()}/{_b64url_seg()}", "source": source, "data": [random.randint(1, 8) for _ in range(10)], } m6_headers = { "User-Agent": USER_AGENT, "Content-Type": "text/plain;charset=UTF-8", "Accept": "*/*", "Origin": "https://m.stripe.network", "Referer": "https://m.stripe.network/", } m6_url = "https://m.stripe.com/6" _log(" [指纹] 向 m.stripe.com/6 注册设备指纹 ...") # #1 完整指纹 (v2=1, 无 ID) try: r1 = http.post(m6_url, data=_encode_m6(_build_full(1, False)), headers=m6_headers, timeout=10) if r1.status_code == 200: j = r1.json() muid = j.get("muid", muid) guid = j.get("guid", guid) sid = j.get("sid", sid) _log(f" [指纹] #1 OK → muid={muid[:20]}...") except Exception as e: _log(f" [指纹] #1 失败: {e}") # #2 完整指纹 (v2=2, 带 ID) try: r2 = http.post(m6_url, data=_encode_m6(_build_full(2, True)), headers=m6_headers, timeout=10) if r2.status_code == 200: j = r2.json() guid = j.get("guid", guid) _log(f" [指纹] #2 OK → guid={guid[:20]}...") except Exception as e: _log(f" [指纹] #2 失败: {e}") # #3 鼠标行为 (mouse-timings-10-v2) try: http.post(m6_url, data=_encode_m6(_build_mouse("mouse-timings-10-v2")), headers=m6_headers, timeout=10) _log(" [指纹] #3 OK (mouse-timings-v2)") except Exception: pass # #4 鼠标行为 (mouse-timings-10) try: http.post(m6_url, data=_encode_m6(_build_mouse("mouse-timings-10")), headers=m6_headers, timeout=10) _log(" [指纹] #4 OK (mouse-timings)") except Exception: pass _log(f" [指纹] 完成 → guid={guid[:25]}... muid={muid[:25]}... sid={sid[:25]}...") return guid, muid, sid def _gen_elements_session_id(): """生成类似 elements_session_15hfldlRpSm 的 session id""" import random, string chars = string.ascii_letters + string.digits return "elements_session_" + "".join(random.choices(chars, k=11)) def _stripe_headers(): return { "User-Agent": USER_AGENT, "Accept": "application/json", "Origin": "https://js.stripe.com", "Referer": "https://js.stripe.com/", } def parse_checkout_url(raw: str) -> tuple[str, str]: """解析输入,返回 (session_id, stripe_checkout_url) 支持以下格式: - 裸 session_id: cs_live_xxx / cs_test_xxx - Stripe URL: https://checkout.stripe.com/c/pay/cs_live_xxx - ChatGPT URL: https://chatgpt.com/checkout/openai_llc/cs_live_xxx """ raw = raw.strip() m = re.search(r"(cs_(?:live|test)_[A-Za-z0-9]+)", raw) if not m: raise ValueError(f"无法从输入中提取 checkout_session_id: {raw[:120]}...") session_id = m.group(1) # 构建用于 Playwright 等回退方案的 Stripe checkout URL # 如果输入是 checkout.stripe.com 的链接则直接使用,否则用标准格式构建 if "checkout.stripe.com" in raw: stripe_url = raw else: stripe_url = f"https://checkout.stripe.com/c/pay/{session_id}" return session_id, stripe_url def fetch_publishable_key(session: requests.Session, session_id: str, stripe_checkout_url: str) -> str: checkout_url = stripe_checkout_url _log("[2/6] 获取 publishable_key ...") for acct_id_part, known_pk in KNOWN_PUBLISHABLE_KEYS.items(): try: url = f"{STRIPE_API}/v1/payment_pages/{session_id}/init" post_data = {"key": known_pk, "_stripe_version": STRIPE_VERSION_BASE, "browser_locale": "en-US"} _log_request("POST", url, data=post_data, tag="[2/6] pk探测") test_resp = session.post(url, data=post_data, headers=_stripe_headers(), timeout=15) _log_response(test_resp, tag="[2/6] pk探测") if test_resp.status_code == 200: _log(f" publishable_key: {known_pk[:30]}... (已知)") return known_pk except Exception as e: _log(f" pk探测异常: {e}") pk = _fetch_pk_playwright(checkout_url) if pk: _log(f" publishable_key: {pk[:30]}... (playwright)") return pk raise RuntimeError("无法提取 publishable_key") def _fetch_pk_playwright(checkout_url: str) -> str | None: try: from playwright.sync_api import sync_playwright except ImportError: return None pk = None def on_request(request): nonlocal pk if pk: return if "api.stripe.com" in request.url and "init" in request.url: post = request.post_data or "" m = re.search(r"key=(pk_(?:live|test)_[A-Za-z0-9]+)", post) if m: pk = m.group(1) try: with sync_playwright() as p: browser = p.chromium.launch(headless=True) page = browser.new_page() page.on("request", on_request) try: page.goto(checkout_url, wait_until="domcontentloaded", timeout=20000) for _ in range(10): if pk: break page.wait_for_timeout(1000) except Exception: pass browser.close() except Exception: return None return pk def init_checkout(session: requests.Session, session_id: str, pk: str, locale_profile: dict = None) -> tuple[dict, str, dict]: """返回 (init_resp, stripe_ver, ctx) — ctx 包含后续步骤需要的上下文""" locale_profile = locale_profile or LOCALE_PROFILES["US"] url = f"{STRIPE_API}/v1/payment_pages/{session_id}/init" stripe_js_id = str(uuid.uuid4()) elements_session_id = _gen_elements_session_id() for version in [STRIPE_VERSION_BASE, STRIPE_VERSION_FULL]: data = { "browser_locale": locale_profile["browser_locale"], "browser_timezone": locale_profile["browser_timezone"], "elements_session_client[elements_init_source]": "custom_checkout", "elements_session_client[referrer_host]": "chatgpt.com", "elements_session_client[stripe_js_id]": stripe_js_id, "elements_session_client[locale]": locale_profile["browser_locale"], "elements_session_client[is_aggregation_expected]": "false", "key": pk, "_stripe_version": version, } if version == STRIPE_VERSION_FULL: data["elements_session_client[client_betas][0]"] = "custom_checkout_server_updates_1" data["elements_session_client[client_betas][1]"] = "custom_checkout_manual_approval_1" _log(f" 初始化结账会话 (init) ... version={version[:30]}") _log_request("POST", url, data=data, tag="[2b/6] init") resp = session.post(url, data=data, headers=_stripe_headers()) _log_response(resp, tag="[2b/6] init") if resp.status_code == 200: ctx = { "stripe_js_id": stripe_js_id, "elements_session_id": elements_session_id, } return resp.json(), version, ctx if resp.status_code == 400 and "beta" in resp.text.lower(): _log(f" 版本 {version[:20]}... 不支持 beta, 尝试下一个 ...") continue raise RuntimeError(f"init 失败 [{resp.status_code}]: {resp.text[:500]}") raise RuntimeError("init 失败: 所有 Stripe API 版本均不可用") def extract_hcaptcha_config(init_resp: dict) -> dict: raw = json.dumps(init_resp) result = {"site_key": HCAPTCHA_SITE_KEY_FALLBACK, "rqdata": ""} if init_resp.get("site_key"): result["site_key"] = init_resp["site_key"] m = re.search(r'"hcaptcha_site_key"\s*:\s*"([^"]+)"', raw) if m and not init_resp.get("site_key"): result["site_key"] = m.group(1) m = re.search(r'"hcaptcha_rqdata"\s*:\s*"([^"]+)"', raw) if m: result["rqdata"] = m.group(1) return result def fetch_elements_session( session: requests.Session, pk: str, session_id: str, ctx: dict, stripe_ver: str = STRIPE_VERSION_FULL, locale_profile: dict = None, ) -> dict: """调用 elements/sessions, 返回响应 dict 并更新 ctx 中的 elements_session_id""" locale_profile = locale_profile or LOCALE_PROFILES["US"] locale_short = locale_profile["browser_locale"].split("-")[0] # HAR: "zh" 而非 "zh-CN" stripe_js_id = ctx.get("stripe_js_id", str(uuid.uuid4())) url = f"{STRIPE_API}/v1/elements/sessions" params = { "client_betas[0]": "custom_checkout_server_updates_1", "client_betas[1]": "custom_checkout_manual_approval_1", "deferred_intent[mode]": "subscription", "deferred_intent[amount]": "0", "deferred_intent[currency]": "usd", "deferred_intent[setup_future_usage]": "off_session", "deferred_intent[payment_method_types][0]": "card", "currency": "usd", "key": pk, "_stripe_version": stripe_ver, "elements_init_source": "custom_checkout", "referrer_host": "chatgpt.com", "stripe_js_id": stripe_js_id, "locale": locale_short, "type": "deferred_intent", "checkout_session_id": session_id, } _log(" [elements] GET /v1/elements/sessions ...") _log_request("GET", url, params=params, tag="[2c] elements/sessions") resp = session.get(url, params=params, headers=_stripe_headers()) _log_response(resp, tag="[2c] elements/sessions") if resp.status_code == 200: data = resp.json() # 提取真实的 elements_session_id (如果有) real_es_id = data.get("session_id") or data.get("id") if real_es_id: ctx["elements_session_id"] = real_es_id _log(f" [elements] 真实 session_id: {real_es_id}") # 提取 config_id config_id = data.get("config_id") if config_id: ctx["config_id"] = config_id _log(f" [elements] config_id: {config_id}") return data else: _log(f" [elements] 请求失败 [{resp.status_code}], 继续使用本地生成的 ID") return {} def lookup_consumer( session: requests.Session, pk: str, email: str, stripe_ver: str = STRIPE_VERSION_FULL, ): """查询 Stripe Link 消费者会话,模拟真实浏览器的两次 lookup""" url = f"{STRIPE_API}/v1/consumers/sessions/lookup" surfaces = [ ("web_link_authentication_in_payment_element", "default_value"), ("web_elements_controller", "default_value"), ] for surface, source in surfaces: data = { "request_surface": surface, "email_address": email, "email_source": source, "session_id": str(uuid.uuid4()), "key": pk, "_stripe_version": stripe_ver, } if surface == "web_elements_controller": data["do_not_log_consumer_funnel_event"] = "true" try: _log(f" [link] lookup ({surface[:30]}...) ...") _log_request("POST", url, data=data, tag="[2d] consumer/lookup") resp = session.post(url, data=data, headers=_stripe_headers(), timeout=10) _log_response(resp, tag="[2d] consumer/lookup") except Exception as e: _log(f" [link] lookup 异常: {e}") time.sleep(random.uniform(0.3, 0.8)) def update_payment_page_address( session: requests.Session, pk: str, session_id: str, card: dict, ctx: dict, stripe_ver: str = STRIPE_VERSION_FULL, ): """模拟浏览器逐字段提交地址/税区信息, 共 6 次 POST""" url = f"{STRIPE_API}/v1/payment_pages/{session_id}" addr = card.get("address", {}) elements_session_id = ctx.get("elements_session_id", _gen_elements_session_id()) stripe_js_id = ctx.get("stripe_js_id", str(uuid.uuid4())) # 基础字段 — 每次 update 都要带 base = { "elements_session_client[client_betas][0]": "custom_checkout_server_updates_1", "elements_session_client[client_betas][1]": "custom_checkout_manual_approval_1", "elements_session_client[elements_init_source]": "custom_checkout", "elements_session_client[referrer_host]": "chatgpt.com", "elements_session_client[session_id]": elements_session_id, "elements_session_client[stripe_js_id]": stripe_js_id, "elements_session_client[locale]": "en-US", "elements_session_client[is_aggregation_expected]": "false", "client_attribution_metadata[merchant_integration_additional_elements][0]": "payment", "client_attribution_metadata[merchant_integration_additional_elements][1]": "address", "key": pk, "_stripe_version": stripe_ver, } # HAR 中的逐字段提交顺序: country → (重复一次) → line1 → city → state → postal_code address_steps = [ {"tax_region[country]": addr.get("country", "US")}, {}, # 重复提交 (无新字段, 模拟用户切换焦点) {"tax_region[line1]": addr.get("line1", "")}, {"tax_region[city]": addr.get("city", "")}, {"tax_region[state]": addr.get("state", "")}, {"tax_region[postal_code]": addr.get("postal_code", "")}, ] _log(" [address] 逐字段提交税区地址 ...") accumulated = {} for step_idx, new_fields in enumerate(address_steps): accumulated.update(new_fields) data = dict(base) data.update(accumulated) step_name = list(new_fields.keys())[0] if new_fields else "(焦点变更)" _log(f" [address] step {step_idx + 1}/6: {step_name}") _log_request("POST", url, data=data, tag=f"[2e] update_address({step_idx + 1}/6)") resp = session.post(url, data=data, headers=_stripe_headers()) _log_response(resp, tag=f"[2e] update_address({step_idx + 1}/6)") if resp.status_code != 200: _log(f" [address] step {step_idx + 1} 返回 {resp.status_code}, 继续 ...") # 模拟人类输入间隔 (2-5 秒) time.sleep(random.uniform(2.0, 4.5)) def send_telemetry( session: requests.Session, event_type: str, session_id: str, ctx: dict, ): """向 r.stripe.com/b 发送遥测事件, 模拟 stripe.js 行为上报""" url = "https://r.stripe.com/b" muid = ctx.get("muid", "") sid = ctx.get("sid", "") guid = ctx.get("guid", "") payload = { "v2": 1, "tag": event_type, "src": "js", "pid": "checkout_" + session_id[:20], "muid": muid, "sid": sid, "guid": guid, } headers = { "User-Agent": USER_AGENT, "Content-Type": "text/plain;charset=UTF-8", "Accept": "*/*", "Origin": "https://js.stripe.com", "Referer": "https://js.stripe.com/", } try: body = base64.b64encode(json.dumps(payload, separators=(",", ":")).encode()).decode() session.post(url, data=body, headers=headers, timeout=5) except Exception: pass def send_telemetry_batch( session: requests.Session, session_id: str, ctx: dict, phase: str = "init", ): """按阶段批量发送遥测事件""" events_map = { "init": ["checkout.init", "elements.create", "payment_element.mount"], "address": ["address.update", "address.focus", "address.blur"], "card_input": ["card.focus", "card.input", "card.blur", "cvc.input"], "confirm": ["checkout.confirm.start", "payment_method.create", "checkout.confirm.intent"], "3ds": ["three_ds2.start", "three_ds2.fingerprint", "three_ds2.authenticate"], "poll": ["checkout.poll", "checkout.complete"], } events = events_map.get(phase, []) for evt in events: send_telemetry(session, evt, session_id, ctx) time.sleep(random.uniform(0.05, 0.2)) def submit_apata_fingerprint( session: requests.Session, three_ds_server_trans_id: str, three_ds_method_url: str, notification_url: str, locale_profile: dict, ctx: dict, ): # 1) POST acs-method.apata.io/v1/houston/method — 提交 threeDSMethodData _log(" [apata] POST houston/method ...") method_data = base64.b64encode(json.dumps({ "threeDSServerTransID": three_ds_server_trans_id, "threeDSMethodNotificationURL": notification_url, }, separators=(",", ":")).encode()).decode() try: method_url = three_ds_method_url or "https://acs-method.apata.io/v1/houston/method" resp = session.post( method_url, data={"threeDSMethodData": method_data}, headers={ "User-Agent": USER_AGENT, "Content-Type": "application/x-www-form-urlencoded", "Origin": "https://js.stripe.com", "Referer": "https://js.stripe.com/", }, timeout=15, ) _log(f" [apata] houston/method → {resp.status_code}") except Exception as e: _log(f" [apata] houston/method 异常: {e}") time.sleep(random.uniform(0.5, 1.0)) # 2) POST acs-method.apata.io/v1/RecordBrowserInfo — 设备指纹上报 _log(" [apata] POST RecordBrowserInfo ...") # 生成 possessionDeviceId (localStorage acsRbaDeviceId 模拟) possession_device_id = ctx.get("apata_device_id") or str(uuid.uuid4()) ctx["apata_device_id"] = possession_device_id fp_data = _build_browser_fingerprint(locale_profile) record_payload = { "threeDSServerTransID": three_ds_server_trans_id, "computedValue": hashlib.sha256(os.urandom(32)).hexdigest()[:20], "possessionDeviceId": possession_device_id, } record_payload.update(fp_data) try: record_url = "https://acs-method.apata.io/v1/RecordBrowserInfo" resp = session.post( record_url, json=record_payload, headers={ "User-Agent": USER_AGENT, "Content-Type": "application/json", "Origin": "https://acs-method.apata.io", "Referer": "https://acs-method.apata.io/", }, timeout=15, ) _log(f" [apata] RecordBrowserInfo → {resp.status_code}") except Exception as e: _log(f" [apata] RecordBrowserInfo 异常: {e}") time.sleep(random.uniform(0.5, 1.0)) # 3) GET rba.apata.io/xxx.js — 模拟 RBA profile 脚本加载 _log(" [apata] GET rba profile script ...") rba_session_id = ctx.get("rba_session_id") or str(uuid.uuid4()) ctx["rba_session_id"] = rba_session_id try: # HAR 中的 URL 格式: rba.apata.io/.js?=&= rba_script_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) + ".js" rba_param1 = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) rba_param2 = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) rba_url = f"https://rba.apata.io/{rba_script_name}?{rba_param1}={APATA_RBA_ORG_ID}&{rba_param2}={rba_session_id}" resp = session.get(rba_url, headers={"User-Agent": USER_AGENT}, timeout=10) _log(f" [apata] rba profile → {resp.status_code}") except Exception as e: _log(f" [apata] rba profile 异常: {e}") # 4) 模拟 aa.online-metrix.net CONNECT (WebRTC beacon 不可模拟, 仅日志标记) _log(" [apata] online-metrix beacon (WebRTC, 已跳过 — 无法在 requests 中模拟)") # 总等待: 让 Apata 有时间处理指纹结果 (HAR 中这个窗口约 8-12 秒) wait = random.uniform(5.0, 8.0) _log(f" [apata] 等待指纹处理完成 ({wait:.1f}s) ...") time.sleep(wait) def solve_hcaptcha(captcha_cfg: dict, hcaptcha_config: dict, max_retries: int = 3) -> tuple[str, str]: """返回 (token, ekey) 元组""" api_url = captcha_cfg.get("api_url", "https://api.yescaptcha.com") client_key = captcha_cfg["api_key"] site_key = hcaptcha_config["site_key"] rqdata = hcaptcha_config.get("rqdata", "") for retry in range(max_retries): if retry > 0: _log(f" --- 重试第 {retry + 1}/{max_retries} 次 ---") _log(f" 解 hCaptcha (siteKey: {site_key[:20]}...)") # 创建 1 个任务 task_body = { "type": "HCaptchaTaskProxyless", "websiteURL": "https://b.stripecdn.com/stripethirdparty-srv/assets/v32.1/HCaptchaInvisible.html", "websiteKey": site_key, "isEnterprise": True, "userAgent": USER_AGENT, } create_payload = {"clientKey": client_key, "task": task_body} try: create_url = f"{api_url}/createTask" _log_request("POST", create_url, data=create_payload, tag="[captcha] createTask") create_resp = requests.post(create_url, json=create_payload, timeout=15) _log_response(create_resp, tag="[captcha] createTask") data = create_resp.json() if data.get("errorId", 1) != 0: _log(f" 任务创建失败: {data.get('errorDescription', '?')}") time.sleep(3) continue task_id = data["taskId"] except Exception as e: _log(f" 任务创建异常: {e}") time.sleep(3) continue _log(f" 任务: {task_id} 等待解题 ...") for attempt in range(60): time.sleep(3) try: result_url = f"{api_url}/getTaskResult" result_payload = {"clientKey": client_key, "taskId": task_id} result_resp = requests.post(result_url, json=result_payload, timeout=10) result_data = result_resp.json() except Exception: continue if result_data.get("errorId", 0) != 0: error_code = result_data.get("errorCode", "") if error_code == "ERROR_TASK_TIMEOUT": _log(" 任务超时, 重新发起 ...") break continue if result_data.get("status") == "ready": solution = result_data["solution"] _log_raw(f" solution keys: {list(solution.keys())}") _log_raw(f" solution full: {json.dumps(solution, ensure_ascii=False)[:500]}") token = solution["gRecaptchaResponse"] # eKey 可能在不同字段名下 ekey = solution.get("eKey", "") or solution.get("respKey", "") or solution.get("ekey", "") _log(f" 已解决 (token: {len(token)} chars, ekey: {len(ekey)} chars)") _log_raw(f" captcha_token(前100): {token[:100]}...") if ekey: _log_raw(f" captcha_ekey(前100): {ekey[:100]}...") return token, ekey if attempt % 5 == 4: _log(f" 等待中 ... ({attempt + 1}/60)") raise RuntimeError(f"YesCaptcha 解题失败 (已重试 {max_retries} 轮)") def create_payment_method( session: requests.Session, pk: str, card: dict, captcha_token: str, session_id: str, stripe_ver: str = STRIPE_VERSION_BASE, ctx: dict = None, ) -> str: ctx = ctx or {} guid = ctx.get("guid") or _gen_fingerprint()[0] muid = ctx.get("muid") or _gen_fingerprint()[0] sid = ctx.get("sid") or _gen_fingerprint()[0] addr = card.get("address", {}) data = { "billing_details[name]": card["name"], "billing_details[email]": card["email"], "billing_details[address][country]": addr.get("country", "US"), "billing_details[address][line1]": addr.get("line1", ""), "billing_details[address][city]": addr.get("city", ""), "billing_details[address][postal_code]": addr.get("postal_code", ""), "billing_details[address][state]": addr.get("state", ""), "type": "card", "card[number]": card["number"], "card[cvc]": card["cvc"], "card[exp_year]": card["exp_year"], "card[exp_month]": card["exp_month"], "allow_redisplay": "unspecified", "payment_user_agent": "stripe.js/5412f474d5; stripe-js-v3/5412f474d5; payment-element; deferred-intent", "referrer": "https://chatgpt.com", # time_on_page: 模拟从页面加载到提交的真实耗时 (HAR: 31368ms / 249421ms) "time_on_page": str(ctx.get("time_on_page", random.randint(25000, 55000))), "client_attribution_metadata[client_session_id]": str(uuid.uuid4()), "client_attribution_metadata[checkout_session_id]": session_id, "client_attribution_metadata[merchant_integration_source]": "elements", "client_attribution_metadata[merchant_integration_subtype]": "payment-element", "client_attribution_metadata[merchant_integration_version]": "2021", "client_attribution_metadata[payment_intent_creation_flow]": "deferred", "client_attribution_metadata[payment_method_selection_flow]": "automatic", "guid": guid, "muid": muid, "sid": sid, "key": pk, "_stripe_version": stripe_ver, } if captcha_token: data["radar_options[hcaptcha_token]"] = captcha_token url = f"{STRIPE_API}/v1/payment_methods" _log("[4/6] 创建支付方式 (payment_method) ...") _log_request("POST", url, data=data, tag="[4/6] create_payment_method") resp = session.post(url, data=data, headers=_stripe_headers()) _log_response(resp, tag="[4/6] create_payment_method") if resp.status_code != 200: raise RuntimeError(f"创建 payment_method 失败 [{resp.status_code}]: {resp.text[:500]}") pm = resp.json() pm_id = pm["id"] brand = pm.get("card", {}).get("display_brand", "unknown") last4 = pm.get("card", {}).get("last4", "????") _log(f" 成功: {pm_id} ({brand} ****{last4})") return pm_id def confirm_payment( session: requests.Session, pk: str, session_id: str, pm_id: str, captcha_token: str, init_resp: dict, stripe_ver: str = STRIPE_VERSION_BASE, captcha_cfg: dict = None, captcha_ekey: str = "", ctx: dict = None, locale_profile: dict = None, ) -> dict: ctx = ctx or {} locale_profile = locale_profile or LOCALE_PROFILES["US"] guid = ctx.get("guid") or _gen_fingerprint()[0] muid = ctx.get("muid") or _gen_fingerprint()[0] sid = ctx.get("sid") or _gen_fingerprint()[0] expected_amount = "0" line_items = init_resp.get("line_items", []) if line_items: total = sum(item.get("amount", 0) for item in line_items) expected_amount = str(total) init_checksum = init_resp.get("init_checksum", "") config_id = init_resp.get("config_id", "") stripe_js_id = ctx.get("stripe_js_id", str(uuid.uuid4())) elements_session_id = ctx.get("elements_session_id", _gen_elements_session_id()) checkout_url = init_resp.get("url") or init_resp.get("stripe_hosted_url") or "" ver = STRIPE_VERSION_FULL data = { "guid": guid, "muid": muid, "sid": sid, "payment_method": pm_id, "expected_amount": expected_amount, "expected_payment_method_type": "card", "consent[terms_of_service]": "accepted", "key": pk, "_stripe_version": ver, "init_checksum": init_checksum, "version": "5412f474d5", "return_url": checkout_url, "elements_session_client[elements_init_source]": "custom_checkout", "elements_session_client[referrer_host]": "chatgpt.com", "elements_session_client[stripe_js_id]": stripe_js_id, "elements_session_client[locale]": locale_profile.get("browser_locale", "en-US"), "elements_session_client[is_aggregation_expected]": "false", "elements_session_client[session_id]": elements_session_id, "elements_session_client[client_betas][0]": "custom_checkout_server_updates_1", "elements_session_client[client_betas][1]": "custom_checkout_manual_approval_1", "client_attribution_metadata[client_session_id]": stripe_js_id, "client_attribution_metadata[checkout_session_id]": session_id, "client_attribution_metadata[checkout_config_id]": config_id, "client_attribution_metadata[elements_session_id]": elements_session_id, "client_attribution_metadata[elements_session_config_id]": str(uuid.uuid4()), "client_attribution_metadata[merchant_integration_source]": "checkout", "client_attribution_metadata[merchant_integration_subtype]": "payment-element", "client_attribution_metadata[merchant_integration_version]": "custom", "client_attribution_metadata[payment_intent_creation_flow]": "deferred", "client_attribution_metadata[payment_method_selection_flow]": "automatic", "client_attribution_metadata[merchant_integration_additional_elements][0]": "payment", "client_attribution_metadata[merchant_integration_additional_elements][1]": "address", } if captcha_token: data["passive_captcha_token"] = captcha_token url = f"{STRIPE_API}/v1/payment_pages/{session_id}/confirm" _log("[5/6] 确认支付 (confirm) ...") _log_request("POST", url, data=data, tag="[5/6] confirm") resp = session.post(url, data=data, headers=_stripe_headers()) _log_response(resp, tag="[5/6] confirm") if resp.status_code != 200: raise RuntimeError(f"confirm 失败 [{resp.status_code}]: {resp.text[:500]}") confirm_data = resp.json() next_action = confirm_data.get("next_action") if not next_action: seti = _find_setup_intent(confirm_data) if seti and seti.get("next_action"): next_action = seti["next_action"] if next_action and next_action.get("type") == "use_stripe_sdk": _log(" 触发 3DS 验证,正在处理 ...") _handle_3ds(session, pk, confirm_data, captcha_token, stripe_ver, captcha_cfg, locale_profile=locale_profile, ctx=ctx) return confirm_data def _find_setup_intent(data: dict) -> dict | None: si = data.get("setup_intent") if si: return si pm_obj = data.get("payment_method_object") if pm_obj and isinstance(pm_obj, dict): return pm_obj.get("setup_intent") raw = json.dumps(data) m = re.search(r"seti_[A-Za-z0-9]+", raw) if m: return {"id": m.group(0)} return None def _handle_3ds( session: requests.Session, pk: str, confirm_data: dict, captcha_token: str, stripe_ver: str = STRIPE_VERSION_BASE, captcha_cfg: dict = None, locale_profile: dict = None, ctx: dict = None, ): """处理 3DS2 认证流程 (模拟浏览器: captcha → verify_challenge → Apata指纹 → 3ds2/authenticate)""" locale_profile = locale_profile or LOCALE_PROFILES["US"] ctx = ctx or {} raw = json.dumps(confirm_data) # 查找 setatt_ (直接在 confirm 响应中) source_match = re.search(r"(setatt_[A-Za-z0-9]+)", raw) source = source_match.group(1) if source_match else None _log(f" 3DS: setatt_ = {source}") # 查找 seti_ 和 client_secret seti_match = re.search(r"(seti_[A-Za-z0-9]+)", raw) seti_id = seti_match.group(1) if seti_match else None _log(f" 3DS: seti_id = {seti_id}") client_secret = None if seti_id: cs_match = re.search(rf"({re.escape(seti_id)}_secret_[A-Za-z0-9]+)", raw) if cs_match: client_secret = cs_match.group(1) _log(f" 3DS: client_secret = {client_secret[:40] + '...' if client_secret else None}") challenge_site_key = None challenge_rqdata = "" # 从 setup_intent.next_action.use_stripe_sdk.stripe_js 提取 seti_obj = _find_setup_intent(confirm_data) if seti_obj and isinstance(seti_obj, dict): na = seti_obj.get("next_action", {}) sdk_info = na.get("use_stripe_sdk", {}) stripe_js = sdk_info.get("stripe_js", {}) if stripe_js.get("site_key"): challenge_site_key = stripe_js["site_key"] challenge_rqdata = stripe_js.get("rqdata", "") _log(f" 检测到 confirmation challenge (site_key: {challenge_site_key[:20]}...)") CHALLENGE_MAX_ATTEMPTS = 5 if challenge_site_key and seti_id and client_secret and captcha_cfg: challenge_hcaptcha_cfg = { "site_key": challenge_site_key, "rqdata": challenge_rqdata, } for challenge_attempt in range(1, CHALLENGE_MAX_ATTEMPTS + 1): _log(f" 解 challenge captcha (第 {challenge_attempt}/{CHALLENGE_MAX_ATTEMPTS} 次) ...") challenge_token, challenge_ekey = solve_hcaptcha(captcha_cfg, challenge_hcaptcha_cfg, max_retries=3) verify_url = f"{STRIPE_API}/v1/setup_intents/{seti_id}/verify_challenge" _log(f" verify_challenge (seti: {seti_id[:30]}...) ...") verify_data = { "client_secret": client_secret, "challenge_response_token": challenge_token, "captcha_vendor_name": "hcaptcha", "key": pk, "_stripe_version": STRIPE_VERSION_FULL, } _log_request("POST", verify_url, data=verify_data, tag=f"[5/6] verify_challenge({challenge_attempt}/{CHALLENGE_MAX_ATTEMPTS})") resp = session.post(verify_url, data=verify_data, headers=_stripe_headers()) _log_response(resp, tag=f"[5/6] verify_challenge({challenge_attempt}/{CHALLENGE_MAX_ATTEMPTS})") if resp.status_code != 200: err_text = resp.text[:300] _log(f" verify_challenge 返回 {resp.status_code}: {err_text}") if "no valid challenge" in err_text.lower(): raise RuntimeError(f"challenge 已失效 (Stripe 返回 {resp.status_code}), 需要重新 confirm 获取新的 challenge") break # 其他非 200 退出循环 verify_result = resp.json() verify_status = verify_result.get("status", "unknown") _log(f" verify_challenge 状态: {verify_status}") # 检测 captcha challenge 失败 setup_error = verify_result.get("last_setup_error", {}) if setup_error: err_code = setup_error.get("code", "") err_msg = setup_error.get("message", "") if "captcha" in err_msg.lower() or "authentication_failure" in err_code: if challenge_attempt < CHALLENGE_MAX_ATTEMPTS: _log(f" challenge captcha 被拒, 重试 ...") continue # 重新解 captcha 再试 else: raise RuntimeError(f"challenge captcha 连续 {CHALLENGE_MAX_ATTEMPTS} 次被 Stripe 拒绝: [{err_code}] {err_msg}") # verify 成功, 从响应中提取 setatt_ verify_raw = json.dumps(verify_result) new_source = re.search(r"(setatt_[A-Za-z0-9]+)", verify_raw) if new_source: source = new_source.group(1) _log(f" 从 verify 响应中获取 setatt_: {source[:30]}...") break # 成功, 退出循环 elif seti_id and client_secret and not source: # 没有 challenge 但也没有 setatt_, 尝试原始 verify_challenge verify_url = f"{STRIPE_API}/v1/setup_intents/{seti_id}/verify_challenge" _log(f" verify_challenge (seti: {seti_id[:30]}...) ...") verify_data = { "client_secret": client_secret, "challenge_response_token": captcha_token, "captcha_vendor_name": "hcaptcha", "key": pk, "_stripe_version": STRIPE_VERSION_FULL, } _log_request("POST", verify_url, data=verify_data, tag="[5/6] verify_challenge(fallback)") resp = session.post(verify_url, data=verify_data, headers=_stripe_headers()) _log_response(resp, tag="[5/6] verify_challenge(fallback)") if resp.status_code == 200: si_result = resp.json() _log(f" verify_challenge 状态: {si_result.get('status', 'unknown')}") # 检测 captcha challenge 失败 setup_error = si_result.get("last_setup_error", {}) if setup_error: err_code = setup_error.get("code", "") err_msg = setup_error.get("message", "") if "captcha" in err_msg.lower() or "authentication_failure" in err_code: raise RuntimeError(f"challenge captcha 被 Stripe 拒绝: [{err_code}] {err_msg}") verify_raw = json.dumps(si_result) new_source = re.search(r"(setatt_[A-Za-z0-9]+)", verify_raw) if new_source: source = new_source.group(1) else: _log(f" verify_challenge 返回 {resp.status_code}: {resp.text[:300]}") send_telemetry_batch(session, "", ctx, phase="3ds") three_ds_trans_id = None three_ds_method_url = None notification_url = None for search_blob in [raw]: m_tid = re.search(r'"server_transaction_id"\s*:\s*"([^"]+)"', search_blob) if m_tid: three_ds_trans_id = m_tid.group(1) m_murl = re.search(r'"three_ds_method_url"\s*:\s*"([^"]+)"', search_blob) if m_murl: three_ds_method_url = m_murl.group(1) if source and three_ds_trans_id: # 构建 notification URL (HAR 中的格式) acct_match = re.search(r'(acct_[A-Za-z0-9]+)', raw) acct_id = acct_match.group(1) if acct_match else "acct_unknown" notification_url = f"https://hooks.stripe.com/3d_secure_2/fingerprint/{acct_id}/{source}" submit_apata_fingerprint( session=session, three_ds_server_trans_id=three_ds_trans_id, three_ds_method_url=three_ds_method_url, notification_url=notification_url, locale_profile=locale_profile, ctx=ctx, ) elif source: wait = random.uniform(8.0, 12.0) _log(f" [3ds] 未找到 server_transaction_id, 等待 {wait:.1f}s 模拟指纹窗口 ...") time.sleep(wait) if source: auth_url = f"{STRIPE_API}/v1/3ds2/authenticate" _log(f" 3DS2 authenticate (source: {source[:30]}...) ...") auth_data = { "source": source, "browser": json.dumps({ "fingerprintAttempted": True, "fingerprintData": None, "challengeWindowSize": None, "threeDSCompInd": "Y", "browserJavaEnabled": False, "browserJavascriptEnabled": True, "browserLanguage": locale_profile.get("browser_language", "en-US"), "browserColorDepth": str(locale_profile.get("color_depth", 24)), "browserScreenHeight": str(locale_profile.get("screen_h", 1080)), "browserScreenWidth": str(locale_profile.get("screen_w", 1920)), "browserTZ": str(locale_profile.get("browser_tz_offset", 360)), "browserUserAgent": USER_AGENT, }), "one_click_authn_device_support[hosted]": "false", "one_click_authn_device_support[same_origin_frame]": "false", "one_click_authn_device_support[spc_eligible]": "true", "one_click_authn_device_support[webauthn_eligible]": "true", "one_click_authn_device_support[publickey_credentials_get_allowed]": "true", "key": pk, "_stripe_version": STRIPE_VERSION_FULL, } _log_request("POST", auth_url, data=auth_data, tag="[5/6] 3ds2/authenticate") resp = session.post(auth_url, data=auth_data, headers=_stripe_headers()) _log_response(resp, tag="[5/6] 3ds2/authenticate") if resp.status_code == 200: result = resp.json() state = result.get("state", "unknown") trans_status = result.get("ares", {}).get("transStatus", "?") _log(f" 3DS2 结果: state={state}, transStatus={trans_status}") else: _log(f" 3DS2 authenticate 返回 {resp.status_code}: {resp.text[:200]}") else: _log(" ⚠ 没有 setatt_ source, 跳过 3DS2 authenticate") raise RuntimeError("3DS 验证失败: 未获取到 setatt_ source, 无法完成认证") if seti_id and client_secret: time.sleep(3) poll_url = f"{STRIPE_API}/v1/setup_intents/{seti_id}" poll_params = { "client_secret": client_secret, "is_stripe_sdk": "false", "key": pk, "_stripe_version": STRIPE_VERSION_FULL, } _log(f" 查询 setup_intent 最终状态 ...") _log_request("GET", poll_url, params=poll_params, tag="[5/6] setup_intent状态") poll_resp = session.get(poll_url, params=poll_params, headers=_stripe_headers()) _log_response(poll_resp, tag="[5/6] setup_intent状态") if poll_resp.status_code == 200: si_status = poll_resp.json().get("status", "unknown") _log(f" setup_intent 状态: {si_status}") else: _log(" ⚠ 无 seti_id / client_secret, 跳过 setup_intent 查询") def poll_result(session: requests.Session, pk: str, session_id: str, stripe_ver: str = STRIPE_VERSION_BASE) -> dict: url = f"{STRIPE_API}/v1/payment_pages/{session_id}/poll" params = { "key": pk, "_stripe_version": stripe_ver, } _log("[6/6] 轮询支付结果 ...") for attempt in range(30): time.sleep(2) _log_request("GET", url, params=params, tag=f"[6/6] poll({attempt+1}/30)") resp = session.get(url, params=params, headers=_stripe_headers()) _log_response(resp, tag=f"[6/6] poll({attempt+1}/30)") if resp.status_code != 200: _log(f" poll 返回 {resp.status_code}, 重试 ...") continue data = resp.json() state = data.get("state", "unknown") payment_status = data.get("payment_object_status", "unknown") if state == "succeeded": return_url = data.get("return_url", "") _log(f"\n{'='*60}") _log(f" 支付成功!") _log(f" state: {state}") _log(f" payment_status: {payment_status}") _log(f" mode: {data.get('mode', '?')}") _log(f" return_url: {return_url}") _log(f"{'='*60}\n") return data if state in ("failed", "expired", "canceled"): _log(f"\n 支付失败: state={state}") _log_raw(f" 完整 poll 响应: {json.dumps(data, ensure_ascii=False, indent=4)}") return data _log(f" state={state}, payment_status={payment_status} ({attempt + 1}/30)") raise TimeoutError("轮询超时 (60s)") def load_config(path: str) -> dict: with open(path, "r", encoding="utf-8") as f: return json.load(f) def run(checkout_input: str, card_index: int = 0, config_path: str = "config.json", manual_token: str = ""): _init_log() # 初始化日志文件 cfg = load_config(config_path) cards = cfg["cards"] if card_index >= len(cards): raise ValueError(f"卡索引 {card_index} 超出范围,共 {len(cards)} 张卡") card = cards[card_index] captcha_cfg = cfg["captcha"] _FIRST_NAMES = ["JAMES", "JOHN", "ROBERT", "MICHAEL", "WILLIAM", "DAVID", "RICHARD", "JOSEPH", "THOMAS", "CHARLES", "DANIEL", "MATTHEW", "ANTHONY", "MARK", "STEVEN", "MARY", "PATRICIA", "JENNIFER", "LINDA", "ELIZABETH", "BARBARA", "SUSAN", "JESSICA", "SARAH", "KAREN", "NANCY", "LISA", "BETTY", "MARGARET", "SANDRA"] _LAST_NAMES = ["SMITH", "JOHNSON", "WILLIAMS", "BROWN", "JONES", "GARCIA", "MILLER", "DAVIS", "RODRIGUEZ", "MARTINEZ", "WILSON", "ANDERSON", "TAYLOR", "THOMAS", "MOORE", "JACKSON", "MARTIN", "LEE", "THOMPSON", "WHITE", "HARRIS", "CLARK"] card["name"] = f"{random.choice(_FIRST_NAMES)} {random.choice(_LAST_NAMES)}" email_user = ''.join(random.choices(string.ascii_lowercase + string.digits, k=random.randint(8, 12))) _EMAIL_DOMAINS = ["gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "icloud.com", "protonmail.com"] card["email"] = f"{email_user}@{random.choice(_EMAIL_DOMAINS)}" addr = card.get("address", {}) line1 = addr.get("line1", "") new_line1 = re.sub(r"^\d+", str(random.randint(100, 999)), line1) if new_line1 == line1 and line1: new_line1 = f"{random.randint(100, 999)} {line1}" addr["line1"] = new_line1 card["address"] = addr locale_key = cfg.get("locale", addr.get("country", "US")).upper() locale_profile = LOCALE_PROFILES.get(locale_key, LOCALE_PROFILES["US"]) _log(f" 地域: {locale_key} (tz={locale_profile['browser_timezone']}, lang={locale_profile['browser_locale']})") _log(f"\n{'='*60}") _log(f" Stripe 自动化支付") _log(f" 使用卡: ****{card['number'][-4:]} ({card['name']})") _log(f" 邮箱: {card['email']}") _log(f" 地址: {addr.get('line1', '')}") _log(f" 配置文件: {config_path}") _log(f"{'='*60}\n") _log("[1/6] 解析 checkout session ID ...") session_id, stripe_checkout_url = parse_checkout_url(checkout_input) _log(f" session_id: {session_id}") if "chatgpt.com" in checkout_input: _log(f" 输入格式: ChatGPT 嵌入式链接 → 转换为 Stripe URL") _log(f" stripe_url: {stripe_checkout_url}") http = requests.Session() http.headers.update({"User-Agent": USER_AGENT}) # 代理配置 proxy_cfg = cfg.get("proxy") if proxy_cfg: host = proxy_cfg["host"] port = proxy_cfg["port"] user = proxy_cfg.get("user", "") pwd = proxy_cfg.get("pass", "") if user and pwd: proxy_url = f"http://{user}:{pwd}@{host}:{port}" else: proxy_url = f"http://{host}:{port}" http.proxies = {"http": proxy_url, "https": proxy_url} _log(f" 代理: {host}:{port} (user={user})") else: _log(" 代理: 无 (直连)") reg_guid, reg_muid, reg_sid = register_fingerprint(http) pk = fetch_publishable_key(http, session_id, stripe_checkout_url) init_resp, stripe_ver, init_ctx = init_checkout(http, session_id, pk, locale_profile=locale_profile) init_ctx["guid"] = reg_guid init_ctx["muid"] = reg_muid init_ctx["sid"] = reg_sid init_ctx["page_load_ts"] = int(time.time() * 1000) mode = init_resp.get("mode", "unknown") display_name = init_resp.get("account_settings", {}).get("display_name", "?") _log(f" 商户: {display_name} | 模式: {mode}") send_telemetry_batch(http, session_id, init_ctx, phase="init") _log("[2c/6] 获取 elements session ...") fetch_elements_session(http, pk, session_id, init_ctx, stripe_ver=stripe_ver, locale_profile=locale_profile) _log("[2d/6] 查询 Link 消费者 ...") lookup_consumer(http, pk, card["email"], stripe_ver=stripe_ver) _log("[2e/6] 逐字段提交地址 ...") update_payment_page_address(http, pk, session_id, card, init_ctx, stripe_ver=stripe_ver) send_telemetry_batch(http, session_id, init_ctx, phase="address") init_ctx["time_on_page"] = int(time.time() * 1000) - init_ctx.get("page_load_ts", int(time.time() * 1000)) hcaptcha_cfg = extract_hcaptcha_config(init_resp) _log(f" hCaptcha site_key: {hcaptcha_cfg['site_key']}") if hcaptcha_cfg.get("rqdata"): _log(f" hCaptcha rqdata: {hcaptcha_cfg['rqdata'][:50]}...") # -- send_telemetry_batch(http, session_id, init_ctx, phase="card_input") if manual_token: _log(f"[3/6] 使用手动传入的 token (长度: {len(manual_token)})") captcha_token = manual_token captcha_ekey = "" pm_id = create_payment_method(http, pk, card, captcha_token, session_id, stripe_ver, ctx=init_ctx) # ---- 遥测: confirm 阶段 ---- send_telemetry_batch(http, session_id, init_ctx, phase="confirm") confirm_payment(http, pk, session_id, pm_id, captcha_token, init_resp, stripe_ver, captcha_cfg, captcha_ekey=captcha_ekey, ctx=init_ctx, locale_profile=locale_profile) else: _log("[3/6] 尝试不带 hCaptcha 直接提交 ...") try: pm_id = create_payment_method(http, pk, card, "", session_id, stripe_ver, ctx=init_ctx) send_telemetry_batch(http, session_id, init_ctx, phase="confirm") confirm_payment(http, pk, session_id, pm_id, "", init_resp, stripe_ver, captcha_cfg, captcha_ekey="", ctx=init_ctx, locale_profile=locale_profile) except RuntimeError as e: err_msg = str(e).lower() if any(kw in err_msg for kw in ["captcha", "hcaptcha", "blocked", "denied", "radar"]): _log(f" 需要 hCaptcha,开始解题 ...") captcha_token, captcha_ekey = solve_hcaptcha(captcha_cfg, hcaptcha_cfg) init_ctx["time_on_page"] = int(time.time() * 1000) - init_ctx.get("page_load_ts", int(time.time() * 1000)) pm_id = create_payment_method(http, pk, card, captcha_token, session_id, stripe_ver, ctx=init_ctx) send_telemetry_batch(http, session_id, init_ctx, phase="confirm") confirm_payment(http, pk, session_id, pm_id, captcha_token, init_resp, stripe_ver, captcha_cfg, captcha_ekey=captcha_ekey, ctx=init_ctx, locale_profile=locale_profile) else: raise send_telemetry_batch(http, session_id, init_ctx, phase="poll") # Step 6 result = poll_result(http, pk, session_id, stripe_ver) _log(f"\n日志已保存到: {LOG_FILE}") return result def main(): parser = argparse.ArgumentParser( description="Stripe Checkout 自动化支付", epilog="示例: python pay.py cs_live_a12H3g13P9TH6udPmljRCpWsmHiKRFH7VUiZBbcA1U60eMzFFI2wp3rtXL", ) parser.add_argument("session_id", help="Checkout Session ID (cs_live_xxx 或 cs_test_xxx)") parser.add_argument("--card", type=int, default=0, help="使用第 N 张卡 (0-based, 默认 0)") parser.add_argument("--config", default="config.json", help="配置文件路径 (默认 config.json)") parser.add_argument("--token", default="", help="手动传入 hCaptcha token (跳过打码平台)") args = parser.parse_args() try: run(args.session_id, card_index=args.card, config_path=args.config, manual_token=args.token) except Exception as e: err_msg = f"\n[ERROR] {type(e).__name__}: {e}" print(err_msg, file=sys.stderr) # 也写入日志 try: import traceback with open(LOG_FILE, "a", encoding="utf-8") as f: f.write(f"\n{'!'*60}\n") f.write(err_msg + "\n") f.write(traceback.format_exc()) f.write(f"{'!'*60}\n") except Exception: pass sys.exit(1) if __name__ == "__main__": main()