Class: NsqCluster
- Inherits:
-
Object
- Object
- NsqCluster
- Defined in:
- lib/nsq-cluster.rb
Instance Attribute Summary collapse
-
#nsqadmin ⇒ Object
readonly
Returns the value of attribute nsqadmin.
-
#nsqd ⇒ Object
readonly
Returns the value of attribute nsqd.
-
#nsqlookupd ⇒ Object
readonly
Returns the value of attribute nsqlookupd.
Instance Method Summary collapse
- #block_until_running(timeout = 3) ⇒ Object
- #block_until_stopped(timeout = 10) ⇒ Object
- #create_nsqadmin ⇒ Object
- #create_nsqds(count, options) ⇒ Object
- #create_nsqlookupds(count, options) ⇒ Object
- #destroy ⇒ Object
-
#initialize(opts = {}) ⇒ NsqCluster
constructor
A new instance of NsqCluster.
-
#nsqlookupd_http_endpoints ⇒ Object
Returns an array of HTTP endpoints.
Constructor Details
#initialize(opts = {}) ⇒ NsqCluster
Returns a new instance of NsqCluster.
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/nsq-cluster.rb', line 24 def initialize(opts = {}) opts = { nsqlookupd_count: 0, nsqdlookupd_options: {}, nsqd_count: 0, nsqadmin: false, nsqd_options: {}, verbose: false }.merge(opts) @verbose = opts[:verbose] @nsqlookupd = create_nsqlookupds(opts[:nsqlookupd_count], opts[:nsqdlookupd_options]) @nsqd = create_nsqds(opts[:nsqd_count], opts[:nsqd_options]) @nsqadmin = create_nsqadmin if opts[:nsqadmin] begin # start everything! all_services.each{|d| d.start(async: true)} # by default, block execution until everything is started block_until_running unless opts[:async] rescue Exception => ex # if we hit an error, stop everything that we started destroy raise ex end end |
Instance Attribute Details
#nsqadmin ⇒ Object (readonly)
Returns the value of attribute nsqadmin.
22 23 24 |
# File 'lib/nsq-cluster.rb', line 22 def nsqadmin @nsqadmin end |
#nsqd ⇒ Object (readonly)
Returns the value of attribute nsqd.
22 23 24 |
# File 'lib/nsq-cluster.rb', line 22 def nsqd @nsqd end |
#nsqlookupd ⇒ Object (readonly)
Returns the value of attribute nsqlookupd.
22 23 24 |
# File 'lib/nsq-cluster.rb', line 22 def nsqlookupd @nsqlookupd end |
Instance Method Details
#block_until_running(timeout = 3) ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/nsq-cluster.rb', line 97 def block_until_running(timeout = 3) puts "Waiting for cluster to launch..." if @verbose begin Timeout::timeout(timeout) do all_services.each {|service| service.block_until_running} puts "Cluster launched." if @verbose end rescue Timeout::Error raise "Cluster did not fully launch within #{timeout} seconds." end end |
#block_until_stopped(timeout = 10) ⇒ Object
110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/nsq-cluster.rb', line 110 def block_until_stopped(timeout = 10) puts "Waiting for cluster to stop..." if @verbose begin Timeout::timeout(timeout) do all_services.each{|service| service.block_until_stopped} puts "Cluster stopped." if @verbose end rescue Timeout::Error raise "Cluster did not fully stop within #{timeout} seconds." end end |
#create_nsqadmin ⇒ Object
78 79 80 81 82 83 |
# File 'lib/nsq-cluster.rb', line 78 def create_nsqadmin Nsqadmin.new( { nsqlookupd: @nsqlookupd }, @verbose ) end |
#create_nsqds(count, options) ⇒ Object
65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/nsq-cluster.rb', line 65 def create_nsqds(count, options) (0...count).map do |idx| Nsqd.new( options.merge({ id: idx, nsqlookupd: @nsqlookupd }), @verbose ) end end |
#create_nsqlookupds(count, options) ⇒ Object
53 54 55 56 57 58 59 60 61 62 |
# File 'lib/nsq-cluster.rb', line 53 def create_nsqlookupds(count, options) (0...count).map do |idx| Nsqlookupd.new( options.merge({ id: idx }), @verbose ) end end |
#destroy ⇒ Object
86 87 88 |
# File 'lib/nsq-cluster.rb', line 86 def destroy all_services.each{|s| s.destroy} end |
#nsqlookupd_http_endpoints ⇒ Object
Returns an array of HTTP endpoints.
92 93 94 |
# File 'lib/nsq-cluster.rb', line 92 def nsqlookupd_http_endpoints @nsqlookupd.map { |lookupd| "http://#{lookupd.host}:#{lookupd.http_port}" } end |