Class: Natsy::Client

Inherits:
Object
Defined in:
lib/natsy/client.rb

Overview

The Natsy::Client class provides a basic interface for subscribing to messages by subject and queue, and for replying to those messages. It can also log most of its activity, if desired.

Class Method Details

.reply_to(subject, queue: nil, &block) ⇒ Object

Register a message handler with the Natsy::Client::reply_to method. Pass a subject string as the first argument (either a static subject string or a pattern to match more than one subject). Specify a queue (or don’t) with the queue: option. If you don’t provide the queue: option, it will be set to the value of Natsy::Config::default_queue, or to nil (no queue) if a default queue hasn’t been set.
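To make the idea of a subject pattern concrete, here is a minimal sketch of wildcard matching, assuming the patterns follow the standard NATS rules (`*` matches exactly one dot-separated token, `>` matches one or more trailing tokens). The matching itself is done by NATS; `subject_matches?` is a hypothetical helper written only for illustration and is not part of Natsy.

```ruby
# Illustration only: `subject_matches?` is a hypothetical helper, not a Natsy API.
# It mimics NATS wildcard rules for dot-separated subjects.
def subject_matches?(pattern, subject)
  pattern_tokens = pattern.split(".")
  subject_tokens = subject.split(".")

  pattern_tokens.each_with_index do |token, index|
    # ">" matches one or more remaining tokens (it is only valid as the last token)
    return subject_tokens.length > index if token == ">"
    # The subject ran out of tokens before the pattern did
    return false if index >= subject_tokens.length
    # "*" matches exactly one token, whatever it is
    next if token == "*"
    return false unless token == subject_tokens[index]
  end

  # Without a trailing ">", both sides must have the same number of tokens
  pattern_tokens.length == subject_tokens.length
end

subject_matches?("some.*.pattern", "some.thing.pattern")      # true
subject_matches?("some.*.pattern", "some.thing.else.pattern") # false
subject_matches?("some.>", "some.thing.else")                 # true
```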

The result of the given block will be published in reply to the message. The block is passed two arguments when a message matching the subject is received: data and subject. The data argument is the payload of the message (JSON objects/arrays will be parsed into string-keyed Hash objects/Array objects, respectively). The subject argument is the subject of the message received (mostly only useful if a pattern was specified instead of a static subject string).

Examples:

Natsy::Client.reply_to("some.subject", queue: "foobar") { |data| "Got it! #{data.inspect}" }

Natsy::Client.reply_to("some.*.pattern") { |data, subject| "Got #{data} on #{subject}" }

Natsy::Client.reply_to("other.subject") do |data|
  if data["foo"] == "bar"
    { is_bar: "Yep!" }
  else
    { is_bar: "No way!" }
  end
end

Natsy::Client.reply_to("subject.in.queue", queue: "barbaz") do
  "My turn!"
end
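The handler contract described above can be tried out without a NATS server. The following is a rough sketch, not Natsy's implementation: `parse_payload` and `compute_reply` are hypothetical names, and falling back to the raw string for non-JSON payloads is an assumption made for this example.

```ruby
require "json"

# Hypothetical helper: parse a JSON payload into string-keyed Hash/Array objects.
# ASSUMPTION: a payload that is not valid JSON is passed through as the raw string.
def parse_payload(raw)
  JSON.parse(raw)
rescue JSON::ParserError
  raw
end

# Hypothetical helper: call a handler block the way the text describes,
# with the parsed data first and the message subject second.
def compute_reply(handler, raw, subject)
  handler.call(parse_payload(raw), subject)
end

# Blocks are procs, so a handler may accept two, one, or zero arguments.
with_both = proc { |data, subject| "Got #{data} on #{subject}" }
data_only = proc { |data| data["foo"] == "bar" ? { is_bar: "Yep!" } : { is_bar: "No way!" } }
no_args   = proc { "My turn!" }

compute_reply(with_both, "hello", "some.x.pattern")
compute_reply(data_only, '{"foo":"bar"}', "other.subject")
compute_reply(no_args, "{}", "subject.in.queue")
```

Because blocks have lenient arity, a handler that ignores the subject (or both arguments) needs no special treatment.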


# File 'lib/natsy/client.rb', line 60

def reply_to(subject, queue: nil, &block)
  queue = Utils.presence(queue) || Config.default_queue
  queue_desc = " in queue '#{queue}'" if queue
  log("Registering a reply handler for subject '#{subject}'#{queue_desc}", level: :debug)
  register_reply!(subject: subject.to_s, handler: block, queue: queue.to_s)
end

.reset! ⇒ Object

**USE WITH CAUTION:** This method (::reset!) clears all subscriptions, stops listening (if started), and kills any active threads.



# File 'lib/natsy/client.rb', line 131

def reset!
  replies.clear
  stop!
  kill!
end

.start! ⇒ Object

Start listening for messages with the Natsy::Client::start! method. This will spin up a non-blocking thread that subscribes to subjects (as specified by invocation(s) of ::reply_to) and waits for messages to come in. When a message is received, the appropriate ::reply_to block will be used to compute a response, and that response will be published.

NOTE: If an error is raised in one of the handlers, Natsy::Client will restart automatically.

NOTE: You can invoke ::reply_to to create additional message subscriptions after Natsy::Client.start!, but be aware that each such invocation forces the client to restart and re-subscribe. You may see (benign, already-handled) errors in the logs when this restart happens. So, if you have many additional ::reply_to invocations, consider refactoring so that your call to Natsy::Client.start! occurs after them.

NOTE: The ::start! method can be safely called multiple times; only the first will be honored, and any subsequent calls to ::start! after the client is already started will do nothing (except write a _“NATS is already running”_ log to the logger at the DEBUG level).

Examples:

Natsy::Client.start!
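The "only the first call is honored" behavior can be pictured with a small stand-in class. This is a toy sketch under stated assumptions, not Natsy's code: `ToyListener` and everything inside it are hypothetical, and the sleeping thread is only a placeholder for a real listen loop.

```ruby
# Toy stand-in for illustration; `ToyListener` is hypothetical, not part of Natsy.
class ToyListener
  attr_reader :threads

  def initialize
    @started = false
    @threads = []
  end

  def started?
    @started
  end

  def stopped?
    !started?
  end

  # Returns true when a listener thread was spun up,
  # false when the listener was already running.
  def start!
    return false if started? # subsequent calls do nothing

    @started = true
    @threads << Thread.new { sleep } # placeholder for the real listen loop
    true
  end

  # Kill the background thread(s) and mark the listener as stopped.
  def stop!
    @threads.each(&:kill)
    @threads.clear
    @started = false
  end
end

listener = ToyListener.new
listener.start! # spins up the background thread without blocking the caller
listener.start! # no-op: the listener is already started
```

The guard at the top of `start!` is what makes repeated calls safe; the caller's thread is never blocked because the waiting happens in the background thread.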


# File 'lib/natsy/client.rb', line 95

def start!
  log("Starting NATS", level: :debug)

  if started?
    log("NATS is already running", level: :debug)
    return
  end

  started!

  thread = Thread.new do
    Thread.handle_interrupt(StandardError => :never) do
      Thread.handle_interrupt(StandardError => :immediate) { listen }
    rescue NATS::ConnectError => e
      log("Could not connect to NATS server:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      Thread.current.exit
    rescue NewSubscriptionsError => _e
      log("New subscriptions! Restarting...", level: :info)
      restart!
      Thread.current.exit
      # raise e # TODO: there has to be a better way
    rescue StandardError => e
      log("Encountered an error:", level: :error)
      log(e.full_message, level: :error, indent: 2)
      restart!
      Thread.current.exit
      # raise e
    end
  end

  threads << thread
end

.started? ⇒ Boolean

Returns true if ::start! has already been called (meaning the client is listening to NATS messages). Returns false if it has not yet been called, or if it has been stopped.

Returns:

(Boolean)

# File 'lib/natsy/client.rb', line 16

def started?
  @started ||= false
end

.stopped? ⇒ Boolean

Opposite of ::started?: returns false if ::start! has already been called (meaning the client is listening to NATS messages). Returns true if it has not yet been called, or if it has been stopped.

Returns:

(Boolean)

# File 'lib/natsy/client.rb', line 23

def stopped?
  !started?
end