001    package com.skype.ipc;
002    
003    import java.io.IOException;
004    import java.io.PrintWriter;
005    import java.io.StringWriter;
006    import java.lang.ref.SoftReference;
007    import java.util.ArrayDeque;
008    import java.util.HashMap;
009    import java.util.HashSet;
010    import java.util.Map.Entry;
011    import java.util.concurrent.locks.ReentrantLock;
012    
013    /***
014     * Base class for a SID interface. Most of its methods are internal even when they are public; only the documented ones are meant to be called by applications.
015     */
016    public abstract class SidRoot implements ClientEncodingListener, ClientDecodingListener, ObjectFactoring {
017    
018        /***
            * Creates an unconnected root. Only the cache timestamp is initialised here;
            * the transport, the codecs and the event thread are set up later by init().
            */
           protected SidRoot() {
019            mSidTimestamp = 0;
020        }
021    
022        /***
            * Thread whose only job is to run the event polling loop of a SidRoot.
            */
           protected class EventThread extends Thread {
               /***
                * @param root the root whose sidPollEvent() loop this thread executes
                */
023            public EventThread(SidRoot root) {
024                super();
025                mRoot = root;
026            }
027    
               /***
                * Runs the polling loop of the root given at construction until that loop returns.
                */
               @Override
028            public void run() {
029                mRoot.sidPollEvent();
030            }
031    
               // assigned once in the constructor and never changed afterwards
032            private final SidRoot mRoot;
033        }
034    
035        /***
            * Polling loop run by the event thread: decodes incoming messages until mSidStopped
            * is set and routes each one according to its command character.
            * An IOException raised while decoding or dispatching ends the loop and is handed
            * to sidOnFatalError().
            */
           protected void sidPollEvent() {
036            try {
037                while (!mSidStopped) {
038                    Decoding decoder = mSidDecoder.decodeEvent(this);
                       // commands other than the four handled below are ignored (there is no default branch)
039                    switch (decoder.getCommand()) {
040                    case 'C': 
                           // property change notification
041                        sidOnChangedProperty(decoder);
042                        break;
043                    case 'E':
                           // event: module id and event id come first, the rest is read by the dispatcher
044                        int mid  = decoder.decodeUint();
045                        int evid = decoder.decodeUint();
046                        sidDispatchEvent(mid, evid, decoder);
047                        break;
048                    case 'g':
                           // response to a get request: hand it over to the thread waiting for it
049                        sidAddPendingGetResponse();
050                        break;
051                    case 'r':
                           // response to a request, identified by the request id that follows
052                        sidAddPendingResponse(decoder.decodeUint());
053                        break;
054                    }
055                }
056            } catch (IOException e) {
057                sidOnFatalError(e);
058            }
059        }
060    
061        /***
            * Delivers an event message ('E' command) read by the polling loop.
            * @param modId module id decoded from the message
            * @param evId event id decoded from the message
            * @param decoder decoder positioned right after the two ids; the implementation reads the remainder from it
            */
           public abstract void sidDispatchEvent(int modId, int evId, Decoding decoder);
062    
063        /***
            * Requests the reader role for the event thread.
            * If the event thread already is the reader, or nobody is, the role is granted at once.
            * Otherwise the event thread is recorded as ready and the caller blocks on the event
            * thread's monitor until another thread hands the role over through mSidCurrentReader.
            * NOTE(review): the caller is not visible in this file - presumably the decoder, running on the event thread.
            * @return true when the event thread may read, false if the wait was interrupted
            */
           public boolean sidWantRead() {
064            synchronized (this) {
065                if (mSidReader == mSidEventThread || mSidReader == null) {
066                     mSidReader = mSidEventThread;
067                     return true;
068                } else {
                       // another thread is reading: remember that the event thread is waiting for its turn
069                    mSidReadyEventThread = mSidEventThread;
070                }
071            }
072            synchronized (mSidEventThread) {
073                try {
                       // re-check in a loop so that only a hand-over addressed to the event thread ends the wait
074                    while (mSidCurrentReader != mSidEventThread)
075                        mSidEventThread.wait();
076                } catch (InterruptedException ie) {
                       // NOTE(review): the interrupt status is not restored here
077                    return false;
078                }
                   // consume the hand-over token
079                mSidCurrentReader = null;
080            }
081            return true;
082        }
083    
084        /***
085         * Initializes the connection to the runtime and the interface cache according to the configuration.
            * When a transport is obtained, the decoder and encoder are created on it, the object cache is
            * reset, the request bookkeeping is allocated and the event thread is started as the initial reader.
086         * @param configuration defining the connection parameters to the runtime
087         * @param listener callbacks to check the status of the connection
088         * @return true if a transport was obtained by this call, false otherwise
089         */
090        public boolean init(ClientConfiguration configuration, ConnectionListener listener) {
091            registerConnectionListener(listener);
092    
093            TransportFactory factory = configuration.getTransportFactory();
094            TransportFactory.Result transport = factory.init(configuration, listener);
095            if (transport != null) {
096                mSidInput           = transport.in;
097                mSidOutput          = transport.out;
098                mSidDispatchAll     = configuration.isDispatchAll();
099                mSidDecoder         = new BinProtocolClientDecoder(transport.in, this, this);
100                mSidEncoder         = new BinProtocolClientEncoder(transport.out, this);
101                flushCache(null);
102                mSidEncoderLock     = new ReentrantLock();
103                mSidEncoding        = false;
104                mSidPendingRequests = new HashMap<Integer, Thread>(2);
105                mSidPendingOneWayRequests = new HashSet<Integer>(2);
106                mSidPendingGets     = new ArrayDeque<Thread>(2);
107                mSidEventThread     = new EventThread(this);
                   // the event thread owns the reader role until a requester takes it over
108                mSidReader          = mSidEventThread;
109                mSidCurrentReader   = null;
110                mSidEventThread.start();
111            }
               // Report the outcome of this call only. The cache field is also set by the public
               // flushCache() and by any earlier successful init(), so testing it for null could
               // report success although no transport was obtained this time.
112            return transport != null;
113        }
114    
115        /***
            * Reads the object reference that starts an event payload and resolves it through the object cache.
            * With dispatch-all enabled the object is looked up with sidGetObject(), otherwise with
            * sidGetObjectIfPresent(). When no target is obtained the rest of the message is skipped.
            * An IOException is reported through sidOnFatalError().
            * @param mid module id used to look up the target
            * @param decoder decoder positioned on the object reference
            * @return the target object, or null when there is none or decoding failed
            */
           protected SidObject sidDecodeEventTarget(int mid, Decoding decoder)
116        {
117            SidObject target = null;
118            try {
                   // the two leading fields are read only to advance the decoder; their values are not used
119                decoder.decodeTag(); // shall be 'O'
120                decoder.decodeUint();
121                int oid = decoder.decodeUint();
122                target = mSidDispatchAll ? sidGetObject(mid, oid) : sidGetObjectIfPresent(mid, oid);
123                if (target == null) {
124                    decoder.skipEndOfMessage();
125                }
126            }
127            catch (IOException e) {
128                sidOnFatalError(e);
129            }
130            return target;
131        }
132    
133        /***
134         * start the session after init, once all the runtime setups have been done
            * @return NOTE(review): presumably true on success; the meaning is defined by the implementing subclass
135         */
136        public abstract boolean start();
137    
138        /***
139         * Stops and terminates the connection to a runtime: marks the root as stopped, closes both
            * transports and interrupts every thread that may be reading or waiting for a response.
            * May be called before a successful init() and from a thread that does not own the encoder lock.
140         */
141        public synchronized void stop() {
142            mSidStopped = true;
               // Only the owning thread may release a ReentrantLock; unlock() from any other thread
               // throws IllegalMonitorStateException, which would abort the shutdown before the
               // transports are closed. If another thread is encoding, it keeps the lock and the flag.
143            if (mSidEncoding && mSidEncoderLock.isHeldByCurrentThread()) {
144                mSidEncoding = false;
145                mSidEncoderLock.unlock();
146            }
147            try {
                   // nested finally: the output is still closed when closing the input fails
                   try {
148                    if (mSidInput != null) mSidInput.close();
                   } finally {
149                    if (mSidOutput != null) mSidOutput.close();
                   }
150            } catch (IOException ignored) {
                   // best effort: the connection is being torn down anyway
151            }
152            if (mSidCurrentReader != null)
153                mSidCurrentReader.interrupt(); 
154            if (mSidReader != null && mSidReader != mSidCurrentReader)
155                mSidReader.interrupt(); 
               // the bookkeeping collections only exist after a successful init()
               if (mSidPendingRequests != null) {
156                for (Thread thread : mSidPendingRequests.values()) {
157                    thread.interrupt();
158                }
               }
               if (mSidPendingGets != null) {
159                for (Thread thread : mSidPendingGets) {
160                    thread.interrupt();
161                }
               }
162        }
163    
164        /***
            * Handles an unrecoverable I/O failure. Nothing happens when the root is already stopped.
            * Otherwise the root is stopped and the registered connection listener receives the stack
            * trace as text; when no listener is registered the trace is printed and the JVM exits with -1.
            * @param e the failure that broke the connection
            */
           public void sidOnFatalError(IOException e) {
               if (!mSidStopped) {
                   stop();
                   if (mSidConnectionListener == null) {
                       // nobody to notify: print the trace and terminate the process
                       e.printStackTrace();
                       System.exit(-1);
                   } else {
                       StringWriter trace = new StringWriter();
                       PrintWriter traceOut = new PrintWriter(trace);
                       e.printStackTrace(traceOut);
                       mSidConnectionListener.sidOnDisconnected(trace.toString());
                   }
               }
175        }
176    
177        //
178        // Object Cache Management
179        //
180    
181        /***
            * Returns the object registered in the cache under the given object id, without adding new entries.
            * If an entry exists but its soft reference has been cleared, the object is created again
            * with sidCreateObject() and the entry is refreshed.
            * @param mid module id, used only when the object has to be re-created
            * @param oid object id, which is the cache key
            * @return the cached (or re-created) object, or null when the id has no cache entry
            */
           public SidObject sidGetObjectIfPresent(int mid, int oid) {
               final Integer cacheKey = oid;
               synchronized (mSidObjects) {
                   final SoftReference<SidObject> ref = mSidObjects.get(cacheKey);
                   if (ref == null) {
                       // this id has no entry: nothing is created
                       return null;
                   }
                   final SidObject cached = ref.get();
                   if (cached != null) {
                       return cached;
                   }
                   // entry exists but the referent was collected: rebuild it in place
                   final SidObject recreated = sidCreateObject(mid, oid);
                   mSidObjects.put(cacheKey, new SoftReference<SidObject>(recreated));
                   return recreated;
               }
195        }
196    
197        /***
            * Returns the object registered in the cache under the given object id, creating and
            * registering it with sidCreateObject() when there is no entry or its soft reference was cleared.
            * @param mid module id, passed to sidCreateObject() when the object has to be created
            * @param oid object id, which is the cache key
            * @return the cached or newly created object
            */
           public SidObject sidGetObject(int mid, int oid) {
               final Integer cacheKey = oid;
               synchronized (mSidObjects) {
                   final SoftReference<SidObject> ref = mSidObjects.get(cacheKey);
                   SidObject found = (ref == null) ? null : ref.get();
                   if (found == null) {
                       // missing entry or collected referent: create and (re)register
                       found = sidCreateObject(mid, oid);
                       mSidObjects.put(cacheKey, new SoftReference<SidObject>(found));
                   }
                   return found;
               }
211        }
212        /***
            * Factory used by the cache lookups to build the object for a module id / object id pair.
            * It is invoked while the cache monitor is held.
            * @param mid module id of the object to create
            * @param oid object id of the object to create
            * @return the new object, which the caller stores in the cache
            */
           protected abstract SidObject sidCreateObject(int mid, int oid);
213    
214        /***
            * Removes the cache entry stored under the given object id, if any.
            * @param oid object id used as the cache key
            */
           protected void sidRemove(Integer oid) {
215            synchronized (mSidObjects) {
216                mSidObjects.remove(oid);
217            }
218        }
219    
220        /***
            * Takes the encoder lock, flags that an encoding is in progress and starts a multi-property get.
            * The lock is not released here; within this class it is released by the sidOn...Encoded()
            * callbacks and by stop().
            * @param properties properties to fetch
            * @param modid module id forwarded to the encoder
            * @param oid object id forwarded to the encoder
            * @return the encoding returned by the encoder's beginMultiGet()
            * @throws IOException propagated from the encoder; the lock is still held in that case
            */
           protected Encoding sidBeginMultiGet(PropertyEnumConverting[] properties, int modid, int oid) throws IOException {
221            mSidEncoderLock.lock();
222            mSidEncoding = true;
223            return mSidEncoder.beginMultiGet(properties, modid, oid, this);
224        }
225        
226        /***
            * Takes the encoder lock, flags that an encoding is in progress and issues a get request.
            * The lock is not released here; within this class it is released by the sidOn...Encoded()
            * callbacks and by stop().
            * @param request pre-encoded request bytes forwarded to the encoder
            * @param oid object id forwarded to the encoder
            * @return the decoding returned by the encoder's doGetRequest()
            * @throws IOException propagated from the encoder
            */
           protected Decoding sidDoGetRequest(byte[] request, int oid) throws IOException { // case of multi request?
227            mSidEncoderLock.lock();
228            mSidEncoding = true;
229            return mSidEncoder.doGetRequest(request, oid, this);
230        }
231    
232        /***
            * Callback invoked once a get request has been written: releases the encoder lock and blocks
            * the calling thread until the matching response is ready to be decoded.
            * If the response was already seen by the reader, the caller proceeds at once. Otherwise it is
            * queued in mSidPendingGets and either waits to be woken up or, when it holds the reader
            * role, reads responses itself through sidWaitResponse().
            * @return the shared decoder, from which the caller decodes the response
            * @throws IOException propagated from sidWaitResponse()
            */
           public Decoding sidOnGetRequestEncoded() throws IOException {
233            Thread current = null;
234            synchronized (this) {
235                if (mSidPendingGetResponse != null) { // response is already here...
                       // consume the marker left by sidAddPendingGetResponse(); current stays null so no wait happens
236                    mSidPendingGetResponse = null;
237                } else {
238                    current = Thread.currentThread();
239                    mSidPendingGets.add(current);
                       // nobody is reading: this thread takes the reader role
240                    if (mSidReader == null) mSidReader = current;
241                }
242            }
243            mSidEncoding = false;
244            mSidEncoderLock.unlock();
245            if (current != null) {
246                if (mSidReader != current) {
                       // another thread is reading: wait on our own monitor for a hand-over addressed to us
247                    synchronized (current) {
248                        try {
249                            while (mSidCurrentReader != current)
250                                current.wait();
251                        } catch (InterruptedException ie) {
                               // NOTE(review): the reader is cleared and the interrupt status is not restored
252                            mSidReader = null;
253                        }
                           // consume the hand-over token
254                        mSidCurrentReader = null;
255                    }
256                }
257                if (mSidReader == current) {
258                    // current thread became the new reader and shall wait for a response
259                    sidWaitResponse();
260                }
261            }
262            // response is here and needs being decoded
263            return mSidDecoder;
264        }
265    
266        /***
            * Callback invoked once a get response has been decoded; handled exactly like any other
            * response by delegating to sidOnResponseDecoded().
            */
           public void sidOnGetResponseDecoded() {
267             sidOnResponseDecoded();
268        }
269        
270        /***
            * Takes the encoder lock, flags that an encoding is in progress and begins a request.
            * The lock is not released here; within this class it is released by the sidOn...Encoded()
            * callbacks and by stop().
            * @param request pre-encoded request header forwarded to the encoder
            * @return the encoding returned by the encoder's beginRequest()
            * @throws IOException propagated from the encoder; the lock is still held in that case
            */
           protected Encoding sidDoRequest(byte[] request) throws IOException {
271            mSidEncoderLock.lock();
272            mSidEncoding = true;
273            return mSidEncoder.beginRequest(request, this);
274        }
275    
276        /***
            * Variant of sidDoRequest(byte[]) that also passes an object id to the encoder.
            * Takes the encoder lock and flags that an encoding is in progress; the lock is not released here.
            * @param request pre-encoded request header forwarded to the encoder
            * @param oid object id forwarded to the encoder
            * @return the encoding returned by the encoder's beginRequest()
            * @throws IOException propagated from the encoder; the lock is still held in that case
            */
           protected Encoding sidDoRequest(byte[] request, int oid) throws IOException {
277            mSidEncoderLock.lock();
278            mSidEncoding = true;
279            return mSidEncoder.beginRequest(request, oid, this);
280        }
281        
282        /***
            * Callback invoked once a request has been written: releases the encoder lock and blocks the
            * calling thread until the response carrying the same request id is ready to be decoded.
            * @param requestId id of the request that was just encoded
            * @return the shared decoder, from which the caller decodes the response
            * @throws IOException propagated from sidWaitResponse()
            */
           public Decoding sidOnRequestEncoded(int requestId) throws IOException {
283            mSidEncoding = false;
284            mSidEncoderLock.unlock();
285            Thread current = Thread.currentThread();
286            Integer rid    = requestId;
287            synchronized (this) {
                   // an entry already stored under this id was put there by sidAddPendingResponse(),
                   // i.e. the response was read before this thread registered itself
288                Thread responder = mSidPendingRequests.get(rid);
289                if (responder != null) { // response already here
290                    mSidPendingRequests.remove(rid);
291                    return mSidDecoder;
292                }
293                mSidPendingRequests.put(rid, current);
                   // nobody is reading: this thread takes the reader role
294                if (mSidReader == null) mSidReader = current;
295            }
296            if (mSidReader != current) {
297                synchronized (current) {
298                    try {
299                        while (mSidCurrentReader != current)
300                            current.wait();
301                    } catch (InterruptedException ie) {
                           // NOTE(review): the reader is cleared and the interrupt status is not restored
302                        mSidReader = null;
303                    }
304                    mSidCurrentReader = null;
305                    // 2 cases => either there was no reader and we were promoted reader
306                    //            then mSidReader == current, and we must wait responses
307                    //         => or the reader got our response, we must decode it
308                }
309            }
310            if (mSidReader == current) { // we were wakenup to read
311                sidWaitResponse();
312            }
313            return mSidDecoder;
314        }
315    
316        /***
            * Callback invoked once a one-way request has been written: releases the encoder lock and
            * arranges for the response to be discarded instead of waiting for it.
            * If the response was already read, it is skipped here and the thread parked with it is woken up;
            * otherwise the request id is recorded as one-way so that the reader skips the response later.
            * @param requestId id of the request that was just encoded
            * @return always null, since the caller has no response to decode
            */
           public Decoding sidOnOneWayRequestEncoded(int requestId) {
317            mSidEncoding = false;
318            mSidEncoderLock.unlock();
319            Thread responder = null;
320            Integer rid      = requestId;
321            Thread current   = Thread.currentThread();
322            
323            synchronized (this) {
324                responder = mSidPendingRequests.get(rid);
325                if (responder != null) { // response is already here and was partly swallowed
326                    mSidPendingRequests.remove(rid);
327                } else { 
328                    mSidPendingRequests.put(rid, current);
329                    mSidPendingOneWayRequests.add(rid);
330                }
331            }
332            if (responder != null) {
333                try {
                       // discard the part of the response that has not been consumed yet
334                    mSidDecoder.skipMessage();
335                } catch (IOException e) {
336                    sidOnFatalError(e);
337                }
                   // hand the read token back to the thread that was parked with this response
338                synchronized (responder) {
339                    mSidCurrentReader = responder;
340                    responder.notify();                
341                }
342            }
343            return null;
344        }
345    
346        /***
            * Callback invoked once a response has been fully decoded: passes the reader role on and
            * wakes up the thread that has to continue.
            * When the caller is the reader, the next reader is chosen in this order: the head of the
            * pending get queue, else a thread waiting for a non one-way response, else the event thread
            * if it declared itself ready, else nobody. When the caller is not the reader, the current
            * reader is the one woken up.
            */
           public void sidOnResponseDecoded() {
347            Thread current = Thread.currentThread();
348            if (mSidReader == current) {
349                synchronized (this) {
350                    if (!mSidPendingGets.isEmpty()) {
                           // peek, not poll: the entry is removed later by sidAddPendingGetResponse()
351                        mSidReader = mSidPendingGets.peek(); 
352                    } else {
353                        mSidReader = null;
354                        if (!mSidPendingRequests.isEmpty()) {
355                            // can be oneway...
356                            for (Entry<Integer, Thread> entry : mSidPendingRequests.entrySet()) {
357                                Integer rid = entry.getKey();
358                                if (!mSidPendingOneWayRequests.contains(rid)) {
359                                    mSidReader = entry.getValue();
360                                    break;
361                                }
362                            }
363                        }
364                        if (mSidReader == null && mSidReadyEventThread != null) {
365                            mSidReader = mSidReadyEventThread; 
366                        }
367                    }
                       // from here on 'current' designates the thread to wake up, not the caller
368                    current = mSidReader; 
369                }
370            } else {
371                current = mSidReader;
372            }
373            if (current != null) {
374                synchronized (current) {
375                    mSidCurrentReader = current;
376                    current.notify();
377                }
378            }
379        }
380    
381        /***
            * Decodes a property change notification and forwards it to the object concerned.
            * Integer-like kinds are delivered through the int value, string-like kinds through the
            * string value, and every other kind is skipped. The target is looked up with sidGetObject()
            * when dispatch-all is enabled and with sidGetObjectIfPresent() otherwise.
            * A decoding failure is reported through sidOnFatalError() and nothing is dispatched.
            * @param decoder decoder positioned on the property list of the notification
            */
           void sidOnChangedProperty(Decoding decoder) {
382            // currently only 1 notified at once and only int-alike properties are valued, else design shall be changed to ensure that all properties are decoded
383            // before calling the callbacks
384            int expected = 1;
385            int kind;
386            int oid       = 0;
387            int moduleId  = 0;
388            int value     = 0;
389            String svalue = null;
390            int propertyId= -1;
391            try {
392                while (decoder.hasNextProperty(false)) {
                       // 'expected' is only used by this assertion, so the decrement is harmless when assertions are off
393                    assert(expected-- > 0);
394                    PropertyInfo info = decoder.getNextProperty();
395                    kind       = info.kind; 
396                    moduleId   = info.moduleId;
397                    oid        = info.objectId;
398                    propertyId = info.propertyId;
399                    switch (kind) {
400                    case 'F': case 'N': break;
401                    case 'T': value = 1; break;
402                    case 'O': case 'e': case 'u': value = decoder.decodeUint(); break;
403                    case 'i': value = decoder.decodeInt(); break;
404                    case 'S': case 'f': case 'X': svalue =  decoder.decodeString(); break;
405                    default:
406                        decoder.skipValue(kind);
407                        break;
408                    }
409                }
410            } catch (IOException e) {
411                sidOnFatalError(e);
                   // The ids may already be set while the value was never read; dispatching now
                   // would hand the object a change it did not receive.
                   return;
412            }
413            if (oid>0) {
414                SidObject o = mSidDispatchAll ? sidGetObject(moduleId, oid) : sidGetObjectIfPresent(moduleId, oid);
415                if (o != null) {
416                    o.sidOnChangedProperty(propertyId, value, svalue);
417                }
418            }
419        }
420    
421        /***
            * Called by the reading thread when a get response ('g') has been read.
            * The oldest queued get requester is taken as the owner of the response. If that is the reader
            * itself, true is returned. Otherwise the owner is woken up (or, when nobody is queued yet, the
            * response is flagged as already arrived) and the reader blocks until the read role is handed back.
            * @return true if the response belongs to the reading thread itself
            * @throws IOException with message "Connection was closed" if the wait is interrupted
            */
           boolean sidAddPendingGetResponse()  throws IOException {
               // NOTE(review): read outside the lock; assumed to be the calling thread - confirm
422            Thread current   = mSidReader;
423            Thread requester = null;
424            synchronized (this) {
425                if (mSidPendingGets.isEmpty()) {
                       // response came before the requester registered: leave a marker for sidOnGetRequestEncoded()
426                    mSidPendingGetResponse = current;
427                } else {
428                    requester =  mSidPendingGets.poll();
429                }
430            }
431            boolean got_my_response = requester == current;
432            if (!got_my_response) {
433                if (requester != null) {
                       // let the owner decode its response
434                    synchronized (requester) {
435                        mSidCurrentReader = requester;
436                        requester.notify();
437                    }
438                }
                   // park until the read role is handed back to this thread
439                synchronized (current) {
440                    try {
441                        while (mSidCurrentReader != current)
442                            current.wait();
443                    } catch (InterruptedException ie) {
444                        throw new IOException("Connection was closed");
445                    }
446                    mSidCurrentReader = null;
447                }
448            }
449            return got_my_response;
450        }
451    
452        /***
            * Called by the reading thread when a response ('r') with the given request id has been read.
            * A response to a one-way request is skipped. If the response belongs to the reader itself,
            * true is returned. Otherwise the owner is woken up (or, when nobody registered the id yet,
            * the reader records itself under that id) and the reader blocks until the read role is handed back.
            * @param requestId request id decoded from the response
            * @return true if the response belongs to the reading thread itself, false otherwise
            * @throws IOException with message "Connection was closed" if the wait is interrupted
            */
           boolean sidAddPendingResponse(int requestId)  throws IOException {
               // NOTE(review): read outside the lock; assumed to be the calling thread - confirm
453            Thread current   = mSidReader;
454            Thread requester = null;
455            Integer rid      = requestId;
456            boolean oneway   = false;
457            synchronized (this) {
458                requester = mSidPendingRequests.get(rid);
459                if (requester == null) {
                       // response came before the requester registered: leave the reader as a marker under this id
460                    mSidPendingRequests.put(rid, current);
461                } else {
462                    mSidPendingRequests.remove(rid);
                       // remove() reports whether the id had been recorded as one-way
463                    oneway = mSidPendingOneWayRequests.remove(rid);
464                }
465            }
466            if (oneway) {
467                try {
                       // nobody waits for this response: drop it
468                    mSidDecoder.skipMessage();
469                } catch (IOException e) {
470                    sidOnFatalError(e);
471                }
472                return false;
473            }
474            boolean got_my_response = requester == current;
475            if (!got_my_response) {
476                if (requester != null) {
                       // let the owner decode its response
477                    synchronized (requester) {
478                        mSidCurrentReader = requester;
479                        requester.notify();
480                    }
481                } 
                   // park until the read role is handed back to this thread
482                synchronized (current) {
483                    try {
484                        while (mSidCurrentReader != current)
485                            current.wait();
486                    } catch (InterruptedException ie) {
487                        throw new IOException("Connection was closed");
488                    }
489                    mSidCurrentReader = null;
490                }
491            }
492            return got_my_response;
493        }
494        
495        /***
            * Reading loop used by a requester that holds the reader role: reads response headers and
            * routes them until one of them turns out to be the caller's own response.
            * Response codes other than 'r' and 'g' are ignored and the loop continues.
            * NOTE(review): an IOException while reading the header or the request id makes the method
            * return normally, so the caller cannot tell a failed read from an available response.
            * @throws IOException from sidAddPendingResponse() / sidAddPendingGetResponse()
            */
           void sidWaitResponse() throws IOException {
496            Decoding decoder = mSidDecoder;
497            while (true) {
498                int r;
499                try {
500                    r = decoder.decodeResponse(this);
501                } catch (IOException e) {
502                     return;
503                }
504                switch (r) {
505                case 'r':
506                    int rid;
507                    try {
508                        rid = decoder.decodeUint();
509                    } catch (IOException e) {
510                        return;
511                    }
                       // true means the response is ours: stop reading and let the caller decode it
512                    if (sidAddPendingResponse(rid)) return;
513                    break;
514                case 'g':
515                    if (sidAddPendingGetResponse()) return;
516                    break;
517                }
518            }
519        }
520    
521        /***
            * Sets the connection listener, replacing any listener registered before.
            * @param listener listener notified by sidOnFatalError() when the connection breaks; may be null
            */
           public void registerConnectionListener(ConnectionListener listener) {
522            mSidConnectionListener = listener;
523        }
524        /***
            * Clears the registered connection listener.
            * @param listener not used: the current listener is removed whichever value is passed
            */
           public void unRegisterConnectionListener(ConnectionListener listener) {
525            mSidConnectionListener = null;
526        }
527        /***
            * @return the currently registered connection listener, or null when there is none
            */
           public ConnectionListener getConnectionListener() {
528            return mSidConnectionListener;
529        }
530    
531        /***
            * Empties the object cache by replacing it with a new map and advances the cache timestamp.
            * @param keep object to carry over into the new cache and to stamp with the new timestamp, or null to keep nothing
            * @return the keep argument, unchanged
            */
           public SidObject flushCache(SidObject keep) {
               mSidTimestamp += 1;
               mSidObjects = new HashMap<Integer, SoftReference<SidObject>>(512);
               if (keep == null) {
                   return null;
               }
               // re-register the survivor and mark it as belonging to the new cache generation
               mSidObjects.put(keep.getOid(), new SoftReference<SidObject>(keep));
               keep.mSidTimestamp = mSidTimestamp;
               return keep;
539        }    
540    
           // incremented by every flushCache(); also copied into the object kept across a flush
541        protected int                                      mSidTimestamp;
           // notified by sidOnFatalError(); may be null
542        private ConnectionListener                         mSidConnectionListener;
           // thread currently holding the reader role, or null when nobody reads
543        private Thread                                     mSidReader;
           // hand-over token: set to the thread being woken up, cleared by that thread when it resumes
544        private Thread                                     mSidCurrentReader;
           // thread running sidPollEvent(), created in init()
545        private Thread                                     mSidEventThread;
           // set to the event thread by sidWantRead() when it has to wait for the reader role
546        private Thread                                     mSidReadyEventThread;
           // taken by the sidBegin.../sidDo... methods, released by the sidOn...Encoded() callbacks and stop()
547        private ReentrantLock                              mSidEncoderLock;
           // true between taking and releasing the encoder lock
548        private boolean                                    mSidEncoding;
549        protected BinProtocolClientEncoder                 mSidEncoder;
550        protected BinProtocolClientDecoder                 mSidDecoder;
           // request id -> thread waiting for that response (or the reader, when the response came first)
551        private HashMap<Integer, Thread>                   mSidPendingRequests;
           // ids of pending requests whose responses must be skipped
552        private HashSet<Integer>                           mSidPendingOneWayRequests;
           // threads waiting for a get response, oldest first
553        private ArrayDeque<Thread>                         mSidPendingGets;
           // non-null when a get response was read before its requester registered
554        private Thread                                     mSidPendingGetResponse;
           // from the configuration: when true, notifications create their target object if it is not cached
555        private boolean                                    mSidDispatchAll;
           // set by stop(); ends the polling loop and turns later sidOnFatalError() calls into no-ops
556        private boolean                                    mSidStopped;
           // object cache keyed by object id; replaced as a whole by flushCache()
557        private HashMap<Integer, SoftReference<SidObject>> mSidObjects;
           // the two ends of the transport, closed by stop()
558        private InputTransporting                          mSidInput;
559        private OutputTransporting                         mSidOutput;
560    }
561