1
2 import sys
3 if sys.version_info < (2, 6):
4 print("must use python 2.6 or greater")
5 if sys.version_info < (3, 0):
6 import Queue
7 QUEUE_CLASS = Queue.Queue
8 UNICODE_CLASS = unicode
9 else:
10 import queue
11 QUEUE_CLASS = queue.Queue
12 UNICODE_CLASS = str
13 import array
14 import collections
15 import socket, ssl
16 import weakref
17 import threading
18 import time
19
def enumof(enum_dictionary, int_key):
    ''' Return the entry of C{enum_dictionary} stored under C{int_key}.

    @param enum_dictionary: container indexed with C{int_key}.
    @param int_key: key to look up.
    @return: the stored entry, or an empty string when the lookup fails
    (missing key, out-of-range index, unhashable or unsupported key).
    '''
    try:
        return enum_dictionary[int_key]
    except (LookupError, TypeError):
        # Only lookup failures map to ""; KeyboardInterrupt and SystemExit
        # are no longer swallowed as they were by the former bare except.
        return ""
25
26 ''' Protocol error.
27 '''
34
38
41 Error.__init__(self, "response error (either invalid in parameters for the call or call isn't allowed or call failed")
42
47
48 MAX_UINT = 2**32-1
52 self.mutex = mutex
53 self.mutex.acquire()
56
58 '''Base class for all cached objects.
59
60 Every object is identified by an Id specified as first parameter of the constructor.
61 Trying to create two objects with same Id yields the same object. Uses weak references
62 to allow the objects to be deleted normally.
63
64 @warning: C{__init__()} is called on every construction, so it cannot be used to avoid re-initializing an already
65 initialized object. Put one-time setup in C{_sk_init_()} instead; it is called only once, when the object is first created.
66 '''
def __new__(cls, oid, root, *args, **kwargs):
    ''' Return the unique instance of C{cls} identified by C{oid}.

    Instances are looked up in C{root._cache_} under the key C{(cls, oid)}.
    On a miss a new instance is created, stored in the cache and, when the
    class defines C{_sk_init_}, initialised once through it.

    @param oid: object id; 0 yields C{False} instead of an instance.
    @param root: object providing C{_lock_} and C{_cache_}.
    @return: the cached or freshly created instance, or C{False} for id 0.
    '''
    if oid == 0:
        return False
    # Kept in a local so the lock wrapper lives until this call returns.
    guard = ScopedLock(root._lock_)
    cache_key = (cls, oid)
    try:
        return root._cache_[cache_key]
    except KeyError:
        pass
    instance = object.__new__(cls)
    root._cache_[cache_key] = instance
    if hasattr(instance, '_sk_init_'):
        instance._sk_init_(oid, root, *args, **kwargs)
    return instance
81 @staticmethod
83 if oid == 0:
84 return None
85 scl = ScopedLock(root._lock_)
86 hashk = cls, oid
87 try:
88 return root._cache_[hashk]
89 except KeyError:
90 return None
93
95 rwlock = threading.Lock()
97 self.transport = transport
98 self.object_id = object_id
99 self.properties = {}
100 ''' Retrieve given property id.
101 '''
103 hit = cached
104 val = 0
105 try:
106 self.rwlock.acquire()
107 if hit:
108 val = self.properties[prop_id]
109 self.rwlock.release()
110 except KeyError:
111 self.rwlock.release()
112 hit = False
113 if not hit:
114 val = self.transport.get(GetRequest(header, self.object_id))
115 return val
118
119 ''' Connection class that implements Skype IPC.
120 '''
122 _decoders = {}
123
126 self.connection = connection
127 threading.Thread.__init__(self)
128 self.setName('event thread')
130 try:
131 self.connection.process_events(True)
132 except:
133 self.connection.stop()
134 raise
135
138 self.connection = connection
139 threading.Thread.__init__(self)
140 self.setName('responser listener thread')
142 try:
143 self.connection._start()
144 except:
145 self.connection.stop()
146 raise
147
def __init__(self, apptoken, module_id2classes,
             has_event_thread = True, host = '127.0.0.1', port = 8963, logtransport=False, secure=True, setup=''):
    ''' Open the connection, perform the handshake and start the worker threads.

    @param apptoken: path of the application token file. Passed to ssl as
    C{certfile} when C{secure} is true; otherwise its content is read and
    handed to the handshake.
    @param module_id2classes: stored on the instance for the decoders.
    @param has_event_thread: when true, an event dispatcher thread is started.
    @param host: TCP host, or the abstract unix socket name when C{port} is None.
    @param port: TCP port; C{None} selects an C{AF_UNIX} socket.
    @param logtransport: false, or a path prefix; C{_log_in.1} and
    C{_log_out.1} are appended to it to name the transport log files.
    @param secure: wrap the connected socket with ssl when true.
    @param setup: setup string forwarded to the handshake.
    @raise socket.error: when the third connection attempt fails.
    '''
    self.module_id2classes = module_id2classes
    if port is not None:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, True)
        sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, True)
    else:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    sock.settimeout(30.5)
    self.pending_requests = {}
    self.pending_gets = collections.deque()
    self.pending_lock = threading.Lock()
    self.encoding_lock = threading.Lock()
    self.decoded = threading.Event()
    self.event_queue = QUEUE_CLASS()
    self._lock_ = threading.Lock()
    self._cache_ = weakref.WeakValueDictionary()
    self.inlog = False
    self.outlog = False
    if logtransport:
        # Logging is best effort: a log file that cannot be opened simply
        # disables that direction.
        try:
            self.inlog = open(logtransport+'_log_in.1', 'wb')
        except IOError:
            self.inlog = False
        try:
            self.outlog = open(logtransport+'_log_out.1', 'wb')
        except IOError:
            self.outlog = False
    self.stopped = False
    attempts_left = 3
    while True:
        try:
            if port is not None:
                sock.connect((host, port))
            else:
                # A leading NUL byte addresses the abstract socket namespace.
                sock.connect('\0'+host)
            break
        except socket.error:
            attempts_left -= 1
            if attempts_left == 0:
                # Give up: release what was opened so far before re-raising.
                sock.close()
                for log_file in (self.inlog, self.outlog):
                    if log_file:
                        log_file.close()
                self.inlog = False
                self.outlog = False
                raise
            time.sleep(5)
    if secure:
        cert = ""
        # NOTE(review): ssl.wrap_socket() is deprecated since Python 3.7 and
        # removed in 3.12; kept because this file still targets Python 2.6.
        sock = ssl.wrap_socket(sock, server_side=True, certfile=apptoken, ssl_version=ssl.PROTOCOL_TLSv1)
    else:
        with open(apptoken, 'r') as token_file:
            cert = token_file.read()
    self.socket = sock
    self.read_buffer = ''
    self.handshake(sock, setup, cert)
    if has_event_thread:
        self.event_dispatcher = SkypeKit.EventDispatcher(self)
        self.event_dispatcher.start()
    self.listener = SkypeKit.ResponseListener(self)
    self.listener.start()
206
208 setup = setup.encode('utf-8')
209 cert = cert+setup
210 req = ('%08x'%len(cert))+cert
211 sock.sendall(req)
212 if self.outlog:
213 try:
214 self.outlog.write(req)
215 except IOError:
216 self.outlog.close()
217 self.outlog = False
218 if self._read_byte(2) != 'OK':
219 raise ConnectionClosed
220
223
225 if self.socket != None:
226 self.socket.close()
227 if self.inlog:
228 self.inlog.close()
229 if self.outlog:
230 self.outlog.close()
231
233 result = self.read_buffer
234 while not self.stopped and len(result) < num_bytes_to_read:
235 try:
236 read = self.socket.recv(4096)
237 if not read:
238 self.stop()
239 raise ConnectionClosed
240 result += read
241 if self.inlog:
242 try:
243 self.inlog.write(read)
244 except IOError:
245 self.inlog.close()
246 self.inlog = False
247 except socket.timeout:
248 pass
249 except ssl.SSLError as strerror:
250 if "timed out" in str(strerror):
251 pass
252 elif not self.stopped:
253 self.stop();
254 raise
255 except:
256 if not self.stopped:
257 self.stop()
258 raise
259 if self.stopped:
260 return None
261 self.read_buffer = result[num_bytes_to_read:]
262 result = result[:num_bytes_to_read]
263 return result
264
266 while in_thread or not self.event_queue.empty():
267 event = self.event_queue.get()
268 if self.stopped:
269 return
270 event.dispatch()
271
def xcall(self, request):
    ''' Send an action call request and return its decoded response.

    The request is registered in C{pending_requests} under C{request.rid},
    written to the socket, and the caller then blocks until the event
    registered for it is set.

    @param request: object providing C{rid} and C{send()}.
    @return: the value of C{self._decode_parms()}.
    @raise ConnectionClosed: when the connection was stopped while waiting.
    '''
    ev_resp_here = threading.Event()
    # Both locks are held through 'with' so that a failure while encoding or
    # sending cannot leave them acquired and block every later caller.
    with self.encoding_lock:
        with self.pending_lock:
            self.pending_requests[request.rid] = ev_resp_here
        req = request.send()
        self.socket.sendall(req)
        if self.outlog:
            # Transport logging is best effort: disable it on the first error.
            try:
                self.outlog.write(req)
            except IOError:
                self.outlog.close()
                self.outlog = False
    ev_resp_here.wait()
    if self.stopped:
        raise ConnectionClosed()
    resp = self._decode_parms()
    self.decoded.set()
    return resp
293
295 if len(objects) > 0:
296 self.get(GetRequest(header, objects))
297
def get(self, request):
    ''' Send a get request and return its decoded response.

    An event is appended to C{pending_gets}, the request is written to the
    socket, and the caller then blocks until that event is set.

    @param request: object providing C{send()}.
    @return: the value of C{self._decode_get_response()}.
    @raise ConnectionClosed: when the connection was stopped while waiting.
    '''
    ev_resp_here = threading.Event()
    # Both locks are held through 'with' so that a failure while encoding or
    # sending cannot leave them acquired and block every later caller.
    with self.encoding_lock:
        with self.pending_lock:
            self.pending_gets.append(ev_resp_here)
        req = request.send()
        self.socket.sendall(req)
        if self.outlog:
            # Transport logging is best effort: disable it on the first error.
            try:
                self.outlog.write(req)
            except IOError:
                self.outlog.close()
                self.outlog = False
    ev_resp_here.wait()
    if self.stopped:
        raise ConnectionClosed()

    return self._decode_get_response()
318
320 response = None
321 mresponse = {}
322 continue_sign = ','
323 count = 0
324 while continue_sign == ',':
325 modid = self._decode_varuint()
326 while continue_sign == ',':
327 oid = self._decode_varuint()
328 obj = self.module_id2classes[modid](oid, self)
329 if not obj in mresponse:
330 mresponse[obj] = {}
331 kind = self._read_byte()
332 while kind != ']':
333 propid = self._decode_varuint()
334 if kind != 'N':
335 response = self._decoders[kind](self)
336 mresponse[obj][propid] = response
337 obj.rwlock.acquire()
338 obj.properties[propid] = response
339 obj.rwlock.release()
340 count = count + 1
341 kind = self._read_byte()
342 if kind != ']':
343 raise ResponseError()
344 continue_sign = self._read_byte()
345 if continue_sign != ']':
346 raise ResponseError()
347 continue_sign = self._read_byte()
348 if continue_sign != ']':
349 raise ResponseError()
350 if self._read_byte() != 'z':
351 raise ResponseError()
352 self.decoded.set()
353 if count > 1:
354 response = mresponse
355 return response
356
358 self.pending_lock.acquire()
359 self.pending_gets.popleft().set()
360 self.pending_lock.release()
361 _decoders['g'] = _get_response
362
364 shift = 0
365 result = 0
366 while 1:
367 value = ord(self._read_byte()) & 0xff
368 result = result | ((value & 0x7f) << shift)
369 shift = shift + 7
370 if not (value & 0x80): break
371 return result
372 _decoders['u'] = _decode_varuint
373 _decoders['O'] = _decode_varuint
374 _decoders['e'] = _decode_varuint
375
377 value = self._decode_varuint()
378 if not value & 0x1:
379 return value >> 1
380 return (value >> 1) ^ (~0)
381 _decoders['i'] = _decode_varint
382
385 _decoders['T'] = _decode_true
386
389 _decoders['F'] = _decode_false
390
392 decoded_list = []
393 while True:
394 byte = self._read_byte()
395 if byte == ']':
396 return decoded_list
397 decoder = self._decoders[byte]
398 if decoder:
399 decoded_list.append(decoder(self))
400 _decoders['['] = _decode_list
401
403 length = self._decode_varuint()
404 val = ''
405 if length > 0:
406 val = self._read_byte(length)
407 return val
408
410 string = self._decode_binary()
411 return string.decode('utf-8', 'ignore')
412 _decoders['f'] = _decode_string
413 _decoders['B'] = _decode_binary
414 _decoders['S'] = _decode_string
415 _decoders['X'] = _decode_string
416
def get(self, index, defval = None):
    ''' Return C{self[index]}, or a default when the lookup fails.

    @param index: key to look up.
    @param defval: value returned on a failed lookup; C{None} is replaced by 0.
    @return: the stored value, C{defval}, or 0 when C{defval} is C{None}.
    '''
    try:
        return self[index]
    except (LookupError, TypeError):
        # Only lookup failures fall back to the default; the former bare
        # except also swallowed KeyboardInterrupt and SystemExit.
        if defval is None:
            defval = 0
        return defval
424
426 parms = self.Parms()
427 decoder = True
428 while decoder != None:
429 byte = self._read_byte()
430 if self.stopped or byte == 'z': break
431 if byte != 'N':
432 decoder = self._decoders[byte]
433 tag = self._decode_varuint()
434 if decoder:
435 parms[tag] = decoder(self)
436 else:
437
438
439 self._read_byte()
440 self.decoded.set()
441 raise ResponseError()
442
443 return parms
444
def __init__(self, transport, modid, target, evid, parms):
    ''' Record the decoded event fields and set C{transport.decoded}.

    @param transport: object whose C{decoded} event is set once the fields are stored.
    @param modid: stored as C{module_id}.
    @param target: stored as C{target}.
    @param evid: stored as C{event_id}.
    @param parms: stored as C{parms}.
    '''
    self.transport = transport
    self.module_id, self.target, self.event_id = modid, target, evid
    self.parms = parms
    # Everything needed has been copied out of the stream at this point.
    transport.decoded.set()
460
462
463 modid = self._decode_varuint()
464 target = self.root
465 evid = self._decode_varuint()
466 parms = self._decode_parms()
467 self.event_queue.put(SkypeKit.Event(self, modid, target, evid, parms))
468 _decoders['E'] = _decode_event
469
def __init__(self, transport, modid, oid, propid, val):
    ''' Record the decoded property change and set C{transport.decoded}.

    @param transport: object whose C{decoded} event is set once the fields are stored.
    @param modid: stored as C{modid}.
    @param oid: stored as C{oid}.
    @param propid: stored as C{propid}.
    @param val: stored as C{val}.
    '''
    self.transport = transport
    self.modid, self.oid = modid, oid
    self.propid, self.val = propid, val
    # Everything needed has been copied out of the stream at this point.
    transport.decoded.set()
496
498
499 modid = self._decode_varuint()
500 oid = self._decode_varuint()
501 kind = self._read_byte()
502 propid = self._decode_varuint()
503 val = None
504 if kind != 'N':
505 val = self._decoders[kind](self)
506 self._read_byte(4)
507 self.event_queue.put(SkypeKit.PropertyChange(self, modid, oid, propid, val))
508 _decoders['C'] = _decode_property_change
509
511 rid = self._decode_varuint()
512 self.pending_lock.acquire()
513 ev_resp_here = self.pending_requests[rid]
514 del self.pending_requests[rid]
515 self.pending_lock.release()
516 ev_resp_here.set()
517 _decoders['r'] = _xcall_response
518
520 while not self.stopped:
521 if self._read_byte(1) == 'Z':
522 if self.stopped:
523 return
524 cmd = self._read_byte()
525 if self.stopped:
526 return
527 decoder = self._decoders[cmd]
528 if decoder:
529 decoder(self)
530 self.decoded.wait()
531 self.decoded.clear()
532
534 if not self.stopped:
535 self.stopped = True
536 if self.socket:
537 self.socket.shutdown(socket.SHUT_RDWR)
538 self.socket.close()
539 self.socket = None
540 self.decoded.set()
541 self.event_queue.put({})
542 for ev_get in self.pending_gets:
543 ev_get.set()
544 for ev_req in self.pending_requests:
545 self.pending_requests[ev_req].set()
546
548 ''' Base class for all requests; provides the encoding primitives
549 and a write buffer
550 '''
552 self.tokens = array.array('B')
553 _encoders = { }
554
556 if number >= 0:
557 number = number << 1
558 else:
559 number = (number << 1) ^ (~0)
560 self._encode_varuint(number)
561 _encoders['i'] = _encode_varint
562
564 tok = self.tokens
565 while 1:
566 towrite = number & 0x7f
567 number = number >> 7
568 if number == 0:
569 tok.append(towrite)
570 break
571 tok.append(0x80|towrite)
572 _encoders['u'] = _encode_varuint
573 _encoders['e'] = _encode_varuint
574 _encoders['o'] = _encode_varuint
575
577 if not val:
578 self._encode_varuint(0)
579 else:
580 self._encode_varuint(val.object_id)
581 _encoders['O'] = _encode_objectid
582
584 tok = self.tokens
585 if isinstance(val, UNICODE_CLASS):
586 val = val.encode('utf-8')
587 length = len(val)
588 self._encode_varuint(length)
589 if length > 0:
590 tok.fromstring(val)
591 _encoders['S'] = _encode_string
592 _encoders['X'] = _encode_string
593 _encoders['f'] = _encode_string
594 _encoders['B'] = _encode_string
595
597 tok = self.tokens
598 if isinstance(val, list):
599 tok.append(ord('['))
600 self._encode_varuint(tag)
601 encoder = self._encoders[kind]
602 for elem in val:
603 if kind != 'b':
604 tok.append(ord(kind))
605 encoder(self, elem)
606 else:
607 if elem:
608 tok.append(ord('T'))
609 else:
610 tok.append(ord('F'))
611 tok.append(ord(']'))
612 elif kind != 'b':
613 tok.append(ord(kind))
614 if tag == 0:
615 self.oid = val.object_id
616 self._encode_varuint(tag)
617 self._encoders[kind](self, val)
618 else:
619 if val:
620 tok.append(ord('T'))
621 else:
622 tok.append(ord('F'))
623 self._encode_varuint(tag)
624 return self
625
627 ''' action call request
628 '''
629 __request_id = 0
630 __request_lock = threading.Lock()
631 - def __init__(self, header, module_id, method_id):
643 tok = self.tokens
644 tok.append(ord('z'))
645 self.tokens = None
646 return tok
647
649 ''' get request: support multiple object id, but not heterogeneous object classes in
650 one request
651 '''
653 ''' constructor, takes a pre-encoded header, and a single object id or a list of
654 object ids
655 '''
656 Request.__init__(self)
657 tok = self.tokens
658 tok.fromstring(header)
659 if isinstance(object_id, list):
660 prefix = ''
661 for obj in object_id:
662 tok.fromstring(prefix)
663 self._encode_varuint(obj.object_id)
664 prefix = ','
665 else:
666 self._encode_varuint(object_id)
667 tok.fromstring(']]z')
669 tok = self.tokens
670 self.tokens = None
671 return tok
672