Module: FastlyNsq::Messenger
- Defined in: lib/fastly_nsq/messenger.rb
Overview
Provides an interface for writing messages to NSQ. Manages the creation and tracking of Producers.
Constant Summary
- DEFAULT_ORIGIN =
"Unknown"
Class Method Summary
- .deliver(message:, topic:, originating_service: nil, sent_at: nil, meta: {}) ⇒ Void
Deliver an NSQ message.
- .deliver_multi(messages:, topic:, originating_service: nil, sent_at: nil, meta: {}) ⇒ Void
Deliver many NSQ messages at once.
- .deliver_payload(topic:, payload:) ⇒ Object
- .originating_service ⇒ Object
- .originating_service=(service) ⇒ Object
- .populate_meta(originating_service: nil, sent_at: nil, meta: {}) ⇒ Object
- .producer_for(topic:) {|producer| ... } ⇒ FastlyNsq::Producer
Returns producer for given topic.
- .producers ⇒ Hash
Map of subscribed topics to FastlyNsq::Producer.
- .terminate_all_producers ⇒ Object
- .terminate_producer(topic:) ⇒ Object
Terminate producer for given topic.
Class Method Details
.deliver(message:, topic:, originating_service: nil, sent_at: nil, meta: {}) ⇒ Void
Deliver an NSQ message. Uses +pub+.
Adds two keys to the +meta+ payload:
+originating_service+, which defaults to {FastlyNsq#originating_service}.
+sent_at+, which is set to +Time.now.iso8601(5)+ if the +sent_at+ param is nil or
is not a valid timestamp (that is, it does not respond to +iso8601+).
# File 'lib/fastly_nsq/messenger.rb', line 37
def deliver(message:, topic:, originating_service: nil, sent_at: nil, meta: {})
  payload = {
    data: message,
    meta: populate_meta(originating_service: originating_service, sent_at: sent_at, meta: meta),
  }

  deliver_payload(topic: topic, payload: payload.to_json)
end
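The envelope that .deliver publishes can be illustrated without an NSQ connection. The following is a minimal sketch, not the library's code: +build_payload+ is a hypothetical helper, and the "billing" service name and sample message are made up for illustration.

```ruby
require "json"
require "time"

# Hypothetical helper (not part of FastlyNsq) that mirrors the
# { data:, meta: } envelope described for .deliver.
def build_payload(message:, originating_service: "Unknown", sent_at: Time.now)
  {
    data: message,
    meta: {
      originating_service: originating_service,
      # Five fractional digits, matching Time.now.iso8601(5) in the docs.
      sent_at: sent_at.iso8601(5)
    }
  }.to_json
end

json = build_payload(
  message: { "id" => 1 },
  originating_service: "billing",
  sent_at: Time.utc(2020, 1, 2, 3, 4, 5)
)
puts JSON.parse(json)["meta"]["sent_at"] # prints 2020-01-02T03:04:05.00000Z
```

With the gem loaded and configured, the corresponding call would follow the signature above, for example +FastlyNsq::Messenger.deliver(message: { "id" => 1 }, topic: "orders")+, where the topic name is again only an example.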
.deliver_multi(messages:, topic:, originating_service: nil, sent_at: nil, meta: {}) ⇒ Void
Deliver many NSQ messages at once. Uses +mpub+.
Adds two keys to the +meta+ payload of each message:
+originating_service+, which defaults to {FastlyNsq#originating_service}.
+sent_at+, which is set to +Time.now.iso8601(5)+ when the messages are processed if the
+sent_at+ param is nil or if the passed +sent_at+ is not a valid timestamp.
The +sent_at+ time and +originating_service+ are the same for every message.
# File 'lib/fastly_nsq/messenger.rb', line 66
def deliver_multi(messages:, topic:, originating_service: nil, sent_at: nil, meta: {})
  meta = populate_meta(originating_service: originating_service, sent_at: sent_at, meta: meta)

  payload = messages.each_with_object([]) do |message, a|
    msg = {
      data: message,
      meta: meta,
    }

    a << msg.to_json
  end

  deliver_payload(topic: topic, payload: payload)
end
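Because the meta hash is computed once and reused, every message in a batch carries identical +sent_at+ and +originating_service+ values. A rough sketch of that behaviour, using a hypothetical +build_multi_payload+ helper that is not part of FastlyNsq:

```ruby
require "json"
require "time"

# Hypothetical helper (not part of FastlyNsq): wraps each message in its own
# JSON envelope while sharing a single meta hash across the whole batch.
def build_multi_payload(messages:, meta:)
  messages.map { |message| { data: message, meta: meta }.to_json }
end

# Illustrative values only.
shared_meta = {
  originating_service: "billing",
  sent_at: Time.utc(2020, 1, 2).iso8601(5)
}

payload = build_multi_payload(messages: [{ "id" => 1 }, { "id" => 2 }], meta: shared_meta)
payload.each { |json| puts json }
```

The result is an array of JSON strings, one per message, which is the shape handed to +deliver_payload+ in the listing above.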
.deliver_payload(topic:, payload:) ⇒ Object
# File 'lib/fastly_nsq/messenger.rb', line 124
def deliver_payload(topic:, payload:)
  producer_for(topic: topic) { |producer| producer.write payload }
end
.originating_service ⇒ Object
# File 'lib/fastly_nsq/messenger.rb', line 118
def originating_service
  @originating_service || DEFAULT_ORIGIN
end
.originating_service=(service) ⇒ Object
# File 'lib/fastly_nsq/messenger.rb', line 81
def originating_service=(service)
  @originating_service = service
end
.populate_meta(originating_service: nil, sent_at: nil, meta: {}) ⇒ Object
# File 'lib/fastly_nsq/messenger.rb', line 128
def populate_meta(originating_service: nil, sent_at: nil, meta: {})
  meta[:originating_service] = originating_service || self.originating_service

  meta[:sent_at] = if sent_at && sent_at.respond_to?(:iso8601)
    sent_at.iso8601(5)
  else
    Time.now.iso8601(5)
  end

  meta
end
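The +sent_at+ rule can be exercised in isolation. This sketch copies only the conditional from the listing into a hypothetical stand-alone method, +normalize_sent_at+, to show which inputs are kept and which fall back to the current time:

```ruby
require "time"

# Hypothetical stand-alone version of the sent_at rule (not part of FastlyNsq).
def normalize_sent_at(sent_at)
  if sent_at && sent_at.respond_to?(:iso8601)
    # Anything that responds to iso8601 is formatted with five fractional digits.
    sent_at.iso8601(5)
  else
    # nil and values without iso8601 (a String, for instance) fall back to now.
    Time.now.iso8601(5)
  end
end

puts normalize_sent_at(Time.utc(2020, 1, 2, 3, 4, 5)) # prints 2020-01-02T03:04:05.00000Z
puts normalize_sent_at("yesterday")                  # prints the current time instead
puts normalize_sent_at(nil)                          # prints the current time instead
```

Note that an invalid value is replaced silently rather than raising, so callers that need a specific timestamp should pass a Time object.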
.producer_for(topic:) {|producer| ... } ⇒ FastlyNsq::Producer
Returns the producer for the given topic, yielding it to the block if one is given.
# File 'lib/fastly_nsq/messenger.rb', line 88
def producer_for(topic:)
  producer = producers[topic]

  yield producer if block_given?

  producer
end
.producers ⇒ Hash
Map of topics to FastlyNsq::Producer. A producer is created the first time its topic is looked up.
# File 'lib/fastly_nsq/messenger.rb', line 99
def producers
  @producers ||= Hash.new { |hash, topic| hash[topic] = FastlyNsq::Producer.new(topic: topic) }
end
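The registry relies on Ruby's +Hash.new+ default block to build and memoize one producer per topic. The sketch below demonstrates that pattern with hypothetical stand-ins (+StubRegistry+ and +StubProducer+), since a real FastlyNsq::Producer would need an NSQ connection:

```ruby
# StubRegistry and StubProducer are hypothetical stand-ins, not FastlyNsq classes.
module StubRegistry
  StubProducer = Struct.new(:topic)

  def self.producers
    # The default block runs only on a missing key and stores what it builds,
    # so each topic gets exactly one producer.
    @producers ||= Hash.new { |hash, topic| hash[topic] = StubProducer.new(topic) }
  end
end

first  = StubRegistry.producers["orders"]
second = StubRegistry.producers["orders"]

puts first.equal?(second)                # prints true
puts StubRegistry.producers.keys.inspect # prints ["orders"]
```

One consequence of this design is that any lookup with +[]+ creates an entry, whereas +key?+ and +fetch+ do not trigger the default block.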
.terminate_all_producers ⇒ Object
# File 'lib/fastly_nsq/messenger.rb', line 111
def terminate_all_producers
  producers.each do |topic, producer|
    producer.terminate
    producers.delete(topic)
  end
end
.terminate_producer(topic:) ⇒ Object
Terminate producer for given topic
# File 'lib/fastly_nsq/messenger.rb', line 106
def terminate_producer(topic:)
  producer_for(topic: topic).terminate
  producers.delete(topic)
end
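Terminating removes the producer from the registry, so the next lookup for the same topic builds a fresh one. A small sketch of that lifecycle, assuming hypothetical +StubMessenger+ and +StubProducer+ stand-ins rather than the real classes:

```ruby
# StubMessenger and StubProducer are hypothetical stand-ins, not FastlyNsq classes.
module StubMessenger
  class StubProducer
    attr_reader :topic

    def initialize(topic)
      @topic = topic
      @terminated = false
    end

    def terminate
      @terminated = true
    end

    def terminated?
      @terminated
    end
  end

  def self.producers
    @producers ||= Hash.new { |hash, topic| hash[topic] = StubProducer.new(topic) }
  end

  # Same two steps as the listing: terminate, then drop the registry entry.
  def self.terminate_producer(topic:)
    producers[topic].terminate
    producers.delete(topic)
  end
end

old = StubMessenger.producers["orders"]
StubMessenger.terminate_producer(topic: "orders")
fresh = StubMessenger.producers["orders"]

puts old.terminated?   # prints true
puts fresh.equal?(old) # prints false
```

Because the lookup goes through the lazily populated hash, terminating a topic that was never used first creates a producer and then terminates it immediately.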