Module: FastlyNsq::Messenger

Defined in:
lib/fastly_nsq/messenger.rb

Overview

Provides an interface for writing messages to NSQ. Manages the creation and tracking of Producers.

Examples:

FastlyNsq::Messenger.deliver(
  message: message,
  topic: 'topic',
  meta: {},
)

Constant Summary

DEFAULT_ORIGIN =
"Unknown"

Class Method Details

.deliver(message:, topic:, originating_service: nil, sent_at: nil, meta: {}) ⇒ Void

Deliver an NSQ message. Uses +pub+.

Adds two keys to the +meta+ payload:

+originating_service+, which defaults to {FastlyNsq#originating_service}.
+sent_at+, which is set to +Time.now.iso8601(5)+ if the +sent_at+ param is nil or
if the passed +sent_at+ is not a valid timestamp.

Examples:

FastlyNsq::Messenger.deliver(
  message: {a: 1, count: 123},
  topic: 'count',
  sent_at: Time.now
)

Parameters:

  • message (#to_json(*))

    written to the data key of the NSQ message payload

  • topic (String)

    NSQ topic on which to deliver the message

  • originating_service (String) (defaults to: nil)

    added to meta key of message payload

  • sent_at (Time) (defaults to: nil)

    Timestamp that will be added to the meta payload

  • meta (Hash) (defaults to: {})

Returns:

  • (Void)


# File 'lib/fastly_nsq/messenger.rb', line 37

def deliver(message:, topic:, originating_service: nil, sent_at: nil, meta: {})
  payload = {
    data: message,
    meta: populate_meta(originating_service: originating_service, sent_at: sent_at, meta: meta)
  }

  deliver_payload(topic: topic, payload: payload.to_json)
end
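
To make the payload shape concrete, here is a minimal sketch of the JSON document that .deliver assembles before handing it to a producer. It does not connect to NSQ, and build_payload is a hypothetical helper written for this illustration, not part of FastlyNsq.

```ruby
require "json"
require "time"

# Hypothetical stand-in for the payload assembly in .deliver.
# It only builds the JSON string; nothing is written to NSQ.
def build_payload(message:, originating_service: "Unknown", sent_at: Time.now)
  {
    data: message,
    meta: {
      originating_service: originating_service,
      sent_at: sent_at.iso8601(5) # five fractional-second digits, as in the source above
    }
  }.to_json
end

json = build_payload(
  message: {a: 1, count: 123},
  originating_service: "billing",
  sent_at: Time.utc(2020, 1, 2, 3, 4, 5)
)

parsed = JSON.parse(json)
puts parsed["data"]["count"]   # => 123
puts parsed["meta"]["sent_at"] # => 2020-01-02T03:04:05.00000Z
```

A consumer can therefore expect the message under the data key and the bookkeeping fields under the meta key.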

.deliver_multi(messages:, topic:, originating_service: nil, sent_at: nil, meta: {}) ⇒ Void

Deliver many NSQ messages at once. Uses +mpub+.

Adds two keys to the +meta+ payload of each message:

+originating_service+, which defaults to {FastlyNsq#originating_service}.
+sent_at+, which is set to +Time.now.iso8601(5)+ when the messages are processed if the
+sent_at+ param is nil or if the passed +sent_at+ is not a valid timestamp.
The +sent_at+ time and +originating_service+ are the same for every message.

Examples:

FastlyNsq::Messenger.deliver_multi(
  messages: [{a: 1, count: 11}, {a: 2, count: 22}],
  topic: 'counts',
)

Parameters:

  • messages (Array)

    Array of messages, each written to the data key of its own NSQ message payload. Each message needs to respond to to_json(*).

  • topic (String)

    NSQ topic on which to deliver the message

  • originating_service (String) (defaults to: nil)

    added to meta key of message payload

  • sent_at (Time) (defaults to: nil)

    Timestamp that will be added to the meta payload

  • meta (Hash) (defaults to: {})

Returns:

  • (Void)


# File 'lib/fastly_nsq/messenger.rb', line 66

def deliver_multi(messages:, topic:, originating_service: nil, sent_at: nil, meta: {})
  meta = populate_meta(originating_service: originating_service, sent_at: sent_at, meta: meta)

  payload = messages.each_with_object([]) do |message, a|
    msg = {
      data: message,
      meta: meta
    }

    a << msg.to_json
  end

  deliver_payload(topic: topic, payload: payload)
end
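
The loop above can be sketched without an NSQ connection to show that every message is serialized separately while sharing one meta hash. build_multi_payload is a hypothetical name used only for this illustration.

```ruby
require "json"
require "time"

# Hypothetical helper mirroring the loop in .deliver_multi.
# Returns an Array of JSON strings, one per message.
def build_multi_payload(messages:, meta:)
  messages.map { |message| {data: message, meta: meta}.to_json }
end

shared_meta = {
  originating_service: "billing",
  sent_at: Time.utc(2020, 1, 2).iso8601(5)
}

payload = build_multi_payload(
  messages: [{a: 1, count: 11}, {a: 2, count: 22}],
  meta: shared_meta
)

# Each element is an independent JSON document with identical meta.
payload.each { |json| puts json }
```

Because the meta hash is computed once, the sent_at value reflects when the batch was prepared, not when each individual message was created.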

.deliver_payload(topic:, payload:) ⇒ Object



# File 'lib/fastly_nsq/messenger.rb', line 124

def deliver_payload(topic:, payload:)
  producer_for(topic: topic) { |producer| producer.write payload }
end

.originating_service ⇒ Object



# File 'lib/fastly_nsq/messenger.rb', line 118

def originating_service
  @originating_service || DEFAULT_ORIGIN
end

.originating_service=(service) ⇒ Object



# File 'lib/fastly_nsq/messenger.rb', line 81

def originating_service=(service)
  @originating_service = service
end
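
The reader and writer above form a module-level setting with a fallback. A minimal sketch of that pattern, using a hypothetical OriginSketch module in place of FastlyNsq::Messenger:

```ruby
# OriginSketch is a hypothetical stand-in that reproduces only the
# reader/writer pair shown above.
module OriginSketch
  DEFAULT_ORIGIN = "Unknown"

  class << self
    attr_writer :originating_service

    # Falls back to DEFAULT_ORIGIN until a service name is assigned.
    def originating_service
      @originating_service || DEFAULT_ORIGIN
    end
  end
end

puts OriginSketch.originating_service # => Unknown
OriginSketch.originating_service = "billing"
puts OriginSketch.originating_service # => billing
```

Assigning nil restores the fallback, since the reader uses || rather than checking whether the variable was ever set.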

.populate_meta(originating_service: nil, sent_at: nil, meta: {}) ⇒ Object



# File 'lib/fastly_nsq/messenger.rb', line 128

def populate_meta(originating_service: nil, sent_at: nil, meta: {})
  meta[:originating_service] = originating_service || self.originating_service

  meta[:sent_at] = if sent_at && sent_at.respond_to?(:iso8601)
    sent_at.iso8601(5)
  else
    Time.now.iso8601(5)
  end

  meta
end
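
Two behaviors of the method above are easy to miss: a sent_at value that does not respond to iso8601 is silently replaced with the current time, and the meta hash passed by the caller is modified in place. The sketch below is a standalone copy of the logic with the module default inlined; populate_meta_sketch is a hypothetical name.

```ruby
require "time"

# Standalone copy of the logic shown above, for illustration only.
def populate_meta_sketch(originating_service: nil, sent_at: nil, meta: {})
  meta[:originating_service] = originating_service || "Unknown"

  meta[:sent_at] = if sent_at.respond_to?(:iso8601)
    sent_at.iso8601(5)
  else
    Time.now.iso8601(5) # fallback for nil or non-time values
  end

  meta
end

caller_meta = {request_id: "abc"}
result = populate_meta_sketch(sent_at: Time.utc(2020, 1, 2, 3, 4, 5), meta: caller_meta)
puts result[:sent_at]           # => 2020-01-02T03:04:05.00000Z
puts result.equal?(caller_meta) # => true, the caller's hash was mutated

# A String does not respond to iso8601, so the current time is used instead.
fallback = populate_meta_sketch(sent_at: "not a time")
puts fallback[:sent_at]
```

Callers that reuse a meta hash across calls may want to pass a copy (for example meta.dup) if they do not want these keys written into it.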

.producer_for(topic:) {|producer| ... } ⇒ FastlyNsq::Producer

Returns producer for given topic

Parameters:

  • topic (String)

    NSQ topic

Yields:

  • (producer)

Returns:

  • (FastlyNsq::Producer)

# File 'lib/fastly_nsq/messenger.rb', line 88

def producer_for(topic:)
  producer = producers[topic]

  yield producer if block_given?

  producer
end

.producers ⇒ Hash

Map of subscribed topics to FastlyNsq::Producer

Returns:

  • (Hash)


# File 'lib/fastly_nsq/messenger.rb', line 99

def producers
  @producers ||= Hash.new { |hash, topic| hash[topic] = FastlyNsq::Producer.new(topic: topic) }
end
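
The Hash default block above creates a producer the first time a topic is looked up and reuses it afterwards. The sketch below shows that behavior with FakeProducer, a hypothetical stand-in for FastlyNsq::Producer so the example runs without nsqd.

```ruby
# FakeProducer is a hypothetical stand-in for FastlyNsq::Producer.
class FakeProducer
  attr_reader :topic

  def initialize(topic:)
    @topic = topic
  end
end

# Builds a Hash whose default block stores a new producer on first lookup.
def build_registry
  Hash.new { |hash, topic| hash[topic] = FakeProducer.new(topic: topic) }
end

registry = build_registry
first = registry["count"]
second = registry["count"]

puts first.equal?(second)   # => true, the same producer is reused
puts registry.keys.inspect  # => ["count"]
puts registry.key?("other") # => false, key? does not trigger the default block
```

Only lookups with [] trigger creation; methods such as key? or fetch do not invoke the default block.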

.terminate_all_producers ⇒ Object



# File 'lib/fastly_nsq/messenger.rb', line 111

def terminate_all_producers
  producers.each do |topic, producer|
    producer.terminate
    producers.delete(topic)
  end
end

.terminate_producer(topic:) ⇒ Object

Terminate producer for given topic

Parameters:

  • topic (String)

    NSQ topic



# File 'lib/fastly_nsq/messenger.rb', line 106

def terminate_producer(topic:)
  producer_for(topic: topic).terminate
  producers.delete(topic)
end
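
As a rough illustration of the lifecycle, the sketch below terminates a producer, removes it from the registry, and shows that the next lookup creates a fresh one. StubProducer and StubRegistry are hypothetical stand-ins; the real FastlyNsq::Producer#terminate closes a connection, which is not modeled here.

```ruby
# StubProducer is a hypothetical stand-in that only records termination.
class StubProducer
  def initialize(topic:)
    @topic = topic
    @terminated = false
  end

  def terminate
    @terminated = true
  end

  def terminated?
    @terminated
  end
end

# StubRegistry reproduces the producers/terminate_producer pair shown above.
class StubRegistry
  def producers
    @producers ||= Hash.new { |hash, topic| hash[topic] = StubProducer.new(topic: topic) }
  end

  def terminate_producer(topic:)
    producers[topic].terminate
    producers.delete(topic) # Hash#delete returns the removed producer
  end
end

registry = StubRegistry.new
original = registry.producers["count"]
removed = registry.terminate_producer(topic: "count")

puts removed.equal?(original)                     # => true
puts original.terminated?                         # => true
puts registry.producers.key?("count")             # => false
puts registry.producers["count"].equal?(original) # => false, a new producer was created
```

Because the lookup goes through the default block, terminating a topic that was never used first creates a producer and then terminates it.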