001    /**
002     * Copyright (C) 2010, Skype Limited
003     *
004     * All intellectual property rights, including but not limited to copyrights,
005     * trademarks and patents, as well as know how and trade secrets contained in,
006     * relating to, or arising from the internet telephony software of
007     * Skype Limited (including its affiliates, "Skype"), including without
008     * limitation this source code, Skype API and related material of such
009     * software proprietary to Skype and/or its licensors ("IP Rights") are and
010     * shall remain the exclusive property of Skype and/or its licensors.
011     * The recipient hereby acknowledges and agrees that any unauthorized use of
012     * the IP Rights is a violation of intellectual property laws.
013     *
014     * Skype reserves all rights and may take legal action against infringers of
015     * IP Rights.
016     *
017     * The recipient agrees not to remove, obscure, make illegible or alter any
018     * notices or indications of the IP Rights and/or Skype's rights and
019     * ownership thereof.
020     */
021    
022    package com.skype.ipc;
023    
024    import java.io.EOFException;
025    import java.io.IOException;
026    import java.util.Collections;
027    import java.util.HashMap;
028    import java.util.Map;
029    import java.util.concurrent.BlockingQueue;
030    import java.util.concurrent.LinkedBlockingQueue;
031    
032    import com.skype.ipc.RootObject.ErrorListener;
033    import com.skype.util.Log;
034    
035    public class ResponseListener extends Thread
036    {
037        private static final String                      TAG                    = "ResponseListener";
038        private static final int                         XCALL_RESPONSE_TIMEOUT = 60000;
039        private Thread                                   my_thread;
040        private Transport                                ioTransport;
041        private boolean                                  die;
042        private final Map<Integer, Response>             response_table;
043        private final BlockingQueue<GetPropertyResponse> get_prop_response_queue;
044        private EventDispatcher                          myEdp;
045    
046        private boolean                                  responseWaitInterrupted;
047        private ErrorListener                            errorListener;
048        
049        private enum HandshakeStatus {
050            NOT_DONE, DONE, ERROR, REJECTED
051        };
052    
053        private HandshakeStatus handshakeStatus;
054    
055        public ResponseListener(EventDispatcher edp, RootObject root)
056        {
057            myEdp = edp;
058            ioTransport = root.getTransport();
059            errorListener = root.errorListener;
060            handshakeStatus = HandshakeStatus.NOT_DONE;
061            setName("response listener thread");
062            die = false;
063            response_table = Collections.synchronizedMap(new HashMap<Integer, Response>());
064            get_prop_response_queue = new LinkedBlockingQueue<GetPropertyResponse>();
065            responseWaitInterrupted = false;
066        }
067    
068        public boolean isDead()
069        {
070            return die;
071        }
072        
073        public void die()
074        {
075            die = true;
076            my_thread.interrupt(); // trigger interrupt in case of waiting
077        }
078    
079        synchronized private byte readByte() throws IOException
080        {
081            int res = ioTransport.read();
082            byte resByte = (byte) (res & 0xFF);
083            Log.d(TAG, "ResponseListener::readByte: " + Integer.toHexString(res) + " ('" + (char) res + "') " + resByte);
084    
085            return resByte;
086        }
087    
088        public void run()
089        {
090            my_thread = Thread.currentThread();
091            Log.d(TAG, "Running! Threadid=" + my_thread.getId());
092    
093    
094            try {
095                byte b1 = readByte();
096                byte b2 = readByte();
097                if (b1 == 'O' && b2 == 'K') {
098                    Log.d(TAG, "Skypekit executable handshake successful");
099                    handshakeStatus = HandshakeStatus.DONE;
100                }
101                else if (b1 == 'E' && b2 == 'R') {
102                    // service unavailable
103                    Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
104                    handshakeStatus = HandshakeStatus.ERROR;
105                }
106                else if ((b1 == 'A' && b2 == 'N') ||
107                        (b1 == 'O' && b2 == 'B') ||
108                        (b1 == 'S' && b2 == 'N') ||
109                        (b1 == 'O' && b2 == 'M') ||
110                        (b1 == 'A' && b2 == 'B') ||
111                        (b1 == 'S' && b2 == 'B') ||
112                        (b1 == 'P' && b2 == 'B'))
113                {
114                    // application token rejected for one reason or another
115                    Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
116                    handshakeStatus = HandshakeStatus.REJECTED;
117                }
118                else {
119                    // unknown error in handshake
120                    Log.e(TAG, "Unknown Skypekit executable handshake failure. Code '" + (char) b1 + (char) b2 + "'.");
121                    handshakeStatus = HandshakeStatus.ERROR;
122                }
123    
124            }
125            catch (IOException e) {
126                Log.e(TAG, "exception=", e);
127                e.printStackTrace();
128                handshakeStatus = HandshakeStatus.ERROR;
129            }
130            catch (Exception e) {
131                Log.e(TAG, "Unknown exception caught = ", e);
132                e.printStackTrace();
133                die = true;
134                handshakeStatus = HandshakeStatus.ERROR;
135            }
136    
137            while ( ! die) {
138                try {
139                    int prefix = readByte();
140                    Log.d(TAG, "Prefix read:" + prefix);
141                    
142                    if (prefix == 'Z') {
143                        int cmd = readByte();
144                        Log.d(TAG, "Processing command " + cmd + " (" + (char) cmd + ")");
145    
146                        switch (cmd) {
147                        case 'r':
148                            x_call_response();
149                            break;
150                        case 'C':
151                            decode_property_change();
152                            break;
153                        case 'E':
154                            decode_event();
155                            break;
156                        case 'g':
157                            decode_get_response();
158                            break;
159                        default:
160                            Log.e(TAG, "Unknown command " + cmd + " (" + (char) cmd + ")");
161                        }
162                    }
163                    else {
164                        Log.e(TAG, "Bad symbol in the stream: " + prefix + " (" + (char) prefix + ") - discarding");
165                    }
166                }
167                catch (java.net.SocketException e) {
168                    Log.d(TAG, "Socket exception after normal socket shutdown");
169                }
170                catch (EOFException e) {
171                    Log.e(TAG, "EOF Exception, no socket connection:" + e.getMessage());
172                    if (errorListener != null)
173                        errorListener.OnSkypeKitConnectionClosed();
174                    die = true;
175                }
176                catch (IOException e) {
177                    Log.e(TAG, "Listener IO Exception:" + e.getMessage());
178                    e.printStackTrace();
179                    die = true;
180                }
181                catch (Exception e) {
182                    Log.e(TAG, "Unknown exception caught in Listener:", e);
183                    e.printStackTrace();
184                    die = true;
185                }
186            }
187            Log.d(TAG, "Exiting!");
188        }
189    
190        // Called by external thread. Blocks until handshake has completed.
191        public void waitHandshake() throws IOException, IllegalArgumentException {
192            int maxWaitRetries = 100;
193            while (true) {
194                if (handshakeStatus == HandshakeStatus.DONE) {
195                    return;
196                } else if (handshakeStatus == HandshakeStatus.ERROR) {
197                    throw new IOException("Handshake error");
198                } else if (handshakeStatus == HandshakeStatus.REJECTED) {
199                    throw new IllegalArgumentException("Application token rejected");
200                } else {
201                    try {
202                        Thread.sleep(100);
203                    } catch (InterruptedException e) {
204                        throw new IOException("WaitHandshake interrupted");
205                    }
206                    maxWaitRetries--;
207                    if (maxWaitRetries == 0) {
208                        throw new IOException("WaitHandshake has timed out. Giving up.");
209                    }
210                }
211            }
212        }
213        void x_call_response() throws IOException
214        {
215            int rid = AbstractDecoder.decodeOneVaruint(ioTransport);
216            Log.d(TAG, "x_call_response rid: " + rid);
217    
218            // Tell SkypeKit that we got new response in case it's waiting for it
219            synchronized (response_table) {
220                response_table.put(rid, new Response(ioTransport, rid));
221    
222                response_table.notifyAll();
223                Log.d(TAG, "Notified of incoming response rid=" + rid);
224            }
225            ;
226        }
227    
228        void decode_property_change() throws IOException
229        {
230            Log.d(TAG, "->Property change - decode_property_change starting...");
231            try {
232                myEdp.AddPropertyChange(new PropertyChange(ioTransport));
233            }
234            catch (InterruptedException e) {
235                throw new IOException("AddPropertyChange interrupted at EventDispatcher");
236            }
237            Log.d(TAG, "->decode_property_change done.");
238        }
239    
240        void decode_event() throws IOException
241        {
242            Log.d(TAG, "->Event - decode_event starting...");
243            try {
244                myEdp.AddEvent(new Event(ioTransport));
245            }
246            catch (InterruptedException e) {
247                throw new IOException("AddEvent interrupted at EventDispatcher");
248            }
249            Log.d(TAG, "->decode_event done.");
250        }
251    
252        void decode_get_response() throws IOException
253        {
254            // SkypeKit guarantees that get responses always arrive in same order
255            // they were requested
256            Log.d(TAG, "->Event - decode_get_response starting...");
257            try {
258                if (!responseWaitInterrupted) {
259                    // place the response in the queue
260                    get_prop_response_queue.put(new GetPropertyResponse(ioTransport));
261                }
262                else {
263                    new GetPropertyResponse(ioTransport); // discard the response
264                    responseWaitInterrupted = false;
265                }
266            }
267            catch (InterruptedException e) {
268                throw new IOException("Interrupted while get_prop_response_queue.put(..).");
269            }
270            Log.d(TAG, "->Decoding done, placed GetResponse to queue");
271        }
272    
273        // This is called by SkypeKit thread
274        // NOTE: That sending property request and calling get_prop_response() to get
275        // the response MUST be synchronized
276        public GetPropertyResponse get_prop_response()
277        {
278            try {
279                Log.d(TAG, "Going to take GetResponse from queue");
280                GetPropertyResponse temp = get_prop_response_queue.take();
281                if (temp.isMultiresponse()) {
282                    Log.d(TAG, "Took GetResponse from queue. It is a multiresponse.");
283                }
284                else {
285                    Log.d(TAG, "Took GetResponse from queue. Propid=" + temp.getPropId());
286                }
287                return temp;
288            }
289            catch (InterruptedException e) {
290                responseWaitInterrupted = true;
291                Log.d(TAG, "get_prop_response_queue.take() was interrupted! Next property response will be discarded to keep queue in sync.");
292                return null;
293            }
294        }
295    
296        // This is called by SkypeKit thread
297        public Response get_x_call_response(int rid) throws InterruptedException
298        {
299            Response myresponse;
300            while (!die) {
301                myresponse = response_table.remove(rid);
302                if (myresponse == null) {
303                    try {
304                        Log.d(TAG, "Response not ready for rid=" + rid + ". Going to wait.");
305                        synchronized (response_table) {
306                            if (!response_table.containsKey(rid))
307                                response_table.wait(XCALL_RESPONSE_TIMEOUT);
308                        }
309                        Log.d(TAG, "Woken up for rid=" + rid);
310                    }
311                    catch (InterruptedException e) {
312                        Log.e(TAG, "Waiting for rid=" + rid + " interrupted. e:" + e.toString());
313                        throw e;
314                    }
315                }
316                else {
317                    // response has arrived
318                    Log.d(TAG, "Response has arrived for rid=" + rid + ". Processing...");
319                    return (myresponse);
320                }
321            }
322            Log.d(TAG, "Exiting GetXCallResponse due to Response Listener being killed.");
323            throw new InterruptedException("Response listener killed.");
324        }
325    }