Class: Session

Inherits:
Object
  • Object
show all
Defined in:
lib/importio.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(io, host = "https://query.import.io", user_id = nil, api_key = nil, proxy_host = nil, proxy_port = nil) ⇒ Session

Session manager, used for managing the message channel, sending queries and receiving data



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/importio.rb', line 195

def initialize(io, host="https://query.import.io", user_id=nil, api_key=nil, proxy_host=nil, proxy_port=nil)
  # Sets up a session with its configuration; no network traffic happens here.
  #
  # io         - owning client object; asked to reconnect when the server reports an unknown client
  # host       - base URL of the query server; the CometD endpoint is derived from it
  # user_id    - user identifier sent together with the API key
  # api_key    - when set, credentials are appended to every request URL
  # proxy_host - optional proxy host handed to Net::HTTP
  # proxy_port - optional proxy port handed to Net::HTTP

  # Collaborators and credentials supplied by the caller
  @io = io
  @user_id = user_id
  @api_key = api_key
  @proxy_host = proxy_host
  @proxy_port = proxy_port

  # Endpoint and channel used for all CometD traffic
  @url = "#{host}/query/comet/"
  @messaging_channel = "/messaging"

  # Per-session protocol state: message counter, server-assigned client ID,
  # running queries keyed by request ID, inbound message queue and cookie jar
  @msg_id = 1
  @client_id = nil
  @queries = {}
  @queue = Queue.new
  @cj = HTTP::CookieJar.new

  # Lifecycle flags, all initially off
  @connected = @connecting = @disconnecting = @polling = false

  # Name and version reported to the server in request headers
  @clientName = "import.io Ruby client"
  @clientVersion = "2.0.0"
end

Instance Attribute Details

#client_id ⇒ Object

We use this only for a specific test case



219
220
221
# File 'lib/importio.rb', line 219

# Reader for the client ID: nil until a handshake stores one, and reset to nil by disconnect.
def client_id; @client_id; end

#connected ⇒ Object (readonly)

Returns the value of attribute connected.



221
222
223
# File 'lib/importio.rb', line 221

# Reader for the connection flag: set to true by connect once subscribed, false again on disconnect.
def connected; @connected; end

Instance Method Details

#connect ⇒ Object



363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
# File 'lib/importio.rb', line 363

def connect
  # Connects this client to the import.io server if not already connected.
  #
  # Performs the handshake, subscribes to the messaging channel, marks the
  # session as connected and starts the two worker threads. Any error raised
  # by the handshake or subscription propagates to the caller; the
  # "connecting" flag is always cleared so that a later call can retry.

  # Don't connect again if we're already connected or in the middle of connecting
  return if @connected || @connecting

  @connecting = true

  begin
    # Do the handshake request to register the client on the server
    handshake

    # Register this client with a subscription to our chosen message channel
    subscribe(@messaging_channel)

    # Now we are subscribed, we can set the client as connected
    @connected = true

    # Ruby's HTTP requests are synchronous, so the long-polling connection is
    # handed to its own thread to avoid blocking the user's application
    @threads = []
    @threads << Thread.new { poll }

    # Queued messages are likewise processed on a separate thread
    @threads << Thread.new { poll_queue }
  ensure
    # Reset even on failure; otherwise every later connect call would return early forever
    @connecting = false
  end
end

#disconnect ⇒ Object



397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
# File 'lib/importio.rb', line 397

def disconnect
  # Call this method to ask the client library to disconnect from the import.io server.
  # It is best practice to disconnect when you are finished with querying, so as to clean
  # up resources on both the client and server.
  #
  # Local state is always torn down: even if the disconnect request raises, the client ID
  # is cleared, the "disconnecting" flag is reset and every outstanding query receives a
  # DISCONNECT message. The original error from the request still propagates to the caller.

  # Maintain a local value of the queries, and then erase them from the class
  pending = @queries.clone
  @queries = Hash.new

  # Set the flag to notify handlers that we are disconnecting, i.e. open connect calls will fail
  @disconnecting = true

  # Set the connection status flag in the library to prevent any other requests going out
  @connected = false

  begin
    # Make the disconnect request to the server (still carries the current client ID)
    request("/meta/disconnect")
  ensure
    # Whether or not the server was reachable, we no longer hold a valid client ID
    @client_id = nil

    # We are done disconnecting so reset the flag
    @disconnecting = false

    # Send a "disconnected" message to all of the queries that were running
    pending.each { |request_id, query|
      query._on_message({"type"=>"DISCONNECT","requestId"=>request_id})
    }
  end
end

#encode(dict) ⇒ Object



245
246
247
248
# File 'lib/importio.rb', line 245

def encode(dict)
  # Encodes a dictionary to x-www-form format ("k1=v1&k2=v2").
  #
  # Keys and values are converted with to_s before escaping, so symbols,
  # numbers and nil (encoded as an empty string) are accepted as well as
  # strings. An empty dictionary yields an empty string.
  dict.map { |key, value| "#{CGI.escape(key.to_s)}=#{CGI.escape(value.to_s)}" }.join("&")
end

#handshake ⇒ Object



346
347
348
349
350
351
352
353
354
355
356
# File 'lib/importio.rb', line 346

def handshake
  # Issues the CometD handshake through the request helper and remembers the
  # client ID found in the first message of the response.
  payload = {
    "version" => "1.0",
    "minimumVersion" => "0.9",
    "supportedConnectionTypes" => ["long-polling"],
    "advice" => { "timeout" => 60000, "interval" => 0 }
  }
  response = request("/meta/handshake", "handshake", payload)

  # The request helper hands back nothing when the response was not processed
  return if response.nil?

  # Set the Client ID from the handshake's response
  @client_id = response.body[0]["clientId"]
end

#join ⇒ Object



434
435
436
437
438
439
440
441
442
443
444
# File 'lib/importio.rb', line 434

def join
  # Blocks the caller while the session is connected and queries are still
  # outstanding, checking once per second. As soon as no queries remain the
  # worker threads are stopped and the method returns.
  loop do
    return unless @connected

    if @queries.empty?
      # When there are no more queries, stop all the threads
      stop
      return
    end

    sleep 1
  end
end

#login(username, password, host = "https://api.import.io") ⇒ Object



250
251
252
253
254
255
256
257
258
259
# File 'lib/importio.rb', line 250

def login(username, password, host="https://api.import.io")
  # If you want to use cookie-based authentication, this method will log you in with a
  # username and password to get a session.
  #
  # username - account user name
  # password - account password
  # host     - base URL of the API server that provides /auth/login
  #
  # Raises a RuntimeError when the server answers with anything other than status 200.
  credentials = encode({'username' => username, 'password'=> password})
  uri, http, req = make_request("#{host}/auth/login", credentials)

  # open also records any cookies the server sets on the response
  response = open(uri, http, req)

  raise "Could not log in, code #{response.code}" if response.code != "200"
end

#make_request(url, data) ⇒ Object



223
224
225
226
227
228
229
230
231
# File 'lib/importio.rb', line 223

def make_request(url, data)
  # Prepares, but does not send, a POST to the given URL.
  #
  # Returns a three-element array: the parsed URI, a Net::HTTP connection
  # object configured with this session's proxy settings (SSL enabled for
  # https URLs), and the Net::HTTP::Post carrying data as its body.
  target = URI(url)

  post = Net::HTTP::Post.new(target.request_uri)
  post.body = data

  connection = Net::HTTP.new(target.host, target.port, @proxy_host, @proxy_port)
  connection.use_ssl = (target.scheme == "https")

  [target, connection, post]
end

#open(uri, http, request) ⇒ Object



233
234
235
236
237
238
239
240
241
242
243
# File 'lib/importio.rb', line 233

def open(uri, http, request)
  # Sends the prepared request over the given connection and returns the
  # response. Every Set-Cookie header on the response is fed into the
  # session's cookie jar for the request URI.
  response = http.request(request)

  set_cookie_headers = response.get_fields("set-cookie")
  unless set_cookie_headers.nil?
    set_cookie_headers.each do |header|
      @cj.parse(header, uri)
    end
  end

  response
end

#poll ⇒ Object



461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
# File 'lib/importio.rb', line 461

def poll
  # This method is called in a new thread to open long-polling HTTP connections to the import.io
  # CometD server so that we can wait for any messages that the server needs to send to us.
  #
  # Only one polling loop runs at a time: a call made while another loop is active returns
  # immediately. Errors raised by the request helper propagate to the caller, but the
  # "polling" flag is always cleared so that polling can be started again afterwards.
  return if @polling

  @polling = true

  begin
    # Keep making connect calls until the session is marked as disconnected;
    # throw=false asks the request helper to log server-reported errors instead of raising
    request("/meta/connect", "connect", {}, false) while @connected
  ensure
    # Without this an exception would leave the flag set and block every future poll
    @polling = false
  end
end

#poll_queue ⇒ Object



446
447
448
449
450
451
452
453
454
455
456
457
458
459
# File 'lib/importio.rb', line 446

def poll_queue
  # This method is called in a new thread to take messages returned from the server off the
  # queue and process them. It keeps going until the client library is disconnected.
  while @connected
    begin
      # Blocks until a message is available, then hands it to the message processor
      process_message(@queue.pop)
    rescue => exception
      # Report what went wrong as well as where, then carry on with the next message
      puts "#{exception.class}: #{exception.message}"
      puts exception.backtrace
    end
  end
end

#process_message(data) ⇒ Object



480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
# File 'lib/importio.rb', line 480

def process_message(data)
  # This method is called by the queue poller to handle messages that are received from the
  # import.io CometD server.
  #
  # data - message payload; its "requestId" entry selects the query that should receive it
  #
  # Errors raised while handling the message are reported on standard output and swallowed
  # so that one bad message does not stop the caller.

  # First we need to look up which query object the message corresponds to, based on its request ID
  request_id = data["requestId"]
  query = @queries[request_id]

  # If we don't recognise the request ID, then do not process the message
  if query.nil?
    puts "No open query #{request_id}:"
    puts JSON.pretty_generate(data)
    return
  end

  # Call the message callback on the query object with the data
  query._on_message(data)

  # Clean up the query map if the query itself is finished
  @queries.delete(request_id) if query.finished
rescue => exception
  # Report what went wrong as well as where
  puts "#{exception.class}: #{exception.message}"
  puts exception.backtrace
end

#query(query, callback) ⇒ Object



507
508
509
510
511
512
513
514
515
516
517
518
# File 'lib/importio.rb', line 507

def query(query, callback)
  # Issues an import.io query to the server and arranges for callback to be
  # invoked whenever a message relating to it is received.
  #
  # query    - query definition; a "requestId" entry is written into it
  # callback - handed to the Query state tracker created for this request

  # A random GUID identifies the request, so that incoming messages can be
  # matched to the query they belong to
  request_id = SecureRandom.uuid
  query["requestId"] = request_id

  # Track the running query under its request ID
  @queries[request_id] = Query.new(callback, query)

  # Issue the query to the server
  request("/service/query", "", { "data" => query })
end

#request(channel, path = "", data = {}, throw = true) ⇒ Object



261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
# File 'lib/importio.rb', line 261

def request(channel, path="", data={}, throw=true)
  # Helper method that makes a generic request on the messaging channel.
  #
  # channel - CometD channel the message is addressed to
  # path    - suffix appended to the CometD endpoint URL
  # data    - message fields; the common CometD fields are added to this hash in place
  # throw   - when true, HTTP and server-reported failures raise a RuntimeError;
  #           when false they are printed instead
  #
  # Returns the response with its body replaced by the parsed JSON messages. Returns nil
  # when the session was disconnected while the request was in flight, or when the server
  # answered with a non-200 status and throw is false.

  # These are CometD configuration values that are common to all requests we need to send
  data["channel"] = channel
  data["connectionType"] = "long-polling"

  # We need to increment the message ID with each request that we send
  data["id"] = @msg_id
  @msg_id += 1

  # If we have a client ID, then we need to send that (will be provided on handshake)
  data["clientId"] = @client_id unless @client_id.nil?

  # Build the URL that we are going to request
  url = "#{@url}#{path}"

  # If the user has chosen API key authentication, we need to send the API key with each request
  unless @api_key.nil?
    credentials = encode({ "_user" => @user_id, "_apikey" => @api_key })
    url = "#{url}?#{credentials}"
  end

  # Build the request object we are going to use to initialise the request
  uri, http, http_request = make_request(url, JSON.dump([data]))
  http_request.content_type = "application/json;charset=UTF-8"
  http_request["Cookie"] = HTTP::Cookie.cookie_value(@cj.cookies(uri))
  http_request["import-io-client"] = @clientName
  http_request["import-io-client-version"] = @clientVersion

  # Send the request itself
  response = open(uri, http, http_request)

  # Don't process the response if we've disconnected in the meantime
  return if !@connected && !@connecting

  # If the server responds non-200 we have a serious issue (configuration wrong or server down)
  if response.code != "200"
    error_message = "Unable to connect to import.io, status #{response.code} for url #{url}"
    raise error_message if throw

    # The body of a failed response is not expected to hold CometD messages, so stop here
    # rather than attempting to parse it
    puts error_message
    return
  end

  response.body = JSON.parse(response.body)

  # Iterate through each of the messages in the response content
  response.body.each do |msg|
    # If the message is not successful, i.e. an import.io server error has occurred, decide what action to take
    if msg.has_key?("successful") && msg["successful"] != true
      # Failures seen while connecting, disconnecting or already disconnected are ignored
      next if @disconnecting || !@connected || @connecting

      error_message = "Unsuccessful request: #{msg}"
      if msg["error"] == "402::Unknown client"
        # If we get a 402 unknown client we need to reconnect
        puts "402 received, reconnecting"
        @io.reconnect
      elsif throw
        raise error_message
      else
        puts error_message
      end
    end

    # Ignore messages that come back on a CometD channel that we have not subscribed to
    next if msg["channel"] != @messaging_channel

    # Now we have a valid message on the right channel, queue it up to be processed
    @queue.push(msg["data"])
  end

  response
end

#stop ⇒ Object



427
428
429
430
431
432
# File 'lib/importio.rb', line 427

def stop
  # This method stops all of the threads that are currently running.
  # It is safe to call before any threads have been started, in which case it does nothing.
  Array(@threads).each { |thread| thread.terminate }
end

#subscribe(channel) ⇒ Object



358
359
360
361
# File 'lib/importio.rb', line 358

def subscribe(channel)
  # Asks the server, via the request helper, to subscribe this client to the
  # given CometD channel and hands back whatever the helper returns.
  subscription = { "subscription" => channel }
  request("/meta/subscribe", "", subscription)
end