Class: NsqCluster
- Inherits:
-
Object
- Object
- NsqCluster
- Defined in:
- lib/nsq-cluster.rb
Instance Attribute Summary collapse
-
#nsqadmin ⇒ Object
readonly
Returns the value of attribute nsqadmin.
-
#nsqd ⇒ Object
readonly
Returns the value of attribute nsqd.
-
#nsqlookupd ⇒ Object
readonly
Returns the value of attribute nsqlookupd.
Instance Method Summary collapse
- #block_until_running(timeout = 3) ⇒ Object
- #block_until_stopped(timeout = 10) ⇒ Object
- #create_nsqadmin ⇒ Object
- #create_nsqds(count, options) ⇒ Object
- #create_nsqlookupds(count, options) ⇒ Object
- #destroy ⇒ Object
-
#initialize(opts = {}) ⇒ NsqCluster
constructor
A new instance of NsqCluster.
-
#nsqlookupd_http_endpoints ⇒ Object
Returns an array of HTTP endpoint URLs, one per nsqlookupd instance.
Constructor Details
#initialize(opts = {}) ⇒ NsqCluster
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/nsq-cluster.rb', line 24
#
# Creates the configured services and then starts every one of them.
#
# @param opts [Hash] settings merged over the defaults below
# @option opts [Integer] :nsqlookupd_count (0) passed to create_nsqlookupds
# @option opts [Hash] :nsqdlookupd_options ({}) passed to create_nsqlookupds
#   (NOTE(review): the key is spelled "nsqdlookupd" here, not "nsqlookupd" --
#   likely a typo, kept because callers depend on it)
# @option opts [Integer] :nsqd_count (0) passed to create_nsqds
# @option opts [Hash] :nsqd_options ({}) passed to create_nsqds
# @option opts [Boolean] :nsqadmin (false) create an nsqadmin when truthy
# @option opts [Boolean] :verbose (false) stored in @verbose
# @raise [Exception] whatever a service's #start raised, after #destroy ran
def initialize(opts = {})
  opts = {
    nsqlookupd_count: 0,
    nsqdlookupd_options: {},
    nsqd_count: 0,
    nsqadmin: false,
    nsqd_options: {},
    verbose: false
  }.merge(opts)

  @verbose = opts[:verbose]

  # nsqlookupds are built first: create_nsqds and create_nsqadmin read
  # @nsqlookupd when constructing their services.
  @nsqlookupd = create_nsqlookupds(opts[:nsqlookupd_count], opts[:nsqdlookupd_options])
  @nsqd = create_nsqds(opts[:nsqd_count], opts[:nsqd_options])
  @nsqadmin = create_nsqadmin if opts[:nsqadmin]

  begin
    # start everything!
    all_services.each { |service| service.start }
  rescue Exception # rubocop:disable Lint/RescueException
    # Deliberately broad: whatever interrupted startup, stop everything that
    # was started, then re-raise the same exception unchanged.
    destroy
    raise
  end
end
Instance Attribute Details
#nsqadmin ⇒ Object (readonly)
Returns the value of attribute nsqadmin.
22 23 24 |
# File 'lib/nsq-cluster.rb', line 22
#
# Read-only accessor for the nsqadmin attribute.
#
# @return [Object] the current value of @nsqadmin
def nsqadmin
  @nsqadmin
end
#nsqd ⇒ Object (readonly)
Returns the value of attribute nsqd.
22 23 24 |
# File 'lib/nsq-cluster.rb', line 22
#
# Read-only accessor for the nsqd attribute.
#
# @return [Object] the current value of @nsqd
def nsqd
  @nsqd
end
#nsqlookupd ⇒ Object (readonly)
Returns the value of attribute nsqlookupd.
22 23 24 |
# File 'lib/nsq-cluster.rb', line 22
#
# Read-only accessor for the nsqlookupd attribute.
#
# @return [Object] the current value of @nsqlookupd
def nsqlookupd
  @nsqlookupd
end
Instance Method Details
#block_until_running(timeout = 3) ⇒ Object
96 97 98 99 100 101 102 103 104 105 106 |
# File 'lib/nsq-cluster.rb', line 96
#
# Calls #block_until_running on every service in turn, giving the whole
# pass a single shared time budget.
#
# @param timeout [Numeric] seconds allowed for all services together
# @raise [RuntimeError] when the budget runs out before the pass completes
def block_until_running(timeout = 3)
  puts "Waiting for cluster to launch..." if @verbose

  Timeout.timeout(timeout) do
    all_services.each { |service| service.block_until_running }
    puts "Cluster launched." if @verbose
  end
rescue Timeout::Error
  # Translate the low-level timeout into a message naming the cluster.
  raise "Cluster did not fully launch within #{timeout} seconds."
end
#block_until_stopped(timeout = 10) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 |
# File 'lib/nsq-cluster.rb', line 109
#
# Calls #block_until_stopped on every service in turn, giving the whole
# pass a single shared time budget.
#
# @param timeout [Numeric] seconds allowed for all services together
# @raise [RuntimeError] when the budget runs out before the pass completes
def block_until_stopped(timeout = 10)
  puts "Waiting for cluster to stop..." if @verbose

  Timeout.timeout(timeout) do
    all_services.each { |service| service.block_until_stopped }
    puts "Cluster stopped." if @verbose
  end
rescue Timeout::Error
  # Translate the low-level timeout into a message naming the cluster.
  raise "Cluster did not fully stop within #{timeout} seconds."
end
#create_nsqadmin ⇒ Object
77 78 79 80 81 82 |
# File 'lib/nsq-cluster.rb', line 77
#
# Builds an Nsqadmin that is handed the current @nsqlookupd value and the
# cluster's verbosity flag.
#
# @return [Nsqadmin] the newly constructed instance
def create_nsqadmin
  Nsqadmin.new(
    { nsqlookupd: @nsqlookupd },
    @verbose
  )
end
#create_nsqds(count, options) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/nsq-cluster.rb', line 63
#
# Builds +count+ Nsqd instances. Instance +idx+ gets TCP port 4150 + 2*idx
# and HTTP port 4151 + 2*idx, so consecutive instances never share a port.
#
# @param count [Integer] number of instances to build
# @param options [Hash] extra settings for every instance; the generated
#   :tcp_port, :http_port and :nsqlookupd entries take precedence over them
# @return [Array<Nsqd>] the new instances, in index order
def create_nsqds(count, options)
  (0...count).map do |idx|
    Nsqd.new(
      options.merge({
        tcp_port: 4150 + idx * 2,
        http_port: 4151 + idx * 2,
        nsqlookupd: @nsqlookupd
      }),
      @verbose
    )
  end
end
#create_nsqlookupds(count, options) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/nsq-cluster.rb', line 50
#
# Builds +count+ Nsqlookupd instances. Instance +idx+ gets TCP port
# 4160 + 2*idx and HTTP port 4161 + 2*idx.
#
# @param count [Integer] number of instances to build
# @param options [Hash] extra settings for every instance; the generated
#   :tcp_port and :http_port entries take precedence over them
# @return [Array<Nsqlookupd>] the new instances, in index order
def create_nsqlookupds(count, options)
  (0...count).map do |idx|
    Nsqlookupd.new(
      options.merge({
        tcp_port: 4160 + idx * 2,
        http_port: 4161 + idx * 2
      }),
      @verbose
    )
  end
end
#destroy ⇒ Object
85 86 87 |
# File 'lib/nsq-cluster.rb', line 85
#
# Calls #destroy on every service returned by all_services.
#
# @return [Object] whatever all_services.each returns
def destroy
  all_services.each { |service| service.destroy }
end
#nsqlookupd_http_endpoints ⇒ Object
Returns an array of HTTP endpoint URLs, one per nsqlookupd instance.
91 92 93 |
# File 'lib/nsq-cluster.rb', line 91
#
# Builds one "http://host:port" URL per entry in @nsqlookupd, using each
# entry's #host and #http_port.
#
# @return [Array<String>] endpoint URLs, in the same order as @nsqlookupd
def nsqlookupd_http_endpoints
  @nsqlookupd.map { |lookupd| "http://#{lookupd.host}:#{lookupd.http_port}" }
end