Module: Elasticsearch::Extensions::Test::Cluster

Extended by:
Cluster
Included in:
Cluster
Defined in:
lib/elasticsearch/extensions/test/cluster.rb

Overview

A convenience Ruby class for starting and stopping a separate testing in-memory cluster, to not depend on – and not mess up – <localhost:9200>.

Examples:

Start a cluster with default configuration

require 'elasticsearch/extensions/test/cluster'
Elasticsearch::Extensions::Test::Cluster.start

See Also:

Constant Summary collapse

@@number_of_nodes =
(ENV['TEST_CLUSTER_NODES'] || 2).to_i

Instance Method Summary collapse

Instance Method Details

#__get_cluster_health(port = 9250) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tries to load cluster health information



277
278
279
280
281
282
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 277

def __get_cluster_health(port=9250)
  uri = URI("http://localhost:#{port}/_cluster/health")
  if response = Net::HTTP.get(uri) rescue nil
    return JSON.parse(response)
  end
end

#__print_cluster_info(port) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Print information about the cluster on STDOUT



251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 251

def __print_cluster_info(port)
  health = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/health")))
  nodes  = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_nodes/process,http")))
  master = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/state")))['master_node']

  puts "\n",
        ('-'*80).ansi(:faint),
       'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint),
       'Status:  '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint),
       'Nodes:   '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint)

  nodes['nodes'].each do |id, info|
    m = id == master ? '*' : '+'
    puts ''.ljust(20) +
         "#{m} ".ansi(:faint) +
         "#{info['name'].ansi(:bold)} ".ansi(:faint) +
         "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) +
         "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) +
         "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint)
  end
end

#__wait_for_status(status = 'green', port = 9250, timeout = 30) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks the process and waits for the cluster to be in a “green” state.

Prints information about the cluster on STDOUT if the cluster is available.

Parameters:

  • status (String) (defaults to: 'green')

    The status to wait for (yellow, green)

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 30)

    The explicit timeout for the operation

Returns:

  • Boolean



221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 221

def __wait_for_status(status='green', port=9250, timeout=30)
  uri = URI("http://localhost:#{port}/_cluster/health?wait_for_status=#{status}")

  Timeout::timeout(timeout) do
    loop do
      response = begin
        JSON.parse(Net::HTTP.get(uri))
      rescue Exception => e
        puts e.inspect if ENV['DEBUG']
        nil
      end

      puts response.inspect if ENV['DEBUG']

      if response && response['status'] == status && ( @@number_of_nodes.nil? || @@number_of_nodes == response['number_of_nodes'].to_i  )
        __print_cluster_info(port) and break
      end

      print '.'.ansi(:faint)
      sleep 1
    end
  end

  return true
end

#running?(arguments = {}) ⇒ Boolean

Returns true when a specific test node is running within the cluster.

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :on (Integer)

    The port on which the node is running.

  • :as (String)

    The cluster name.

Returns:

  • (Boolean)

    Boolean



185
186
187
188
189
190
191
192
193
194
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 185

def running?(arguments={})
  port         = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  cluster_name = arguments[:as] ||  ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test'

  if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil
    return cluster_health['cluster_name']    == cluster_name && \
           cluster_health['number_of_nodes'] == @@number_of_nodes
  end
  return false
end

#start(arguments = {}) ⇒ Object

Starts a cluster

Launches the specified number of nodes in test-suitable configuration by default and prints information about the cluster – unless this specific cluster is running already.

Use the Cluster.stop command with the same arguments to stop this cluster.

You can also use environment variables to set these options.

Examples:

Start a cluster with default configuration (2 nodes, in-memory, etc)

Elasticsearch::Extensions::Test::Cluster.start

Start a cluster with a custom configuration

Elasticsearch::Extensions::Test::Cluster.start \
  cluster_name: 'my-cluster',
  nodes: 3,
  node_name: 'my-node',
  port: 9350

Start a cluster with a different Elasticsearch version

Elasticsearch::Extensions::Test::Cluster.start \
  command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch"

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :command (String)

    Elasticsearch command (default: ‘elasticsearch`).

  • :nodes (Integer)

    Number of desired nodes (default: 2).

  • :cluster_name (String)

    Cluster name (default: ‘elasticsearch_test`).

  • :port (String)

    Starting port number; will be auto-incremented (default: 9250).

  • :timeout (Integer)

    Timeout when starting the cluster (default: 30).

Returns:

  • Boolean

See Also:



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 68

def start(arguments={})
  @@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || arguments[:nodes] || 2).to_i

  arguments[:command]      ||= ENV['TEST_CLUSTER_COMMAND'] || 'elasticsearch'
  arguments[:port]         ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  arguments[:cluster_name] ||= (ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test').chomp
  arguments[:path_data]    ||= ENV['TEST_CLUSTER_DATA'] || '/tmp'
  arguments[:es_params]    ||= ENV['TEST_CLUSTER_PARAMS'] || ''
  arguments[:path_work]    ||= '/tmp'
  arguments[:node_name]    ||= 'node'
  arguments[:timeout]      ||= (ENV['TEST_CLUSTER_TIMEOUT'] || 30).to_i

  # Make sure `cluster_name` is not dangerous
  if arguments[:cluster_name] =~ /^[\/\\]?$/
    raise ArgumentError, "The `cluster_name` parameter cannot be empty string or a slash"
  end

  if running? :on => arguments[:port], :as => arguments[:cluster_name]
    print "[!] Elasticsearch cluster already running".ansi(:red)
    wait_for_green(arguments[:port], arguments[:timeout])
    return false
  end

  # Wipe out data for this cluster name
  FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}"

  print "Starting ".ansi(:faint) +
        @@number_of_nodes.to_s.ansi(:bold, :faint) +
        " Elasticsearch nodes..".ansi(:faint)

  pids = []

  @@number_of_nodes.times do |n|
    n += 1
    pid = Process.spawn <<-COMMAND
      #{arguments[:command]} \
        -D es.foreground=yes \
        -D es.cluster.name=#{arguments[:cluster_name]} \
        -D es.node.name=#{arguments[:node_name]}-#{n} \
        -D es.http.port=#{arguments[:port].to_i + (n-1)} \
        -D es.path.data=#{arguments[:path_data]} \
        -D es.path.work=#{arguments[:path_work]} \
        -D es.cluster.routing.allocation.disk.threshold_enabled=false \
        -D es.network.host=0.0.0.0 \
        -D es.discovery.zen.ping.multicast.enabled=true \
        -D es.script.disable_dynamic=false \
        -D es.node.test=true \
        -D es.node.bench=true \
        -D es.logger.level=DEBUG \
        #{arguments[:es_params]} \
        > /dev/null
    COMMAND
    Process.detach pid
    pids << pid
  end

  # Check for proceses running
  if `ps -p #{pids.join(' ')}`.split("\n").size < @@number_of_nodes+1
    STDERR.puts "", "[!!!] Process failed to start (see output above)".ansi(:red)
    exit(1)
  end

  wait_for_green(arguments[:port], arguments[:timeout])
  return true
end

#stop(arguments = {}) ⇒ Object

Stop the cluster.

Fetches the PID numbers from “Nodes Info” API and terminates matching nodes.

Examples:

Stop the default cluster

Elasticsearch::Extensions::Test::Cluster.stop

Stop the cluster reachable on specific port

Elasticsearch::Extensions::Test::Cluster.stop port: 9350

Returns:

  • Boolean

See Also:



147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 147

def stop(arguments={})
  arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i

  nodes = begin
    JSON.parse(Net::HTTP.get(URI("http://localhost:#{arguments[:port]}/_nodes/?process")))
  rescue Exception => e
    STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red)
    nil
  end

  return false if nodes.nil? or nodes.empty?

  pids  = nodes['nodes'].map { |id, info| info['process']['id'] }

  unless pids.empty?
    print "\nStopping Elasticsearch nodes... ".ansi(:faint)
    pids.each_with_index do |pid, i|
      begin
        print "stopped PID #{pid}. ".ansi(:green) if Process.kill 'INT', pid
      rescue Exception => e
        print "[#{e.class}] PID #{pid} not found. ".ansi(:red)
      end
    end
    puts
  else
    false
  end

  return pids
end

#wait_for_green(port = 9250, timeout = 60) ⇒ Object

Waits until the cluster is green and prints information

Examples:

Print the information about the default cluster

Elasticsearch::Extensions::Test::Cluster.wait_for_green

Parameters:

  • status (String)

    The status to wait for (yellow, green)

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 60)

    The explicit timeout for the operation

Returns:

  • Boolean



205
206
207
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 205

def wait_for_green(port=9250, timeout=60)
  __wait_for_status('green', port, timeout)
end