Module: Elasticsearch::Extensions::Test::Cluster

Extended by:
Cluster
Included in:
Cluster
Defined in:
lib/elasticsearch/extensions/test/cluster.rb

Overview

A convenience Ruby module for starting and stopping a separate in-memory test cluster, so that tests neither depend on nor interfere with the cluster at <localhost:9200>.

Examples:

Start a cluster with default configuration

require 'elasticsearch/extensions/test/cluster'
Elasticsearch::Extensions::Test::Cluster.start

Constant Summary

@@network_host =
ENV.fetch('TEST_CLUSTER_NETWORK_HOST', 'localhost')
@@number_of_nodes =
(ENV['TEST_CLUSTER_NODES'] || 2).to_i
@@default_cluster_name =
"elasticsearch-test-#{Socket.gethostname.downcase}"

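The three class variables are computed once, when the file is loaded, from the environment and the machine's host name. The following is a minimal sketch of the same rules written as a pure function. It takes a Hash in place of `ENV` and an explicit host name, so it can be exercised without touching the process environment. `test_cluster_defaults` is a hypothetical helper, not part of the library:

```ruby
require 'socket'

# Hypothetical helper mirroring how the module derives its defaults.
# `env` stands in for ENV, `hostname` for Socket.gethostname.
def test_cluster_defaults(env, hostname = Socket.gethostname)
  {
    network_host:    env.fetch('TEST_CLUSTER_NETWORK_HOST', 'localhost'),
    number_of_nodes: (env['TEST_CLUSTER_NODES'] || 2).to_i,
    cluster_name:    "elasticsearch-test-#{hostname.downcase}"
  }
end

# With only TEST_CLUSTER_NODES set, the host falls back to 'localhost'.
test_cluster_defaults({ 'TEST_CLUSTER_NODES' => '3' }, 'CI-Box')
```

Because these values are read at load time, changing `TEST_CLUSTER_NETWORK_HOST` after the `require` does not affect `@@network_host`.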
Instance Method Details

#__get_cluster_health(port = 9250) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

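Tries to load cluster health information from the `_cluster/health` API on the given port. Returns the parsed response as a Hash, or nil when the HTTP request fails.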

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 315

def __get_cluster_health(port=9250)
  uri = URI("http://#{@@network_host}:#{port}/_cluster/health")
  if response = Net::HTTP.get(uri) rescue nil
    return JSON.parse(response)
  end
end
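`__get_cluster_health` swallows connection errors with a `rescue nil` modifier, so callers only need to check for `nil`. The sketch below reproduces that contract with the HTTP call injected as a lambda, so it runs without a cluster. `fetch_health` and its `http_get:` keyword are hypothetical. In real use, the lambda could be `->(url) { Net::HTTP.get(URI(url)) }`. Unlike the original, the sketch also returns `nil` for a body that is not valid JSON instead of raising.

```ruby
require 'json'

# Hypothetical variant of __get_cluster_health with the HTTP GET injected.
# Returns the parsed health Hash, or nil when the request or the parsing fails.
def fetch_health(host, port, http_get:)
  body = http_get.call("http://#{host}:#{port}/_cluster/health")
  JSON.parse(body)
rescue StandardError
  nil
end

green = ->(_url) { '{"cluster_name":"elasticsearch-test-ci","status":"green","number_of_nodes":2}' }
down  = ->(_url) { raise Errno::ECONNREFUSED }

fetch_health('localhost', 9250, http_get: green) # parsed Hash
fetch_health('localhost', 9250, http_get: down)  # nil
```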

#__print_cluster_info(port) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

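Prints information about the cluster to STDOUT: its name, status and number of nodes, followed by one line per node with its version, PID and bound HTTP address. The elected master node is marked with `*`, and the other nodes with `+`.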

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 289

def __print_cluster_info(port)
  health = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/health")))
  nodes  = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_nodes/process,http")))
  master = JSON.parse(Net::HTTP.get(URI("http://#{@@network_host}:#{port}/_cluster/state")))['master_node']

  puts "\n",
       ('-'*80).ansi(:faint),
       'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint),
       'Status:  '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint),
       'Nodes:   '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint)

  nodes['nodes'].each do |id, info|
    m = id == master ? '*' : '+'
    puts ''.ljust(20) +
         "#{m} ".ansi(:faint) +
         "#{info['name'].ansi(:bold)} ".ansi(:faint) +
         "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) +
         "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) +
         "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint)
  end
end
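The per-node lines follow a simple rule. The node whose ID matches `master_node` from `_cluster/state` is marked `*`, the others are marked `+`, and any missing field falls back to `N/A`. The following is a plain-Ruby sketch of that rule without the `ansi` colouring. It operates on a hand-written Hash shaped like a Nodes Info response. The sample data and `node_lines` are illustrative only. The sketch uses `Hash#dig` instead of the `rescue` modifier, so unrelated errors are not masked.

```ruby
# Builds one summary line per node, marking the elected master with '*'.
def node_lines(nodes_info, master_id)
  nodes_info['nodes'].map do |id, info|
    marker  = id == master_id ? '*' : '+'
    version = info['version'] || 'N/A'
    pid     = info.dig('process', 'id') || 'N/A'
    address = info.dig('http', 'bound_address') || 'N/A'
    "#{marker} #{info['name']} | version: #{version}, pid: #{pid}, address: #{address}"
  end
end

sample = {
  'nodes' => {
    'abc' => { 'name' => 'node-1', 'version' => '1.7.0',
               'process' => { 'id' => 4242 }, 'http' => { 'bound_address' => '127.0.0.1:9250' } },
    'def' => { 'name' => 'node-2' }
  }
}
puts node_lines(sample, 'abc')
```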

#__wait_for_status(status = 'green', port = 9250, timeout = 30) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks the process and waits for the cluster to reach the given status (“green” by default) and, when a node count is configured, to report that many nodes.

Prints information about the cluster to STDOUT once the cluster is available.

Parameters:

  • status (String) (defaults to: 'green')

    The status to wait for (yellow, green)

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 30)

    Maximum time to wait, in seconds

Returns:

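  • Boolean

    true once the status is reached; Timeout::Error is raised if it is not reached within timeout seconds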

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 259

def __wait_for_status(status='green', port=9250, timeout=30)
  uri = URI("http://#{@@network_host}:#{port}/_cluster/health?wait_for_status=#{status}")

  Timeout::timeout(timeout) do
    loop do
      response = begin
        JSON.parse(Net::HTTP.get(uri))
      rescue StandardError => e
        STDERR.puts e.inspect if ENV['DEBUG']
        nil
      end

      STDERR.puts response.inspect if response && ENV['DEBUG']

      if response && response['status'] == status && ( @@number_of_nodes.nil? || @@number_of_nodes == response['number_of_nodes'].to_i  )
        __print_cluster_info(port) and break
      end

      print '.'.ansi(:faint)
      sleep 1
    end
  end

  return true
end
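Each iteration of `__wait_for_status` combines two checks. The reported `status` must equal the target, and, when `@@number_of_nodes` is set, the node count must match. The library bounds the loop with `Timeout::timeout`. The sketch below swaps that for a monotonic-clock deadline, which is a different technique from the library's. It injects the health lookup as a lambda and returns `false` on timeout instead of raising. `wait_for` and its keywords are hypothetical.

```ruby
# Hypothetical poller: calls `health` (returns a Hash or nil, may raise)
# until the cluster reports `status` and, if given, `expected_nodes` nodes.
# Returns true on success, false once `timeout` seconds have elapsed.
def wait_for(status, expected_nodes:, timeout:, health:, interval: 1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    response = begin
      health.call
    rescue StandardError
      nil # connection or parse errors simply mean "not ready yet"
    end

    if response && response['status'] == status &&
       (expected_nodes.nil? || response['number_of_nodes'].to_i == expected_nodes)
      return true
    end

    return false if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
    sleep interval
  end
end

# Simulated cluster: unreachable, then yellow with one node, then green with two.
replies = [nil, { 'status' => 'yellow', 'number_of_nodes' => 1 },
           { 'status' => 'green', 'number_of_nodes' => 2 }]
wait_for('green', expected_nodes: 2, timeout: 5, interval: 0.01, health: -> { replies.shift })
```

Because the clock is checked between requests rather than interrupting them, a single slow HTTP call can overrun the deadline. In exchange, nothing is aborted mid-request.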

#running?(arguments = {}) ⇒ Boolean

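Returns true when a cluster with the expected name is reachable on the given port and reports the expected number of nodes. The health request is abandoned after 0.25 seconds.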

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :on (Integer)

    The port on which the node is running.

  • :as (String)

    The cluster name.

Returns:

  • (Boolean)

    Boolean

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 223

def running?(arguments={})
  port         = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  cluster_name = arguments[:as] || (ENV.fetch('TEST_CLUSTER_NAME', @@default_cluster_name).chomp)

  if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil
    return cluster_health['cluster_name']    == cluster_name && \
           cluster_health['number_of_nodes'] == @@number_of_nodes
  end
  return false
end
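The check in `running?` is deliberately strict. A cluster answering on the port counts as running only if both its name and its node count match, so an unrelated Elasticsearch instance on the same port is not mistaken for the test cluster. The predicate is extracted below as a pure function for illustration; `same_test_cluster?` is a hypothetical name.

```ruby
# True only when the health response belongs to the expected test cluster.
def same_test_cluster?(health, cluster_name, number_of_nodes)
  return false if health.nil? # cluster unreachable or request timed out
  health['cluster_name'] == cluster_name &&
    health['number_of_nodes'] == number_of_nodes
end

same_test_cluster?({ 'cluster_name' => 'elasticsearch-test-ci', 'number_of_nodes' => 2 },
                   'elasticsearch-test-ci', 2)
```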

#start(arguments = {}) ⇒ Object

Starts a cluster

Launches the specified number of nodes, in a test-suitable configuration by default, and prints information about the cluster, unless this specific cluster is already running. In that case, it only waits for the cluster to turn green and returns false.

Use the Cluster.stop command with the same arguments to stop this cluster.

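You can also use environment variables to set these options: `TEST_CLUSTER_NODES`, `TEST_CLUSTER_COMMAND`, `TEST_CLUSTER_PORT`, `TEST_CLUSTER_NAME`, `TEST_CLUSTER_NODE_NAME`, `TEST_CLUSTER_DATA`, `TEST_CLUSTER_TMP`, `TEST_CLUSTER_LOGS`, `TEST_CLUSTER_PARAMS`, `TEST_CLUSTER_MULTICAST` and `TEST_CLUSTER_TIMEOUT`. An explicitly passed argument takes precedence over its variable, except for `:nodes`, where `TEST_CLUSTER_NODES` wins.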
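Most options in `start` are filled in with `||=`, falling back first to an environment variable and then to a built-in default. A bare `||=` treats an explicit `false` like a missing value, which matters for Boolean options such as `:clear` and `:multicast_enabled`. The following is a minimal sketch of the same precedence that checks for `nil` instead. `resolve_option` is hypothetical and, unlike the library, does not special-case `:nodes`.

```ruby
# Explicit value wins; otherwise the environment variable (if named and set);
# otherwise the default. Checking for nil keeps an explicit `false`.
def resolve_option(value, default, env: {}, env_var: nil)
  return value unless value.nil?
  return env[env_var] if env_var && env.key?(env_var)
  default
end

env = { 'TEST_CLUSTER_TIMEOUT' => '45' }
resolve_option(9350, 9250, env: env, env_var: 'TEST_CLUSTER_PORT')       # explicit argument
resolve_option(nil, 30, env: env, env_var: 'TEST_CLUSTER_TIMEOUT').to_i  # from the environment
resolve_option(false, true)                                              # explicit false is kept
```

Environment values are always strings, so numeric options still need `.to_i`, as in the library.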

Examples:

Start a cluster with the default configuration (2 nodes, in-memory, etc.)

Elasticsearch::Extensions::Test::Cluster.start

Start a cluster with a custom configuration

Elasticsearch::Extensions::Test::Cluster.start \
  cluster_name: 'my-cluster',
  nodes: 3,
  node_name: 'my-node',
  port: 9350

Start a cluster with a different Elasticsearch version

Elasticsearch::Extensions::Test::Cluster.start \
  command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch"

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :cluster_name (String)

    Cluster name (default: `elasticsearch-test-<hostname>`, using the lowercased host name)

  • :nodes (Integer)

    Number of desired nodes (default: 2)

  • :command (String)

    Elasticsearch command (default: `elasticsearch`)

  • :port (Integer)

    Starting port number; will be auto-incremented (default: 9250)

  • :node_name (String)

    The node name (will be appended with a number)

  • :path_data (String)

    Path to the directory to store data in

  • :path_work (String)

    Path to the directory with auxiliary files

  • :path_logs (String)

    Path to the directory with log files

  • :multicast_enabled (Boolean)

    Whether multicast is enabled (default: true)

  • :timeout (Integer)

    Timeout when starting the cluster (default: 30)

  • :network_host (String)

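    The host that nodes will bind on and publish to

  • :es_params (String)

    Additional parameters appended to the Elasticsearch command line (default: empty)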

  • :clear (Boolean)

    Wipe out cluster content on startup (default: true)

Returns:

  • Boolean

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 78

def start(arguments={})
  @@number_of_nodes = ( ENV.fetch('TEST_CLUSTER_NODES', arguments[:nodes] || 2) ).to_i

  arguments[:command]           ||= ENV.fetch('TEST_CLUSTER_COMMAND',   'elasticsearch')
  arguments[:port]              ||= (ENV.fetch('TEST_CLUSTER_PORT',     9250).to_i)
  arguments[:cluster_name]      ||= (ENV.fetch('TEST_CLUSTER_NAME',     @@default_cluster_name).chomp)
  arguments[:node_name]         ||= ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node')
  arguments[:path_data]         ||= ENV.fetch('TEST_CLUSTER_DATA',      '/tmp/elasticsearch_test')
  arguments[:path_work]         ||= ENV.fetch('TEST_CLUSTER_TMP',       '/tmp')
  arguments[:path_logs]         ||= ENV.fetch('TEST_CLUSTER_LOGS',      '/var/log/elasticsearch')
  arguments[:es_params]         ||= ENV.fetch('TEST_CLUSTER_PARAMS',    '')
  arguments[:multicast_enabled] ||= ENV.fetch('TEST_CLUSTER_MULTICAST', 'true')
  arguments[:timeout]           ||= (ENV.fetch('TEST_CLUSTER_TIMEOUT', 30).to_i)
  arguments[:network_host]      ||= @@network_host
  arguments[:clear]               = true if arguments[:clear].nil?

  # Make sure `cluster_name` is not dangerous
  if arguments[:cluster_name] =~ /^[\/\\]?$/
    raise ArgumentError, "The `cluster_name` parameter cannot be empty string or a slash"
  end

  if running? :on => arguments[:port], :as => arguments[:cluster_name]
    print "[!] Elasticsearch cluster already running".ansi(:red)
    wait_for_green(arguments[:port], arguments[:timeout])
    return false
  end

  # Wipe out data for this cluster name if requested
  FileUtils.rm_rf "#{arguments[:path_data]}/#{arguments[:cluster_name]}" if arguments[:clear]

  print "Starting ".ansi(:faint) +
        @@number_of_nodes.to_s.ansi(:bold, :faint) +
        " Elasticsearch nodes..".ansi(:faint)

  pids = []

  @@number_of_nodes.times do |n|
    n += 1
    command = <<-COMMAND
      #{arguments[:command]} \\
        -D es.foreground=yes \\
        -D es.cluster.name=#{arguments[:cluster_name]} \\
        -D es.node.name=#{arguments[:node_name]}-#{n} \\
        -D es.http.port=#{arguments[:port].to_i + (n-1)} \\
        -D es.path.data=#{arguments[:path_data]} \\
        -D es.path.work=#{arguments[:path_work]} \\
        -D es.path.logs=#{arguments[:path_logs]} \\
        -D es.cluster.routing.allocation.disk.threshold_enabled=false \\
        -D es.network.host=#{@@network_host} \\
        -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \\
        -D es.script.inline=on \\
        -D es.script.indexed=on \\
        -D es.node.test=true \\
        -D es.node.testattr=test \\
        -D es.node.bench=true \\
        -D es.path.repo=/tmp \\
        -D es.repositories.url.allowed_urls=http://snapshot.test* \\
        -D es.logger.level=DEBUG \\
        #{arguments[:es_params]} \\
        > /dev/null
    COMMAND
    STDERR.puts command.gsub(/ {1,}/, ' ') if ENV['DEBUG']

    pid = Process.spawn(command)
    Process.detach pid
    pids << pid
  end

  # Check that the processes are running
  if `ps -p #{pids.join(' ')}`.split("\n").size < @@number_of_nodes+1
    STDERR.puts "", "[!!!] Process failed to start (see output above)".ansi(:red)
    exit(1)
  end

  wait_for_green(arguments[:port], arguments[:timeout])
  return true
end
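Inside the loop, every node shares the cluster name and paths. Each node also gets its own node name (the base name plus its 1-based number) and its own HTTP port (the starting port plus its zero-based index). For example, three nodes started on port 9250 listen on 9250, 9251 and 9252. The following is a trimmed sketch of that per-node logic. It returns an argument array instead of a shell string and reproduces only four of the `-D` settings. `node_flags` is hypothetical.

```ruby
# Builds the "-D key=value" arguments for node `n` (1-based) of the test cluster.
def node_flags(n, cluster_name:, node_name:, port:, path_data:)
  {
    'es.cluster.name' => cluster_name,
    'es.node.name'    => "#{node_name}-#{n}",
    'es.http.port'    => port.to_i + (n - 1), # ports are auto-incremented per node
    'es.path.data'    => path_data
  }.flat_map { |key, value| ['-D', "#{key}=#{value}"] }
end

(1..3).map do |n|
  node_flags(n, cluster_name: 'elasticsearch-test-ci', node_name: 'node',
                port: 9250, path_data: '/tmp/elasticsearch_test')
end
```

With an array like this, `Process.spawn(command, *flags, out: File::NULL)` would bypass the shell, and the `out:` option would replace the `> /dev/null` redirect. The library instead builds a single shell string.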

#stop(arguments = {}) ⇒ Object

Stop the cluster.

Fetches the node PIDs from the “Nodes Info” API and terminates each process. It sends INT first and, if the process is still alive one second later, sends KILL.

Examples:

Stop the default cluster

Elasticsearch::Extensions::Test::Cluster.stop

Stop the cluster reachable on specific port

Elasticsearch::Extensions::Test::Cluster.stop port: 9350

Returns:

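  • Array<Integer>, false

    The PIDs found through the Nodes Info API (possibly empty), or false when the API could not be reached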

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 169

def stop(arguments={})
  arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  arguments[:network_host] ||= ENV.fetch('TEST_CLUSTER_NETWORK_HOST', @@network_host)

  nodes = begin
    JSON.parse(Net::HTTP.get(URI("http://#{arguments[:network_host]}:#{arguments[:port]}/_nodes/?process")))
  rescue Exception => e
    STDERR.puts "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red)
    nil
  end

  return false if nodes.nil? or nodes.empty?

  pids  = nodes['nodes'].map { |id, info| info['process']['id'] }

  unless pids.empty?
    print "\nStopping Elasticsearch nodes... ".ansi(:faint)
    pids.each_with_index do |pid, i|
      ['INT','KILL'].each do |signal|
        begin
          Process.kill signal, pid
        rescue Exception => e
          print "[#{e.class}] PID #{pid} not found. ".ansi(:red)
        end

        # Give the system some breathing space to finish...
        sleep 1

        # Check that pid really is dead
        begin
          Process.getpgid( pid )
          # `getpgid` will raise error if pid is dead, so if we get here, try next signal.
          next
        rescue Errno::ESRCH
          print "stopped PID #{pid} with #{signal} signal. ".ansi(:green)
          break # pid is dead
        end
      end
    end
    puts
  else
    false
  end

  return pids
end
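`stop` sends INT and, if the PID is still alive one second later (checked with `Process.getpgid`), escalates to KILL. The sketch below implements the same escalation but polls for the exit with `Process.kill(0, pid)` instead of sleeping a fixed second, so a node that exits quickly is not waited on. It assumes a POSIX system. `terminate` and `pid_alive?` are hypothetical names, and the usage at the end stops a throwaway Ruby child process rather than an Elasticsearch node.

```ruby
require 'rbconfig'

# True while a process with this PID exists; signal 0 only probes.
def pid_alive?(pid)
  Process.kill(0, pid)
  true
rescue Errno::ESRCH
  false
rescue Errno::EPERM
  true # the process exists but belongs to another user
end

# Sends each signal in turn and returns the one that stopped the process,
# :not_found if it was already gone, or nil if it survived every signal.
def terminate(pid, signals: %w[INT KILL], grace: 2.0)
  signals.each do |signal|
    begin
      Process.kill(signal, pid)
    rescue Errno::ESRCH
      return :not_found
    end
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + grace
    while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
      return signal unless pid_alive?(pid)
      sleep 0.05
    end
  end
  nil
end

# Usage: start a throwaway Ruby process and stop it.
child = Process.spawn(RbConfig.ruby, '-e', 'sleep 30', err: File::NULL)
Process.detach(child) # reap it in the background so it does not linger as a zombie
terminate(child)
```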

#wait_for_green(port = 9250, timeout = 60) ⇒ Object

Waits until the cluster is green, then prints information about it to STDOUT.

Examples:

Wait for the default cluster to turn green and print its information

Elasticsearch::Extensions::Test::Cluster.wait_for_green

Parameters:

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 60)

    Maximum time to wait, in seconds

Returns:

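  • Boolean

    true once the cluster is green; Timeout::Error is raised if it does not turn green within timeout seconds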

# File 'lib/elasticsearch/extensions/test/cluster.rb', line 243

def wait_for_green(port=9250, timeout=60)
  __wait_for_status('green', port, timeout)
end