Class: RJR::Node

Inherits:
Object
Defined in:
lib/rjr/node.rb

Overview

Base RJR Node interface. Nodes are the central transport mechanism of RJR. This class provides the core methods common to all transport types, along with the mechanisms to start and run the subsystems that drive all requests.

A subclass of RJR::Node should be defined for each supported transport. Each subclass should define:

* RJR_NODE_TYPE - unique id of the transport
* listen method - begin listening for new requests and return
* send_message(msg, connection) - send message using the specified connection (transport dependent)
* invoke - establish connection, send message, and wait for / return result
* notify - establish connection, send message, and immediately return

Depending on the context and use of the node, not all of these methods have to be implemented. The base node class also provides many utility methods to assist in message processing (see below).

See nodes residing in lib/rjr/nodes/ for specific examples.
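
As a rough illustration of the subclass contract listed above, the following sketch defines an in-process "echo" transport. It does not load the rjr gem: `SketchNode` is a hypothetical stand-in for RJR::Node that only reproduces `node_type` as shown on this page, and `EchoNode`, its `:echo` type, and the array used as a "connection" are invented for the example.

```ruby
# Hypothetical stand-in for RJR::Node so the sketch runs without the rjr gem.
class SketchNode
  def node_type
    self.class::RJR_NODE_TYPE
  end
end

# Invented in-process transport: "connections" are plain arrays.
class EchoNode < SketchNode
  RJR_NODE_TYPE = :echo   # unique id of the transport

  # Begin listening and return immediately (nothing to open here).
  def listen
    @listening = true
    self
  end

  # Send a message over the given connection (transport dependent).
  def send_message(msg, connection)
    connection << msg
  end

  # Establish a connection, send the message, and return the result.
  def invoke(msg)
    connection = []
    send_message(msg, connection)
    connection.last   # the echo transport returns what it was sent
  end

  # Establish a connection, send the message, and return right away.
  def notify(msg)
    send_message(msg, [])
    nil
  end
end

node = EchoNode.new.listen
puts node.node_type       # echo
puts node.invoke("ping")  # ping
```

A real transport would replace the array with a socket or similar handle and would normally route incoming messages through the node's dispatcher.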

Constructor Details

#initialize(args = {}) ⇒ Node

RJR::Node initializer

Parameters:

  • args (Hash) (defaults to: {})

    options to set on the node

Options Hash (args):

  • :node_id (String)

    unique id of the node

  • :headers (Hash<String,String>)

    optional headers to set on all json-rpc messages

  • :dispatcher (Dispatcher)

    dispatcher to assign to the node



# File 'lib/rjr/node.rb', line 62

def initialize(args = {})
  @connection_event_handlers = {:closed => [], :error => []}
  @response_lock = Mutex.new
  @response_cv   = ConditionVariable.new
  @responses     = []

  @node_id         = args[:node_id]
  @dispatcher      = args[:dispatcher] || RJR::Dispatcher.new
  @message_headers = args.has_key?(:headers) ? {}.merge(args[:headers]) : {}

  @@tp ||= ThreadPool.new
  @@em ||= EMAdapter.new

  # will do nothing if already started
  @@tp.start
  @@em.start
end
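
Two details of the initializer above are easy to miss: `{}.merge(args[:headers])` stores a copy of the caller's hash rather than a reference to it, and `@@tp ||= ...` creates the shared resource once for all nodes. The sketch below illustrates both behaviours. `SharedSketchNode`, its `pool` reader, and the plain `Object` standing in for ThreadPool are assumptions made for the example, not part of RJR.

```ruby
class SharedSketchNode
  attr_reader :message_headers

  def initialize(args = {})
    # Same expression as in the initializer above: copy, do not alias.
    @message_headers = args.has_key?(:headers) ? {}.merge(args[:headers]) : {}

    # Assigned by the first node only; later nodes reuse the same object.
    @@pool ||= Object.new   # stand-in for ThreadPool.new
  end

  # Hypothetical reader, used only to compare the shared object.
  def pool
    @@pool
  end
end

headers = { "x-source" => "docs" }
a = SharedSketchNode.new(:headers => headers)
b = SharedSketchNode.new

headers["x-source"] = "changed"      # mutate the caller's hash afterwards
puts a.message_headers["x-source"]   # docs
puts b.message_headers.empty?        # true
puts a.pool.equal?(b.pool)           # true
```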

Instance Attribute Details

#dispatcher ⇒ Object

Dispatcher to use to satisfy requests



# File 'lib/rjr/node.rb', line 45

def dispatcher
  @dispatcher
end

#message_headers ⇒ Object

Additional header fields to set on all requests and responses received and sent by the node



# File 'lib/rjr/node.rb', line 42

def message_headers
  @message_headers
end

#node_id ⇒ Object (readonly)

Unique string identifier of the node



# File 'lib/rjr/node.rb', line 38

def node_id
  @node_id
end

Class Method Details

.em ⇒ Object

Shared EMAdapter instance, or nil if no node has been created yet. Used by the debugging / stats interface.



# File 'lib/rjr/node.rb', line 53

def self.em ; defined?(@@em) ? @@em : nil end

.tp ⇒ Object

Shared ThreadPool instance, or nil if no node has been created yet.



# File 'lib/rjr/node.rb', line 54

def self.tp ; defined?(@@tp) ? @@tp : nil end
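
The `defined?` guard in these two accessors matters because reading a class variable that has never been assigned raises NameError. A minimal sketch of the difference follows. `GuardSketch`, `em_unguarded`, and `start` are hypothetical names, and a symbol stands in for the EMAdapter instance.

```ruby
class GuardSketch
  # Guarded read: nil until something assigns @@em.
  def self.em
    defined?(@@em) ? @@em : nil
  end

  # Unguarded read, for comparison only.
  def self.em_unguarded
    @@em
  end

  # Mimics what the node initializer does on first use.
  def self.start
    @@em ||= :reactor   # stand-in for EMAdapter.new
  end
end

p GuardSketch.em          # nil
begin
  GuardSketch.em_unguarded
rescue NameError => e
  puts "NameError: #{e.message}"
end

GuardSketch.start
p GuardSketch.em          # :reactor
```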

Instance Method Details

#halt ⇒ Object

Immediately terminate the node

Warning: this does exactly what it says. All running threads and reactor jobs are killed immediately.

Returns:

  • self



# File 'lib/rjr/node.rb', line 95

def halt
  @@em.stop_event_loop
  @@tp.stop
  self
end

#join ⇒ Object

Block until the thread pool and the EventMachine reactor have both finished running

Returns:

  • self



# File 'lib/rjr/node.rb', line 83

def join
  @@tp.join
  @@em.join
  self
end
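
Because both `halt` and `join` return self, the calls can be chained. The sketch below shows that shape with a single worker thread in place of the thread pool and reactor. `LifecycleSketch` and its `run` method are invented for the example, and unlike the real `halt`, this stand-in lets already queued jobs finish before stopping.

```ruby
class LifecycleSketch
  def initialize
    @queue  = Queue.new
    @worker = Thread.new do
      # Run queued jobs until the :stop marker arrives.
      while (job = @queue.pop) != :stop
        job.call
      end
    end
  end

  # Hypothetical helper: queue a block for the worker.
  def run(&job)
    @queue << job
    self
  end

  # Analogous to halt: ask the worker to stop, then return self.
  def halt
    @queue << :stop
    self
  end

  # Analogous to join: block until the worker has finished, then return self.
  def join
    @worker.join
    self
  end
end

results = []
node = LifecycleSketch.new
node.run { results << :handled }
node.halt.join   # returning self allows chaining
p results        # [:handled]
```

In a long-running server the usual arrangement is the reverse: the main thread blocks in `join`, and `halt` is called from somewhere else, such as a signal handler.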

#node_type ⇒ Object

Alias of the RJR_NODE_TYPE constant defined on the node's class



# File 'lib/rjr/node.rb', line 48

def node_type
  self.class::RJR_NODE_TYPE
end

#on(event, &handler) {|Node| ... } ⇒ Object

Register connection event handler

Parameters:

  • event (:error, :closed)

    the event to register the handler for; any other value is silently ignored

  • handler (Callable)

    block param to be added to array of handlers that are called when event occurs

Yields:

  • (Node)

    self is passed to each registered handler when event occurs



# File 'lib/rjr/node.rb', line 107

def on(event, &handler)
  if @connection_event_handlers.keys.include?(event)
    @connection_event_handlers[event] << handler
  end
end
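
The handler table created in the initializer only has the keys `:closed` and `:error`, so `on` silently drops a handler registered under any other name. The sketch below reproduces `on` in a hypothetical `HandlerSketch` class. The `fire` and `handler_count` helpers are assumptions added to make the behaviour observable and are not part of RJR.

```ruby
class HandlerSketch
  def initialize
    @connection_event_handlers = {:closed => [], :error => []}
  end

  # Same body as #on above.
  def on(event, &handler)
    if @connection_event_handlers.keys.include?(event)
      @connection_event_handlers[event] << handler
    end
  end

  # Hypothetical helper: pass self to every handler registered for event.
  def fire(event)
    @connection_event_handlers[event].each { |h| h.call(self) }
  end

  # Hypothetical helper: number of handlers stored for event.
  def handler_count(event)
    @connection_event_handlers.fetch(event, []).size
  end
end

node = HandlerSketch.new
seen = []
node.on(:closed) { |n| seen << n }
node.on(:close)  { |n| seen << :never }   # unknown key: silently ignored

node.fire(:closed)
p seen == [node]               # true
p node.handler_count(:close)   # 0
```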