Module: Elasticsearch::Extensions::Test::Cluster

Extended by:
Cluster
Included in:
Cluster
Defined in:
lib/elasticsearch/extensions/test/cluster.rb

Overview

A convenience Ruby class for starting and stopping a separate testing in-memory cluster, to not depend on – and not mess up – <localhost:9200>.

Examples:

Start a cluster with default configuration

require 'elasticsearch/extensions/test/cluster'
Elasticsearch::Extensions::Test::Cluster.start

See Also:

Instance Method Summary collapse

Instance Method Details

#__get_cluster_health(port = 9250) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Tries to load cluster health information



247
248
249
250
251
252
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 247

def __get_cluster_health(port=9250)
  uri = URI("http://localhost:#{port}/_cluster/health")
  if response = Net::HTTP.get(uri) rescue nil
    return JSON.parse(response)
  end
end

#__print_cluster_info(port) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Print information about the cluster on STDOUT



225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 225

def __print_cluster_info(port)
  health = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/health")))
  nodes  = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/nodes/?process")))
  master = JSON.parse(Net::HTTP.get(URI("http://localhost:#{port}/_cluster/state")))['master_node']

  puts "\n",
        ('-'*80).ansi(:faint),
       'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint),
       'Status:  '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint),
       'Nodes:   '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint)

  nodes['nodes'].each do |id, info|
    m = id == master ? '+' : '-'
    puts ''.ljust(20) +
         "#{m} #{info['name'].ansi(:bold)} | version: #{info['version']}, pid: #{info['process']['id']}".ansi(:faint)
  end
end

#__wait_for_status(status = 'green', port = 9250, timeout = 30) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Blocks the process and waits for the cluster to be in a “green” state.

Prints information about the cluster on STDOUT if the cluster is available.

Parameters:

  • status (String) (defaults to: 'green')

    The status to wait for (yellow, green)

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 30)

    The explicit timeout for the operation

Returns:

  • Boolean



197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 197

def __wait_for_status(status='green', port=9250, timeout=30)
  uri = URI("http://localhost:#{port}/_cluster/health?wait_for_status=#{status}")

  Timeout::timeout(timeout) do
    loop do
      response = begin
        JSON.parse(Net::HTTP.get(uri))
      rescue Exception => e
        puts e.inspect if ENV['DEBUG']
        nil
      end

      if response && response['status'] == status && ( @@number_of_nodes.nil? || @@number_of_nodes == response['number_of_nodes'].to_i  )
        __print_cluster_info(port) and break
      end

      print '.'.ansi(:faint)
      sleep 1
    end
  end

  return true
end

#running?(arguments = {}) ⇒ Boolean

Returns true when a specific test node is running within the cluster.

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :on (Integer)

    The port on which the node is running.

  • :as (String)

    The cluster name.

Returns:

  • (Boolean)

    Boolean



161
162
163
164
165
166
167
168
169
170
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 161

def running?(arguments={})
  port         = arguments[:on] || (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  cluster_name = arguments[:as] ||  ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test'

  if cluster_health = Timeout::timeout(0.25) { __get_cluster_health(port) } rescue nil
    return cluster_health['cluster_name']    == cluster_name && \
           cluster_health['number_of_nodes'] == @@number_of_nodes
  end
  return false
end

#start(arguments = {}) ⇒ Object

Starts a cluster

Launches the specified number of nodes in test-suitable configuration by default and prints information about the cluster – unless this specific cluster is running already.

Use the Cluster.stop command with the same arguments to stop this cluster.

You can also use environment variables to set these options.

Examples:

Start a cluster with default configuration (2 nodes, in-memory, etc)

Elasticsearch::Extensions::Test::Cluster.start

Start a cluster with a custom configuration

Elasticsearch::Extensions::Test::Cluster.start \
  cluster_name: 'my-cluster',
  nodes: 3,
  node_name: 'my-node',
  port: 9350

Start a cluster with a different Elasticsearch version

Elasticsearch::Extensions::Test::Cluster.start \
  command: "/usr/local/Cellar/elasticsearch/1.0.0.Beta2/bin/elasticsearch"

Parameters:

  • arguments (Hash) (defaults to: {})

    a customizable set of options

Options Hash (arguments):

  • :command (String)

    Elasticsearch command (default: ‘elasticsearch`).

  • :nodes (Integer)

    Number of desired nodes (default: 2).

  • :cluster_name (String)

    Cluster name (default: ‘elasticsearch_test`).

  • :port (String)

    Starting port number; will be auto-incremented (default: 9250).

  • :timeout (Integer)

    Timeout when starting the cluster (default: 30).

Returns:

  • Boolean

See Also:



66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 66

def start(arguments={})
  @@number_of_nodes = (ENV['TEST_CLUSTER_NODES'] || arguments[:nodes] || 2).to_i

  arguments[:command]      ||= ENV['TEST_CLUSTER_COMMAND'] || 'elasticsearch'
  arguments[:port]         ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i
  arguments[:cluster_name] ||= ENV['TEST_CLUSTER_NAME'] || 'elasticsearch_test'
  arguments[:gateway_type] ||= 'none'
  arguments[:index_store]  ||= 'memory'
  arguments[:path_data]    ||= ENV['TEST_CLUSTER_DATA'] || '/tmp'
  arguments[:es_params]    ||= ENV['TEST_CLUSTER_PARAMS'] || ''
  arguments[:path_work]    ||= '/tmp'
  arguments[:node_name]    ||= 'node'
  arguments[:timeout]      ||= 30

  if running? :on => arguments[:port], :as => arguments[:cluster_name]
    print "[!] Elasticsearch cluster already running".ansi(:red)
    wait_for_green(arguments[:port], arguments[:timeout])
    return false
  end

  print "Starting ".ansi(:faint) +
        @@number_of_nodes.to_s.ansi(:bold, :faint) +
        " Elasticsearch nodes..".ansi(:faint)

  @@number_of_nodes.times do |n|
    n += 1
    pid = Process.spawn <<-COMMAND
      #{arguments[:command]} \
        -D es.foreground=no \
        -D es.cluster.name=#{arguments[:cluster_name]} \
        -D es.node.name=#{arguments[:node_name]}-#{n} \
        -D es.http.port=#{arguments[:port].to_i + (n-1)} \
        -D es.gateway.type=#{arguments[:gateway_type]} \
        -D es.index.store.type=#{arguments[:index_store]} \
        -D es.path.data=#{arguments[:path_data]} \
        -D es.path.work=#{arguments[:path_work]} \
        -D es.network.host=0.0.0.0 \
        -D es.discovery.zen.ping.multicast.enabled=true \
        -D es.node.test=true \
        #{arguments[:es_params]} \
        > /dev/null 2>&1
    COMMAND
    Process.detach pid
  end

  wait_for_green(arguments[:port], arguments[:timeout])
  return true
end

#stop(arguments = {}) ⇒ Object

Stop the cluster.

Fetches the PID numbers from “Nodes Info” API and terminates matching nodes.

Examples:

Stop the default cluster

Elasticsearch::Extensions::Test::Cluster.stop

Stop the cluster reachable on specific port

Elasticsearch::Extensions::Test::Cluster.stop port: 9350

Returns:

  • Boolean

See Also:



128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 128

def stop(arguments={})
  arguments[:port] ||= (ENV['TEST_CLUSTER_PORT'] || 9250).to_i

  nodes = JSON.parse(Net::HTTP.get(URI("http://localhost:#{arguments[:port]}/_cluster/nodes/?process"))) rescue nil

  return false if nodes.nil? or nodes.empty?

  pids  = nodes['nodes'].map { |id, info| info['process']['id'] }

  unless pids.empty?
    print "Stopping Elasticsearch nodes... ".ansi(:faint)
    pids.each_with_index do |pid, i|
      begin
        print "stopped PID #{pid}. ".ansi(:green) if Process.kill 'KILL', pid
      rescue Exception => e
        print "[#{e.class}] PID #{pid} not found. ".ansi(:red)
      end
    end
    puts
  else
    false
  end

  return pids
end

#wait_for_green(port = 9250, timeout = 60) ⇒ Object

Waits until the cluster is green and prints information

Examples:

Print the information about the default cluster

Elasticsearch::Extensions::Test::Cluster.wait_for_green

Parameters:

  • status (String)

    The status to wait for (yellow, green)

  • port (Integer) (defaults to: 9250)

    The port on which the cluster is reachable

  • timeout (Integer) (defaults to: 60)

    The explicit timeout for the operation

Returns:

  • Boolean



181
182
183
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 181

def wait_for_green(port=9250, timeout=60)
  __wait_for_status('green', port, timeout)
end