Module: FastlyNsq::Messenger

Defined in:
lib/fastly_nsq/messenger.rb

Overview

Provides interface for writing messages to NSQ. Manages tracking and creation of Producers

Examples:

FastlyNsq::Messenger.deliver(
  message: message,
  topic: 'topic',
  meta: ,
)

Constant Summary collapse

DEFAULT_ORIGIN =
'Unknown'

Class Method Summary collapse

Class Method Details

.deliver(message:, topic:, originating_service: nil, meta: {}) ⇒ Void

Deliver an NSQ message. Uses pub

Will add two keys to the ‘meta payload that cannot be overidden:

+originating_service+ which defaults to {FastlyNsq#originating_service} and
+sent_at+ which will be set to +Time.now.iso8601(5)+ when the payload is created.

Examples:

FastlyNsq::Messenger.deliver(
  message: {a: 1, count: 123},
  topic: 'count',
)


34
35
36
37
38
39
40
41
# File 'lib/fastly_nsq/messenger.rb', line 34

def deliver(message:, topic:, originating_service: nil, meta: {})
  payload = {
    data: message,
    meta: populate_meta(originating_service: originating_service, meta: meta),
  }

  deliver_payload(topic: topic, payload: payload.to_json)
end

.deliver_multi(messages:, topic:, originating_service: nil, meta: {}) ⇒ Void

Deliver many NSQ messages at once. Uses mpub

For each message will add two keys to the ‘meta payload of each message that cannot be overidden:

+originating_service+ which defaults to {FastlyNsq#originating_service} and
+sent_at+ which will be set to +Time.now.iso8601(5)+ when messages are processed.
The +sent_at+ time and +originating_service+ will be the same for every message.

Examples:

FastlyNsq::Messenger.deliver_multi(
  messages: [{a: 1, count: 11}, {a: 2, count: 22}],
  topic: 'counts',
)


62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/fastly_nsq/messenger.rb', line 62

def deliver_multi(messages:, topic:, originating_service: nil, meta: {})
  meta = populate_meta(originating_service: originating_service, meta: meta)

  payload = messages.each_with_object([]) do |message, a|
    msg = {
      data: message,
      meta: meta,
    }

    a << msg.to_json
  end

  deliver_payload(topic: topic, payload: payload)
end

.deliver_payload(topic:, payload:) ⇒ Object



120
121
122
# File 'lib/fastly_nsq/messenger.rb', line 120

def deliver_payload(topic:, payload:)
  producer_for(topic: topic) { |producer| producer.write payload }
end

.originating_serviceObject



114
115
116
# File 'lib/fastly_nsq/messenger.rb', line 114

def originating_service
  @originating_service || DEFAULT_ORIGIN
end

.originating_service=(service) ⇒ Object



77
78
79
# File 'lib/fastly_nsq/messenger.rb', line 77

def originating_service=(service)
  @originating_service = service
end

.populate_meta(originating_service: nil, meta: {}) ⇒ Object



124
125
126
127
128
# File 'lib/fastly_nsq/messenger.rb', line 124

def populate_meta(originating_service: nil, meta: {})
  meta[:originating_service] = originating_service || self.originating_service
  meta[:sent_at] = Time.now.iso8601(5)
  meta
end

.producer_for(topic:) {|producer| ... } ⇒ FastlyNsq::Producer

Returns producer for given topic

Yields:

  • (producer)


84
85
86
87
88
89
90
# File 'lib/fastly_nsq/messenger.rb', line 84

def producer_for(topic:)
  producer = producers[topic]

  yield producer if block_given?

  producer
end

.producersHash

Map of subscribed topics to FastlyNsq::Producer



95
96
97
# File 'lib/fastly_nsq/messenger.rb', line 95

def producers
  @producers ||= Hash.new { |hash, topic| hash[topic] = FastlyNsq::Producer.new(topic: topic) }
end

.terminate_all_producersObject



107
108
109
110
111
112
# File 'lib/fastly_nsq/messenger.rb', line 107

def terminate_all_producers
  producers.each do |topic, producer|
    producer.terminate
    producers.delete(topic)
  end
end

.terminate_producer(topic:) ⇒ Object

Terminate producer for given topic



102
103
104
105
# File 'lib/fastly_nsq/messenger.rb', line 102

def terminate_producer(topic:)
  producer_for(topic: topic).terminate
  producers.delete(topic)
end