Class: Nanite::Mapper

Inherits:
Object show all
Includes:
AMQPHelper, ConsoleHelper, DaemonizeHelper
Defined in:
lib/nanite/mapper.rb

Overview

Mappers are control nodes in nanite clusters. Nanite clusters can follow peer-to-peer model of communication as well as client-server, and mappers are nodes that know who to send work requests to agents.

Mappers can reside inside a front end web application written in Merb/Rails and distribute heavy lifting to actors that register with the mapper as soon as they go online.

Each mapper tracks nanites registered with it. It periodically checks when the last time a certain nanite sent a heartbeat notification, and removes those that have timed out from the list of available workers. As soon as a worker goes back online again it re-registers itself and the mapper adds it to the list and makes it available to be called again.

This makes Nanite clusters self-healing and immune to individual node failures.

Constant Summary collapse

DEFAULT_OPTIONS =
COMMON_DEFAULT_OPTIONS.merge({:user => 'mapper', :identity => Identity.generate, :agent_timeout => 15,
:offline_redelivery_frequency => 10, :persistent => false, :offline_failsafe => false})

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods included from DaemonizeHelper

#daemonize

Methods included from ConsoleHelper

included, #start_console

Methods included from AMQPHelper

#start_amqp

Constructor Details

#initialize(options) ⇒ Mapper

Returns a new instance of Mapper.



85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/nanite/mapper.rb', line 85

def initialize(options)
  @options = DEFAULT_OPTIONS.merge(options)
  @identity = "mapper-#{@options[:identity]}"
  @log = Log.new(@options, @identity)
  @serializer = Serializer.new(@options[:format])
  daemonize if @options[:daemonize]
  @amq =start_amqp(@options)
  @cluster = Cluster.new(@amq, @options[:agent_timeout], @options[:identity], @log, @serializer)
  @job_warden = JobWarden.new(@serializer, @log)
  @log.info('starting mapper')
  setup_queues
  start_console if @options[:console] && !@options[:daemonize]
end

Instance Attribute Details

#amqObject (readonly)

Returns the value of attribute amq.



24
25
26
# File 'lib/nanite/mapper.rb', line 24

def amq
  @amq
end

#clusterObject (readonly)

Returns the value of attribute cluster.



24
25
26
# File 'lib/nanite/mapper.rb', line 24

def cluster
  @cluster
end

#identityObject (readonly)

Returns the value of attribute identity.



24
25
26
# File 'lib/nanite/mapper.rb', line 24

def identity
  @identity
end

#job_wardenObject (readonly)

Returns the value of attribute job_warden.



24
25
26
# File 'lib/nanite/mapper.rb', line 24

def job_warden
  @job_warden
end

#logObject (readonly)

Returns the value of attribute log.



24
25
26
# File 'lib/nanite/mapper.rb', line 24

def log
  @log
end

#optionsObject (readonly)

Returns the value of attribute options.



24
25
26
# File 'lib/nanite/mapper.rb', line 24

def options
  @options
end

#serializerObject (readonly)

Returns the value of attribute serializer.



24
25
26
# File 'lib/nanite/mapper.rb', line 24

def serializer
  @serializer
end

Class Method Details

.start(options = {}) ⇒ Object

Initializes a new mapper and establishes AMQP connection. This must be used inside EM.run block or if EventMachine reactor is already started, for instance, by a Thin server that your Merb/Rails application runs on.

Mapper options:

identity : identity of this mapper, may be any string

format : format to use for packets serialization. Can be :marshal, :json or :yaml.

Defaults to Ruby's Marshall format. For interoperability with
AMQP clients implemented in other languages, use JSON.

Note that Nanite uses JSON gem,
and ActiveSupport's JSON encoder may cause clashes
if ActiveSupport is loaded after JSON gem.

log_level : the verbosity of logging, can be debug, info, warn, error or fatal.

agent_timeout : how long to wait before an agent is considered to be offline

and thus removed from the list of available agents.

log_dir : log file path, defaults to the current working directory.

console : true tells mapper to start interactive console

daemonize : true tells mapper to daemonize

offline_redelivery_frequency : The frequency in seconds that messages stored in the offline queue will be retrieved

for attempted redelivery to the nanites. Default is 10 seconds.

persistent : true instructs the AMQP broker to save messages to persistent storage so that they aren’t lost when the

broker is restarted. Default is false. Can be overriden on a per-message basis using the request and push methods.

secure : use Security features of rabbitmq to restrict nanites to themselves

Connection options:

vhost : AMQP broker vhost that should be used

user : AMQP broker user

pass : AMQP broker password

host : host AMQP broker (or node of interest) runs on,

defaults to 0.0.0.0

port : port AMQP broker (or node of interest) runs on,

this defaults to 5672, port used by some widely
used AMQP brokers (RabbitMQ and ZeroMQ)


81
82
83
# File 'lib/nanite/mapper.rb', line 81

def self.start(options = {})
  new(options)
end

Instance Method Details

#push(type, payload = '', opts = {}) ⇒ Object

Make a nanite request which does not expect a response.

Parameters

type<String>

The dispatch route for the request

payload<Object>

Payload to send. This will get marshalled en route

Options

:selector<Symbol>

Method for selecting an actor. Default is :least_loaded.

:least_loaded

Pick the nanite which has the lowest load.

:all

Send the request to all nanites which respond to the service.

:random

Randomly pick a nanite.

:rr: Select a nanite according to round robin ordering.
:offline_failsafe<Boolean>

Store messages in an offline queue when all

the nanites are offline. Messages will be redelivered when nanites come online.
Default is false unless the mapper was started with the --offline-failsafe flag.
:persistent<Boolean>

Instructs the AMQP broker to save the message to persistent

storage so that it isnt lost when the broker is restarted.
Default is false unless the mapper was started with the --persistent flag.


160
161
162
163
164
165
166
167
168
169
170
171
172
# File 'lib/nanite/mapper.rb', line 160

def push(type, payload = '', opts = {})
  push = build_deliverable(Push, type, payload, opts)
  targets = cluster.targets_for(push)
  if !targets.empty?
    cluster.route(push, targets)
    true
  elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
    cluster.publish(push, 'mapper-offline')
    :offline
  else
    false
  end
end

#request(type, payload = '', opts = {}, &blk) ⇒ Object

Make a nanite request which expects a response.

Parameters

type<String>

The dispatch route for the request

payload<Object>

Payload to send. This will get marshalled en route

Options

:selector<Symbol>

Method for selecting an actor. Default is :least_loaded.

:least_loaded

Pick the nanite which has the lowest load.

:all

Send the request to all nanites which respond to the service.

:random

Randomly pick a nanite.

:rr: Select a nanite according to round robin ordering.
:target<String>

Select a specific nanite via identity, rather than using

a selector.
:offline_failsafe<Boolean>

Store messages in an offline queue when all

the nanites are offline. Messages will be redelivered when nanites come online.
Default is false unless the mapper was started with the --offline-failsafe flag.
:persistent<Boolean>

Instructs the AMQP broker to save the message to persistent

storage so that it isnt lost when the broker is restarted.
Default is false unless the mapper was started with the --persistent flag.

Block Parameters

:results<Object>

The returned value from the nanite actor.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/nanite/mapper.rb', line 124

def request(type, payload = '', opts = {}, &blk)
  request = build_deliverable(Request, type, payload, opts)
  request.reply_to = identity
  targets = cluster.targets_for(request)
  if !targets.empty?
    job = job_warden.new_job(request, targets, blk)
    cluster.route(request, job.targets)
    job
  elsif opts.key?(:offline_failsafe) ? opts[:offline_failsafe] : options[:offline_failsafe]
    cluster.publish(request, 'mapper-offline')
    :offline
  else
    false
  end
end