Class: Bones::RPC::Node
- Inherits:
-
Object
- Object
- Bones::RPC::Node
- Includes:
- Instrumentable, Celluloid
- Defined in:
- lib/bones/rpc/node.rb,
lib/bones/rpc/node/registry.rb
Overview
Represents a client to a node in a server cluster.
Defined Under Namespace
Classes: Registry
Constant Summary
Constants included from Instrumentable
Instrumentable::TOPIC, Instrumentable::WARN
Instance Attribute Summary collapse
-
#address ⇒ Address
The address.
- #cluster ⇒ Object readonly
-
#down_at ⇒ Time
The time the node was marked as down.
-
#latency ⇒ Float
The latency in seconds, measured as the time taken by the last #connect call.
-
#options ⇒ Hash
The node options.
- #refreshed_at ⇒ Object
Attributes included from Instrumentable
Instance Method Summary collapse
-
#==(other) ⇒ true, false
(also: #eql?)
Is this node equal to another?
- #adapter ⇒ Object
- #attach(channel, id, future) ⇒ Object
- #cleanup_socket(socket) ⇒ Object
-
#connect ⇒ true
Connect the node on the underlying connection.
-
#connected? ⇒ true, false
Is the node currently connected?
- #detach(channel, id) ⇒ Object
-
#disconnect ⇒ true
Force the node to disconnect from the server.
-
#down ⇒ nil
Mark the node as down.
-
#down? ⇒ true, false
Is the node down?
-
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
- #handle_message(message) ⇒ Object
-
#initialize(cluster, address) ⇒ Node
constructor
A new instance of Node.
-
#inspect ⇒ String
Get the node as a nice formatted string.
-
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?
- #notify(method, params) ⇒ Object
-
#refresh ⇒ nil
Refresh information about the node, such as its status in the replica set and its known peers.
- #request(method, params) ⇒ Object
- #synchronize ⇒ Object
-
#timeout ⇒ Integer
Get the timeout, in seconds, for this node.
Methods included from Instrumentable
Constructor Details
#initialize(cluster, address) ⇒ Node
Returns a new instance of Node.
162 163 164 165 166 167 168 169 170 171 172 173 174 |
# File 'lib/bones/rpc/node.rb', line 162 def initialize(cluster, address) @cluster = cluster @address = address @connection = Connection.new(current_actor) @down_at = nil @refreshed_at = nil @latency = nil @instrumenter = @cluster.options[:instrumenter] || Instrumentable::Log @registry = Node::Registry.new @request_id = 0 @synack_id = 0 @address.resolve(self) end |
Instance Attribute Details
#address ⇒ Address
Returns The address.
29 |
# File 'lib/bones/rpc/node.rb', line 29 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#cluster ⇒ Object (readonly)
29 30 31 |
# File 'lib/bones/rpc/node.rb', line 29 def cluster @cluster end |
#down_at ⇒ Time
Returns The time the node was marked as down.
29 |
# File 'lib/bones/rpc/node.rb', line 29 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#latency ⇒ Float
Returns the latency in seconds, measured as the time taken by the last #connect call.
29 |
# File 'lib/bones/rpc/node.rb', line 29 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#options ⇒ Hash
Returns The node options.
29 |
# File 'lib/bones/rpc/node.rb', line 29 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#refreshed_at ⇒ Object
29 |
# File 'lib/bones/rpc/node.rb', line 29 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
Instance Method Details
#==(other) ⇒ true, false Also known as: eql?
Is this node equal to another?
41 42 43 44 |
# File 'lib/bones/rpc/node.rb', line 41 def ==(other) return false unless other.is_a?(Node) id == other.id end |
#adapter ⇒ Object
47 48 49 |
# File 'lib/bones/rpc/node.rb', line 47 def adapter @adapter ||= Adapter.get(options[:adapter] || :msgpack) end |
#attach(channel, id, future) ⇒ Object
51 52 53 |
# File 'lib/bones/rpc/node.rb', line 51 def attach(channel, id, future) @registry.set(channel, id, future) end |
#cleanup_socket(socket) ⇒ Object
55 56 57 |
# File 'lib/bones/rpc/node.rb', line 55 def cleanup_socket(socket) @registry.flush end |
#connect ⇒ true
Connect the node on the underlying connection.
69 70 71 72 73 74 75 |
# File 'lib/bones/rpc/node.rb', line 69 def connect start = Time.now @connection.connect @latency = Time.now - start @down_at = nil true end |
#connected? ⇒ true, false
Is the node currently connected?
85 86 87 |
# File 'lib/bones/rpc/node.rb', line 85 def connected? @connection.alive? end |
#detach(channel, id) ⇒ Object
89 90 91 |
# File 'lib/bones/rpc/node.rb', line 89 def detach(channel, id) @registry.get(channel, id) end |
#disconnect ⇒ true
Force the node to disconnect from the server.
101 102 103 104 |
# File 'lib/bones/rpc/node.rb', line 101 def disconnect @connection.disconnect true end |
#down ⇒ nil
Mark the node as down.
126 127 128 129 130 |
# File 'lib/bones/rpc/node.rb', line 126 def down @down_at = Time.new @latency = nil disconnect if connected? end |
#down? ⇒ true, false
Is the node down?
114 115 116 |
# File 'lib/bones/rpc/node.rb', line 114 def down? !!@down_at end |
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
145 146 147 148 149 150 151 152 |
# File 'lib/bones/rpc/node.rb', line 145 def ensure_connected(&block) begin connect unless connected? yield(current_actor) rescue Exception => e Failover.get(e).execute(e, current_actor, &block) end end |
#handle_message(message) ⇒ Object
154 155 156 157 158 159 160 |
# File 'lib/bones/rpc/node.rb', line 154 def handle_message(message) logging(message) do if future = message.get(current_actor) message.signal(future) end end end |
#inspect ⇒ String
Get the node as a nice formatted string.
184 185 186 |
# File 'lib/bones/rpc/node.rb', line 184 def inspect "<#{self.class.name} resolved_address=#{address.resolved.inspect}>" end |
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?
198 199 200 |
# File 'lib/bones/rpc/node.rb', line 198 def needs_refresh?(time) !refreshed_at || refreshed_at < time end |
#notify(method, params) ⇒ Object
202 203 204 |
# File 'lib/bones/rpc/node.rb', line 202 def notify(method, params) without_future(Protocol::Notify.new(method, params)) end |
#refresh ⇒ nil
Refresh information about the node, such as its status in the replica set and its known peers.
224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
# File 'lib/bones/rpc/node.rb', line 224 def refresh if address.resolve(self) begin @refreshed_at = Time.now if synchronize.value(timeout) cluster.handle_refresh(current_actor) else down end rescue Timeout::Error down end end end |
#request(method, params) ⇒ Object
239 240 241 |
# File 'lib/bones/rpc/node.rb', line 239 def request(method, params) with_future(Protocol::Request.new(next_request_id, method, params)) end |
#synchronize ⇒ Object
243 244 245 |
# File 'lib/bones/rpc/node.rb', line 243 def synchronize with_future(Protocol::Synchronize.new(next_synack_id, adapter)) end |
#timeout ⇒ Integer
Get the timeout, in seconds, for this node.
255 256 257 |
# File 'lib/bones/rpc/node.rb', line 255 def timeout @timeout ||= (options[:timeout] || 5) end |