Class: DripDrop::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/dripdrop/node.rb,
lib/dripdrop/node/nodelet.rb

Defined Under Namespace

Classes: Nodelet

Constant Summary collapse

ZCTX =
ZMQ::Context.new 1

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}, &block) ⇒ Node

Returns a new instance of Node.



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/dripdrop/node.rb', line 29

def initialize(opts={},&block)
  @block      = block
  @thread     = nil # Thread containing the reactors
  @routing    = {}  # Routing table
  @run_list   = opts['run_list'] || opts[:run_list] || nil  #List of nodelets to run
  @run_list   = @run_list.map(&:to_sym) if @run_list
  @debug      = opts[:debug]
  @recipients_for       = {}
  @handler_default_opts = {:debug => @debug}
  @nodelets   = {}  # Cache of registered nodelets
  @zctx = ZCTX
end

Instance Attribute Details

#debugObject

Returns the value of attribute debug.



27
28
29
# File 'lib/dripdrop/node.rb', line 27

def debug
  @debug
end

#nodeletsObject (readonly)

Returns the value of attribute nodelets.



26
27
28
# File 'lib/dripdrop/node.rb', line 26

def nodelets
  @nodelets
end

#routingObject (readonly)

Returns the value of attribute routing.



26
27
28
# File 'lib/dripdrop/node.rb', line 26

def routing
  @routing
end

#run_listObject (readonly)

Returns the value of attribute run_list.



26
27
28
# File 'lib/dripdrop/node.rb', line 26

def run_list
  @run_list
end

#zm_reactorObject (readonly)

Returns the value of attribute zm_reactor.



26
27
28
# File 'lib/dripdrop/node.rb', line 26

def zm_reactor
  @zm_reactor
end

Class Method Details

.error_handler(e) ⇒ Object

Catch all error handler Global to all DripDrop Nodes



310
311
312
# File 'lib/dripdrop/node.rb', line 310

def self.error_handler(e)
  $stderr.write "#{e.class}: #{e.message}\n\t#{e.backtrace.join("\n\t")}"
end

Instance Method Details

#actionObject

When subclassing DripDrop::Node you probably want to define this method Otherwise it will attempt to run the @block passed into DripDrop::Node.new



64
65
66
67
68
69
70
# File 'lib/dripdrop/node.rb', line 64

def action
  if @block
    self.instance_eval(&@block)
  else
    raise "Could not start, no block or action specified"
  end
end

#http_client(address, opts = {}) ⇒ Object

An EM HTTP client. Example:

client = http_client(addr)
client.send_message(:name => 'name', :body => 'hi') do |resp_msg|
  puts resp_msg.inspect
end


266
267
268
269
270
# File 'lib/dripdrop/node.rb', line 266

def http_client(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPClientHandler.new(uri, h_opts)
end

#http_server(address, opts = {}, &block) ⇒ Object

Starts a new Thin HTTP server listening on address. Can have an on_receive handler that gets passed msg and response args.

http_server(addr) {|msg,response| response.send_message(msg)}


254
255
256
257
258
# File 'lib/dripdrop/node.rb', line 254

def http_server(address,opts={},&block)
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::HTTPServerHandler.new(uri, h_opts,&block)
end

#joinObject

If the reactor has started, this blocks until the thread running the reactor joins. This should block forever unless stop is called.



75
76
77
78
79
80
81
# File 'lib/dripdrop/node.rb', line 75

def join
  if @thread
    @thread.join
  else
    raise "Can't join on a node that isn't yet started"
  end
end

#nodelet(name, klass = Nodelet, *configure_args, &block) ⇒ Object

Nodelets are a way of segmenting a DripDrop::Node. This can be used for both organization and deployment. One might want the production deployment of an app to be broken across multiple servers or processes for instance:

nodelet :heartbeat do |nlet|
  nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
  EM::PeriodicalTimer.new(1) do
    nlet.ticker.send_message(:name => 'tick')
  end
end

Nodelets can also be subclassed, for instance:

class SpecialNodelet < DripDrop::Node::Nodelet
  def action
    nlet.route :ticker, :zmq_publish, 'tcp://127.0.0.1', :bind
    EM::PeriodicalTimer.new(1) do
      nlet.ticker.send_message(:name => 'tick')
    end
  end
end

nodelet :heartbeat, SpecialNodelet

If you specify a block, Nodelet#action will be ignored and the block will be run

Nodelets are made available as instance methods on the current DripDrop::Nodelet Object, so the following works as well:

nodelet :mynodelet

mynodelet.route :route_name, :zmq_xreq, 'tcp://127.0.0.1:2000', ;bind


157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/dripdrop/node.rb', line 157

def nodelet(name,klass=Nodelet,*configure_args,&block)
  # If there's a run list, only run nodes in that list
  return nil if @run_list && !@run_list.include?(name.to_sym)
   
  nlet = @nodelets[name] ||= klass.new(self,name,*configure_args)
  
  # Define a method returning the nodelet in the current node
  unless respond_to?(name)
   (class << self; self; end).class_eval do
      define_method(name) { nlet }
    end
  end
   
  if block
    block.call(nlet)
  else
    nlet.action
  end
  nlet
end

#recv_internal(dest, identifier, &block) ⇒ Object

Defines a subscriber to the channel dest, to receive messages from send_internal. identifier is a unique identifier for this receiver. The identifier can be used by remove_recv_internal



293
294
295
296
297
298
299
# File 'lib/dripdrop/node.rb', line 293

def recv_internal(dest,identifier,&block)
  if @recipients_for[dest]
    @recipients_for[dest][identifier] =  block
  else
    @recipients_for[dest] = {identifier => block}
  end
end

#remove_recv_internal(dest, identifier) ⇒ Object

Deletes a subscriber to the channel dest previously identified by a reciever created with recv_internal



303
304
305
306
# File 'lib/dripdrop/node.rb', line 303

def remove_recv_internal(dest,identifier)
  return false unless @recipients_for[dest]
  @recipients_for[dest].delete(identifier)
end

#route(name, handler_type, *handler_args) ⇒ Object

Defines a new route. Routes are the recommended way to instantiate handlers. For example:

route :stats_pub, :zmq_publish, 'tcp://127.0.0.1:2200', :bind
route :stats_sub, :zmq_subscribe, stats_pub.address, :connect

Will make the following methods available within the reactor block:

stats_pub  # A regular zmq_publish handler
:stats_sub # A regular zmq_subscribe handler

See the docs for routes_for for more info in grouping routes for nodelets and maintaining sanity in larger apps



95
96
97
# File 'lib/dripdrop/node.rb', line 95

def route(name,handler_type,*handler_args)
  route_full(nil, name, handler_type, *handler_args)
end

#route_full(nodelet, name, handler_type, *handler_args) ⇒ Object

Probably not useful for most, apps. This is used internally to create a route for a given nodelet.



101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/dripdrop/node.rb', line 101

def route_full(nodelet, name, handler_type, *handler_args)
  # If we're in a route_for block, prepend appropriately
  full_name = (nodelet && nodelet.name) ? "#{nodelet.name}_#{name}".to_sym : name
  
  handler = self.send(handler_type, *handler_args)
  @routing[full_name] = handler
  
  # Define the route name as a singleton method
  (class << self; self; end).class_eval do
    define_method(full_name) { handler }
  end
  
  handler
end

#routes_for(nodelet_name, &block) ⇒ Object

DEPRECATED, will be deleted in 0.8



117
118
119
120
121
# File 'lib/dripdrop/node.rb', line 117

def routes_for(nodelet_name,&block)
  $stderr.write "routes_for is now deprecated, use nodelet instead"
  nlet = nodelet(nodelet_name,&block)
  block.call(nlet)
end

#send_internal(dest, data) ⇒ Object

An inprocess pub/sub queue that works similarly to EM::Channel, but has manually specified identifiers for subscribers letting you more easily delete subscribers without crazy id tracking.

This is useful for situations where you want to broadcast messages across your app, but need a way to properly delete listeners.

dest is the name of the pub/sub channel. data is any type of ruby var you’d like to send.



281
282
283
284
285
286
287
288
# File 'lib/dripdrop/node.rb', line 281

def send_internal(dest,data)
  return false unless @recipients_for[dest]
  blocks = @recipients_for[dest].values
  return false unless blocks
  blocks.each do |block|
    block.call(data)
  end
end

#startObject

Starts the reactors and runs the block passed to initialize. This is non-blocking.



44
45
46
47
48
49
# File 'lib/dripdrop/node.rb', line 44

def start
  @thread = Thread.new do
    EM.error_handler {|e| self.class.error_handler e}
    EM.run { action }
  end
end

#start!Object

Blocking version of start, equivalent to start then join



52
53
54
55
# File 'lib/dripdrop/node.rb', line 52

def start!
  self.start
  self.join
end

#stopObject

Stops the reactors. If you were blocked on #join, that will unblock.



58
59
60
# File 'lib/dripdrop/node.rb', line 58

def stop
  EM.stop
end

#websocket(*args) ⇒ Object

DEPRECATED: Use websocket_server



246
247
248
249
# File 'lib/dripdrop/node.rb', line 246

def websocket(*args)
  $stderr.write "DripDrop#websocket handler is deprecated, use DripDrop#websocket_server"
  websocket_server(*args)
end

#websocket_server(address, opts = {}) ⇒ Object

Binds an EM websocket server connection to address. takes blocks for on_open, on_receive, on_close and on_error.

For example on_receive could be used to echo incoming messages thusly:

websocket_server(addr).on_open {|conn|
  ws.send_message(:name => 'ws_open_ack')
}.on_receive {|msg,conn|
  conn.send(msg)
}.on_close {|conn|
}.on_error {|reason,conn|
}

The ws object that’s passed into the handlers is not the DripDrop::WebSocketHandler object, but an em-websocket object.



239
240
241
242
243
# File 'lib/dripdrop/node.rb', line 239

def websocket_server(address,opts={})
  uri     = URI.parse(address)
  h_opts  = handler_opts_given(opts)
  DripDrop::WebSocketHandler.new(uri,h_opts)
end

#zmq_m2(addresses, opts = {}, &block) ⇒ Object



178
179
180
# File 'lib/dripdrop/node.rb', line 178

def zmq_m2(addresses, opts={}, &block)
  zmq_handler(DripDrop::Mongrel2Handler, [ZMQ::PULL, ZMQ::PUB], addresses, [:connect, :connect], opts)
end

#zmq_publish(address, socket_ctype, opts = {}) ⇒ Object

Creates a ZMQ::PUB type socket, can only send messages via send_message



190
191
192
# File 'lib/dripdrop/node.rb', line 190

def zmq_publish(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPubHandler,ZMQ::PUB,address,socket_ctype,opts)
end

#zmq_pull(address, socket_ctype, opts = {}, &block) ⇒ Object

Creates a ZMQ::PULL type socket. Can only receive messages via on_receive



195
196
197
# File 'lib/dripdrop/node.rb', line 195

def zmq_pull(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQPullHandler,ZMQ::PULL,address,socket_ctype,opts)
end

#zmq_push(address, socket_ctype, opts = {}) ⇒ Object

Creates a ZMQ::PUSH type socket, can only send messages via send_message



200
201
202
# File 'lib/dripdrop/node.rb', line 200

def zmq_push(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQPushHandler,ZMQ::PUSH,address,socket_ctype,opts)
end

#zmq_subscribe(address, socket_ctype, opts = {}, &block) ⇒ Object

Creates a ZMQ::SUB type socket. Can only receive messages via on_receive. zmq_subscribe sockets have a topic_filter option, which restricts which messages they can receive. It takes a regexp as an option.



185
186
187
# File 'lib/dripdrop/node.rb', line 185

def zmq_subscribe(address,socket_ctype,opts={},&block)
  zmq_handler(DripDrop::ZMQSubHandler,ZMQ::SUB,address,socket_ctype,opts)
end

#zmq_xrep(address, socket_ctype, opts = {}) ⇒ Object

Creates a ZMQ::XREP type socket, both sends and receivesc XREP sockets are extremely powerful, so their functionality is currently limited. XREP sockets in DripDrop can reply to the original source of the message.

Receiving with XREP sockets in DripDrop is different than other types of sockets, on_receive passes 2 arguments to its callback, message, and response. A minimal example is shown below:

zmq_xrep(z_addr, :bind).on_receive do |message,response|
  response.send_message(message)
end


216
217
218
# File 'lib/dripdrop/node.rb', line 216

def zmq_xrep(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXRepHandler,ZMQ::XREP,address,socket_ctype,opts)
end

#zmq_xreq(address, socket_ctype, opts = {}) ⇒ Object

See the documentation for zmq_xrep for more info



221
222
223
# File 'lib/dripdrop/node.rb', line 221

def zmq_xreq(address,socket_ctype,opts={})
  zmq_handler(DripDrop::ZMQXReqHandler,ZMQ::XREQ,address,socket_ctype,opts)
end