Class: FastlyNsq::Http::Nsqd
- Inherits:
-
Object
- Object
- FastlyNsq::Http::Nsqd
- Extended by:
- Forwardable
- Defined in:
- lib/fastly_nsq/http/nsqd.rb
Overview
Provides an interface to the the functionality provided by the nsqd HTTP interface
Defined Under Namespace
Classes: InvalidFormatError
Constant Summary collapse
- BASE_NSQD_URL =
ENV.fetch("NSQD_URL") do if ENV["NSQD_HTTPS_ADDRESS"] "https://#{ENV.fetch("NSQD_HTTPS_ADDRESS")}" else "http://#{ENV.fetch("NSQD_HTTP_ADDRESS")}" end end
- VALID_FORMATS =
%w[text json].freeze
Class Method Summary collapse
-
.channel_create(topic:, channel:, **args) ⇒ Object
Create a channel for an existing topic.
-
.channel_delete(topic:, channel:, **args) ⇒ Object
Delete an existing channel for an existing topic.
-
.channel_empty(topic:, channel:, **args) ⇒ Object
Empty all queued messages (in-memory and disk) for an existing channel.
-
.channel_pause(topic:, channel:, **args) ⇒ Object
Pause message flow to consumers of an existing channel (messages will queue at the channel).
-
.channel_unpause(topic:, channel:, **args) ⇒ Object
Resume message flow to consumers of and existing, paused, channel.
-
.config_nsqlookupd_tcp_addresses(**args) ⇒ Object
List of nsqlookupd TCP addresses.
-
.info(**args) ⇒ Object
NSQ version information.
-
.mpub(topic:, message:, binary: false, **args) ⇒ Object
Publish multiple messages in one roundtrip.
-
.ping(**args) ⇒ Object
Monitoring endpoint, should return 200 OK.
-
.pub(topic:, message:, defer: nil, **args) ⇒ Object
Publish a message.
-
.stats(topic: nil, channel: nil, format: "json", **args) ⇒ Object
Return Internal Statistics.
-
.topic_create(topic:, **args) ⇒ Object
Create a topic.
-
.topic_delete(topic:, **args) ⇒ Object
Delete a topic (and all of its channels).
-
.topic_empty(topic:, **args) ⇒ Object
Empty all the queued messages (in-memory and disk) for an existing topic.
-
.topic_pause(topic:, **args) ⇒ Object
Pause message flow to all channels on an existing topic (messages will queue at the topic).
-
.topic_unpause(topic:, **args) ⇒ Object
Unpause message flow to all channels of an existing, paused, topic.
Instance Method Summary collapse
-
#initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) ⇒ Nsqd
constructor
Nsqd http wrapper.
Constructor Details
#initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) ⇒ Nsqd
Nsqd http wrapper. Provides a simple interface to all NSQD http api’s
193 194 195 196 197 198 199 200 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 193 def initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) @base_uri = base_uri @adapter = adapter uri = URI.join(@base_uri, request_uri) @client = @adapter.new(uri: uri) @client.use_ssl end |
Class Method Details
.channel_create(topic:, channel:, **args) ⇒ Object
Create a channel for an existing topic
145 146 147 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 145 def self.channel_create(topic:, channel:, **args) new(request_uri: "/channel/create", **args).post(topic: topic, channel: channel) end |
.channel_delete(topic:, channel:, **args) ⇒ Object
Delete an existing channel for an existing topic
154 155 156 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 154 def self.channel_delete(topic:, channel:, **args) new(request_uri: "/channel/delete", **args).post(topic: topic, channel: channel) end |
.channel_empty(topic:, channel:, **args) ⇒ Object
Empty all queued messages (in-memory and disk) for an existing channel
163 164 165 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 163 def self.channel_empty(topic:, channel:, **args) new(request_uri: "/channel/empty", **args).post(topic: topic, channel: channel) end |
.channel_pause(topic:, channel:, **args) ⇒ Object
Pause message flow to consumers of an existing channel (messages will queue at the channel)
173 174 175 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 173 def self.channel_pause(topic:, channel:, **args) new(request_uri: "/channel/pause", **args).post(topic: topic, channel: channel) end |
.channel_unpause(topic:, channel:, **args) ⇒ Object
Resume message flow to consumers of and existing, paused, channel
182 183 184 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 182 def self.channel_unpause(topic:, channel:, **args) new(request_uri: "/channel/unpause", **args).post(topic: topic, channel: channel) end |
.config_nsqlookupd_tcp_addresses(**args) ⇒ Object
List of nsqlookupd TCP addresses
95 96 97 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 95 def self.config_nsqlookupd_tcp_addresses(**args) new(request_uri: "/config/nsqlookupd_tcp_addresses", **args).get end |
.info(**args) ⇒ Object
NSQ version information
35 36 37 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 35 def self.info(**args) new(request_uri: "/info", **args).get end |
.mpub(topic:, message:, binary: false, **args) ⇒ Object
Publish multiple messages in one roundtrip
NOTE: by default /mpub
expects messages to be delimited by \n
, use the
+binary: true+ parameter to enable binary mode where message body
is expected to be in the following format (the HTTP Content-Length
header should be sent as the total size of the POST body):
[ 4-byte num messages ]
[ 4-byte message #1 size ][ N-byte binary data ]
... (repeated <num_messages> times)
TODO: setup Content-Legth
header when binary is passed.
86 87 88 89 90 91 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 86 def self.mpub(topic:, message:, binary: false, **args) binary_param = binary ? "true" : "false" raise NotImplementedError, "binary mode has yet to be implemented" if binary params = {topic: topic, binary: binary_param} new(request_uri: "/mpub", **args).post(params, ) end |
.ping(**args) ⇒ Object
Monitoring endpoint, should return 200 OK. It returns an HTTP 500 if it is not healthy.
NOTE: The only “unhealthy” state is if nsqd failed to write messages to disk when overflow occurred.
29 30 31 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 29 def self.ping(**args) new(request_uri: "/ping", **args).get end |
.pub(topic:, message:, defer: nil, **args) ⇒ Object
Publish a message
64 65 66 67 68 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 64 def self.pub(topic:, message:, defer: nil, **args) params = {topic: topic} params[:defer] = defer if defer new(request_uri: "/pub", **args).post(params, ) end |
.stats(topic: nil, channel: nil, format: "json", **args) ⇒ Object
Return Internal Statistics
50 51 52 53 54 55 56 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 50 def self.stats(topic: nil, channel: nil, format: "json", **args) raise InvalidFormatError unless VALID_FORMATS.include?(format) params = {format: format} params[:topic] = topic if topic params[:channel] = channel if channel new(request_uri: "/stats", **args).get(params) end |
.topic_create(topic:, **args) ⇒ Object
Create a topic
103 104 105 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 103 def self.topic_create(topic:, **args) new(request_uri: "/topic/create", **args).post(topic: topic) end |
.topic_delete(topic:, **args) ⇒ Object
Delete a topic (and all of its channels)
111 112 113 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 111 def self.topic_delete(topic:, **args) new(request_uri: "/topic/delete", **args).post(topic: topic) end |
.topic_empty(topic:, **args) ⇒ Object
Empty all the queued messages (in-memory and disk) for an existing topic
119 120 121 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 119 def self.topic_empty(topic:, **args) new(request_uri: "/topic/empty", **args).post(topic: topic) end |
.topic_pause(topic:, **args) ⇒ Object
Pause message flow to all channels on an existing topic (messages will queue at the topic)
128 129 130 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 128 def self.topic_pause(topic:, **args) new(request_uri: "/topic/pause", **args).post(topic: topic) end |
.topic_unpause(topic:, **args) ⇒ Object
Unpause message flow to all channels of an existing, paused, topic
136 137 138 |
# File 'lib/fastly_nsq/http/nsqd.rb', line 136 def self.topic_unpause(topic:, **args) new(request_uri: "/topic/unpause", **args).post(topic: topic) end |