[Twisted-Python] Re: How to make a secure connection between two computers
Noam Raphael
noamraph at gmail.com
Tue Feb 12 16:04:28 MST 2008
2008/2/12, Drew Smathers <drew.smathers at gmail.com>:
> I think you could have written the equivalent program in less lines of code
> using facilities provided by Twisted. And you'd have the benefits of a
> number of things your program doesn't provide including:
>
> * transport/protocol separation
> * ability to handle multiple clients efficiently
>
As it happens, I don't really need to handle multiple clients
efficiently. I started to write the program using Twisted, and stopped
when I realized that I had to write a fairly complex state machine. If
you have a simple way to write this protocol using Twisted, it would
be nice to know.
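
For reference, the state machine in question is mostly about framing. In an event-driven setting, bytes arrive in arbitrary chunks (in Twisted, through a protocol's dataReceived callback), so a blocking "read exactly n bytes" helper has to become a parser that remembers how much it is still waiting for. Below is a minimal sketch of that idea for a 2-byte length prefix like the one used by the script in this message. It is written for Python 3, it is independent of Twisted, and FrameParser and trailer_len are hypothetical names chosen for illustration. It covers only framing, not the handshake or the encryption.

```python
import struct


class FrameParser:
    """Incremental parser: feed it bytes as they arrive, get whole frames back.

    Hypothetical helper for illustration; not part of Twisted.
    """

    HEADER = struct.Struct('<H')  # 2-byte little-endian payload length

    def __init__(self, trailer_len=0):
        self._buf = b''
        self._need = None  # None means "still waiting for the length header"
        self._trailer_len = trailer_len  # e.g. size of a MAC after the payload

    def feed(self, chunk):
        """Add received bytes and return a list of complete frame bodies."""
        self._buf += chunk
        frames = []
        while True:
            if self._need is None:
                if len(self._buf) < self.HEADER.size:
                    break  # header not complete yet
                (length,) = self.HEADER.unpack_from(self._buf)
                self._buf = self._buf[self.HEADER.size:]
                self._need = length + self._trailer_len
            if len(self._buf) < self._need:
                break  # body not complete yet
            frames.append(self._buf[:self._need])
            self._buf = self._buf[self._need:]
            self._need = None
        return frames
```

A callback would call feed() with each chunk it receives and handle whatever frames come back; the two states are "waiting for header" and "waiting for body".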
> It is not my area of expertise by any means, but shared-secret message hashing
> is a rather crude (read : easy to sniff and replay traffic) security
> mechanism.
>
Well, in TLS, after handshaking, both sides have a shared secret they
use for communication. I just skip the handshaking. (I do have a
challenge-response in the protocol, so I don't think replaying will
work.) But I did forget about hashing the data, so here's a better
version:

#!/usr/bin/env python
import os
import sys
import struct
import socket

from Crypto.Hash import SHA
from Crypto.Cipher import ARC4

RANDOM_LEN = 16


def get_random():
    # os.urandom reads from the OS random source and always returns bytes.
    return os.urandom(RANDOM_LEN)


class SocketEOFError(socket.error):
    pass


def recv(sock, n):
    """Read exactly n bytes from sock, or raise."""
    buf = ''
    while len(buf) < n:
        r = sock.recv(n - len(buf))
        if not r:
            if not buf:
                raise SocketEOFError("Connection closed")
            else:
                raise socket.error("Couldn't read enough bytes")
        buf += r
    return buf


def handshake(sock, secret):
    # Each side derives its receiving key from the secret and its own random.
    my_random = get_random()
    my_rc4 = ARC4.new(SHA.new(secret + my_random).digest())
    sock.sendall(my_random)
    peer_random = recv(sock, RANDOM_LEN)
    peer_rc4 = ARC4.new(SHA.new(secret + peer_random).digest())
    # Challenge-response: prove knowledge of the secret by encrypting zeros
    # with the key derived from the peer's random.
    sock.sendall(peer_rc4.encrypt('\0' * RANDOM_LEN))
    peer_response = recv(sock, RANDOM_LEN)
    if my_rc4.decrypt(peer_response) != '\0' * RANDOM_LEN:
        raise socket.error('Failed peer authentication')
    return my_rc4, peer_rc4


def send_data(sock, peer_rc4, secret, data):
    # '<H' (unsigned) allows up to 65535 bytes and can never decode as a
    # negative length on the receiving side.
    length_s = struct.pack('<H', len(data))
    mac = SHA.new(secret + data + length_s).digest()
    sock.sendall(length_s + peer_rc4.encrypt(data + mac))


def recv_data(sock, my_rc4, secret):
    # Returns '' when the peer closes the connection between messages.
    try:
        length_s = recv(sock, 2)
    except SocketEOFError:
        return ''
    length = struct.unpack('<H', length_s)[0]
    msg = my_rc4.decrypt(recv(sock, length + SHA.digest_size))
    data = msg[:length]
    mac = SHA.new(secret + data + length_s).digest()
    if mac != msg[length:]:
        raise socket.error('Failed hash')
    return data


def server(host, port, secret):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind((host, port))
    s.listen(1)
    while True:
        conn, addr = s.accept()
        print 'Connected by', addr
        conn.settimeout(1)
        try:
            my_rc4, peer_rc4 = handshake(conn, secret)
            while True:
                data = recv_data(conn, my_rc4, secret)
                if not data:
                    break
                print 'Received', repr(data)
                send_data(conn, peer_rc4, secret, data)
        except socket.error, e:
            print 'Error:', e
        else:
            print 'Closed connection'
        finally:
            # Close the connection on errors too, not only on a clean exit.
            conn.close()


def client(host, port, secret):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.settimeout(1)
    s.connect((host, port))
    my_rc4, peer_rc4 = handshake(s, secret)
    send_data(s, peer_rc4, secret, 'Hello, world!')
    data = recv_data(s, my_rc4, secret)
    s.close()
    print 'Received', repr(data)


def main():
    if len(sys.argv) != 5 or sys.argv[1] not in ('server', 'client'):
        print "Usage: %s server/client host port secret" % sys.argv[0]
        return
    is_server = sys.argv[1] == 'server'
    host = sys.argv[2]
    port = int(sys.argv[3])
    secret = sys.argv[4]
    if is_server:
        server(host, port, secret)
    else:
        client(host, port, secret)


if __name__ == '__main__':
    main()
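
A note on the hash step: the script computes its MAC as SHA(secret + data + length). A commonly recommended alternative for a keyed hash is HMAC, compared in constant time. The sketch below shows that variant using only the Python 3 standard library (hmac, hashlib). It is a swapped-in technique, not what the script above does: make_frame and check_frame are hypothetical names, SHA-256 is an assumption, and encryption is left out to keep the example short.

```python
import hashlib
import hmac
import struct

MAC_LEN = hashlib.sha256().digest_size  # 32 bytes (SHA-256 is an assumption)


def make_frame(key, data):
    """Return length prefix + data + HMAC over (length prefix + data)."""
    length_s = struct.pack('<H', len(data))
    mac = hmac.new(key, length_s + data, hashlib.sha256).digest()
    return length_s + data + mac


def check_frame(key, frame):
    """Verify the HMAC of a frame built by make_frame and return its data."""
    if len(frame) < 2 + MAC_LEN:
        raise ValueError('Frame too short')
    (length,) = struct.unpack('<H', frame[:2])
    data = frame[2:2 + length]
    mac = frame[2 + length:]
    expected = hmac.new(key, frame[:2] + data, hashlib.sha256).digest()
    # compare_digest avoids leaking, through timing, where the bytes differ.
    if not hmac.compare_digest(mac, expected):
        raise ValueError('Failed hash')
    return data
```

For example, check_frame(key, make_frame(key, b'Hello, world!')) gives back the original bytes, while a frame with any byte changed raises ValueError.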