Class: Aerospike::Cluster
- Inherits:
-
Object
- Object
- Aerospike::Cluster
- Defined in:
- lib/aerospike/cluster/cluster.rb
Instance Attribute Summary collapse
-
#connection_queue_size ⇒ Object
readonly
Returns the value of attribute connection_queue_size.
-
#connection_timeout ⇒ Object
readonly
Returns the value of attribute connection_timeout.
-
#features ⇒ Object
readonly
Returns the value of attribute features.
-
#password ⇒ Object
readonly
Returns the value of attribute password.
-
#user ⇒ Object
readonly
Returns the value of attribute user.
Instance Method Summary collapse
- #add_cluster_config_change_listener(listener) ⇒ Object
- #add_seeds(hosts) ⇒ Object
- #change_password(user, password) ⇒ Object
-
#close ⇒ Object
Closes all cached connections to the cluster nodes and stops the tend thread.
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #find_alias(aliass) ⇒ Object
- #get_node(partition) ⇒ Object
-
#get_node_by_name(node_name) ⇒ Object
Finds a node by name and raises an error if it is not found.
-
#initialize(policy, *hosts) ⇒ Cluster
constructor
A new instance of Cluster.
-
#nodes ⇒ Object
Returns a list of all nodes in the cluster.
-
#random_node ⇒ Object
Returns a random node on the cluster.
- #remove_cluster_config_change_listener(listener) ⇒ Object
- #request_info(policy, *commands) ⇒ Object
- #seeds ⇒ Object
- #supports_feature?(feature) ⇒ Boolean
- #update_partitions(conn, node) ⇒ Object
Constructor Details
#initialize(policy, *hosts) ⇒ Cluster
Returns a new instance of Cluster.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/aerospike/cluster/cluster.rb', line 32
# Builds a new Cluster from a client policy and a list of seed hosts.
#
# Only configuration and initial bookkeeping state are recorded here.
#
# @param policy [Object] supplies fail_if_not_connected, connection_queue_size,
#   timeout, tend_interval and, when authentication is required, user/password
# @param hosts [Array] seed hosts
def initialize(policy, *hosts)
  # Settings taken from the caller.
  @cluster_seeds = hosts
  @fail_if_not_connected = policy.fail_if_not_connected
  @connection_queue_size = policy.connection_queue_size
  @connection_timeout = policy.timeout
  @tend_interval = policy.tend_interval

  # Node and partition bookkeeping, all empty at construction time.
  @aliases = {}
  @cluster_nodes = []
  @partition_write_map = {}
  @node_index = Atomic.new(0)
  @features = Atomic.new(Set.new)

  # The cluster starts out flagged as closed.
  @closed = Atomic.new(true)
  @mutex = Mutex.new
  @cluster_config_change_listeners = Atomic.new([])
  # NOTE(review): name looks like a typo for "count"; kept as-is because
  # other code may read this instance variable.
  @old_node_cound = 0

  # Credentials are only stored when the policy asks for authentication;
  # the password is kept in hashed form.
  if policy.requires_authentication
    @user = policy.user
    @password = AdminCommand.hash_password(policy.password)
  end

  self
end
Instance Attribute Details
#connection_queue_size ⇒ Object (readonly)
Returns the value of attribute connection_queue_size.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Reader for the connection queue size recorded at construction.
#
# @return [Object] the stored connection_queue_size value
def connection_queue_size
  @connection_queue_size
end
#connection_timeout ⇒ Object (readonly)
Returns the value of attribute connection_timeout.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Reader for the connection timeout recorded at construction.
#
# @return [Object] the stored connection_timeout value
def connection_timeout
  @connection_timeout
end
#features ⇒ Object (readonly)
Returns the value of attribute features.
30 31 32 |
# File 'lib/aerospike/cluster/cluster.rb', line 30
# Reader for the features holder.
#
# @return [Object] whatever is stored in @features
def features
  @features
end
#password ⇒ Object (readonly)
Returns the value of attribute password.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Reader for the stored password.
#
# @return [Object, nil] the stored password value, or nil when none was set
def password
  @password
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Reader for the stored user name.
#
# @return [Object, nil] the stored user, or nil when none was set
def user
  @user
end
Instance Method Details
#add_cluster_config_change_listener(listener) ⇒ Object
199 200 201 202 203 |
# File 'lib/aerospike/cluster/cluster.rb', line 199
# Registers a listener for cluster configuration changes.
#
# The update block returns a NEW array instead of pushing onto the current
# one. The block may be evaluated more than once by the atomic update, and
# an in-place push would then register the listener repeatedly and mutate
# an array that other threads may be reading.
#
# @param listener [Object] the listener to register
# @return [Array] the listener list after the addition
def add_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    listeners + [listener]
  end
end
#add_seeds(hosts) ⇒ Object
72 73 74 75 76 |
# File 'lib/aerospike/cluster/cluster.rb', line 72
# Appends the given hosts to the seed list while holding the cluster mutex.
#
# @param hosts [Array] hosts to append
# @return [Array] the seed list after the append
def add_seeds(hosts)
  @mutex.synchronize { @cluster_seeds.concat(hosts) }
end
#change_password(user, password) ⇒ Object
194 195 196 197 |
# File 'lib/aerospike/cluster/cluster.rb', line 194
# Replaces the stored password, but only when +user+ matches the user this
# cluster was configured with.
#
# NOTE(review): the value is stored exactly as given; the constructor stores
# a hashed password, so callers presumably pass a hashed value - confirm.
#
# @param user [Object] user whose password changed
# @param password [Object] the new password value to store
# @return [Object, nil] the stored password, or nil when the user differs
def change_password(user, password)
  return unless @user == user

  @password = password
end
#close ⇒ Object
Closes all cached connections to the cluster nodes and stops the tend thread
142 143 144 145 146 147 148 149 150 151 152 153 |
# File 'lib/aerospike/cluster/cluster.rb', line 142
# Closes every node of the cluster and stops the tend thread.
# Does nothing when the cluster is already flagged as closed.
#
# @return [Array, nil] the list of nodes that were closed, or nil if the
#   cluster was already closed
def close
  return if @closed.value

  # Flag the cluster as closed before tearing anything down.
  @closed.value = true
  @tend_thread.kill

  nodes.each(&:close)
end
#connect ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/aerospike/cluster/cluster.rb', line 58
# Waits for the cluster to stabilize, optionally verifies that it is
# connected, then launches the tend thread.
#
# @raise [Aerospike::Exceptions::Aerospike] with SERVER_NOT_AVAILABLE when
#   fail_if_not_connected is set and the cluster is not connected
# @return [Cluster] self
def connect
  wait_till_stablized

  unreachable = @fail_if_not_connected && !connected?
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE) if unreachable

  launch_tend_thread
  Aerospike.logger.info('New cluster initialized and ready to be used...')

  self
end
#connected? ⇒ Boolean
84 85 86 87 88 |
# File 'lib/aerospike/cluster/cluster.rb', line 84
# Tells whether the cluster has at least one node and is not closed.
#
# @return [Boolean]
def connected?
  # Work on the copy returned by #nodes rather than the live list.
  current_nodes = nodes
  !(current_nodes.empty? || @closed.value)
end
#find_alias(aliass) ⇒ Object
155 156 157 158 159 |
# File 'lib/aerospike/cluster/cluster.rb', line 155
# Looks up an entry in the alias table while holding the cluster mutex.
#
# @param aliass [Object] the alias key to look up
# @return [Object, nil] the value stored for the alias, or nil if absent
def find_alias(aliass)
  @mutex.synchronize { @aliases[aliass] }
end
#get_node(partition) ⇒ Object
90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/aerospike/cluster/cluster.rb', line 90
# Picks the node to use for the given partition.
#
# Returns the node recorded for the partition's namespace and partition id
# when one exists and is active; otherwise falls back to #random_node.
#
# @param partition [Object] responds to #namespace and #partition_id
# @return [Object] a node
def get_node(partition)
  # Read the partition map once and work from that reference.
  partition_map = partitions

  namespace_nodes = partition_map[partition.namespace]
  if namespace_nodes
    owner = namespace_nodes.value[partition.partition_id]
    return owner if owner && owner.active?
  end

  random_node
end
#get_node_by_name(node_name) ⇒ Object
Finds a node by name and raises an error if it is not found.
133 134 135 136 137 138 139 |
# File 'lib/aerospike/cluster/cluster.rb', line 133
# Finds a node by name.
#
# @param node_name [Object] name of the node to look up
# @raise [Aerospike::Exceptions::InvalidNode] when no node is found
# @return [Object] the matching node
def get_node_by_name(node_name)
  found = find_node_by_name(node_name)
  return found if found

  raise Aerospike::Exceptions::InvalidNode.new
end
#nodes ⇒ Object
Returns a list of all nodes in the cluster
125 126 127 128 129 130 |
# File 'lib/aerospike/cluster/cluster.rb', line 125
# Returns a list of all nodes in the cluster.
#
# The result is a shallow copy taken while holding the cluster mutex, so
# callers never hold the live array itself.
#
# @return [Array] copy of the current node list
def nodes
  @mutex.synchronize { @cluster_nodes.dup }
end
#random_node ⇒ Object
Returns a random node on the cluster
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
# File 'lib/aerospike/cluster/cluster.rb', line 105
# Returns an active node, cycling through the node list via a shared counter.
#
# At most one attempt per node in the list is made.
#
# @raise [Aerospike::Exceptions::InvalidNode] when no active node is found
# @return [Object] an active node
def random_node
  # Work on the copy returned by #nodes rather than the live list.
  candidates = nodes
  total = candidates.length

  total.times do
    # The counter is advanced through the atomic so concurrent callers
    # each get their own index.
    slot = (@node_index.update { |v| v + 1 } % total).abs
    candidate = candidates[slot]
    return candidate if candidate.active?
  end

  raise Aerospike::Exceptions::InvalidNode.new
end
#remove_cluster_config_change_listener(listener) ⇒ Object
205 206 207 208 209 |
# File 'lib/aerospike/cluster/cluster.rb', line 205
# Unregisters a listener for cluster configuration changes.
#
# The update block must return the new listener LIST. Array#delete returns
# the removed element (or nil), which would replace the stored list with a
# single listener or nil. A non-mutating reject returns the remaining
# listeners and leaves the previous array untouched.
#
# @param listener [Object] the listener to remove
# @return [Array] the listener list after the removal
def remove_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    listeners.reject { |registered| registered == listener }
  end
end
#request_info(policy, *commands) ⇒ Object
182 183 184 185 186 187 188 |
# File 'lib/aerospike/cluster/cluster.rb', line 182
# Sends info commands to a node chosen by #random_node and returns the reply.
#
# NOTE(review): if Info.request raises, the connection is not handed back
# to the node - confirm whether that is intended.
#
# @param policy [Object] responds to #timeout
# @param commands [Array] info commands to send
# @return [Object] the result of Info.request
def request_info(policy, *commands)
  node = random_node
  conn = node.get_connection(policy.timeout)

  response = Info.request(conn, *commands)
  # Hand the connection back only after a successful request.
  node.put_connection(conn)
  response
end
#seeds ⇒ Object
78 79 80 81 82 |
# File 'lib/aerospike/cluster/cluster.rb', line 78
# Returns a shallow copy of the seed list, taken while holding the
# cluster mutex.
#
# @return [Array] copy of the seed hosts
def seeds
  @mutex.synchronize { @cluster_seeds.dup }
end
#supports_feature?(feature) ⇒ Boolean
190 191 192 |
# File 'lib/aerospike/cluster/cluster.rb', line 190
# Tells whether the current feature collection includes the given feature.
#
# @param feature [#to_s] feature name; compared in its string form
# @return [Boolean]
def supports_feature?(feature)
  known_features = @features.get
  known_features.include?(feature.to_s)
end
#update_partitions(conn, node) ⇒ Object
161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/aerospike/cluster/cluster.rb', line 161
# Refreshes the partition map using a tokenizer built on the given
# connection, then stores the resulting map when one is returned.
#
# @param conn [Object] connection handed to the tokenizer
# @param node [Object] node being queried; responds to #use_new_info?
def update_partitions(conn, node)
  # TODO: Cluster should not care about version of tokenizer
  # decouple clstr interface
  if node.use_new_info?
    Aerospike.logger.info("Updating partitions using new protocol...")
    tokenizer = PartitionTokenizerNew.new(conn)
  else
    Aerospike.logger.info("Updating partitions using old protocol...")
    tokenizer = PartitionTokenizerOld.new(conn)
  end

  fresh_map = tokenizer.update_partition(partitions, node)

  # update partition write map
  set_partitions(fresh_map) if fresh_map

  Aerospike.logger.info("Partitions updated...")
end