Class: Session
- Inherits:
-
Object
- Object
- Session
- Defined in:
- lib/importio.rb
Instance Attribute Summary collapse
-
#client_id ⇒ Object
We use this only for a specific test case.
-
#connected ⇒ Object
readonly
Returns the value of attribute connected.
Instance Method Summary collapse
- #connect ⇒ Object
- #disconnect ⇒ Object
- #encode(dict) ⇒ Object
- #handshake ⇒ Object
-
#initialize(io, host = "https://query.import.io", user_id = nil, api_key = nil, proxy_host = nil, proxy_port = nil) ⇒ Session
constructor
Session manager, used for managing the message channel, sending queries and receiving data.
- #join ⇒ Object
- #login(username, password, host = "https://api.import.io") ⇒ Object
- #make_request(url, data) ⇒ Object
- #open(uri, http, request) ⇒ Object
- #poll ⇒ Object
- #poll_queue ⇒ Object
- #process_message(data) ⇒ Object
- #query(query, callback) ⇒ Object
- #request(channel, path = "", data = {}, throw = true) ⇒ Object
- #stop ⇒ Object
- #subscribe(channel) ⇒ Object
Constructor Details
#initialize(io, host = "https://query.import.io", user_id = nil, api_key = nil, proxy_host = nil, proxy_port = nil) ⇒ Session
Session manager, used for managing the message channel, sending queries and receiving data
195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
# File 'lib/importio.rb', line 195

# Session manager, used for managing the message channel, sending queries and
# receiving data. The constructor only records configuration and initial state.
#
# io         - owning client object, stored so the session can call back into it
# host       - base URL of the query server; "/query/comet/" is appended to it
# user_id    - user identifier stored for API-key authentication (may be nil)
# api_key    - API key stored for API-key authentication (may be nil)
# proxy_host - optional HTTP proxy host
# proxy_port - optional HTTP proxy port
def initialize(io, host="https://query.import.io", user_id=nil, api_key=nil, proxy_host=nil, proxy_port=nil)
  # Collaborators, credentials and proxy settings supplied by the caller
  @io = io
  @user_id = user_id
  @api_key = api_key
  @proxy_host = proxy_host
  @proxy_port = proxy_port

  # Endpoint and channel used for every message
  @url = "#{host}/query/comet/"
  @messaging_channel = "/messaging"

  # These variables serve to identify this client and its version to the server
  @clientName = "import.io Ruby client"
  @clientVersion = "2.0.0"

  # Messaging state: next message ID, server-assigned client ID, running
  # queries keyed by request ID, inbound message queue and the cookie jar
  @msg_id = 1
  @client_id = nil
  @queries = {}
  @queue = Queue.new
  @cj = HTTP::CookieJar.new

  # Lifecycle flags all start out cleared
  @connected = false
  @connecting = false
  @disconnecting = false
  @polling = false
end
Instance Attribute Details
#client_id ⇒ Object
We use this only for a specific test case
219 220 221 |
# File 'lib/importio.rb', line 219

# Reader for the client ID currently held by the session (nil when none is set).
# We use this only for a specific test case.
def client_id
  @client_id
end
#connected ⇒ Object (readonly)
Returns the value of attribute connected.
221 222 223 |
# File 'lib/importio.rb', line 221

# Read-only view of the connection flag: returns the value of attribute
# connected.
def connected
  @connected
end
Instance Method Details
#connect ⇒ Object
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 |
# File 'lib/importio.rb', line 363

# Connects this client to the import.io server if it is not already connected
# (or in the middle of connecting).
#
# Performs the handshake, subscribes to the messaging channel, marks the
# session as connected and starts the two background threads (long-poll and
# queue processing).
#
# Any exception raised by the handshake or the subscription propagates to the
# caller; the "connecting" flag is always cleared so a later call can retry.
# The return value is not meaningful.
def connect
  # Don't connect again if we're already connected or a connect is in flight
  return if @connected || @connecting

  @connecting = true
  begin
    # Do the handshake request to register the client on the server
    handshake

    # Register this client with a subscription to our chosen message channel
    subscribe(@messaging_channel)

    # Now we are subscribed, we can set the client as connected
    @connected = true

    # Ruby's HTTP requests are synchronous - so that user apps can run while we
    # are waiting for long connections from the import.io server, the
    # long-polling connection is handed off to a thread so it doesn't block
    # anything else
    @threads = []
    @threads << Thread.new(self) { |context| context.poll }

    # Similarly, queued messages are handled in a separate thread too
    @threads << Thread.new(self) { |context| context.poll_queue }
  ensure
    # Without this, a failed handshake would leave @connecting set and every
    # later connect call would return immediately without doing anything
    @connecting = false
  end
end
#disconnect ⇒ Object
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 |
# File 'lib/importio.rb', line 397

# Asks the client library to disconnect from the import.io server.
# It is best practice to disconnect when you are finished with querying, so as
# to clean up resources on both the client and server.
#
# Every query that was open at the time of the call receives a
# {"type"=>"DISCONNECT","requestId"=>...} message. This also happens when the
# disconnect request itself raises; the exception then propagates afterwards.
#
# Returns the hash of queries that were open when disconnect was called.
def disconnect
  # Keep a local copy of the open queries, then erase them from the session
  pending = @queries.clone
  @queries = Hash.new

  # Notify handlers that we are disconnecting, i.e. open connect calls will fail
  @disconnecting = true

  # Clear the connection status flag to prevent any other requests going out
  @connected = false

  begin
    # Make the disconnect request to the server
    request("/meta/disconnect")
  ensure
    # Whether or not the server was reachable, the session is finished locally:
    # drop the client ID, reset the flag and tell every open query.
    @client_id = nil
    @disconnecting = false

    # NOTE(review): `query.(...)` is reproduced exactly as it appears in the
    # listing; confirm against the Query class which method this invokes.
    pending.each { |key, query| query.({"type"=>"DISCONNECT","requestId"=>key}) }
  end

  pending
end
#encode(dict) ⇒ Object
245 246 247 248 |
# File 'lib/importio.rb', line 245

# Encodes a dictionary to x-www-form format ("k1=v1&k2=v2").
#
# dict - Hash of keys and values. Keys and values are converted with #to_s
#        before escaping, so symbols, numbers and nil (encoded as an empty
#        string) are accepted as well as strings.
#
# Returns the encoded String; an empty hash yields "".
def encode(dict)
  dict.map { |key, value| "#{CGI.escape(key.to_s)}=#{CGI.escape(value.to_s)}" }.join("&")
end
#handshake ⇒ Object
346 347 348 349 350 351 352 353 354 355 356 |
# File 'lib/importio.rb', line 346

# Uses the request helper to make a CometD handshake request that registers
# the client on the server, then stores the client ID from the response.
#
# Returns nil without touching the client ID when the request helper returns
# nil; otherwise returns the client ID that was stored.
def handshake
  handshake_options = {
    "version" => "1.0",
    "minimumVersion" => "0.9",
    "supportedConnectionTypes" => ["long-polling"],
    "advice" => { "timeout" => 60000, "interval" => 0 }
  }
  response = request("/meta/handshake", "handshake", handshake_options)
  return if response.nil?

  # The client ID is read from the first message of the response body
  @client_id = response.body[0]["clientId"]
end
#join ⇒ Object
434 435 436 437 438 439 440 441 442 443 444 |
# File 'lib/importio.rb', line 434

# Blocks the caller while the session is connected and queries are still
# running. Checks once per second; as soon as no queries remain it stops all
# the threads and returns. Returns nil.
def join
  while @connected
    unless @queries.empty?
      # Queries are still running - wait and look again
      sleep 1
      next
    end

    # When there are no more queries, stop all the threads
    stop
    return
  end
end
#login(username, password, host = "https://api.import.io") ⇒ Object
250 251 252 253 254 255 256 257 258 259 |
# File 'lib/importio.rb', line 250

# If you want to use cookie-based authentication, this method logs you in with
# a username and password to get a session.
#
# username - account user name
# password - account password
# host     - base URL of the API server; "/auth/login" is appended
#
# Returns nil on success. Raises RuntimeError when the response code is not
# "200".
def login(username, password, host="https://api.import.io")
  credentials = encode({'username' => username, 'password'=> password})
  uri, http, req = make_request("#{host}/auth/login", credentials)
  response = open(uri, http, req)
  raise "Could not log in, code #{response.code}" unless response.code == "200"
end
#make_request(url, data) ⇒ Object
223 224 225 226 227 228 229 230 231 |
# File 'lib/importio.rb', line 223

# Helper method that generates the objects needed to POST `data` to `url`.
# Nothing is sent here.
#
# url  - target URL as a String
# data - request body
#
# Returns [uri, http, request]: the parsed URI, a Net::HTTP connection object
# configured with the session's proxy settings (SSL enabled for https URLs),
# and the Net::HTTP::Post request carrying `data` as its body.
def make_request(url, data)
  uri = URI(url)

  connection = Net::HTTP.new(uri.host, uri.port, @proxy_host, @proxy_port)
  connection.use_ssl = (uri.scheme == "https")

  post = Net::HTTP::Post.new(uri.request_uri)
  post.body = data

  [uri, connection, post]
end
#open(uri, http, request) ⇒ Object
233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/importio.rb', line 233

# Makes a network request and records any cookies the server sets.
#
# uri     - URI the request is sent to; passed to the cookie jar when parsing
# http    - connection object that responds to #request
# request - request object to send
#
# Returns the response object produced by `http.request`.
def open(uri, http, request)
  response = http.request(request)

  # get_fields returns nil when the header is absent, so guard before iterating
  cookies = response.get_fields("set-cookie")
  cookies.each { |value| @cj.parse(value, uri) } unless cookies.nil?

  response
end
#poll ⇒ Object
461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 |
# File 'lib/importio.rb', line 461

# Called in a new thread to open long-polling HTTP connections to the
# import.io CometD server so that we can wait for any messages that the server
# needs to send to us.
#
# Does nothing when a poll loop is already running. Otherwise keeps issuing
# connect requests for as long as the session is connected. Exceptions raised
# by the request helper propagate, but the polling flag is always cleared.
def poll
  return if @polling

  @polling = true
  begin
    # While loop means we keep making connections until manually disconnected
    while @connected
      # Use the request helper to make the connect call to the CometD endpoint;
      # the final `false` asks it to print rather than raise on server errors
      request("/meta/connect", "connect", {}, false)
    end
  ensure
    # Without this, an exception in the loop would leave @polling set and no
    # later poll call could ever start a new loop
    @polling = false
  end
end
#poll_queue ⇒ Object
446 447 448 449 450 451 452 453 454 455 456 457 458 459 |
# File 'lib/importio.rb', line 446

# Called in a new thread to poll the queue of messages returned from the
# server and process them.
#
# Runs until the client library is disconnected. Each iteration blocks on the
# queue until a message is available and hands it to process_message. Errors
# are printed (backtrace only) and the loop carries on.
def poll_queue
  while @connected
    begin
      # Take the next message off the queue and process it
      process_message(@queue.pop)
    rescue => exception
      puts exception.backtrace
    end
  end
end
#process_message(data) ⇒ Object
480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 |
# File 'lib/importio.rb', line 480

# Called by the queue poller to handle messages that are received from the
# import.io CometD server.
#
# data - message Hash; its "requestId" selects the query it belongs to
#
# Messages for an unknown request ID are printed and dropped. Any error raised
# while handling the message is caught and its backtrace printed, so this
# method does not raise StandardError to its caller.
def process_message(data)
  # Look up which query object the message corresponds to, based on its
  # request ID
  request_id = data["requestId"]
  query = @queries[request_id]

  # If we don't recognise the request ID, then do not process the message
  if query.nil?
    puts "No open query #{request_id}:"
    puts JSON.pretty_generate(data)
    return
  end

  # Call the message callback on the query object with the data.
  # NOTE(review): `query.(data)` is reproduced exactly as it appears in the
  # listing; confirm against the Query class which method this invokes.
  query.(data)

  # Clean up the query map if the query itself is finished
  @queries.delete(request_id) if query.finished
rescue => exception
  puts exception.backtrace
end
#query(query, callback) ⇒ Object
507 508 509 510 511 512 513 514 515 516 517 518 |
# File 'lib/importio.rb', line 507

# Takes an import.io query and issues it to the server, calling the callback
# whenever a relevant message is received.
#
# query    - the query; it is tagged in place with a fresh "requestId"
# callback - handed to the Query state tracker together with the query
#
# Returns whatever the request helper returns.
def query(query, callback)
  # A random GUID lets us track which messages correspond to which query
  request_id = SecureRandom.uuid
  query["requestId"] = request_id

  # Store a new query state tracker in our map of currently running queries
  @queries[request_id] = Query.new(callback, query)

  # Issue the query to the server
  request("/service/query", "", { "data" => query })
end
#request(channel, path = "", data = {}, throw = true) ⇒ Object
261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 |
# File 'lib/importio.rb', line 261

# Helper method that makes a generic request on the messaging channel.
#
# channel - CometD channel the message is sent on
# path    - path appended to the session's base URL
# data    - extra message fields; the caller's hash is not modified
# throw   - when true, server-side failures raise RuntimeError; when false
#           they are printed instead
#
# Returns the response with its body replaced by the parsed JSON, or nil when
# the session is neither connected nor connecting once the response arrives.
# Messages received on the session's messaging channel are pushed onto the
# queue for processing.
#
# NOTE(review): the listing this was taken from had lost several identifiers.
# `error_message`, `HTTP::Cookie.cookie_value` and `@cj.cookies` below are
# reconstructions (the latter two from the http-cookie gem API) - confirm
# against lib/importio.rb.
def request(channel, path="", data={}, throw=true)
  # These are CometD configuration values that are common to all requests we
  # need to send; the message ID is incremented with each request
  payload = data.merge(
    "channel" => channel,
    "connectionType" => "long-polling",
    "id" => @msg_id
  )
  @msg_id += 1

  # If we have a client ID, then we need to send that (provided on handshake)
  payload["clientId"] = @client_id unless @client_id.nil?

  # Build the URL that we are going to request
  url = "#{@url}#{path}"

  # If the user has chosen API key authentication, we need to send the API key
  # with each request
  unless @api_key.nil?
    q = encode({ "_user" => @user_id, "_apikey" => @api_key })
    url = "#{url}?#{q}"
  end

  # Build the request object we are going to use to initialise the request
  body = JSON.dump([payload])
  uri, http, http_request = make_request(url, body)
  http_request.content_type = "application/json;charset=UTF-8"
  http_request["Cookie"] = HTTP::Cookie.cookie_value(@cj.cookies(uri))
  http_request["import-io-client"] = @clientName
  http_request["import-io-client-version"] = @clientVersion

  # Send the request itself
  response = open(uri, http, http_request)

  # Don't process the response if we've disconnected in the meantime
  return if !@connected && !@connecting

  # If the server responds non-200 we have a serious issue (configuration
  # wrong or server down)
  if response.code != "200"
    error_message = "Unable to connect to import.io, status #{response.code} for url #{url}"
    raise error_message if throw

    puts error_message
  end

  response.body = JSON.parse(response.body)

  # Iterate through each of the messages in the response content
  response.body.each do |msg|
    # If the message is not successful, i.e. an import.io server error has
    # occurred, decide what action to take
    if msg.has_key?("successful") && msg["successful"] != true
      # Failures seen while connecting or disconnecting are skipped entirely
      next if @disconnecting || !@connected || @connecting

      error_message = "Unsuccessful request: #{msg}"
      if msg["error"] == "402::Unknown client"
        # If we get a 402 unknown client we need to reconnect
        puts "402 received, reconnecting"
        @io.reconnect()
      elsif throw
        raise error_message
      else
        puts error_message
      end
    end

    # Ignore messages that come back on a CometD channel that we have not
    # subscribed to
    next if msg["channel"] != @messaging_channel

    # Now we have a valid message on the right channel, queue it up to be
    # processed
    @queue.push(msg["data"])
  end

  response
end
#stop ⇒ Object
427 428 429 430 431 432 |
# File 'lib/importio.rb', line 427

# Stops all of the threads that are currently running.
#
# Safe to call before any threads have been started: a nil thread list is
# treated as empty. Returns the array of threads that were terminated.
def stop
  Array(@threads).each(&:terminate)
end
#subscribe(channel) ⇒ Object
358 359 360 361 |
# File 'lib/importio.rb', line 358

# Uses the request helper to issue a CometD subscription request for this
# client on the server.
#
# channel - name of the channel to subscribe to
#
# Returns whatever the request helper returns.
def subscribe(channel)
  subscription = { "subscription" => channel }
  request("/meta/subscribe", "", subscription)
end