Class: Aerospike::Cluster

Inherits:
Object
  • Object
show all
Defined in:
lib/aerospike/cluster/cluster.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(policy, *hosts) ⇒ Cluster

Returns a new instance of Cluster.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# File 'lib/aerospike/cluster/cluster.rb', line 30

# Builds a cluster handle from a client policy and a list of seed hosts.
#
# policy - object responding to connection_queue_size, timeout,
#          requires_authentication, user, password and fail_if_not_connected.
# hosts  - zero or more seed hosts, collected into an array.
#
# Raises Aerospike::Exceptions::Aerospike (SERVER_NOT_AVAILABLE) when the
# policy asks to fail if not connected and connected? reports false.
def initialize(policy, *hosts)
  # Settings taken straight from the caller.
  @cluster_seeds = hosts
  @connection_queue_size = policy.connection_queue_size
  @connection_timeout = policy.timeout

  # Bookkeeping state; everything starts out empty / open.
  @aliases = {}
  @cluster_nodes = []
  @partition_write_map = {}
  @node_index = Atomic.new(0)
  @closed = Atomic.new(false)
  @mutex = Mutex.new

  # setup auth info for cluster; only the hashed form of the password is kept
  if policy.requires_authentication
    @user = policy.user
    @password = AdminCommand.hash_password(policy.password)
  end

  # NOTE(review): presumably blocks until the initial node discovery has
  # settled -- confirm against wait_till_stablized.
  wait_till_stablized

  not_connected = policy.fail_if_not_connected && !connected?
  raise Aerospike::Exceptions::Aerospike.new(Aerospike::ResultCode::SERVER_NOT_AVAILABLE) if not_connected

  launch_tend_thread

  Aerospike.logger.info('New cluster initialized and ready to be used...')

  self
end

Instance Attribute Details

#connection_queue_size ⇒ Object (readonly)

Returns the value of attribute connection_queue_size.



28
29
30
# File 'lib/aerospike/cluster/cluster.rb', line 28

# Reader for @connection_queue_size, which the constructor copies from
# policy.connection_queue_size.
def connection_queue_size
  @connection_queue_size
end

#connection_timeout ⇒ Object (readonly)

Returns the value of attribute connection_timeout.



28
29
30
# File 'lib/aerospike/cluster/cluster.rb', line 28

# Reader for @connection_timeout, which the constructor copies from
# policy.timeout.
def connection_timeout
  @connection_timeout
end

#password ⇒ Object (readonly)

Returns the value of attribute password.



28
29
30
# File 'lib/aerospike/cluster/cluster.rb', line 28

# Reader for @password. The constructor stores the result of
# AdminCommand.hash_password(policy.password) here, and only when the policy
# requires authentication; change_password may later replace the value.
# Returns nil when it was never assigned.
def password
  @password
end

#user ⇒ Object (readonly)

Returns the value of attribute user.



28
29
30
# File 'lib/aerospike/cluster/cluster.rb', line 28

# Reader for @user. The constructor copies it from policy.user only when the
# policy requires authentication; returns nil when it was never assigned.
def user
  @user
end

Instance Method Details

#add_seeds(hosts) ⇒ Object



60
61
62
63
64
# File 'lib/aerospike/cluster/cluster.rb', line 60

# Appends the given hosts to the seed list while holding the cluster mutex.
#
# hosts - array of hosts to append.
#
# Returns the (mutated) internal seed array, as Array#concat does.
def add_seeds(hosts)
  @mutex.synchronize { @cluster_seeds.concat(hosts) }
end

#change_password(user, password) ⇒ Object



178
179
180
181
# File 'lib/aerospike/cluster/cluster.rb', line 178

# Replaces the stored password, but only for the user this cluster was
# configured with.
#
# user     - user whose password changed.
# password - value to store as-is in @password (no hashing happens here).
#
# Returns the new password when it was stored, nil otherwise.
def change_password(user, password)
  # change password ONLY if the user is the same
  return unless @user == user

  @password = password
end

#close ⇒ Object

Closes all cached connections to the cluster nodes and stops the tend thread



130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/aerospike/cluster/cluster.rb', line 130

# Marks the cluster as closed, kills the tend thread and closes every node.
# Calling it again once the closed flag is set does nothing.
#
# Returns nil when already closed, otherwise the array of nodes that were
# closed.
def close
  return if @closed.value

  # send close signal to maintenance channel
  @closed.value = true
  @tend_thread.kill

  nodes.each(&:close)
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


72
73
74
75
76
# File 'lib/aerospike/cluster/cluster.rb', line 72

# Reports whether the cluster currently has at least one node and has not
# been closed.
#
# Returns true or false.
def connected?
  # Take a snapshot of the node list first; the closed flag is only
  # consulted when that snapshot is non-empty.
  snapshot = nodes
  !(snapshot.empty? || @closed.value)
end

#find_alias(aliass) ⇒ Object



143
144
145
146
147
# File 'lib/aerospike/cluster/cluster.rb', line 143

# Looks up an entry in the alias table while holding the cluster mutex.
#
# aliass - key to look up in @aliases.
#
# Returns the stored value, or nil when the key is absent.
def find_alias(aliass)
  @mutex.synchronize { @aliases[aliass] }
end

#get_node(partition) ⇒ Object



78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/aerospike/cluster/cluster.rb', line 78

# Picks the node mapped to the given partition, falling back to random_node
# when the namespace is unknown, the slot is empty, or the mapped node is
# not active.
#
# partition - object responding to namespace and partition_id.
#
# Returns a node.
def get_node(partition)
  # Read the partition map once so the lookup below works on a single
  # reference.
  replicas = partitions[partition.namespace]
  candidate = replicas.value[partition.partition_id] if replicas

  return candidate if candidate && candidate.active?

  random_node
end

#get_node_by_name(node_name) ⇒ Object

Finds a node by name and raises an InvalidNode exception if it is not found.



121
122
123
124
125
126
127
# File 'lib/aerospike/cluster/cluster.rb', line 121

# Strict variant of find_node_by_name.
#
# node_name - name passed through to find_node_by_name.
#
# Returns the node that find_node_by_name produced.
# Raises Aerospike::Exceptions::InvalidNode when the lookup yields nil/false.
def get_node_by_name(node_name)
  find_node_by_name(node_name) || raise(Aerospike::Exceptions::InvalidNode.new)
end

#nodes ⇒ Object

Returns a list of all nodes in the cluster



113
114
115
116
117
118
# File 'lib/aerospike/cluster/cluster.rb', line 113

# Returns a shallow copy of the node list, taken while holding the cluster
# mutex, so callers can iterate it without touching @cluster_nodes itself.
def nodes
  @mutex.synchronize { @cluster_nodes.dup }
end

#random_node ⇒ Object

Returns a random node on the cluster



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/aerospike/cluster/cluster.rb', line 93

# Walks the node list starting from a shared, ever-increasing index and
# returns the first active node it lands on. At most one attempt per node in
# the snapshot is made.
#
# Raises Aerospike::Exceptions::InvalidNode when the snapshot is empty or no
# visited node is active.
def random_node
  # Work on a snapshot so the list cannot change size mid-iteration.
  candidates = nodes

  candidates.length.times do
    # The index is bumped through @node_index.update so that concurrent
    # callers each get their own slot.
    slot = (@node_index.update { |current| current + 1 } % candidates.length).abs
    candidate = candidates[slot]

    return candidate if candidate.active?
  end

  raise Aerospike::Exceptions::InvalidNode.new
end

#request_info(policy, *commands) ⇒ Object



170
171
172
173
174
175
176
# File 'lib/aerospike/cluster/cluster.rb', line 170

# Sends info commands to a node chosen by random_node and returns the reply.
#
# policy   - object responding to timeout (passed to get_connection).
# commands - info command names forwarded to Info.request.
#
# Returns whatever Info.request returns.
def request_info(policy, *commands)
  target = random_node
  connection = target.get_connection(policy.timeout)

  # NOTE(review): the connection is only handed back after a successful
  # request; if Info.request raises it is never returned to the node.
  response = Info.request(connection, *commands)
  target.put_connection(connection)

  response
end

#seeds ⇒ Object



66
67
68
69
70
# File 'lib/aerospike/cluster/cluster.rb', line 66

# Returns a shallow copy of the seed host list, taken while holding the
# cluster mutex.
def seeds
  @mutex.synchronize { @cluster_seeds.dup }
end

#update_partitions(conn, node) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/aerospike/cluster/cluster.rb', line 149

# Refreshes the partition map using the tokenizer that matches the node's
# info protocol.
#
# conn - connection handed to the tokenizer constructor.
# node - node being queried; node.use_new_info? selects the tokenizer.
#
# The new map is installed via set_partitions only when the tokenizer
# returned a non-nil result.
def update_partitions(conn, node)
  # TODO: Cluster should not care about version of tokenizer
  # decouple clstr interface
  tokenizer =
    if node.use_new_info?
      Aerospike.logger.info("Updating partitions using new protocol...")
      PartitionTokenizerNew.new(conn)
    else
      Aerospike.logger.info("Updating partitions using old protocol...")
      PartitionTokenizerOld.new(conn)
    end

  fresh_map = tokenizer.update_partition(partitions, node)

  # update partition write map
  set_partitions(fresh_map) if fresh_map

  Aerospike.logger.info("Partitions updated...")
end