Class: FastlyNsq::Http::Nsqd

Inherits:
Object
Extended by:
Forwardable
Defined in:
lib/fastly_nsq/http/nsqd.rb

Overview

Provides an interface to the functionality exposed by the nsqd HTTP API.

Defined Under Namespace

Classes: InvalidFormatError

Constant Summary

BASE_NSQD_URL =
ENV.fetch("NSQD_URL") do
  if ENV["NSQD_HTTPS_ADDRESS"]
    "https://#{ENV.fetch("NSQD_HTTPS_ADDRESS")}"
  else
    "http://#{ENV.fetch("NSQD_HTTP_ADDRESS")}"
  end
end
VALID_FORMATS =
%w[text json].freeze
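
The lookup order behind BASE_NSQD_URL can be easier to follow in isolation. The following is a minimal sketch, not the library's code: `resolve_nsqd_url` is a hypothetical helper that applies the same precedence to a plain Hash instead of `ENV`, and the host names and ports are made-up values.

```ruby
# Hypothetical helper mirroring the BASE_NSQD_URL lookup. It takes a Hash so
# the precedence can be exercised without touching the process environment.
def resolve_nsqd_url(env)
  env.fetch("NSQD_URL") do
    if env["NSQD_HTTPS_ADDRESS"]
      "https://#{env.fetch("NSQD_HTTPS_ADDRESS")}"
    else
      # Hash#fetch raises KeyError when no address is configured at all.
      "http://#{env.fetch("NSQD_HTTP_ADDRESS")}"
    end
  end
end

# An explicit NSQD_URL wins over both address variables.
explicit = resolve_nsqd_url({"NSQD_URL" => "http://nsqd.internal:4151",
                             "NSQD_HTTPS_ADDRESS" => "127.0.0.1:4152"})

# Without NSQD_URL, the HTTPS address is preferred over the HTTP address.
https = resolve_nsqd_url({"NSQD_HTTPS_ADDRESS" => "127.0.0.1:4152",
                          "NSQD_HTTP_ADDRESS" => "127.0.0.1:4151"})

# With only the HTTP address set, a plain http:// URL is built.
http = resolve_nsqd_url({"NSQD_HTTP_ADDRESS" => "127.0.0.1:4151"})
```

Because the constant is evaluated when the file is loaded, a missing configuration would surface as a KeyError at load time rather than on the first request.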


Constructor Details

#initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http) ⇒ Nsqd

Nsqd HTTP wrapper. Provides a simple interface to all of the nsqd HTTP APIs.



# File 'lib/fastly_nsq/http/nsqd.rb', line 193

def initialize(request_uri:, base_uri: BASE_NSQD_URL, adapter: FastlyNsq::Http)
  @base_uri = base_uri
  @adapter = adapter
  uri = URI.join(@base_uri, request_uri)

  @client = @adapter.new(uri: uri)
  @client.use_ssl
end
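
As a rough illustration of what the constructor does with its arguments, the sketch below joins a base URI and a request path with `URI.join` and hands the result to an adapter. `RecordingAdapter` is a hypothetical stand-in that only mimics the two calls visible in the source (`new(uri:)` and `use_ssl`); it is not part of fastly_nsq, and the addresses are made-up.

```ruby
require "uri"

# Hypothetical adapter stand-in: records the URI it receives and does nothing else.
class RecordingAdapter
  attr_reader :uri

  def initialize(uri:)
    @uri = uri
  end

  # The constructor calls use_ssl on its client, so the stand-in accepts it.
  def use_ssl
  end
end

# Same join the constructor performs: base URI plus an absolute request path.
uri = URI.join("http://127.0.0.1:4151", "/channel/create")
client = RecordingAdapter.new(uri: uri)
client.use_ssl

# Request paths start with "/", so URI.join replaces any path on the base URI.
prefixed = URI.join("http://127.0.0.1:4151/nsq/", "/ping")
```

One consequence of the last line is that a path prefix in `base_uri` is dropped, since every request path used by the class methods is absolute. Any object with the same two methods can be passed as `adapter:`, which may be convenient in tests.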

Class Method Details

.channel_create(topic:, channel:, **args) ⇒ Object

Create a channel for an existing topic

Parameters:

  • topic (String)

    the existing topic

  • channel (String)

    the channel to create



# File 'lib/fastly_nsq/http/nsqd.rb', line 145

def self.channel_create(topic:, channel:, **args)
  new(request_uri: "/channel/create", **args).post(topic: topic, channel: channel)
end

.channel_delete(topic:, channel:, **args) ⇒ Object

Delete an existing channel for an existing topic

Parameters:

  • topic (String)

    the existing topic

  • channel (String)

    the channel to delete



# File 'lib/fastly_nsq/http/nsqd.rb', line 154

def self.channel_delete(topic:, channel:, **args)
  new(request_uri: "/channel/delete", **args).post(topic: topic, channel: channel)
end

.channel_empty(topic:, channel:, **args) ⇒ Object

Empty all queued messages (in-memory and disk) for an existing channel

Parameters:

  • topic (String)

    the existing topic

  • channel (String)

    the channel to empty



# File 'lib/fastly_nsq/http/nsqd.rb', line 163

def self.channel_empty(topic:, channel:, **args)
  new(request_uri: "/channel/empty", **args).post(topic: topic, channel: channel)
end

.channel_pause(topic:, channel:, **args) ⇒ Object

Pause message flow to consumers of an existing channel (messages will queue at the channel)

Parameters:

  • topic (String)

    the existing topic

  • channel (String)

    the channel to pause



# File 'lib/fastly_nsq/http/nsqd.rb', line 173

def self.channel_pause(topic:, channel:, **args)
  new(request_uri: "/channel/pause", **args).post(topic: topic, channel: channel)
end

.channel_unpause(topic:, channel:, **args) ⇒ Object

Resume message flow to consumers of an existing, paused channel

Parameters:

  • topic (String)

    the existing topic

  • channel (String)

    the existing, paused, channel to unpause



# File 'lib/fastly_nsq/http/nsqd.rb', line 182

def self.channel_unpause(topic:, channel:, **args)
  new(request_uri: "/channel/unpause", **args).post(topic: topic, channel: channel)
end

.config_nsqlookupd_tcp_addresses(**args) ⇒ Object

List of nsqlookupd TCP addresses



# File 'lib/fastly_nsq/http/nsqd.rb', line 95

def self.config_nsqlookupd_tcp_addresses(**args)
  new(request_uri: "/config/nsqlookupd_tcp_addresses", **args).get
end

.info(**args) ⇒ Object

NSQ version information



# File 'lib/fastly_nsq/http/nsqd.rb', line 35

def self.info(**args)
  new(request_uri: "/info", **args).get
end

.mpub(topic:, message:, binary: false, **args) ⇒ Object

Publish multiple messages in one roundtrip

NOTE: by default /mpub expects messages to be delimited by \n. Use the
+binary: true+ parameter to enable binary mode, where the message body
is expected to be in the following format (the HTTP Content-Length
header should be sent as the total size of the POST body):
   [ 4-byte num messages ]
   [ 4-byte message #1 size ][ N-byte binary data ]
       ... (repeated <num_messages> times)

TODO: set up the Content-Length header when binary is passed.

Parameters:

  • topic (String)

    the topic to publish to

  • binary (Boolean) (defaults to: false)

    enables binary mode

  • message

    the messages to send, with \n used to separate messages

Raises:

  • (NotImplementedError)

    if binary is true; binary mode has not been implemented yet


# File 'lib/fastly_nsq/http/nsqd.rb', line 86

def self.mpub(topic:, message:, binary: false, **args)
  binary_param = binary ? "true" : "false"
  raise NotImplementedError, "binary mode has yet to be implemented" if binary
  params = {topic: topic, binary: binary_param}
  new(request_uri: "/mpub", **args).post(params, message)
end
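
The two body layouts described in the note above can be sketched as follows. This is only an illustration of the wire format: the method itself raises NotImplementedError for `binary: true`, `mpub_binary_body` is a hypothetical helper, and the big-endian byte order of the 4-byte fields is an assumption that is not stated in this page.

```ruby
messages = ["first", "second", "third"]

# Text mode: one POST body with messages separated by "\n".
# A message that itself contains a newline would be split into two.
text_body = messages.join("\n")

# Binary mode layout (hypothetical helper, big-endian sizes assumed):
#   [ 4-byte num messages ][ 4-byte size ][ data ] ... repeated per message
def mpub_binary_body(messages)
  body = [messages.size].pack("N") # "N" packs a 32-bit unsigned big-endian integer
  messages.each do |message|
    bytes = message.b              # binary copy, so sizes are counted in bytes
    body << [bytes.bytesize].pack("N") << bytes
  end
  body
end

binary_body = mpub_binary_body(messages)

# Content-Length for the binary body would be its total size in bytes.
content_length = binary_body.bytesize
```

Sizes are taken with `bytesize` rather than `length`, so multi-byte characters are counted correctly.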

.ping(**args) ⇒ Object

Monitoring endpoint, should return 200 OK. It returns an HTTP 500 if it is not healthy.

NOTE: The only “unhealthy” state is if nsqd failed to write messages to disk when overflow occurred.



# File 'lib/fastly_nsq/http/nsqd.rb', line 29

def self.ping(**args)
  new(request_uri: "/ping", **args).get
end

.pub(topic:, message:, defer: nil, **args) ⇒ Object

Publish a message

Parameters:

  • topic (String)

    the topic to publish to

  • defer (String) (defaults to: nil)

    the time in ms to delay message delivery

  • message

    the message body



# File 'lib/fastly_nsq/http/nsqd.rb', line 64

def self.pub(topic:, message:, defer: nil, **args)
  params = {topic: topic}
  params[:defer] = defer if defer
  new(request_uri: "/pub", **args).post(params, message)
end
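
To make the handling of `defer` concrete, here is a small sketch of the parameter building shown above. `pub_params` is a hypothetical helper that copies the logic, and encoding the parameters as a query string with `URI.encode_www_form` is an assumption: how the adapter actually serializes them is not shown in this page.

```ruby
require "uri"

# Hypothetical helper copying the parameter logic of .pub.
def pub_params(topic:, defer: nil)
  params = {topic: topic}
  params[:defer] = defer if defer # nil and false are omitted
  params
end

# No defer: only the topic is sent.
immediate = URI.encode_www_form(pub_params(topic: "events"))

# Deferred by 5000 ms.
deferred = URI.encode_www_form(pub_params(topic: "events", defer: 5000))

# 0 is truthy in Ruby, so an explicit zero delay is still sent as a parameter.
zero = URI.encode_www_form(pub_params(topic: "events", defer: 0))
```

The message body is passed separately as the second argument to `post`, so it never appears in these parameters.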

.stats(topic: nil, channel: nil, format: "json", **args) ⇒ Object

Return internal statistics

Examples:

Fetch Statistics for topic: ‘foo’, channel: ‘bar’ as text

Nsqd.stats(topic: 'foo', channel: 'bar', format: 'text')

Parameters:

  • topic (String) (defaults to: nil)

    filter to topic

  • channel (String) (defaults to: nil)

    filter to channel

  • format (String) (defaults to: "json")

    can be text or json

Raises:

  • (InvalidFormatError)

    provided format is not in list of valid formats



# File 'lib/fastly_nsq/http/nsqd.rb', line 50

def self.stats(topic: nil, channel: nil, format: "json", **args)
  raise InvalidFormatError unless VALID_FORMATS.include?(format)
  params = {format: format}
  params[:topic] = topic if topic
  params[:channel] = channel if channel
  new(request_uri: "/stats", **args).get(params)
end
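
The validation and filtering above can be tried without a running nsqd. The sketch below is a standalone copy of that logic: `stats_params`, `SKETCH_VALID_FORMATS` and `SketchInvalidFormatError` are hypothetical names, and deriving the error from StandardError is an assumption about the real InvalidFormatError.

```ruby
SKETCH_VALID_FORMATS = %w[text json].freeze

# Assumed superclass; the real InvalidFormatError may be defined differently.
class SketchInvalidFormatError < StandardError; end

# Hypothetical helper copying the validation and filter logic of .stats.
def stats_params(topic: nil, channel: nil, format: "json")
  raise SketchInvalidFormatError unless SKETCH_VALID_FORMATS.include?(format)
  params = {format: format}
  params[:topic] = topic if topic
  params[:channel] = channel if channel
  params
end

# Defaults: JSON output, no filters.
all_stats = stats_params

# Filtered to one channel of one topic, as plain text.
filtered = stats_params(topic: "foo", channel: "bar", format: "text")

# The check compares strings, so a Symbol such as :json is rejected.
symbol_rejected = begin
  stats_params(format: :json)
  false
rescue SketchInvalidFormatError
  true
end
```

Since the format list contains strings, callers would need to pass "json" or "text" rather than the equivalent symbols.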

.topic_create(topic:, **args) ⇒ Object

Create a topic

Parameters:

  • topic (String)

    the topic to create



# File 'lib/fastly_nsq/http/nsqd.rb', line 103

def self.topic_create(topic:, **args)
  new(request_uri: "/topic/create", **args).post(topic: topic)
end

.topic_delete(topic:, **args) ⇒ Object

Delete a topic (and all of its channels)

Parameters:

  • topic (String)

    the existing topic to delete



# File 'lib/fastly_nsq/http/nsqd.rb', line 111

def self.topic_delete(topic:, **args)
  new(request_uri: "/topic/delete", **args).post(topic: topic)
end

.topic_empty(topic:, **args) ⇒ Object

Empty all the queued messages (in-memory and disk) for an existing topic

Parameters:

  • topic (String)

    the existing topic to empty



# File 'lib/fastly_nsq/http/nsqd.rb', line 119

def self.topic_empty(topic:, **args)
  new(request_uri: "/topic/empty", **args).post(topic: topic)
end

.topic_pause(topic:, **args) ⇒ Object

Pause message flow to all channels on an existing topic (messages will queue at the topic)

Parameters:

  • topic (String)

    the existing topic to pause



# File 'lib/fastly_nsq/http/nsqd.rb', line 128

def self.topic_pause(topic:, **args)
  new(request_uri: "/topic/pause", **args).post(topic: topic)
end

.topic_unpause(topic:, **args) ⇒ Object

Unpause message flow to all channels of an existing, paused, topic

Parameters:

  • topic (String)

    the existing, paused topic to unpause



# File 'lib/fastly_nsq/http/nsqd.rb', line 136

def self.topic_unpause(topic:, **args)
  new(request_uri: "/topic/unpause", **args).post(topic: topic)
end