001    /**
002     * Copyright (C) 2010, Skype Limited
003     *
004     * All intellectual property rights, including but not limited to copyrights,
005     * trademarks and patents, as well as know how and trade secrets contained in,
006     * relating to, or arising from the internet telephony software of
007     * Skype Limited (including its affiliates, "Skype"), including without
008     * limitation this source code, Skype API and related material of such
009     * software proprietary to Skype and/or its licensors ("IP Rights") are and
010     * shall remain the exclusive property of Skype and/or its licensors.
011     * The recipient hereby acknowledges and agrees that any unauthorized use of
012     * the IP Rights is a violation of intellectual property laws.
013     *
014     * Skype reserves all rights and may take legal action against infringers of
015     * IP Rights.
016     *
017     * The recipient agrees not to remove, obscure, make illegible or alter any
018     * notices or indications of the IP Rights and/or Skype's rights and
019     * ownership thereof.
020     */
021    
022    package com.skype.ipc;
023    
024    import java.io.EOFException;
025    import java.io.IOException;
026    import java.util.Collections;
027    import java.util.HashMap;
028    import java.util.Map;
029    import java.util.concurrent.BlockingQueue;
030    import java.util.concurrent.LinkedBlockingQueue;
031    
032    import com.skype.ipc.RootObject.ErrorListener;
033    import com.skype.util.Log;
034    
035    public class ResponseListener extends Thread
036    {
037        private static final String                      TAG                    = "ResponseListener";
038        private static final int                         XCALL_RESPONSE_TIMEOUT = 60000;
039        private Thread                                   my_thread;
040        private Transport                                ioTransport;
041        private boolean                                  die;
042        private final Map<Integer, Response>             response_table;
043        private final BlockingQueue<GetPropertyResponse> get_prop_response_queue;
044        private EventDispatcher                          myEdp;
045    
046        private boolean                                  responseWaitInterrupted;
047        private ErrorListener                            errorListener;
048    
049        private enum HandshakeStatus {
050            NOT_DONE, DONE, ERROR, REJECTED
051        };
052    
053        private HandshakeStatus handshakeStatus;
054    
055        public ResponseListener(EventDispatcher edp, RootObject root)
056        {
057            myEdp = edp;
058            ioTransport = root.getTransport();
059            errorListener = root.errorListener;
060            handshakeStatus = HandshakeStatus.NOT_DONE;
061            setName("response listener thread");
062            die = ioTransport == null;
063            response_table = Collections.synchronizedMap(new HashMap<Integer, Response>());
064            get_prop_response_queue = new LinkedBlockingQueue<GetPropertyResponse>();
065            responseWaitInterrupted = false;
066        }
067    
068        public boolean isDead()
069        {
070            return die;
071        }
072        
073        public void die()
074        {
075            die = true;
076            my_thread.interrupt(); // trigger interrupt in case of waiting
077        }
078    
079        synchronized private byte readByte() throws IOException
080        {
081            int res = ioTransport.read();
082            byte resByte = (byte) (res & 0xFF);
083    
084            return resByte;
085        }
086    
087        public void run()
088        {
089            my_thread = Thread.currentThread();
090            Log.d(TAG, "Running! Threadid=" + my_thread.getId());
091    
092            if (die) {
093                    Log.e(TAG, "ResponseListener failing, connection closed.");
094                    return;
095            }
096            
097            try {
098                byte b1 = readByte();
099                byte b2 = readByte();
100                if (b1 == 'O' && b2 == 'K') {
101                    Log.d(TAG, "Skypekit executable handshake successful");
102                    handshakeStatus = HandshakeStatus.DONE;
103                }
104                else if (b1 == 'E' && b2 == 'R') {
105                    // service unavailable
106                    Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
107                    handshakeStatus = HandshakeStatus.ERROR;
108                }
109                else if ((b1 == 'A' && b2 == 'N') ||
110                        (b1 == 'O' && b2 == 'B') ||
111                        (b1 == 'S' && b2 == 'N') ||
112                        (b1 == 'O' && b2 == 'M') ||
113                        (b1 == 'A' && b2 == 'B') ||
114                        (b1 == 'S' && b2 == 'B') ||
115                        (b1 == 'P' && b2 == 'B'))
116                {
117                    // application token rejected for one reason or another
118                    Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
119                    handshakeStatus = HandshakeStatus.REJECTED;
120                }
121                else {
122                    // unknown error in handshake
123                    Log.e(TAG, "Unknown Skypekit executable handshake failure. Code '" + (char) b1 + (char) b2 + "'.");
124                    handshakeStatus = HandshakeStatus.ERROR;
125                }
126    
127            }
128            catch (IOException e) {
129                Log.e(TAG, "exception=", e);
130                e.printStackTrace();
131                handshakeStatus = HandshakeStatus.ERROR;
132            }
133            catch (Exception e) {
134                Log.e(TAG, "Unknown exception caught = ", e);
135                e.printStackTrace();
136                die = true;
137                handshakeStatus = HandshakeStatus.ERROR;
138            }
139    
140            while ( ! die) {
141                try {
142                    int prefix = readByte();
143                    
144                    if (prefix == 'Z') {
145                        int cmd = readByte();
146    
147                        switch (cmd) {
148                        case 'r':
149                            x_call_response();
150                            break;
151                        case 'C':
152                            decode_property_change();
153                            break;
154                        case 'E':
155                            decode_event();
156                            break;
157                        case 'g':
158                            decode_get_response();
159                            break;
160                        default:
161                            Log.e(TAG, "Unknown command " + cmd + " (" + (char) cmd + ")");
162                        }
163                    }
164                    else {
165                        Log.e(TAG, "Bad symbol in the stream: " + prefix + " (" + (char) prefix + ") - discarding");
166                    }
167                }
168                catch (java.net.SocketException e) {
169                    Log.d(TAG, "Socket exception after normal socket shutdown");
170                }
171                catch (EOFException e) {
172                    Log.e(TAG, "EOF Exception, no socket connection:" + e.getMessage());
173                    if (errorListener != null)
174                        errorListener.OnSkypeKitConnectionClosed();
175                    die = true;
176                }
177                catch (IOException e) {
178                    Log.e(TAG, "Listener IO Exception:" + e.getMessage());
179                    e.printStackTrace();
180                    die = true;
181                }
182                catch (Exception e) {
183                    Log.e(TAG, "Unknown exception caught in Listener:", e);
184                    e.printStackTrace();
185                    die = true;
186                }
187            }
188            Log.d(TAG, "Exiting!");
189        }
190    
191        // Called by external thread. Blocks until handshake has completed.
192        public void waitHandshake() throws IOException, IllegalArgumentException {
193            int maxWaitRetries = 100;
194            while (true) {
195                if (handshakeStatus == HandshakeStatus.DONE) {
196                    return;
197                } else if (handshakeStatus == HandshakeStatus.ERROR) {
198                    throw new IOException("Handshake error");
199                } else if (handshakeStatus == HandshakeStatus.REJECTED) {
200                    throw new IllegalArgumentException("Application token rejected");
201                } else {
202                    try {
203                        Thread.sleep(100);
204                    } catch (InterruptedException e) {
205                        throw new IOException("WaitHandshake interrupted");
206                    }
207                    maxWaitRetries--;
208                    if (maxWaitRetries == 0) {
209                        throw new IOException("WaitHandshake has timed out. Giving up.");
210                    }
211                }
212            }
213        }
214    
215        void x_call_response() throws IOException
216        {
217            int rid = AbstractDecoder.decodeOneVaruint(ioTransport);
218    
219            // Tell SkypeKit that we got new response in case it's waiting for it
220            synchronized (response_table) {
221                response_table.put(rid, new Response(ioTransport, rid));
222    
223                response_table.notifyAll();
224            }
225            ;
226        }
227    
228        void decode_property_change() throws IOException
229        {
230            try {
231                myEdp.AddPropertyChange(new PropertyChange(ioTransport));
232            }
233            catch (InterruptedException e) {
234                throw new IOException("AddPropertyChange interrupted at EventDispatcher");
235            }
236        }
237    
238        void decode_event() throws IOException
239        {
240            try {
241                myEdp.AddEvent(new Event(ioTransport));
242            }
243            catch (InterruptedException e) {
244                throw new IOException("AddEvent interrupted at EventDispatcher");
245            }
246        }
247    
248        void decode_get_response() throws IOException
249        {
250            // SkypeKit guarantees that get responses always arrive in same order
251            // they were requested
252            try {
253                if (!responseWaitInterrupted) {
254                    // place the response in the queue
255                    get_prop_response_queue.put(new GetPropertyResponse(ioTransport));
256                }
257                else {
258                    new GetPropertyResponse(ioTransport); // discard the response
259                    responseWaitInterrupted = false;
260                }
261            }
262            catch (InterruptedException e) {
263                throw new IOException("Interrupted while get_prop_response_queue.put(..).");
264            }
265        }
266    
267        // This is called by SkypeKit thread
268        // NOTE: That sending property request and calling get_prop_response() to get
269        // the response MUST be synchronized
270        public GetPropertyResponse get_prop_response()
271        {
272            try {
273                GetPropertyResponse temp = get_prop_response_queue.take();
274                return temp;
275            }
276            catch (InterruptedException e) {
277                responseWaitInterrupted = true;
278                Log.d(TAG, "get_prop_response_queue.take() was interrupted! Next property response will be discarded to keep queue in sync.");
279                return null;
280            }
281        }
282    
283        // This is called by SkypeKit thread
284        public Response get_x_call_response(int rid) throws InterruptedException
285        {
286            Response myresponse;
287            while (!die) {
288                myresponse = response_table.remove(rid);
289                if (myresponse == null) {
290                    try {
291                        //Log.d(TAG, "Response not ready for rid=" + rid + ". Going to wait.");
292                        synchronized (response_table) {
293                            if (!response_table.containsKey(rid))
294                                response_table.wait(XCALL_RESPONSE_TIMEOUT);
295                        }
296                        //Log.d(TAG, "Woken up for rid=" + rid);
297                    }
298                    catch (InterruptedException e) {
299                        Log.e(TAG, "Waiting for rid=" + rid + " interrupted. e:" + e.toString());
300                        throw e;
301                    }
302                }
303                else {
304                    // response has arrived
305                    //Log.d(TAG, "Response has arrived for rid=" + rid + ". Processing...");
306                    return (myresponse);
307                }
308            }
309            Log.d(TAG, "Exiting GetXCallResponse due to Response Listener being killed.");
310            throw new InterruptedException("Response listener killed.");
311        }
312    }