Class: Bones::RPC::Node

Inherits:
Object
  • Object
show all
Includes:
Instrumentable, Celluloid
Defined in:
lib/bones/rpc/node.rb,
lib/bones/rpc/node/registry.rb

Overview

Represents a client to a node in a server cluster.

Since:

  • 1.0.0

Defined Under Namespace

Classes: Registry

Constant Summary

Constants included from Instrumentable

Instrumentable::TOPIC, Instrumentable::WARN

Instance Attribute Summary collapse

Attributes included from Instrumentable

#instrumenter

Instance Method Summary collapse

Methods included from Instrumentable

#instrument

Constructor Details

#initialize(cluster, address) ⇒ Node

Returns a new instance of Node.

Since:

  • 1.0.0



162
163
164
165
166
167
168
169
170
171
172
173
174
# File 'lib/bones/rpc/node.rb', line 162

# Instantiate a node client for one address of a cluster.
#
# Starts with no down time, refresh time or latency recorded, and with the
# request and synack counters at zero.
#
# @param [ Object ] cluster The owning cluster; its options supply the
#   instrumenter (falling back to Instrumentable::Log).
# @param [ Object ] address The node address, which is asked to resolve
#   itself for this node.
#
# @since 1.0.0
def initialize(cluster, address)
  @cluster, @address = cluster, address
  @connection = Connection.new(current_actor)
  @down_at = @refreshed_at = @latency = nil
  @instrumenter = @cluster.options[:instrumenter] || Instrumentable::Log
  @registry = Node::Registry.new
  @request_id = @synack_id = 0
  # Resolve last, once all other state has been set up.
  @address.resolve(self)
end

Instance Attribute Details

#addressAddress

Returns The address.

Returns:



29
# File 'lib/bones/rpc/node.rb', line 29

# Read-only accessors for the node's cluster, address, down time, latency
# and last refresh time.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#clusterObject (readonly)

Since:

  • 1.0.0



29
30
31
# File 'lib/bones/rpc/node.rb', line 29

# Get the cluster this node was created with.
#
# @return [ Object ] The value stored in @cluster.
#
# @since 1.0.0
def cluster
  @cluster
end

#down_atTime

Returns The time the node was marked as down.

Returns:

  • (Time)

    The time the node was marked as down.



29
# File 'lib/bones/rpc/node.rb', line 29

# Read-only accessors for the node's cluster, address, down time, latency
# and last refresh time.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#latencyInteger

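Returns The time in seconds the last #connect took, or nil.

Returns:

  • (Float, nil)

    The time in seconds the last #connect took, or nil if the node has not connected yet or has been marked as down.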



29
# File 'lib/bones/rpc/node.rb', line 29

# Read-only accessors for the node's cluster, address, down time, latency
# and last refresh time.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#optionsHash

Returns The node options.

Returns:

  • (Hash)

    The node options.

Since:

  • 1.0.0



29
# File 'lib/bones/rpc/node.rb', line 29

# Read-only accessors for the node's cluster, address, down time, latency
# and last refresh time.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

#refreshed_atObject

Since:

  • 1.0.0



29
# File 'lib/bones/rpc/node.rb', line 29

# Read-only accessors for the node's cluster, address, down time, latency
# and last refresh time.
attr_reader :cluster, :address, :down_at, :latency, :refreshed_at

Instance Method Details

#==(other) ⇒ true, false Also known as: eql?

Is this node equal to another?

Examples:

Is the node equal to another.

node == other

Parameters:

  • other (Node)

    The other node.

Returns:

  • (true, false)

    If the addresses are equal.

Since:

  • 1.0.0



41
42
43
44
# File 'lib/bones/rpc/node.rb', line 41

# Is this node equal to another?
#
# Two nodes are equal when the other object is a Node and both report the
# same id.
#
# @example Is the node equal to another.
#   node == other
#
# @param [ Node ] other The other node.
#
# @return [ true, false ] If the ids are equal.
#
# @since 1.0.0
def ==(other)
  other.is_a?(Node) && id == other.id
end

#adapterObject

Since:

  • 1.0.0



47
48
49
# File 'lib/bones/rpc/node.rb', line 47

# Get the adapter used by this node.
#
# The adapter is looked up through Adapter.get using options[:adapter],
# defaulting to :msgpack, and the result is memoized in @adapter.
#
# @return [ Object ] Whatever Adapter.get returns for the configured name.
#
# @since 1.0.0
def adapter
  @adapter ||= Adapter.get(options[:adapter] || :msgpack)
end

#attach(channel, id, future) ⇒ Object

Since:

  • 1.0.0



51
52
53
# File 'lib/bones/rpc/node.rb', line 51

# Register a future in the node registry under the given channel and id.
#
# @param [ Object ] channel The registry channel.
# @param [ Object ] id The identifier within the channel.
# @param [ Object ] future The future to store.
#
# @return [ Object ] The result of Registry#set.
#
# @since 1.0.0
def attach(channel, id, future)
  @registry.set(channel, id, future)
end

#cleanup_socket(socket) ⇒ Object

Since:

  • 1.0.0



55
56
57
# File 'lib/bones/rpc/node.rb', line 55

# Flush the node registry.
#
# @param [ Object ] socket Not used by this method; only the registry is
#   flushed.
#
# @return [ Object ] The result of Registry#flush.
#
# @since 1.0.0
def cleanup_socket(socket)
  @registry.flush
end

#connecttrue

Connect the node on the underlying connection.

Examples:

Connect the node.

node.connect

Returns:

  • (true)

    If the connection succeeded.

Raises:

Since:

  • 2.0.0



69
70
71
72
73
74
75
# File 'lib/bones/rpc/node.rb', line 69

# Connect the node on the underlying connection.
#
# Records how long the connection attempt took (in seconds) in @latency
# and clears any previously recorded down time. Errors raised by the
# connection propagate to the caller and leave both values untouched.
#
# @example Connect the node.
#   node.connect
#
# @return [ true ] If the connection succeeded.
#
# @since 2.0.0
def connect
  # Measure elapsed time on the monotonic clock: a wall-clock adjustment
  # (NTP step, manual change) during the connect must not yield a negative
  # or inflated latency.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  @connection.connect
  @latency = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  @down_at = nil
  true
end

#connected?true, false

Is the node currently connected?

Examples:

Is the node connected?

node.connected?

Returns:

  • (true, false)

    If the node is connected or not.

Since:

  • 2.0.0



85
86
87
# File 'lib/bones/rpc/node.rb', line 85

# Is the node currently connected?
#
# Delegates to the underlying connection's alive? check.
#
# @example Is the node connected?
#   node.connected?
#
# @return [ true, false ] If the node is connected or not.
#
# @since 2.0.0
def connected?
  @connection.alive?
end

#detach(channel, id) ⇒ Object

Since:

  • 1.0.0



89
90
91
# File 'lib/bones/rpc/node.rb', line 89

# Fetch the entry stored in the node registry for the given channel and id.
#
# NOTE(review): the method name suggests the entry is also removed from the
# registry; that depends on Registry#get, which is not visible here - confirm.
#
# @param [ Object ] channel The registry channel.
# @param [ Object ] id The identifier within the channel.
#
# @return [ Object ] The result of Registry#get.
#
# @since 1.0.0
def detach(channel, id)
  @registry.get(channel, id)
end

#disconnecttrue

Force the node to disconnect from the server.

Examples:

Disconnect the node.

node.disconnect

Returns:

  • (true)

    If the disconnection succeeded.

Since:

  • 1.2.0



101
102
103
104
# File 'lib/bones/rpc/node.rb', line 101

# Force the node to disconnect from the server.
#
# Asks the underlying connection to disconnect and always returns true
# when that call completes without raising.
#
# @example Disconnect the node.
#   node.disconnect
#
# @return [ true ] If the disconnection succeeded.
#
# @since 1.2.0
def disconnect
  @connection.disconnect
  true
end

#downnil

Mark the node as down.

Examples:

Mark the node as down.

node.down

Returns:

  • (true, nil)

    true if an open connection was disconnected, nil if the node was not connected.

Since:

  • 2.0.0



126
127
128
129
130
# File 'lib/bones/rpc/node.rb', line 126

# Mark the node as down.
#
# Clears the recorded latency, stamps the current time as the down time
# and disconnects if the node is still connected.
#
# @example Mark the node as down.
#   node.down
#
# @return [ true, nil ] The result of #disconnect when the node was
#   connected, otherwise nil.
#
# @since 2.0.0
def down
  @latency = nil
  @down_at = Time.now
  disconnect if connected?
end

#down?Time?

Is the node down?

Examples:

Is the node down?

node.down?

Returns:

  • (true, false)

    true if a down time is recorded for the node, false otherwise.

Since:

  • 1.0.0



114
115
116
# File 'lib/bones/rpc/node.rb', line 114

# Is the node down?
#
# @example Is the node down?
#   node.down?
#
# @return [ true, false ] true when a down time is recorded, false
#   otherwise.
#
# @since 1.0.0
def down?
  @down_at ? true : false
end

#ensure_connected(&block) ⇒ nil

Yields the block if a connection can be established, retrying when a connection error is raised.

Examples:

Ensure we are connected.

node.ensure_connected do
  #...
end

Returns:

  • (Object)

    The value returned by the block, or the result of the failover strategy when an error is raised.

Since:

  • 1.0.0



145
146
147
148
149
150
151
152
# File 'lib/bones/rpc/node.rb', line 145

# Yields the current actor to the block, connecting first if the node is
# not connected. Any error raised while connecting or inside the block is
# handed to the failover strategy selected for that error, together with
# the original block.
#
# NOTE(review): rescuing Exception also captures signals and exit requests;
# kept as-is because the failover lookup receives every error today.
#
# @example Ensure we are connected.
#   node.ensure_connected do
#     #...
#   end
#
# @return [ Object ] The block's value, or the failover strategy's result.
#
# @since 1.0.0
def ensure_connected(&block)
  connect unless connected?
  yield(current_actor)
rescue Exception => e
  Failover.get(e).execute(e, current_actor, &block)
end

#handle_message(message) ⇒ Object

Since:

  • 1.0.0



154
155
156
157
158
159
160
# File 'lib/bones/rpc/node.rb', line 154

# Handle an incoming message.
#
# Inside the logging wrapper, asks the message for the future it belongs to
# on this node and, when one is returned, lets the message signal it.
#
# @param [ Object ] message The received message.
#
# @return [ Object ] Whatever the logging wrapper returns.
#
# @since 1.0.0
def handle_message(message)
  logging(message) do
    pending = message.get(current_actor)
    message.signal(pending) if pending
  end
end

#inspectString

Get the node as a nice formatted string.

Examples:

Inspect the node.

node.inspect

Returns:

  • (String)

    The string inspection.

Since:

  • 1.0.0



184
185
186
# File 'lib/bones/rpc/node.rb', line 184

# Get the node as a nice formatted string.
#
# @example Inspect the node.
#   node.inspect
#
# @return [ String ] The class name followed by the inspected resolved
#   address.
#
# @since 1.0.0
def inspect
  format("<%s resolved_address=%s>", self.class.name, address.resolved.inspect)
end

#needs_refresh?(time) ⇒ true, false

Does the node need to be refreshed?

Examples:

Does the node require refreshing?

node.needs_refresh?(time)

Parameters:

  • time (Time)

    The next refresh time.

Returns:

  • (true, false)

    Whether the node needs to be refreshed.

Since:

  • 1.0.0



198
199
200
# File 'lib/bones/rpc/node.rb', line 198

# Does the node need to be refreshed?
#
# A node needs a refresh when it has never been refreshed or when its last
# refresh happened before the given time.
#
# @example Does the node require refreshing?
#   node.needs_refresh?(time)
#
# @param [ Time ] time The next refresh time.
#
# @return [ true, false ] Whether the node needs to be refreshed.
#
# @since 1.0.0
def needs_refresh?(time)
  last_refresh = refreshed_at
  return true unless last_refresh

  last_refresh < time
end

#notify(method, params) ⇒ Object

Since:

  • 1.0.0



202
203
204
# File 'lib/bones/rpc/node.rb', line 202

# Send a notification to the node.
#
# Wraps the method name and params in a Protocol::Notify message and
# dispatches it through without_future.
#
# @param [ Object ] method The remote method name.
# @param [ Object ] params The parameters for the remote method.
#
# @return [ Object ] The result of without_future.
#
# @since 1.0.0
def notify(method, params)
  without_future(Protocol::Notify.new(method, params))
end

#refreshnil

Refresh information about the node, such as its status in the replica set and its known peers.

Examples:

Refresh the node.

node.refresh

Returns:

  • (nil)

    nil.

Raises:

  • (ConnectionFailure)

    If the node cannot be reached.

  • (ReplicaSetReconfigured)

    If the node is no longer a primary node and refresh was called within an #ensure_primary block.

Since:

  • 1.0.0



224
225
226
227
228
229
230
231
232
233
234
235
236
237
# File 'lib/bones/rpc/node.rb', line 224

# Refresh information about the node.
#
# Does nothing unless the address resolves. Otherwise records the refresh
# time, sends a synchronize message and waits up to #timeout for its value:
# a truthy value is reported to the cluster, while a falsy value or a
# Timeout::Error marks the node as down.
#
# @example Refresh the node.
#   node.refresh
#
# @return [ Object, nil ] The result of the cluster's handle_refresh or of
#   #down, or nil when the address could not be resolved.
#
# @since 1.0.0
def refresh
  return unless address.resolve(self)

  # Only the synchronize round-trip is guarded against timeouts; address
  # resolution above is deliberately outside the rescue.
  begin
    @refreshed_at = Time.now
    acknowledged = synchronize.value(timeout)
    if acknowledged
      cluster.handle_refresh(current_actor)
    else
      down
    end
  rescue Timeout::Error
    down
  end
end

#request(method, params) ⇒ Object

Since:

  • 1.0.0



239
240
241
# File 'lib/bones/rpc/node.rb', line 239

# Send a request to the node.
#
# Wraps the next request id, the method name and params in a
# Protocol::Request message and dispatches it through with_future.
#
# @param [ Object ] method The remote method name.
# @param [ Object ] params The parameters for the remote method.
#
# @return [ Object ] The result of with_future.
#
# @since 1.0.0
def request(method, params)
  with_future(Protocol::Request.new(next_request_id, method, params))
end

#synchronizeObject

Since:

  • 1.0.0



243
244
245
# File 'lib/bones/rpc/node.rb', line 243

# Send a synchronize message to the node.
#
# Wraps the next synack id and the node's adapter in a
# Protocol::Synchronize message and dispatches it through with_future.
#
# @return [ Object ] The result of with_future; #refresh calls value(timeout)
#   on it.
#
# @since 1.0.0
def synchronize
  with_future(Protocol::Synchronize.new(next_synack_id, adapter))
end

#timeoutInteger

Get the timeout, in seconds, for this node.

Examples:

Get the timeout in seconds.

node.timeout

Returns:

  • (Integer)

    The configured timeout or the default of 5.

Since:

  • 1.0.0



255
256
257
# File 'lib/bones/rpc/node.rb', line 255

# Get the timeout, in seconds, for this node.
#
# The value is read from options[:timeout] on first use, falls back to 5
# and is memoized in @timeout.
#
# @example Get the timeout in seconds.
#   node.timeout
#
# @return [ Integer ] The configured timeout or the default of 5.
#
# @since 1.0.0
def timeout
  @timeout ||= (options[:timeout] || 5)
end