Friday, March 11, 2016

SSLv2 Drown Attack Scan

12:25 PM Leave a Reply
#!/usr/bin/env python

import sys
from enum import Enum
import time
import datetime
import socket
import Crypto.Cipher
import signal
from binascii import hexlify
import base64

#!/usr/bin/env python

import sys
from enum import Enum
import time
import datetime
import socket
import Crypto.Cipher
import signal
from binascii import hexlify
import base64

sys.path.append("./scapy-ssl_tls/")
import logging
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
import scapy
from scapy.all import *
from ssl_tls import *
import ssl_tls_crypto

from pyx509.pkcs7.asn1_models.X509_certificate import Certificate
from pyx509.pkcs7_models import X509Certificate, PublicKeyInfo, ExtendedKeyUsageExt
from pyx509.pkcs7.asn1_models.decoder_workarounds import decode

import select

SOCKET_TIMEOUT = 15
SOCKET_RECV_SIZE = 80 * 1024

CON_FAIL = "con fail"
NO_STARTTLS = "no starttls"
NO_TLS = "no tls"
VULN = "vuln"

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

CHALLENGE    = 'a' * 16
CLEAR_KEY    = '\0' * 11
KEY_ARGUMENT = '\0' * 8

class CipherSuite(object):
    @classmethod
    def get_string_description(cls):
        raise NotImplementedError()

    @classmethod
    def get_constant(cls):
        return eval("SSLv2CipherSuite." + cls.get_string_description())

    @classmethod
    def get_client_master_key(cls, encrypted_pms):
        raise NotImplementedError()

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        raise NotImplementedError()

    @classmethod
    def get_encrypted_pms(cls, public_key, secret_key):
        pkcs1_pubkey = Crypto.Cipher.PKCS1_v1_5.new(public_key)
        encrypted_pms = pkcs1_pubkey.encrypt(secret_key)
        return encrypted_pms

class RC4Export(CipherSuite):
    SECRET_KEY = 'b' * 5

    @classmethod
    def get_string_description(cls):
        return "RC4_128_EXPORT40_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        client_master_key = SSLv2ClientMasterKey(cipher_suite=cls.get_constant(),
                                                 encrypted_key=cls.get_encrypted_pms(public_key, cls.SECRET_KEY),
                                                 clear_key=CLEAR_KEY)
        return client_master_key

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        md5 = MD5.new(CLEAR_KEY + cls.SECRET_KEY + '0' + CHALLENGE + connection_id).digest()
        rc4 = Crypto.Cipher.ARC4.new(md5)
        if not rc4.decrypt(server_finished[2:]).endswith(CHALLENGE):
            return False
        return True

class RC4(CipherSuite):
    SECRET_KEY = 'b' * 16
    CLEAR_KEY  = 'a' * 15

    @classmethod
    def get_string_description(cls):
        return "RC4_128_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        client_master_key = SSLv2ClientMasterKey(cipher_suite=cls.get_constant(),
                                                 encrypted_key=cls.get_encrypted_pms(public_key, cls.SECRET_KEY),
                                                 clear_key=cls.CLEAR_KEY)
        return client_master_key

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        md5 = MD5.new((cls.CLEAR_KEY + cls.SECRET_KEY)[:16] + '0' + CHALLENGE + connection_id).digest()
        rc4 = Crypto.Cipher.ARC4.new(md5)
        if not rc4.decrypt(server_finished[2:]).endswith(CHALLENGE):
            return False
        return True

class RC2Export(CipherSuite):
    SECRET_KEY = 'b' * 5

    @classmethod
    def get_string_description(cls):
        return "RC2_128_CBC_EXPORT40_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        client_master_key = SSLv2ClientMasterKey(cipher_suite=cls.get_constant(),
                                                 encrypted_key=cls.get_encrypted_pms(public_key, cls.SECRET_KEY),
                                                 key_argument=KEY_ARGUMENT,
                                                 clear_key=CLEAR_KEY)
        return client_master_key

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        md5 = MD5.new(CLEAR_KEY + cls.SECRET_KEY + '0' + CHALLENGE + connection_id).digest()
        rc2 = Crypto.Cipher.ARC2.new(md5, mode=Crypto.Cipher.ARC2.MODE_CBC, IV=KEY_ARGUMENT, effective_keylen=128)
        try:
            decryption = rc2.decrypt(server_finished[3:])
        except ValueError, e:
            return False
        if decryption[17:].startswith(CHALLENGE):
            return True
        return False

class DES(CipherSuite):
    SECRET_KEY = 'b' * 8

    @classmethod
    def get_string_description(cls):
        return "DES_64_CBC_WITH_MD5"

    @classmethod
    def get_client_master_key(cls, public_key):
        client_master_key = SSLv2ClientMasterKey(cipher_suite=cls.get_constant(),
                                                 encrypted_key=cls.get_encrypted_pms(public_key, cls.SECRET_KEY),
                                                 key_argument=KEY_ARGUMENT)
        return client_master_key

    @classmethod
    def verify_key(cls, connection_id, server_finished):
        md5 = MD5.new(cls.SECRET_KEY + '0' + CHALLENGE + connection_id).digest()
        des = Crypto.Cipher.DES.new(md5[:8], mode=Crypto.Cipher.DES.MODE_CBC, IV=KEY_ARGUMENT)
        try:
            decryption = des.decrypt(server_finished[3:])
        except ValueError, e:
            return False
        if decryption[17:].startswith(CHALLENGE):
            return True
        return False

cipher_suites = [RC2Export, RC4Export, RC4, DES]

def parse_certificate(derData):
    cert = decode(derData, asn1Spec=Certificate())[0]
    x509cert = X509Certificate(cert)
    tbs = x509cert.tbsCertificate

    algType = tbs.pub_key_info.algType
    algParams = tbs.pub_key_info.key

    if (algType != PublicKeyInfo.RSA):
        print 'Certificate algType is not RSA'
        raise Exception()

    return RSA.construct((long(hexlify(algParams["mod"]), 16), long(algParams["exp"])))

class Protocol(Enum):
    BARE_SSLv2 = 1
    ESMTP      = 2
    IMAP       = 3
    POP3       = 4

def sslv2_connect(ip, port, protocol, cipher_suite, result_additional_data):
    s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.settimeout(SOCKET_TIMEOUT)
    try:
        s.connect((ip, port))
    except socket.error, e:
        try:
            s.connect((ip, port))
        except socket.error, e:
            try:
                s.connect((ip, port))
            except socket.error, e:
                print '%s: Case 1 - port is apparently closed (after 3 tries); Connect failed' % ip
                return CON_FAIL

    starttls_response = "n/a"
    try:
        if protocol == Protocol.ESMTP:
            banner = s.recv(SOCKET_RECV_SIZE)
            s.send("EHLO testing\r\n")
            ehlo_response = s.recv(SOCKET_RECV_SIZE)
            if "starttls" not in ehlo_response.lower():
                print "%s: Case 2a; Server apparently doesn't support STARTTLS" % ip
                return NO_STARTTLS
            s.send("STARTTLS\r\n")
            starttls_response = s.recv(SOCKET_RECV_SIZE)
        if protocol == Protocol.IMAP:
            banner = s.recv(SOCKET_RECV_SIZE)
            s.send(". CAPABILITY\r\n")
            banner = s.recv(SOCKET_RECV_SIZE)
            if "starttls" not in banner.lower():
                print "%s: Case 2b; Server apparently doesn't support STARTTLS" % ip
                return NO_STARTTLS
            s.send(". STARTTLS\r\n")
            starttls_response = s.recv(SOCKET_RECV_SIZE)
        if protocol == Protocol.POP3:
            banner = s.recv(SOCKET_RECV_SIZE)
            s.send("STLS\r\n")
            starttls_response = s.recv(SOCKET_RECV_SIZE)
    except socket.error, e:
        print "Errorx: " + str(e) + " - starttls_response: '" + starttls_response + "'"
        print '%s: Case 2c; starttls negotiation failed' % ip
        return NO_STARTTLS


    client_hello = SSLv2Record()/SSLv2ClientHello(cipher_suites=SSL2_CIPHER_SUITES.keys(),challenge=CHALLENGE)
    s.sendall(str(client_hello))

    rlist, wlist, xlist = select.select([s], [], [s], SOCKET_TIMEOUT)
    if s in xlist or not s in rlist:
        print '%s: Case 3a; Server did not response properly to client hello' % ip
        s.close()
        return "3a: %s" % NO_TLS

    try:
        server_hello_raw = s.recv(SOCKET_RECV_SIZE)
    except socket.error, e:
        print '%s: Case 3b; Connection reset by peer when waiting for server hello' % ip
        s.close()
        return "3b: %s" % NO_TLS

    server_hello = timeout(SSL, (server_hello_raw,), timeout_duration=SOCKET_TIMEOUT)
    if server_hello == None:
        print '%s: Case 3c; Timeout on parsing server hello' % ip
        s.close()
        return "3c: %s" % NO_TLS

    if not SSLv2ServerHello in server_hello:
        print '%s: Case 3d; Server hello did not contain server hello' % ip
        s.close()
        return "3d: %s" % NO_TLS

    parsed_server_hello = server_hello[SSLv2ServerHello]
    connection_id = parsed_server_hello.connection_id
    cert = parsed_server_hello.certificates

    try:
        public_key = parse_certificate(cert)
    except:
        # Server could still be vulnerable, we just can't tell, so we assume it isn't
        print '%s: Case 4a; Could not extract public key from DER' % ip
        s.close()
        return "4a: %s" % NO_TLS

    server_advertised_cipher_suites = parsed_server_hello.fields["cipher_suites"]
    cipher_suite_advertised = cipher_suite.get_constant() in server_advertised_cipher_suites

    client_master_key = cipher_suite.get_client_master_key(public_key)
    client_key_exchange = SSLv2Record()/client_master_key

    s.sendall(str(client_key_exchange))

    rlist, wlist, xlist = select.select([s], [], [s], SOCKET_TIMEOUT)
    if s in xlist:
        print '%s: Case 4b; Exception on socket after sending client key exchange' % ip
        s.close()
        return "4b: %s" % NO_TLS
    if not s in rlist:
        print '%s: Case 5; Server did not send finished in time' % ip
        s.close()
        return "5: %s" % NO_TLS
    try:
        server_finished = s.recv(SOCKET_RECV_SIZE)
    except socket.error, e:
        print '%s: Case 4c; Connection reset by peer when waiting for server finished' % ip
        s.close()
        return "4c: %s" % NO_TLS

    if server_finished == '':
        print '%s: Case 4d; Empty server_finished' % ip
        s.close()
        return "4d: %s" % NO_TLS

    if not cipher_suite.verify_key(connection_id, server_finished):
        print '%s: Case 7; Symmetric key did not successfully verify on server finished message' % ip
        return "7: %s" % NO_TLS

    s.close()

    result_additional_data['cipher_suite_advertised'] = cipher_suite_advertised
    return "%s:%s" % (VULN, base64.b64encode(public_key.exportKey(format='DER')))

if __name__ == '__main__':
    ip = sys.argv[1]
    port = int(sys.argv[2])
    scan_id = os.getcwd()
    dtime = datetime.datetime.now()
    print 'Testing %s on port %s' % (ip, port)

    protocol = Protocol.BARE_SSLv2
    if len(sys.argv) >= 4:
        if sys.argv[3] == '-esmtp':
            protocol = Protocol.ESMTP
        elif sys.argv[3] == '-imap':
            protocol = Protocol.IMAP
        elif sys.argv[3] == '-pop3':
            protocol = Protocol.POP3
        elif sys.argv[3] == '-bare':
            protocol = Protocol.BARE_SSLv2
        else:
            print 'You gave 3 arguments, argument 3 is not a recognized protocol. Bailing out'
            sys.exit(1)

    vulns = []
    for cipher_suite in cipher_suites:

        string_description = cipher_suite.get_string_description()
        ret_additional_data = {}
        ret = sslv2_connect(ip, port, protocol, cipher_suite, ret_additional_data)

        if ret.startswith(VULN):
            pub_key = ret.replace('%s:' % VULN, '')

            cve_string = ""
            if not ret_additional_data['cipher_suite_advertised']:
                cve_string = " to CVE-2015-3197"
            if string_description == "RC4_128_WITH_MD5":
                if cve_string == "":
                    cve_string = " to CVE-2016-0703"
                else:
                    cve_string += " and CVE-2016-0703"

            print '%s: Server is vulnerable%s, with cipher %s\n' % (ip, cve_string, string_description)
        else:
            print '%s: Server is NOT vulnerable with cipher %s, Message: %s\n' % (ip, string_description, ret)

0 comments :