Class: Natsy::Client

Inherits:
Object
Defined in:
lib/natsy/client.rb

Overview

The Natsy::Client class provides a basic interface for subscribing to messages by subject and queue, and for replying to those messages. If a logger is configured, it also logs lifecycle events and received messages.

Class Attribute Details

.default_queue ⇒ Object

Optional default queue for message subscription and replies.



# File 'lib/natsy/client.rb', line 17

def default_queue
  @default_queue
end

.logger ⇒ Object

Optional logger for lifecycle events, messages received, etc.



# File 'lib/natsy/client.rb', line 14

def logger
  @logger
end

Class Method Details

.reply_to(subject, queue: nil, &block) ⇒ Object

Register a message handler with the Natsy::Client::reply_to method. Pass a subject string as the first argument (either a static subject string or a pattern to match more than one subject). Specify a queue (or don’t) with the queue: option. If you don’t provide the queue: option, it will be set to the value of default_queue, or to nil (no queue) if a default queue hasn’t been set.

The result of the given block will be published in reply to the message. The block is passed two arguments when a message matching the subject is received: data and subject. The data argument is the payload of the message (JSON objects/arrays will be parsed into string-keyed Hash objects/Array objects, respectively). The subject argument is the subject of the message received (mostly only useful if a pattern was specified instead of a static subject string).
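A minimal sketch of the calling convention described above, using a hypothetical dispatch helper rather than Natsy's actual internals. It assumes the payload is parsed as JSON when possible and otherwise passed through as a raw string, then hands data and subject to the block. Because Ruby blocks ignore surplus arguments, a handler may declare two, one, or no parameters.

```ruby
require "json"

# Hypothetical stand-in for the dispatch step; NOT Natsy's real implementation.
def dispatch(raw_payload, subject, &handler)
  data = begin
    JSON.parse(raw_payload) # objects become string-keyed Hashes, arrays become Arrays
  rescue JSON::ParserError
    raw_payload # assumption: non-JSON payloads are passed through unchanged
  end

  # Blocks (non-lambda procs) silently drop arguments they do not declare.
  handler.call(data, subject)
end

one_arg  = dispatch('{"foo":"bar"}', "other.subject") { |data| data["foo"] }
two_args = dispatch("[1, 2]", "some.cool.pattern") { |data, subject| "#{data.length} items on #{subject}" }
no_args  = dispatch("hello", "subject.in.queue") { "My turn!" }
```

In the real client the block's return value is what gets published as the reply; here it is simply returned to the caller.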

Examples:

Natsy::Client.reply_to("some.subject", queue: "foobar") { |data| "Got it! #{data.inspect}" }

Natsy::Client.reply_to("some.*.pattern") { |data, subject| "Got #{data} on #{subject}" }

Natsy::Client.reply_to("other.subject") do |data|
  if data["foo"] == "bar"
    { is_bar: "Yep!" }
  else
    { is_bar: "No way!" }
  end
end

Natsy::Client.reply_to("subject.in.queue", queue: "barbaz") do
  "My turn!"
end
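The "some.*.pattern" example relies on NATS subject wildcards: * matches exactly one dot-separated token, and > matches one or more trailing tokens. Matching is performed by the NATS server, not by Natsy, so the subject_matches? helper below is a hypothetical illustration of those rules only.

```ruby
# Hypothetical helper illustrating NATS wildcard rules; not part of Natsy.
def subject_matches?(pattern, subject)
  pattern_tokens = pattern.split(".")
  subject_tokens = subject.split(".")

  pattern_tokens.each_with_index do |token, index|
    # ">" matches one or more remaining tokens.
    return subject_tokens.length > index if token == ">"
    # The subject ran out of tokens before the pattern did.
    return false if index >= subject_tokens.length
    # "*" matches any single token; anything else must match literally.
    return false unless token == "*" || token == subject_tokens[index]
  end

  # Without ">", both sides must have the same number of tokens.
  pattern_tokens.length == subject_tokens.length
end

subject_matches?("some.*.pattern", "some.cool.pattern")
subject_matches?("some.>", "some.deeply.nested.subject")
```

A handler registered with a pattern receives the concrete subject as its second block argument, which is how it can tell which subject actually matched.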


# File 'lib/natsy/client.rb', line 103

def reply_to(subject, queue: nil, &block)
  queue = Utils.presence(queue) || default_queue
  queue_desc = " in queue '#{queue}'" if queue
  log("Registering a reply handler for subject '#{subject}'#{queue_desc}", level: :debug)
  register_reply!(subject: subject.to_s, handler: block, queue: queue.to_s)
end
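The queue fallback on the first line of reply_to can be sketched in isolation. The module below is a hypothetical stand-in: its presence method assumes that Utils.presence returns nil for nil or blank values (similar to ActiveSupport's presence), which the source shown here does not confirm.

```ruby
# Hypothetical stand-in for the queue resolution in reply_to; not Natsy itself.
module QueueFallbackSketch
  class << self
    attr_accessor :default_queue

    # Assumption: mirrors Utils.presence by treating nil and blank strings as absent.
    def presence(value)
      return nil if value.nil? || value.to_s.strip.empty?

      value
    end

    # An explicit queue wins; otherwise fall back to the default (which may be nil).
    def resolve_queue(queue = nil)
      presence(queue) || default_queue
    end
  end
end

without_default = QueueFallbackSketch.resolve_queue
QueueFallbackSketch.default_queue = "workers"
with_default = QueueFallbackSketch.resolve_queue("")
explicit = QueueFallbackSketch.resolve_queue("foobar")
```

Note that the source passes queue.to_s to register_reply!, so when no queue resolves, the handler is registered with an empty string rather than nil.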

.start! ⇒ Object

Start listening for messages with the Natsy::Client::start! method. This will spin up a non-blocking thread that subscribes to subjects (as specified by invocation(s) of ::reply_to) and waits for messages to come in. When a message is received, the appropriate ::reply_to block will be used to compute a response, and that response will be published.

NOTE: If an error is raised in one of the handlers, Natsy::Client will restart automatically.

NOTE: You can invoke ::reply_to to create additional message subscriptions after Natsy::Client.start!, but each ::reply_to invoked after ::start! forces the client to restart and re-subscribe. You may see (benign, already-handled) errors in the logs when this restart happens. If you have many additional ::reply_to invocations, consider refactoring so that your call to Natsy::Client.start! occurs after them.

NOTE: The ::start! method can be safely called multiple times. Only the first call is honored; any later call made while the client is already started does nothing except write a “NATS is already running” message to the logger at the DEBUG level.

Examples:

Natsy::Client.start!
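The "only the first call is honored" behaviour can be sketched with a small guard around thread creation. StartOnceSketch is a hypothetical module written for illustration; its thread body is a placeholder for the real listen loop, and the threads_started counter exists only to make the effect visible.

```ruby
# Hypothetical illustration of an idempotent start guard; not Natsy's code.
module StartOnceSketch
  class << self
    attr_reader :threads_started

    def started?
      @started ||= false
    end

    def start!
      return false if started? # later calls are no-ops

      @started = true
      @threads_started = (@threads_started || 0) + 1
      @thread = Thread.new { :listening } # placeholder for the real listen loop
      true
    end
  end
end

first_call  = StartOnceSketch.start!
second_call = StartOnceSketch.start!
```

Only the first call spawns a thread; the second returns immediately, so calling start! from several initialization paths does not create duplicate listeners.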


# File 'lib/natsy/client.rb', line 138

def start!
  log("Starting NATS", level: :debug)

  if started?
    log("NATS is already running", level: :debug)
    return
  end

  started!

  thread = Thread.new do
    Thread.handle_interrupt(StandardError => :never) do
      Thread.handle_interrupt(StandardError => :immediate) { listen }
    rescue NATS::ConnectError => e
      log("Could not connect to NATS server:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      Thread.current.exit
    rescue NewSubscriptionsError => _e
      log("New subscriptions! Restarting...", level: :info)
      restart!
      Thread.current.exit
      # raise e # TODO: there has to be a better way
    rescue StandardError => e
      log("Encountered an error:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      restart!
      Thread.current.exit
      # raise e
    end
  end

  threads << thread
end

.started? ⇒ Boolean

Returns true if ::start! has already been called (meaning the client is listening to NATS messages). Returns false if it has not yet been called, or if it has been stopped.

Returns:

  • (Boolean)


# File 'lib/natsy/client.rb', line 60

def started?
  @started ||= false
end

.stopped? ⇒ Boolean

Opposite of ::started?: returns false if ::start! has already been called (meaning the client is listening to NATS messages). Returns true if it has not yet been called, or if it has been stopped.

Returns:

  • (Boolean)


# File 'lib/natsy/client.rb', line 67

def stopped?
  !started?
end