Class: FastlyNsq::Http::Nsqd
- Inherits:
-
Object
- Object
- FastlyNsq::Http::Nsqd
- Extended by:
- Forwardable
- Defined in:
- lib/fastly_nsq/http/nsqd.rb
Overview
Provides an interface to the the functionality provided by the nsqd HTTP interface
Defined Under Namespace
Classes: InvalidFormatError
Constant Summary collapse
- BASE_NSQD_URL =
ENV.fetch('NSQD_URL') do if ENV['NSQD_HTTPS_ADDRESS'] "https://#{ENV.fetch('NSQD_HTTPS_ADDRESS')}" else "http://#{ENV.fetch('NSQD_HTTP_ADDRESS')}" end end
- VALID_FORMATS =
%w[text json].freeze
Class Method Summary collapse
-
.channel_create(topic:, channel:, **args) ⇒ Object
Create a channel for an existing topic.
-
.channel_delete(topic:, channel:, **args) ⇒ Object
Delete an existing channel for an existing topic.
-
.channel_empty(topic:, channel:, **args) ⇒ Object
Empty all queued messages (in-memory and disk) for an existing channel.
-
.channel_pause(topic:, channel:, **args) ⇒ Object
Pause message flow to consumers of an existing channel (messages will queue at the channel).
-
.channel_unpause(topic:, channel:, **args) ⇒ Object
Resume message flow to consumers of and existing, paused, channel.
-
.config_nsqlookupd_tcp_addresses(**args) ⇒ Object
List of nsqlookupd TCP addresses.
-
.info(**args) ⇒ Object
NSQ version information.
-
.mpub(topic:, binary: false, message:, **args) ⇒ Object
Publish multiple messages in one roundtrip.
-
.ping(**args) ⇒ Object
Monitoring endpoint, should return 200 OK.
-
.pub(topic:, message:, defer: nil, **args) ⇒ Object
Publish a message.
-
.stats(topic: nil, channel: nil, format: 'json', **args) ⇒ Object
Return Internal Statistics.
-
.topic_create(topic:, **args) ⇒ Object
Create a topic.
-
.topic_delete(topic:, **args) ⇒ Object
Delete a topic (and all of its channels).
-
.topic_empty(topic:, **args) ⇒ Object
Empty all the queued messages (in-memory and disk) for an existing topic.
-
.topic_pause(topic:, **args) ⇒ Object
Pause message flow to all channels on an existing topic (messages will queue at the topic).
-
.topic_unpause(topic:, **args) ⇒ Object
Unpause message flow to all channels of an existing, paused, topic.
Instance Method Summary collapse
-
#initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) ⇒ Nsqd
constructor
Nsqd http wrapper.
Constructor Details
#initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) ⇒ Nsqd
Nsqd http wrapper. Provides a simple interface to all NSQD http api’s
193 194 195 196 197 198 199 200 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 193 def initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) @base_uri = base_uri @adapter = adapter uri = URI.join(@base_uri, request_uri) @client = @adapter.new(uri: uri) @client.use_ssl end |
Class Method Details
.channel_create(topic:, channel:, **args) ⇒ Object
Create a channel for an existing topic
145 146 147 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 145 def self.channel_create(topic:, channel:, **args) new(request_uri: '/channel/create', **args).post(topic: topic, channel: channel) end |
.channel_delete(topic:, channel:, **args) ⇒ Object
Delete an existing channel for an existing topic
154 155 156 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 154 def self.channel_delete(topic:, channel:, **args) new(request_uri: '/channel/delete', **args).post(topic: topic, channel: channel) end |
.channel_empty(topic:, channel:, **args) ⇒ Object
Empty all queued messages (in-memory and disk) for an existing channel
163 164 165 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 163 def self.channel_empty(topic:, channel:, **args) new(request_uri: '/channel/empty', **args).post(topic: topic, channel: channel) end |
.channel_pause(topic:, channel:, **args) ⇒ Object
Pause message flow to consumers of an existing channel (messages will queue at the channel)
173 174 175 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 173 def self.channel_pause(topic:, channel:, **args) new(request_uri: '/channel/pause', **args).post(topic: topic, channel: channel) end |
.channel_unpause(topic:, channel:, **args) ⇒ Object
Resume message flow to consumers of and existing, paused, channel
182 183 184 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 182 def self.channel_unpause(topic:, channel:, **args) new(request_uri: '/channel/unpause', **args).post(topic: topic, channel: channel) end |
.config_nsqlookupd_tcp_addresses(**args) ⇒ Object
List of nsqlookupd TCP addresses
95 96 97 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 95 def self.config_nsqlookupd_tcp_addresses(**args) new(request_uri: '/config/nsqlookupd_tcp_addresses', **args).get end |
.info(**args) ⇒ Object
NSQ version information
35 36 37 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 35 def self.info(**args) new(request_uri: '/info', **args).get end |
.mpub(topic:, binary: false, message:, **args) ⇒ Object
Publish multiple messages in one roundtrip
NOTE: by default /mpub expects messages to be delimited by \n, use the
+binary: true+ parameter to enable binary mode where body
is expected to be in the following format (the HTTP Content-Length
header should be sent as the total size of the POST body):
[ 4-byte num ]
[ 4-byte #1 size ][ N-byte binary data ]
... (repeated <> times)
TODO: setup Content-Legth header when binary is passed.
86 87 88 89 90 91 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 86 def self.mpub(topic:, binary: false, message:, **args) binary_param = binary ? 'true' : 'false' raise NotImplementedError, 'binary mode has yet to be implemented' if binary params = { topic: topic, binary: binary_param } new(request_uri: '/mpub', **args).post(params, ) end |
.ping(**args) ⇒ Object
Monitoring endpoint, should return 200 OK. It returns an HTTP 500 if it is not healthy.
NOTE: The only “unhealthy” state is if nsqd failed to write messages to disk when overflow occurred.
29 30 31 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 29 def self.ping(**args) new(request_uri: '/ping', **args).get end |
.pub(topic:, message:, defer: nil, **args) ⇒ Object
Publish a message
64 65 66 67 68 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 64 def self.pub(topic:, message:, defer: nil, **args) params = { topic: topic } params[:defer] = defer if defer new(request_uri: '/pub', **args).post(params, ) end |
.stats(topic: nil, channel: nil, format: 'json', **args) ⇒ Object
Return Internal Statistics
50 51 52 53 54 55 56 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 50 def self.stats(topic: nil, channel: nil, format: 'json', **args) raise InvalidFormatError unless VALID_FORMATS.include?(format) params = { format: format } params[:topic] = topic if topic params[:channel] = channel if channel new(request_uri: '/stats', **args).get(params) end |
.topic_create(topic:, **args) ⇒ Object
Create a topic
103 104 105 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 103 def self.topic_create(topic:, **args) new(request_uri: '/topic/create', **args).post(topic: topic) end |
.topic_delete(topic:, **args) ⇒ Object
Delete a topic (and all of its channels)
111 112 113 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 111 def self.topic_delete(topic:, **args) new(request_uri: '/topic/delete', **args).post(topic: topic) end |
.topic_empty(topic:, **args) ⇒ Object
Empty all the queued messages (in-memory and disk) for an existing topic
119 120 121 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 119 def self.topic_empty(topic:, **args) new(request_uri: '/topic/empty', **args).post(topic: topic) end |
.topic_pause(topic:, **args) ⇒ Object
Pause message flow to all channels on an existing topic (messages will queue at the topic)
128 129 130 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 128 def self.topic_pause(topic:, **args) new(request_uri: '/topic/pause', **args).post(topic: topic) end |
.topic_unpause(topic:, **args) ⇒ Object
Unpause message flow to all channels of an existing, paused, topic
136 137 138 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 136 def self.topic_unpause(topic:, **args) new(request_uri: '/topic/unpause', **args).post(topic: topic) end |