Module: Elasticsearch::Extensions::Test::Cluster

Extended by:
Cluster
Included in:
Cluster
Defined in:
lib/elasticsearch/extensions/test/cluster.rb

Overview

A convenience Ruby module for starting and stopping a separate in-memory testing cluster, so that tests neither depend on nor interfere with the cluster at <localhost:9200>.

Examples:

Start a cluster with default configuration

require 'elasticsearch/extensions/test/cluster'
Elasticsearch::Extensions::Test::Cluster.start

Constant Summary

@@network_host =
ENV.fetch('TEST_CLUSTER_NETWORK_HOST', 'localhost')
@@number_of_nodes =
(ENV['TEST_CLUSTER_NODES'] || 2).to_i
@@default_cluster_name =
"elasticsearch-test-#{Socket.gethostname.downcase}"
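The three defaults above are read from the environment when the module is loaded. As a rough sketch of the same fallbacks, the hypothetical helper below (`resolve_cluster_defaults` is not part of the library) takes an environment-like hash so the resolution can be inspected without touching `ENV`:

```ruby
require 'socket'

# Hypothetical helper, not part of the library: resolves the same three
# defaults from any Hash-like object that responds to #fetch and #[].
def resolve_cluster_defaults(env = ENV)
  {
    network_host:    env.fetch('TEST_CLUSTER_NETWORK_HOST', 'localhost'),
    number_of_nodes: (env['TEST_CLUSTER_NODES'] || 2).to_i,
    cluster_name:    "elasticsearch-test-#{Socket.gethostname.downcase}"
  }
end

# With an empty environment the built-in defaults apply.
p resolve_cluster_defaults({})

# Environment values arrive as strings; the node count is converted with #to_i.
p resolve_cluster_defaults({ 'TEST_CLUSTER_NODES' => '3' })
```

Because the default cluster name includes the host name, two machines sharing a network should end up with differently named test clusters.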

Instance Method Details

#__get_cluster_health(port = 9250) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tries to load cluster health information



# File 'lib/elasticsearch/extensions/test/cluster.rb', line 318

def __get_cluster_health(port=9250)
  uri = URI("http://#{@@network_host}:#{port}/_cluster/health")
  if response = Net::HTTP.get(uri) rescue nil
    return JSON.parse(response)
  end
end
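The lookup above returns the parsed health document, or `nil` when the HTTP request fails. A minimal sketch of the same idea follows; the `fetcher` keyword is an assumption added here so the behaviour can be exercised without a running cluster, and the rescue is slightly broader than the original because it also covers malformed JSON:

```ruby
require 'json'
require 'net/http'
require 'uri'

# Hypothetical variant of the health lookup, not the library's method.
# `fetcher` is any callable that takes a URI and returns the response body.
def fetch_cluster_health(port = 9250, host: 'localhost', fetcher: Net::HTTP.method(:get))
  uri = URI("http://#{host}:#{port}/_cluster/health")
  JSON.parse(fetcher.call(uri))
rescue StandardError
  # Connection errors and unparsable bodies both yield nil.
  nil
end

# A stubbed fetcher stands in for the cluster; the JSON body is made up.
stub = ->(_uri) { '{"cluster_name":"demo","status":"green","number_of_nodes":2}' }
p fetch_cluster_health(9250, fetcher: stub)
```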

#__print_cluster_info(port) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Print information about the cluster on STDOUT



# File 'lib/elasticsearch/extensions/test/cluster.rb', line 292

def __print_cluster_info(port)
  health = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/health")))
  nodes  = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_nodes/process,http")))
  master = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/state")))['master_node']

  puts "\n",
        ('-'*80).ansi(:faint),
       'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint),
       'Status:  '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint),
       'Nodes:   '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint)

  nodes['nodes'].each do |id, info|
    m = id == master ? '*' : '+'
    puts ''.ljust(20) +
         "#{m} ".ansi(:faint) +
         "#{info['name'].ansi(:bold)} ".ansi(:faint) +
         "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) +
         "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) +
         "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint)
  end
end
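To make the per-node output easier to follow, here is a colour-free sketch of the listing. `format_node_lines` is a hypothetical helper and the sample data is made up; unlike the inline `rescue` modifiers above, it falls back to `'N/A'` whenever a key is missing:

```ruby
# Hypothetical helper, not part of the library. `nodes` maps node IDs to the
# per-node hashes found under the 'nodes' key; `master_id` is the master's ID.
def format_node_lines(nodes, master_id)
  nodes.map do |id, info|
    marker  = id == master_id ? '*' : '+'   # '*' marks the master node
    version = info['version'] || 'N/A'
    pid     = info.dig('process', 'id') || 'N/A'
    address = info.dig('http', 'bound_address') || 'N/A'
    "#{marker} #{info['name']} | version: #{version}, pid: #{pid}, address: #{address}"
  end
end

# Made-up sample data in the shape read by the method above.
sample = {
  'a1' => { 'name' => 'node-1', 'version' => '1.0.0',
            'process' => { 'id' => 101 }, 'http' => { 'bound_address' => '127.0.0.1:9250' } },
  'b2' => { 'name' => 'node-2' }
}
puts format_node_lines(sample, 'a1')
```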

#__wait_for_status(status = 'green', port = 9250, timeout = 30) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks the process and waits until the cluster reports the given status (“green” by default) with the expected number of nodes.

Prints information about the cluster on STDOUT if the cluster is available.

Parameters:

  • status (String) (defaults to: 'green')

    The status to wait for (yellow, green)

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 30)

    The explicit timeout for the operation, in seconds

Returns:

  • Boolean



# File 'lib/elasticsearch/extensions/test/cluster.rb', line 262

def __wait_for_status(status='green', port=9250, timeout=30)
  uri = URI("http://#{@@network_host}:#{port}/_cluster/health?wait_for_status=#{status}")

  Timeout::timeout(timeout) do
    loop do
      response = begin
        JSON.parse(Net::HTTP.get(uri))
      rescue Exception => e
        STDERR.puts e.inspect if ENV['DEBUG']
        nil
      end

      STDERR.puts response.inspect if response && ENV['DEBUG']

      if response && response['status'] == status && ( @@number_of_nodes.nil? || @@number_of_nodes == response['number_of_nodes'].to_i  )
        __print_cluster_info(port) and break
      end

      print '.'.ansi(:faint)
      sleep 1
    end
  end

  return true
end
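The method above is a poll-until-ready loop wrapped in a timeout. The sketch below isolates that pattern; `wait_until_status`, `probe`, and `interval` are names introduced here for illustration, and `probe` stands in for the HTTP health request:

```ruby
require 'timeout'

# Hypothetical polling helper, not part of the library. `probe` must return
# the parsed health Hash, or nil while the cluster is unreachable.
# Raises Timeout::Error if the condition is not met within `timeout` seconds.
def wait_until_status(status, expected_nodes, timeout: 30, interval: 1, probe:)
  Timeout.timeout(timeout) do
    loop do
      health = probe.call
      if health && health['status'] == status &&
         health['number_of_nodes'].to_i == expected_nodes
        return health
      end
      sleep interval
    end
  end
end

# Usage with a stub that becomes "green" on the third attempt.
attempts = 0
probe = lambda do
  attempts += 1
  attempts < 3 ? nil : { 'status' => 'green', 'number_of_nodes' => 2 }
end
p wait_until_status('green', 2, timeout: 5, interval: 0.01, probe: probe)
```

Both the status and the node count have to match before the loop exits, so a cluster that is green but still missing a node keeps the caller waiting.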

#running?(arguments = {}) ⇒ Boolean

Returns true when a cluster with the expected name and number of nodes is reachable on the given port.

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :on (Integer)

    The port on which the node is running.

  • :as (String)

    The cluster name.

  • :num (Integer)

    Number of nodes in the cluster.

Returns:

  • (Boolean)

    true if the cluster health reports the expected cluster name and number of nodes, false otherwise



# File 'lib/elasticsearch/extensions/test/cluster.rb', line 225

def running?(arguments={})
  port            = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  cluster_name    = arguments[:as] || (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp)
  number_of_nodes = arguments[:num] || (ENV.fetch('TEST_CLUSTER_NODES', @@number_of_nodes)).to_i

  if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil
    return cluster_health['cluster_name']    == cluster_name && \
           cluster_health['number_of_nodes'] == number_of_nodes
  end
  return false
end
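The check above reduces to comparing two fields of the health response. A minimal sketch of that comparison, using a hypothetical `cluster_matches?` predicate:

```ruby
# Hypothetical predicate, not part of the library. `health` is the parsed
# `_cluster/health` response, or nil when the cluster could not be reached.
def cluster_matches?(health, cluster_name, number_of_nodes)
  return false if health.nil?

  health['cluster_name'] == cluster_name &&
    health['number_of_nodes'] == number_of_nodes
end

health = { 'cluster_name' => 'elasticsearch-test-demo', 'number_of_nodes' => 2 }
p cluster_matches?(health, 'elasticsearch-test-demo', 2)   # => true
p cluster_matches?(health, 'elasticsearch-test-demo', '2') # => false
p cluster_matches?(nil, 'elasticsearch-test-demo', 2)      # => false
```

The second call shows why the node count should be an Integer: the comparison uses `==`, and in the source only the environment fallback is converted with `to_i`, so a `:num` value given as a String would presumably never match.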

#start(arguments = {}) ⇒ Object

Starts a cluster

Launches the specified number of nodes in a test-suitable configuration and prints information about the cluster, unless this specific cluster is already running.

Use the Cluster.stop command with the same arguments to stop this cluster.

You can also use environment variables to set these options.

Examples:

Start a cluster with default configuration (2 nodes, in-memory, etc)

Elasticsearch::Extensions::Test::Cluster.start

Start a cluster with a custom configuration

Elasticsearch::Extensions::Test::Cluster.start \
  cluster_name: 'my-cluster',
  nodes: 3,
  node_name: 'my-node',
  port: 9350

Start a cluster with a different Elasticsearch version

Elasticsearch::Extensions::Test::Cluster.start \
  command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch"

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :cluster_name (String)

    Cluster name (default: `elasticsearch-test-<hostname>`)

  • :nodes (Integer)

    Number of desired nodes (default: 2)

  • :command (String)

    Elasticsearch command (default: `elasticsearch`)

  • :port (Integer)

    Starting port number; will be auto-incremented (default: 9250)

  • :node_name (String)

    The node name prefix; the node number is appended (for example, `node-1`)

  • :path_data (String)

    Path to the directory to store data in

  • :path_work (String)

    Path to the directory with auxiliary files

  • :path_logs (String)

    Path to the directory with log files

  • :multicast_enabled (Boolean)

    Whether multicast is enabled (default: true)

  • :timeout (Integer)

    Timeout in seconds when starting the cluster (default: 30)

  • :network_host (String)

    The host that nodes will bind on and publish to

  • :clear_cluster (Boolean)

    Wipe out cluster content on startup (default: true)

Returns:

  • Boolean
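Judging from the source of `start`, most options are resolved in the order explicit argument, then environment variable, then built-in default, while the node count consults `TEST_CLUSTER_NODES` first. The sketch below illustrates both orders; `resolve_option` and `resolve_node_count` are hypothetical helpers, and the `env` parameter exists only so the example does not depend on the real environment:

```ruby
# Hypothetical helper: an explicit argument wins, then the environment
# variable, then the default (the `||=` + `ENV.fetch` pattern).
def resolve_option(arguments, key, env_key, default, env = ENV)
  arguments[key] || env.fetch(env_key, default)
end

# Hypothetical helper: for the node count the environment variable, when
# set, takes priority over the `:nodes` argument.
def resolve_node_count(arguments, env = ENV)
  env.fetch('TEST_CLUSTER_NODES', arguments[:nodes] || 2).to_i
end

env = { 'TEST_CLUSTER_PORT' => '9400', 'TEST_CLUSTER_NODES' => '5' }

p resolve_option({ port: 9350 }, :port, 'TEST_CLUSTER_PORT', 9250, env) # => 9350
p resolve_option({}, :port, 'TEST_CLUSTER_PORT', 9250, env)             # => "9400"
p resolve_node_count({ nodes: 3 }, env)                                 # => 5
p resolve_node_count({ nodes: 3 }, {})                                  # => 3
```

Values taken from the environment are strings, which is why the source converts the port, timeout, and node count with `to_i`.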

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 78

def start(arguments={})
  @@number_of_nodes = ( ENV.fetch('TEST_CLUSTER_NODES', arguments[:nodes] || 2) ).to_i

  arguments[:command]           ||= ENV.fetch('TEST_CLUSTER_COMMAND',   'elasticsearch')
  arguments[:port]              ||= (ENV.fetch('TEST_CLUSTER_PORT',     9250).to_i)
  arguments[:cluster_name]      ||= (ENV.fetch('TEST_CLUSTER_NAME',     @@default_cluster_name).chomp)
  arguments[:node_name]         ||= ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node')
  arguments[:path_data]         ||= ENV.fetch('TEST_CLUSTER_DATA',      '/tmp/elasticsearch_test')
  arguments[:path_work]         ||= ENV.fetch('TEST_CLUSTER_TMP',       '/tmp')
  arguments[:path_logs]         ||= ENV.fetch('TEST_CLUSTER_LOGS',      '/tmp/log/elasticsearch')
  arguments[:es_params]         ||= ENV.fetch('TEST_CLUSTER_PARAMS',    '')
  arguments[:multicast_enabled] ||= ENV.fetch('TEST_CLUSTER_MULTICAST', 'true')
  arguments[:timeout]           ||= (ENV.fetch('TEST_CLUSTER_TIMEOUT', 30).to_i)
  arguments[:network_host]      ||= @@network_host

  clear_cluster = !!arguments[:clear_cluster] || (ENV.fetch('TEST_CLUSTER_CLEAR', 'true') != 'false')

  # Make sure `cluster_name` is not dangerous
  if arguments[:cluster_name] =~ /^[\/\\]?$/
    raise ArgumentError, "The `cluster_name` parameter cannot be empty string or a slash"
  end

  if running? :on => arguments[:port], :as => arguments[:cluster_name]
    print "[!] Elasticsearch cluster already running".ansi(:red)
    wait_for_green(arguments[:port], arguments[:timeout])
    return false
  end

  # Wipe out data on disk for this cluster name by default
  FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}" if clear_cluster

  print "Starting ".ansi(:faint) +
        @@number_of_nodes.to_s.ansi(:bold, :faint) +
        " Elasticsearch nodes..".ansi(:faint)

  pids = []

  @@number_of_nodes.times do |n|
    n += 1
    command = <<-COMMAND
      #{arguments[:command]} \
        -D es.foreground=yes \
        -D es.cluster.name=#{arguments[:cluster_name]} \
        -D es.node.name=#{arguments[:node_name]}-#{n} \
        -D es.http.port=#{arguments[:port].to_i + (n-1)} \
        -D es.path.data=#{arguments[:path_data]} \
        -D es.path.work=#{arguments[:path_work]} \
        -D es.path.logs=#{arguments[:path_logs]} \
        -D es.cluster.routing.allocation.disk.threshold_enabled=false \
        -D es.network.host=#{@@network_host} \
        -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \
        -D es.script.inline=on \
        -D es.script.indexed=on \
        -D es.node.test=true \
        -D es.node.testattr=test \
        -D es.node.bench=true \
        -D es.path.repo=/tmp \
        -D es.repositories.url.allowed_urls=http://snapshot.test* \
        -D es.logger.level=DEBUG \
        #{arguments[:es_params]} \
        > /dev/null
    COMMAND
    STDERR.puts command.gsub(/ {1,}/, ' ') if ENV['DEBUG']

    pid = Process.spawn(command)
    Process.detach pid
    pids << pid
  end

  # Check for processes running
  if `ps -p #{pids.join(' ')}`.split("\n").size < @@number_of_nodes+1
    STDERR.puts "", "[!!!] Process failed to start (see output above)".ansi(:red)
    exit(1)
  end

  wait_for_green(arguments[:port], arguments[:timeout])
  return true
end
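The loop in `start` gives every node its own name and HTTP port, derived from the node index. The following sketch reproduces only that derivation; `node_settings` and `node_command` are hypothetical helpers, and just three of the `-D` settings from the source are included:

```ruby
# Hypothetical helper, not part of the library: returns one settings Hash per
# node, with a numbered node name and an auto-incremented HTTP port.
def node_settings(arguments, number_of_nodes)
  (1..number_of_nodes).map do |n|
    {
      'es.cluster.name' => arguments[:cluster_name],
      'es.node.name'    => "#{arguments[:node_name]}-#{n}",
      'es.http.port'    => arguments[:port].to_i + (n - 1)
    }
  end
end

# Hypothetical helper: renders the settings as `-D key=value` flags.
def node_command(command, settings)
  flags = settings.map { |key, value| "-D #{key}=#{value}" }
  ([command] + flags).join(' ')
end

arguments = { cluster_name: 'my-cluster', node_name: 'my-node', port: 9350 }
node_settings(arguments, 3).each do |settings|
  puts node_command('elasticsearch', settings)
end
```

With a starting port of 9350 and three nodes, the nodes listen on 9350, 9351, and 9352, so `stop` and `running?` only need the first port to reach the cluster.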

#stop(arguments = {}) ⇒ Object

Stop the cluster.

Fetches the process IDs from the “Nodes Info” API and terminates the matching nodes.

Examples:

Stop the default cluster

Elasticsearch::Extensions::Test::Cluster.stop

Stop the cluster reachable on specific port

Elasticsearch::Extensions::Test::Cluster.stop port: 9350

Returns:

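  • (Array<Integer>, false)

    The PIDs of the nodes that were signalled, or false if the node information could not be fetched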

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 170

def stop(arguments={})
  arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  arguments[:network_host] ||= ENV.fetch('TEST_CLUSTER_NETWORK_HOST', @@network_host)

  nodes = begin
    JSON.parse(Net::HTTP.get(URI("http://#{arguments[:network_host]}:#{arguments[:port]}/_nodes/?process")))
  rescue Exception => e
    STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red)
    nil
  end

  return false if nodes.nil? or nodes.empty?

  pids  = nodes['nodes'].map { |id, info| info['process']['id'] }

  unless pids.empty?
    print "\nStopping Elasticsearch nodes... ".ansi(:faint)
    pids.each_with_index do |pid, i|
      ['INT','KILL'].each do |signal|
        begin
          Process.kill signal, pid
        rescue Exception => e
          print "[#{e.class}] PID #{pid} not found. ".ansi(:red)
        end

        # Give the system some breathing space to finish...
        sleep 1

        # Check that pid really is dead
        begin
          Process.getpgid( pid )
          # `getpgid` will raise error if pid is dead, so if we get here, try next signal.
          next
        rescue Errno::ESRCH
          print "stopped PID #{pid} with #{signal} signal. ".ansi(:green)
          break # pid is dead
        end
      end
    end
    puts
  else
    false
  end

  return pids
end
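The core of `stop` is a signal escalation: send `INT`, wait, check whether the process still exists, and fall back to `KILL`. The sketch below shows that pattern on a throwaway child process instead of an Elasticsearch node; `terminate` and its `grace` parameter are hypothetical names introduced for illustration:

```ruby
require 'rbconfig'

# Hypothetical helper, not part of the library. Sends each signal in turn and
# returns the name of the signal after which the process was gone,
# :already_gone if it did not exist, or nil if it survived every signal.
def terminate(pid, signals: %w[INT KILL], grace: 1)
  signals.each do |signal|
    begin
      Process.kill(signal, pid)
    rescue Errno::ESRCH
      return :already_gone
    end

    # Give the process some time to shut down.
    sleep grace

    begin
      Process.getpgid(pid) # raises Errno::ESRCH once the process is gone
    rescue Errno::ESRCH
      return signal
    end
  end
  nil
end

# Usage: spawn a child that just sleeps, detach it so it is reaped when it
# exits, then stop it.
pid = Process.spawn(RbConfig.ruby, '-e', 'sleep 60', err: File::NULL)
Process.detach(pid)
puts "stopped with #{terminate(pid, grace: 0.5).inspect}"
```

Detaching matters for the liveness check: on POSIX systems an exited child that has not been reaped still has a process entry, so `Process.getpgid` would keep succeeding. The source detaches each node right after spawning it in `start`.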

#wait_for_green(port = 9250, timeout = 60) ⇒ Object

Waits until the cluster is green, then prints information about it.

Examples:

Print the information about the default cluster

Elasticsearch::Extensions::Test::Cluster.wait_for_green

Parameters:

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 60)

    The explicit timeout for the operation, in seconds

Returns:

  • Boolean



# File 'lib/elasticsearch/extensions/test/cluster.rb', line 246

def wait_for_green(port=9250, timeout=60)
  __wait_for_status('green', port, timeout)
end