001 /**
002 * Copyright (C) 2010, Skype Limited
003 *
004 * All intellectual property rights, including but not limited to copyrights,
005 * trademarks and patents, as well as know how and trade secrets contained in,
006 * relating to, or arising from the internet telephony software of
007 * Skype Limited (including its affiliates, "Skype"), including without
008 * limitation this source code, Skype API and related material of such
009 * software proprietary to Skype and/or its licensors ("IP Rights") are and
010 * shall remain the exclusive property of Skype and/or its licensors.
011 * The recipient hereby acknowledges and agrees that any unauthorized use of
012 * the IP Rights is a violation of intellectual property laws.
013 *
014 * Skype reserves all rights and may take legal action against infringers of
015 * IP Rights.
016 *
017 * The recipient agrees not to remove, obscure, make illegible or alter any
018 * notices or indications of the IP Rights and/or Skype's rights and
019 * ownership thereof.
020 */
021
022 package com.skype.ipc;
023
024 import java.io.EOFException;
025 import java.io.IOException;
026 import java.util.Collections;
027 import java.util.HashMap;
028 import java.util.Map;
029 import java.util.concurrent.BlockingQueue;
030 import java.util.concurrent.LinkedBlockingQueue;
031
032 import com.skype.ipc.RootObject.ErrorListener;
033 import com.skype.util.Log;
034
035 public class ResponseListener extends Thread
036 {
037 private static final String TAG = "ResponseListener";
038 private static final int XCALL_RESPONSE_TIMEOUT = 60000;
039 private Thread my_thread;
040 private Transport ioTransport;
041 private boolean die;
042 private final Map<Integer, Response> response_table;
043 private final BlockingQueue<GetPropertyResponse> get_prop_response_queue;
044 private EventDispatcher myEdp;
045
046 private boolean responseWaitInterrupted;
047 private ErrorListener errorListener;
048
049 private enum HandshakeStatus {
050 NOT_DONE, DONE, ERROR, REJECTED
051 };
052
053 private HandshakeStatus handshakeStatus;
054
055 public ResponseListener(EventDispatcher edp, RootObject root)
056 {
057 myEdp = edp;
058 ioTransport = root.getTransport();
059 errorListener = root.errorListener;
060 handshakeStatus = HandshakeStatus.NOT_DONE;
061 setName("response listener thread");
062 die = false;
063 response_table = Collections.synchronizedMap(new HashMap<Integer, Response>());
064 get_prop_response_queue = new LinkedBlockingQueue<GetPropertyResponse>();
065 responseWaitInterrupted = false;
066 }
067
068 public boolean isDead()
069 {
070 return die;
071 }
072
073 public void die()
074 {
075 die = true;
076 my_thread.interrupt(); // trigger interrupt in case of waiting
077 }
078
079 synchronized private byte readByte() throws IOException
080 {
081 int res = ioTransport.read();
082 byte resByte = (byte) (res & 0xFF);
083 Log.d(TAG, "ResponseListener::readByte: " + Integer.toHexString(res) + " ('" + (char) res + "') " + resByte);
084
085 return resByte;
086 }
087
088 public void run()
089 {
090 my_thread = Thread.currentThread();
091 Log.d(TAG, "Running! Threadid=" + my_thread.getId());
092
093
094 try {
095 byte b1 = readByte();
096 byte b2 = readByte();
097 if (b1 == 'O' && b2 == 'K') {
098 Log.d(TAG, "Skypekit executable handshake successful");
099 handshakeStatus = HandshakeStatus.DONE;
100 }
101 else if (b1 == 'E' && b2 == 'R') {
102 // service unavailable
103 Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
104 handshakeStatus = HandshakeStatus.ERROR;
105 }
106 else if ((b1 == 'A' && b2 == 'N') ||
107 (b1 == 'O' && b2 == 'B') ||
108 (b1 == 'S' && b2 == 'N') ||
109 (b1 == 'O' && b2 == 'M') ||
110 (b1 == 'A' && b2 == 'B') ||
111 (b1 == 'S' && b2 == 'B') ||
112 (b1 == 'P' && b2 == 'B'))
113 {
114 // application token rejected for one reason or another
115 Log.e(TAG, "Skypekit executable handshake failed. Code '" + (char) b1 + (char) b2 + "'.");
116 handshakeStatus = HandshakeStatus.REJECTED;
117 }
118 else {
119 // unknown error in handshake
120 Log.e(TAG, "Unknown Skypekit executable handshake failure. Code '" + (char) b1 + (char) b2 + "'.");
121 handshakeStatus = HandshakeStatus.ERROR;
122 }
123
124 }
125 catch (IOException e) {
126 Log.e(TAG, "exception=", e);
127 e.printStackTrace();
128 handshakeStatus = HandshakeStatus.ERROR;
129 }
130 catch (Exception e) {
131 Log.e(TAG, "Unknown exception caught = ", e);
132 e.printStackTrace();
133 die = true;
134 handshakeStatus = HandshakeStatus.ERROR;
135 }
136
137 while ( ! die) {
138 try {
139 int prefix = readByte();
140 Log.d(TAG, "Prefix read:" + prefix);
141
142 if (prefix == 'Z') {
143 int cmd = readByte();
144 Log.d(TAG, "Processing command " + cmd + " (" + (char) cmd + ")");
145
146 switch (cmd) {
147 case 'r':
148 x_call_response();
149 break;
150 case 'C':
151 decode_property_change();
152 break;
153 case 'E':
154 decode_event();
155 break;
156 case 'g':
157 decode_get_response();
158 break;
159 default:
160 Log.e(TAG, "Unknown command " + cmd + " (" + (char) cmd + ")");
161 }
162 }
163 else {
164 Log.e(TAG, "Bad symbol in the stream: " + prefix + " (" + (char) prefix + ") - discarding");
165 }
166 }
167 catch (java.net.SocketException e) {
168 Log.d(TAG, "Socket exception after normal socket shutdown");
169 }
170 catch (EOFException e) {
171 Log.e(TAG, "EOF Exception, no socket connection:" + e.getMessage());
172 if (errorListener != null)
173 errorListener.OnSkypeKitConnectionClosed();
174 die = true;
175 }
176 catch (IOException e) {
177 Log.e(TAG, "Listener IO Exception:" + e.getMessage());
178 e.printStackTrace();
179 die = true;
180 }
181 catch (Exception e) {
182 Log.e(TAG, "Unknown exception caught in Listener:", e);
183 e.printStackTrace();
184 die = true;
185 }
186 }
187 Log.d(TAG, "Exiting!");
188 }
189
190 // Called by external thread. Blocks until handshake has completed.
191 public void waitHandshake() throws IOException, IllegalArgumentException {
192 int maxWaitRetries = 100;
193 while (true) {
194 if (handshakeStatus == HandshakeStatus.DONE) {
195 return;
196 } else if (handshakeStatus == HandshakeStatus.ERROR) {
197 throw new IOException("Handshake error");
198 } else if (handshakeStatus == HandshakeStatus.REJECTED) {
199 throw new IllegalArgumentException("Application token rejected");
200 } else {
201 try {
202 Thread.sleep(100);
203 } catch (InterruptedException e) {
204 throw new IOException("WaitHandshake interrupted");
205 }
206 maxWaitRetries--;
207 if (maxWaitRetries == 0) {
208 throw new IOException("WaitHandshake has timed out. Giving up.");
209 }
210 }
211 }
212 }
213 void x_call_response() throws IOException
214 {
215 int rid = AbstractDecoder.decodeOneVaruint(ioTransport);
216 Log.d(TAG, "x_call_response rid: " + rid);
217
218 // Tell SkypeKit that we got new response in case it's waiting for it
219 synchronized (response_table) {
220 response_table.put(rid, new Response(ioTransport, rid));
221
222 response_table.notifyAll();
223 Log.d(TAG, "Notified of incoming response rid=" + rid);
224 }
225 ;
226 }
227
228 void decode_property_change() throws IOException
229 {
230 Log.d(TAG, "->Property change - decode_property_change starting...");
231 try {
232 myEdp.AddPropertyChange(new PropertyChange(ioTransport));
233 }
234 catch (InterruptedException e) {
235 throw new IOException("AddPropertyChange interrupted at EventDispatcher");
236 }
237 Log.d(TAG, "->decode_property_change done.");
238 }
239
240 void decode_event() throws IOException
241 {
242 Log.d(TAG, "->Event - decode_event starting...");
243 try {
244 myEdp.AddEvent(new Event(ioTransport));
245 }
246 catch (InterruptedException e) {
247 throw new IOException("AddEvent interrupted at EventDispatcher");
248 }
249 Log.d(TAG, "->decode_event done.");
250 }
251
252 void decode_get_response() throws IOException
253 {
254 // SkypeKit guarantees that get responses always arrive in same order
255 // they were requested
256 Log.d(TAG, "->Event - decode_get_response starting...");
257 try {
258 if (!responseWaitInterrupted) {
259 // place the response in the queue
260 get_prop_response_queue.put(new GetPropertyResponse(ioTransport));
261 }
262 else {
263 new GetPropertyResponse(ioTransport); // discard the response
264 responseWaitInterrupted = false;
265 }
266 }
267 catch (InterruptedException e) {
268 throw new IOException("Interrupted while get_prop_response_queue.put(..).");
269 }
270 Log.d(TAG, "->Decoding done, placed GetResponse to queue");
271 }
272
273 // This is called by SkypeKit thread
274 // NOTE: That sending property request and calling get_prop_response() to get
275 // the response MUST be synchronized
276 public GetPropertyResponse get_prop_response()
277 {
278 try {
279 Log.d(TAG, "Going to take GetResponse from queue");
280 GetPropertyResponse temp = get_prop_response_queue.take();
281 if (temp.isMultiresponse()) {
282 Log.d(TAG, "Took GetResponse from queue. It is a multiresponse.");
283 }
284 else {
285 Log.d(TAG, "Took GetResponse from queue. Propid=" + temp.getPropId());
286 }
287 return temp;
288 }
289 catch (InterruptedException e) {
290 responseWaitInterrupted = true;
291 Log.d(TAG, "get_prop_response_queue.take() was interrupted! Next property response will be discarded to keep queue in sync.");
292 return null;
293 }
294 }
295
296 // This is called by SkypeKit thread
297 public Response get_x_call_response(int rid) throws InterruptedException
298 {
299 Response myresponse;
300 while (!die) {
301 myresponse = response_table.remove(rid);
302 if (myresponse == null) {
303 try {
304 Log.d(TAG, "Response not ready for rid=" + rid + ". Going to wait.");
305 synchronized (response_table) {
306 if (!response_table.containsKey(rid))
307 response_table.wait(XCALL_RESPONSE_TIMEOUT);
308 }
309 Log.d(TAG, "Woken up for rid=" + rid);
310 }
311 catch (InterruptedException e) {
312 Log.e(TAG, "Waiting for rid=" + rid + " interrupted. e:" + e.toString());
313 throw e;
314 }
315 }
316 else {
317 // response has arrived
318 Log.d(TAG, "Response has arrived for rid=" + rid + ". Processing...");
319 return (myresponse);
320 }
321 }
322 Log.d(TAG, "Exiting GetXCallResponse due to Response Listener being killed.");
323 throw new InterruptedException("Response listener killed.");
324 }
325 }