Class: Aerospike::Cluster
- Inherits:
-
Object
- Object
- Aerospike::Cluster
- Defined in:
- lib/aerospike/cluster/cluster.rb
Instance Attribute Summary collapse
-
#connection_queue_size ⇒ Object
readonly
Returns the value of attribute connection_queue_size.
-
#connection_timeout ⇒ Object
readonly
Returns the value of attribute connection_timeout.
-
#features ⇒ Object
readonly
Returns the value of attribute features.
-
#password ⇒ Object
readonly
Returns the value of attribute password.
-
#user ⇒ Object
readonly
Returns the value of attribute user.
Instance Method Summary collapse
- #add_cluster_config_change_listener(listener) ⇒ Object
- #add_seeds(hosts) ⇒ Object
- #change_password(user, password) ⇒ Object
-
#close ⇒ Object
Closes all cached connections to the cluster nodes and stops the tend thread.
- #connect ⇒ Object
- #connected? ⇒ Boolean
- #find_alias(aliass) ⇒ Object
- #get_node(partition) ⇒ Object
-
#get_node_by_name(node_name) ⇒ Object
Finds a node by name and raises an error if it is not found.
-
#initialize(policy, *hosts) ⇒ Cluster
constructor
A new instance of Cluster.
- #inspect ⇒ Object
-
#nodes ⇒ Object
Returns a list of all nodes in the cluster.
-
#random_node ⇒ Object
Returns a random node on the cluster.
- #remove_cluster_config_change_listener(listener) ⇒ Object
- #request_info(policy, *commands) ⇒ Object
- #seeds ⇒ Object
- #supports_feature?(feature) ⇒ Boolean
- #update_partitions(conn, node) ⇒ Object
Constructor Details
#initialize(policy, *hosts) ⇒ Cluster
Returns a new instance of Cluster.
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/aerospike/cluster/cluster.rb', line 32
# Builds a Cluster from a client policy and a list of seed hosts.
# Only records configuration and initial bookkeeping state; nothing in
# this method opens a connection.
#
# policy - object answering fail_if_not_connected, connection_queue_size,
#          timeout, tend_interval, cluster_name, requires_authentication,
#          user and password.
# hosts  - seed hosts (collected by the splat) kept as the seed list.
def initialize(policy, *hosts)
  @cluster_seeds = hosts

  # Settings copied from the policy.
  @fail_if_not_connected = policy.fail_if_not_connected
  @connection_queue_size = policy.connection_queue_size
  @connection_timeout = policy.timeout
  @tend_interval = policy.tend_interval
  @cluster_name = policy.cluster_name

  # Bookkeeping state; the cluster starts out flagged as closed.
  @aliases = {}
  @cluster_nodes = []
  @partition_write_map = {}
  @node_index = Atomic.new(0)
  @features = Atomic.new(Set.new)
  @closed = Atomic.new(true)
  @mutex = Mutex.new
  @cluster_config_change_listeners = Atomic.new([])
  @old_node_count = 0

  # Credentials are only recorded when the policy asks for authentication;
  # the password is stored in the form returned by AdminCommand.hash_password.
  return self unless policy.requires_authentication

  @user = policy.user
  @password = AdminCommand.hash_password(policy.password)
  self
end
Instance Attribute Details
#connection_queue_size ⇒ Object (readonly)
Returns the value of attribute connection_queue_size.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Read-only accessor: returns the connection queue size that the
# constructor copied from the policy.
def connection_queue_size
  @connection_queue_size
end
#connection_timeout ⇒ Object (readonly)
Returns the value of attribute connection_timeout.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Read-only accessor: returns the connection timeout, which the
# constructor takes from the policy's `timeout`.
def connection_timeout
  @connection_timeout
end
#features ⇒ Object (readonly)
Returns the value of attribute features.
30 31 32 |
# File 'lib/aerospike/cluster/cluster.rb', line 30
# Read-only accessor for the features holder. The constructor assigns an
# Atomic wrapping a Set, so callers receive that wrapper, not the Set itself.
def features
  @features
end
#password ⇒ Object (readonly)
Returns the value of attribute password.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Read-only accessor for the stored password value. It is nil unless
# the constructor or change_password assigned one.
def password
  @password
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
29 30 31 |
# File 'lib/aerospike/cluster/cluster.rb', line 29
# Read-only accessor for the user name. The constructor only sets it
# when the policy requires authentication.
def user
  @user
end
Instance Method Details
#add_cluster_config_change_listener(listener) ⇒ Object
200 201 202 203 204 |
# File 'lib/aerospike/cluster/cluster.rb', line 200
# Registers +listener+ in the list of cluster configuration change listeners.
#
# listener - the object to register.
#
# Returns whatever the holder's `update` returns (with the atomic gem this
# is expected to be the new listener array - confirm against its docs).
def add_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    # Return a new array instead of pushing onto the current one, so the
    # array a reader already holds is never modified in place.
    listeners + [listener]
  end
end
#add_seeds(hosts) ⇒ Object
73 74 75 76 77 |
# File 'lib/aerospike/cluster/cluster.rb', line 73
# Appends +hosts+ to the seed list while holding the cluster mutex.
#
# hosts - an Array of seed hosts to append.
#
# Returns the seed array itself (the result of Array#concat).
def add_seeds(hosts)
  @mutex.synchronize { @cluster_seeds.concat(hosts) }
end
#change_password(user, password) ⇒ Object
195 196 197 198 |
# File 'lib/aerospike/cluster/cluster.rb', line 195
# Replaces the stored password, but only when +user+ equals the user this
# cluster was configured with.
#
# user     - the user whose password changed.
# password - the value to store, kept exactly as given.
#            NOTE(review): the constructor stores the result of
#            AdminCommand.hash_password; presumably callers pass an
#            already-hashed value here - confirm at the call site.
#
# Returns the new password when it was stored, nil otherwise.
def change_password(user, password)
  # A different user's password change must not touch our credentials.
  return unless @user == user

  @password = password
end
#close ⇒ Object
Closes all cached connections to the cluster nodes and stops the tend thread
143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/aerospike/cluster/cluster.rb', line 143
# Shuts the cluster down: marks it closed, kills the tend thread and
# closes every node. Does nothing when the cluster is already closed.
#
# Returns nil when already closed, otherwise the result of iterating the
# node list.
def close
  return if @closed.value

  # Flip the flag before tearing anything down.
  @closed.value = true
  @tend_thread.kill
  nodes.each(&:close)
end
#connect ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/aerospike/cluster/cluster.rb', line 59
# Brings the cluster up: waits for it to stabilize, optionally fails when
# no connection could be established, then launches the tend thread.
#
# Returns self.
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# policy asked to fail if not connected and the cluster is not connected.
def connect
  wait_till_stablized

  unreachable = @fail_if_not_connected && !connected?
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE) if unreachable

  launch_tend_thread
  Aerospike.logger.info('New cluster initialized and ready to be used...')

  self
end
#connected? ⇒ Boolean
85 86 87 88 89 |
# File 'lib/aerospike/cluster/cluster.rb', line 85
# Tells whether the cluster is usable: at least one node is known and the
# cluster has not been closed.
#
# Returns true or false.
def connected?
  # Take one snapshot of the node list (nodes hands back a copy).
  known_nodes = nodes
  !known_nodes.empty? && !@closed.value
end
#find_alias(aliass) ⇒ Object
156 157 158 159 160 |
# File 'lib/aerospike/cluster/cluster.rb', line 156
# Looks up +aliass+ in the alias table while holding the cluster mutex.
#
# aliass - the key to look up.
#
# Returns the stored entry, or nil when the alias is unknown.
def find_alias(aliass)
  @mutex.synchronize { @aliases[aliass] }
end
#get_node(partition) ⇒ Object
91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/aerospike/cluster/cluster.rb', line 91
# Picks the node for +partition+.
#
# partition - object answering `namespace` and `partition_id`.
#
# Returns the node recorded for the partition when there is one and it is
# active; otherwise falls back to random_node.
def get_node(partition)
  # Read the partition map once and work from that reference.
  partition_map = partitions

  owners = partition_map[partition.namespace]
  if owners
    candidate = owners.value[partition.partition_id]
    return candidate if candidate && candidate.active?
  end

  random_node
end
#get_node_by_name(node_name) ⇒ Object
Finds a node by name and raises an error if it is not found
134 135 136 137 138 139 140 |
# File 'lib/aerospike/cluster/cluster.rb', line 134
# Finds a node by its name.
#
# node_name - the name handed to find_node_by_name.
#
# Returns the matching node.
# Raises Aerospike::Exceptions::InvalidNode when no node is found.
def get_node_by_name(node_name)
  found = find_node_by_name(node_name)
  return found if found

  raise Aerospike::Exceptions::InvalidNode.new
end
#inspect ⇒ Object
212 213 214 |
# File 'lib/aerospike/cluster/cluster.rb', line 212 def inspect "#<Aerospike::Cluster @cluster_nodes=#{@cluster_nodes}>" end |
#nodes ⇒ Object
Returns a list of all nodes in the cluster
126 127 128 129 130 131 |
# File 'lib/aerospike/cluster/cluster.rb', line 126
# Returns a list of all nodes in the cluster.
#
# The list is a shallow copy taken under the cluster mutex, so callers can
# iterate it without touching the internal array.
def nodes
  @mutex.synchronize { @cluster_nodes.dup }
end
#random_node ⇒ Object
Returns a random node on the cluster
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
# File 'lib/aerospike/cluster/cluster.rb', line 106
# Returns an active node, chosen by advancing a shared counter through the
# node list.
#
# Makes at most one attempt per known node.
#
# Raises Aerospike::Exceptions::InvalidNode when no active node is found
# (including when the node list is empty).
def random_node
  # Work from a single snapshot of the node list (nodes hands back a copy).
  candidates = nodes

  candidates.length.times do
    # The counter is advanced through the holder's update so that
    # concurrent callers each obtain their own index.
    slot = (@node_index.update { |current| current + 1 } % candidates.length).abs
    picked = candidates[slot]
    return picked if picked.active?
  end

  raise Aerospike::Exceptions::InvalidNode.new
end
#remove_cluster_config_change_listener(listener) ⇒ Object
206 207 208 209 210 |
# File 'lib/aerospike/cluster/cluster.rb', line 206
# Unregisters +listener+ from the list of cluster configuration change
# listeners. Removing a listener that was never registered is a no-op.
#
# listener - the object to unregister (matched with ==).
#
# Returns whatever the holder's `update` returns.
def remove_cluster_config_change_listener(listener)
  @cluster_config_change_listeners.update do |listeners|
    # The block's result becomes the new stored value, so it must be the
    # remaining array. Array#delete returns the removed element (or nil),
    # which would replace the whole listener list with that value.
    listeners.reject { |registered| registered == listener }
  end
end
#request_info(policy, *commands) ⇒ Object
183 184 185 186 187 188 189 |
# File 'lib/aerospike/cluster/cluster.rb', line 183
# Sends info +commands+ to a node picked by random_node and returns the
# response.
#
# policy   - object answering `timeout`, used when checking out the connection.
# commands - info command names passed through to Info.request.
#
# Returns the result of Info.request.
# Raises whatever random_node, get_connection or Info.request raise.
def request_info(policy, *commands)
  node = random_node
  conn = node.get_connection(policy.timeout)

  begin
    response = Info.request(conn, *commands)
  rescue StandardError
    # A failed request would otherwise leave the checked-out connection
    # neither returned nor closed. Its state is unknown after a failure,
    # so it is closed rather than handed back for reuse.
    conn.close if conn.respond_to?(:close)
    raise
  end

  # Only a connection that completed the request goes back to the node.
  node.put_connection(conn)
  response
end
#seeds ⇒ Object
79 80 81 82 83 |
# File 'lib/aerospike/cluster/cluster.rb', line 79
# Returns a shallow copy of the seed host list, taken under the cluster
# mutex, so callers cannot modify the internal array through it.
def seeds
  @mutex.synchronize { @cluster_seeds.dup }
end
#supports_feature?(feature) ⇒ Boolean
191 192 193 |
# File 'lib/aerospike/cluster/cluster.rb', line 191
# Tells whether +feature+ is in the cluster's current feature set.
#
# feature - feature name; converted with to_s before the lookup, so
#           symbols and strings are treated alike.
#
# Returns true or false.
def supports_feature?(feature)
  known_features = @features.get
  known_features.include?(feature.to_s)
end
#update_partitions(conn, node) ⇒ Object
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 |
# File 'lib/aerospike/cluster/cluster.rb', line 162
# Refreshes the partition map using information read over +conn+ for +node+.
#
# conn - connection handed to the partition tokenizer.
# node - node being refreshed; its `use_new_info?` answer selects the
#        tokenizer implementation.
#
# Returns the result of the final logger call.
def update_partitions(conn, node)
  # TODO: Cluster should not care about version of tokenizer
  # decouple clstr interface
  if node.use_new_info?
    Aerospike.logger.info("Updating partitions using new protocol...")
    tokenizer = PartitionTokenizerNew.new(conn)
  else
    Aerospike.logger.info("Updating partitions using old protocol...")
    tokenizer = PartitionTokenizerOld.new(conn)
  end

  refreshed_map = tokenizer.update_partition(partitions, node)

  # Only replace the partition write map when the tokenizer produced one.
  set_partitions(refreshed_map) if refreshed_map

  Aerospike.logger.info("Partitions updated...")
end