From 464bfd9b77ca2af1cda8625115dad4c4dc50529c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=9D=E4=BB=A3=E5=B0=BE?= Date: Tue, 31 Mar 2026 11:07:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20team.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- team.py | 1537 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1537 insertions(+) create mode 100644 team.py diff --git a/team.py b/team.py new file mode 100644 index 0000000..75a2847 --- /dev/null +++ b/team.py @@ -0,0 +1,1537 @@ +""" +Stripe Checkout 自动化支付脚本 +用法: + python pay.py [--card N] [--config path] [--token TOKEN] + +示例: + python pay.py cs_live_a12H3g13P9TH6udPmljRCpWsmHiKRFH7VUiZBbcA1U60eMzFFI2wp3rtXL +""" + +import argparse +import base64 +import hashlib +import json +import os +import random +import re +import string +import sys +import time +import urllib.parse +import uuid +from datetime import datetime + +import requests + + +LOG_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "log.txt") + +def _init_log(): + """清空并初始化 log.txt""" + with open(LOG_FILE, "w", encoding="utf-8") as f: + f.write(f"{'='*80}\n") + f.write(f" Stripe 自动化支付 日志 — {datetime.now().isoformat()}\n") + f.write(f"{'='*80}\n\n") + +def _log(msg: str): + """追加一行到 log.txt 并同时 print""" + ts = datetime.now().strftime("%H:%M:%S.%f")[:-3] + line = f"[{ts}] {msg}" + print(line) + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(line + "\n") + +def _log_raw(text: str): + """追加原始文本到 log.txt(不 print)""" + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(text + "\n") + +def _log_request(method: str, url: str, data=None, params=None, tag: str = ""): + """记录 HTTP 请求详情""" + _log_raw(f"\n{'─'*70}") + _log_raw(f">>> REQUEST {tag}") + _log_raw(f" {method} {url}") + if params: + _log_raw(f" PARAMS: {json.dumps(params, ensure_ascii=False, indent=6)}") + if data: + # 脱敏卡号 + safe = dict(data) if isinstance(data, dict) else {} + if "card[number]" in safe: + safe["card[number]"] = "****" + str(safe["card[number]"])[-4:] + if "card[cvc]" in safe: + safe["card[cvc]"] = "***" + _log_raw(f" BODY: {json.dumps(safe, ensure_ascii=False, indent=6)}") + +def _log_response(resp: requests.Response, tag: str = ""): + """记录 HTTP 响应详情""" + _log_raw(f"<<< RESPONSE {tag} status={resp.status_code}") + try: + body = resp.json() + _log_raw(f" BODY: {json.dumps(body, ensure_ascii=False, indent=6)}") + except Exception: + _log_raw(f" BODY(raw): {resp.text[:2000]}") + _log_raw(f"{'─'*70}\n") + +# --------------------------------------------------------------------------- +# 常量 +# --------------------------------------------------------------------------- +STRIPE_API = "https://api.stripe.com" +STRIPE_VERSION_FULL = "2025-03-31.basil; checkout_server_update_beta=v1; checkout_manual_approval_preview=v1" +STRIPE_VERSION_BASE = "2025-03-31.basil" +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " + "AppleWebKit/537.36 (KHTML, like Gecko) " + "Chrome/146.0.0.0 Safari/537.36" +) +HCAPTCHA_SITE_KEY_FALLBACK = "c7faac4c-1cd7-4b1b-b2d4-42ba98d09c7a" + +KNOWN_PUBLISHABLE_KEYS = { + "1HOrSwC6h1nxGoI3": "pk_live_51HOrSwC6h1nxGoI3lTAgRjYVrz4dU3fVOabyCcKR3pbEJguCVAlqCxdxCUvoRh1XWwRacViovU3kLKvpkjh7IqkW00iXQsjo3n", +} + +# --------------------------------------------------------------------------- +# 地域 / 浏览器配置 — 必须和代理 IP 出口一致 +# --------------------------------------------------------------------------- +LOCALE_PROFILES = { + "US": { + "browser_locale": "en-US", + "browser_timezone": "America/Los_Angeles", + "browser_tz_offset": 360, # CST = UTC-6 → 360 + "browser_language": "en-US", + "color_depth": 24, + "screen_w": 2560, "screen_h": 1440, "dpr": 1, + } +} + + +APATA_RBA_ORG_ID = "8t63q4n4" + +def _build_browser_fingerprint(locale_profile: dict) -> dict: + """构建 RecordBrowserInfo 的完整设备指纹 payload""" + sw = locale_profile["screen_w"] + sh = locale_profile["screen_h"] + dpr = locale_profile["dpr"] + cd = locale_profile["color_depth"] + lang = locale_profile["browser_language"] + tz_name = locale_profile["browser_timezone"] + tz_offset = locale_profile["browser_tz_offset"] + + # 可用高度 = 屏幕高度 - 任务栏 (48-60px) + avail_h = sh - random.randint(40, 60) + + return { + "navigator": { + "mediaDevices": {"audioinput": random.randint(1, 3), "videoinput": random.randint(0, 2), + "audiooutput": random.randint(1, 3)}, + "battery": {"charging": True, "chargingTime": 0, "dischargingTime": None, + "level": round(random.uniform(0.5, 1.0), 2)}, + "appCodeName": "Mozilla", "appName": "Netscape", + "appVersion": USER_AGENT.replace("Mozilla/", ""), + "cookieEnabled": True, "doNotTrack": None, + "hardwareConcurrency": random.choice([8, 12, 16, 32]), + "language": lang, + "languages": [lang, lang.split("-")[0]], + "maxTouchPoints": 0, "onLine": True, + "platform": "Win32", "product": "Gecko", "productSub": "20030107", + "userAgent": USER_AGENT, + "vendor": "Google Inc.", "vendorSub": "", + "webdriver": False, + "deviceMemory": random.choice([4, 8, 16]), + "pdfViewerEnabled": True, "javaEnabled": False, + "plugins": "PDF Viewer,Chrome PDF Viewer,Chromium PDF Viewer,Microsoft Edge PDF Viewer,WebKit built-in PDF", + "connections": { + "effectiveType": "4g", + "downlink": round(random.uniform(1.0, 10.0), 2), + "rtt": random.choice([50, 100, 150, 200, 250, 300, 350, 400]), + "saveData": False, + }, + }, + "screen": { + "availHeight": avail_h, "availWidth": sw, + "availLeft": 0, "availTop": 0, + "colorDepth": cd, "height": sh, "width": sw, + "pixelDepth": cd, + "orientation": "landscape-primary", + "devicePixelRatio": dpr, + }, + "timezone": {"offset": tz_offset, "timezone": tz_name}, + "canvas": hashlib.sha256(os.urandom(32)).hexdigest(), + "permissions": { + "geolocation": "denied", "notifications": "denied", + "midi": "denied", "camera": "denied", "microphone": "denied", + "background-fetch": "prompt", "background-sync": "granted", + "persistent-storage": "granted", "accelerometer": "granted", + "gyroscope": "granted", "magnetometer": "granted", + "clipboard-read": "denied", "clipboard-write": "denied", + "screen-wake-lock": "denied", "display-capture": "denied", + "idle-detection": "denied", + }, + "audio": {"sum": 124.04347527516074}, + "browserBars": { + "locationbar": True, "menubar": True, "personalbar": True, + "statusbar": True, "toolbar": True, "scrollbars": True, + }, + "sensors": { + "accelerometer": True, "gyroscope": True, "linearAcceleration": True, + "absoluteOrientation": True, "relativeOrientation": True, + "magnetometer": False, "ambientLight": False, "proximity": False, + }, + "storage": { + "localStorage": True, "sessionStorage": True, + "indexedDB": True, "openDatabase": False, + }, + "webGl": { + "dataHash": hashlib.sha256(os.urandom(32)).hexdigest(), + "vendor": "Google Inc. (NVIDIA)", + "renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4060 (0x00002882) Direct3D11 vs_5_0 ps_5_0, D3D11)", + }, + "adblock": False, + "clientRects": { + "x": round(-10004 + random.uniform(-1, 1), 10), + "y": round(2.35 + random.uniform(-0.01, 0.01), 10), + "width": round(111.29 + random.uniform(-0.01, 0.01), 10), + "height": round(111.29 + random.uniform(-0.01, 0.01), 10), + "top": round(2.35 + random.uniform(-0.01, 0.01), 10), + "bottom": round(113.64 + random.uniform(-0.01, 0.01), 10), + "left": round(-10004 + random.uniform(-1, 1), 10), + "right": round(-9893 + random.uniform(-1, 1), 10), + }, + "fonts": {"installed_count": random.randint(40, 60), "not_installed_count": 0}, + } + + +def _gen_fingerprint(): + def _id(): + return str(uuid.uuid4()).replace("-", "") + uuid.uuid4().hex[:6] + return _id(), _id(), _id() + + + +_PLUGINS_STR = ( + "PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " + "Chrome PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " + "Chromium PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " + "Microsoft Edge PDF Viewer,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf, " + "WebKit built-in PDF,internal-pdf-viewer,application/pdf,pdf++text/pdf,pdf" +) +_CANVAS_FPS = [ + "0100100101111111101111101111111001110010110111110111111", + "0100100101111111101111101111111001110010110111110111110", + "0100100101111111101111101111111001110010110111110111101", +] +_AUDIO_FPS = [ + "d331ca493eb692cfcd19ae5db713ad4b", + "a7c5f72e1b3d4e8f9c0d2a6b7e8f1c3d", + "e4b8d6f2a0c3d5e7f9b1c3d5e7f9a0b2", +] + + +def _encode_m6(payload: dict) -> str: + """JSON → urlencode → base64 (m.stripe.com/6 编码格式)""" + raw = json.dumps(payload, separators=(",", ":")) + return base64.b64encode(urllib.parse.quote(raw, safe="").encode()).decode() + + +def _b64url_seg(n: int = 32) -> str: + return base64.urlsafe_b64encode(os.urandom(n)).rstrip(b"=").decode() + + +def register_fingerprint(http: "requests.Session") -> tuple[str, str, str]: + """向 m.stripe.com/6 发送 4 次指纹上报, 返回服务端分配的 (guid, muid, sid)。 + 如果请求失败, 返回本地随机生成的值。 + """ + # 本地备用值 + guid, muid, sid = _gen_fingerprint() + fp_id = uuid.uuid4().hex + + # 屏幕参数 (US 常见配置) + screens = [(1920, 1080, 1), (1536, 864, 1.25), (2560, 1440, 1), (1440, 900, 1)] + sw, sh, dpr = random.choice(screens) + vh = sh - random.randint(40, 70) # viewport = screen - chrome + cpu = random.choice([4, 8, 12, 16]) + canvas_fp = random.choice(_CANVAS_FPS) + audio_fp = random.choice(_AUDIO_FPS) + + def _build_full(v2: int, inc_ids: bool) -> dict: + s1, s2, s3, s4, s5 = (_b64url_seg() for _ in range(5)) + ts_now = int(time.time() * 1000) + return { + "v2": v2, "id": fp_id, + "t": round(random.uniform(3, 120), 1), + "tag": "$npm_package_version", "src": "js", + "a": { + "a": {"v": "true", "t": 0}, + "b": {"v": "true", "t": 0}, + "c": {"v": "en-US", "t": 0}, + "d": {"v": "Win32", "t": 0}, + "e": {"v": _PLUGINS_STR, "t": round(random.uniform(0, 0.5), 1)}, + "f": {"v": f"{sw}w_{vh}h_24d_{dpr}r", "t": 0}, + "g": {"v": str(cpu), "t": 0}, + "h": {"v": "false", "t": 0}, + "i": {"v": "sessionStorage-enabled, localStorage-enabled", "t": round(random.uniform(0.5, 2), 1)}, + "j": {"v": canvas_fp, "t": round(random.uniform(5, 120), 1)}, + "k": {"v": "", "t": 0}, + "l": {"v": USER_AGENT, "t": 0}, + "m": {"v": "", "t": 0}, + "n": {"v": "false", "t": round(random.uniform(3, 50), 1)}, + "o": {"v": audio_fp, "t": round(random.uniform(20, 30), 1)}, + }, + "b": { + "a": f"https://{s1}.{s2}.{s3}/", + "b": f"https://{s1}.{s3}/{s4}/{s5}/{_b64url_seg()}", + "c": _b64url_seg(), + "d": muid if inc_ids else "NA", + "e": sid if inc_ids else "NA", + "f": False, "g": True, "h": True, + "i": ["location"], "j": [], + "n": round(random.uniform(800, 2000), 1), + "u": "chatgpt.com", "v": "auth.openai.com", + "w": f"{ts_now}:{hashlib.sha256(os.urandom(32)).hexdigest()}", + }, + "h": os.urandom(10).hex(), + } + + def _build_mouse(source: str) -> dict: + return { + "muid": muid, "sid": sid, + "url": f"https://{_b64url_seg()}.{_b64url_seg()}/{_b64url_seg()}/{_b64url_seg()}/{_b64url_seg()}", + "source": source, + "data": [random.randint(1, 8) for _ in range(10)], + } + + m6_headers = { + "User-Agent": USER_AGENT, + "Content-Type": "text/plain;charset=UTF-8", + "Accept": "*/*", + "Origin": "https://m.stripe.network", + "Referer": "https://m.stripe.network/", + } + m6_url = "https://m.stripe.com/6" + _log(" [指纹] 向 m.stripe.com/6 注册设备指纹 ...") + + # #1 完整指纹 (v2=1, 无 ID) + try: + r1 = http.post(m6_url, data=_encode_m6(_build_full(1, False)), headers=m6_headers, timeout=10) + if r1.status_code == 200: + j = r1.json() + muid = j.get("muid", muid) + guid = j.get("guid", guid) + sid = j.get("sid", sid) + _log(f" [指纹] #1 OK → muid={muid[:20]}...") + except Exception as e: + _log(f" [指纹] #1 失败: {e}") + + # #2 完整指纹 (v2=2, 带 ID) + try: + r2 = http.post(m6_url, data=_encode_m6(_build_full(2, True)), headers=m6_headers, timeout=10) + if r2.status_code == 200: + j = r2.json() + guid = j.get("guid", guid) + _log(f" [指纹] #2 OK → guid={guid[:20]}...") + except Exception as e: + _log(f" [指纹] #2 失败: {e}") + + # #3 鼠标行为 (mouse-timings-10-v2) + try: + http.post(m6_url, data=_encode_m6(_build_mouse("mouse-timings-10-v2")), headers=m6_headers, timeout=10) + _log(" [指纹] #3 OK (mouse-timings-v2)") + except Exception: + pass + + # #4 鼠标行为 (mouse-timings-10) + try: + http.post(m6_url, data=_encode_m6(_build_mouse("mouse-timings-10")), headers=m6_headers, timeout=10) + _log(" [指纹] #4 OK (mouse-timings)") + except Exception: + pass + + _log(f" [指纹] 完成 → guid={guid[:25]}... muid={muid[:25]}... sid={sid[:25]}...") + return guid, muid, sid + + +def _gen_elements_session_id(): + """生成类似 elements_session_15hfldlRpSm 的 session id""" + import random, string + chars = string.ascii_letters + string.digits + return "elements_session_" + "".join(random.choices(chars, k=11)) + + +def _stripe_headers(): + return { + "User-Agent": USER_AGENT, + "Accept": "application/json", + "Origin": "https://js.stripe.com", + "Referer": "https://js.stripe.com/", + } +def parse_checkout_url(raw: str) -> tuple[str, str]: + """解析输入,返回 (session_id, stripe_checkout_url) + + 支持以下格式: + - 裸 session_id: cs_live_xxx / cs_test_xxx + - Stripe URL: https://checkout.stripe.com/c/pay/cs_live_xxx + - ChatGPT URL: https://chatgpt.com/checkout/openai_llc/cs_live_xxx + """ + raw = raw.strip() + m = re.search(r"(cs_(?:live|test)_[A-Za-z0-9]+)", raw) + if not m: + raise ValueError(f"无法从输入中提取 checkout_session_id: {raw[:120]}...") + session_id = m.group(1) + + # 构建用于 Playwright 等回退方案的 Stripe checkout URL + # 如果输入是 checkout.stripe.com 的链接则直接使用,否则用标准格式构建 + if "checkout.stripe.com" in raw: + stripe_url = raw + else: + stripe_url = f"https://checkout.stripe.com/c/pay/{session_id}" + + return session_id, stripe_url + +def fetch_publishable_key(session: requests.Session, session_id: str, stripe_checkout_url: str) -> str: + checkout_url = stripe_checkout_url + + _log("[2/6] 获取 publishable_key ...") + + for acct_id_part, known_pk in KNOWN_PUBLISHABLE_KEYS.items(): + try: + url = f"{STRIPE_API}/v1/payment_pages/{session_id}/init" + post_data = {"key": known_pk, "_stripe_version": STRIPE_VERSION_BASE, + "browser_locale": "en-US"} + _log_request("POST", url, data=post_data, tag="[2/6] pk探测") + test_resp = session.post(url, data=post_data, headers=_stripe_headers(), timeout=15) + _log_response(test_resp, tag="[2/6] pk探测") + if test_resp.status_code == 200: + _log(f" publishable_key: {known_pk[:30]}... (已知)") + return known_pk + except Exception as e: + _log(f" pk探测异常: {e}") + + pk = _fetch_pk_playwright(checkout_url) + if pk: + _log(f" publishable_key: {pk[:30]}... (playwright)") + return pk + + raise RuntimeError("无法提取 publishable_key") + + +def _fetch_pk_playwright(checkout_url: str) -> str | None: + try: + from playwright.sync_api import sync_playwright + except ImportError: + return None + + pk = None + + def on_request(request): + nonlocal pk + if pk: + return + if "api.stripe.com" in request.url and "init" in request.url: + post = request.post_data or "" + m = re.search(r"key=(pk_(?:live|test)_[A-Za-z0-9]+)", post) + if m: + pk = m.group(1) + + try: + with sync_playwright() as p: + browser = p.chromium.launch(headless=True) + page = browser.new_page() + page.on("request", on_request) + try: + page.goto(checkout_url, wait_until="domcontentloaded", timeout=20000) + for _ in range(10): + if pk: + break + page.wait_for_timeout(1000) + except Exception: + pass + browser.close() + except Exception: + return None + + return pk + + +def init_checkout(session: requests.Session, session_id: str, pk: str, locale_profile: dict = None) -> tuple[dict, str, dict]: + """返回 (init_resp, stripe_ver, ctx) — ctx 包含后续步骤需要的上下文""" + locale_profile = locale_profile or LOCALE_PROFILES["US"] + url = f"{STRIPE_API}/v1/payment_pages/{session_id}/init" + stripe_js_id = str(uuid.uuid4()) + elements_session_id = _gen_elements_session_id() + + for version in [STRIPE_VERSION_BASE, STRIPE_VERSION_FULL]: + data = { + "browser_locale": locale_profile["browser_locale"], + "browser_timezone": locale_profile["browser_timezone"], + "elements_session_client[elements_init_source]": "custom_checkout", + "elements_session_client[referrer_host]": "chatgpt.com", + "elements_session_client[stripe_js_id]": stripe_js_id, + "elements_session_client[locale]": locale_profile["browser_locale"], + "elements_session_client[is_aggregation_expected]": "false", + "key": pk, + "_stripe_version": version, + } + if version == STRIPE_VERSION_FULL: + data["elements_session_client[client_betas][0]"] = "custom_checkout_server_updates_1" + data["elements_session_client[client_betas][1]"] = "custom_checkout_manual_approval_1" + + _log(f" 初始化结账会话 (init) ... version={version[:30]}") + _log_request("POST", url, data=data, tag="[2b/6] init") + resp = session.post(url, data=data, headers=_stripe_headers()) + _log_response(resp, tag="[2b/6] init") + if resp.status_code == 200: + ctx = { + "stripe_js_id": stripe_js_id, + "elements_session_id": elements_session_id, + } + return resp.json(), version, ctx + if resp.status_code == 400 and "beta" in resp.text.lower(): + _log(f" 版本 {version[:20]}... 不支持 beta, 尝试下一个 ...") + continue + raise RuntimeError(f"init 失败 [{resp.status_code}]: {resp.text[:500]}") + + raise RuntimeError("init 失败: 所有 Stripe API 版本均不可用") + + +def extract_hcaptcha_config(init_resp: dict) -> dict: + raw = json.dumps(init_resp) + result = {"site_key": HCAPTCHA_SITE_KEY_FALLBACK, "rqdata": ""} + + if init_resp.get("site_key"): + result["site_key"] = init_resp["site_key"] + m = re.search(r'"hcaptcha_site_key"\s*:\s*"([^"]+)"', raw) + if m and not init_resp.get("site_key"): + result["site_key"] = m.group(1) + + m = re.search(r'"hcaptcha_rqdata"\s*:\s*"([^"]+)"', raw) + if m: + result["rqdata"] = m.group(1) + + return result + + +def fetch_elements_session( + session: requests.Session, + pk: str, + session_id: str, + ctx: dict, + stripe_ver: str = STRIPE_VERSION_FULL, + locale_profile: dict = None, +) -> dict: + """调用 elements/sessions, 返回响应 dict 并更新 ctx 中的 elements_session_id""" + locale_profile = locale_profile or LOCALE_PROFILES["US"] + locale_short = locale_profile["browser_locale"].split("-")[0] # HAR: "zh" 而非 "zh-CN" + stripe_js_id = ctx.get("stripe_js_id", str(uuid.uuid4())) + url = f"{STRIPE_API}/v1/elements/sessions" + params = { + "client_betas[0]": "custom_checkout_server_updates_1", + "client_betas[1]": "custom_checkout_manual_approval_1", + "deferred_intent[mode]": "subscription", + "deferred_intent[amount]": "0", + "deferred_intent[currency]": "usd", + "deferred_intent[setup_future_usage]": "off_session", + "deferred_intent[payment_method_types][0]": "card", + "currency": "usd", + "key": pk, + "_stripe_version": stripe_ver, + "elements_init_source": "custom_checkout", + "referrer_host": "chatgpt.com", + "stripe_js_id": stripe_js_id, + "locale": locale_short, + "type": "deferred_intent", + "checkout_session_id": session_id, + } + _log(" [elements] GET /v1/elements/sessions ...") + _log_request("GET", url, params=params, tag="[2c] elements/sessions") + resp = session.get(url, params=params, headers=_stripe_headers()) + _log_response(resp, tag="[2c] elements/sessions") + + if resp.status_code == 200: + data = resp.json() + # 提取真实的 elements_session_id (如果有) + real_es_id = data.get("session_id") or data.get("id") + if real_es_id: + ctx["elements_session_id"] = real_es_id + _log(f" [elements] 真实 session_id: {real_es_id}") + # 提取 config_id + config_id = data.get("config_id") + if config_id: + ctx["config_id"] = config_id + _log(f" [elements] config_id: {config_id}") + return data + else: + _log(f" [elements] 请求失败 [{resp.status_code}], 继续使用本地生成的 ID") + return {} + + + +def lookup_consumer( + session: requests.Session, + pk: str, + email: str, + stripe_ver: str = STRIPE_VERSION_FULL, +): + """查询 Stripe Link 消费者会话,模拟真实浏览器的两次 lookup""" + url = f"{STRIPE_API}/v1/consumers/sessions/lookup" + surfaces = [ + ("web_link_authentication_in_payment_element", "default_value"), + ("web_elements_controller", "default_value"), + ] + for surface, source in surfaces: + data = { + "request_surface": surface, + "email_address": email, + "email_source": source, + "session_id": str(uuid.uuid4()), + "key": pk, + "_stripe_version": stripe_ver, + } + if surface == "web_elements_controller": + data["do_not_log_consumer_funnel_event"] = "true" + try: + _log(f" [link] lookup ({surface[:30]}...) ...") + _log_request("POST", url, data=data, tag="[2d] consumer/lookup") + resp = session.post(url, data=data, headers=_stripe_headers(), timeout=10) + _log_response(resp, tag="[2d] consumer/lookup") + except Exception as e: + _log(f" [link] lookup 异常: {e}") + time.sleep(random.uniform(0.3, 0.8)) + + +def update_payment_page_address( + session: requests.Session, + pk: str, + session_id: str, + card: dict, + ctx: dict, + stripe_ver: str = STRIPE_VERSION_FULL, +): + """模拟浏览器逐字段提交地址/税区信息, 共 6 次 POST""" + url = f"{STRIPE_API}/v1/payment_pages/{session_id}" + addr = card.get("address", {}) + elements_session_id = ctx.get("elements_session_id", _gen_elements_session_id()) + stripe_js_id = ctx.get("stripe_js_id", str(uuid.uuid4())) + + # 基础字段 — 每次 update 都要带 + base = { + "elements_session_client[client_betas][0]": "custom_checkout_server_updates_1", + "elements_session_client[client_betas][1]": "custom_checkout_manual_approval_1", + "elements_session_client[elements_init_source]": "custom_checkout", + "elements_session_client[referrer_host]": "chatgpt.com", + "elements_session_client[session_id]": elements_session_id, + "elements_session_client[stripe_js_id]": stripe_js_id, + "elements_session_client[locale]": "en-US", + "elements_session_client[is_aggregation_expected]": "false", + "client_attribution_metadata[merchant_integration_additional_elements][0]": "payment", + "client_attribution_metadata[merchant_integration_additional_elements][1]": "address", + "key": pk, + "_stripe_version": stripe_ver, + } + + # HAR 中的逐字段提交顺序: country → (重复一次) → line1 → city → state → postal_code + address_steps = [ + {"tax_region[country]": addr.get("country", "US")}, + {}, # 重复提交 (无新字段, 模拟用户切换焦点) + {"tax_region[line1]": addr.get("line1", "")}, + {"tax_region[city]": addr.get("city", "")}, + {"tax_region[state]": addr.get("state", "")}, + {"tax_region[postal_code]": addr.get("postal_code", "")}, + ] + + _log(" [address] 逐字段提交税区地址 ...") + accumulated = {} + for step_idx, new_fields in enumerate(address_steps): + accumulated.update(new_fields) + data = dict(base) + data.update(accumulated) + + step_name = list(new_fields.keys())[0] if new_fields else "(焦点变更)" + _log(f" [address] step {step_idx + 1}/6: {step_name}") + _log_request("POST", url, data=data, tag=f"[2e] update_address({step_idx + 1}/6)") + resp = session.post(url, data=data, headers=_stripe_headers()) + _log_response(resp, tag=f"[2e] update_address({step_idx + 1}/6)") + + if resp.status_code != 200: + _log(f" [address] step {step_idx + 1} 返回 {resp.status_code}, 继续 ...") + + # 模拟人类输入间隔 (2-5 秒) + time.sleep(random.uniform(2.0, 4.5)) + +def send_telemetry( + session: requests.Session, + event_type: str, + session_id: str, + ctx: dict, +): + """向 r.stripe.com/b 发送遥测事件, 模拟 stripe.js 行为上报""" + url = "https://r.stripe.com/b" + muid = ctx.get("muid", "") + sid = ctx.get("sid", "") + guid = ctx.get("guid", "") + + payload = { + "v2": 1, + "tag": event_type, + "src": "js", + "pid": "checkout_" + session_id[:20], + "muid": muid, + "sid": sid, + "guid": guid, + } + headers = { + "User-Agent": USER_AGENT, + "Content-Type": "text/plain;charset=UTF-8", + "Accept": "*/*", + "Origin": "https://js.stripe.com", + "Referer": "https://js.stripe.com/", + } + try: + body = base64.b64encode(json.dumps(payload, separators=(",", ":")).encode()).decode() + session.post(url, data=body, headers=headers, timeout=5) + except Exception: + pass + + +def send_telemetry_batch( + session: requests.Session, + session_id: str, + ctx: dict, + phase: str = "init", +): + """按阶段批量发送遥测事件""" + events_map = { + "init": ["checkout.init", "elements.create", "payment_element.mount"], + "address": ["address.update", "address.focus", "address.blur"], + "card_input": ["card.focus", "card.input", "card.blur", "cvc.input"], + "confirm": ["checkout.confirm.start", "payment_method.create", "checkout.confirm.intent"], + "3ds": ["three_ds2.start", "three_ds2.fingerprint", "three_ds2.authenticate"], + "poll": ["checkout.poll", "checkout.complete"], + } + events = events_map.get(phase, []) + for evt in events: + send_telemetry(session, evt, session_id, ctx) + time.sleep(random.uniform(0.05, 0.2)) + + +def submit_apata_fingerprint( + session: requests.Session, + three_ds_server_trans_id: str, + three_ds_method_url: str, + notification_url: str, + locale_profile: dict, + ctx: dict, +): + + + # 1) POST acs-method.apata.io/v1/houston/method — 提交 threeDSMethodData + _log(" [apata] POST houston/method ...") + method_data = base64.b64encode(json.dumps({ + "threeDSServerTransID": three_ds_server_trans_id, + "threeDSMethodNotificationURL": notification_url, + }, separators=(",", ":")).encode()).decode() + + try: + method_url = three_ds_method_url or "https://acs-method.apata.io/v1/houston/method" + resp = session.post( + method_url, + data={"threeDSMethodData": method_data}, + headers={ + "User-Agent": USER_AGENT, + "Content-Type": "application/x-www-form-urlencoded", + "Origin": "https://js.stripe.com", + "Referer": "https://js.stripe.com/", + }, + timeout=15, + ) + _log(f" [apata] houston/method → {resp.status_code}") + except Exception as e: + _log(f" [apata] houston/method 异常: {e}") + + time.sleep(random.uniform(0.5, 1.0)) + + # 2) POST acs-method.apata.io/v1/RecordBrowserInfo — 设备指纹上报 + _log(" [apata] POST RecordBrowserInfo ...") + # 生成 possessionDeviceId (localStorage acsRbaDeviceId 模拟) + possession_device_id = ctx.get("apata_device_id") or str(uuid.uuid4()) + ctx["apata_device_id"] = possession_device_id + + fp_data = _build_browser_fingerprint(locale_profile) + record_payload = { + "threeDSServerTransID": three_ds_server_trans_id, + "computedValue": hashlib.sha256(os.urandom(32)).hexdigest()[:20], + "possessionDeviceId": possession_device_id, + } + record_payload.update(fp_data) + + try: + record_url = "https://acs-method.apata.io/v1/RecordBrowserInfo" + resp = session.post( + record_url, + json=record_payload, + headers={ + "User-Agent": USER_AGENT, + "Content-Type": "application/json", + "Origin": "https://acs-method.apata.io", + "Referer": "https://acs-method.apata.io/", + }, + timeout=15, + ) + _log(f" [apata] RecordBrowserInfo → {resp.status_code}") + except Exception as e: + _log(f" [apata] RecordBrowserInfo 异常: {e}") + + time.sleep(random.uniform(0.5, 1.0)) + + # 3) GET rba.apata.io/xxx.js — 模拟 RBA profile 脚本加载 + _log(" [apata] GET rba profile script ...") + rba_session_id = ctx.get("rba_session_id") or str(uuid.uuid4()) + ctx["rba_session_id"] = rba_session_id + try: + # HAR 中的 URL 格式: rba.apata.io/.js?=&= + rba_script_name = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) + ".js" + rba_param1 = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) + rba_param2 = ''.join(random.choices(string.ascii_lowercase + string.digits, k=16)) + rba_url = f"https://rba.apata.io/{rba_script_name}?{rba_param1}={APATA_RBA_ORG_ID}&{rba_param2}={rba_session_id}" + resp = session.get(rba_url, headers={"User-Agent": USER_AGENT}, timeout=10) + _log(f" [apata] rba profile → {resp.status_code}") + except Exception as e: + _log(f" [apata] rba profile 异常: {e}") + + # 4) 模拟 aa.online-metrix.net CONNECT (WebRTC beacon 不可模拟, 仅日志标记) + _log(" [apata] online-metrix beacon (WebRTC, 已跳过 — 无法在 requests 中模拟)") + + # 总等待: 让 Apata 有时间处理指纹结果 (HAR 中这个窗口约 8-12 秒) + wait = random.uniform(5.0, 8.0) + _log(f" [apata] 等待指纹处理完成 ({wait:.1f}s) ...") + time.sleep(wait) + +def solve_hcaptcha(captcha_cfg: dict, hcaptcha_config: dict, max_retries: int = 3) -> tuple[str, str]: + """返回 (token, ekey) 元组""" + api_url = captcha_cfg.get("api_url", "https://api.yescaptcha.com") + client_key = captcha_cfg["api_key"] + site_key = hcaptcha_config["site_key"] + rqdata = hcaptcha_config.get("rqdata", "") + + for retry in range(max_retries): + if retry > 0: + _log(f" --- 重试第 {retry + 1}/{max_retries} 次 ---") + + _log(f" 解 hCaptcha (siteKey: {site_key[:20]}...)") + + # 创建 1 个任务 + task_body = { + "type": "HCaptchaTaskProxyless", + "websiteURL": "https://b.stripecdn.com/stripethirdparty-srv/assets/v32.1/HCaptchaInvisible.html", + "websiteKey": site_key, + "isEnterprise": True, + "userAgent": USER_AGENT, + } + + + create_payload = {"clientKey": client_key, "task": task_body} + try: + create_url = f"{api_url}/createTask" + _log_request("POST", create_url, data=create_payload, tag="[captcha] createTask") + create_resp = requests.post(create_url, json=create_payload, timeout=15) + _log_response(create_resp, tag="[captcha] createTask") + data = create_resp.json() + if data.get("errorId", 1) != 0: + _log(f" 任务创建失败: {data.get('errorDescription', '?')}") + time.sleep(3) + continue + task_id = data["taskId"] + except Exception as e: + _log(f" 任务创建异常: {e}") + time.sleep(3) + continue + + _log(f" 任务: {task_id} 等待解题 ...") + + + for attempt in range(60): + time.sleep(3) + try: + result_url = f"{api_url}/getTaskResult" + result_payload = {"clientKey": client_key, "taskId": task_id} + result_resp = requests.post(result_url, json=result_payload, timeout=10) + result_data = result_resp.json() + except Exception: + continue + + if result_data.get("errorId", 0) != 0: + error_code = result_data.get("errorCode", "") + if error_code == "ERROR_TASK_TIMEOUT": + _log(" 任务超时, 重新发起 ...") + break + continue + + if result_data.get("status") == "ready": + solution = result_data["solution"] + _log_raw(f" solution keys: {list(solution.keys())}") + _log_raw(f" solution full: {json.dumps(solution, ensure_ascii=False)[:500]}") + token = solution["gRecaptchaResponse"] + # eKey 可能在不同字段名下 + ekey = solution.get("eKey", "") or solution.get("respKey", "") or solution.get("ekey", "") + _log(f" 已解决 (token: {len(token)} chars, ekey: {len(ekey)} chars)") + _log_raw(f" captcha_token(前100): {token[:100]}...") + if ekey: + _log_raw(f" captcha_ekey(前100): {ekey[:100]}...") + return token, ekey + + if attempt % 5 == 4: + _log(f" 等待中 ... ({attempt + 1}/60)") + + raise RuntimeError(f"YesCaptcha 解题失败 (已重试 {max_retries} 轮)") + + +def create_payment_method( + session: requests.Session, + pk: str, + card: dict, + captcha_token: str, + session_id: str, + stripe_ver: str = STRIPE_VERSION_BASE, + ctx: dict = None, +) -> str: + ctx = ctx or {} + guid = ctx.get("guid") or _gen_fingerprint()[0] + muid = ctx.get("muid") or _gen_fingerprint()[0] + sid = ctx.get("sid") or _gen_fingerprint()[0] + addr = card.get("address", {}) + + data = { + "billing_details[name]": card["name"], + "billing_details[email]": card["email"], + "billing_details[address][country]": addr.get("country", "US"), + "billing_details[address][line1]": addr.get("line1", ""), + "billing_details[address][city]": addr.get("city", ""), + "billing_details[address][postal_code]": addr.get("postal_code", ""), + "billing_details[address][state]": addr.get("state", ""), + "type": "card", + "card[number]": card["number"], + "card[cvc]": card["cvc"], + "card[exp_year]": card["exp_year"], + "card[exp_month]": card["exp_month"], + "allow_redisplay": "unspecified", + + "payment_user_agent": "stripe.js/5412f474d5; stripe-js-v3/5412f474d5; payment-element; deferred-intent", + "referrer": "https://chatgpt.com", + # time_on_page: 模拟从页面加载到提交的真实耗时 (HAR: 31368ms / 249421ms) + "time_on_page": str(ctx.get("time_on_page", random.randint(25000, 55000))), + "client_attribution_metadata[client_session_id]": str(uuid.uuid4()), + "client_attribution_metadata[checkout_session_id]": session_id, + "client_attribution_metadata[merchant_integration_source]": "elements", + "client_attribution_metadata[merchant_integration_subtype]": "payment-element", + "client_attribution_metadata[merchant_integration_version]": "2021", + "client_attribution_metadata[payment_intent_creation_flow]": "deferred", + "client_attribution_metadata[payment_method_selection_flow]": "automatic", + "guid": guid, + "muid": muid, + "sid": sid, + "key": pk, + "_stripe_version": stripe_ver, + } + if captcha_token: + data["radar_options[hcaptcha_token]"] = captcha_token + + url = f"{STRIPE_API}/v1/payment_methods" + _log("[4/6] 创建支付方式 (payment_method) ...") + _log_request("POST", url, data=data, tag="[4/6] create_payment_method") + resp = session.post(url, data=data, headers=_stripe_headers()) + _log_response(resp, tag="[4/6] create_payment_method") + if resp.status_code != 200: + raise RuntimeError(f"创建 payment_method 失败 [{resp.status_code}]: {resp.text[:500]}") + + pm = resp.json() + pm_id = pm["id"] + brand = pm.get("card", {}).get("display_brand", "unknown") + last4 = pm.get("card", {}).get("last4", "????") + _log(f" 成功: {pm_id} ({brand} ****{last4})") + return pm_id + +def confirm_payment( + session: requests.Session, + pk: str, + session_id: str, + pm_id: str, + captcha_token: str, + init_resp: dict, + stripe_ver: str = STRIPE_VERSION_BASE, + captcha_cfg: dict = None, + captcha_ekey: str = "", + ctx: dict = None, + locale_profile: dict = None, +) -> dict: + ctx = ctx or {} + locale_profile = locale_profile or LOCALE_PROFILES["US"] + guid = ctx.get("guid") or _gen_fingerprint()[0] + muid = ctx.get("muid") or _gen_fingerprint()[0] + sid = ctx.get("sid") or _gen_fingerprint()[0] + + expected_amount = "0" + line_items = init_resp.get("line_items", []) + if line_items: + total = sum(item.get("amount", 0) for item in line_items) + expected_amount = str(total) + + + init_checksum = init_resp.get("init_checksum", "") + config_id = init_resp.get("config_id", "") + stripe_js_id = ctx.get("stripe_js_id", str(uuid.uuid4())) + elements_session_id = ctx.get("elements_session_id", _gen_elements_session_id()) + checkout_url = init_resp.get("url") or init_resp.get("stripe_hosted_url") or "" + + + ver = STRIPE_VERSION_FULL + + data = { + "guid": guid, + "muid": muid, + "sid": sid, + "payment_method": pm_id, + "expected_amount": expected_amount, + "expected_payment_method_type": "card", + "consent[terms_of_service]": "accepted", + "key": pk, + "_stripe_version": ver, + + "init_checksum": init_checksum, + + "version": "5412f474d5", + + "return_url": checkout_url, + + "elements_session_client[elements_init_source]": "custom_checkout", + "elements_session_client[referrer_host]": "chatgpt.com", + "elements_session_client[stripe_js_id]": stripe_js_id, + "elements_session_client[locale]": locale_profile.get("browser_locale", "en-US"), + "elements_session_client[is_aggregation_expected]": "false", + "elements_session_client[session_id]": elements_session_id, + "elements_session_client[client_betas][0]": "custom_checkout_server_updates_1", + "elements_session_client[client_betas][1]": "custom_checkout_manual_approval_1", + + "client_attribution_metadata[client_session_id]": stripe_js_id, + "client_attribution_metadata[checkout_session_id]": session_id, + "client_attribution_metadata[checkout_config_id]": config_id, + "client_attribution_metadata[elements_session_id]": elements_session_id, + "client_attribution_metadata[elements_session_config_id]": str(uuid.uuid4()), + "client_attribution_metadata[merchant_integration_source]": "checkout", + "client_attribution_metadata[merchant_integration_subtype]": "payment-element", + "client_attribution_metadata[merchant_integration_version]": "custom", + "client_attribution_metadata[payment_intent_creation_flow]": "deferred", + "client_attribution_metadata[payment_method_selection_flow]": "automatic", + "client_attribution_metadata[merchant_integration_additional_elements][0]": "payment", + "client_attribution_metadata[merchant_integration_additional_elements][1]": "address", + } + + + if captcha_token: + data["passive_captcha_token"] = captcha_token + + + url = f"{STRIPE_API}/v1/payment_pages/{session_id}/confirm" + _log("[5/6] 确认支付 (confirm) ...") + _log_request("POST", url, data=data, tag="[5/6] confirm") + resp = session.post(url, data=data, headers=_stripe_headers()) + _log_response(resp, tag="[5/6] confirm") + if resp.status_code != 200: + raise RuntimeError(f"confirm 失败 [{resp.status_code}]: {resp.text[:500]}") + + confirm_data = resp.json() + + next_action = confirm_data.get("next_action") + if not next_action: + seti = _find_setup_intent(confirm_data) + if seti and seti.get("next_action"): + next_action = seti["next_action"] + + if next_action and next_action.get("type") == "use_stripe_sdk": + _log(" 触发 3DS 验证,正在处理 ...") + _handle_3ds(session, pk, confirm_data, captcha_token, stripe_ver, captcha_cfg, + locale_profile=locale_profile, ctx=ctx) + + return confirm_data + + +def _find_setup_intent(data: dict) -> dict | None: + si = data.get("setup_intent") + if si: + return si + pm_obj = data.get("payment_method_object") + if pm_obj and isinstance(pm_obj, dict): + return pm_obj.get("setup_intent") + raw = json.dumps(data) + m = re.search(r"seti_[A-Za-z0-9]+", raw) + if m: + return {"id": m.group(0)} + return None + + +def _handle_3ds( + session: requests.Session, + pk: str, + confirm_data: dict, + captcha_token: str, + stripe_ver: str = STRIPE_VERSION_BASE, + captcha_cfg: dict = None, + locale_profile: dict = None, + ctx: dict = None, +): + """处理 3DS2 认证流程 (模拟浏览器: captcha → verify_challenge → Apata指纹 → 3ds2/authenticate)""" + locale_profile = locale_profile or LOCALE_PROFILES["US"] + ctx = ctx or {} + raw = json.dumps(confirm_data) + + # 查找 setatt_ (直接在 confirm 响应中) + source_match = re.search(r"(setatt_[A-Za-z0-9]+)", raw) + source = source_match.group(1) if source_match else None + _log(f" 3DS: setatt_ = {source}") + + # 查找 seti_ 和 client_secret + seti_match = re.search(r"(seti_[A-Za-z0-9]+)", raw) + seti_id = seti_match.group(1) if seti_match else None + _log(f" 3DS: seti_id = {seti_id}") + + client_secret = None + if seti_id: + cs_match = re.search(rf"({re.escape(seti_id)}_secret_[A-Za-z0-9]+)", raw) + if cs_match: + client_secret = cs_match.group(1) + _log(f" 3DS: client_secret = {client_secret[:40] + '...' if client_secret else None}") + + + challenge_site_key = None + challenge_rqdata = "" + # 从 setup_intent.next_action.use_stripe_sdk.stripe_js 提取 + seti_obj = _find_setup_intent(confirm_data) + if seti_obj and isinstance(seti_obj, dict): + na = seti_obj.get("next_action", {}) + sdk_info = na.get("use_stripe_sdk", {}) + stripe_js = sdk_info.get("stripe_js", {}) + if stripe_js.get("site_key"): + challenge_site_key = stripe_js["site_key"] + challenge_rqdata = stripe_js.get("rqdata", "") + _log(f" 检测到 confirmation challenge (site_key: {challenge_site_key[:20]}...)") + + CHALLENGE_MAX_ATTEMPTS = 5 + if challenge_site_key and seti_id and client_secret and captcha_cfg: + challenge_hcaptcha_cfg = { + "site_key": challenge_site_key, + "rqdata": challenge_rqdata, + } + + for challenge_attempt in range(1, CHALLENGE_MAX_ATTEMPTS + 1): + _log(f" 解 challenge captcha (第 {challenge_attempt}/{CHALLENGE_MAX_ATTEMPTS} 次) ...") + challenge_token, challenge_ekey = solve_hcaptcha(captcha_cfg, challenge_hcaptcha_cfg, max_retries=3) + + verify_url = f"{STRIPE_API}/v1/setup_intents/{seti_id}/verify_challenge" + _log(f" verify_challenge (seti: {seti_id[:30]}...) ...") + verify_data = { + "client_secret": client_secret, + "challenge_response_token": challenge_token, + "captcha_vendor_name": "hcaptcha", + "key": pk, + "_stripe_version": STRIPE_VERSION_FULL, + } + + _log_request("POST", verify_url, data=verify_data, tag=f"[5/6] verify_challenge({challenge_attempt}/{CHALLENGE_MAX_ATTEMPTS})") + resp = session.post(verify_url, data=verify_data, headers=_stripe_headers()) + _log_response(resp, tag=f"[5/6] verify_challenge({challenge_attempt}/{CHALLENGE_MAX_ATTEMPTS})") + + if resp.status_code != 200: + err_text = resp.text[:300] + _log(f" verify_challenge 返回 {resp.status_code}: {err_text}") + if "no valid challenge" in err_text.lower(): + raise RuntimeError(f"challenge 已失效 (Stripe 返回 {resp.status_code}), 需要重新 confirm 获取新的 challenge") + break # 其他非 200 退出循环 + + verify_result = resp.json() + verify_status = verify_result.get("status", "unknown") + _log(f" verify_challenge 状态: {verify_status}") + + # 检测 captcha challenge 失败 + setup_error = verify_result.get("last_setup_error", {}) + if setup_error: + err_code = setup_error.get("code", "") + err_msg = setup_error.get("message", "") + if "captcha" in err_msg.lower() or "authentication_failure" in err_code: + if challenge_attempt < CHALLENGE_MAX_ATTEMPTS: + _log(f" challenge captcha 被拒, 重试 ...") + continue # 重新解 captcha 再试 + else: + raise RuntimeError(f"challenge captcha 连续 {CHALLENGE_MAX_ATTEMPTS} 次被 Stripe 拒绝: [{err_code}] {err_msg}") + + # verify 成功, 从响应中提取 setatt_ + verify_raw = json.dumps(verify_result) + new_source = re.search(r"(setatt_[A-Za-z0-9]+)", verify_raw) + if new_source: + source = new_source.group(1) + _log(f" 从 verify 响应中获取 setatt_: {source[:30]}...") + break # 成功, 退出循环 + + elif seti_id and client_secret and not source: + # 没有 challenge 但也没有 setatt_, 尝试原始 verify_challenge + verify_url = f"{STRIPE_API}/v1/setup_intents/{seti_id}/verify_challenge" + _log(f" verify_challenge (seti: {seti_id[:30]}...) ...") + verify_data = { + "client_secret": client_secret, + "challenge_response_token": captcha_token, + "captcha_vendor_name": "hcaptcha", + "key": pk, + "_stripe_version": STRIPE_VERSION_FULL, + } + _log_request("POST", verify_url, data=verify_data, tag="[5/6] verify_challenge(fallback)") + resp = session.post(verify_url, data=verify_data, headers=_stripe_headers()) + _log_response(resp, tag="[5/6] verify_challenge(fallback)") + if resp.status_code == 200: + si_result = resp.json() + _log(f" verify_challenge 状态: {si_result.get('status', 'unknown')}") + # 检测 captcha challenge 失败 + setup_error = si_result.get("last_setup_error", {}) + if setup_error: + err_code = setup_error.get("code", "") + err_msg = setup_error.get("message", "") + if "captcha" in err_msg.lower() or "authentication_failure" in err_code: + raise RuntimeError(f"challenge captcha 被 Stripe 拒绝: [{err_code}] {err_msg}") + verify_raw = json.dumps(si_result) + new_source = re.search(r"(setatt_[A-Za-z0-9]+)", verify_raw) + if new_source: + source = new_source.group(1) + else: + _log(f" verify_challenge 返回 {resp.status_code}: {resp.text[:300]}") + + + send_telemetry_batch(session, "", ctx, phase="3ds") + + + three_ds_trans_id = None + three_ds_method_url = None + notification_url = None + + for search_blob in [raw]: + m_tid = re.search(r'"server_transaction_id"\s*:\s*"([^"]+)"', search_blob) + if m_tid: + three_ds_trans_id = m_tid.group(1) + m_murl = re.search(r'"three_ds_method_url"\s*:\s*"([^"]+)"', search_blob) + if m_murl: + three_ds_method_url = m_murl.group(1) + + if source and three_ds_trans_id: + # 构建 notification URL (HAR 中的格式) + acct_match = re.search(r'(acct_[A-Za-z0-9]+)', raw) + acct_id = acct_match.group(1) if acct_match else "acct_unknown" + notification_url = f"https://hooks.stripe.com/3d_secure_2/fingerprint/{acct_id}/{source}" + + submit_apata_fingerprint( + session=session, + three_ds_server_trans_id=three_ds_trans_id, + three_ds_method_url=three_ds_method_url, + notification_url=notification_url, + locale_profile=locale_profile, + ctx=ctx, + ) + elif source: + + wait = random.uniform(8.0, 12.0) + _log(f" [3ds] 未找到 server_transaction_id, 等待 {wait:.1f}s 模拟指纹窗口 ...") + time.sleep(wait) + + + if source: + auth_url = f"{STRIPE_API}/v1/3ds2/authenticate" + _log(f" 3DS2 authenticate (source: {source[:30]}...) ...") + auth_data = { + "source": source, + "browser": json.dumps({ + "fingerprintAttempted": True, + "fingerprintData": None, + "challengeWindowSize": None, + "threeDSCompInd": "Y", + "browserJavaEnabled": False, + "browserJavascriptEnabled": True, + "browserLanguage": locale_profile.get("browser_language", "en-US"), + "browserColorDepth": str(locale_profile.get("color_depth", 24)), + "browserScreenHeight": str(locale_profile.get("screen_h", 1080)), + "browserScreenWidth": str(locale_profile.get("screen_w", 1920)), + "browserTZ": str(locale_profile.get("browser_tz_offset", 360)), + "browserUserAgent": USER_AGENT, + }), + "one_click_authn_device_support[hosted]": "false", + "one_click_authn_device_support[same_origin_frame]": "false", + "one_click_authn_device_support[spc_eligible]": "true", + "one_click_authn_device_support[webauthn_eligible]": "true", + "one_click_authn_device_support[publickey_credentials_get_allowed]": "true", + "key": pk, + "_stripe_version": STRIPE_VERSION_FULL, + } + _log_request("POST", auth_url, data=auth_data, tag="[5/6] 3ds2/authenticate") + resp = session.post(auth_url, data=auth_data, headers=_stripe_headers()) + _log_response(resp, tag="[5/6] 3ds2/authenticate") + if resp.status_code == 200: + result = resp.json() + state = result.get("state", "unknown") + trans_status = result.get("ares", {}).get("transStatus", "?") + _log(f" 3DS2 结果: state={state}, transStatus={trans_status}") + else: + _log(f" 3DS2 authenticate 返回 {resp.status_code}: {resp.text[:200]}") + else: + _log(" ⚠ 没有 setatt_ source, 跳过 3DS2 authenticate") + raise RuntimeError("3DS 验证失败: 未获取到 setatt_ source, 无法完成认证") + + + if seti_id and client_secret: + time.sleep(3) + poll_url = f"{STRIPE_API}/v1/setup_intents/{seti_id}" + poll_params = { + "client_secret": client_secret, + "is_stripe_sdk": "false", + "key": pk, + "_stripe_version": STRIPE_VERSION_FULL, + } + _log(f" 查询 setup_intent 最终状态 ...") + _log_request("GET", poll_url, params=poll_params, tag="[5/6] setup_intent状态") + poll_resp = session.get(poll_url, params=poll_params, headers=_stripe_headers()) + _log_response(poll_resp, tag="[5/6] setup_intent状态") + if poll_resp.status_code == 200: + si_status = poll_resp.json().get("status", "unknown") + _log(f" setup_intent 状态: {si_status}") + else: + _log(" ⚠ 无 seti_id / client_secret, 跳过 setup_intent 查询") + + +def poll_result(session: requests.Session, pk: str, session_id: str, stripe_ver: str = STRIPE_VERSION_BASE) -> dict: + url = f"{STRIPE_API}/v1/payment_pages/{session_id}/poll" + params = { + "key": pk, + "_stripe_version": stripe_ver, + } + + _log("[6/6] 轮询支付结果 ...") + for attempt in range(30): + time.sleep(2) + _log_request("GET", url, params=params, tag=f"[6/6] poll({attempt+1}/30)") + resp = session.get(url, params=params, headers=_stripe_headers()) + _log_response(resp, tag=f"[6/6] poll({attempt+1}/30)") + if resp.status_code != 200: + _log(f" poll 返回 {resp.status_code}, 重试 ...") + continue + + data = resp.json() + state = data.get("state", "unknown") + payment_status = data.get("payment_object_status", "unknown") + + if state == "succeeded": + return_url = data.get("return_url", "") + _log(f"\n{'='*60}") + _log(f" 支付成功!") + _log(f" state: {state}") + _log(f" payment_status: {payment_status}") + _log(f" mode: {data.get('mode', '?')}") + _log(f" return_url: {return_url}") + _log(f"{'='*60}\n") + return data + + if state in ("failed", "expired", "canceled"): + _log(f"\n 支付失败: state={state}") + _log_raw(f" 完整 poll 响应: {json.dumps(data, ensure_ascii=False, indent=4)}") + return data + + _log(f" state={state}, payment_status={payment_status} ({attempt + 1}/30)") + + raise TimeoutError("轮询超时 (60s)") + +def load_config(path: str) -> dict: + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + +def run(checkout_input: str, card_index: int = 0, config_path: str = "config.json", manual_token: str = ""): + _init_log() # 初始化日志文件 + + cfg = load_config(config_path) + cards = cfg["cards"] + if card_index >= len(cards): + raise ValueError(f"卡索引 {card_index} 超出范围,共 {len(cards)} 张卡") + card = cards[card_index] + captcha_cfg = cfg["captcha"] + + + _FIRST_NAMES = ["JAMES", "JOHN", "ROBERT", "MICHAEL", "WILLIAM", "DAVID", "RICHARD", "JOSEPH", + "THOMAS", "CHARLES", "DANIEL", "MATTHEW", "ANTHONY", "MARK", "STEVEN", + "MARY", "PATRICIA", "JENNIFER", "LINDA", "ELIZABETH", "BARBARA", "SUSAN", + "JESSICA", "SARAH", "KAREN", "NANCY", "LISA", "BETTY", "MARGARET", "SANDRA"] + _LAST_NAMES = ["SMITH", "JOHNSON", "WILLIAMS", "BROWN", "JONES", "GARCIA", "MILLER", + "DAVIS", "RODRIGUEZ", "MARTINEZ", "WILSON", "ANDERSON", "TAYLOR", "THOMAS", + "MOORE", "JACKSON", "MARTIN", "LEE", "THOMPSON", "WHITE", "HARRIS", "CLARK"] + card["name"] = f"{random.choice(_FIRST_NAMES)} {random.choice(_LAST_NAMES)}" + + email_user = ''.join(random.choices(string.ascii_lowercase + string.digits, k=random.randint(8, 12))) + _EMAIL_DOMAINS = ["gmail.com", "yahoo.com", "outlook.com", "hotmail.com", "icloud.com", "protonmail.com"] + card["email"] = f"{email_user}@{random.choice(_EMAIL_DOMAINS)}" + + addr = card.get("address", {}) + line1 = addr.get("line1", "") + + new_line1 = re.sub(r"^\d+", str(random.randint(100, 999)), line1) + if new_line1 == line1 and line1: + new_line1 = f"{random.randint(100, 999)} {line1}" + addr["line1"] = new_line1 + card["address"] = addr + + + locale_key = cfg.get("locale", addr.get("country", "US")).upper() + locale_profile = LOCALE_PROFILES.get(locale_key, LOCALE_PROFILES["US"]) + _log(f" 地域: {locale_key} (tz={locale_profile['browser_timezone']}, lang={locale_profile['browser_locale']})") + + _log(f"\n{'='*60}") + _log(f" Stripe 自动化支付") + _log(f" 使用卡: ****{card['number'][-4:]} ({card['name']})") + _log(f" 邮箱: {card['email']}") + _log(f" 地址: {addr.get('line1', '')}") + _log(f" 配置文件: {config_path}") + _log(f"{'='*60}\n") + + + _log("[1/6] 解析 checkout session ID ...") + session_id, stripe_checkout_url = parse_checkout_url(checkout_input) + _log(f" session_id: {session_id}") + if "chatgpt.com" in checkout_input: + _log(f" 输入格式: ChatGPT 嵌入式链接 → 转换为 Stripe URL") + _log(f" stripe_url: {stripe_checkout_url}") + + http = requests.Session() + http.headers.update({"User-Agent": USER_AGENT}) + + # 代理配置 + proxy_cfg = cfg.get("proxy") + if proxy_cfg: + host = proxy_cfg["host"] + port = proxy_cfg["port"] + user = proxy_cfg.get("user", "") + pwd = proxy_cfg.get("pass", "") + if user and pwd: + proxy_url = f"http://{user}:{pwd}@{host}:{port}" + else: + proxy_url = f"http://{host}:{port}" + http.proxies = {"http": proxy_url, "https": proxy_url} + _log(f" 代理: {host}:{port} (user={user})") + else: + _log(" 代理: 无 (直连)") + + + reg_guid, reg_muid, reg_sid = register_fingerprint(http) + + + pk = fetch_publishable_key(http, session_id, stripe_checkout_url) + + + init_resp, stripe_ver, init_ctx = init_checkout(http, session_id, pk, locale_profile=locale_profile) + init_ctx["guid"] = reg_guid + init_ctx["muid"] = reg_muid + init_ctx["sid"] = reg_sid + + init_ctx["page_load_ts"] = int(time.time() * 1000) + mode = init_resp.get("mode", "unknown") + display_name = init_resp.get("account_settings", {}).get("display_name", "?") + _log(f" 商户: {display_name} | 模式: {mode}") + + + send_telemetry_batch(http, session_id, init_ctx, phase="init") + + + _log("[2c/6] 获取 elements session ...") + fetch_elements_session(http, pk, session_id, init_ctx, stripe_ver=stripe_ver, locale_profile=locale_profile) + + + _log("[2d/6] 查询 Link 消费者 ...") + lookup_consumer(http, pk, card["email"], stripe_ver=stripe_ver) + + + _log("[2e/6] 逐字段提交地址 ...") + update_payment_page_address(http, pk, session_id, card, init_ctx, stripe_ver=stripe_ver) + + + send_telemetry_batch(http, session_id, init_ctx, phase="address") + + + init_ctx["time_on_page"] = int(time.time() * 1000) - init_ctx.get("page_load_ts", int(time.time() * 1000)) + + hcaptcha_cfg = extract_hcaptcha_config(init_resp) + _log(f" hCaptcha site_key: {hcaptcha_cfg['site_key']}") + if hcaptcha_cfg.get("rqdata"): + _log(f" hCaptcha rqdata: {hcaptcha_cfg['rqdata'][:50]}...") + + -- + send_telemetry_batch(http, session_id, init_ctx, phase="card_input") + + if manual_token: + _log(f"[3/6] 使用手动传入的 token (长度: {len(manual_token)})") + captcha_token = manual_token + captcha_ekey = "" + pm_id = create_payment_method(http, pk, card, captcha_token, session_id, stripe_ver, ctx=init_ctx) + # ---- 遥测: confirm 阶段 ---- + send_telemetry_batch(http, session_id, init_ctx, phase="confirm") + confirm_payment(http, pk, session_id, pm_id, captcha_token, init_resp, stripe_ver, captcha_cfg, + captcha_ekey=captcha_ekey, ctx=init_ctx, locale_profile=locale_profile) + else: + + _log("[3/6] 尝试不带 hCaptcha 直接提交 ...") + try: + pm_id = create_payment_method(http, pk, card, "", session_id, stripe_ver, ctx=init_ctx) + send_telemetry_batch(http, session_id, init_ctx, phase="confirm") + confirm_payment(http, pk, session_id, pm_id, "", init_resp, stripe_ver, captcha_cfg, + captcha_ekey="", ctx=init_ctx, locale_profile=locale_profile) + except RuntimeError as e: + err_msg = str(e).lower() + if any(kw in err_msg for kw in ["captcha", "hcaptcha", "blocked", "denied", "radar"]): + _log(f" 需要 hCaptcha,开始解题 ...") + captcha_token, captcha_ekey = solve_hcaptcha(captcha_cfg, hcaptcha_cfg) + + init_ctx["time_on_page"] = int(time.time() * 1000) - init_ctx.get("page_load_ts", int(time.time() * 1000)) + pm_id = create_payment_method(http, pk, card, captcha_token, session_id, stripe_ver, ctx=init_ctx) + send_telemetry_batch(http, session_id, init_ctx, phase="confirm") + confirm_payment(http, pk, session_id, pm_id, captcha_token, init_resp, stripe_ver, captcha_cfg, + captcha_ekey=captcha_ekey, ctx=init_ctx, locale_profile=locale_profile) + else: + raise + + + send_telemetry_batch(http, session_id, init_ctx, phase="poll") + + # Step 6 + result = poll_result(http, pk, session_id, stripe_ver) + _log(f"\n日志已保存到: {LOG_FILE}") + return result + + + +def main(): + parser = argparse.ArgumentParser( + description="Stripe Checkout 自动化支付", + epilog="示例: python pay.py cs_live_a12H3g13P9TH6udPmljRCpWsmHiKRFH7VUiZBbcA1U60eMzFFI2wp3rtXL", + ) + parser.add_argument("session_id", help="Checkout Session ID (cs_live_xxx 或 cs_test_xxx)") + parser.add_argument("--card", type=int, default=0, help="使用第 N 张卡 (0-based, 默认 0)") + parser.add_argument("--config", default="config.json", help="配置文件路径 (默认 config.json)") + parser.add_argument("--token", default="", help="手动传入 hCaptcha token (跳过打码平台)") + args = parser.parse_args() + + try: + run(args.session_id, card_index=args.card, config_path=args.config, manual_token=args.token) + except Exception as e: + err_msg = f"\n[ERROR] {type(e).__name__}: {e}" + print(err_msg, file=sys.stderr) + # 也写入日志 + try: + import traceback + with open(LOG_FILE, "a", encoding="utf-8") as f: + f.write(f"\n{'!'*60}\n") + f.write(err_msg + "\n") + f.write(traceback.format_exc()) + f.write(f"{'!'*60}\n") + except Exception: + pass + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file