Class: Bones::RPC::Node
- Inherits:
-
Object
- Object
- Bones::RPC::Node
- Includes:
- Instrumentable
- Defined in:
- lib/bones/rpc/node.rb,
lib/bones/rpc/node/registry.rb
Overview
Represents a client to a node in a server cluster.
Direct Known Subclasses
Defined Under Namespace
Classes: Registry
Constant Summary
Constants included from Instrumentable
Instrumentable::TOPIC, Instrumentable::WARN
Instance Attribute Summary collapse
-
#address ⇒ Address
The address.
- #cluster ⇒ Object readonly
-
#down_at ⇒ Time
The time the node was marked as down.
-
#latency ⇒ Integer
The latency in milliseconds.
-
#options ⇒ Hash
The node options.
- #refreshed_at ⇒ Object
Attributes included from Instrumentable
Instance Method Summary collapse
-
#==(other) ⇒ true, false
(also: #eql?)
Is this node equal to another?.
- #adapter ⇒ Object
- #attach(channel, id, future) ⇒ Object
- #cleanup_socket(socket) ⇒ Object
-
#connect ⇒ true
Connect the node on the underlying connection.
-
#connected? ⇒ true, false
Is the node currently connected?.
- #connection ⇒ Object
- #detach(channel, id) ⇒ Object
-
#disconnect ⇒ true
Force the node to disconnect from the server.
-
#down ⇒ nil
Mark the node as down.
-
#down? ⇒ Time?
Is the node down?.
-
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
- #handle_message(message) ⇒ Object
-
#initialize(cluster, address) ⇒ Node
constructor
A new instance of Node.
-
#inspect ⇒ String
Get the node as a nice formatted string.
-
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?.
- #notify(method, params) ⇒ Object
-
#refresh ⇒ nil
Refresh information about the node, such as it’s status in the replica set and it’s known peers.
-
#refresh_timeout ⇒ Integer
Get the timeout, in seconds, for this node.
- #registry_empty? ⇒ Boolean
- #request(method, params) ⇒ Object
- #synchronize ⇒ Object
Methods included from Instrumentable
Constructor Details
#initialize(cluster, address) ⇒ Node
Returns a new instance of Node.
168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/bones/rpc/node.rb', line 168 def initialize(cluster, address) @cluster = cluster @address = address @connection = cluster.session.backend.connection_class.new(current_actor) @down_at = nil @refreshed_at = nil @latency = nil @instrumenter = @cluster.[:instrumenter] || Instrumentable::Log @registry = Node::Registry.new @request_id = 0 @synack_id = 0 @address.resolve(current_actor) end |
Instance Attribute Details
#address ⇒ Address
Returns The address.
27 |
# File 'lib/bones/rpc/node.rb', line 27 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#cluster ⇒ Object (readonly)
27 28 29 |
# File 'lib/bones/rpc/node.rb', line 27 def cluster @cluster end |
#down_at ⇒ Time
Returns The time the node was marked as down.
27 |
# File 'lib/bones/rpc/node.rb', line 27 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#latency ⇒ Integer
Returns The latency in milliseconds.
27 |
# File 'lib/bones/rpc/node.rb', line 27 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#options ⇒ Hash
Returns The node options.
27 |
# File 'lib/bones/rpc/node.rb', line 27 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
#refreshed_at ⇒ Object
27 |
# File 'lib/bones/rpc/node.rb', line 27 attr_reader :cluster, :address, :down_at, :latency, :refreshed_at |
Instance Method Details
#==(other) ⇒ true, false Also known as: eql?
Is this node equal to another?
39 40 41 42 |
# File 'lib/bones/rpc/node.rb', line 39 def ==(other) return false unless other.is_a?(Node) address.resolved == other.address.resolved end |
#adapter ⇒ Object
45 46 47 |
# File 'lib/bones/rpc/node.rb', line 45 def adapter @adapter ||= Adapter.get([:adapter] || :json) end |
#attach(channel, id, future) ⇒ Object
49 50 51 |
# File 'lib/bones/rpc/node.rb', line 49 def attach(channel, id, future) @registry.set(channel, id, future) end |
#cleanup_socket(socket) ⇒ Object
53 54 55 |
# File 'lib/bones/rpc/node.rb', line 53 def cleanup_socket(socket) @registry.flush end |
#connect ⇒ true
Connect the node on the underlying connection.
67 68 69 70 71 72 73 |
# File 'lib/bones/rpc/node.rb', line 67 def connect start = Time.now connection { |conn| conn.connect } @latency = Time.now - start @down_at = nil true end |
#connected? ⇒ true, false
Is the node currently connected?
83 84 85 |
# File 'lib/bones/rpc/node.rb', line 83 def connected? connection { |conn| conn.alive? } end |
#connection ⇒ Object
87 88 89 90 91 92 93 |
# File 'lib/bones/rpc/node.rb', line 87 def connection if block_given? yield @connection else @connection end end |
#detach(channel, id) ⇒ Object
95 96 97 |
# File 'lib/bones/rpc/node.rb', line 95 def detach(channel, id) @registry.get(channel, id) end |
#disconnect ⇒ true
Force the node to disconnect from the server.
107 108 109 110 |
# File 'lib/bones/rpc/node.rb', line 107 def disconnect connection { |conn| conn.disconnect } true end |
#down ⇒ nil
Mark the node as down.
132 133 134 135 136 |
# File 'lib/bones/rpc/node.rb', line 132 def down @down_at = Time.new @latency = nil disconnect if connected? end |
#down? ⇒ Time?
Is the node down?
120 121 122 |
# File 'lib/bones/rpc/node.rb', line 120 def down? !!@down_at end |
#ensure_connected(&block) ⇒ nil
Yields the block if a connection can be established, retrying when a connection error is raised.
151 152 153 154 155 156 157 158 |
# File 'lib/bones/rpc/node.rb', line 151 def ensure_connected(&block) begin connect unless connected? yield(current_actor) rescue Exception => e Failover.get(e).execute(e, current_actor, &block) end end |
#handle_message(message) ⇒ Object
160 161 162 163 164 165 166 |
# File 'lib/bones/rpc/node.rb', line 160 def () logging() do if future = .get(current_actor) .signal(future) end end end |
#inspect ⇒ String
Get the node as a nice formatted string.
190 191 192 |
# File 'lib/bones/rpc/node.rb', line 190 def inspect "<#{self.class.name} resolved_address=#{address.resolved.inspect}>" end |
#needs_refresh?(time) ⇒ true, false
Does the node need to be refreshed?
204 205 206 |
# File 'lib/bones/rpc/node.rb', line 204 def needs_refresh?(time) !refreshed_at || refreshed_at < time end |
#notify(method, params) ⇒ Object
208 209 210 |
# File 'lib/bones/rpc/node.rb', line 208 def notify(method, params) without_future(Protocol::Notify.new(method, params)) end |
#refresh ⇒ nil
Refresh information about the node, such as it’s status in the replica set and it’s known peers.
230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
# File 'lib/bones/rpc/node.rb', line 230 def refresh if address.resolve(current_actor) begin @refreshed_at = Time.now if synchronize.value(refresh_timeout) cluster.handle_refresh(current_actor) else down end rescue Timeout::Error down end end end |
#refresh_timeout ⇒ Integer
Get the timeout, in seconds, for this node.
253 254 255 |
# File 'lib/bones/rpc/node.rb', line 253 def refresh_timeout @refresh_timeout ||= ([:timeout] || 5) end |
#registry_empty? ⇒ Boolean
257 258 259 |
# File 'lib/bones/rpc/node.rb', line 257 def registry_empty? @registry.empty? end |
#request(method, params) ⇒ Object
261 262 263 |
# File 'lib/bones/rpc/node.rb', line 261 def request(method, params) with_future(Protocol::Request.new(next_request_id, method, params)) end |
#synchronize ⇒ Object
265 266 267 |
# File 'lib/bones/rpc/node.rb', line 265 def synchronize with_future(Protocol::Synchronize.new(next_synack_id, adapter)) end |