Class: Nsqd
- Inherits:
- ProcessWrapper
- Inheritance chain: Object < ProcessWrapper < Nsqd
- Includes:
- HTTPWrapper
- Defined in:
- lib/nsq-cluster/nsqd.rb
Constant Summary
Constants inherited from ProcessWrapper
ProcessWrapper::HTTPCHECK_INTERVAL
Instance Attribute Summary collapse
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#http_port ⇒ Object
readonly
Returns the value of attribute http_port.
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#tcp_port ⇒ Object
readonly
Returns the value of attribute tcp_port.
Attributes inherited from ProcessWrapper
Instance Method Summary collapse
- #args ⇒ Object
- #command ⇒ Object
-
#create(params = {}) ⇒ Object
create a topic or a channel in an existing topic.
-
#data_path ⇒ Object
find or create a temporary data directory for this instance.
-
#delete(params = {}) ⇒ Object
delete a topic or a channel in an existing topic.
- #destroy ⇒ Object
-
#empty(params = {}) ⇒ Object
empty a topic or a channel in an existing topic.
-
#info ⇒ Object
returns version number.
-
#initialize(opts = {}, verbose = false) ⇒ Nsqd
constructor
A new instance of Nsqd.
-
#mpub(topic, *messages) ⇒ Object
publish multiple messages to a topic.
-
#pause(params = {}) ⇒ Object
pause a topic or a channel in a topic.
-
#ping ⇒ Object
monitoring endpoint.
-
#pub(topic, message) ⇒ Object
publish a single message to a topic.
-
#stats ⇒ Object
return stats in json format.
-
#unpause(params = {}) ⇒ Object
unpause a topic or a channel in a topic.
Methods included from HTTPWrapper
Methods inherited from ProcessWrapper
#another_instance_is_running?, #block_until_running, #block_until_stopped, #output, #running?, #start, #stop
Constructor Details
#initialize(opts = {}, verbose = false) ⇒ Nsqd
Returns a new instance of Nsqd.
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/nsq-cluster/nsqd.rb', line 12
#
# Sets up a new Nsqd wrapper.
#
# Recognised keys are removed from +opts+ (the caller's hash is mutated):
#   :id                - instance id, defaults to 0
#   :host              - defaults to '127.0.0.1'
#   :tcp_port          - defaults to 4150 + id * 2
#   :http_port         - defaults to 4151 + id * 2
#   :nsqlookupd        - list of lookupd objects, defaults to []
#   :broadcast_address - defaults to the host
# Every key still present afterwards is turned into a "--key=value" flag,
# with underscores in the key replaced by dashes.
#
# @param opts [Hash] configuration options (see above)
# @param verbose [Boolean] forwarded, together with opts, by the bare `super`
def initialize(opts = {}, verbose = false)
  # Bare super forwards opts and verbose to the superclass constructor.
  super

  @id = opts.delete(:id) || 0
  @host = opts.delete(:host) || '127.0.0.1'
  @broadcast_address = opts.delete(:broadcast_address) || @host
  @lookupd = opts.delete(:nsqlookupd) || []

  # Default ports are spaced two apart per instance id.
  @tcp_port = opts.delete(:tcp_port) || (4150 + @id * 2)
  @http_port = opts.delete(:http_port) || (4151 + @id * 2)

  # Only unrecognised options remain in opts at this point.
  @extra_args = opts.map { |flag, setting| "--#{flag.to_s.tr('_', '-')}=#{setting}" }

  clear_data_directory
  create_data_directory
end
Instance Attribute Details
#host ⇒ Object (readonly)
Returns the value of attribute host.
9 10 11 |
# File 'lib/nsq-cluster/nsqd.rb', line 9
#
# Read-only accessor.
#
# @return [Object] the value of the host attribute
def host
  @host
end
#http_port ⇒ Object (readonly)
Returns the value of attribute http_port.
9 10 11 |
# File 'lib/nsq-cluster/nsqd.rb', line 9
#
# Read-only accessor.
#
# @return [Object] the value of the http_port attribute
def http_port
  @http_port
end
#id ⇒ Object (readonly)
Returns the value of attribute id.
9 10 11 |
# File 'lib/nsq-cluster/nsqd.rb', line 9
#
# Read-only accessor.
#
# @return [Object] the value of the id attribute
def id
  @id
end
#tcp_port ⇒ Object (readonly)
Returns the value of attribute tcp_port.
9 10 11 |
# File 'lib/nsq-cluster/nsqd.rb', line 9
#
# Read-only accessor.
#
# @return [Object] the value of the tcp_port attribute
def tcp_port
  @tcp_port
end
Instance Method Details
#args ⇒ Object
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/nsq-cluster/nsqd.rb', line 42
#
# Assembles the command-line flags for this instance: the fixed address,
# data-path, worker-id and broadcast-address flags, then the extra flags
# collected by the constructor, then one --lookupd-tcp-address flag per
# entry in @lookupd.
#
# @return [Array<String>] the flags, in the order described above
def args
  core_flags = [
    "--tcp-address=#{@host}:#{@tcp_port}",
    "--http-address=#{@host}:#{@http_port}",
    "--data-path=#{data_path}",
    "--worker-id=#{id}",
    "--broadcast-address=#{@broadcast_address}"
  ]

  # Each lookupd entry must respond to #host and #tcp_port.
  lookupd_flags = @lookupd.map { |daemon| "--lookupd-tcp-address=#{daemon.host}:#{daemon.tcp_port}" }

  core_flags + @extra_args + lookupd_flags
end
#command ⇒ Object
37 38 39 |
# File 'lib/nsq-cluster/nsqd.rb', line 37
#
# Name of the executable this wrapper stands for.
#
# @return [String] always 'nsqd'
def command
  'nsqd'
end
#create(params = {}) ⇒ Object
create a topic or a channel in an existing topic
78 79 80 |
# File 'lib/nsq-cluster/nsqd.rb', line 78
#
# Create a topic or a channel in an existing topic.
#
# @param params [Hash] read for the :topic and :channel keys; a missing key
#   is passed along as nil
# @return [Object] whatever nsqd_post returns
def create(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('create', topic: topic, channel: channel)
end
#data_path ⇒ Object
find or create a temporary data directory for this instance
60 61 62 |
# File 'lib/nsq-cluster/nsqd.rb', line 60
#
# Path of the temporary data directory for this instance. This method only
# builds the path string from the instance id; it does not touch the
# filesystem itself.
#
# @return [String] "/tmp/nsqd-<id>"
def data_path
  "/tmp/nsqd-#{id}"
end
#delete(params = {}) ⇒ Object
delete a topic or a channel in an existing topic
84 85 86 |
# File 'lib/nsq-cluster/nsqd.rb', line 84
#
# Delete a topic or a channel in an existing topic.
#
# @param params [Hash] read for the :topic and :channel keys; a missing key
#   is passed along as nil
# @return [Object] whatever nsqd_post returns
def delete(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('delete', topic: topic, channel: channel)
end
#destroy ⇒ Object
31 32 33 34 |
# File 'lib/nsq-cluster/nsqd.rb', line 31
#
# Runs the superclass destroy, then clears this instance's data directory.
#
# @return [Object] whatever clear_data_directory returns
def destroy
  # Superclass teardown runs first; the data directory is cleared afterwards.
  super
  clear_data_directory
end
#empty(params = {}) ⇒ Object
empty a topic or a channel in an existing topic
90 91 92 |
# File 'lib/nsq-cluster/nsqd.rb', line 90
#
# Empty a topic or a channel in an existing topic.
#
# @param params [Hash] read for the :topic and :channel keys; a missing key
#   is passed along as nil
# @return [Object] whatever nsqd_post returns
def empty(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('empty', topic: topic, channel: channel)
end
#info ⇒ Object
returns version number
120 121 122 |
# File 'lib/nsq-cluster/nsqd.rb', line 120
#
# Issues a GET for 'info'. The upstream description says this returns the
# version number.
#
# @return [Object] whatever get returns
def info
  get('info')
end
#mpub(topic, *messages) ⇒ Object
publish multiple messages to a topic
72 73 74 |
# File 'lib/nsq-cluster/nsqd.rb', line 72
#
# Publish multiple messages to a topic.
#
# @param topic [Object] topic name, sent as the :topic request parameter
# @param messages [Array] the messages; they are joined with "\n" into a
#   single request body (no messages yields an empty body)
# @return [Object] whatever post returns
def mpub(topic, *messages)
  post 'mpub', { topic: topic }, messages.join("\n")
end
#pause(params = {}) ⇒ Object
pause a topic or a channel in a topic
96 97 98 |
# File 'lib/nsq-cluster/nsqd.rb', line 96
#
# Pause a topic or a channel in a topic.
#
# @param params [Hash] read for the :topic and :channel keys; a missing key
#   is passed along as nil
# @return [Object] whatever nsqd_post returns
def pause(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('pause', topic: topic, channel: channel)
end
#ping ⇒ Object
monitoring endpoint
114 115 116 |
# File 'lib/nsq-cluster/nsqd.rb', line 114
#
# Monitoring endpoint: issues a GET for 'ping'.
#
# @return [Object] whatever get returns
def ping
  get('ping')
end
#pub(topic, message) ⇒ Object
publish a single message to a topic
66 67 68 |
# File 'lib/nsq-cluster/nsqd.rb', line 66
#
# Publish a single message to a topic.
#
# @param topic [Object] topic name, sent as the :topic request parameter
# @param message [Object] the message, passed to post as the request body
# @return [Object] whatever post returns
def pub(topic, message)
  post 'pub', { topic: topic }, message
end
#stats ⇒ Object
return stats in json format
108 109 110 |
# File 'lib/nsq-cluster/nsqd.rb', line 108
#
# Return stats in json format: issues a GET for 'stats' with format: 'json'.
#
# @return [Object] whatever get returns
def stats
  get('stats', format: 'json')
end
#unpause(params = {}) ⇒ Object
unpause a topic or a channel in a topic
102 103 104 |
# File 'lib/nsq-cluster/nsqd.rb', line 102
#
# Unpause a topic or a channel in a topic.
#
# @param params [Hash] read for the :topic and :channel keys; a missing key
#   is passed along as nil
# @return [Object] whatever nsqd_post returns
def unpause(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('unpause', topic: topic, channel: channel)
end