Class: NsqCluster

Inherits:
Object
  • Object
show all
Defined in:
lib/nsq-cluster.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ NsqCluster

Returns a new instance of NsqCluster.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/nsq-cluster.rb', line 24

# Builds the requested service objects, starts them all asynchronously and,
# unless told otherwise, waits until the whole cluster is running.
#
# Recognised keys in +opts+ (default in parentheses):
#   :nsqlookupd_count    (0)     count handed to create_nsqlookupds
#   :nsqlookupd_options  (unset) options handed to create_nsqlookupds; wins
#                                over the legacy key below when truthy
#   :nsqdlookupd_options ({})    historical, misspelled name of the key above;
#                                still honoured for existing callers
#   :nsqd_count          (0)     count handed to create_nsqds
#   :nsqd_options        ({})    options handed to create_nsqds
#   :nsqadmin            (false) when truthy, create_nsqadmin is called
#   :verbose             (false) stored in @verbose
#   :async               (unset) when truthy, block_until_running is skipped
#
# @param opts [Hash] overrides merged on top of the defaults listed above
# @raise [Exception] whatever starting or waiting raised, re-raised after
#   destroy has been called
def initialize(opts = {})
  opts = {
    nsqlookupd_count: 0,
    nsqdlookupd_options: {},
    nsqd_count: 0,
    nsqadmin: false,
    nsqd_options: {},
    verbose: false
  }.merge(opts)

  # Previously only the misspelled :nsqdlookupd_options key was read, so the
  # correctly spelled key was silently dropped. Prefer the correct spelling
  # and fall back to the legacy one (which always exists via the defaults).
  lookupd_options = opts[:nsqlookupd_options] || opts[:nsqdlookupd_options]

  @verbose = opts[:verbose]
  @nsqlookupd = create_nsqlookupds(opts[:nsqlookupd_count], lookupd_options)
  @nsqd = create_nsqds(opts[:nsqd_count], opts[:nsqd_options])
  @nsqadmin = create_nsqadmin if opts[:nsqadmin]

  begin
    # start everything!
    all_services.each { |service| service.start(async: true) }

    # by default, block execution until everything is started
    block_until_running unless opts[:async]
  rescue Exception => error
    # Deliberately broad: the error is always re-raised, and catching
    # Exception means services are also torn down on Interrupt/SystemExit
    # instead of being left behind.
    destroy
    raise error
  end
end

Instance Attribute Details

#nsqadmin ⇒ Object (readonly)

Returns the value of attribute nsqadmin.



22
23
24
# File 'lib/nsq-cluster.rb', line 22

# Reader for @nsqadmin. The constructor assigns it from create_nsqadmin only
# when the :nsqadmin option is truthy; otherwise it is left unset (nil).
def nsqadmin
  @nsqadmin
end

#nsqd ⇒ Object (readonly)

Returns the value of attribute nsqd.



22
23
24
# File 'lib/nsq-cluster.rb', line 22

# Reader for @nsqd, which the constructor assigns from the array returned by
# create_nsqds.
def nsqd
  @nsqd
end

#nsqlookupd ⇒ Object (readonly)

Returns the value of attribute nsqlookupd.



22
23
24
# File 'lib/nsq-cluster.rb', line 22

# Reader for @nsqlookupd, which the constructor assigns from the array
# returned by create_nsqlookupds.
def nsqlookupd
  @nsqlookupd
end

Instance Method Details

#block_until_running(timeout = 3) ⇒ Object



97
98
99
100
101
102
103
104
105
106
107
# File 'lib/nsq-cluster.rb', line 97

# Waits until every service in the cluster reports that it is running.
#
# Each entry of all_services is asked, in order, to block_until_running; the
# whole sequence shares a single Timeout window. Progress lines are printed
# when @verbose is truthy.
#
# @param timeout [Numeric] seconds allowed for the entire cluster (default 3)
# @return [nil]
# @raise [RuntimeError] if the Timeout window expires before all services are up
def block_until_running(timeout = 3)
  puts "Waiting for cluster to launch..." if @verbose

  Timeout.timeout(timeout) do
    all_services.each(&:block_until_running)
    puts "Cluster launched." if @verbose
  end
rescue Timeout::Error
  # Translate the low-level timeout into a cluster-level message.
  raise "Cluster did not fully launch within #{timeout} seconds."
end

#block_until_stopped(timeout = 10) ⇒ Object



110
111
112
113
114
115
116
117
118
119
120
# File 'lib/nsq-cluster.rb', line 110

# Waits until every service in the cluster reports that it has stopped.
#
# Each entry of all_services is asked, in order, to block_until_stopped; the
# whole sequence shares a single Timeout window. Progress lines are printed
# when @verbose is truthy.
#
# @param timeout [Numeric] seconds allowed for the entire cluster (default 10)
# @return [nil]
# @raise [RuntimeError] if the Timeout window expires before all services stop
def block_until_stopped(timeout = 10)
  puts "Waiting for cluster to stop..." if @verbose

  Timeout.timeout(timeout) do
    all_services.each(&:block_until_stopped)
    puts "Cluster stopped." if @verbose
  end
rescue Timeout::Error
  # Translate the low-level timeout into a cluster-level message.
  raise "Cluster did not fully stop within #{timeout} seconds."
end

#create_nsqadmin ⇒ Object



78
79
80
81
82
83
# File 'lib/nsq-cluster.rb', line 78

# Builds the Nsqadmin service object for this cluster.
#
# The admin is given the current @nsqlookupd collection under the :nsqlookupd
# key, plus the cluster's @verbose flag as the second constructor argument.
#
# @return [Nsqadmin] the newly constructed (not yet started) instance
def create_nsqadmin
  admin_options = { nsqlookupd: @nsqlookupd }
  Nsqadmin.new(admin_options, @verbose)
end

#create_nsqds(count, options) ⇒ Object



65
66
67
68
69
70
71
72
73
74
75
# File 'lib/nsq-cluster.rb', line 65

# Builds +count+ Nsqd service objects, numbered from 0.
#
# Every instance receives a copy of +options+ with :id set to its index and
# :nsqlookupd set to the cluster's @nsqlookupd collection (both override any
# same-named keys in +options+). +options+ itself is not modified.
#
# @param count [Integer] how many nsqd instances to build
# @param options [Hash] base options shared by every instance
# @return [Array<Nsqd>] the new instances, in index order
def create_nsqds(count, options)
  (0...count).map do |index|
    instance_options = options.merge(id: index, nsqlookupd: @nsqlookupd)
    Nsqd.new(instance_options, @verbose)
  end
end

#create_nsqlookupds(count, options) ⇒ Object



53
54
55
56
57
58
59
60
61
62
# File 'lib/nsq-cluster.rb', line 53

# Builds +count+ Nsqlookupd service objects, numbered from 0.
#
# Every instance receives a copy of +options+ with :id set to its index
# (overriding any :id already present), plus the cluster's @verbose flag.
# +options+ itself is not modified.
#
# @param count [Integer] how many nsqlookupd instances to build
# @param options [Hash] base options shared by every instance
# @return [Array<Nsqlookupd>] the new instances, in index order
def create_nsqlookupds(count, options)
  (0...count).map do |index|
    instance_options = options.merge(id: index)
    Nsqlookupd.new(instance_options, @verbose)
  end
end

#destroy ⇒ Object



86
87
88
# File 'lib/nsq-cluster.rb', line 86

# Calls destroy on every service in all_services, in order.
#
# An exception raised by one service propagates immediately, so services
# later in the list are not reached in that case.
#
# @return [Object] whatever all_services returned (the receiver of #each)
def destroy
  all_services.each(&:destroy)
end

#nsqlookupd_http_endpoints ⇒ Object

Returns an array of HTTP endpoint URLs ("http://host:http_port"), one per nsqlookupd instance.



92
93
94
# File 'lib/nsq-cluster.rb', line 92

# Lists the HTTP endpoint of every nsqlookupd in the cluster.
#
# Each entry of @nsqlookupd is asked for its host and http_port, which are
# combined into a "http://<host>:<http_port>" string.
#
# @return [Array<String>] one URL per nsqlookupd, in @nsqlookupd order
def nsqlookupd_http_endpoints
  @nsqlookupd.map do |instance|
    host = instance.host
    port = instance.http_port
    "http://#{host}:#{port}"
  end
end