Class: RJR::Node

Inherits:
Object show all
Defined in:
lib/rjr/node.rb

Overview

Base RJR Node interface. Nodes are the central transport mechanism of RJR, this class provides the core methods common among all transport types and mechanisms to start and run the subsystems which drives all requests.

A subclass of RJR::Node should be defined for each transport that is supported. Each subclass should define

* RJR_NODE_TYPE - unique id of the transport
* listen method - begin listening for new requests and return
* send_message(msg, connection) - send message using the specified connection (transport dependent)
* invoke - establish connection, send message, and wait for / return result
* notify - establish connection, send message, and immediately return

Not all methods necessarily have to be implemented depending on the context / use of the node, and the base node class provides many utility methods which to assist in message processing (see below).

See nodes residing in lib/rjr/nodes/ for specific examples.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(args = {}) ⇒ Node

RJR::Node initializer

Parameters:

  • args (Hash) (defaults to: {})

    options to set on request

Options Hash (args):

  • :node_id (String)

    unique id of the node

  • :headers (Hash<String,String>)

    optional headers to set on all json-rpc messages

  • :dispatcher (Dispatcher)

    dispatcher to assign to the node



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File 'lib/rjr/node.rb', line 75

def initialize(args = {})
   clear_event_handlers
   @response_lock = Mutex.new
   @response_cv   = ConditionVariable.new
   @responses     = []

   @node_id         = args[:node_id]
   @dispatcher      = args[:dispatcher] || RJR::Dispatcher.new
   @message_headers = args.has_key?(:headers) ? {}.merge(args[:headers]) : {}

   @@tp ||= ThreadPool.new
   @@em ||= EMAdapter.new

   # will do nothing if already started
   @@tp.start
   @@em.start
end

Instance Attribute Details

#dispatcherObject

Dispatcher to use to satisfy requests



45
46
47
# File 'lib/rjr/node.rb', line 45

def dispatcher
  @dispatcher
end

#message_headersObject

Attitional header fields to set on all requests and responses received and sent by node



42
43
44
# File 'lib/rjr/node.rb', line 42

def message_headers
  @message_headers
end

#node_idObject (readonly)

Unique string identifier of the node



38
39
40
# File 'lib/rjr/node.rb', line 38

def node_id
  @node_id
end

Class Method Details

.emObject

XXX used by debugging / stats interface



66
# File 'lib/rjr/node.rb', line 66

def self.em ; defined?(@@em) ? @@em : nil end

.persistent?Boolean

Bool indiciting if this node is persistent

Returns:

  • (Boolean)


49
50
51
52
# File 'lib/rjr/node.rb', line 49

def persistent?
  self.const_defined?(:PERSISTENT_NODE) &&
  self.const_get(:PERSISTENT_NODE)
end

.tpObject



67
# File 'lib/rjr/node.rb', line 67

def self.tp ; defined?(@@tp) ? @@tp : nil end

Instance Method Details

#clear_event_handlersObject

Reset connection event handlers



116
117
118
# File 'lib/rjr/node.rb', line 116

def clear_event_handlers
  @connection_event_handlers = {:closed => [], :error => []}
end

#haltObject

Immediately terminate the node

Warning this does what it says it does. All running threads, and reactor jobs are immediately killed

Returns:

  • self



108
109
110
111
112
# File 'lib/rjr/node.rb', line 108

def halt
  @@em.stop_event_loop
  @@tp.stop
  self
end

#joinObject

Block until the eventmachine reactor and thread pool have both completed running

Returns:

  • self



96
97
98
99
100
# File 'lib/rjr/node.rb', line 96

def join
  @@tp.join
  @@em.join
  self
end

#node_typeObject

alias of RJR_NODE_TYPE



61
62
63
# File 'lib/rjr/node.rb', line 61

def node_type
  self.class::RJR_NODE_TYPE
end

#on(event, &handler) {|Node| ... } ⇒ Object

Register connection event handler

Parameters:

  • event (:error, :close)

    the event to register the handler for

  • handler (Callable)

    block param to be added to array of handlers that are called when event occurs

Yields:

  • (Node)

    self is passed to each registered handler when event occurs



124
125
126
127
128
# File 'lib/rjr/node.rb', line 124

def on(event, &handler)
  if @connection_event_handlers.keys.include?(event)
    @connection_event_handlers[event] << handler
  end
end

#persistent?Boolean

Bool indicating if this node class is persistent

Returns:

  • (Boolean)


56
57
58
# File 'lib/rjr/node.rb', line 56

def persistent?
  self.class.persistent?
end