Module skypekit

Source Code for Module skypekit

  1  #!/usr/bin/python 
  2  import sys 
  3  if sys.version_info < (2, 6): 
  4      print("must use python 2.6 or greater") 
  5  if sys.version_info < (3, 0): 
  6      import Queue 
  7      QUEUE_CLASS = Queue.Queue 
  8      UNICODE_CLASS = unicode 
  9  else: 
 10      import queue 
 11      QUEUE_CLASS = queue.Queue 
 12      UNICODE_CLASS = str 
 13  import array 
 14  import collections 
 15  import socket, ssl 
 16  import weakref 
 17  import threading 
 18  import time 
19 20 -def enumof(enum_dictionary, int_key):
21 try: 22 return enum_dictionary[int_key] 23 except: 24 return ""
25 26 ''' Protocol error. 27 '''
28 -class Error(Exception):
29 - def __init__(self, msg):
30 self.msg = msg
31
32 - def __str__(self):
33 return self.msg
34
35 -class ConnectionClosed(Error):
36 - def __init__(self):
37 Error.__init__(self, "Connection closed")
38
39 -class ResponseError(Error):
40 - def __init__(self):
41 Error.__init__(self, "response error (either invalid in parameters for the call or call isn't allowed or call failed")
42
43 44 -class InvalidObjectIdError(Error):
45 - def __init__(self):
46 Error.__init__(self, "object id is 0")
47 48 MAX_UINT = 2**32-1
49 50 -class ScopedLock(object):
51 - def __init__(self, mutex):
52 self.mutex = mutex 53 self.mutex.acquire()
54 - def __del__(self):
55 self.mutex.release()
56
57 -class Cached(object):
58 '''Base class for all cached objects. 59 60 Every object is identified by an Id specified as first parameter of the constructor. 61 Trying to create two objects with same Id yields the same object. Uses weak references 62 to allow the objects to be deleted normally. 63 64 @warning: C{__init__()} is always called, don't use it to prevent initializing an already 65 initialized object. Use C{__sk_init_()} instead, it is called only once. 66 '''
67 - def __new__(cls, oid, root, *args, **kwargs):
68 if oid == 0: 69 return False # invalid id, still return something not to shift parameters 70 scl = ScopedLock(root._lock_) 71 hashk = cls, oid 72 obj = None 73 try: 74 obj = root._cache_[hashk] 75 except KeyError: 76 obj = object.__new__(cls) 77 root._cache_[hashk] = obj 78 if hasattr(obj, '_sk_init_'): 79 obj._sk_init_(oid, root, *args, **kwargs) 80 return obj
81 @staticmethod
82 - def sk_exists(cls, oid, root):
83 if oid == 0: 84 return None # invalid id 85 scl = ScopedLock(root._lock_) 86 hashk = cls, oid 87 try: 88 return root._cache_[hashk] 89 except KeyError: 90 return None
91 - def __copy__(self):
92 return self
93
94 -class Object(Cached):
95 rwlock = threading.Lock()
96 - def _sk_init_(self, object_id, transport):
97 self.transport = transport 98 self.object_id = object_id 99 self.properties = {}
100 ''' Retrieve given property id. 101 '''
102 - def _sk_property(self, header, prop_id, cached):
103 hit = cached #True 104 val = 0 105 try: 106 self.rwlock.acquire() 107 if hit: 108 val = self.properties[prop_id] 109 self.rwlock.release() 110 except KeyError: 111 self.rwlock.release() 112 hit = False 113 if not hit: 114 val = self.transport.get(GetRequest(header, self.object_id)) 115 return val
116 - def multiget(self, header):
117 self.transport.get(GetRequest(header, self.object_id))
118 119 ''' Connection class that implements Skype IPC. 120 '''
121 -class SkypeKit:
122 _decoders = {} 123
124 - class EventDispatcher(threading.Thread):
125 - def __init__(self, connection):
126 self.connection = connection 127 threading.Thread.__init__(self) 128 self.setName('event thread')
129 - def run(self):
130 try: 131 self.connection.process_events(True) 132 except: 133 self.connection.stop() 134 raise
135
136 - class ResponseListener(threading.Thread):
137 - def __init__(self, connection):
138 self.connection = connection 139 threading.Thread.__init__(self) 140 self.setName('responser listener thread')
141 - def run(self):
142 try: 143 self.connection._start() 144 except: 145 self.connection.stop() 146 raise 147
148 - def __init__(self, apptoken, module_id2classes, 149 has_event_thread = True, host = '127.0.0.1', port = 8963, logtransport=False, secure=True, setup=''):
150 self.module_id2classes = module_id2classes 151 if port != None: 152 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 153 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True) 154 sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True) 155 else: 156 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 157 sock.settimeout(30.5) 158 self.pending_requests = {} 159 self.pending_gets = collections.deque() 160 self.pending_lock = threading.Lock() 161 self.encoding_lock = threading.Lock() 162 self.decoded = threading.Event() 163 self.event_queue = QUEUE_CLASS() 164 self._lock_ = threading.Lock() 165 self._cache_ = weakref.WeakValueDictionary() 166 self.inlog = False 167 self.outlog = False 168 if logtransport: 169 try: 170 self.inlog = open(logtransport+'_log_in.1', 'wb') 171 except IOError: 172 self.inlog = False 173 try: 174 self.outlog = open(logtransport+'_log_out.1', 'wb') 175 except IOError: 176 self.outlog = False 177 self.stopped = False 178 retry = 3 179 while retry > 0: 180 try: 181 if port != None: 182 sock.connect((host, port)) 183 else: 184 sock.connect('\0'+host) 185 retry = -1 186 except: 187 retry = retry - 1 188 if retry == 0: 189 raise 190 time.sleep(5) 191 if secure: 192 cert = "" 193 sock = ssl.wrap_socket(sock, server_side=True, certfile=apptoken, ssl_version=ssl.PROTOCOL_TLSv1) 194 else: 195 f = open(apptoken, 'r') 196 cert = f.read() 197 f.close() 198 self.socket = sock 199 self.read_buffer = '' 200 self.handshake(sock, setup, cert) 201 if has_event_thread: 202 self.event_dispatcher = SkypeKit.EventDispatcher(self) 203 self.event_dispatcher.start() 204 self.listener = SkypeKit.ResponseListener(self) 205 self.listener.start()
206
207 - def handshake(self, sock, setup, cert):
208 setup = setup.encode('utf-8') 209 cert = cert+setup 210 req = ('%08x'%len(cert))+cert 211 sock.sendall(req) 212 if self.outlog: 213 try: 214 self.outlog.write(req) 215 except IOError: 216 self.outlog.close() 217 self.outlog = False 218 if self._read_byte(2) != 'OK': 219 raise ConnectionClosed
220
221 - def set_root(self, root):
222 self.root = root
223
224 - def __del__(self):
225 if self.socket != None: 226 self.socket.close() 227 if self.inlog: 228 self.inlog.close() 229 if self.outlog: 230 self.outlog.close()
231
232 - def _read_byte(self, num_bytes_to_read = 1):
233 result = self.read_buffer 234 while not self.stopped and len(result) < num_bytes_to_read: 235 try: 236 read = self.socket.recv(4096) 237 if not read: 238 self.stop() 239 raise ConnectionClosed 240 result += read 241 if self.inlog: 242 try: 243 self.inlog.write(read) 244 except IOError: 245 self.inlog.close() 246 self.inlog = False 247 except socket.timeout: 248 pass # retry always: if connection is closed we will see it at next read 249 except ssl.SSLError as strerror: 250 if "timed out" in str(strerror): 251 pass 252 elif not self.stopped: 253 self.stop(); 254 raise 255 except: 256 if not self.stopped: 257 self.stop() 258 raise 259 if self.stopped: 260 return None 261 self.read_buffer = result[num_bytes_to_read:] 262 result = result[:num_bytes_to_read] 263 return result
264
265 - def process_events(self, in_thread = False):
266 while in_thread or not self.event_queue.empty(): 267 event = self.event_queue.get() 268 if self.stopped: 269 return 270 event.dispatch()
271
272 - def xcall(self, request):
273 ev_resp_here = threading.Event() 274 self.encoding_lock.acquire() 275 self.pending_lock.acquire() 276 self.pending_requests[request.rid] = ev_resp_here 277 self.pending_lock.release() 278 req = request.send() 279 self.socket.sendall(req) 280 if self.outlog: 281 try: 282 self.outlog.write(req) 283 except IOError: 284 self.outlog.close() 285 self.outlog = False 286 self.encoding_lock.release() 287 ev_resp_here.wait() # no need to clear: one shot 288 if self.stopped: 289 raise ConnectionClosed() 290 resp = self._decode_parms() 291 self.decoded.set() 292 return resp
293
294 - def multiget(self, header, objects):
295 if len(objects) > 0: 296 self.get(GetRequest(header, objects))
297
298 - def get(self, request):
299 ev_resp_here = threading.Event() 300 self.encoding_lock.acquire() 301 self.pending_lock.acquire() 302 self.pending_gets.append(ev_resp_here) 303 self.pending_lock.release() 304 req = request.send() 305 self.socket.sendall(req) 306 if self.outlog: 307 try: 308 self.outlog.write(req) 309 except IOError: 310 self.outlog.close() 311 self.outlog = False 312 self.encoding_lock.release() # wait until response is here 313 ev_resp_here.wait() 314 if self.stopped: 315 raise ConnectionClosed() 316 # process the response with patching the instances... 317 return self._decode_get_response()
318
319 - def _decode_get_response(self):
320 response = None 321 mresponse = {} 322 continue_sign = ',' 323 count = 0 324 while continue_sign == ',': 325 modid = self._decode_varuint() # modid 326 while continue_sign == ',': 327 oid = self._decode_varuint() # oid 328 obj = self.module_id2classes[modid](oid, self) 329 if not obj in mresponse: 330 mresponse[obj] = {} 331 kind = self._read_byte() 332 while kind != ']': 333 propid = self._decode_varuint() # propid 334 if kind != 'N': 335 response = self._decoders[kind](self) 336 mresponse[obj][propid] = response 337 obj.rwlock.acquire() 338 obj.properties[propid] = response 339 obj.rwlock.release() 340 count = count + 1 341 kind = self._read_byte() # ] finish the list 342 if kind != ']': 343 raise ResponseError() 344 continue_sign = self._read_byte() 345 if continue_sign != ']': 346 raise ResponseError() 347 continue_sign = self._read_byte() 348 if continue_sign != ']': 349 raise ResponseError() 350 if self._read_byte() != 'z': 351 raise ResponseError() 352 self.decoded.set() 353 if count > 1: 354 response = mresponse 355 return response
356
357 - def _get_response(self):
358 self.pending_lock.acquire() 359 self.pending_gets.popleft().set() 360 self.pending_lock.release()
361 _decoders['g'] = _get_response 362
363 - def _decode_varuint(self):
364 shift = 0 365 result = 0 366 while 1: 367 value = ord(self._read_byte()) & 0xff 368 result = result | ((value & 0x7f) << shift) 369 shift = shift + 7 370 if not (value & 0x80): break 371 return result
372 _decoders['u'] = _decode_varuint 373 _decoders['O'] = _decode_varuint 374 _decoders['e'] = _decode_varuint 375
376 - def _decode_varint(self):
377 value = self._decode_varuint() 378 if not value & 0x1: 379 return value >> 1 380 return (value >> 1) ^ (~0)
381 _decoders['i'] = _decode_varint 382
383 - def _decode_true(self):
384 return True
385 _decoders['T'] = _decode_true 386
387 - def _decode_false(self):
388 return False
389 _decoders['F'] = _decode_false 390
391 - def _decode_list(self):
392 decoded_list = [] 393 while True: 394 byte = self._read_byte() 395 if byte == ']': 396 return decoded_list 397 decoder = self._decoders[byte] 398 if decoder: 399 decoded_list.append(decoder(self))
400 _decoders['['] = _decode_list 401
402 - def _decode_binary(self):
403 length = self._decode_varuint() 404 val = '' 405 if length > 0: 406 val = self._read_byte(length) 407 return val
408
409 - def _decode_string(self):
410 string = self._decode_binary() 411 return string.decode('utf-8', 'ignore')
412 _decoders['f'] = _decode_string 413 _decoders['B'] = _decode_binary 414 _decoders['S'] = _decode_string 415 _decoders['X'] = _decode_string 416
417 - class Parms(dict):
418 - def get(self, index, defval = None):
419 try: 420 return self[index] 421 except: 422 if defval == None: defval = 0 423 return defval
424
425 - def _decode_parms(self):
426 parms = self.Parms() 427 decoder = True 428 while decoder != None: 429 byte = self._read_byte() 430 if self.stopped or byte == 'z': break 431 if byte != 'N': 432 decoder = self._decoders[byte] 433 tag = self._decode_varuint() 434 if decoder: 435 parms[tag] = decoder(self) 436 else: 437 #print "response error ", self.read_byte() # shall be z 438 #self.decoded.set() 439 self._read_byte() # z 440 self.decoded.set() 441 raise ResponseError() 442 #self.decoded.set() 443 return parms
444
445 - class Event(object):
446 - def __init__(self, transport, modid, target, evid, parms):
447 self.module_id = modid 448 self.target = target 449 self.event_id = evid 450 self.transport = transport 451 self.parms = parms 452 transport.decoded.set()
453 - def dispatch(self):
454 target = self.target 455 if self.module_id != 0: 456 target = Cached.sk_exists(self.transport.module_id2classes[self.module_id], self.parms[0], self.transport) 457 if target == None: 458 return 459 getattr(target, target.event_handlers[self.event_id])(self.parms)
460
461 - def _decode_event(self):
462 # push the event in the event queue 463 modid = self._decode_varuint() 464 target = self.root 465 evid = self._decode_varuint() 466 parms = self._decode_parms() 467 self.event_queue.put(SkypeKit.Event(self, modid, target, evid, parms))
468 _decoders['E'] = _decode_event 469
470 - class PropertyChange(object):
471 - def __init__(self, transport, modid, oid, propid, val):
472 self.transport = transport 473 self.modid = modid 474 self.oid = oid 475 self.propid = propid 476 self.val = val 477 transport.decoded.set()
478 - def dispatch(self):
479 obj = Cached.sk_exists(self.transport.module_id2classes[self.modid], self.oid, self.transport) 480 if obj == None: 481 return 482 try: 483 propname = obj.propid2label[self.propid] 484 except KeyError: 485 return 486 obj.rwlock.acquire() 487 if self.val: 488 obj.properties[self.propid] = self.val 489 else: 490 try: 491 del obj.properties[self.propid] 492 except KeyError: 493 pass 494 obj.rwlock.release() 495 obj.OnPropertyChange(propname)
496
497 - def _decode_property_change(self):
498 # push the event in the event queue 499 modid = self._decode_varuint() 500 oid = self._decode_varuint() # obj id 501 kind = self._read_byte() # prop kind 502 propid = self._decode_varuint() # prop id 503 val = None # invalidate the value 504 if kind != 'N': 505 val = self._decoders[kind](self) 506 self._read_byte(4) # ]]]z 507 self.event_queue.put(SkypeKit.PropertyChange(self, modid, oid, propid, val))
508 _decoders['C'] = _decode_property_change 509
510 - def _xcall_response(self):
511 rid = self._decode_varuint() 512 self.pending_lock.acquire() 513 ev_resp_here = self.pending_requests[rid] 514 del self.pending_requests[rid] 515 self.pending_lock.release() 516 ev_resp_here.set()
517 _decoders['r'] = _xcall_response 518
519 - def _start(self):
520 while not self.stopped: 521 if self._read_byte(1) == 'Z': 522 if self.stopped: 523 return 524 cmd = self._read_byte() 525 if self.stopped: 526 return 527 decoder = self._decoders[cmd] 528 if decoder: 529 decoder(self) 530 self.decoded.wait() 531 self.decoded.clear() # shall be done immediatly after set?
532
533 - def stop(self):
534 if not self.stopped: 535 self.stopped = True 536 if self.socket: 537 self.socket.shutdown(socket.SHUT_RDWR) 538 self.socket.close() 539 self.socket = None 540 self.decoded.set() # ensure that Listener thread resumes 541 self.event_queue.put({}) # ensure that event thread resumes 542 for ev_get in self.pending_gets: 543 ev_get.set() 544 for ev_req in self.pending_requests: 545 self.pending_requests[ev_req].set()
546
547 -class Request:
548 ''' Base class for all request that provides the encoding primitives 549 and a write buffer 550 '''
551 - def __init__(self):
552 self.tokens = array.array('B')
553 _encoders = { } 554
555 - def _encode_varint(self, number):
556 if number >= 0: 557 number = number << 1 558 else: 559 number = (number << 1) ^ (~0) 560 self._encode_varuint(number)
561 _encoders['i'] = _encode_varint 562
563 - def _encode_varuint(self, number):
564 tok = self.tokens 565 while 1: 566 towrite = number & 0x7f 567 number = number >> 7 568 if number == 0: 569 tok.append(towrite) 570 break 571 tok.append(0x80|towrite)
572 _encoders['u'] = _encode_varuint 573 _encoders['e'] = _encode_varuint 574 _encoders['o'] = _encode_varuint 575
576 - def _encode_objectid(self, val):
577 if not val: 578 self._encode_varuint(0) 579 else: 580 self._encode_varuint(val.object_id)
581 _encoders['O'] = _encode_objectid 582
583 - def _encode_string(self, val):
584 tok = self.tokens 585 if isinstance(val, UNICODE_CLASS): 586 val = val.encode('utf-8') 587 length = len(val) 588 self._encode_varuint(length) 589 if length > 0: 590 tok.fromstring(val)
591 _encoders['S'] = _encode_string 592 _encoders['X'] = _encode_string 593 _encoders['f'] = _encode_string 594 _encoders['B'] = _encode_string 595
596 - def add_parm(self, kind, tag, val):
597 tok = self.tokens 598 if isinstance(val, list): 599 tok.append(ord('[')) 600 self._encode_varuint(tag) 601 encoder = self._encoders[kind] 602 for elem in val: 603 if kind != 'b': 604 tok.append(ord(kind)) 605 encoder(self, elem) 606 else: 607 if elem: 608 tok.append(ord('T')) 609 else: 610 tok.append(ord('F')) 611 tok.append(ord(']')) 612 elif kind != 'b': 613 tok.append(ord(kind)) 614 if tag == 0: 615 self.oid = val.object_id 616 self._encode_varuint(tag) 617 self._encoders[kind](self, val) 618 else: 619 if val: 620 tok.append(ord('T')) 621 else: 622 tok.append(ord('F')) 623 self._encode_varuint(tag) 624 return self
625
626 -class XCallRequest(Request):
627 ''' action call request 628 ''' 629 __request_id = 0 630 __request_lock = threading.Lock()
631 - def __init__(self, header, module_id, method_id):
632 Request.__init__(self) 633 self.tokens.fromstring(header) 634 self.module_id = module_id 635 self.method_id = method_id 636 self.oid = 0 637 XCallRequest.__request_lock.acquire() 638 self.rid = XCallRequest.__request_id 639 XCallRequest.__request_id = XCallRequest.__request_id + 1 640 XCallRequest.__request_lock.release() 641 self._encode_varuint(self.rid)
642 - def send(self):
643 tok = self.tokens 644 tok.append(ord('z')) 645 self.tokens = None 646 return tok
647
648 -class GetRequest(Request):
649 ''' get request: support multiple object id, but not heterogeneous object classes in 650 one request 651 '''
652 - def __init__(self, header, object_id):
653 ''' constructor, takes a preencoded header, and a signle object id or a list of 654 object ids 655 ''' 656 Request.__init__(self) 657 tok = self.tokens 658 tok.fromstring(header) 659 if isinstance(object_id, list): 660 prefix = '' 661 for obj in object_id: 662 tok.fromstring(prefix) 663 self._encode_varuint(obj.object_id) 664 prefix = ',' 665 else: 666 self._encode_varuint(object_id) 667 tok.fromstring(']]z')
668 - def send(self):
669 tok = self.tokens 670 self.tokens = None 671 return tok
672