Class: Aerospike::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/aerospike/cluster/node.rb

Constant Summary collapse

PARTITIONS =
4096
FULL_HEALTH =
100

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, nv) ⇒ Node

Initialize server node with connection parameters.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/aerospike/cluster/node.rb', line 31

def initialize(cluster, nv)
  @cluster = cluster
  @name = nv.name
  @aliases = Atomic.new(nv.aliases)
  @host = nv.host
  @use_new_info = Atomic.new(nv.use_new_info)
  @features = nv.features
  @cluster_name = nv.cluster_name

  # Assign host to first IP alias because the server identifies nodes
  # by IP address (not hostname).
  @host = nv.aliases[0]
  @health = Atomic.new(FULL_HEALTH)
  @partition_generation = Atomic.new(-1)
  @reference_count = Atomic.new(0)
  @responded = Atomic.new(false)
  @active = Atomic.new(true)

  @connections = Pool.new(@cluster.connection_queue_size)
  @connections.create_block = Proc.new do
    while conn = Connection.new(@host.name, @host.port, @cluster.connection_timeout)

      # need to authenticate
      if @cluster.user && @cluster.user != ''
        begin
          command = AdminCommand.new
          command.authenticate(conn, @cluster.user, @cluster.password)
        rescue => e
          # Socket not authenticated. Do not put back into pool.
          conn.close if conn
          raise e
        end
      end

      break if conn.connected?
    end
    conn
  end

  @connections.cleanup_block = Proc.new { |conn| conn.close if conn }
end

Instance Attribute Details

#cluster_nameObject (readonly)

Returns the value of attribute cluster_name.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

def cluster_name
  @cluster_name
end

#featuresObject (readonly)

Returns the value of attribute features.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

def features
  @features
end

#nameObject (readonly)

Returns the value of attribute name.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

def name
  @name
end

#reference_countObject (readonly)

Returns the value of attribute reference_count.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

def reference_count
  @reference_count
end

#respondedObject (readonly)

Returns the value of attribute responded.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

def responded
  @responded
end

Instance Method Details

#==(other) ⇒ Object Also known as: eql?



178
179
180
# File 'lib/aerospike/cluster/node.rb', line 178

def ==(other)
  other && other.is_a?(Node) && (@name == other.name)
end

#active?Boolean

Checks if the node is active

Returns:

  • (Boolean)


143
144
145
# File 'lib/aerospike/cluster/node.rb', line 143

def active?
  @active.value
end

#add_alias(alias_to_add) ⇒ Object

Adds an alias for the node



158
159
160
161
162
163
164
165
166
# File 'lib/aerospike/cluster/node.rb', line 158

def add_alias(alias_to_add)
  # Aliases are only referenced in the cluster tend threads,
  # so synchronization is not necessary.
  aliases = get_aliases
  aliases ||= []

  aliases << alias_to_add
  set_aliases(aliases)
end

#closeObject

Marks node as inactice and closes all cached connections



169
170
171
172
# File 'lib/aerospike/cluster/node.rb', line 169

def close
  @active.value = false
  close_connections
end

#decrease_healthObject

Decrease node Health as a result of bad connection or communication



128
129
130
# File 'lib/aerospike/cluster/node.rb', line 128

def decrease_health
  @health.update {|v| v -= 1 }
end

#get_aliasesObject

Returns node aliases



153
154
155
# File 'lib/aerospike/cluster/node.rb', line 153

def get_aliases
  @aliases.value
end

#get_connection(timeout) ⇒ Object

Get a connection to the node. If no cached connection is not available, a new connection will be created



103
104
105
106
107
108
109
110
111
# File 'lib/aerospike/cluster/node.rb', line 103

def get_connection(timeout)
  while true
    conn = @connections.poll
    if conn.connected?
      conn.timeout = timeout.to_f
      return conn
    end
  end
end

#get_hostObject

Retrieves host for the node



138
139
140
# File 'lib/aerospike/cluster/node.rb', line 138

def get_host
  @host
end

#get_nameObject

Returns node name



148
149
150
# File 'lib/aerospike/cluster/node.rb', line 148

def get_name
  @name
end

#hashObject



187
188
189
# File 'lib/aerospike/cluster/node.rb', line 187

def hash
  @name.hash
end

#inspectObject



191
192
193
# File 'lib/aerospike/cluster/node.rb', line 191

def inspect
  "#<Aerospike::Node: @name=#{@name}, @host=#{@host}>"
end

#put_connection(conn) ⇒ Object

Put back a connection to the cache. If cache is full, the connection will be closed and discarded



115
116
117
118
# File 'lib/aerospike/cluster/node.rb', line 115

def put_connection(conn)
  conn.close if !@active.value
  @connections.offer(conn)
end

#refreshObject

Request current status from server node, and update node with the result



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/aerospike/cluster/node.rb', line 74

def refresh
  friends = []

  begin
    conn = get_connection(1)
    info_map = Info.request(conn, "node", "partition-generation", "services", "cluster-name")
  rescue => e
    Aerospike.logger.error("Error during refresh for node #{self}: #{e}")
    Aerospike.logger.error(e.backtrace.join("\n"))

    conn.close if conn
    decrease_health

    return friends
  end

  verify_node_name_and_cluster_name(info_map)
  restore_health

  @responded.update{|v| true}

  friends = add_friends(info_map)
  update_partitions(conn, info_map)
  put_connection(conn)
  friends
end

#restore_healthObject

Mark the node as healthy



121
122
123
124
125
# File 'lib/aerospike/cluster/node.rb', line 121

def restore_health
  # There can be cases where health is full, but active is false.
  # Once a node has been marked inactive, it stays inactive.
  @health.value = FULL_HEALTH
end

#supports_feature?(feature) ⇒ Boolean

Returns:

  • (Boolean)


174
175
176
# File 'lib/aerospike/cluster/node.rb', line 174

def supports_feature?(feature)
  @features.include?(feature.to_s)
end

#unhealthy?Boolean

Check if the node is unhealthy

Returns:

  • (Boolean)


133
134
135
# File 'lib/aerospike/cluster/node.rb', line 133

def unhealthy?
  @health.value <= 0
end

#use_new_info?Boolean

Returns:

  • (Boolean)


183
184
185
# File 'lib/aerospike/cluster/node.rb', line 183

def use_new_info?
  @use_new_info.value
end