001 package com.skype.ipc;
002
003 import java.io.IOException;
004 import java.io.PrintWriter;
005 import java.io.StringWriter;
006 import java.lang.ref.SoftReference;
007 import java.util.ArrayDeque;
008 import java.util.HashMap;
009 import java.util.HashSet;
010 import java.util.Map.Entry;
011 import java.util.concurrent.locks.ReentrantLock;
012
/**
 * Base class for a SID interface. Most of its functions are internal, even the public ones;
 * only the documented functions are meant to be used by applications.
 */
016 public abstract class SidRoot implements ClientEncodingListener, ClientDecodingListener, ObjectFactoring {
017
/**
 * Creates a root whose cache timestamp starts at zero. No connection exists until
 * {@link #init(ClientConfiguration, ConnectionListener)} is called.
 */
protected SidRoot() {
    this.mSidTimestamp = 0;
}
021
022 protected class EventThread extends Thread {
023 public EventThread(SidRoot root) {
024 super();
025 mRoot = root;
026 }
027
028 public void run() {
029 mRoot.sidPollEvent();
030 }
031
032 private SidRoot mRoot;
033 }
034
035 protected void sidPollEvent() {
036 try {
037 while (!mSidStopped) {
038 Decoding decoder = mSidDecoder.decodeEvent(this);
039 switch (decoder.getCommand()) {
040 case 'C':
041 sidOnChangedProperty(decoder);
042 break;
043 case 'E':
044 int mid = decoder.decodeUint();
045 int evid = decoder.decodeUint();
046 sidDispatchEvent(mid, evid, decoder);
047 break;
048 case 'g':
049 sidAddPendingGetResponse();
050 break;
051 case 'r':
052 sidAddPendingResponse(decoder.decodeUint());
053 break;
054 }
055 }
056 } catch (IOException e) {
057 sidOnFatalError(e);
058 }
059 }
060
061 public abstract void sidDispatchEvent(int modId, int evId, Decoding decoder);
062
063 public boolean sidWantRead() {
064 synchronized (this) {
065 if (mSidReader == mSidEventThread || mSidReader == null) {
066 mSidReader = mSidEventThread;
067 return true;
068 } else {
069 mSidReadyEventThread = mSidEventThread;
070 }
071 }
072 synchronized (mSidEventThread) {
073 try {
074 while (mSidCurrentReader != mSidEventThread)
075 mSidEventThread.wait();
076 } catch (InterruptedException ie) {
077 return false;
078 }
079 mSidCurrentReader = null;
080 }
081 return true;
082 }
083
084 /***
085 * initialize the connection to the runtime, and the interface cache accordingly to the configuration
086 * @param configuration defining the connection parameters to the runtime
087 * @param listener callbacks to check the status of the connection
088 * return success or failure to connect
089 */
090 public boolean init(ClientConfiguration configuration, ConnectionListener listener) {
091 registerConnectionListener(listener);
092
093 TransportFactory factory = configuration.getTransportFactory();
094 TransportFactory.Result transport = factory.init(configuration, listener);
095 if (transport != null) {
096 mSidInput = transport.in;
097 mSidOutput = transport.out;
098 mSidDispatchAll = configuration.isDispatchAll();
099 mSidDecoder = new BinProtocolClientDecoder(transport.in, this, this);
100 mSidEncoder = new BinProtocolClientEncoder(transport.out, this);
101 flushCache(null);
102 mSidEncoderLock = new ReentrantLock();
103 mSidEncoding = false;
104 mSidPendingRequests = new HashMap<Integer, Thread>(2);
105 mSidPendingOneWayRequests = new HashSet<Integer>(2);
106 mSidPendingGets = new ArrayDeque<Thread>(2);
107 mSidEventThread = new EventThread(this);
108 mSidReader = mSidEventThread;
109 mSidCurrentReader = null;
110 mSidEventThread.start();
111 }
112 return mSidObjects != null;
113 }
114
115 protected SidObject sidDecodeEventTarget(int mid, Decoding decoder)
116 {
117 SidObject target = null;
118 try {
119 int b = decoder.decodeTag(); // shall be 'O'
120 int zero = decoder.decodeUint();
121 int oid = decoder.decodeUint();
122 target = mSidDispatchAll ? sidGetObject(mid, oid) : sidGetObjectIfPresent(mid, oid);
123 if (target == null) {
124 decoder.skipEndOfMessage();
125 }
126 }
127 catch (IOException e) {
128 sidOnFatalError(e);
129 }
130 return target;
131 }
132
133 /***
134 * start the session after init, once all the runtime setups have been done
135 */
136 public abstract boolean start();
137
138 /***
139 * stop and terminate the connection to a runtime
140 */
141 public synchronized void stop() {
142 mSidStopped = true;
143 if (mSidEncoding) {
144 mSidEncoding = false;
145 mSidEncoderLock.unlock();
146 }
147 try {
148 mSidInput.close();
149 mSidOutput.close();
150 } catch (IOException e) {
151 }
152 if (mSidCurrentReader != null)
153 mSidCurrentReader.interrupt();
154 if (mSidReader != null && mSidReader != mSidCurrentReader)
155 mSidReader.interrupt();
156 for (Thread thread : mSidPendingRequests.values()) {
157 thread.interrupt();
158 }
159 for (Thread thread : mSidPendingGets) {
160 thread.interrupt();
161 }
162 }
163
164 public void sidOnFatalError(IOException e) {
165 if (mSidStopped) return;
166 stop();
167 if (mSidConnectionListener != null) {
168 StringWriter sw = new StringWriter();
169 e.printStackTrace(new PrintWriter(sw));
170 mSidConnectionListener.sidOnDisconnected(sw.toString());
171 } else {
172 e.printStackTrace();
173 System.exit(-1);
174 }
175 }
176
177 //
178 // Object Cache Management
179 //
180
181 public SidObject sidGetObjectIfPresent(int mid, int oid) {
182 Integer key = oid; //new Integer(oid);
183 SidObject o = null;
184 synchronized (mSidObjects) {
185 SoftReference<SidObject> r = mSidObjects.get(key);
186 if (r != null) {
187 o = r.get();
188 if (o == null) {
189 o = sidCreateObject(mid, oid);
190 mSidObjects.put(key, new SoftReference<SidObject>(o));
191 }
192 }
193 }
194 return o;
195 }
196
197 public SidObject sidGetObject(int mid, int oid) {
198 Integer key = oid; //new Integer(oid);
199 SidObject o = null;
200 synchronized (mSidObjects) {
201 SoftReference<SidObject> r = mSidObjects.get(key);
202 if (r != null) {
203 o = r.get();
204 }
205 if (o == null) {
206 o = sidCreateObject(mid, oid);
207 mSidObjects.put(key, new SoftReference<SidObject>(o));
208 }
209 }
210 return o;
211 }
212 protected abstract SidObject sidCreateObject(int mid, int oid);
213
214 protected void sidRemove(Integer oid) {
215 synchronized (mSidObjects) {
216 mSidObjects.remove(oid);
217 }
218 }
219
220 protected Encoding sidBeginMultiGet(PropertyEnumConverting[] properties, int modid, int oid) throws IOException {
221 mSidEncoderLock.lock();
222 mSidEncoding = true;
223 return mSidEncoder.beginMultiGet(properties, modid, oid, this);
224 }
225
226 protected Decoding sidDoGetRequest(byte[] request, int oid) throws IOException { // case of multi request?
227 mSidEncoderLock.lock();
228 mSidEncoding = true;
229 return mSidEncoder.doGetRequest(request, oid, this);
230 }
231
232 public Decoding sidOnGetRequestEncoded() throws IOException {
233 Thread current = null;
234 synchronized (this) {
235 if (mSidPendingGetResponse != null) { // response is already here...
236 mSidPendingGetResponse = null;
237 } else {
238 current = Thread.currentThread();
239 mSidPendingGets.add(current);
240 if (mSidReader == null) mSidReader = current;
241 }
242 }
243 mSidEncoding = false;
244 mSidEncoderLock.unlock();
245 if (current != null) {
246 if (mSidReader != current) {
247 synchronized (current) {
248 try {
249 while (mSidCurrentReader != current)
250 current.wait();
251 } catch (InterruptedException ie) {
252 mSidReader = null;
253 }
254 mSidCurrentReader = null;
255 }
256 }
257 if (mSidReader == current) {
258 // current thread became the new reader and shall wait for a response
259 sidWaitResponse();
260 }
261 }
262 // response is here and needs being decoded
263 return mSidDecoder;
264 }
265
266 public void sidOnGetResponseDecoded() {
267 sidOnResponseDecoded();
268 }
269
270 protected Encoding sidDoRequest(byte[] request) throws IOException {
271 mSidEncoderLock.lock();
272 mSidEncoding = true;
273 return mSidEncoder.beginRequest(request, this);
274 }
275
276 protected Encoding sidDoRequest(byte[] request, int oid) throws IOException {
277 mSidEncoderLock.lock();
278 mSidEncoding = true;
279 return mSidEncoder.beginRequest(request, oid, this);
280 }
281
282 public Decoding sidOnRequestEncoded(int requestId) throws IOException {
283 mSidEncoding = false;
284 mSidEncoderLock.unlock();
285 Thread current = Thread.currentThread();
286 Integer rid = requestId;
287 synchronized (this) {
288 Thread responder = mSidPendingRequests.get(rid);
289 if (responder != null) { // response already here
290 mSidPendingRequests.remove(rid);
291 return mSidDecoder;
292 }
293 mSidPendingRequests.put(rid, current);
294 if (mSidReader == null) mSidReader = current;
295 }
296 if (mSidReader != current) {
297 synchronized (current) {
298 try {
299 while (mSidCurrentReader != current)
300 current.wait();
301 } catch (InterruptedException ie) {
302 mSidReader = null;
303 }
304 mSidCurrentReader = null;
305 // 2 cases => either there was no reader and we were promoted reader
306 // then mSidReader == current, and we must wait responses
307 // => or the reader got our response, we must decode it
308 }
309 }
310 if (mSidReader == current) { // we were wakenup to read
311 sidWaitResponse();
312 }
313 return mSidDecoder;
314 }
315
316 public Decoding sidOnOneWayRequestEncoded(int requestId) {
317 mSidEncoding = false;
318 mSidEncoderLock.unlock();
319 Thread responder = null;
320 Integer rid = requestId;
321 Thread current = Thread.currentThread();
322
323 synchronized (this) {
324 responder = mSidPendingRequests.get(rid);
325 if (responder != null) { // response is already here and was partly swallowed
326 mSidPendingRequests.remove(rid);
327 } else {
328 mSidPendingRequests.put(rid, current);
329 mSidPendingOneWayRequests.add(rid);
330 }
331 }
332 if (responder != null) {
333 try {
334 mSidDecoder.skipMessage();
335 } catch (IOException e) {
336 sidOnFatalError(e);
337 }
338 synchronized (responder) {
339 mSidCurrentReader = responder;
340 responder.notify();
341 }
342 }
343 return null;
344 }
345
346 public void sidOnResponseDecoded() {
347 Thread current = Thread.currentThread();
348 if (mSidReader == current) {
349 synchronized (this) {
350 if (!mSidPendingGets.isEmpty()) {
351 mSidReader = mSidPendingGets.peek();
352 } else {
353 mSidReader = null;
354 if (!mSidPendingRequests.isEmpty()) {
355 // can be oneway...
356 for (Entry<Integer, Thread> entry : mSidPendingRequests.entrySet()) {
357 Integer rid = entry.getKey();
358 if (!mSidPendingOneWayRequests.contains(rid)) {
359 mSidReader = entry.getValue();
360 break;
361 }
362 }
363 }
364 if (mSidReader == null && mSidReadyEventThread != null) {
365 mSidReader = mSidReadyEventThread;
366 }
367 }
368 current = mSidReader;
369 }
370 } else {
371 current = mSidReader;
372 }
373 if (current != null) {
374 synchronized (current) {
375 mSidCurrentReader = current;
376 current.notify();
377 }
378 }
379 }
380
381 void sidOnChangedProperty(Decoding decoder) {
382 // currently only 1 notified at once and only int-alike properties are valued, else design shall be changed to ensure that all properties are decoded
383 // before calling the callbacks
384 int expected = 1;
385 int kind;
386 int oid = 0;
387 int moduleId = 0;
388 int value = 0;
389 String svalue = null;
390 int propertyId= -1;
391 try {
392 while (decoder.hasNextProperty(false)) {
393 assert(expected-- > 0);
394 PropertyInfo info = decoder.getNextProperty();
395 kind = info.kind;
396 moduleId = info.moduleId;
397 oid = info.objectId;
398 propertyId = info.propertyId;
399 switch (kind) {
400 case 'F': case 'N': break;
401 case 'T': value = 1; break;
402 case 'O': case 'e': case 'u': value = decoder.decodeUint(); break;
403 case 'i': value = decoder.decodeInt(); break;
404 case 'S': case 'f': case 'X': svalue = decoder.decodeString(); break;
405 default:
406 decoder.skipValue(kind);
407 break;
408 }
409 }
410 } catch (IOException e) {
411 sidOnFatalError(e);
412 }
413 if (oid>0) {
414 SidObject o = mSidDispatchAll ? sidGetObject(moduleId, oid) : sidGetObjectIfPresent(moduleId, oid);
415 if (o != null) {
416 o.sidOnChangedProperty(propertyId, value, svalue);
417 }
418 }
419 }
420
421 boolean sidAddPendingGetResponse() throws IOException {
422 Thread current = mSidReader;
423 Thread requester = null;
424 synchronized (this) {
425 if (mSidPendingGets.isEmpty()) {
426 mSidPendingGetResponse = current;
427 } else {
428 requester = mSidPendingGets.poll();
429 }
430 }
431 boolean got_my_response = requester == current;
432 if (!got_my_response) {
433 if (requester != null) {
434 synchronized (requester) {
435 mSidCurrentReader = requester;
436 requester.notify();
437 }
438 }
439 synchronized (current) {
440 try {
441 while (mSidCurrentReader != current)
442 current.wait();
443 } catch (InterruptedException ie) {
444 throw new IOException("Connection was closed");
445 }
446 mSidCurrentReader = null;
447 }
448 }
449 return got_my_response;
450 }
451
452 boolean sidAddPendingResponse(int requestId) throws IOException {
453 Thread current = mSidReader;
454 Thread requester = null;
455 Integer rid = requestId;
456 boolean oneway = false;
457 synchronized (this) {
458 requester = mSidPendingRequests.get(rid);
459 if (requester == null) {
460 mSidPendingRequests.put(rid, current);
461 } else {
462 mSidPendingRequests.remove(rid);
463 oneway = mSidPendingOneWayRequests.remove(rid);
464 }
465 }
466 if (oneway) {
467 try {
468 mSidDecoder.skipMessage();
469 } catch (IOException e) {
470 sidOnFatalError(e);
471 }
472 return false;
473 }
474 boolean got_my_response = requester == current;
475 if (!got_my_response) {
476 if (requester != null) {
477 synchronized (requester) {
478 mSidCurrentReader = requester;
479 requester.notify();
480 }
481 }
482 synchronized (current) {
483 try {
484 while (mSidCurrentReader != current)
485 current.wait();
486 } catch (InterruptedException ie) {
487 throw new IOException("Connection was closed");
488 }
489 mSidCurrentReader = null;
490 }
491 }
492 return got_my_response;
493 }
494
495 void sidWaitResponse() throws IOException {
496 Decoding decoder = mSidDecoder;
497 while (true) {
498 int r;
499 try {
500 r = decoder.decodeResponse(this);
501 } catch (IOException e) {
502 return;
503 }
504 switch (r) {
505 case 'r':
506 int rid;
507 try {
508 rid = decoder.decodeUint();
509 } catch (IOException e) {
510 return;
511 }
512 if (sidAddPendingResponse(rid)) return;
513 break;
514 case 'g':
515 if (sidAddPendingGetResponse()) return;
516 break;
517 }
518 }
519 }
520
521 public void registerConnectionListener(ConnectionListener listener) {
522 mSidConnectionListener = listener;
523 }
524 public void unRegisterConnectionListener(ConnectionListener listener) {
525 mSidConnectionListener = null;
526 }
527 public ConnectionListener getConnectionListener() {
528 return mSidConnectionListener;
529 }
530
531 public SidObject flushCache(SidObject keep) {
532 mSidTimestamp++;
533 mSidObjects = new HashMap<Integer, SoftReference<SidObject>>(512);
534 if (keep != null) {
535 mSidObjects.put(keep.getOid(), new SoftReference<SidObject>(keep));
536 keep.mSidTimestamp = mSidTimestamp;
537 }
538 return keep;
539 }
540
541 protected int mSidTimestamp;
542 private ConnectionListener mSidConnectionListener;
543 private Thread mSidReader;
544 private Thread mSidCurrentReader;
545 private Thread mSidEventThread;
546 private Thread mSidReadyEventThread;
547 private ReentrantLock mSidEncoderLock;
548 private boolean mSidEncoding;
549 protected BinProtocolClientEncoder mSidEncoder;
550 protected BinProtocolClientDecoder mSidDecoder;
551 private HashMap<Integer, Thread> mSidPendingRequests;
552 private HashSet<Integer> mSidPendingOneWayRequests;
553 private ArrayDeque<Thread> mSidPendingGets;
554 private Thread mSidPendingGetResponse;
555 private boolean mSidDispatchAll;
556 private boolean mSidStopped;
557 private HashMap<Integer, SoftReference<SidObject>> mSidObjects;
558 private InputTransporting mSidInput;
559 private OutputTransporting mSidOutput;
560 }
561