Class: Bones::RPC::Node

Inherits:
Object
  • Object
show all
Includes:
Instrumentable
Defined in:
lib/bones/rpc/node.rb,
lib/bones/rpc/node/registry.rb

Overview

Represents a client to a node in a server cluster.

Since:

  • 0.0.1

Direct Known Subclasses

Synchronous::Node

Defined Under Namespace

Classes: Registry

Constant Summary

Constants included from Instrumentable

Instrumentable::TOPIC, Instrumentable::WARN

Instance Attribute Summary collapse

Attributes included from Instrumentable

#instrumenter

Instance Method Summary collapse

Methods included from Instrumentable

#instrument

Constructor Details

#initialize(cluster, address) ⇒ Node

Returns a new instance of Node.

Since:

  • 0.0.1



168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/bones/rpc/node.rb', line 168

# Builds a node for +address+ that belongs to +cluster+.
#
# @param cluster the owning cluster; its session backend supplies the
#   connection class and its options may supply an instrumenter
# @param address the address object; it is asked to resolve itself against
#   the current actor as the last step
def initialize(cluster, address)
  @cluster = cluster
  @address = address

  # The connection class comes from the cluster's session backend and is
  # constructed with the current actor.
  backend = cluster.session.backend
  @connection = backend.connection_class.new(current_actor)

  # Nothing has been observed yet: no down mark, refresh time or latency.
  @down_at = @refreshed_at = @latency = nil

  # Fall back to Instrumentable::Log when the options carry no instrumenter.
  @instrumenter = @cluster.options[:instrumenter] || Instrumentable::Log

  @registry = Node::Registry.new
  @request_id = @synack_id = 0

  @address.resolve(current_actor)
end

Instance Attribute Details

#address ⇒ Address

Returns The address.

Returns:



27
# File 'lib/bones/rpc/node.rb', line 27

# Declares read-only accessors for cluster, address, down_at, latency and
# refreshed_at.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#cluster ⇒ Object (readonly)

Since:

  • 0.0.1



27
28
29
# File 'lib/bones/rpc/node.rb', line 27

# Reader for the cluster this node holds.
#
# @return [Object] the value stored in @cluster
def cluster
  @cluster
end

#down_at ⇒ Time

Returns The time the node was marked as down.

Returns:

  • (Time)

    The time the node was marked as down.



27
# File 'lib/bones/rpc/node.rb', line 27

# Declares read-only accessors for cluster, address, down_at, latency and
# refreshed_at.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#latency ⇒ Float?

Returns The latency in seconds, or nil when no latency is currently recorded.

Returns:

  • (Float, nil)

    The latency in seconds, or nil when no latency is currently recorded.



27
# File 'lib/bones/rpc/node.rb', line 27

# Declares read-only accessors for cluster, address, down_at, latency and
# refreshed_at.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#options ⇒ Hash

Returns The node options.

Returns:

  • (Hash)

    The node options.

Since:

  • 0.0.1



27
# File 'lib/bones/rpc/node.rb', line 27

# Declares read-only accessors for cluster, address, down_at, latency and
# refreshed_at.
# NOTE(review): :options is not among the readers declared on this line;
# where #options is defined is not visible in this excerpt -- confirm.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#refreshed_at ⇒ Object

Since:

  • 0.0.1



27
# File 'lib/bones/rpc/node.rb', line 27

# Declares read-only accessors for cluster, address, down_at, latency and
# refreshed_at.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

Instance Method Details

#==(other) ⇒ true, false Also known as: eql?

Is this node equal to another?

Examples:

Is the node equal to another.

node == other

Parameters:

  • other (Node)

    The other node.

Returns:

  • (true, false)

    If the addresses are equal.

Since:

  • 0.0.1



39
40
41
42
# File 'lib/bones/rpc/node.rb', line 39

# Equality check between this node and another object.
#
# @param other [Object] the object to compare against
# @return [true, false] false when +other+ is not a Node; otherwise the
#   result of comparing the two resolved addresses
def ==(other)
  other.is_a?(Node) && address.resolved == other.address.resolved
end

#adapter ⇒ Object

Since:

  • 0.0.1



45
46
47
# File 'lib/bones/rpc/node.rb', line 45

# Returns the adapter for this node, looking it up once and caching it.
#
# The adapter name is read from options[:adapter]; :json is used when that
# entry is missing or falsy.
#
# @return [Object] whatever Adapter.get returned for the chosen name
def adapter
  return @adapter if @adapter

  adapter_name = options[:adapter] || :json
  @adapter = Adapter.get(adapter_name)
end

#attach(channel, id, future) ⇒ Object

Since:

  • 0.0.1



49
50
51
# File 'lib/bones/rpc/node.rb', line 49

# Hands +channel+, +id+ and +future+ to the registry's #set.
#
# @param channel [Object] first argument passed to the registry
# @param id [Object] second argument passed to the registry
# @param future [Object] third argument passed to the registry
# @return [Object] whatever the registry's #set returns
def attach(channel, id, future)
  @registry.set(channel, id, future)
end

#cleanup_socket(socket) ⇒ Object

Since:

  • 0.0.1



53
54
55
# File 'lib/bones/rpc/node.rb', line 53

# Calls #flush on the registry.
#
# @param socket [Object] accepted but not used by this method
# @return [Object] whatever the registry's #flush returns
def cleanup_socket(socket)
  @registry.flush
end

#connect ⇒ true

Connect the node on the underlying connection.

Examples:

Connect the node.

node.connect

Returns:

  • (true)

    If the connection succeeded.

Raises:

Since:

  • 0.0.1



67
68
69
70
71
72
73
# File 'lib/bones/rpc/node.rb', line 67

# Connect the node on the underlying connection and record how long it took.
#
# The elapsed time is taken from the monotonic clock rather than Time.now, so
# a wall-clock adjustment during the connect cannot produce a negative or
# inflated latency.
#
# @return [true] once the connection call has returned. As side effects the
#   elapsed time in seconds (Float) is stored in @latency and @down_at is
#   cleared.
def connect
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  connection { |conn| conn.connect }
  @latency = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  @down_at = nil
  true
end

#connected? ⇒ true, false

Is the node currently connected?

Examples:

Is the node connected?

node.connected?

Returns:

  • (true, false)

    If the node is connected or not.

Since:

  • 0.0.1



83
84
85
# File 'lib/bones/rpc/node.rb', line 83

# Asks the underlying connection whether it is alive.
#
# @return [Object] the result of calling #alive? on the connection
def connected?
  connection(&:alive?)
end

#connection ⇒ Object

Since:

  • 0.0.1



87
88
89
90
91
92
93
# File 'lib/bones/rpc/node.rb', line 87

# Gives access to the underlying connection.
#
# @yield [connection] the connection, when a block is given
# @return [Object] the block's result when a block is given, otherwise the
#   connection itself
def connection
  return @connection unless block_given?

  yield @connection
end

#detach(channel, id) ⇒ Object

Since:

  • 0.0.1



95
96
97
# File 'lib/bones/rpc/node.rb', line 95

# Passes +channel+ and +id+ to the registry's #get and returns its result.
#
# NOTE(review): despite the method name, only #get is called here; whether
# that also removes the entry from the registry is not visible from this
# method -- confirm in Registry.
#
# @param channel [Object] first argument passed to the registry
# @param id [Object] second argument passed to the registry
# @return [Object] whatever the registry's #get returns
def detach(channel, id)
  @registry.get(channel, id)
end

#disconnect ⇒ true

Force the node to disconnect from the server.

Examples:

Disconnect the node.

node.disconnect

Returns:

  • (true)

    If the disconnection succeeded.

Since:

  • 0.0.1



107
108
109
110
# File 'lib/bones/rpc/node.rb', line 107

# Tells the underlying connection to disconnect.
#
# @return [true] always, once the connection's #disconnect has returned
def disconnect
  connection(&:disconnect)
  true
end

#down ⇒ nil

Mark the node as down.

Examples:

Mark the node as down.

node.down

Returns:

  • (nil)

    Nothing.

Since:

  • 0.0.1



132
133
134
135
136
# File 'lib/bones/rpc/node.rb', line 132

# Mark the node as down.
#
# Records the current time in @down_at, clears @latency, and disconnects the
# node when it is currently connected.
#
# @return [nil] always. The result of #disconnect is deliberately not leaked,
#   so the return value is the same whether or not a disconnect happened.
def down
  @down_at = Time.new
  @latency = nil
  disconnect if connected?
  nil
end

#down? ⇒ true, false

Is the node down?

Examples:

Is the node down?

node.down?

Returns:

  • (true, false)

    Whether the node is currently marked as down.

Since:

  • 0.0.1



120
121
122
# File 'lib/bones/rpc/node.rb', line 120

# Reports whether a down time is currently recorded for the node.
#
# @return [true, false] true when @down_at is set, false otherwise
def down?
  @down_at ? true : false
end

#ensure_connected(&block) ⇒ nil

Yields the block if a connection can be established, retrying when a connection error is raised.

Examples:

Ensure we are connected.

node.ensure_connected do
  #...
end

Returns:

  • (nil)

    nil.

Since:

  • 0.0.1



151
152
153
154
155
156
157
158
# File 'lib/bones/rpc/node.rb', line 151

# Connects when not already connected, then yields the current actor.
#
# Anything raised while connecting or inside the block is handed, together
# with the current actor and the block, to the strategy returned by
# Failover.get for that exception.
#
# NOTE(review): Exception (not just StandardError) is rescued here, so
# signals and exit requests also reach Failover -- confirm this is intended.
#
# @yield [node] the current actor
# @return [Object] the block's result, or the failover strategy's result
#   when an exception was raised
def ensure_connected(&block)
  connect unless connected?
  yield(current_actor)
rescue Exception => e
  Failover.get(e).execute(e, current_actor, &block)
end

#handle_message(message) ⇒ Object

Since:

  • 0.0.1



160
161
162
163
164
165
166
# File 'lib/bones/rpc/node.rb', line 160

# Processes an incoming message inside a logging block.
#
# The message is asked for its future (given the current actor); when one is
# returned, the message is told to signal it.
#
# @param message [Object] responds to #get and #signal
# @return [Object] whatever #logging returns for the block
def handle_message(message)
  logging(message) do
    pending = message.get(current_actor)
    message.signal(pending) if pending
  end
end

#inspect ⇒ String

Get the node as a nice formatted string.

Examples:

Inspect the node.

node.inspect

Returns:

  • (String)

    The string inspection.

Since:

  • 0.0.1



190
191
192
# File 'lib/bones/rpc/node.rb', line 190

def inspect
  "<#{self.class.name} resolved_address=#{address.resolved.inspect}>"
end

#needs_refresh?(time) ⇒ true, false

Does the node need to be refreshed?

Examples:

Does the node require refreshing?

node.needs_refresh?(time)

Parameters:

  • time (Time)

    The next refresh time.

Returns:

  • (true, false)

    Whether the node needs to be refreshed.

Since:

  • 0.0.1



204
205
206
# File 'lib/bones/rpc/node.rb', line 204

# Decides whether the node should be refreshed.
#
# @param time [Time] the time to compare the last refresh against
# @return [true, false] true when no refresh time is recorded, otherwise
#   whether the recorded refresh time is earlier than +time+
def needs_refresh?(time)
  last_refresh = refreshed_at
  return true unless last_refresh

  last_refresh < time
end

#notify(method, params) ⇒ Object

Since:

  • 0.0.1



208
209
210
# File 'lib/bones/rpc/node.rb', line 208

# Builds a Protocol::Notify from +method+ and +params+ and passes it to
# #without_future.
#
# @param method [Object] first argument for Protocol::Notify.new
# @param params [Object] second argument for Protocol::Notify.new
# @return [Object] whatever #without_future returns
def notify(method, params)
  notification = Protocol::Notify.new(method, params)
  without_future(notification)
end

#refresh ⇒ nil

Refresh information about the node, such as its status in the replica set and its known peers.

Examples:

Refresh the node.

node.refresh

Returns:

  • (nil)

    nil.

Raises:

  • (ConnectionFailure)

    If the node cannot be reached.

  • (ReplicaSetReconfigured)

    If the node is no longer a primary node and refresh was called within an #ensure_primary block.

Since:

  • 0.0.1



230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/bones/rpc/node.rb', line 230

# Refreshes the node once its address resolves.
#
# When the address does not resolve, nothing else happens. Otherwise the
# refresh time is recorded and a synchronize is awaited for up to
# #refresh_timeout: a truthy value is reported to the cluster through
# handle_refresh, while a falsy value or a Timeout::Error marks the node
# as down.
#
# @return [Object, nil] nil when the address did not resolve; otherwise the
#   result of cluster.handle_refresh or of #down
def refresh
  return unless address.resolve(current_actor)

  begin
    @refreshed_at = Time.now
    synced = synchronize.value(refresh_timeout)
    synced ? cluster.handle_refresh(current_actor) : down
  rescue Timeout::Error
    down
  end
end

#refresh_timeout ⇒ Integer

Get the timeout, in seconds, for this node.

Examples:

Get the timeout in seconds.

node.refresh_timeout

Returns:

  • (Integer)

    The configured timeout or the default of 5.

Since:

  • 0.0.1



253
254
255
# File 'lib/bones/rpc/node.rb', line 253

# Returns the refresh timeout, reading it once and caching it.
#
# @return [Object] options[:timeout] when that entry is truthy, otherwise 5
def refresh_timeout
  return @refresh_timeout if @refresh_timeout

  @refresh_timeout = options[:timeout] || 5
end

#registry_empty? ⇒ Boolean

Returns:

  • (Boolean)

Since:

  • 0.0.1



257
258
259
# File 'lib/bones/rpc/node.rb', line 257

# Asks the registry whether it is empty.
#
# @return [Object] whatever the registry's #empty? returns
def registry_empty?
  @registry.empty?
end

#request(method, params) ⇒ Object

Since:

  • 0.0.1



261
262
263
# File 'lib/bones/rpc/node.rb', line 261

# Builds a Protocol::Request carrying the next request id, +method+ and
# +params+, and passes it to #with_future.
#
# @param method [Object] second argument for Protocol::Request.new
# @param params [Object] third argument for Protocol::Request.new
# @return [Object] whatever #with_future returns
def request(method, params)
  message = Protocol::Request.new(next_request_id, method, params)
  with_future(message)
end

#synchronize ⇒ Object

Since:

  • 0.0.1



265
266
267
# File 'lib/bones/rpc/node.rb', line 265

# Builds a Protocol::Synchronize carrying the next synack id and the adapter,
# and passes it to #with_future.
#
# @return [Object] whatever #with_future returns
def synchronize
  message = Protocol::Synchronize.new(next_synack_id, adapter)
  with_future(message)
end