Module: FastlyNsq::Messenger
- Defined in:
- lib/fastly_nsq/messenger.rb
Overview
Provides interface for writing messages to NSQ. Manages tracking and creation of Producers
Constant Summary collapse
- DEFAULT_ORIGIN =
'Unknown'
Class Method Summary collapse
-
.deliver(message:, topic:, originating_service: nil, meta: {}) ⇒ Void
Deliver an NSQ message.
-
.deliver_multi(messages:, topic:, originating_service: nil, meta: {}) ⇒ Void
Deliver many NSQ messages at once.
- .deliver_payload(topic:, payload:) ⇒ Object
- .originating_service ⇒ Object
- .originating_service=(service) ⇒ Object
- .populate_meta(originating_service: nil, meta: {}) ⇒ Object
-
.producer_for(topic:) {|producer| ... } ⇒ FastlyNsq::Producer
Returns producer for given topic.
-
.producers ⇒ Hash
Map of subscribed topics to FastlyNsq::Producer.
- .terminate_all_producers ⇒ Object
-
.terminate_producer(topic:) ⇒ Object
Terminate producer for given topic.
Class Method Details
.deliver(message:, topic:, originating_service: nil, meta: {}) ⇒ Void
Deliver an NSQ message. Uses pub
Will add two keys to the ‘meta payload that cannot be overidden:
+originating_service+ which defaults to {FastlyNsq#originating_service} and
+sent_at+ which will be set to +Time.now.iso8601(5)+ when the payload is created.
34 35 36 37 38 39 40 41 |
# File 'lib/fastly_nsq/messenger.rb', line 34 def deliver(message:, topic:, originating_service: nil, meta: {}) payload = { data: , meta: (originating_service: originating_service, meta: ), } deliver_payload(topic: topic, payload: payload.to_json) end |
.deliver_multi(messages:, topic:, originating_service: nil, meta: {}) ⇒ Void
Deliver many NSQ messages at once. Uses mpub
For each message will add two keys to the ‘meta payload of each message that cannot be overidden:
+originating_service+ which defaults to {FastlyNsq#originating_service} and
+sent_at+ which will be set to +Time.now.iso8601(5)+ when messages are processed.
The +sent_at+ time and +originating_service+ will be the same for every message.
62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/fastly_nsq/messenger.rb', line 62 def deliver_multi(messages:, topic:, originating_service: nil, meta: {}) = (originating_service: originating_service, meta: ) payload = .each_with_object([]) do |, a| msg = { data: , meta: , } a << msg.to_json end deliver_payload(topic: topic, payload: payload) end |
.deliver_payload(topic:, payload:) ⇒ Object
120 121 122 |
# File 'lib/fastly_nsq/messenger.rb', line 120 def deliver_payload(topic:, payload:) producer_for(topic: topic) { |producer| producer.write payload } end |
.originating_service ⇒ Object
114 115 116 |
# File 'lib/fastly_nsq/messenger.rb', line 114 def originating_service @originating_service || DEFAULT_ORIGIN end |
.originating_service=(service) ⇒ Object
77 78 79 |
# File 'lib/fastly_nsq/messenger.rb', line 77 def originating_service=(service) @originating_service = service end |
.populate_meta(originating_service: nil, meta: {}) ⇒ Object
124 125 126 127 128 |
# File 'lib/fastly_nsq/messenger.rb', line 124 def (originating_service: nil, meta: {}) [:originating_service] = originating_service || self.originating_service [:sent_at] = Time.now.iso8601(5) end |
.producer_for(topic:) {|producer| ... } ⇒ FastlyNsq::Producer
Returns producer for given topic
84 85 86 87 88 89 90 |
# File 'lib/fastly_nsq/messenger.rb', line 84 def producer_for(topic:) producer = producers[topic] yield producer if block_given? producer end |
.producers ⇒ Hash
Map of subscribed topics to FastlyNsq::Producer
95 96 97 |
# File 'lib/fastly_nsq/messenger.rb', line 95 def producers @producers ||= Hash.new { |hash, topic| hash[topic] = FastlyNsq::Producer.new(topic: topic) } end |
.terminate_all_producers ⇒ Object
107 108 109 110 111 112 |
# File 'lib/fastly_nsq/messenger.rb', line 107 def terminate_all_producers producers.each do |topic, producer| producer.terminate producers.delete(topic) end end |
.terminate_producer(topic:) ⇒ Object
Terminate producer for given topic
102 103 104 105 |
# File 'lib/fastly_nsq/messenger.rb', line 102 def terminate_producer(topic:) producer_for(topic: topic).terminate producers.delete(topic) end |