001 /**
002 * Copyright (C) 2010, Skype Limited
003 *
004 * All intellectual property rights, including but not limited to copyrights,
005 * trademarks and patents, as well as know how and trade secrets contained in,
006 * relating to, or arising from the internet telephony software of
007 * Skype Limited (including its affiliates, "Skype"), including without
008 * limitation this source code, Skype API and related material of such
009 * software proprietary to Skype and/or its licensors ("IP Rights") are and
010 * shall remain the exclusive property of Skype and/or its licensors.
011 * The recipient hereby acknowledges and agrees that any unauthorized use of
012 * the IP Rights is a violation of intellectual property laws.
013 *
014 * Skype reserves all rights and may take legal action against infringers of
015 * IP Rights.
016 *
017 * The recipient agrees not to remove, obscure, make illegible or alter any
018 * notices or indications of the IP Rights and/or Skype's rights and
019 * ownership thereof.
020 */
021
022 package com.skype.ipc;
023
024 import java.io.EOFException;
025 import java.io.IOException;
026 import java.util.Collections;
027 import java.util.HashMap;
028 import java.util.Map;
029 import java.util.concurrent.BlockingQueue;
030 import java.util.concurrent.LinkedBlockingQueue;
031
032 import com.skype.ipc.RootObject.ErrorListener;
033 import com.skype.util.Log;
034
035 public class ResponseListener extends Thread
036 {
037 private static final String TAG = "ResponseListener";
038 private static final int XCALL_RESPONSE_TIMEOUT = 60000;
039 private Thread my_thread;
040 private Transport ioTransport;
041 private boolean die;
042 private final Map<Integer, Response> response_table;
043 private final BlockingQueue<GetPropertyResponse> get_prop_response_queue;
044 private EventDispatcher myEdp;
045
046 private boolean responseWaitInterrupted;
047 private ErrorListener errorListener;
048
049 private enum HandshakeStatus {
050 NOT_DONE, DONE, ERROR, REJECTED
051 };
052
053 private HandshakeStatus handshakeStatus;
054
055 public ResponseListener(EventDispatcher edp, RootObject root)
056 {
057 myEdp = edp;
058 ioTransport = root.getTransport();
059 errorListener = root.errorListener;
060 handshakeStatus = HandshakeStatus.NOT_DONE;
061 setName("response listener thread");
062 die = ioTransport == null;
063 response_table = Collections.synchronizedMap(new HashMap<Integer, Response>());
064 get_prop_response_queue = new LinkedBlockingQueue<GetPropertyResponse>();
065 responseWaitInterrupted = false;
066 }
067
068 public boolean isDead()
069 {
070 return die;
071 }
072
073 public void die()
074 {
075 die = true;
076 my_thread.interrupt(); // trigger interrupt in case of waiting
077 }
078
079 synchronized private byte readByte() throws IOException
080 {
081 int res = ioTransport.read();
082 byte resByte = (byte) (res & 0xFF);
083
084 return resByte;
085 }
086
087 public void run()
088 {
089 my_thread = Thread.currentThread();
090 Log.d(TAG, "Running! Threadid=" + my_thread.getId());
091
092 if (die) {
093 Log.e(TAG, "ResponseListener failing, connection closed.");
094 return;
095 }
096
097 try {
098 byte b1 = readByte();
099 byte b2 = readByte();
100 if (b1 == 'O' && b2 == 'K') {
101 Log.d(TAG, "Skypekit executable handshake successful");
102 handshakeStatus = HandshakeStatus.DONE;
103 }
104 else if (b1 == 'E' && b2 == 'R') {
105 // service unavailable
106 Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
107 handshakeStatus = HandshakeStatus.ERROR;
108 }
109 else if ((b1 == 'A' && b2 == 'N') ||
110 (b1 == 'O' && b2 == 'B') ||
111 (b1 == 'S' && b2 == 'N') ||
112 (b1 == 'O' && b2 == 'M') ||
113 (b1 == 'A' && b2 == 'B') ||
114 (b1 == 'S' && b2 == 'B') ||
115 (b1 == 'P' && b2 == 'B'))
116 {
117 // application token rejected for one reason or another
118 Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
119 handshakeStatus = HandshakeStatus.REJECTED;
120 }
121 else {
122 // unknown error in handshake
123 Log.e(TAG, "Unknown Skypekit executable handshake failure. Code '" + (char) b1 + (char) b2 + "'.");
124 handshakeStatus = HandshakeStatus.ERROR;
125 }
126
127 }
128 catch (IOException e) {
129 Log.e(TAG, "exception=", e);
130 e.printStackTrace();
131 handshakeStatus = HandshakeStatus.ERROR;
132 }
133 catch (Exception e) {
134 Log.e(TAG, "Unknown exception caught = ", e);
135 e.printStackTrace();
136 die = true;
137 handshakeStatus = HandshakeStatus.ERROR;
138 }
139
140 while ( ! die) {
141 try {
142 int prefix = readByte();
143
144 if (prefix == 'Z') {
145 int cmd = readByte();
146
147 switch (cmd) {
148 case 'r':
149 x_call_response();
150 break;
151 case 'C':
152 decode_property_change();
153 break;
154 case 'E':
155 decode_event();
156 break;
157 case 'g':
158 decode_get_response();
159 break;
160 default:
161 Log.e(TAG, "Unknown command " + cmd + " (" + (char) cmd + ")");
162 }
163 }
164 else {
165 Log.e(TAG, "Bad symbol in the stream: " + prefix + " (" + (char) prefix + ") - discarding");
166 }
167 }
168 catch (java.net.SocketException e) {
169 Log.d(TAG, "Socket exception after normal socket shutdown");
170 }
171 catch (EOFException e) {
172 Log.e(TAG, "EOF Exception, no socket connection:" + e.getMessage());
173 if (errorListener != null)
174 errorListener.OnSkypeKitConnectionClosed();
175 die = true;
176 }
177 catch (IOException e) {
178 Log.e(TAG, "Listener IO Exception:" + e.getMessage());
179 e.printStackTrace();
180 die = true;
181 }
182 catch (Exception e) {
183 Log.e(TAG, "Unknown exception caught in Listener:", e);
184 e.printStackTrace();
185 die = true;
186 }
187 }
188 Log.d(TAG, "Exiting!");
189 }
190
191 // Called by external thread. Blocks until handshake has completed.
192 public void waitHandshake() throws IOException, IllegalArgumentException {
193 int maxWaitRetries = 100;
194 while (true) {
195 if (handshakeStatus == HandshakeStatus.DONE) {
196 return;
197 } else if (handshakeStatus == HandshakeStatus.ERROR) {
198 throw new IOException("Handshake error");
199 } else if (handshakeStatus == HandshakeStatus.REJECTED) {
200 throw new IllegalArgumentException("Application token rejected");
201 } else {
202 try {
203 Thread.sleep(100);
204 } catch (InterruptedException e) {
205 throw new IOException("WaitHandshake interrupted");
206 }
207 maxWaitRetries--;
208 if (maxWaitRetries == 0) {
209 throw new IOException("WaitHandshake has timed out. Giving up.");
210 }
211 }
212 }
213 }
214
215 void x_call_response() throws IOException
216 {
217 int rid = AbstractDecoder.decodeOneVaruint(ioTransport);
218
219 // Tell SkypeKit that we got new response in case it's waiting for it
220 synchronized (response_table) {
221 response_table.put(rid, new Response(ioTransport, rid));
222
223 response_table.notifyAll();
224 }
225 ;
226 }
227
228 void decode_property_change() throws IOException
229 {
230 try {
231 myEdp.AddPropertyChange(new PropertyChange(ioTransport));
232 }
233 catch (InterruptedException e) {
234 throw new IOException("AddPropertyChange interrupted at EventDispatcher");
235 }
236 }
237
238 void decode_event() throws IOException
239 {
240 try {
241 myEdp.AddEvent(new Event(ioTransport));
242 }
243 catch (InterruptedException e) {
244 throw new IOException("AddEvent interrupted at EventDispatcher");
245 }
246 }
247
248 void decode_get_response() throws IOException
249 {
250 // SkypeKit guarantees that get responses always arrive in same order
251 // they were requested
252 try {
253 if (!responseWaitInterrupted) {
254 // place the response in the queue
255 get_prop_response_queue.put(new GetPropertyResponse(ioTransport));
256 }
257 else {
258 new GetPropertyResponse(ioTransport); // discard the response
259 responseWaitInterrupted = false;
260 }
261 }
262 catch (InterruptedException e) {
263 throw new IOException("Interrupted while get_prop_response_queue.put(..).");
264 }
265 }
266
267 // This is called by SkypeKit thread
268 // NOTE: That sending property request and calling get_prop_response() to get
269 // the response MUST be synchronized
270 public GetPropertyResponse get_prop_response()
271 {
272 try {
273 GetPropertyResponse temp = get_prop_response_queue.take();
274 return temp;
275 }
276 catch (InterruptedException e) {
277 responseWaitInterrupted = true;
278 Log.d(TAG, "get_prop_response_queue.take() was interrupted! Next property response will be discarded to keep queue in sync.");
279 return null;
280 }
281 }
282
283 // This is called by SkypeKit thread
284 public Response get_x_call_response(int rid) throws InterruptedException
285 {
286 Response myresponse;
287 while (!die) {
288 myresponse = response_table.remove(rid);
289 if (myresponse == null) {
290 try {
291 //Log.d(TAG, "Response not ready for rid=" + rid + ". Going to wait.");
292 synchronized (response_table) {
293 if (!response_table.containsKey(rid))
294 response_table.wait(XCALL_RESPONSE_TIMEOUT);
295 }
296 //Log.d(TAG, "Woken up for rid=" + rid);
297 }
298 catch (InterruptedException e) {
299 Log.e(TAG, "Waiting for rid=" + rid + " interrupted. e:" + e.toString());
300 throw e;
301 }
302 }
303 else {
304 // response has arrived
305 //Log.d(TAG, "Response has arrived for rid=" + rid + ". Processing...");
306 return (myresponse);
307 }
308 }
309 Log.d(TAG, "Exiting GetXCallResponse due to Response Listener being killed.");
310 throw new InterruptedException("Response listener killed.");
311 }
312 }