Class: Elasticsearch::Extensions::Test::Cluster::Cluster
- Inherits:
-
Object
- Object
- Elasticsearch::Extensions::Test::Cluster::Cluster
- Defined in:
- lib/elasticsearch/extensions/test/cluster.rb
Constant Summary collapse
- COMMANDS =
{ '0.90' => lambda { |arguments, node_number| <<-COMMAND.gsub(/ /, '').gsub(/\n$/, '') #{arguments[:command]} \ -f \ -D es.cluster.name=#{arguments[:cluster_name]} \ -D es.node.name=#{arguments[:node_name]}-#{node_number} \ -D es.http.port=#{arguments[:port].to_i + (node_number-1)} \ -D es.path.data=#{arguments[:path_data]} \ -D es.path.work=#{arguments[:path_work]} \ -D es.path.logs=#{arguments[:path_logs]} \ -D es.cluster.routing.allocation.disk.threshold_enabled=false \ -D es.network.host=#{arguments[:network_host]} \ -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \ -D es.script.inline=true \ -D es.script.indexed=true \ -D es.node.test=true \ -D es.node.testattr=test \ -D es.node.bench=true \ -D es.path.repo=/tmp \ -D es.repositories.url.allowed_urls=http://snapshot.test* \ -D es.logger.level=#{ENV['DEBUG'] ? 'DEBUG' : 'INFO'} \ #{arguments[:es_params]} COMMAND }, '1.0' => lambda { |arguments, node_number| <<-COMMAND.gsub(/ /, '').gsub(/\n$/, '') #{arguments[:command]} \ -D es.foreground=yes \ -D es.cluster.name=#{arguments[:cluster_name]} \ -D es.node.name=#{arguments[:node_name]}-#{node_number} \ -D es.http.port=#{arguments[:port].to_i + (node_number-1)} \ -D es.path.data=#{arguments[:path_data]} \ -D es.path.work=#{arguments[:path_work]} \ -D es.path.logs=#{arguments[:path_logs]} \ -D es.cluster.routing.allocation.disk.threshold_enabled=false \ -D es.network.host=#{arguments[:network_host]} \ -D es.discovery.zen.ping.multicast.enabled=#{arguments[:multicast_enabled]} \ -D es.script.inline=on \ -D es.script.indexed=on \ -D es.node.test=true \ -D es.node.testattr=test \ -D es.node.bench=true \ -D es.path.repo=/tmp \ -D es.repositories.url.allowed_urls=http://snapshot.test* \ -D es.logger.level=#{ENV['DEBUG'] ? 'DEBUG' : 'INFO'} \ #{arguments[:es_params]} COMMAND }, '2.0' => lambda { |arguments, node_number| <<-COMMAND.gsub(/ /, '').gsub(/\n$/, '') #{arguments[:command]} \ -D es.foreground=yes \ -D es.cluster.name=#{arguments[:cluster_name]} \ -D es.node.name=#{arguments[:node_name]}-#{node_number} \ -D es.http.port=#{arguments[:port].to_i + (node_number-1)} \ -D es.path.data=#{arguments[:path_data]} \ -D es.path.work=#{arguments[:path_work]} \ -D es.path.logs=#{arguments[:path_logs]} \ -D es.cluster.routing.allocation.disk.threshold_enabled=false \ -D es.network.host=#{arguments[:network_host]} \ -D es.script.inline=true \ -D es.script.stored=true \ -D es.node.attr.testattr=test \ -D es.path.repo=/tmp \ -D es.repositories.url.allowed_urls=http://snapshot.test* \ -D es.logger.level=#{ENV['DEBUG'] ? 'DEBUG' : 'INFO'} \ #{arguments[:es_params]} COMMAND }, '5.0' => lambda { |arguments, node_number| <<-COMMAND.gsub(/ /, '').gsub(/\n$/, '') #{arguments[:command]} \ -E cluster.name=#{arguments[:cluster_name]} \ -E node.name=#{arguments[:node_name]}-#{node_number} \ -E http.port=#{arguments[:port].to_i + (node_number-1)} \ -E path.data=#{arguments[:path_data]} \ -E path.logs=#{arguments[:path_logs]} \ -E cluster.routing.allocation.disk.threshold_enabled=false \ -E network.host=#{arguments[:network_host]} \ -E script.inline=true \ -E script.stored=true \ -E node.attr.testattr=test \ -E path.repo=/tmp \ -E repositories.url.allowed_urls=http://snapshot.test* \ -E discovery.zen.minimum_master_nodes=#{arguments[:number_of_nodes]-1} \ -E node.max_local_storage_nodes=#{arguments[:number_of_nodes]} \ -E logger.level=#{ENV['DEBUG'] ? 'DEBUG' : 'INFO'} \ #{arguments[:es_params]} COMMAND }, '6.0' => lambda { |arguments, node_number| <<-COMMAND.gsub(/ /, '').gsub(/\n$/, '') #{arguments[:command]} \ -E cluster.name=#{arguments[:cluster_name]} \ -E node.name=#{arguments[:node_name]}-#{node_number} \ -E http.port=#{arguments[:port].to_i + (node_number-1)} \ -E path.data=#{arguments[:path_data]} \ -E path.logs=#{arguments[:path_logs]} \ -E cluster.routing.allocation.disk.threshold_enabled=false \ -E network.host=#{arguments[:network_host]} \ -E node.attr.testattr=test \ -E path.repo=/tmp \ -E repositories.url.allowed_urls=http://snapshot.test* \ -E discovery.zen.minimum_master_nodes=#{arguments[:number_of_nodes]-1} \ -E node.max_local_storage_nodes=#{arguments[:number_of_nodes]} \ -E logger.level=#{ENV['DEBUG'] ? 'DEBUG' : 'INFO'} \ #{arguments[:es_params]} COMMAND } }
Instance Attribute Summary collapse
-
#arguments ⇒ Object
readonly
Returns the value of attribute arguments.
Instance Method Summary collapse
-
#__check_for_running_processes(pids) ⇒ Object
private
Check whether process for PIDs are running.
-
#__cluster_info ⇒ Object
private
Return information about the cluster.
-
#__cluster_url ⇒ Object
private
Returns the HTTP URL for the cluster based on ‘:network_host` setting.
-
#__command(version, arguments, node_number) ⇒ Object
private
Returns the launch command for a specific version.
-
#__default_cluster_name ⇒ Object
private
Returns a reasonably unique cluster name.
-
#__default_network_host ⇒ Object
private
Returns default ‘:network_host` setting based on the version.
-
#__determine_version ⇒ Object
private
Determine Elasticsearch version to be launched.
-
#__get_cluster_health(status = nil) ⇒ Object
private
Tries to load cluster health information.
-
#__get_nodes ⇒ Object
private
Get the information about nodes.
-
#__log(message, mode = :puts) ⇒ Object
Print to STDERR.
-
#__remove_cluster_data ⇒ Object
private
Remove the data directory.
-
#__wait_for_status(status = 'green', timeout = 30) ⇒ Object
private
Blocks the process and waits for the cluster to be in a “green” state.
-
#initialize(arguments = {}) ⇒ Cluster
constructor
Create a new instance of the Cluster class.
-
#running? ⇒ Boolean
Returns true when a specific test node is running within the cluster.
-
#start ⇒ Object
Starts a cluster.
-
#stop ⇒ Object
Stops the cluster.
-
#version ⇒ Object
Returns the major version of Elasticsearch.
-
#wait_for_green ⇒ Object
Waits until the cluster is green and prints information about it.
Constructor Details
#initialize(arguments = {}) ⇒ Cluster
Create a new instance of the Cluster class
You can also use environment variables to set the constructor options (see source).
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 223 def initialize(arguments={}) @arguments = arguments.dup @arguments[:command] ||= ENV.fetch('TEST_CLUSTER_COMMAND', 'elasticsearch') @arguments[:port] ||= ENV.fetch('TEST_CLUSTER_PORT', 9250).to_i @arguments[:cluster_name] ||= ENV.fetch('TEST_CLUSTER_NAME', __default_cluster_name).chomp @arguments[:node_name] ||= ENV.fetch('TEST_CLUSTER_NODE_NAME', 'node') @arguments[:path_data] ||= ENV.fetch('TEST_CLUSTER_DATA', '/tmp/elasticsearch_test') @arguments[:path_work] ||= ENV.fetch('TEST_CLUSTER_TMP', '/tmp') @arguments[:path_logs] ||= ENV.fetch('TEST_CLUSTER_LOGS', '/tmp/log/elasticsearch') @arguments[:es_params] ||= ENV.fetch('TEST_CLUSTER_PARAMS', '') @arguments[:multicast_enabled] ||= ENV.fetch('TEST_CLUSTER_MULTICAST', 'true') @arguments[:timeout] ||= ENV.fetch('TEST_CLUSTER_TIMEOUT', 60).to_i @arguments[:timeout_version] ||= ENV.fetch('TEST_CLUSTER_TIMEOUT_VERSION', 15).to_i @arguments[:number_of_nodes] ||= ENV.fetch('TEST_CLUSTER_NODES', 2).to_i @arguments[:network_host] ||= ENV.fetch('TEST_CLUSTER_NETWORK_HOST', __default_network_host) @arguments[:quiet] ||= ! ENV.fetch('QUIET', '').empty? @clear_cluster = !!@arguments[:clear_cluster] || (ENV.fetch('TEST_CLUSTER_CLEAR', 'true') != 'false') # Make sure `cluster_name` is not dangerous raise ArgumentError, "The `cluster_name` argument cannot be empty string or a slash" \ if @arguments[:cluster_name] =~ /^[\/\\]?$/ end |
Instance Attribute Details
#arguments ⇒ Object (readonly)
Returns the value of attribute arguments.
82 83 84 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 82 def arguments @arguments end |
Instance Method Details
#__check_for_running_processes(pids) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Check whether process for PIDs are running
654 655 656 657 658 659 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 654 def __check_for_running_processes(pids) if `ps -p #{pids.join(' ')}`.split("\n").size < arguments[:number_of_nodes]+1 __log "\n[!!!] Process failed to start (see output above)".ansi(:red) exit(1) end end |
#__cluster_info ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Return information about the cluster
592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 592 def __cluster_info health = JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_cluster/health"))) nodes = if version == '0.90' JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_nodes/?process&http"))) else JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_nodes/process,http"))) end master = JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_cluster/state")))['master_node'] result = ["\n", ('-'*80).ansi(:faint), 'Cluster: '.ljust(20).ansi(:faint) + health['cluster_name'].to_s.ansi(:faint), 'Status: '.ljust(20).ansi(:faint) + health['status'].to_s.ansi(:faint), 'Nodes: '.ljust(20).ansi(:faint) + health['number_of_nodes'].to_s.ansi(:faint)].join("\n") nodes['nodes'].each do |id, info| m = id == master ? '*' : '-' result << "\n" + ''.ljust(20) + "#{m} ".ansi(:faint) + "#{info['name'].ansi(:bold)} ".ansi(:faint) + "| version: #{info['version'] rescue 'N/A'}, ".ansi(:faint) + "pid: #{info['process']['id'] rescue 'N/A'}, ".ansi(:faint) + "address: #{info['http']['bound_address'] rescue 'N/A'}".ansi(:faint) end result end |
#__cluster_url ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the HTTP URL for the cluster based on ‘:network_host` setting
434 435 436 437 438 439 440 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 434 def __cluster_url if '_local_' == arguments[:network_host] "http://localhost:#{arguments[:port]}" else "http://#{arguments[:network_host]}:#{arguments[:port]}" end end |
#__command(version, arguments, node_number) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns the launch command for a specific version
542 543 544 545 546 547 548 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 542 def __command(version, arguments, node_number) if command = COMMANDS[version] command.call(arguments, node_number) else raise ArgumentError, "Cannot find command for version [#{version}]" end end |
#__default_cluster_name ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns a reasonably unique cluster name
424 425 426 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 424 def __default_cluster_name "elasticsearch-test-#{Socket.gethostname.downcase}" end |
#__default_network_host ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Returns default ‘:network_host` setting based on the version
405 406 407 408 409 410 411 412 413 414 415 416 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 405 def __default_network_host case version when /^0|^1/ '0.0.0.0' when /^2/ '_local_' when /^5|^6|^7/ '_local_' else raise RuntimeError, "Cannot determine default network host from version [#{version}]" end end |
#__determine_version ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Determine Elasticsearch version to be launched
Tries to get the version from the arguments passed, if not available, it parses the version number from the ‘lib/elasticsearch-X.Y.Z.jar` file, if that is not available, uses `elasticsearch –version` or `elasticsearch -v`
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 452 def __determine_version path_to_lib = File.dirname(arguments[:command]) + '/../lib/' version = if arguments[:version] arguments[:version] elsif File.exist?(path_to_lib) && !(jar = Dir.entries(path_to_lib).select { |f| f =~ /^elasticsearch\-\d/ }.first).nil? __log "Determining version from [#{jar}]" if ENV['DEBUG'] if m = jar.match(/elasticsearch\-(\S+-)?(?<version>\d+\.\d+\.\d+).*/) m[:version] else raise RuntimeError, "Cannot determine Elasticsearch version from jar [#{jar}]" end else __log "[!] Cannot find Elasticsearch .jar from path to command [#{arguments[:command]}], using `#{arguments[:command]} --version`" if ENV['DEBUG'] unless File.exist? arguments[:command] __log "File [#{arguments[:command]}] does not exists, checking full path by `which`: ", :print if ENV['DEBUG'] begin full_path = `which #{arguments[:command]}`.strip __log "#{full_path.inspect}\n", :print if ENV['DEBUG'] rescue Exception => e raise RuntimeError, "Cannot determine full path to [#{arguments[:command]}] with 'which'" end if full_path.empty? raise Errno::ENOENT, "Cannot find Elasticsearch launch script from [#{arguments[:command]}] -- did you pass a correct path?" end end output = '' begin # First, try the new `--version` syntax... __log "Running [#{arguments[:command]} --version] to determine version" if ENV['DEBUG'] io = IO.popen("#{arguments[:command]} --version") pid = io.pid Timeout::timeout(arguments[:timeout_version]) do Process.wait(pid) output = io.read end rescue Timeout::Error # ...else, the old `-v` syntax __log "Running [#{arguments[:command]} -v] to determine version" if ENV['DEBUG'] output = `#{arguments[:command]} -v` ensure if pid Process.kill('INT', pid) rescue Errno::ESRCH # Most likely the process has terminated already end io.close unless io.closed? end STDERR.puts "> #{output}" if ENV['DEBUG'] if output.empty? raise RuntimeError, "Cannot determine Elasticsearch version from [#{arguments[:command]} --version] or [#{arguments[:command]} -v]" end if m = output.match(/Version: (\d\.\d.\d).*,/) m[1] else raise RuntimeError, "Cannot determine Elasticsearch version from elasticsearch --version output [#{output}]" end end case version when /^0\.90.*/ '0.90' when /^1\..*/ '1.0' when /^2\..*/ '2.0' when /^5\..*/ '5.0' when /^6\..*/ '6.0' when /^7\..*/ '7.0' else raise RuntimeError, "Cannot determine major version from [#{version}]" end end |
#__get_cluster_health(status = nil) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Tries to load cluster health information
627 628 629 630 631 632 633 634 635 636 637 638 639 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 627 def __get_cluster_health(status=nil) uri = URI("#{__cluster_url}/_cluster/health") uri.query = "wait_for_status=#{status}" if status begin response = Net::HTTP.get(uri) rescue Exception => e STDERR.puts e.inspect if ENV['DEBUG'] return nil end JSON.parse(response) end |
#__get_nodes ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Get the information about nodes
665 666 667 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 665 def __get_nodes JSON.parse(Net::HTTP.get(URI("#{__cluster_url}/_nodes/process"))) end |
#__log(message, mode = :puts) ⇒ Object
Print to STDERR
671 672 673 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 671 def __log(, mode=:puts) STDERR.__send__ mode, unless @arguments[:quiet] end |
#__remove_cluster_data ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Remove the data directory
645 646 647 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 645 def __remove_cluster_data FileUtils.rm_rf arguments[:path_data] end |
#__wait_for_status(status = 'green', timeout = 30) ⇒ Object
This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.
Blocks the process and waits for the cluster to be in a “green” state
Prints information about the cluster on STDOUT if the cluster is available.
561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 561 def __wait_for_status(status='green', timeout=30) begin Timeout::timeout(timeout) do loop do response = __get_cluster_health(status) __log response if ENV['DEBUG'] if response && response['status'] == status && ( arguments[:number_of_nodes].nil? || arguments[:number_of_nodes].to_i == response['number_of_nodes'].to_i ) break end __log '.'.ansi(:faint), :print sleep 1 end end rescue Timeout::Error => e = "\nTimeout while waiting for cluster status [#{status}]" += " and [#{arguments[:number_of_nodes]}] nodes" if arguments[:number_of_nodes] __log .ansi(:red, :bold) raise e end return true end |
#running? ⇒ Boolean
Returns true when a specific test node is running within the cluster
373 374 375 376 377 378 379 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 373 def running? if cluster_health = Timeout::timeout(0.25) { __get_cluster_health } rescue nil return cluster_health['cluster_name'] == arguments[:cluster_name] && \ cluster_health['number_of_nodes'] == arguments[:number_of_nodes] end return false end |
#start ⇒ Object
Starts a cluster
Launches the specified number of nodes in a test-suitable configuration and prints information about the cluster – unless this specific cluster is already running.
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 272 def start if self.running? __log "[!] Elasticsearch cluster already running".ansi(:red) return false end __remove_cluster_data if @clear_cluster __log "Starting ".ansi(:faint) + arguments[:number_of_nodes].to_s.ansi(:bold, :faint) + " Elasticsearch #{arguments[:number_of_nodes] < 2 ? 'node' : 'nodes'}..".ansi(:faint), :print pids = [] __log "\nUsing Elasticsearch version [#{version}]" if ENV['DEBUG'] arguments[:number_of_nodes].times do |n| n += 1 command = __command(version, arguments, n) command += '> /dev/null' unless ENV['DEBUG'] __log command.gsub(/ {1,}/, ' ').ansi(:bold) if ENV['DEBUG'] pid = Process.spawn(command) Process.detach pid pids << pid sleep 1 end __check_for_running_processes(pids) wait_for_green __log __cluster_info return true end |
#stop ⇒ Object
Stops the cluster
Fetches the PID numbers from “Nodes Info” API and terminates matching nodes.
321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 321 def stop begin nodes = __get_nodes rescue Exception => e __log "[!] Exception raised when stopping the cluster: #{e.inspect}".ansi(:red) nil end return false if nodes.nil? or nodes.empty? pids = nodes['nodes'].map { |id, info| info['process']['id'] } unless pids.empty? __log "Stopping Elasticsearch nodes... ".ansi(:faint), :print pids.each_with_index do |pid, i| ['INT','KILL'].each do |signal| begin Process.kill signal, pid rescue Exception => e __log "[#{e.class}] PID #{pid} not found. ".ansi(:red), :print end # Give the system some breathing space to finish... Kernel.sleep 1 # Check that pid really is dead begin Process.getpgid pid # `getpgid` will raise error if pid is dead, so if we get here, try next signal next rescue Errno::ESRCH __log "Stopped PID #{pid}".ansi(:green) + (ENV['DEBUG'] ? " with #{signal} signal".ansi(:green) : '') + ". ".ansi(:green), :print break # pid is dead end end end __log "\n" else return false end return pids end |
#version ⇒ Object
Returns the major version of Elasticsearch
394 395 396 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 394 def version @version ||= __determine_version end |
#wait_for_green ⇒ Object
Waits until the cluster is green and prints information about it
385 386 387 |
# File 'lib/elasticsearch/extensions/test/cluster.rb', line 385 def wait_for_green __wait_for_status('green', arguments[:timeout]) end |