Module: Neighborparrot

Included in:
Reactor
Defined in:
lib/neighborparrot/config.rb,
lib/neighborparrot/auth.rb,
lib/neighborparrot/open.rb,
lib/neighborparrot/send.rb,
lib/neighborparrot/reactor.rb,
lib/neighborparrot/callbacks.rb

Overview

Callbacks used by the client

Defined Under Namespace

Classes: Reactor

Constant Summary collapse

EVENTS =
%w(message error close connect success timeout)
@@class_reactor =
nil
@@module_event_block =
{}

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.configurationObject

Return settings



14
15
16
# File 'lib/neighborparrot/config.rb', line 14

def self.configuration
  @@config
end

.configure(params = {}) ⇒ Object

Setup the configuration options

  • :api_id => Your api ID in neighborparrot.com

  • :api_key => Your api key

  • :server => Server to connect (Only for development)

  • :dummy_tests => See neighboparrot/helpers/url_helpers



9
10
11
# File 'lib/neighborparrot/config.rb', line 9

def self.configure(params={})
  @@config.merge! params
end

.open(request, params = {}, &block) ⇒ Object

Static helper. Create a EM Reactor and open the connexion on it



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/neighborparrot/open.rb', line 51

def self.open(request, params={}, &block)
  EM.error_handler { |error| Neighborparrot.trigger_error error }

  EM.run do

    parrot = Neighborparrot::Reactor.new

    parrot.on_error do |error|
      Neighborparrot.trigger_error error
      EM.stop
    end
    parrot.on_message do |message|
      Neighborparrot.trigger_message message
    end
    parrot.on_connect { Neighborparrot.trigger_connect }

    parrot.open request, params
  end
end

.reactor_running?Boolean

Returns true if module reactor running.

Returns:

  • (Boolean)

    true if module reactor running



21
22
23
# File 'lib/neighborparrot/reactor.rb', line 21

def self.reactor_running?
  @@class_reactor && @@class_reactor.running?
end

.reactor_startObject

Static a module reactor and keeping running and waiting



7
8
9
10
11
12
# File 'lib/neighborparrot/reactor.rb', line 7

def self.reactor_start
  if @@class_reactor.nil?
    return @@class_reactor = Reactor.new
  end
  @@class_reactor.start
end

.reactor_stopObject

Stop the module reactor



15
16
17
18
# File 'lib/neighborparrot/reactor.rb', line 15

def self.reactor_stop
  return unless @@class_reactor
  @@class_reactor.stop
end

.send(request, params = {}) ⇒ Object

Send the message to the broker. Create a new reactor and run the request inside Any output is printed in the standard output. If you start a module reactor in some point in your program the request is scheduled in this reactor and return the control to your program. Module callbacks are used in this case.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/neighborparrot/send.rb', line 10

def self.send(request, params={})
  if self.reactor_running?
    return @@class_reactor.send request, params
  end
  response = nil
  error = false
  EM.run do
    parrot = Neighborparrot::Reactor.new
    parrot.on_error do |msg|
      error = msg
      parrot.stop
    end
      parrot.on_success do |resp|
      Neighborparrot.trigger_success resp
      response = resp
      parrot.stop
    end
    # Skip reactor queues
    parrot.send_to_broker(:request => request, :params => params)
  end
  fail error if error
  return response
end

.sign_connect_request(query, params = {}) ⇒ Object



5
6
7
8
# File 'lib/neighborparrot/auth.rb', line 5

def self.sign_connect_request(query, params={})
  endpoint = query[:service] == 'es' ? '/open' : '/ws'
  sign_request('GET', endpoint, query, params)
end

.sign_request(method, path, request, params = {}) ⇒ Object



14
15
16
17
18
19
20
# File 'lib/neighborparrot/auth.rb', line 14

def self.sign_request(method, path, request, params={})
  params = Neighborparrot.configuration.merge params
  token = Signature::Token.new(params[:api_id], params[:api_key])
  sign_request = Signature::Request.new(method, path, request)
  auth_hash = sign_request.sign(token)
  request.merge(auth_hash)
end

.sign_send_request(body, params = {}) ⇒ Object



10
11
12
# File 'lib/neighborparrot/auth.rb', line 10

def self.sign_send_request(body, params={})
  sign_request('POST', '/send',body, params)
end

Instance Method Details

#closeObject



45
46
47
48
# File 'lib/neighborparrot/open.rb', line 45

def close
  return unless connected?
  @source.close
end

#connected?Boolean

Returns:

  • (Boolean)


41
42
43
# File 'lib/neighborparrot/open.rb', line 41

def connected?
  @source && @source.ready_state == EM::EventSource::OPEN
end

#open(request, params = {}) ⇒ Object

Open a Event Source connection with the broker



82
83
84
# File 'lib/neighborparrot/reactor.rb', line 82

def open(request, params={})
  EM.schedule { open_connection request, params }
end

#open_connection(request, params = {}) ⇒ Object

Open a persistent connection to the Neighbor in a new thread and return true if all works unless :foreground options is true. Current options to the connectio are:

  • :foreground [Boolean] run the connection in the foreground

    stoping the clode flow until the connection is closed by server or
    another thread call close
    
  • :api_id => Your api ID in neighborparrot.com

  • :api_key => Your api key

  • :server => Server to connect (Only for development)

Parameters:

  • channel (String)

    to connect

  • Params (Hash)

    for the connection. this params can be:



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/neighborparrot/open.rb', line 17

def open_connection(request, params={})
  params = Neighborparrot.configuration.merge params
  return unless check_params request.merge(params), :post
  return if dummy_connections?
  uri = URI.parse(params[:server])
  url = "#{params[:server]}/open"
  signed_request = Neighborparrot.sign_connect_request(request, params)
  @source = EM::EventSource.new(url, signed_request)
  @source.inactivity_timeout = 120
  @source.message do |message|
   EM.next_tick { trigger_message message }
  end
  @source.error do |error|
    puts "Error #{error}"
    EM.next_tick { trigger_error error }
  end

  @source.open do
    EM.next_tick { trigger_connect }
  end

  @source.start
end

#running?Boolean

Returns true if reactor running.

Returns:

  • (Boolean)

    true if reactor running



65
66
67
# File 'lib/neighborparrot/reactor.rb', line 65

def running?
  EM.reactor_running?
end

#send(request, params = {}) ⇒ Boolean

Send a message to a channel If empty data, refuse to send nothing

  • :channel => The channel name

  • :data => Your payload

Returns:

  • (Boolean)

    true if sended



76
77
78
# File 'lib/neighborparrot/reactor.rb', line 76

def send(request, params={})
  EM.schedule { @out_queue.push({ :request => request, :params => params }) }
end

#send_to_broker(options) ⇒ Object

Send the message to the broker This is the final step of a send request in the reactor process



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/neighborparrot/send.rb', line 36

def send_to_broker(options)
  params = Neighborparrot.configuration.merge options[:params]
  request = options[:request]
  # TODO: Refactor
  # return unless check_params request
  return if request[:data].nil? || request[:data].length == 0
  return if params[:dummy_connections]

  signed_request = Neighborparrot.sign_send_request request, params

  url = "#{params[:server]}/send"
  http = EventMachine::HttpRequest.new(url).post :body => signed_request
  http.errback{ |msg| trigger_error msg }
  http.callback do
    if http.response_header.status == 200
      trigger_success http.response, params
    else
      trigger_error http.response
    end
  end
end

#startObject

Start the reactor if not running



56
57
58
# File 'lib/neighborparrot/reactor.rb', line 56

def start
  reactor_start unless running?
end

#stopObject

Stop the reactor



61
62
63
# File 'lib/neighborparrot/reactor.rb', line 61

def stop
  EM.schedule { EM.stop }
end