Class: Nsqd

Inherits:
ProcessWrapper show all
Includes:
HTTPWrapper
Defined in:
lib/nsq-cluster/nsqd.rb

Constant Summary

Constants inherited from ProcessWrapper

ProcessWrapper::HTTPCHECK_INTERVAL

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from HTTPWrapper

#get, #post

Methods inherited from ProcessWrapper

#another_instance_is_running?, #block_until_running, #block_until_stopped, #output, #running?, #start, #stop

Constructor Details

#initialize(opts = {}) ⇒ Nsqd

Returns a new instance of Nsqd.



12
13
14
15
16
17
18
19
20
21
22
23
24
# File 'lib/nsq-cluster/nsqd.rb', line 12

# Builds an Nsqd wrapper from an options hash.
#
# Recognised keys (each falls back to its default when the key is absent
# or its value is nil/false):
#   :host              - '127.0.0.1'
#   :tcp_port          - 4150
#   :http_port         - 4151
#   :nsqlookupd        - [] (elements are asked for #host and #tcp_port in #args)
#   :msg_timeout       - '60s'
#   :broadcast_address - the resolved host
#
# After the settings are stored, the data directory is cleared and then
# created again, and finally the superclass initializer is invoked.
#
# @param opts [Hash] configuration overrides
def initialize(opts = {})
  # Addresses and ports this instance binds to / advertises.
  @host = opts[:host] || '127.0.0.1'
  @broadcast_address = opts[:broadcast_address] || @host
  @http_port = opts[:http_port] || 4151
  @tcp_port = opts[:tcp_port] || 4150

  # Message timeout and the lookupd instances to register with.
  @msg_timeout = opts[:msg_timeout] || '60s'
  @lookupd = opts[:nsqlookupd] || []

  # Start from an empty data directory.
  clear_data_directory
  create_data_directory

  # Bare super forwards the same opts argument to the parent initializer.
  super
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



9
10
11
# File 'lib/nsq-cluster/nsqd.rb', line 9

# Reader for the host stored in +@host+.
#
# +@host+ is assigned in #initialize from +opts[:host]+, falling back to
# '127.0.0.1'.
#
# @return [Object] the current value of +@host+
def host
  @host
end

#http_port ⇒ Object (readonly)

Returns the value of attribute http_port.



9
10
11
# File 'lib/nsq-cluster/nsqd.rb', line 9

# Reader for the HTTP port stored in +@http_port+.
#
# +@http_port+ is assigned in #initialize from +opts[:http_port]+, falling
# back to 4151.
#
# @return [Object] the current value of +@http_port+
def http_port
  @http_port
end

#tcp_port ⇒ Object (readonly)

Returns the value of attribute tcp_port.



9
10
11
# File 'lib/nsq-cluster/nsqd.rb', line 9

# Reader for the TCP port stored in +@tcp_port+.
#
# +@tcp_port+ is assigned in #initialize from +opts[:tcp_port]+, falling
# back to 4150.
#
# @return [Object] the current value of +@tcp_port+
def tcp_port
  @tcp_port
end

Instance Method Details

#args ⇒ Object



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/nsq-cluster/nsqd.rb', line 38

# Assembles the command-line flags for this nsqd instance.
#
# The fixed flags (tcp/http address, data path, worker id, message timeout,
# broadcast address) come first, followed by one --lookupd-tcp-address flag
# per entry in +@lookupd+, in the order the entries are stored.
#
# @return [Array<String>] a freshly built list of flags
def args
  flags = [
    "--tcp-address=#{@host}:#{@tcp_port}",
    "--http-address=#{@host}:#{@http_port}",
    "--data-path=#{data_path}",
    "--worker-id=#{worker_id}",
    "--msg-timeout=#{@msg_timeout}",
    "--broadcast-address=#{@broadcast_address}"
  ]

  # Each lookupd entry contributes its own host:tcp_port pair.
  @lookupd.each do |lookupd|
    flags << "--lookupd-tcp-address=#{lookupd.host}:#{lookupd.tcp_port}"
  end

  flags
end

#command ⇒ Object



33
34
35
# File 'lib/nsq-cluster/nsqd.rb', line 33

# Returns the literal string 'nsqd'.
#
# NOTE(review): presumably the executable name the ProcessWrapper base class
# launches together with #args; that code is not visible here, so confirm.
#
# @return [String] 'nsqd'
def command
  'nsqd'
end

#create(params = {}) ⇒ Object

create a topic or a channel in an existing topic



80
81
82
# File 'lib/nsq-cluster/nsqd.rb', line 80

# Create a topic, or a channel in an existing topic.
#
# Only the :topic and :channel entries of +params+ are forwarded; a missing
# entry is forwarded as nil. The request itself is issued by +nsqd_post+.
#
# @param params [Hash] expects :topic and optionally :channel
# @return [Object] whatever +nsqd_post+ returns
def create(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('create', topic: topic, channel: channel)
end

#data_path ⇒ Object

path of the temporary data directory for this instance (this method only builds the path; it does not create the directory)



62
63
64
# File 'lib/nsq-cluster/nsqd.rb', line 62

# Path of this instance's data directory under /tmp.
#
# The path is derived from #worker_id. This method only builds the string;
# it does not touch the filesystem.
#
# @return [String] "/tmp/nsqd-<worker_id>"
def data_path
  suffix = worker_id
  "/tmp/nsqd-#{suffix}"
end

#delete(params = {}) ⇒ Object

delete a topic or a channel in an existing topic



86
87
88
# File 'lib/nsq-cluster/nsqd.rb', line 86

# Delete a topic, or a channel in an existing topic.
#
# Only the :topic and :channel entries of +params+ are forwarded; a missing
# entry is forwarded as nil. The request itself is issued by +nsqd_post+.
#
# @param params [Hash] expects :topic and optionally :channel
# @return [Object] whatever +nsqd_post+ returns
def delete(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('delete', topic: topic, channel: channel)
end

#destroy ⇒ Object



27
28
29
30
# File 'lib/nsq-cluster/nsqd.rb', line 27

# Runs the superclass +destroy+ and then removes this instance's data
# directory via +clear_data_directory+.
#
# The directory is cleared only after +super+ returns, so an exception
# raised by +super+ leaves the directory in place.
#
# @return [Object] whatever +clear_data_directory+ returns
def destroy
  super
  clear_data_directory
end

#empty(params = {}) ⇒ Object

empty a topic or a channel in an existing topic



92
93
94
# File 'lib/nsq-cluster/nsqd.rb', line 92

# Empty a topic, or a channel in an existing topic.
#
# Only the :topic and :channel entries of +params+ are forwarded; a missing
# entry is forwarded as nil. The request itself is issued by +nsqd_post+.
#
# @param params [Hash] expects :topic and optionally :channel
# @return [Object] whatever +nsqd_post+ returns
def empty(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('empty', topic: topic, channel: channel)
end

#info ⇒ Object

returns version number



122
123
124
# File 'lib/nsq-cluster/nsqd.rb', line 122

# Issues a GET request for the 'info' endpoint through HTTPWrapper#get.
#
# @return [Object] whatever +get+ returns for 'info'
def info
  get('info')
end

#mpub(topic, *messages) ⇒ Object

publish multiple messages to a topic



74
75
76
# File 'lib/nsq-cluster/nsqd.rb', line 74

# Publish several messages to a topic in a single request.
#
# The messages are joined with a newline ("\n") to form the request body,
# and the topic is sent as a request parameter to the 'mpub' endpoint.
#
# @param topic [Object] topic to publish to
# @param messages [Array] messages to publish, one per line of the body
# @return [Object] whatever +post+ returns
def mpub(topic, *messages)
  payload = messages.join("\n")
  post('mpub', { topic: topic }, payload)
end

#pause(params = {}) ⇒ Object

pause a topic or a channel in a topic



98
99
100
# File 'lib/nsq-cluster/nsqd.rb', line 98

# Pause a topic, or a channel in a topic.
#
# Only the :topic and :channel entries of +params+ are forwarded; a missing
# entry is forwarded as nil. The request itself is issued by +nsqd_post+.
#
# @param params [Hash] expects :topic and optionally :channel
# @return [Object] whatever +nsqd_post+ returns
def pause(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('pause', topic: topic, channel: channel)
end

#ping ⇒ Object

monitoring endpoint



116
117
118
# File 'lib/nsq-cluster/nsqd.rb', line 116

# Monitoring endpoint: issues a GET request for 'ping' through
# HTTPWrapper#get.
#
# @return [Object] whatever +get+ returns for 'ping'
def ping
  get('ping')
end

#pub(topic, message) ⇒ Object

publish a single message to a topic



68
69
70
# File 'lib/nsq-cluster/nsqd.rb', line 68

# Publish a single message to a topic.
#
# The topic is sent as a request parameter and the message, unchanged, as
# the request body of the 'pub' endpoint.
#
# @param topic [Object] topic to publish to
# @param message [Object] message body
# @return [Object] whatever +post+ returns
def pub(topic, message)
  query = { topic: topic }
  post('pub', query, message)
end

#stats ⇒ Object

return stats in json format



110
111
112
# File 'lib/nsq-cluster/nsqd.rb', line 110

# Requests the 'stats' endpoint through HTTPWrapper#get, asking for JSON
# output via the +format+ parameter.
#
# @return [Object] whatever +get+ returns for 'stats'
def stats
  get('stats', format: 'json')
end

#unpause(params = {}) ⇒ Object

unpause a topic or a channel in a topic



104
105
106
# File 'lib/nsq-cluster/nsqd.rb', line 104

# Unpause a topic, or a channel in a topic.
#
# Only the :topic and :channel entries of +params+ are forwarded; a missing
# entry is forwarded as nil. The request itself is issued by +nsqd_post+.
#
# @param params [Hash] expects :topic and optionally :channel
# @return [Object] whatever +nsqd_post+ returns
def unpause(params = {})
  topic = params[:topic]
  channel = params[:channel]
  nsqd_post('unpause', topic: topic, channel: channel)
end

#worker_id ⇒ Object



56
57
58
# File 'lib/nsq-cluster/nsqd.rb', line 56

# Identifier for this nsqd instance; it is simply the TCP port.
#
# The value is used for the --worker-id flag in #args and as the suffix of
# the directory name in #data_path.
#
# @return [Object] the current value of +@tcp_port+
def worker_id
  @tcp_port
end