Class: Aerospike::Node

Inherits:
Object
  • Object
show all
Defined in:
lib/aerospike/cluster/node.rb

Constant Summary collapse

PARTITIONS =
4096
FULL_HEALTH =
100

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(cluster, nv) ⇒ Node

Initialize server node with connection parameters.



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/aerospike/cluster/node.rb', line 31

# Initialize server node with connection parameters.
#
# cluster - owning cluster; read here for connection_queue_size,
#           connection_timeout, user and password.
# nv      - validated node description; read here for name, aliases and
#           use_new_info.
def initialize(cluster, nv)
  @cluster = cluster
  @name = nv.name
  @aliases = Atomic.new(nv.aliases)
  @use_new_info = Atomic.new(nv.use_new_info)

  # Assign host to first IP alias because the server identifies nodes
  # by IP address (not hostname). nv.host is intentionally not stored:
  # the earlier assignment from it was always overwritten by this one.
  @host = nv.aliases[0]

  @health = Atomic.new(FULL_HEALTH)
  @partition_generation = Atomic.new(-1)
  @reference_count = Atomic.new(0)
  @responded = Atomic.new(false)
  @active = Atomic.new(true)

  @connections = Pool.new(@cluster.connection_queue_size)

  # Block handed to the pool for building a connection: keeps opening
  # (and, when credentials are configured, authenticating) connections
  # until one reports itself connected, then yields that connection.
  @connections.create_block = Proc.new do
    conn = nil

    loop do
      conn = Connection.new(@host.name, @host.port, @cluster.connection_timeout)

      # Authentication is needed only when a non-empty user is configured.
      user = @cluster.user
      if user && user != ''
        begin
          AdminCommand.new.authenticate(conn, user, @cluster.password)
        rescue
          # Socket not authenticated. Do not put back into pool.
          conn.close
          raise
        end
      end

      break if conn.connected?
    end

    conn
  end

  # Block handed to the pool for disposing of a connection.
  @connections.cleanup_block = Proc.new { |conn| conn.close if conn }
end

Instance Attribute Details

#nameObject (readonly)

Returns the value of attribute name.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

# Reader for the node name.
#
# Returns @name as stored; the constructor assigns it from nv.name.
def name
  @name
end

#reference_countObject (readonly)

Returns the value of attribute reference_count.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

# Reader for the node's reference counter.
#
# Returns @reference_count as stored. The constructor sets it to an Atomic
# wrapping 0, so the caller gets the wrapper object rather than a bare
# integer.
def reference_count
  @reference_count
end

#respondedObject (readonly)

Returns the value of attribute responded.



25
26
27
# File 'lib/aerospike/cluster/node.rb', line 25

# Reader for the "has responded" flag.
#
# Returns @responded as stored. The constructor sets it to an Atomic wrapping
# false and refresh flips the wrapped value to true after a successful info
# request, so the caller gets the wrapper object rather than a bare boolean.
def responded
  @responded
end

Instance Method Details

#==(other) ⇒ Object Also known as: eql?



176
177
178
# File 'lib/aerospike/cluster/node.rb', line 176

# Two nodes are equal when both are Node instances carrying the same name.
#
# other - any object, including nil.
#
# Returns true or false. nil and false are not Nodes, so they fall out through
# is_a? and the result is always a real boolean.
def ==(other)
  other.is_a?(Node) && @name == other.name
end

#active?Boolean

Checks if the node is active

Returns:

  • (Boolean)


140
141
142
# File 'lib/aerospike/cluster/node.rb', line 140

# Reports the current value wrapped by @active.
#
# The constructor starts the flag at true and close sets it to false.
def active?
  @active.value
end

#add_alias(alias_to_add) ⇒ Object

Adds an alias for the node



155
156
157
158
159
160
161
162
163
# File 'lib/aerospike/cluster/node.rb', line 155

# Appends one alias to the node's alias list and stores the list back.
#
# alias_to_add - the alias to record.
#
# Returns whatever set_aliases returns.
def add_alias(alias_to_add)
  # Only the cluster tend threads touch the alias list, so the
  # read-append-write sequence below runs without synchronization.
  known = get_aliases || []
  known << alias_to_add
  set_aliases(known)
end

#closeObject

Marks the node as inactive and closes all cached connections



166
167
168
169
# File 'lib/aerospike/cluster/node.rb', line 166

# Marks the node inactive and then closes its connections.
def close
  # Flip the flag first so put_connection already sees the node as inactive
  # while the connections are being closed.
  @active.value = false
  close_connections
end

#decrease_healthObject

Decrease node Health as a result of bad connection or communication



125
126
127
# File 'lib/aerospike/cluster/node.rb', line 125

# Lowers the node's health counter by one through the Atomic's update block.
#
# Returns whatever @health.update returns.
def decrease_health
  @health.update { |current| current - 1 }
end

#get_aliasesObject

Returns node aliases



150
151
152
# File 'lib/aerospike/cluster/node.rb', line 150

# Returns the value currently wrapped by @aliases (seeded from nv.aliases in
# the constructor). add_alias appends to this returned object in place.
def get_aliases
  @aliases.value
end

#get_connection(timeout) ⇒ Object

Get a connection to the node. If no cached connection is available, a new connection will be created



100
101
102
103
104
105
106
107
108
# File 'lib/aerospike/cluster/node.rb', line 100

# Hands out a connected connection taken from the node's pool.
#
# Connections polled from the pool that report themselves as not connected
# are skipped, and the pool is polled again until a connected one turns up.
#
# timeout - value assigned (after to_f) to the connection's timeout.
#
# Returns the connected connection.
def get_connection(timeout)
  conn = @connections.poll
  conn = @connections.poll until conn.connected?

  conn.timeout = timeout.to_f
  conn
end

#get_hostObject

Retrieves host for the node



135
136
137
# File 'lib/aerospike/cluster/node.rb', line 135

# Returns @host as stored; the constructor's final assignment sets it to the
# first entry of nv.aliases.
def get_host
  @host
end

#get_nameObject

Returns node name



145
146
147
# File 'lib/aerospike/cluster/node.rb', line 145

# Returns @name as stored — the same value the `name` reader exposes.
def get_name
  @name
end

#hashObject



185
186
187
# File 'lib/aerospike/cluster/node.rb', line 185

# Hash code derived solely from @name. Since == compares nodes by name, two
# nodes that are == also produce the same hash.
def hash
  @name.hash
end

#put_connection(conn) ⇒ Object

Put back a connection to the cache. If cache is full, the connection will be closed and discarded



112
113
114
115
# File 'lib/aerospike/cluster/node.rb', line 112

# Hands a connection back to the node.
#
# While the node is active the connection is offered to the pool. Once the
# node has been marked inactive the connection is closed and NOT offered, so
# that a dead socket never lands back in the pool.
#
# conn - the connection being returned.
#
# Returns the result of the pool's offer for an active node, nil otherwise.
def put_connection(conn)
  unless @active.value
    conn.close
    return
  end

  @connections.offer(conn)
end

#refreshObject

Request current status from server node, and update node with the result



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/aerospike/cluster/node.rb', line 72

# Request current status from the server node and update this node with the
# result.
#
# Takes a connection (timeout 1), asks for the "node",
# "partition-generation" and "services" info values, then verifies the node
# name, restores health, marks the node as responded, collects friends and
# updates partitions before handing the connection back.
#
# Returns whatever add_friends returns.
# Raises whatever the info request or any later step raises; on every such
# failure the connection is closed rather than left checked out.
def refresh
  conn = get_connection(1)

  begin
    info_map = Info.request(conn, "node", "partition-generation", "services")
  rescue => e
    Aerospike.logger.error(e)

    conn.close if conn
    # Only a failed info request counts against the node's health.
    decrease_health

    raise
  end

  begin
    verify_node_name(info_map)
    restore_health

    @responded.value = true

    friends = add_friends(info_map)
    update_partitions(conn, info_map)
  rescue
    # The connection was never handed back on this path before; close it so
    # a failure here cannot leak a checked-out socket.
    conn.close if conn
    raise
  end

  put_connection(conn)
  friends
end

#restore_healthObject

Mark the node as healthy



118
119
120
121
122
# File 'lib/aerospike/cluster/node.rb', line 118

# Resets the node's health counter to FULL_HEALTH. The @active flag is not
# touched here.
def restore_health
  # There can be cases where health is full, but active is false.
  # Once a node has been marked inactive, it stays inactive.
  @health.value = FULL_HEALTH
end

#to_sObject

Returns a string representation of the node in the form name:host



172
173
174
# File 'lib/aerospike/cluster/node.rb', line 172

# String form of the node: @name and @host interpolated with a colon between
# them.
def to_s
  "#{@name}:#{@host}"
end

#unhealthy?Boolean

Check if the node is unhealthy

Returns:

  • (Boolean)


130
131
132
# File 'lib/aerospike/cluster/node.rb', line 130

# True once the health counter has dropped to zero or below. The counter
# starts at FULL_HEALTH, decrease_health lowers it by one per call, and
# restore_health resets it.
def unhealthy?
  @health.value <= 0
end

#use_new_info?Boolean

Returns:

  • (Boolean)


181
182
183
# File 'lib/aerospike/cluster/node.rb', line 181

# Returns the value currently wrapped by @use_new_info, which the constructor
# seeds from nv.use_new_info.
def use_new_info?
  @use_new_info.value
end