Class: DCell::Node

Inherits:
Object
  • Object
show all
Extended by:
Enumerable, Forwardable
Includes:
Celluloid, Celluloid::FSM
Defined in:
lib/dcell/node.rb

Overview

A node in a DCell cluster

Constant Summary collapse

# Timeout handed to `receive` by #find while it waits for the remote node's
# reply (presumably seconds -- confirm against Celluloid's `receive`).
NODE_DISCOVERY_TIMEOUT =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, addr) ⇒ Node

Returns a new instance of Node.



40
41
42
43
44
45
46
47
# File 'lib/dcell/node.rb', line 40

# Set up the bookkeeping for one node of the cluster.
#
# @param id   identifier of the node, kept in @id
# @param addr address of the node, kept in @addr (handed to the socket's
#             `connect` when #socket is first called)
def initialize(id, addr)
  @id = id
  @addr = addr

  # Neither a socket nor a heartbeat timer exists yet; #socket and
  # #send_heartbeat assign them on first use.
  @socket = @heartbeat = nil

  # Total hax to accommodate the new Celluloid::FSM API
  attach self
end

Instance Attribute Details

#addr ⇒ Object (readonly)

Returns the value of attribute addr.



6
7
8
# File 'lib/dcell/node.rb', line 6

# Reader for @addr, the node's current address. The value is assigned by the
# constructor and replaced by #update_client_address / #update_server_address.
def addr
  @addr
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/dcell/node.rb', line 6

# Reader for @id, the identifier this node was constructed with.
def id
  @id
end

Instance Method Details

#actors ⇒ Object Also known as: all

List all registered actors on this node



96
97
98
99
100
101
102
103
104
105
106
# File 'lib/dcell/node.rb', line 96

# Ask the remote node for the list of its registered actors.
#
# A Message::List request is sent, then the method waits in `receive` (no
# timeout is passed) for a message whose request_id equals the request's id.
#
# @return the `value` carried by the matching response
# Aborts with that value instead when the response is an ErrorResponse.
def actors
  list_request = Message::List.new(Thread.mailbox)
  send_message(list_request)

  # Ignore unrelated mailbox traffic: only a reply tagged with our id counts.
  reply = receive do |incoming|
    incoming.respond_to?(:request_id) && incoming.request_id == list_request.id
  end

  payload = reply.value
  abort(payload) if reply.is_a?(ErrorResponse)
  payload
end

#find(name) ⇒ Object Also known as: []

Find an actor registered with a given name on this node



81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/dcell/node.rb', line 81

# Look up the actor registered under +name+ on the remote node.
#
# A Message::Find request is sent, then the method waits in `receive`, bounded
# by NODE_DISCOVERY_TIMEOUT, for a message whose request_id equals the
# request's id.
#
# @param name name the actor is registered under
# @return the `value` of the matching response, or nil when `receive` hands
#         back nil (presumably because the timeout elapsed)
# Aborts with the response value when the response is an ErrorResponse.
def find(name)
  lookup = Message::Find.new(Thread.mailbox, name)
  send_message(lookup)

  # Ignore unrelated mailbox traffic: only a reply tagged with our id counts.
  reply = receive(NODE_DISCOVERY_TIMEOUT) do |incoming|
    incoming.respond_to?(:request_id) && incoming.request_id == lookup.id
  end

  return if reply.nil?

  abort(reply.value) if reply.is_a?(ErrorResponse)
  reply.value
end

#handle_heartbeat ⇒ Object

Handle an incoming heartbeat for this node



128
129
130
131
# File 'lib/dcell/node.rb', line 128

# React to a heartbeat received from this node.
#
# The node is moved to :connected, then a transition to :partitioned is
# requested with a :delay of the class-level heartbeat_timeout (presumably the
# FSM fires it only if no newer transition replaces it -- confirm against
# Celluloid::FSM).
def handle_heartbeat
  transition(:connected)
  transition(:partitioned, delay: self.class.heartbeat_timeout)
end

#inspect ⇒ Object

Friendlier inspection



134
135
136
# File 'lib/dcell/node.rb', line 134

# Compact, human-readable description of the node: its id followed by the
# inspected form of its address.
#
# @return [String]
def inspect
  address = @addr.inspect
  "#<DCell::Node[#{@id}] @addr=#{address}>"
end

#send_heartbeat ⇒ Object

Send a heartbeat message now and schedule the next one after the heartbeat interval



122
123
124
125
# File 'lib/dcell/node.rb', line 122

# Send a heartbeat to the remote node now and schedule the next one.
#
# The method re-arms itself: the timer created by `after` calls
# #send_heartbeat again once the class-level heartbeat_rate has elapsed.
#
# @return the timer object returned by `after` (also kept in @heartbeat)
def send_heartbeat
  send_message DCell::Message::Heartbeat.new

  # #update_client_address also enters here, so a timer armed by an earlier
  # call may still be pending. Cancel it before re-arming; otherwise every
  # extra call would start one more never-ending heartbeat chain. This is done
  # only after the send succeeded so a failed send leaves the old timer alone.
  @heartbeat.cancel if @heartbeat
  @heartbeat = after(self.class.heartbeat_rate) { send_heartbeat }
end

#send_message(message) ⇒ Object Also known as: <<

Send a message to another DCell node



110
111
112
113
114
115
116
117
118
# File 'lib/dcell/node.rb', line 110

# Serialize +message+ with Marshal and push it onto the node's socket.
#
# @param message object to deliver; it must be dumpable by Marshal
# @return whatever `socket << ...` returns
# Aborts with the underlying exception when serialization fails. Errors raised
# by the socket itself are deliberately not rescued here.
def send_message(message)
  serialized =
    begin
      Marshal.dump(message)
    rescue StandardError => error
      abort error
    end

  socket << serialized
end

#shutdown ⇒ Object



58
59
60
61
# File 'lib/dcell/node.rb', line 58

# Move the node to the :shutdown state and close its socket if one was opened.
#
# @return the result of closing the socket, or nil when no socket exists
def shutdown
  transition(:shutdown)
  return unless @socket

  @socket.close
end

#socket ⇒ Object

Obtain the node’s 0MQ socket



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/dcell/node.rb', line 64

# Return the node's push socket, creating and connecting it on first use.
#
# On the first call a Celluloid::ZMQ::PushSocket is created and connected to
# #addr, and the node is moved to :connected. Later calls return the cached
# socket without reconnecting.
#
# @return the connected socket
# @raise [IOError] re-raised from `connect`; the half-built socket is closed
#        and forgotten first so a later call starts from scratch
def socket
  unless @socket
    @socket = Celluloid::ZMQ::PushSocket.new

    begin
      @socket.connect(addr)
    rescue IOError
      @socket.close
      @socket = nil
      raise
    end

    transition(:connected)
  end

  @socket
end

#update_client_address(addr) ⇒ Object



49
50
51
52
# File 'lib/dcell/node.rb', line 49

# Record a new address for the node and immediately start heartbeating to it.
#
# @param addr new address, stored in @addr
# @return whatever #send_heartbeat returns
def update_client_address(addr)
  @addr = addr
  send_heartbeat
end

#update_server_address(addr) ⇒ Object



54
55
56
# File 'lib/dcell/node.rb', line 54

# Replace the stored address of the node. Unlike #update_client_address, no
# heartbeat is sent.
#
# NOTE(review): a socket already cached by #socket is not reconnected here, so
# it keeps pointing at the previous address -- confirm this is intended.
def update_server_address(addr)
  @addr = addr
end