Class: Aerospike::Cluster

Inherits:
Object
  • Object
show all
Defined in:
lib/aerospike/cluster/cluster.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(policy, *hosts) ⇒ Cluster

Returns a new instance of Cluster.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/aerospike/cluster/cluster.rb', line 32

# Sets up the in-memory state of a cluster from a policy and seed hosts.
#
# @param policy [Object] read for fail_if_not_connected,
#   connection_queue_size, timeout, tend_interval, cluster_name and
#   requires_authentication (plus user and password when authentication is
#   required)
# @param hosts [Array] seed hosts; the splat array itself becomes the seed list
def initialize(policy, *hosts)
  # Seed hosts and settings copied from the policy.
  @cluster_seeds = hosts
  @fail_if_not_connected = policy.fail_if_not_connected
  @connection_queue_size = policy.connection_queue_size
  @connection_timeout = policy.timeout
  @tend_interval = policy.tend_interval
  @cluster_name = policy.cluster_name

  # Plain bookkeeping containers, all empty to begin with.
  @cluster_nodes = []
  @aliases = {}
  @partition_write_map = {}
  @old_node_count = 0

  # Lock and Atomic-wrapped values.
  @mutex = Mutex.new
  @closed = Atomic.new(true) # a new cluster starts out flagged as closed
  @node_index = Atomic.new(0)
  @features = Atomic.new(Set.new)
  @cluster_config_change_listeners = Atomic.new([])

  # Credentials are recorded only when the policy asks for authentication;
  # the password is kept as returned by AdminCommand.hash_password.
  if policy.requires_authentication
    @user = policy.user
    @password = AdminCommand.hash_password(policy.password)
  end

  self
end

Instance Attribute Details

#connection_queue_size ⇒ Object (readonly)

Returns the value of attribute connection_queue_size.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Returns the connection queue size that #initialize copied from
# policy.connection_queue_size.
def connection_queue_size
  @connection_queue_size
end

#connection_timeout ⇒ Object (readonly)

Returns the value of attribute connection_timeout.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Returns the connection timeout that #initialize copied from policy.timeout.
def connection_timeout
  @connection_timeout
end

#features ⇒ Object (readonly)

Returns the value of attribute features.



30
31
32
# File 'lib/aerospike/cluster/cluster.rb', line 30

# Returns the object held in @features. #initialize sets it to an Atomic
# wrapping an empty Set, so callers receive the wrapper, not the Set itself.
def features
  @features
end

#password ⇒ Object (readonly)

Returns the value of attribute password.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Returns @password. #initialize stores the result of
# AdminCommand.hash_password(policy.password) here when the policy requires
# authentication, and #change_password stores its argument as given.
def password
  @password
end

#user ⇒ Object (readonly)

Returns the value of attribute user.



29
30
31
# File 'lib/aerospike/cluster/cluster.rb', line 29

# Returns @user, which #initialize sets to policy.user only when the policy
# requires authentication.
def user
  @user
end

Instance Method Details

#add_cluster_config_change_listener(listener) ⇒ Object



200
201
202
203
204
# File 'lib/aerospike/cluster/cluster.rb', line 200

# Adds a listener to the list of cluster-config-change listeners.
#
# The update block builds a new array instead of pushing onto the current
# one, so an array that a reader already obtained from the reference is never
# modified underneath it, and a re-run of the block cannot add the listener
# twice to the same array.
#
# @param listener [Object] the listener to register
# @return [Object] whatever the reference's #update returns
def add_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    listeners + [listener]
  end
end

#add_seeds(hosts) ⇒ Object



73
74
75
76
77
# File 'lib/aerospike/cluster/cluster.rb', line 73

# Appends the given hosts to the seed list while holding @mutex.
#
# @param hosts [Array] hosts to append
# @return [Array] the seed list after the append
def add_seeds(hosts)
  @mutex.synchronize { @cluster_seeds.concat(hosts) }
end

#change_password(user, password) ⇒ Object



195
196
197
198
# File 'lib/aerospike/cluster/cluster.rb', line 195

# Replaces the stored password, but only when +user+ equals the user this
# cluster holds in @user. The value is stored exactly as passed in.
#
# @param user [Object] user whose password changed
# @param password [Object] value to store in @password
# @return [Object, nil] the stored password, or nil when the user differs
def change_password(user, password)
  return unless @user == user

  @password = password
end

#close ⇒ Object

Closes all cached connections to the cluster nodes and stops the tend thread



143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/aerospike/cluster/cluster.rb', line 143

# Marks the cluster as closed, kills the tend thread and closes every node.
# Does nothing when the cluster is already flagged as closed.
#
# @return [Array, nil] the node list that was closed, or nil when the
#   cluster was already closed
def close
  return if @closed.value

  # Flip the flag first, then stop tending before closing the nodes.
  @closed.value = true
  @tend_thread.kill

  nodes.each(&:close)
end

#connect ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/aerospike/cluster/cluster.rb', line 59

# Waits for the cluster to stabilize, optionally verifies that it is
# connected, starts the tend thread and logs that the cluster is ready.
#
# @return [self]
# @raise [Aerospike::Exceptions::Aerospike] with SERVER_NOT_AVAILABLE when
#   @fail_if_not_connected is set and the cluster is not connected
def connect
  wait_till_stablized

  # connected? is consulted only when the policy asked for a hard failure.
  unreachable = @fail_if_not_connected && !connected?
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE) if unreachable

  launch_tend_thread
  Aerospike.logger.info('New cluster initialized and ready to be used...')
  self
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


85
86
87
88
89
# File 'lib/aerospike/cluster/cluster.rb', line 85

# Reports whether the cluster has at least one node and is not flagged as
# closed. The closed flag is read only when the node list is non-empty.
#
# @return [Boolean]
def connected?
  !(nodes.empty? || @closed.value)
end

#find_alias(aliass) ⇒ Object



156
157
158
159
160
# File 'lib/aerospike/cluster/cluster.rb', line 156

# Looks up +aliass+ in the @aliases hash while holding @mutex.
#
# @param aliass [Object] key to look up
# @return [Object, nil] the hash's entry for the key
def find_alias(aliass)
  @mutex.synchronize { @aliases[aliass] }
end

#get_node(partition) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/aerospike/cluster/cluster.rb', line 91

# Picks the node for a partition: the entry registered for the partition's
# namespace and id when it exists and is active, otherwise a random node.
#
# @param partition [Object] must respond to namespace and partition_id
# @return [Object] the mapped node, or the result of #random_node
def get_node(partition)
  # Read the partition map once so the lookup works on a single snapshot.
  replicas = partitions[partition.namespace]

  if replicas
    candidate = replicas.value[partition.partition_id]
    return candidate if candidate && candidate.active?
  end

  random_node
end

#get_node_by_name(node_name) ⇒ Object

Finds a node by name and raises an InvalidNode error if it is not found



134
135
136
137
138
139
140
# File 'lib/aerospike/cluster/cluster.rb', line 134

# Returns the node that #find_node_by_name yields for +node_name+.
#
# @param node_name [Object] name passed through to #find_node_by_name
# @return [Object] the node found
# @raise [Aerospike::Exceptions::InvalidNode] when no node is found
def get_node_by_name(node_name)
  find_node_by_name(node_name) || raise(Aerospike::Exceptions::InvalidNode.new)
end

#inspect ⇒ Object



212
213
214
# File 'lib/aerospike/cluster/cluster.rb', line 212

# Short textual form of the cluster showing only its node list.
#
# @return [String]
def inspect
  node_list = @cluster_nodes.to_s
  "#<Aerospike::Cluster @cluster_nodes=#{node_list}>"
end

#nodes ⇒ Object

Returns a list of all nodes in the cluster



126
127
128
129
130
131
# File 'lib/aerospike/cluster/cluster.rb', line 126

# Returns a shallow copy of the node list, taken while holding @mutex, so
# callers work on their own array.
#
# @return [Array]
def nodes
  @mutex.synchronize { @cluster_nodes.dup }
end

#random_node ⇒ Object

Returns a random node on the cluster



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/aerospike/cluster/cluster.rb', line 106

# Returns the next active node, advancing the shared @node_index counter on
# every attempt. At most one attempt per node in the snapshot is made.
#
# @return [Object] a node whose active? is truthy
# @raise [Aerospike::Exceptions::InvalidNode] when no attempt finds an
#   active node (including when the node list is empty)
def random_node
  # Work on one snapshot of the node list for the whole search.
  candidates = nodes

  candidates.length.times do
    # The counter is bumped through the atomic reference so concurrent
    # callers each get their own value. The remainder of a division by a
    # positive length is never negative, so no sign correction is needed.
    position = @node_index.update { |counter| counter + 1 } % candidates.length
    candidate = candidates[position]

    return candidate if candidate.active?
  end

  raise Aerospike::Exceptions::InvalidNode.new
end

#remove_cluster_config_change_listener(listener) ⇒ Object



206
207
208
209
210
# File 'lib/aerospike/cluster/cluster.rb', line 206

# Removes a listener from the list of cluster-config-change listeners.
#
# The update block must return the new listener list, because its result
# becomes the value held by the reference. Array#delete returns the removed
# element (or nil), so it cannot be used as the block's result; a filtered
# copy is returned instead, which also leaves any array a reader already
# obtained untouched.
#
# @param listener [Object] the listener to unregister; every entry equal to
#   it is dropped
# @return [Object] whatever the reference's #update returns
def remove_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    listeners.reject { |registered| registered == listener }
  end
end

#request_info(policy, *commands) ⇒ Object



183
184
185
186
187
188
189
# File 'lib/aerospike/cluster/cluster.rb', line 183

# Sends info commands to a randomly chosen node and returns the response.
#
# @param policy [Object] read for its timeout, which is passed to
#   get_connection
# @param commands [Array] commands forwarded to Info.request
# @return [Object] the result of Info.request
def request_info(policy, *commands)
  node = random_node
  conn = node.get_connection(policy.timeout)

  response = Info.request(conn, *commands)
  # NOTE(review): the connection is handed back only on success; when
  # Info.request raises, put_connection is never reached.
  node.put_connection(conn)

  response
end

#seeds ⇒ Object



79
80
81
82
83
# File 'lib/aerospike/cluster/cluster.rb', line 79

# Returns a shallow copy of the seed host list, taken while holding @mutex.
#
# @return [Array]
def seeds
  @mutex.synchronize { @cluster_seeds.dup }
end

#supports_feature?(feature) ⇒ Boolean

Returns:

  • (Boolean)


191
192
193
# File 'lib/aerospike/cluster/cluster.rb', line 191

# Checks whether the current feature collection contains the given feature.
# The feature is compared by its string form.
#
# @param feature [#to_s] feature name
# @return [Boolean] result of include? on the collection held by @features
def supports_feature?(feature)
  known_features = @features.get
  known_features.include?(feature.to_s)
end

#update_partitions(conn, node) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# File 'lib/aerospike/cluster/cluster.rb', line 162

# Refreshes the partition map from a node, using the tokenizer that matches
# what node.use_new_info? reports.
#
# @param conn [Object] connection handed to the tokenizer's constructor
# @param node [Object] must respond to use_new_info?; also passed to the
#   tokenizer's update_partition
# @return [Object] the result of the final logger call
def update_partitions(conn, node)
  # TODO: Cluster should not care about version of tokenizer
  # decouple clstr interface
  if node.use_new_info?
    Aerospike.logger.info("Updating partitions using new protocol...")
    tokenizer = PartitionTokenizerNew.new(conn)
  else
    Aerospike.logger.info("Updating partitions using old protocol...")
    tokenizer = PartitionTokenizerOld.new(conn)
  end

  updated_map = tokenizer.update_partition(partitions, node)

  # Install the new map only when the tokenizer produced one.
  set_partitions(updated_map) if updated_map

  Aerospike.logger.info("Partitions updated...")
end