Class: Elasticsearch::Embedded::Cluster
- Inherits: Object
  - Object
  - Elasticsearch::Embedded::Cluster
- Defined in: lib/elasticsearch/embedded/cluster.rb
Overview
Class used to manage a local cluster of Elasticsearch nodes.
Instance Attribute Summary
-
#additional_options ⇒ Object
Options for cluster.
-
#cluster_name ⇒ Object
Options for cluster.
-
#downloader ⇒ Object
Options for downloader.
-
#nodes ⇒ Object
Options for cluster.
-
#persistent ⇒ Object
Options for cluster.
-
#port ⇒ Object
Options for cluster.
-
#timeout ⇒ Object
Options for cluster.
-
#verbose ⇒ Object
Options for cluster.
-
#version ⇒ Object
Options for downloader.
-
#working_dir ⇒ Object
Options for downloader.
Instance Method Summary
-
#apply_development_template! ⇒ Object
Applies an index template with one shard and zero replicas. Used for persistent clusters; without it the cluster never reaches green state because of missing replicas.
-
#delete_all_indices! ⇒ Array<Net::HTTPResponse>
Remove all indices in the cluster.
-
#delete_index!(*args) ⇒ Array<Net::HTTPResponse>
Remove the indices given as args.
-
#ensure_started! ⇒ Object
Start server unless it’s running.
-
#initialize ⇒ Cluster
constructor
Assign default values to options.
-
#pids ⇒ Object
Thread safe access to all spawned process pids.
-
#running? ⇒ Boolean
Returns true when the cluster answers a health request within 0.25 seconds and reports the expected cluster name and number of nodes.
-
#start ⇒ Object
Start an elasticsearch cluster and return immediately.
-
#start_and_wait! ⇒ Object
Start an Elasticsearch cluster and block until all of its child processes have exited. A signal handler is registered first so that INT, TERM and QUIT signals close the cluster.
-
#stop ⇒ Object
Stop the cluster and return after all child processes are dead.
Constructor Details
#initialize ⇒ Cluster
Assign default values to options
# File 'lib/elasticsearch/embedded/cluster.rb', line 22
def initialize
  @nodes = 1
  @port = 9250
  @version = Downloader::DEFAULT_VERSION
  @working_dir = Downloader::TEMPORARY_PATH
  @timeout = 30
  @cluster_name = 'elasticsearch_test'
  @pids = []
  @pids_lock = Mutex.new
end
Instance Attribute Details
#additional_options ⇒ Object
Options for cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 16
def additional_options
  @additional_options
end
#cluster_name ⇒ Object
Options for cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 16
def cluster_name
  @cluster_name
end
#downloader ⇒ Object
Options for downloader
# File 'lib/elasticsearch/embedded/cluster.rb', line 19
def downloader
  @downloader
end
#nodes ⇒ Object
Options for cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 16
def nodes
  @nodes
end
#persistent ⇒ Object
Options for cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 16
def persistent
  @persistent
end
#port ⇒ Object
Options for cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 16
def port
  @port
end
#timeout ⇒ Object
Options for cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 16
def timeout
  @timeout
end
#verbose ⇒ Object
Options for cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 16
def verbose
  @verbose
end
#version ⇒ Object
Options for downloader
# File 'lib/elasticsearch/embedded/cluster.rb', line 19
def version
  @version
end
#working_dir ⇒ Object
Options for downloader
# File 'lib/elasticsearch/embedded/cluster.rb', line 19
def working_dir
  @working_dir
end
Instance Method Details
#apply_development_template! ⇒ Object
Applies an index template with one shard and zero replicas. Used for persistent clusters; without it the cluster never reaches green state because of missing replicas.
# File 'lib/elasticsearch/embedded/cluster.rb', line 102
def apply_development_template!
  development_settings = {
    template: '*',
    settings: {
      number_of_shards: 1,
      number_of_replicas: 0,
    }
  }
  # Create the template on cluster
  http_object.put('/_template/development_template', JSON.dump(development_settings))
end
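To see what the PUT request to /_template/development_template carries, the settings hash can be serialized on its own. This is a small sketch that needs no running cluster; only the hash is taken from the method above, and the variable name `body` is introduced here for illustration.

```ruby
require 'json'

# Same settings hash as in apply_development_template!
development_settings = {
  template: '*',
  settings: {
    number_of_shards: 1,
    number_of_replicas: 0
  }
}

# JSON.dump turns symbol keys into strings and emits compact JSON.
body = JSON.dump(development_settings)
puts body
# prints {"template":"*","settings":{"number_of_shards":1,"number_of_replicas":0}}
```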
#delete_all_indices! ⇒ Array<Net::HTTPResponse>
Remove all indices in the cluster
# File 'lib/elasticsearch/embedded/cluster.rb', line 89
def delete_all_indices!
  delete_index! :_all
end
#delete_index!(*args) ⇒ Array<Net::HTTPResponse>
Remove the indices given as args
# File 'lib/elasticsearch/embedded/cluster.rb', line 97
def delete_index!(*args)
  args.map { |index| http_object.request(Net::HTTP::Delete.new("/#{index}")) }
end
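The mapping from arguments to DELETE requests can be illustrated without a cluster by building the request objects and not sending them. A minimal sketch; the index names below are made up for the example, and symbols such as :_all interpolate into the path the same way strings do.

```ruby
require 'net/http'

# Hypothetical index names, for illustration only.
indices = [:_all, 'logs-2014', 'users']

# One DELETE request object per index name; nothing is sent over the network.
requests = indices.map { |index| Net::HTTP::Delete.new("/#{index}") }

requests.each { |request| puts "#{request.method} #{request.path}" }
# prints:
# DELETE /_all
# DELETE /logs-2014
# DELETE /users
```

Because the method maps over its arguments, the returned array holds one response per index name, in the order given.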
#ensure_started! ⇒ Object
Start server unless it’s running
# File 'lib/elasticsearch/embedded/cluster.rb', line 73
def ensure_started!
  start unless running?
end
#pids ⇒ Object
Thread safe access to all spawned process pids
# File 'lib/elasticsearch/embedded/cluster.rb', line 68
def pids
  @pids_lock.synchronize { @pids }
end
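The locking pattern behind this reader can be sketched in isolation. The `PidRegistry` class and its `add` method are hypothetical stand-ins, not part of the library; they only show a writer and a reader sharing one Mutex, as the `@pids` and `@pids_lock` instance variables suggest.

```ruby
# Hypothetical stand-in for the pid bookkeeping; not part of the library.
class PidRegistry
  def initialize
    @pids = []
    @pids_lock = Mutex.new
  end

  # Writers take the same lock as the reader.
  def add(pid)
    @pids_lock.synchronize { @pids << pid }
  end

  # Mirrors Cluster#pids: the array is read while holding the lock.
  def pids
    @pids_lock.synchronize { @pids }
  end
end

registry = PidRegistry.new
# Made-up pid values, registered from several threads at once.
threads = (1..8).map { |n| Thread.new { registry.add(1000 + n) } }
threads.each(&:join)

puts registry.pids.sort.inspect
```

As in the method above, the reader hands back the array itself rather than a copy, so callers should treat the result as read-only.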
#running? ⇒ Boolean
Returns true when the cluster answers a health request within 0.25 seconds and reports the expected cluster name and number of nodes.
# File 'lib/elasticsearch/embedded/cluster.rb', line 80
def running?
  cluster_health = Timeout::timeout(0.25) { __get_cluster_health } rescue nil
  # Response is present, cluster name is the same and number of nodes is the same
  !!cluster_health && cluster_health['cluster_name'] == cluster_name && cluster_health['number_of_nodes'] == nodes
end
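The check combines a short timeout, a blanket rescue and a comparison against the configured values. The sketch below reproduces that logic with a block standing in for __get_cluster_health; the helper name `cluster_running?` and the sample health hash are assumptions made for illustration.

```ruby
require 'timeout'

# Hypothetical helper; the block stands in for __get_cluster_health.
def cluster_running?(expected_name, expected_nodes)
  health = begin
    Timeout.timeout(0.25) { yield }
  rescue StandardError
    # Timeouts and connection errors both count as "not running".
    nil
  end
  !!health &&
    health['cluster_name'] == expected_name &&
    health['number_of_nodes'] == expected_nodes
end

# Sample response shaped like the two fields the method reads.
healthy = { 'cluster_name' => 'elasticsearch_test', 'number_of_nodes' => 1 }

matching   = cluster_running?('elasticsearch_test', 1) { healthy }
wrong_size = cluster_running?('elasticsearch_test', 2) { healthy }
too_slow   = cluster_running?('elasticsearch_test', 1) { sleep 1 }
refused    = cluster_running?('elasticsearch_test', 1) { raise Errno::ECONNREFUSED }

puts [matching, wrong_size, too_slow, refused].inspect
# prints [true, false, false, false]
```

A node count that differs from the configured one yields false, so a cluster that is still forming is reported as not running.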
#start ⇒ Object
Start an elasticsearch cluster and return immediately
# File 'lib/elasticsearch/embedded/cluster.rb', line 34
def start
  @downloader = Downloader.download(version: version, path: working_dir)
  start_cluster
  apply_development_template! if persistent
end
#start_and_wait! ⇒ Object
Start an Elasticsearch cluster and block until all of its child processes have exited. A signal handler is registered first so that INT, TERM and QUIT signals close the cluster.
# File 'lib/elasticsearch/embedded/cluster.rb', line 42
def start_and_wait!
  # register handler before starting cluster
  register_shutdown_handler
  # start the cluster
  start
  # Wait for all child processes to end then return
  Process.waitall
end
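The body of register_shutdown_handler is not shown on this page. As a rough sketch of what trapping INT, TERM and QUIT can look like in plain Ruby, the example below records the received signal instead of stopping a cluster; the `received` and `previous` variables are assumptions introduced for illustration.

```ruby
# Hypothetical sketch; the library's own handler may be implemented differently.
received = []
previous = {}

%w[INT TERM QUIT].each do |signal|
  # Signal.trap returns the handler that was installed before.
  previous[signal] = Signal.trap(signal) { received << signal }
end

# Simulate an operator sending TERM to this process.
Process.kill('TERM', Process.pid)

# Give the handler a bounded amount of time to run.
deadline = Time.now + 2
sleep 0.05 until received.any? || Time.now > deadline

# Put the earlier handlers back so the rest of the program is unaffected.
previous.each { |signal, handler| Signal.trap(signal, handler || 'DEFAULT') }

puts "shutdown requested by SIG#{received.first}"
```

Registering the handlers before starting anything, as the method above does, means a signal that arrives during startup is already caught.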
#stop ⇒ Object
Stop the cluster and return after all child processes are dead
# File 'lib/elasticsearch/embedded/cluster.rb', line 52
def stop
  logger.warn 'Cluster is still starting, wait until startup is complete before sending shutdown command' if @pids_lock.locked?
  @pids_lock.synchronize do
    http_object.post('/_shutdown', nil)
    logger.debug 'Cluster stopped succesfully using shutdown api'
    Timeout.timeout(2) { Process.waitall }
    # Reset running pids reader
    @pids = []
  end
rescue
  logger.warn "Following processes are still alive #{pids}, kill them with signals"
  # Send term signal if post request fails to all processes still alive after 2 seconds
  pids.each { |pid| wait_or_kill(pid) }
end
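The fallback path relies on wait_or_kill, whose body is not shown on this page. One plausible shape for such a helper, given here purely as an assumption, is to send TERM, wait a bounded time, and escalate to KILL. The name `terminate_with_fallback`, the `grace` parameter and the sleeping child process are all invented for this sketch.

```ruby
require 'rbconfig'
require 'timeout'

# Hypothetical helper; not the library's wait_or_kill.
def terminate_with_fallback(pid, grace: 2)
  Process.kill('TERM', pid)
  # Wait for the child to exit, but only for `grace` seconds.
  Timeout.timeout(grace) { Process.wait(pid) }
  :terminated
rescue Timeout::Error
  # The child ignored TERM, so force it down and reap it.
  Process.kill('KILL', pid)
  Process.wait(pid)
  :killed
rescue Errno::ESRCH, Errno::ECHILD
  # The process was already gone or already reaped.
  :already_gone
end

# A throwaway child process standing in for an Elasticsearch node.
child = Process.spawn(RbConfig.ruby, '-e', 'sleep 60')
result = terminate_with_fallback(child)
puts result
```

Trying the shutdown API first and falling back to signals only on failure, as the method above does, gives the nodes a chance to exit cleanly before anything is killed.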