baf392761160b5f2823eac1d33a529b939f1f8b2
[nepi.git] / src / nepi / util / tunchannel.py
1 import select
2 import sys
3 import os
4 import struct
5 import socket
6 import threading
7 import errno
8 import fcntl
9 import random
10 import traceback
11 import functools
12 import collections
13 import ctypes
14 import time
15
16 def ipfmt(ip):
17     ipbytes = map(ord,ip.decode("hex"))
18     return '.'.join(map(str,ipbytes))
19
20 tagtype = {
21     '0806' : 'arp',
22     '0800' : 'ipv4',
23     '8870' : 'jumbo',
24     '8863' : 'PPPoE discover',
25     '8864' : 'PPPoE',
26     '86dd' : 'ipv6',
27 }
28 def etherProto(packet, len=len):
29     if len(packet) > 14:
30         if packet[12] == "\x81" and packet[13] == "\x00":
31             # tagged
32             return packet[16:18]
33         else:
34             # untagged
35             return packet[12:14]
36     # default: ip
37     return "\x08\x00"
38 def formatPacket(packet, ether_mode):
39     if ether_mode:
40         stripped_packet = etherStrip(packet)
41         if not stripped_packet:
42             packet = packet.encode("hex")
43             if len(packet) < 28:
44                 return "malformed eth " + packet.encode("hex")
45             else:
46                 if packet[24:28] == "8100":
47                     # tagged
48                     ethertype = tagtype.get(packet[32:36], 'eth')
49                     return ethertype + " " + ( '-'.join( (
50                         packet[0:12], # MAC dest
51                         packet[12:24], # MAC src
52                         packet[24:32], # VLAN tag
53                         packet[32:36], # Ethertype/len
54                         packet[36:], # Payload
55                     ) ) )
56                 else:
57                     # untagged
58                     ethertype = tagtype.get(packet[24:28], 'eth')
59                     return ethertype + " " + ( '-'.join( (
60                         packet[0:12], # MAC dest
61                         packet[12:24], # MAC src
62                         packet[24:28], # Ethertype/len
63                         packet[28:], # Payload
64                     ) ) )
65         else:
66             packet = stripped_packet
67     packet = packet.encode("hex")
68     if len(packet) < 48:
69         return "malformed ip " + packet
70     else:
71         return "ip " + ( '-'.join( (
72             packet[0:1], #version
73             packet[1:2], #header length
74             packet[2:4], #diffserv/ECN
75             packet[4:8], #total length
76             packet[8:12], #ident
77             packet[12:16], #flags/fragment offs
78             packet[16:18], #ttl
79             packet[18:20], #ip-proto
80             packet[20:24], #checksum
81             ipfmt(packet[24:32]), # src-ip
82             ipfmt(packet[32:40]), # dst-ip
83             packet[40:48] if (int(packet[1],16) > 5) else "", # options
84             packet[48:] if (int(packet[1],16) > 5) else packet[40:], # payload
85         ) ) )
86
87 def _packetReady(buf, ether_mode=False, len=len):
88     if not buf:
89         return False
90         
91     rv = False
92     while not rv:
93         if len(buf[0]) < 4:
94             rv = False
95         elif ether_mode:
96             rv = True
97         else:
98             _,totallen = struct.unpack('HH',buf[0][:4])
99             totallen = socket.htons(totallen)
100             rv = len(buf[0]) >= totallen
101         if not rv and len(buf) > 1:
102             nbuf = ''.join(buf)
103             buf.clear()
104             buf.append(nbuf)
105         else:
106             return rv
107     return rv
108
109 def _pullPacket(buf, ether_mode=False, len=len):
110     if ether_mode:
111         return buf.popleft()
112     else:
113         _,totallen = struct.unpack('HH',buf[0][:4])
114         totallen = socket.htons(totallen)
115         if len(buf[0]) < totallen:
116             rv = buf[0][:totallen]
117             buf[0] = buf[0][totallen:]
118         else:
119             rv = buf.popleft()
120         return rv
121
122 def etherStrip(buf):
123     if len(buf) < 14:
124         return ""
125     if buf[12:14] == '\x08\x10' and buf[16:18] == '\x08\x00':
126         # tagged ethernet frame
127         return buf[18:]
128     elif buf[12:14] == '\x08\x00':
129         # untagged ethernet frame
130         return buf[14:]
131     else:
132         return ""
133
134 def etherWrap(packet):
135     return ''.join((
136         "\x00"*6*2 # bogus src and dst mac
137         +"\x08\x00", # IPv4
138         packet, # payload
139         "\x00"*4, # bogus crc
140     ))
141
142 def piStrip(buf, len=len):
143     if len(buf) < 4:
144         return buf
145     else:
146         return buffer(buf,4)
147     
148 def piWrap(buf, ether_mode, etherProto=etherProto):
149     if ether_mode:
150         proto = etherProto(buf)
151     else:
152         proto = "\x08\x00"
153     return ''.join((
154         "\x00\x00", # PI: 16 bits flags
155         proto, # 16 bits proto
156         buf,
157     ))
158
159 _padmap = [ chr(padding) * padding for padding in xrange(127) ]
160 del padding
161
162 def encrypt(packet, crypter, len=len, padmap=_padmap):
163     # pad
164     padding = crypter.block_size - len(packet) % crypter.block_size
165     packet += padmap[padding]
166     
167     # encrypt
168     return crypter.encrypt(packet)
169
170 def decrypt(packet, crypter, ord=ord):
171     if packet:
172         # decrypt
173         packet = crypter.decrypt(packet)
174         
175         # un-pad
176         padding = ord(packet[-1])
177         if not (0 < padding <= crypter.block_size):
178             # wrong padding
179             raise RuntimeError, "Truncated packet"
180         packet = packet[:-padding]
181     
182     return packet
183
184 def nonblock(fd):
185     try:
186         fl = fcntl.fcntl(fd, fcntl.F_GETFL)
187         fl |= os.O_NONBLOCK
188         fcntl.fcntl(fd, fcntl.F_SETFL, fl)
189         return True
190     except:
191         traceback.print_exc(file=sys.stderr)
192         # Just ignore
193         return False
194
195 def tun_fwd(tun, remote, with_pi, ether_mode, cipher_key, udp, TERMINATE, stderr=sys.stderr, reconnect=None, rwrite=None, rread=None, tunqueue=1000, tunkqueue=1000,
196         cipher='AES', accept_local=None, accept_remote=None, slowlocal=True, queueclass=None, bwlimit=None,
197         len=len, max=max, min=min, OSError=OSError, select=select.select, selecterror=select.error, os=os, socket=socket,
198         retrycodes=(os.errno.EWOULDBLOCK, os.errno.EAGAIN, os.errno.EINTR) ):
199     crypto_mode = False
200     crypter = None
201
202     try:
203         if cipher_key and cipher:
204             import Crypto.Cipher
205             import hashlib
206             __import__('Crypto.Cipher.'+cipher)
207             
208             ciphername = cipher
209             cipher = getattr(Crypto.Cipher, cipher)
210             hashed_key = hashlib.sha256(cipher_key).digest()
211             if getattr(cipher, 'key_size'):
212                 hashed_key = hashed_key[:cipher.key_size]
213             elif ciphername == 'DES3':
214                 hashed_key = hashed_key[:24]
215             crypter = cipher.new(
216                 hashed_key, 
217                 cipher.MODE_ECB)
218             crypto_mode = True
219     except:
220         traceback.print_exc(file=sys.stderr)
221         crypto_mode = False
222         crypter = None
223
224     if stderr is not None:
225         if crypto_mode:
226             print >>stderr, "Packets are transmitted in CIPHER"
227         else:
228             print >>stderr, "Packets are transmitted in PLAINTEXT"
229     
230     if hasattr(remote, 'fileno'):
231         remote_fd = remote.fileno()
232         if rwrite is None:
233             def rwrite(remote, packet, os_write=os.write):
234                 return os_write(remote_fd, packet)
235         if rread is None:
236             def rread(remote, maxlen, os_read=os.read):
237                 return os_read(remote_fd, maxlen)
238  
239     rnonblock = nonblock(remote)
240     tnonblock = nonblock(tun)
241     
242     # Pick up TUN/TAP writing method
243     if with_pi:
244         try:
245             import iovec
246             
247             # We have iovec, so we can skip PI injection
248             # and use iovec which does it natively
249             if ether_mode:
250                 twrite = iovec.ethpiwrite
251                 tread = iovec.piread2
252             else:
253                 twrite = iovec.ippiwrite
254                 tread = iovec.piread2
255         except ImportError:
256             # We have to inject PI headers pythonically
257             def twrite(fd, packet, oswrite=os.write, piWrap=piWrap, ether_mode=ether_mode):
258                 return oswrite(fd, piWrap(packet, ether_mode))
259             
260             # For reading, we strip PI headers with buffer slicing and that's it
261             def tread(fd, maxlen, osread=os.read, piStrip=piStrip):
262                 return piStrip(osread(fd, maxlen))
263     else:
264         # No need to inject PI headers
265         twrite = os.write
266         tread = os.read
267     
268     encrypt_ = encrypt
269     decrypt_ = decrypt
270     xrange_ = xrange
271
272     if accept_local is not None:
273         def tread(fd, maxlen, _tread=tread, accept=accept_local):
274             packet = _tread(fd, maxlen)
275             if accept(packet, 0):
276                 return packet
277             else:
278                 return None
279
280     if accept_remote is not None:
281         if crypto_mode:
282             def decrypt_(packet, crypter, decrypt_=decrypt_, accept=accept_remote):
283                 packet = decrypt_(packet, crypter)
284                 if accept(packet, 1):
285                     return packet
286                 else:
287                     return None
288         else:
289             def rread(fd, maxlen, _rread=rread, accept=accept_remote):
290                 packet = _rread(fd, maxlen)
291                 if accept(packet, 1):
292                     return packet
293                 else:
294                     return None
295     
296     maxbkbuf = maxfwbuf = max(10,tunqueue-tunkqueue)
297     tunhurry = max(0,maxbkbuf/2)
298     
299     if queueclass is None:
300         queueclass = collections.deque
301         maxbatch = 2000
302         maxtbatch = 50
303     else:
304         maxfwbuf = maxbkbuf = 2000000000
305         maxbatch = 50
306         maxtbatch = 30
307         tunhurry = 30
308     
309     fwbuf = queueclass()
310     bkbuf = queueclass()
311     nfwbuf = 0
312     nbkbuf = 0
313     if ether_mode or udp:
314         packetReady = bool
315         pullPacket = queueclass.popleft
316         reschedule = queueclass.appendleft
317     else:
318         packetReady = _packetReady
319         pullPacket = _pullPacket
320         reschedule = queueclass.appendleft
321     tunfd = tun.fileno()
322     os_read = os.read
323     os_write = os.write
324     
325     tget = time.time
326     maxbwfree = bwfree = 1500 * tunqueue
327     lastbwtime = tget()
328     
329     remoteok = True
330     
331     
332     while not TERMINATE:
333         wset = []
334         if packetReady(bkbuf):
335             wset.append(tun)
336         if remoteok and packetReady(fwbuf) and (not bwlimit or bwfree > 0):
337             wset.append(remote)
338         
339         rset = []
340         if len(fwbuf) < maxfwbuf:
341             rset.append(tun)
342         if remoteok and len(bkbuf) < maxbkbuf:
343             rset.append(remote)
344         
345         if remoteok:
346             eset = (tun,remote)
347         else:
348             eset = (tun,)
349         
350         try:
351             rdrdy, wrdy, errs = select(rset,wset,eset,1)
352         except selecterror, e:
353             if e.args[0] == errno.EINTR:
354                 # just retry
355                 continue
356             else:
357                 raise
358
359         # check for errors
360         if errs:
361             if reconnect is not None and remote in errs and tun not in errs:
362                 remote = reconnect()
363                 if hasattr(remote, 'fileno'):
364                     remote_fd = remote.fileno()
365             elif udp and remote in errs and tun not in errs:
366                 # In UDP mode, those are always transient errors
367                 # Usually, an error will imply a read-ready socket
368                 # that will raise an "Connection refused" error, so
369                 # disable read-readiness just for now, and retry
370                 # the select
371                 remoteok = False
372                 continue
373             else:
374                 break
375         else:
376             remoteok = True
377         
378         # check to see if we can write
379         #rr = wr = rt = wt = 0
380         if remote in wrdy:
381             sent = 0
382             try:
383                 try:
384                     for x in xrange(maxbatch):
385                         packet = pullPacket(fwbuf)
386
387                         if crypto_mode:
388                             packet = encrypt_(packet, crypter)
389                         
390                         sent += rwrite(remote, packet)
391                         #wr += 1
392                         
393                         if not rnonblock or not packetReady(fwbuf):
394                             break
395                 except OSError,e:
396                     # This except handles the entire While block on PURPOSE
397                     # as an optimization (setting a try/except block is expensive)
398                     # The only operation that can raise this exception is rwrite
399                     if e.errno in retrycodes:
400                         # re-schedule packet
401                         reschedule(fwbuf, packet)
402                     else:
403                         raise
404             except:
405                 if reconnect is not None:
406                     # in UDP mode, sometimes connected sockets can return a connection refused.
407                     # Give the caller a chance to reconnect
408                     remote = reconnect()
409                     if hasattr(remote, 'fileno'):
410                         remote_fd = remote.fileno()
411                 elif not udp:
412                     # in UDP mode, we ignore errors - packet loss man...
413                     raise
414                 #traceback.print_exc(file=sys.stderr)
415             
416             if bwlimit:
417                 bwfree -= sent
418         if tun in wrdy:
419             try:
420                 for x in xrange(maxtbatch):
421                     packet = pullPacket(bkbuf)
422                     twrite(tunfd, packet)
423                     #wt += 1
424                     
425                     # Do not inject packets into the TUN faster than they arrive, unless we're falling
426                     # behind. TUN devices discard packets if their queue is full (tunkqueue), but they
427                     # don't block either (they're always ready to write), so if we flood the device 
428                     # we'll have high packet loss.
429                     if not tnonblock or (slowlocal and len(bkbuf) < tunhurry) or not packetReady(bkbuf):
430                         break
431                 else:
432                     if slowlocal:
433                         # Give some time for the kernel to process the packets
434                         time.sleep(0)
435             except OSError,e:
436                 # This except handles the entire While block on PURPOSE
437                 # as an optimization (setting a try/except block is expensive)
438                 # The only operation that can raise this exception is os_write
439                 if e.errno in retrycodes:
440                     # re-schedule packet
441                     reschedule(bkbuf, packet)
442                 else:
443                     raise
444         
445         # check incoming data packets
446         if tun in rdrdy:
447             try:
448                 for x in xrange(maxbatch):
449                     packet = tread(tunfd,2000) # tun.read blocks until it gets 2k!
450                     if not packet:
451                         continue
452                     #rt += 1
453                     fwbuf.append(packet)
454                     
455                     if not tnonblock or len(fwbuf) >= maxfwbuf:
456                         break
457             except OSError,e:
458                 # This except handles the entire While block on PURPOSE
459                 # as an optimization (setting a try/except block is expensive)
460                 # The only operation that can raise this exception is os_read
461                 if e.errno not in retrycodes:
462                     raise
463         if remote in rdrdy:
464             try:
465                 try:
466                     for x in xrange(maxbatch):
467                         packet = rread(remote,2000)
468                         
469                         #rr += 1
470                         
471                         if crypto_mode:
472                             packet = decrypt_(packet, crypter)
473                             if not packet:
474                                 continue
475                         elif not packet:
476                             if not udp and packet == "":
477                                 # Connection broken, try to reconnect (or just die)
478                                 raise RuntimeError, "Connection broken"
479                             else:
480                                 continue
481
482                         bkbuf.append(packet)
483                         
484                         if not rnonblock or len(bkbuf) >= maxbkbuf:
485                             break
486                 except OSError,e:
487                     # This except handles the entire While block on PURPOSE
488                     # as an optimization (setting a try/except block is expensive)
489                     # The only operation that can raise this exception is rread
490                     if e.errno not in retrycodes:
491                         raise
492             except Exception, e:
493                 if reconnect is not None:
494                     # in UDP mode, sometimes connected sockets can return a connection refused
495                     # on read. Give the caller a chance to reconnect
496                     remote = reconnect()
497                     if hasattr(remote, 'fileno'):
498                         remote_fd = remote.fileno()
499                 elif not udp:
500                     # in UDP mode, we ignore errors - packet loss man...
501                     raise
502                 traceback.print_exc(file=sys.stderr)
503
504         if bwlimit:
505             tnow = tget()
506             delta = tnow - lastbwtime
507             if delta > 0.001:
508                 delta = int(bwlimit * delta)
509                 if delta > 0:
510                     bwfree = min(bwfree+delta, maxbwfree)
511                     lastbwtime = tnow
512         
513         #print >>sys.stderr, "rr:%d\twr:%d\trt:%d\twt:%d" % (rr,wr,rt,wt)
514
515 def udp_connect(TERMINATE, local_addr, local_port, peer_addr, peer_port):
516     rsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
517     retrydelay = 1.0
518     for i in xrange(30):
519         # TERMINATE is a array. An item can be added to TERMINATE, from
520         # outside this function to force termination of the loop
521         if TERMINATE:
522             raise OSError, "Killed"
523         try:
524             rsock.bind((local_addr, local_port))
525             break
526         except socket.error:
527             # wait a while, retry
528             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
529             time.sleep(min(30.0,retrydelay))
530             retrydelay *= 1.1
531     else:
532         rsock.bind((local_addr, local_port))
533     print >>sys.stderr, "Listening UDP at: %s:%d" % (local_addr, local_port)
534     print >>sys.stderr, "Connecting UDP to: %s:%d" % (peer_addr, peer_port)
535     rsock.connect((peer_addr, peer_port))
536     return rsock
537
538 def udp_handshake(TERMINATE, rsock):
539     endme = False
540     def keepalive():
541         while not endme and not TERMINATE:
542             try:
543                 rsock.send('')
544             except:
545                 pass
546             time.sleep(1)
547         try:
548             rsock.send('')
549         except:
550             pass
551     keepalive_thread = threading.Thread(target=keepalive)
552     keepalive_thread.start()
553     retrydelay = 1.0
554     for i in xrange(30):
555         if TERMINATE:
556             raise OSError, "Killed"
557         try:
558             heartbeat = rsock.recv(10)
559             break
560         except:
561             time.sleep(min(30.0,retrydelay))
562             retrydelay *= 1.1
563     else:
564         heartbeat = rsock.recv(10)
565     endme = True
566     keepalive_thread.join()
567
568 def udp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
569     rsock = udp_connect(TERMINATE, local_addr, local_port, peer_addr,
570             peer_port)
571     udp_handshake(TERMINATE, rsock)
572     return rsock 
573
574 def tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port):
575     sock = None
576     retrydelay = 1.0
577     # The peer has a firewall that prevents a response to the connect, we 
578     # will be forever blocked in the connect, so we put a reasonable timeout.
579     rsock.settimeout(10) 
580     # We wait for 
581     for i in xrange(30):
582         if stop:
583             break
584         if TERMINATE:
585             raise OSError, "Killed"
586         try:
587             rsock.connect((peer_addr, peer_port))
588             sock = rsock
589             break
590         except socket.error:
591             # wait a while, retry
592             print >>sys.stderr, "%s: Could not connect. Retrying in a sec..." % (time.strftime('%c'),)
593             time.sleep(min(30.0,retrydelay))
594             retrydelay *= 1.1
595     else:
596         rsock.connect((peer_addr, peer_port))
597         sock = rsock
598     if sock:
599         print >>sys.stderr, "tcp_connect: TCP sock connected to remote %s:%s" % (peer_addr, peer_port)
600         sock.settimeout(0) 
601     return sock
602
603 def tcp_listen(TERMINATE, stop, lsock, local_addr, local_port):
604     sock = None
605     retrydelay = 1.0
606     # We try to bind to the local virtual interface. 
607     # It might not exist yet so we wait in a loop.
608     for i in xrange(30):
609         if stop:
610             break
611         if TERMINATE:
612             raise OSError, "Killed"
613         try:
614             lsock.bind((local_addr, local_port))
615             break
616         except socket.error:
617             # wait a while, retry
618             print >>sys.stderr, "%s: Could not bind. Retrying in a sec..." % (time.strftime('%c'),)
619             time.sleep(min(30.0,retrydelay))
620             retrydelay *= 1.1
621     else:
622         lsock.bind((local_addr, local_port))
623
624     print >>sys.stderr, "tcp_listen: TCP sock listening in local sock %s:%s" % (local_addr, local_port)
625     # Now we wait until the other side connects. 
626     # The other side might not be ready yet, so we also wait in a loop for timeouts.
627     timeout = 1
628     lsock.listen(1)
629     for i in xrange(30):
630         if TERMINATE:
631             raise OSError, "Killed"
632         rlist, wlist, xlist = select.select([lsock], [], [], timeout)
633         if stop:
634             break
635         if lsock in rlist:
636             sock,raddr = lsock.accept()
637             print >>sys.stderr, "tcp_listen: TCP connection accepted in local sock %s:%s" % (local_addr, local_port)
638             break
639         timeout += 5
640     return sock
641
642 def tcp_handshake(TERMINATE, rsock, listen, dice):
643     # we are going to use a barrier algorithm to decide wich side listen.
644     # each side will "roll a dice" and send the resulting value to the other 
645     # side. 
646     sock = None
647     rsock.settimeout(5)
648     for i in xrange(100):
649         if TERMINATE:
650             raise OSError, "Killed"
651         try:
652             hand = dice.read()
653             rsock.send(str(hand))
654             peer_hand = rsock.recv(1)
655             print >>sys.stderr, "tcp_handshake: hand %s, peer_hand %s" % (hand, peer_hand)
656             if hand < peer_hand:
657                 if listen:
658                     sock = rsock
659                 break   
660             elif hand > peer_hand:
661                 if not listen:
662                     sock = rsock
663                 break
664             else:
665                 dice.release()
666                 dice.throw()
667         except socket.error:
668             dice.release()
669             break
670     if sock:
671         sock.settimeout(0)
672     return sock
673
674 def tcp_establish(TERMINATE, local_addr, local_port, peer_addr, peer_port):
675     def listen(stop, result, lsock, dice):
676         rsock = tcp_listen(TERMINATE, stop, lsock, local_addr, local_port)
677         if rsock:
678             rsock = tcp_handshake(TERMINATE, rsock, True, dice)
679             if rsock:
680                 stop.append(True)
681                 result.append(rsock)
682                 rsock.send("GATO")
683                 rsock.settimeout(6)
684                 try:
685                     perro = rsock.recv(5)
686                 except:
687                     perro = "ERROR! TIMEOUT"
688                 rsock.settimeout(0)
689                 print >>sys.stderr, "tcp_establish: listen %s" % perro
690
691     def connect(stop, result, rsock, dice):
692         rsock = tcp_connect(TERMINATE, stop, rsock, peer_addr, peer_port)
693         if rsock:
694             rsock = tcp_handshake(TERMINATE, rsock, False, dice)
695             if rsock:
696                 stop.append(True)
697                 result.append(rsock)
698                 rsock.send("PERRO")
699                 rsock.settimeout(6)
700                 try:
701                     gato = rsock.recv(4)
702                 except:
703                     gato = "TIMEOUT!!"
704                 rsock.settimeout(0)
705                 print >>sys.stderr, "tcp_establish: connected %s" % gato
706     
707     dice = Dice()
708     dice.throw()
709     stop = []
710     result = []
711     lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
712     rsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
713     listen_thread = threading.Thread(target=listen, args=(stop, result, lsock, dice))
714     connect_thread = threading.Thread(target=connect, args=(stop, result, rsock, dice))
715     connect_thread.start()
716     listen_thread.start()
717     connect_thread.join()
718     listen_thread.join()
719     if not result:
720         raise OSError, "Error: tcp_establish could not establish connection."
721     sock = result[0]
722     return sock
723
724 class Dice(object):
725     def __init__(self):
726         self._condition = threading.Condition(threading.Lock())
727         self._readers = 0
728         self._value = None
729
730     def read(self):
731         self._condition.acquire()
732         try:
733             self._readers += 1
734         finally:
735             self._condition.release()
736         return self._value
737
738     def release(self):
739         self._condition.acquire()
740         try:
741             if self._readers > 0:
742                 self._readers -= 1
743             if self._readers == 0:
744                 self._condition.notifyAll()
745         finally:
746             self._condition.release()
747
748     def throw(self):
749         self._condition.acquire()
750         try:
751             while self._readers > 0:
752                 self._condition.wait()
753             self._value = random.randint(1, 6)
754         finally:
755             self._condition.release()
756