Class: Aerospike::Cluster

Inherits:
Object
  • Object
show all
Defined in:
lib/aerospike/cluster/cluster.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(policy, *hosts) ⇒ Cluster

Returns a new instance of Cluster.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/aerospike/cluster/cluster.rb', line 32

# Builds a cluster object from a client policy and a list of seed hosts.
#
# Copies the connection settings off the policy and creates the empty
# bookkeeping structures (aliases, node list, partition map, atomics).
#
# @param policy [Object] must respond to +fail_if_not_connected+,
#   +connection_queue_size+, +timeout+, +tend_interval+ and
#   +requires_authentication+; +user+ and +password+ are read only when
#   +requires_authentication+ is truthy
# @param hosts [Array] seed hosts; the splat array itself becomes the seed list
def initialize(policy, *hosts)
  # Seeds and the lock guarding the mutable collections.
  @cluster_seeds = hosts
  @mutex = Mutex.new

  # Settings taken verbatim from the policy.
  @fail_if_not_connected = policy.fail_if_not_connected
  @connection_queue_size = policy.connection_queue_size
  @connection_timeout = policy.timeout
  @tend_interval = policy.tend_interval

  # Plain containers, all starting out empty.
  @aliases = {}
  @cluster_nodes = []
  @partition_write_map = {}

  # Atomically updated state; the cluster starts out flagged as closed.
  @node_index = Atomic.new(0)
  @features = Atomic.new(Set.new)
  @closed = Atomic.new(true)
  @cluster_config_change_listeners = Atomic.new([])

  # NOTE(review): the name looks like a typo for "count"; left untouched
  # because code outside this view may read the variable by this name.
  @old_node_cound = 0

  # Credentials are only recorded when the policy asks for authentication;
  # the password is stored in the form AdminCommand.hash_password returns.
  return self unless policy.requires_authentication

  @user = policy.user
  @password = AdminCommand.hash_password(policy.password)

  self
end

Instance Attribute Details

#connection_queue_size ⇒ Object (readonly)

Returns the value of attribute connection_queue_size.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Reader for @connection_queue_size, which #initialize copies from
# policy.connection_queue_size.
def connection_queue_size
  @connection_queue_size
end

#connection_timeout ⇒ Object (readonly)

Returns the value of attribute connection_timeout.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Reader for @connection_timeout, which #initialize copies from
# policy.timeout.
def connection_timeout
  @connection_timeout
end

#features ⇒ Object (readonly)

Returns the value of attribute features.



30
31
32
# File 'lib/aerospike/cluster/cluster.rb', line 30

# Reader for @features. #initialize sets it to an Atomic wrapping a Set, so
# callers get the atomic holder itself, not the set (see #supports_feature?,
# which unwraps it with +get+).
def features
  @features
end

#password ⇒ Object (readonly)

Returns the value of attribute password.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Reader for @password. #initialize assigns it (as the result of
# AdminCommand.hash_password) only when the policy requires authentication,
# and #change_password may overwrite it; otherwise it is nil.
def password
  @password
end

#user ⇒ Object (readonly)

Returns the value of attribute user.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Reader for @user. #initialize assigns it from policy.user only when the
# policy requires authentication; otherwise it is nil.
def user
  @user
end

Instance Method Details

#add_cluster_config_change_listener(listener) ⇒ Object



199
200
201
202
203
# File 'lib/aerospike/cluster/cluster.rb', line 199

# Registers a listener for cluster configuration changes.
#
# The listener list is swapped for a new array instead of being mutated in
# place: the block handed to +update+ must not modify the value it receives,
# otherwise a reader holding the previous list would see it change underneath
# it and the compare-and-set performed by the atomic would be meaningless.
#
# @param listener [Object] listener to append to the list
# @return [Object] whatever +update+ returns for the new listener list
def add_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    listeners + [listener]
  end
end

#add_seeds(hosts) ⇒ Object



72
73
74
75
76
# File 'lib/aerospike/cluster/cluster.rb', line 72

# Appends more seed hosts to the cluster's seed list.
#
# The append happens while holding @mutex.
#
# @param hosts [Array] hosts to append; passed straight to Array#concat
# @return [Array] the internal seed array itself (not a copy)
def add_seeds(hosts)
  @mutex.synchronize { @cluster_seeds.concat(hosts) }
end

#change_password(user, password) ⇒ Object



194
195
196
197
# File 'lib/aerospike/cluster/cluster.rb', line 194

# Replaces the stored password, but only for the user this cluster
# authenticates as; a call naming any other user is ignored.
#
# The value is stored exactly as given.
# NOTE(review): #initialize stores the result of AdminCommand.hash_password,
# so callers presumably pass an already-hashed password here - confirm.
#
# @param user [Object] user whose password changed
# @param password [Object] new value for the stored password
# @return [Object, nil] the new password when it was stored, nil otherwise
def change_password(user, password)
  return unless @user == user

  @password = password
end

#close ⇒ Object

Closes all cached connections to the cluster nodes and stops the tend thread



142
143
144
145
146
147
148
149
150
151
152
153
# File 'lib/aerospike/cluster/cluster.rb', line 142

# Shuts the cluster down: flags it closed, kills the tend thread and calls
# +close+ on every node.
#
# Does nothing (and returns nil) when the cluster is already flagged closed.
#
# @return [Array, nil] the node snapshot that was closed, or nil when the
#   cluster was already closed
def close
  return if @closed.value

  # Flip the flag before tearing anything down.
  @closed.value = true
  @tend_thread.kill

  nodes.each(&:close)
end

#connect ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/aerospike/cluster/cluster.rb', line 58

# Brings the cluster up: waits via +wait_till_stablized+, optionally verifies
# that it is connected, then starts the tend thread.
#
# @raise [Aerospike::Exceptions::Aerospike] with SERVER_NOT_AVAILABLE when
#   @fail_if_not_connected is set and #connected? is false after the wait
# @return [Cluster] self
def connect
  wait_till_stablized

  # The connectivity check is only made when the policy asked for it.
  unavailable = @fail_if_not_connected && !connected?
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE) if unavailable

  launch_tend_thread
  Aerospike.logger.info('New cluster initialized and ready to be used...')

  self
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


84
85
86
87
88
# File 'lib/aerospike/cluster/cluster.rb', line 84

# Tells whether the cluster is usable: it has at least one node and is not
# flagged closed.
#
# The node snapshot is taken first; the closed flag is only consulted when
# that snapshot is non-empty.
#
# @return [Boolean]
def connected?
  return false if nodes.empty?

  !@closed.value
end

#find_alias(aliass) ⇒ Object



155
156
157
158
159
# File 'lib/aerospike/cluster/cluster.rb', line 155

# Looks up an entry in the alias table while holding @mutex.
#
# @param aliass [Object] key to look up in @aliases
# @return [Object] whatever @aliases holds for that key
def find_alias(aliass)
  @mutex.synchronize { @aliases[aliass] }
end

#get_node(partition) ⇒ Object



90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/aerospike/cluster/cluster.rb', line 90

# Picks the node to use for a partition.
#
# Looks the partition's namespace up in the map returned by +partitions+,
# takes the entry at the partition id, and returns it when it is present and
# active. In every other case (unknown namespace, empty slot, inactive node)
# the choice is delegated to #random_node.
#
# @param partition [Object] must respond to +namespace+ and +partition_id+
# @return [Object] the mapped active node, or the result of #random_node
def get_node(partition)
  # Read the map once so the whole lookup works against the same reference.
  namespace_nodes = partitions[partition.namespace]
  owner = namespace_nodes && namespace_nodes.value[partition.partition_id]
  return owner if owner && owner.active?

  random_node
end

#get_node_by_name(node_name) ⇒ Object

Finds a node by name and raises an error if it is not found



133
134
135
136
137
138
139
# File 'lib/aerospike/cluster/cluster.rb', line 133

# Strict lookup of a node by name: delegates to +find_node_by_name+ and
# raises instead of returning a falsy result.
#
# @param node_name [Object] name handed to +find_node_by_name+
# @return [Object] the node that was found
# @raise [Aerospike::Exceptions::InvalidNode] when the lookup yields nothing
def get_node_by_name(node_name)
  find_node_by_name(node_name) || raise(Aerospike::Exceptions::InvalidNode.new)
end

#nodes ⇒ Object

Returns a list of all nodes in the cluster



125
126
127
128
129
130
# File 'lib/aerospike/cluster/cluster.rb', line 125

# Returns a list of all nodes in the cluster.
#
# The result is a shallow copy of the internal node array, taken while
# holding @mutex, so callers can iterate it without touching the original.
#
# @return [Array] copy of the node list
def nodes
  @mutex.synchronize { @cluster_nodes.dup }
end

#random_node ⇒ Object

Returns a random node on the cluster



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/aerospike/cluster/cluster.rb', line 105

# Returns an active node from the cluster.
#
# Despite the name, the pick is driven by a shared counter: each attempt
# increments @node_index and uses it, modulo the snapshot size, as the index
# to try. At most one attempt per node in the snapshot is made.
#
# @return [Object] the first active node encountered
# @raise [Aerospike::Exceptions::InvalidNode] when the snapshot is empty or
#   no attempt lands on an active node
def random_node
  # Work on one snapshot so the size and the indexing stay consistent.
  candidates = nodes

  candidates.length.times do
    # The counter is bumped through the atomic so concurrent callers each
    # get their own ticket.
    ticket = @node_index.update { |current| current + 1 }
    candidate = candidates[(ticket % candidates.length).abs]
    return candidate if candidate.active?
  end

  raise Aerospike::Exceptions::InvalidNode.new
end

#remove_cluster_config_change_listener(listener) ⇒ Object



205
206
207
208
209
# File 'lib/aerospike/cluster/cluster.rb', line 205

# Unregisters a listener for cluster configuration changes.
#
# The block given to +update+ must evaluate to the NEW listener list.
# Array#delete returns the removed element (or nil when nothing matched), so
# using it here would replace the whole list with that element; +reject+
# yields a new array without the listener and leaves the old one untouched.
# Matching uses ==, the same comparison Array#delete applies.
#
# @param listener [Object] listener to remove; unknown listeners are ignored
# @return [Object] whatever +update+ returns for the new listener list
def remove_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    listeners.reject { |registered| registered == listener }
  end
end

#request_info(policy, *commands) ⇒ Object



182
183
184
185
186
187
188
# File 'lib/aerospike/cluster/cluster.rb', line 182

# Sends info commands to one node chosen by #random_node and returns the
# response.
#
# A connection is obtained from the node with the policy's timeout and handed
# back with +put_connection+ once Info.request has returned.
# NOTE(review): if Info.request raises, the connection is not handed back
# here - confirm whether callers or the node deal with that case.
#
# @param policy [Object] must respond to +timeout+
# @param commands [Array] commands forwarded to Info.request
# @return [Object] the result of Info.request
def request_info(policy, *commands)
  target = random_node
  connection = target.get_connection(policy.timeout)

  response = Info.request(connection, *commands)
  target.put_connection(connection)
  response
end

#seeds ⇒ Object



78
79
80
81
82
# File 'lib/aerospike/cluster/cluster.rb', line 78

# Returns the current seed hosts as a shallow copy taken while holding
# @mutex, so the caller never gets the internal array.
#
# @return [Array] copy of the seed list
def seeds
  @mutex.synchronize { @cluster_seeds.dup }
end

#supports_feature?(feature) ⇒ Boolean

Returns:

  • (Boolean)


190
191
192
# File 'lib/aerospike/cluster/cluster.rb', line 190

# Checks whether a feature is present in the cluster's current feature
# collection.
#
# The feature is compared by its string form, so symbols and strings are
# interchangeable.
#
# @param feature [#to_s] feature name
# @return [Boolean] result of +include?+ on the current feature collection
def supports_feature?(feature)
  known_features = @features.get
  known_features.include?(feature.to_s)
end

#update_partitions(conn, node) ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/aerospike/cluster/cluster.rb', line 161

# Refreshes the partition map from a node.
#
# Chooses PartitionTokenizerNew or PartitionTokenizerOld depending on
# +node.use_new_info?+, asks it for an updated map based on the current
# +partitions+, and installs the result with +set_partitions+ when the
# tokenizer returned one.
#
# @param conn [Object] connection handed to the tokenizer
# @param node [Object] must respond to +use_new_info?+; also forwarded to
#   the tokenizer's +update_partition+
# @return [Object] the result of the final logger call
def update_partitions(conn, node)
  # TODO: Cluster should not care about version of tokenizer
  # decouple clstr interface
  tokenizer =
    if node.use_new_info?
      Aerospike.logger.info("Updating partitions using new protocol...")
      PartitionTokenizerNew.new(conn)
    else
      Aerospike.logger.info("Updating partitions using old protocol...")
      PartitionTokenizerOld.new(conn)
    end

  refreshed_map = tokenizer.update_partition(partitions, node)

  # Only replace the write map when the tokenizer produced one.
  set_partitions(refreshed_map) if refreshed_map

  Aerospike.logger.info("Partitions updated...")
end