Class: DCell::Node

Inherits:
Object
  • Object
show all
Extended by:
Enumerable
Includes:
Celluloid, Celluloid::FSM
Defined in:
lib/dcell/node.rb

Overview

A node in a DCell cluster

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(id, addr) ⇒ Node

Returns a new instance of Node.



69
70
71
72
73
74
75
76
# File 'lib/dcell/node.rb', line 69

# Build a node handle for the given id and address.
#
# The socket and heartbeat slots start out empty (nil).
#
# @param id [Object] identifier stored in @id
# @param addr [Object] address stored in @addr
def initialize(id, addr)
  @id = id
  @addr = addr
  @socket = nil
  @heartbeat = nil

  # Workaround kept from the original: the newer Celluloid::FSM API wants
  # the state machine attached explicitly, so attach it to this object.
  attach(self)
end

Class Attribute Details

.heartbeat_rate ⇒ Object (readonly)

Returns the value of attribute heartbeat_rate.



30
31
32
# File 'lib/dcell/node.rb', line 30

# Class-level reader: returns whatever is stored in @heartbeat_rate.
#
# #send_heartbeat passes this value to `after` as the interval before the
# next heartbeat.
#
# @return [Object] the current value of @heartbeat_rate
def heartbeat_rate
  @heartbeat_rate
end

.heartbeat_timeout ⇒ Object (readonly)

Returns the value of attribute heartbeat_timeout.



30
31
32
# File 'lib/dcell/node.rb', line 30

# Class-level reader: returns whatever is stored in @heartbeat_timeout.
#
# #handle_heartbeat uses this value as the :delay of the transition to
# :partitioned.
#
# @return [Object] the current value of @heartbeat_timeout
def heartbeat_timeout
  @heartbeat_timeout
end

Instance Attribute Details

#addr ⇒ Object (readonly)

Returns the value of attribute addr.



6
7
8
# File 'lib/dcell/node.rb', line 6

# Reader for the address handed to the constructor (stored in @addr).
# #socket passes this value to `connect`.
#
# @return [Object] the current value of @addr
def addr
  @addr
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



6
7
8
# File 'lib/dcell/node.rb', line 6

# Reader for the node id handed to the constructor (stored in @id).
#
# @return [Object] the current value of @id
def id
  @id
end

Class Method Details

.[] ⇒ Object

Find a node by its node ID



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/dcell/node.rb', line 66

# Look up a node by its node ID.
#
# An entry already present in @nodes is returned as-is. Otherwise the
# address is looked up through Directory; with no address the result is nil.
# For a known address, either the local node (DCell.me, when the id equals
# DCell.id) or a newly built Node is stored under the id unless an entry
# appeared in the meantime, and whichever entry is stored gets returned.
#
# @param id [Object] node id to look up
# @return [Object, nil] the node for the id, or nil when Directory has no address
def find(id)
  cached = @lock.synchronize { @nodes[id] }
  return cached if cached

  addr = Directory[id]
  return unless addr

  candidate = (id == DCell.id) ? DCell.me : Node.new(id, addr)

  @lock.synchronize do
    # Keep an entry that was stored while the lock was released.
    @nodes[id] = candidate unless @nodes[id]
    @nodes[id]
  end
end

.all ⇒ Object

Return all available nodes in the cluster



33
34
35
36
37
# File 'lib/dcell/node.rb', line 33

# Return all available nodes in the cluster.
#
# Every id reported by Directory.all is resolved through find and the
# results are collected in order.
#
# @return [Array] one find result per id in Directory.all
def all
  Directory.all.map { |id| find(id) }
end

.each ⇒ Object

Iterate across all available nodes



40
41
42
43
44
# File 'lib/dcell/node.rb', line 40

# Iterate across all available nodes.
#
# Each id reported by Directory.all is resolved through find and the result
# is yielded to the block. When no block is given an Enumerator over the
# same sequence is returned instead of failing at `yield`.
#
# @yield [node] the find result for each id in Directory.all
# @return [Object, Enumerator] the value of Directory.all.each when a block
#   is given, otherwise an Enumerator
def each
  return to_enum(:each) unless block_given?

  Directory.all.each do |node_id|
    yield find(node_id)
  end
end

.find(id) ⇒ Object

Find a node by its node ID



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/dcell/node.rb', line 47

# Look up a node by its node ID.
#
# An entry already present in @nodes is returned as-is. Otherwise the
# address is looked up through Directory; with no address the result is nil.
# For a known address, either the local node (DCell.me, when the id equals
# DCell.id) or a newly built Node is stored under the id unless an entry
# appeared in the meantime, and whichever entry is stored gets returned.
#
# @param id [Object] node id to look up
# @return [Object, nil] the node for the id, or nil when Directory has no address
def find(id)
  cached = @lock.synchronize { @nodes[id] }
  return cached if cached

  addr = Directory[id]
  return unless addr

  candidate = (id == DCell.id) ? DCell.me : Node.new(id, addr)

  @lock.synchronize do
    # Keep an entry that was stored while the lock was released.
    @nodes[id] = candidate unless @nodes[id]
    @nodes[id]
  end
end

Instance Method Details

#actors ⇒ Object Also known as: all

List all registered actors on this node



115
116
117
118
119
120
121
122
123
124
125
# File 'lib/dcell/node.rb', line 115

# List all registered actors on this node.
#
# Sends a Message::List request carrying the current thread's mailbox, then
# calls `receive` with a filter that accepts only a message whose request_id
# equals the request's id. An ErrorResponse is handed to `abort` with its
# value; any other response's value is returned.
#
# @return [Object] the value carried by the matching response
def actors
  list_request = Message::List.new(Thread.mailbox)
  send_message(list_request)

  reply = receive do |incoming|
    incoming.respond_to?(:request_id) && incoming.request_id == list_request.id
  end

  abort(reply.value) if reply.is_a?(ErrorResponse)
  reply.value
end

#finalize ⇒ Object



78
79
80
81
# File 'lib/dcell/node.rb', line 78

# Shut the node down: transition the state machine to :shutdown and close
# the socket if one has been opened.
#
# The instance variable is tested directly rather than going through
# #socket, because #socket lazily creates and connects a new socket (and
# transitions to :connected) when none exists yet, which must not happen
# while shutting down.
def finalize
  transition :shutdown
  @socket.close if @socket
end

#find(name) ⇒ Object Also known as: []

Find an actor registered with a given name on this node



101
102
103
104
105
106
107
108
109
110
111
# File 'lib/dcell/node.rb', line 101

# Find an actor registered with a given name on this node.
#
# Sends a Message::Find request carrying the current thread's mailbox and
# the name, then calls `receive` with a filter that accepts only a message
# whose request_id equals the request's id. An ErrorResponse is handed to
# `abort` with its value; any other response's value is returned.
#
# @param name [Object] name to look up on the node
# @return [Object] the value carried by the matching response
def find(name)
  lookup = Message::Find.new(Thread.mailbox, name)
  send_message(lookup)

  reply = receive do |incoming|
    incoming.respond_to?(:request_id) && incoming.request_id == lookup.id
  end

  abort(reply.value) if reply.is_a?(ErrorResponse)
  reply.value
end

#handle_heartbeat ⇒ Object

Handle an incoming heartbeat for this node



147
148
149
150
# File 'lib/dcell/node.rb', line 147

# Handle an incoming heartbeat for this node.
#
# Transitions to :connected right away, then requests a transition to
# :partitioned with a :delay equal to the class-level heartbeat_timeout.
def handle_heartbeat
  transition(:connected)

  timeout = self.class.heartbeat_timeout
  transition(:partitioned, delay: timeout)
end

#inspect ⇒ Object

Friendlier inspection



153
154
155
# File 'lib/dcell/node.rb', line 153

# Friendlier inspection: shows the node id and the inspected address.
#
# @return [String] text of the form #<DCell::Node[id] @addr=...>
def inspect
  addr_text = @addr.inspect
  "#<DCell::Node[#{@id}] @addr=#{addr_text}>"
end

#send_heartbeat ⇒ Object

Send a heartbeat message now and schedule the next one after the class-level heartbeat_rate interval



141
142
143
144
# File 'lib/dcell/node.rb', line 141

# Send a heartbeat message and schedule the next one.
#
# A new DCell::Message::Heartbeat is sent immediately; `after` is then asked
# to call this method again once the class-level heartbeat_rate has elapsed,
# and whatever `after` returns is kept in @heartbeat.
#
# @return [Object] the value returned by `after`
def send_heartbeat
  send_message(DCell::Message::Heartbeat.new)

  interval = self.class.heartbeat_rate
  @heartbeat = after(interval) do
    send_heartbeat
  end
end

#send_message(message) ⇒ Object Also known as: <<

Send a message to another DCell node



129
130
131
132
133
134
135
136
137
# File 'lib/dcell/node.rb', line 129

# Send a message to another DCell node.
#
# The message is serialized with Marshal.dump; a failure during
# serialization is handed to `abort`. The serialized form is then appended
# to the node's socket.
#
# @param message [Object] message to serialize and send
# @return [Object] the result of appending to the socket
def send_message(message)
  payload = message

  begin
    payload = Marshal.dump(message)
  rescue => error
    abort error
  end

  socket << payload
end

#socket ⇒ Object

Obtain the node’s 0MQ socket



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# File 'lib/dcell/node.rb', line 84

# Obtain the node's 0MQ socket, creating it on first use.
#
# When no socket is held yet, a Celluloid::ZMQ::PushSocket is created and
# connected to addr, followed by a transition to :connected. If connecting
# raises IOError the socket is closed, @socket is reset to nil and the error
# is re-raised.
#
# @return [Object] the socket held in @socket
# @raise [IOError] when connecting fails
def socket
  unless @socket
    @socket = Celluloid::ZMQ::PushSocket.new

    begin
      @socket.connect(addr)
    rescue IOError
      # Do not keep a half-initialized socket around.
      @socket.close
      @socket = nil
      raise
    end

    transition(:connected)
  end

  @socket
end