Class: Natsy::Client
- Inherits:
-
Object
- Object
- Natsy::Client
- Defined in:
- lib/natsy/client.rb
Overview
The Natsy::Client class provides a basic interface for subscribing to messages by subject & queue, and replying to those messages. It also logs most functionality if desired.
Class Method Summary collapse
-
.reply_to(subject, queue: nil, &block) ⇒ Object
Register a message handler with the
Natsy::Client::reply_tomethod. -
.reset! ⇒ Object
**USE WITH CAUTION:** This method (
::reset!) clears all subscriptions, stops listening (if started), and kills any active threads. -
.start! ⇒ Object
Start listening for messages with the
Natsy::Client::start!method. -
.started? ⇒ Boolean
Returns
trueif::start!has already been called (meaning the client is listening to NATS messages). -
.stopped? ⇒ Boolean
Opposite of
::started?: returnsfalseif::start!has already been called (meaning the client is listening to NATS messages).
Class Method Details
.reply_to(subject, queue: nil, &block) ⇒ Object
Register a message handler with the Natsy::Client::reply_to method. Pass a subject string as the first argument (either a static subject string or a pattern to match more than one subject). Specify a queue (or don’t) with the queue: option. If you don’t provide the queue: option, it will be set to the value of Natsy::Config::default_queue, or to nil (no queue) if a default queue hasn’t been set.
The result of the given block will be published in reply to the message. The block is passed two arguments when a message matching the subject is received: data and subject. The data argument is the payload of the message (JSON objects/arrays will be parsed into string-keyed Hash objects/Array objects, respectively). The subject argument is the subject of the message received (mostly only useful if a pattern was specified instead of a static subject string).
60 61 62 63 64 65 |
# File 'lib/natsy/client.rb', line 60 def reply_to(subject, queue: nil, &block) queue = Utils.presence(queue) || Config.default_queue queue_desc = " in queue '#{queue}'" if queue log("Registering a reply handler for subject '#{subject}'#{queue_desc}", level: :debug) register_reply!(subject: subject.to_s, handler: block, queue: queue.to_s) end |
.reset! ⇒ Object
**USE WITH CAUTION:** This method (::reset!) clears all subscriptions, stops listening (if started), and kills any active threads.
131 132 133 134 135 |
# File 'lib/natsy/client.rb', line 131 def reset! replies.clear stop! kill! end |
.start! ⇒ Object
Start listening for messages with the Natsy::Client::start! method. This will spin up a non-blocking thread that subscribes to subjects (as specified by invocation(s) of ::reply_to) and waits for messages to come in. When a message is received, the appropriate ::reply_to block will be used to compute a response, and that response will be published.
NOTE: If an error is raised in one of the handlers, Natsy::Client will restart automatically.
NOTE: You can invoke ::reply_to to create additional message subscriptions after Natsy::Client.start!, but be aware that this forces the client to restart. You may see (benign, already-handled) errors in the logs generated when this restart happens. It will force the client to restart and re-subscribe after _each additional ::reply_to invoked after ::start!._ So, if you have a lot of additional ::reply_to invocations, you may want to consider refactoring so that your call to Natsy::Client.start! occurs after those additions.
NOTE: The ::start! method can be safely called multiple times; only the first will be honored, and any subsequent calls to ::start! after the client is already started will do nothing (except write a _“NATS is already running”_ log to the logger at the DEBUG level).
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/natsy/client.rb', line 95 def start! log("Starting NATS", level: :debug) if started? log("NATS is already running", level: :debug) return end started! thread = Thread.new do Thread.handle_interrupt(StandardError => :never) do Thread.handle_interrupt(StandardError => :immediate) { listen } rescue NATS::ConnectError => e log("Could not connect to NATS server:", level: :error) log(e., level: :error, indent: 2) Thread.current.exit rescue NewSubscriptionsError => _e log("New subscriptions! Restarting...", level: :info) restart! Thread.current.exit # raise e # TODO: there has to be a better way rescue StandardError => e log("Encountered an error:", level: :error) log(e., level: :error, indent: 2) restart! Thread.current.exit # raise e end end threads << thread end |
.started? ⇒ Boolean
Returns true if ::start! has already been called (meaning the client is listening to NATS messages). Returns false if it has not yet been called, or if it has been stopped.
16 17 18 |
# File 'lib/natsy/client.rb', line 16 def started? @started ||= false end |
.stopped? ⇒ Boolean
Opposite of ::started?: returns false if ::start! has already been called (meaning the client is listening to NATS messages). Returns true if it has not yet been called, or if it has been stopped.
23 24 25 |
# File 'lib/natsy/client.rb', line 23 def stopped? !started? end |