Class: Elasticsearch::Embedded::Cluster

Inherits:
Object
  • Object
show all
Defined in:
lib/elasticsearch/embedded/cluster.rb

Overview

Class used to manage a local cluster of elasticsearch nodes

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeCluster

Assign default values to options



22
23
24
25
26
27
28
29
30
31
# File 'lib/elasticsearch/embedded/cluster.rb', line 22

def initialize
  @nodes = 1
  @port = 9250
  @version = Downloader::DEFAULT_VERSION
  @working_dir = Downloader::TEMPORARY_PATH
  @timeout = 30
  @cluster_name = 'elasticsearch_test'
  @pids = []
  @pids_lock = Mutex.new
end

Instance Attribute Details

#additional_optionsObject

Options for cluster



16
17
18
# File 'lib/elasticsearch/embedded/cluster.rb', line 16

def additional_options
  @additional_options
end

#cluster_nameObject

Options for cluster



16
17
18
# File 'lib/elasticsearch/embedded/cluster.rb', line 16

def cluster_name
  @cluster_name
end

#downloaderObject

Options for downloader



19
20
21
# File 'lib/elasticsearch/embedded/cluster.rb', line 19

def downloader
  @downloader
end

#nodesObject

Options for cluster



16
17
18
# File 'lib/elasticsearch/embedded/cluster.rb', line 16

def nodes
  @nodes
end

#persistentObject

Options for cluster



16
17
18
# File 'lib/elasticsearch/embedded/cluster.rb', line 16

def persistent
  @persistent
end

#portObject

Options for cluster



16
17
18
# File 'lib/elasticsearch/embedded/cluster.rb', line 16

def port
  @port
end

#timeoutObject

Options for cluster



16
17
18
# File 'lib/elasticsearch/embedded/cluster.rb', line 16

def timeout
  @timeout
end

#verboseObject

Options for cluster



16
17
18
# File 'lib/elasticsearch/embedded/cluster.rb', line 16

def verbose
  @verbose
end

#versionObject

Options for downloader



19
20
21
# File 'lib/elasticsearch/embedded/cluster.rb', line 19

def version
  @version
end

#working_dirObject

Options for downloader



19
20
21
# File 'lib/elasticsearch/embedded/cluster.rb', line 19

def working_dir
  @working_dir
end

Instance Method Details

#apply_development_template!Object

Used for persistent clusters, otherwise cluster won’t get green state because of missing replicas



102
103
104
105
106
107
108
109
110
111
112
# File 'lib/elasticsearch/embedded/cluster.rb', line 102

def apply_development_template!
  development_settings = {
      template: '*',
      settings: {
          number_of_shards: 1,
          number_of_replicas: 0,
      }
  }
  # Create the template on cluster
  http_object.put('/_template/development_template', JSON.dump(development_settings))
end

#delete_all_indices!Array<Net::HTTPResponse>

Remove all indices in the cluster

Returns:

  • (Array<Net::HTTPResponse>)

    raw http responses



89
90
91
# File 'lib/elasticsearch/embedded/cluster.rb', line 89

def delete_all_indices!
  delete_index! :_all
end

#delete_index!(*args) ⇒ Array<Net::HTTPResponse>

Remove the indices given as args

Parameters:

  • args (Array<String,Symbol>)

    list of indices to delet

Returns:

  • (Array<Net::HTTPResponse>)

    raw http responses



97
98
99
# File 'lib/elasticsearch/embedded/cluster.rb', line 97

def delete_index!(*args)
  args.map { |index| http_object.request(Net::HTTP::Delete.new("/#{index}")) }
end

#ensure_started!Object

Start server unless it’s running



73
74
75
# File 'lib/elasticsearch/embedded/cluster.rb', line 73

def ensure_started!
  start unless running?
end

#pidsObject

Thread safe access to all spawned process pids



68
69
70
# File 'lib/elasticsearch/embedded/cluster.rb', line 68

def pids
  @pids_lock.synchronize { @pids }
end

#running?Boolean

Returns true when started cluster is running

Returns:

  • (Boolean)

    Boolean



80
81
82
83
84
# File 'lib/elasticsearch/embedded/cluster.rb', line 80

def running?
  cluster_health = Timeout::timeout(0.25) { __get_cluster_health } rescue nil
  # Response is present, cluster name is the same and number of nodes is the same
  !!cluster_health && cluster_health['cluster_name'] == cluster_name && cluster_health['number_of_nodes'] == nodes
end

#startObject

Start an elasticsearch cluster and return immediately



34
35
36
37
38
# File 'lib/elasticsearch/embedded/cluster.rb', line 34

def start
  @downloader = Downloader.download(version: version, path: working_dir)
  start_cluster
  apply_development_template! if persistent
end

#start_and_wait!Object

Start an elasticsearch cluster and wait until running, also register a signal handler to close the cluster on INT, TERM and QUIT signals



42
43
44
45
46
47
48
49
# File 'lib/elasticsearch/embedded/cluster.rb', line 42

def start_and_wait!
  # register handler before starting cluster
  register_shutdown_handler
  # start the cluster
  start
  # Wait for all child processes to end then return
  Process.waitall
end

#stopObject

Stop the cluster and return after all child processes are dead



52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/elasticsearch/embedded/cluster.rb', line 52

def stop
  logger.warn 'Cluster is still starting, wait until startup is complete before sending shutdown command' if @pids_lock.locked?
  @pids_lock.synchronize do
    http_object.post('/_shutdown', nil)
    logger.debug 'Cluster stopped succesfully using shutdown api'
    Timeout.timeout(2) { Process.waitall }
    # Reset running pids reader
    @pids = []
  end
rescue
  logger.warn "Following processes are still alive #{pids}, kill them with signals"
  # Send term signal if post request fails to all processes still alive after 2 seconds
  pids.each { |pid| wait_or_kill(pid) }
end