Class: Aerospike::Cluster
- Inherits:
-
Object
- Object
- Aerospike::Cluster
- Defined in:
- lib/aerospike/cluster/cluster.rb
Instance Attribute Summary collapse
-
#connection_queue_size ⇒ Object
readonly
Returns the value of attribute connection_queue_size.
-
#connection_timeout ⇒ Object
readonly
Returns the value of attribute connection_timeout.
-
#password ⇒ Object
readonly
Returns the value of attribute password.
-
#user ⇒ Object
readonly
Returns the value of attribute user.
Instance Method Summary collapse
- #add_seeds(hosts) ⇒ Object
- #change_password(user, password) ⇒ Object
-
#close ⇒ Object
Closes all cached connections to the cluster nodes and stops the tend thread.
- #connected? ⇒ Boolean
- #find_alias(aliass) ⇒ Object
- #get_node(partition) ⇒ Object
-
#get_node_by_name(node_name) ⇒ Object
Finds a node by name and raises an error if it is not found.
-
#initialize(policy, *hosts) ⇒ Cluster
constructor
A new instance of Cluster.
-
#nodes ⇒ Object
Returns a list of all nodes in the cluster.
-
#random_node ⇒ Object
Returns a random node on the cluster.
- #request_info(policy, *commands) ⇒ Object
- #seeds ⇒ Object
- #update_partitions(conn, node) ⇒ Object
Constructor Details
#initialize(policy, *hosts) ⇒ Cluster
Returns a new instance of Cluster.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
# File 'lib/aerospike/cluster/cluster.rb', line 30
#
# Builds a cluster handle from a client policy and a list of seed hosts.
#
# policy - supplies connection_queue_size, timeout, requires_authentication,
#          user, password and fail_if_not_connected.
# hosts  - seed hosts, stored as the initial seed list.
#
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# policy demands a connection and none exists after the initial wait.
def initialize(policy, *hosts)
  # Seed list and pool settings taken from the caller.
  @cluster_seeds = hosts
  @connection_queue_size = policy.connection_queue_size
  @connection_timeout = policy.timeout

  # Topology state starts out empty.
  @aliases = {}
  @cluster_nodes = []
  @partition_write_map = {}

  # Shared counters/flags plus the lock guarding the collections above.
  @node_index = Atomic.new(0)
  @closed = Atomic.new(false)
  @mutex = Mutex.new

  # Credentials are only kept when the policy asks for authentication;
  # the password is stored in the form produced by AdminCommand.hash_password.
  if policy.requires_authentication
    @user = policy.user
    @password = AdminCommand.hash_password(policy.password)
  end

  wait_till_stablized

  if policy.fail_if_not_connected && !connected?
    raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE)
  end

  launch_tend_thread

  Aerospike.logger.info('New cluster initialized and ready to be used...')

  self
end
Instance Attribute Details
#connection_queue_size ⇒ Object (readonly)
Returns the value of attribute connection_queue_size.
28 29 30 |
# File 'lib/aerospike/cluster/cluster.rb', line 28
#
# Reader for @connection_queue_size (assigned from
# policy.connection_queue_size in the constructor).
def connection_queue_size
  @connection_queue_size
end
#connection_timeout ⇒ Object (readonly)
Returns the value of attribute connection_timeout.
28 29 30 |
# File 'lib/aerospike/cluster/cluster.rb', line 28
#
# Reader for @connection_timeout (assigned from policy.timeout in the
# constructor).
def connection_timeout
  @connection_timeout
end
#password ⇒ Object (readonly)
Returns the value of attribute password.
28 29 30 |
# File 'lib/aerospike/cluster/cluster.rb', line 28
#
# Reader for @password. The constructor only sets it when the policy
# requires authentication; change_password may replace it later.
def password
  @password
end
#user ⇒ Object (readonly)
Returns the value of attribute user.
28 29 30 |
# File 'lib/aerospike/cluster/cluster.rb', line 28
#
# Reader for @user. The constructor only sets it when the policy requires
# authentication.
def user
  @user
end
Instance Method Details
#add_seeds(hosts) ⇒ Object
60 61 62 63 64 |
# File 'lib/aerospike/cluster/cluster.rb', line 60
#
# Appends the given hosts to the seed list while holding @mutex.
#
# hosts - an Array of hosts (passed straight to Array#concat).
#
# Returns the seed list itself (the result of Array#concat).
def add_seeds(hosts)
  @mutex.synchronize { @cluster_seeds.concat(hosts) }
end
#change_password(user, password) ⇒ Object
178 179 180 181 |
# File 'lib/aerospike/cluster/cluster.rb', line 178
#
# Replaces the stored password, but only when +user+ matches the user this
# cluster was configured with; otherwise nothing changes.
#
# Returns the new password when it was stored, nil otherwise.
def change_password(user, password)
  if @user == user
    @password = password
  end
end
#close ⇒ Object
Closes all cached connections to the cluster nodes and stops the tend thread
130 131 132 133 134 135 136 137 138 139 140 141 |
# File 'lib/aerospike/cluster/cluster.rb', line 130
#
# Shuts the cluster down: flags it as closed, kills the tend thread and
# closes every node returned by #nodes. Calling it again once the closed
# flag is set does nothing and returns nil.
def close
  return if @closed.value

  # Flip the flag first so later calls (and #connected?) see the cluster
  # as closed before the tend thread is stopped.
  @closed.value = true

  @tend_thread.kill

  nodes.each(&:close)
end
#connected? ⇒ Boolean
72 73 74 75 76 |
# File 'lib/aerospike/cluster/cluster.rb', line 72
#
# True when the cluster currently knows at least one node and has not been
# closed; false otherwise.
def connected?
  # Work on the snapshot handed back by #nodes rather than the live array.
  snapshot = nodes

  !(snapshot.empty? || @closed.value)
end
#find_alias(aliass) ⇒ Object
143 144 145 146 147 |
# File 'lib/aerospike/cluster/cluster.rb', line 143
#
# Looks up +aliass+ in the alias table while holding @mutex.
#
# Returns whatever is stored under that key, or nil when there is no entry.
def find_alias(aliass)
  @mutex.synchronize { @aliases[aliass] }
end
#get_node(partition) ⇒ Object
78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/aerospike/cluster/cluster.rb', line 78
#
# Picks the node to use for +partition+.
#
# partition - responds to #namespace and #partition_id.
#
# Returns the node registered for that namespace/partition id when it exists
# and reports active?; in every other case falls back to #random_node.
def get_node(partition)
  # Grab the map reference once so the lookup uses a single consistent map.
  partition_map = partitions

  namespace_entry = partition_map[partition.namespace]
  if namespace_entry
    owner = namespace_entry.value[partition.partition_id]
    return owner if owner && owner.active?
  end

  random_node
end
#get_node_by_name(node_name) ⇒ Object
Finds a node by name and raises an error if it is not found
121 122 123 124 125 126 127 |
# File 'lib/aerospike/cluster/cluster.rb', line 121
#
# Resolves +node_name+ through find_node_by_name.
#
# Returns the node that was found.
# Raises Aerospike::Exceptions::InvalidNode when the lookup yields nothing.
def get_node_by_name(node_name)
  match = find_node_by_name(node_name)
  return match if match

  raise Aerospike::Exceptions::InvalidNode.new
end
#nodes ⇒ Object
Returns a list of all nodes in the cluster
113 114 115 116 117 118 |
# File 'lib/aerospike/cluster/cluster.rb', line 113
#
# Returns a shallow copy (Array#dup) of the cluster's node list, taken while
# holding @mutex, so callers can iterate without touching the live array.
def nodes
  @mutex.synchronize { @cluster_nodes.dup }
end
#random_node ⇒ Object
Returns a random node on the cluster
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/aerospike/cluster/cluster.rb', line 93
#
# Walks the node list starting from a shared, ever-increasing index and
# returns the first node that reports active?. At most one attempt per node
# in the snapshot is made.
#
# Raises Aerospike::Exceptions::InvalidNode when no active node is found
# (including when the node list is empty).
def random_node
  # Snapshot once; every attempt below indexes into the same array.
  candidates = nodes

  candidates.length.times do
    # The index is advanced through the atomic so concurrent callers each
    # get their own position.
    position = (@node_index.update { |v| v + 1 } % candidates.length).abs
    candidate = candidates[position]
    return candidate if candidate.active?
  end

  raise Aerospike::Exceptions::InvalidNode.new
end
#request_info(policy, *commands) ⇒ Object
170 171 172 173 174 175 176 |
# File 'lib/aerospike/cluster/cluster.rb', line 170
#
# Sends the given info commands to a node chosen by #random_node.
#
# policy   - responds to #timeout; used when obtaining the connection.
# commands - info command names forwarded to Info.request.
#
# Returns whatever Info.request returns.
def request_info(policy, *commands)
  target = random_node
  connection = target.get_connection(policy.timeout)

  # NOTE(review): the connection is handed back only after Info.request
  # returns normally; if it raises, put_connection is never reached.
  response = Info.request(connection, *commands)
  target.put_connection(connection)

  response
end
#seeds ⇒ Object
66 67 68 69 70 |
# File 'lib/aerospike/cluster/cluster.rb', line 66
#
# Returns a shallow copy (Array#dup) of the seed host list, taken while
# holding @mutex.
def seeds
  @mutex.synchronize { @cluster_seeds.dup }
end
#update_partitions(conn, node) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/aerospike/cluster/cluster.rb', line 149
#
# Refreshes the partition map using +conn+ on behalf of +node+.
#
# The tokenizer class is chosen by node.use_new_info?; its update_partition
# is given the current partition map and the node. A non-nil result is
# installed through set_partitions; a nil/false result leaves the map as is.
def update_partitions(conn, node)
  # TODO: Cluster should not care about version of tokenizer
  # decouple clstr interface
  if node.use_new_info?
    Aerospike.logger.info("Updating partitions using new protocol...")
    tokenizer = PartitionTokenizerNew.new(conn)
  else
    Aerospike.logger.info("Updating partitions using old protocol...")
    tokenizer = PartitionTokenizerOld.new(conn)
  end

  refreshed_map = tokenizer.update_partition(partitions, node)

  # update partition write map
  set_partitions(refreshed_map) if refreshed_map

  Aerospike.logger.info("Partitions updated...")
end