Class: Nsqd

Inherits:
ProcessWrapper show all
Includes:
HTTPWrapper
Defined in:
lib/nsq-cluster/nsqd.rb

Constant Summary

Constants inherited from ProcessWrapper

ProcessWrapper::HTTPCHECK_INTERVAL

Instance Attribute Summary collapse

Attributes inherited from ProcessWrapper

#pid

Instance Method Summary collapse

Methods included from HTTPWrapper

#get, #post

Methods inherited from ProcessWrapper

#another_instance_is_running?, #block_until_running, #block_until_stopped, #output, #running?, #start, #stop

Constructor Details

#initialize(opts = {}, verbose = false) ⇒ Nsqd

Returns a new instance of Nsqd.



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/nsq-cluster/nsqd.rb', line 12

# Sets up the configuration for one nsqd instance and resets its data
# directory.
#
# Recognised keys in +opts+ (all optional):
#   :id                - instance number (default 0)
#   :host              - address used for the TCP/HTTP listeners (default '127.0.0.1')
#   :tcp_port          - default 4150 + id * 2
#   :http_port         - default 4151 + id * 2
#   :nsqlookupd        - collection of objects responding to #host and
#                        #tcp_port (default [])
#   :broadcast_address - default is the value of :host
# Every other key/value pair becomes an extra "--key-name=value" command
# line flag (underscores in the key are turned into dashes).
#
# @param opts [Hash] instance options, see above; the hash passed in is
#   left untouched
# @param verbose [Boolean] forwarded unchanged to the superclass constructor
def initialize(opts = {}, verbose = false)
  # Bare `super` forwards the original opts/verbose to ProcessWrapper.
  super

  # Work on a shallow copy from here on: the deletes below would otherwise
  # strip keys out of the caller's hash.
  opts = opts.dup

  @id = opts.delete(:id) || 0
  @host = opts.delete(:host) || '127.0.0.1'
  @tcp_port = opts.delete(:tcp_port) || (4150 + @id * 2)
  @http_port = opts.delete(:http_port) || (4151 + @id * 2)
  @lookupd = opts.delete(:nsqlookupd) || []
  @broadcast_address = opts.delete(:broadcast_address) || @host

  # Whatever is left after removing the known keys is passed through as flags.
  @extra_args = opts.map do |key, value|
    "--#{key.to_s.tr('_', '-')}=#{value}"
  end

  clear_data_directory
  create_data_directory
end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



9
10
11
# File 'lib/nsq-cluster/nsqd.rb', line 9

# Reader for @host, which #initialize sets from opts[:host]
# (falling back to '127.0.0.1').
def host
  @host
end

#http_port ⇒ Object (readonly)

Returns the value of attribute http_port.



9
10
11
# File 'lib/nsq-cluster/nsqd.rb', line 9

# Reader for @http_port, which #initialize sets from opts[:http_port]
# (falling back to 4151 + id * 2).
def http_port
  @http_port
end

#id ⇒ Object (readonly)

Returns the value of attribute id.



9
10
11
# File 'lib/nsq-cluster/nsqd.rb', line 9

# Reader for @id, which #initialize sets from opts[:id] (falling back to 0).
def id
  @id
end

#tcp_port ⇒ Object (readonly)

Returns the value of attribute tcp_port.



9
10
11
# File 'lib/nsq-cluster/nsqd.rb', line 9

# Reader for @tcp_port, which #initialize sets from opts[:tcp_port]
# (falling back to 4150 + id * 2).
def tcp_port
  @tcp_port
end

Instance Method Details

#args ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/nsq-cluster/nsqd.rb', line 42

# Assembles the command line flags for this instance.
#
# Order of the result: listener addresses, data path / worker id /
# broadcast address, the pass-through flags collected in @extra_args, and
# finally one --lookupd-tcp-address flag per entry in @lookupd.
#
# @return [Array<String>] a newly built array of "--flag=value" strings
def args
  listen_flags = [
    "--tcp-address=#{@host}:#{@tcp_port}",
    "--http-address=#{@host}:#{@http_port}"
  ]
  instance_flags = [
    "--data-path=#{data_path}",
    "--worker-id=#{id}",
    "--broadcast-address=#{@broadcast_address}"
  ]
  lookupd_flags = @lookupd.map { |lookupd| "--lookupd-tcp-address=#{lookupd.host}:#{lookupd.tcp_port}" }

  listen_flags + instance_flags + @extra_args + lookupd_flags
end

#command ⇒ Object



37
38
39
# File 'lib/nsq-cluster/nsqd.rb', line 37

# Returns the string 'nsqd'.
# NOTE(review): presumably the executable name that ProcessWrapper
# launches -- confirm against ProcessWrapper.
def command
  'nsqd'
end

#create(params = {}) ⇒ Object

create a topic or a channel in an existing topic



78
79
80
# File 'lib/nsq-cluster/nsqd.rb', line 78

# Issues the 'create' action through nsqd_post (documented as: create a
# topic, or a channel in an existing topic).
#
# @param params [Hash] only :topic and :channel are read; missing keys
#   are forwarded as nil
# @return whatever nsqd_post returns
def create(params = {})
  topic, channel = params.values_at(:topic, :channel)
  nsqd_post('create', topic: topic, channel: channel)
end

#data_path ⇒ Object

returns the path of the temporary data directory for this instance



60
61
62
# File 'lib/nsq-cluster/nsqd.rb', line 60

# Path of this instance's data directory: "/tmp/nsqd-" followed by the
# instance id. Only builds the string; nothing is created on disk here.
#
# @return [String]
def data_path
  format('/tmp/nsqd-%s', id)
end

#delete(params = {}) ⇒ Object

delete a topic or a channel in an existing topic



84
85
86
# File 'lib/nsq-cluster/nsqd.rb', line 84

# Issues the 'delete' action through nsqd_post (documented as: delete a
# topic, or a channel in an existing topic).
#
# @param params [Hash] only :topic and :channel are read; missing keys
#   are forwarded as nil
# @return whatever nsqd_post returns
def delete(params = {})
  topic, channel = params.values_at(:topic, :channel)
  nsqd_post('delete', topic: topic, channel: channel)
end

#destroy ⇒ Object



31
32
33
34
# File 'lib/nsq-cluster/nsqd.rb', line 31

# Runs the superclass's #destroy first, then clears this instance's data
# directory via clear_data_directory. The return value is whatever
# clear_data_directory returns.
def destroy
  super
  clear_data_directory
end

#empty(params = {}) ⇒ Object

empty a topic or a channel in an existing topic



90
91
92
# File 'lib/nsq-cluster/nsqd.rb', line 90

# Issues the 'empty' action through nsqd_post (documented as: empty a
# topic, or a channel in an existing topic).
#
# @param params [Hash] only :topic and :channel are read; missing keys
#   are forwarded as nil
# @return whatever nsqd_post returns
def empty(params = {})
  topic, channel = params.values_at(:topic, :channel)
  nsqd_post('empty', topic: topic, channel: channel)
end

#info ⇒ Object

returns version number



120
121
122
# File 'lib/nsq-cluster/nsqd.rb', line 120

# Performs a GET against the 'info' endpoint through the HTTPWrapper #get
# helper (documented as returning the version number).
#
# @return whatever #get returns
def info
  get('info')
end

#mpub(topic, *messages) ⇒ Object

publish multiple messages to a topic



72
73
74
# File 'lib/nsq-cluster/nsqd.rb', line 72

# Publishes several messages to a topic in a single 'mpub' POST.
#
# The messages are joined with "\n" to form the request body.
#
# @param topic the topic name, sent as the :topic query parameter
# @param messages zero or more messages
# @return whatever #post returns
def mpub(topic, *messages)
  body = messages.join("\n")
  post('mpub', { topic: topic }, body)
end

#pause(params = {}) ⇒ Object

pause a topic or a channel in a topic



96
97
98
# File 'lib/nsq-cluster/nsqd.rb', line 96

# Issues the 'pause' action through nsqd_post (documented as: pause a
# topic, or a channel in a topic).
#
# @param params [Hash] only :topic and :channel are read; missing keys
#   are forwarded as nil
# @return whatever nsqd_post returns
def pause(params = {})
  topic, channel = params.values_at(:topic, :channel)
  nsqd_post('pause', topic: topic, channel: channel)
end

#ping ⇒ Object

monitoring endpoint



114
115
116
# File 'lib/nsq-cluster/nsqd.rb', line 114

# Performs a GET against the 'ping' endpoint through the HTTPWrapper #get
# helper (documented as the monitoring endpoint).
#
# @return whatever #get returns
def ping
  get('ping')
end

#pub(topic, message) ⇒ Object

publish a single message to a topic



66
67
68
# File 'lib/nsq-cluster/nsqd.rb', line 66

# Publishes a single message to a topic with a 'pub' POST.
#
# @param topic the topic name, sent as the :topic query parameter
# @param message the request body, passed through unchanged
# @return whatever #post returns
def pub(topic, message)
  query = { topic: topic }
  post('pub', query, message)
end

#stats ⇒ Object

return stats in json format



108
109
110
# File 'lib/nsq-cluster/nsqd.rb', line 108

# Performs a GET against the 'stats' endpoint, asking for JSON output via
# the format: 'json' parameter.
#
# @return whatever #get returns
def stats
  get('stats', format: 'json')
end

#unpause(params = {}) ⇒ Object

unpause a topic or a channel in a topic



102
103
104
# File 'lib/nsq-cluster/nsqd.rb', line 102

# Issues the 'unpause' action through nsqd_post (documented as: unpause a
# topic, or a channel in a topic).
#
# @param params [Hash] only :topic and :channel are read; missing keys
#   are forwarded as nil
# @return whatever nsqd_post returns
def unpause(params = {})
  topic, channel = params.values_at(:topic, :channel)
  nsqd_post('unpause', topic: topic, channel: channel)
end