Module: FastlyNsq

Defined in:
lib/fastly_nsq.rb,
lib/fastly_nsq/testing.rb,
lib/fastly_nsq/version.rb,
lib/fastly_nsq/tls_options.rb

Defined Under Namespace

Modules: ConsumerTesting, ListenerTesting, Messages, Messenger, ProducerTesting, SafeThread

Classes: CLI, Consumer, FakeConnection, Feeder, Http, Launcher, Listener, Manager, Message, NewRelic, PriorityQueue, PriorityThreadPool, Producer, TestMessage, Testing, TlsOptions

Constant Summary

NotConnectedError =
Class.new(StandardError)
ConnectionFailed =
Class.new(StandardError)
LIFECYCLE_EVENTS =
%i[startup shutdown heartbeat].freeze
VERSION =
"1.18.1"

Class Attribute Details

.channel ⇒ String

Returns NSQ Channel.

Returns:

  • (String)

    NSQ Channel



# File 'lib/fastly_nsq.rb', line 19

def channel
  @channel
end

.max_attempts ⇒ Integer

Maximum number of times an NSQ message will be attempted. When set to nil, the number of attempts will be unlimited.

Returns:

  • (Integer)


# File 'lib/fastly_nsq.rb', line 27

def max_attempts
  @max_attempts
end

.max_processing_pool_threads ⇒ Integer

Maximum number of threads for FastlyNsq::PriorityThreadPool. The default is 5 and can be overridden via ENV['MAX_PROCESSING_POOL_THREADS'].

Returns:

  • (Integer)


# File 'lib/fastly_nsq.rb', line 121

def max_processing_pool_threads
  @max_processing_pool_threads ||= ENV.fetch("MAX_PROCESSING_POOL_THREADS", 5).to_i
end
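
The lookup above can be exercised in isolation. This is a minimal sketch, not library code: `pool_threads` is a hypothetical helper, and it takes a plain Hash in place of ENV so the example does not depend on the real process environment.

```ruby
# Hypothetical stand-in for the ENV lookup shown above.
# `env` is any object that responds to fetch(key, default), such as a Hash.
def pool_threads(env)
  env.fetch("MAX_PROCESSING_POOL_THREADS", 5).to_i
end

pool_threads({})                                          # => 5 (default)
pool_threads({ "MAX_PROCESSING_POOL_THREADS" => "12" })   # => 12
pool_threads({ "MAX_PROCESSING_POOL_THREADS" => "abc" })  # => 0 (String#to_i on non-numeric text)
```

Because String#to_i returns 0 for non-numeric text, a mistyped value yields 0 rather than falling back to the default.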

.max_req_timeout ⇒ Integer

Maximum requeue timeout in milliseconds. This setting caps the value sent by FastlyNsq::Message#requeue and should be less than or equal to the nsqd command-line option max-req-timeout. The default is 1 hour and can be overridden via ENV['MAX_REQ_TIMEOUT'].



# File 'lib/fastly_nsq.rb', line 114

def max_req_timeout
  @max_req_timeout ||= ENV.fetch("MAX_REQ_TIMEOUT", 60 * 60 * 1_000).to_i
end
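
One way to picture the cap, as a minimal sketch: it assumes a requested requeue delay is simply clamped to the configured maximum. `capped_requeue_timeout` and `DEFAULT_MAX_REQ_TIMEOUT_MS` are hypothetical names for illustration, not part of FastlyNsq.

```ruby
# The documented default: 1 hour expressed in milliseconds.
DEFAULT_MAX_REQ_TIMEOUT_MS = 60 * 60 * 1_000

# Hypothetical helper: never return more than the configured maximum.
def capped_requeue_timeout(requested_ms, max_ms = DEFAULT_MAX_REQ_TIMEOUT_MS)
  [requested_ms, max_ms].min
end

capped_requeue_timeout(5_000)      # => 5000 (under the cap, unchanged)
capped_requeue_timeout(7_200_000)  # => 3600000 (two hours clamped to one)
```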

.preprocessor ⇒ Proc

Returns global preprocessor.

Returns:

  • (Proc)

    global preprocessor



# File 'lib/fastly_nsq.rb', line 22

def preprocessor
  @preprocessor
end

Class Method Details

.configure {|_self| ... } ⇒ Object

Configuration for FastlyNsq

Examples:

FastlyNsq.configure do |config|
  config.channel = 'Z'
  config.logger = Logger.new($stdout)
end

FastlyNsq.configure do |config|
  config.channel = 'fnsq'
  config.logger = Logger.new($stdout)
  config.preprocessor = ->(_) { FastlyNsq.logger.info 'PREPROCESSESES' }
  config.listen 'posts', ->(m) { puts "posts: #{m.body}" }
  config.listen 'blogs', ->(m) { puts "blogs: #{m.body}" }, priority: 3
end

Yields:

  • (_self)

Yield Parameters:

  • _self (FastlyNsq)

    the object that the method was called on



# File 'lib/fastly_nsq.rb', line 86

def configure
  yield self
end
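
Since `configure` only yields the module itself, every setting in the block is an ordinary call on the module. A minimal sketch of the same pattern follows; `DemoNsq` is a hypothetical stand-in so the example runs without the gem.

```ruby
# Hypothetical module that mirrors the "yield self" configuration pattern.
module DemoNsq
  class << self
    attr_accessor :channel

    # Yield the module so callers can use its setters inside the block.
    def configure
      yield self
    end
  end
end

DemoNsq.configure { |config| config.channel = "fnsq" }
DemoNsq.channel  # => "fnsq"
```

The block parameter and the module are the same object, so `config.channel = "fnsq"` inside the block is equivalent to `DemoNsq.channel = "fnsq"` outside it.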

.consumer_nsqds ⇒ Array<String>

Returns an array of NSQD TCP addresses for NSQ consumers. Defaults to the value of ENV['NSQD_CONSUMERS'], which must be a comma- or space-separated string of NSQD addresses.

Returns:

  • (Array<String>)

    list of nsqd addresses



# File 'lib/fastly_nsq.rb', line 143

def consumer_nsqds
  @consumer_nsqds ||= ENV.fetch("NSQD_CONSUMERS", "").split(/, ?|\s+/).map(&:strip)
end
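
The separator pattern in the method above accepts commas, a comma followed by one space, or runs of whitespace. A minimal sketch of that parsing step on its own; `parse_addresses` and the host names are hypothetical.

```ruby
# Same separator pattern as in the method above.
ADDRESS_SEPARATOR = /, ?|\s+/

# Hypothetical helper: split a raw environment string into addresses.
def parse_addresses(raw)
  raw.split(ADDRESS_SEPARATOR).map(&:strip)
end

parse_addresses("nsqd1:4150,nsqd2:4150")
# => ["nsqd1:4150", "nsqd2:4150"]
parse_addresses("nsqd1:4150, nsqd2:4150 nsqd3:4150")
# => ["nsqd1:4150", "nsqd2:4150", "nsqd3:4150"]
parse_addresses("")
# => []
```

An unset variable falls back to the empty string, which splits to an empty array.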

.consumer_nsqds=(nsqd_addresses) ⇒ Object

Set the consumer_nsqd addresses

Parameters:

  • nsqd_addresses (Array)

    List of consumer nsqd addresses to use



# File 'lib/fastly_nsq.rb', line 150

def consumer_nsqds=(nsqd_addresses)
  @consumer_nsqds = nsqd_addresses.nil? ? nil : Array(nsqd_addresses)
end
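
The setter wraps its argument with Kernel#Array, so a single address and a list are both accepted. A minimal sketch of that normalization; `normalize_addresses` is a hypothetical helper and the host names are placeholders.

```ruby
# Hypothetical helper mirroring the setter's one-line normalization.
def normalize_addresses(value)
  value.nil? ? nil : Array(value)
end

normalize_addresses("nsqd1:4150")                  # => ["nsqd1:4150"]
normalize_addresses(["nsqd1:4150", "nsqd2:4150"])  # => ["nsqd1:4150", "nsqd2:4150"]
normalize_addresses(nil)                           # => nil
```

Unlike the ENV-based reader, the setter does not split strings: a comma-separated string stays a single element. Assigning nil clears the memoized value, so the next read falls back to ENV.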

.events ⇒ Hash

Map of lifecycle events

Returns:

  • (Hash)


# File 'lib/fastly_nsq.rb', line 40

def events
  @events ||= LIFECYCLE_EVENTS.each_with_object({}) { |e, a| a[e] = [] }
end

.fire_event(event) ⇒ Object

Execute Procs assigned for the lifecycle event

Parameters:

  • event (Symbol)

    Lifecycle event to trigger



# File 'lib/fastly_nsq.rb', line 188

def fire_event(event)
  blocks = FastlyNsq.events.fetch(event)
  blocks.each do |block|
    block.call
  rescue => e
    logger.error "[#{event}] #{e.inspect}"
  end
end
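
Because the rescue sits inside the per-block loop, one failing hook is logged and the remaining hooks still run. A minimal sketch of that behaviour follows; `DemoHooks`, its `RAN` list and its in-memory log are hypothetical stand-ins, not FastlyNsq internals.

```ruby
require "logger"
require "stringio"

# Hypothetical stand-in that reuses the fire_event loop shown above.
module DemoHooks
  LOG = StringIO.new   # in-memory log target so the output can be inspected
  RAN = []             # records which hooks completed

  def self.logger
    @logger ||= Logger.new(LOG)
  end

  def self.events
    @events ||= { startup: [] }
  end

  def self.fire_event(event)
    events.fetch(event).each do |block|
      block.call
    rescue => e
      # Log the failure and continue with the next hook.
      logger.error "[#{event}] #{e.inspect}"
    end
  end
end

DemoHooks.events[:startup] << -> { raise "boom" }
DemoHooks.events[:startup] << -> { DemoHooks::RAN << :second }
DemoHooks.fire_event(:startup)
DemoHooks::RAN  # => [:second]
```

Note that `fetch` raises KeyError for an event name that is not in the map, and that error is raised outside the loop, so it is not rescued.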

.listen(topic, processor, **options) ⇒ Object

Create a FastlyNsq::Listener

Parameters:

  • topic (String)

    NSQ topic on which to listen

  • processor (Proc)

    processor that will be called per message

  • options (Hash)

    additional options that are passed to FastlyNsq::Listener’s constructor

Returns:

  • (FastlyNsq::Listener)



# File 'lib/fastly_nsq.rb', line 51

def listen(topic, processor, **options)
  FastlyNsq::Listener.new(topic: topic, processor: processor, **options)
end

.logger ⇒ Logger

Returns the configured logger. If none has been set, assigns and returns a default Logger that writes to $stderr.

Returns:

  • (Logger)


# File 'lib/fastly_nsq.rb', line 58

def logger
  return @logger if @logger

  self.logger = Logger.new($stderr)
end

.logger=(new_logger) ⇒ Object

Set the logger and also set Nsq.logger



# File 'lib/fastly_nsq.rb', line 67

def logger=(new_logger)
  @logger = Nsq.logger = new_logger
end

.lookupd_http_addresses ⇒ Array<String>

Returns an array of NSQ lookupd HTTP addresses sourced from ENV['NSQLOOKUPD_HTTP_ADDRESS'].

Returns:

  • (Array<String>)

    list of nsqlookupd http addresses



# File 'lib/fastly_nsq.rb', line 128

def lookupd_http_addresses
  @lookups ||= ENV.fetch("NSQLOOKUPD_HTTP_ADDRESS", "").split(/, ?|\s+/).map(&:strip)
end

.lookupd_http_addresses=(lookups) ⇒ Object

Set the lookupd_http_addresses

Parameters:

  • lookups (Array)

    List of http lookupd addresses to use.



# File 'lib/fastly_nsq.rb', line 135

def lookupd_http_addresses=(lookups)
  @lookups = lookups.nil? ? nil : Array(lookups)
end

.manager ⇒ FastlyNsq::Manager

Returns the memoized FastlyNsq::Manager instance, creating a new one on first access.

Returns:

  • (FastlyNsq::Manager)


# File 'lib/fastly_nsq.rb', line 94

def manager
  @manager ||= FastlyNsq::Manager.new
end

.manager=(manager) ⇒ FastlyNsq::Manager

Set a new manager and transfer listeners to the new manager.

Parameters:

  • manager (FastlyNsq::Manager)

Returns:

  • (FastlyNsq::Manager)


# File 'lib/fastly_nsq.rb', line 102

def manager=(manager)
  @manager&.transfer(manager)
  @manager = manager
end
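
The safe-navigation call means the first assignment transfers nothing, while later assignments hand work over from the old manager. A minimal sketch under a stated assumption: `DemoManager#transfer` here simply moves a list of listeners, which may differ from what FastlyNsq::Manager#transfer actually does. `DemoManager` and `DemoRegistry` are hypothetical.

```ruby
# Hypothetical manager; transfer is an assumed, simplified behaviour.
class DemoManager
  attr_reader :listeners

  def initialize(listeners = [])
    @listeners = listeners
  end

  # Assumed behaviour: move every listener to the new manager.
  def transfer(new_manager)
    new_manager.listeners.concat(listeners)
    listeners.clear
  end
end

# Hypothetical registry that reuses the setter shown above.
module DemoRegistry
  class << self
    attr_reader :manager

    def manager=(manager)
      @manager&.transfer(manager)  # skipped when no manager is set yet
      @manager = manager
    end
  end
end

first = DemoManager.new([:posts])
DemoRegistry.manager = first             # nothing to transfer from
DemoRegistry.manager = DemoManager.new   # :posts moves to the new manager
DemoRegistry.manager.listeners           # => [:posts]
```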

.on(event) { ... } ⇒ Object

Register a block to run at a point in the lifecycle.

Examples:

FastlyNsq.configure do |config|
  config.on(:shutdown) do
    puts "Goodbye cruel world!"
  end
end

Parameters:

  • event (Symbol)

    Event to hook into. One of :startup, :heartbeat or :shutdown.

Yields:

  • Proc to execute when event is triggered.

Raises:

  • (ArgumentError)


# File 'lib/fastly_nsq.rb', line 179

def on(event, &block)
  event = event.to_sym
  raise ArgumentError, "Invalid event name: #{event}" unless LIFECYCLE_EVENTS.include?(event)
  events[event] << block
end
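
The validation step can be tried without the gem. In this minimal sketch, `DemoLifecycle` is a hypothetical stand-in that copies the `events` and `on` bodies shown on this page; it illustrates that String names are accepted via to_sym and that unknown names raise ArgumentError.

```ruby
# Hypothetical stand-in reusing the events/on logic documented above.
module DemoLifecycle
  LIFECYCLE_EVENTS = %i[startup shutdown heartbeat].freeze

  class << self
    # One independent Array of hooks per lifecycle event.
    def events
      @events ||= LIFECYCLE_EVENTS.each_with_object({}) { |e, a| a[e] = [] }
    end

    def on(event, &block)
      event = event.to_sym
      raise ArgumentError, "Invalid event name: #{event}" unless LIFECYCLE_EVENTS.include?(event)
      events[event] << block
    end
  end
end

DemoLifecycle.on("heartbeat") { :tick }  # String name is converted to :heartbeat

begin
  DemoLifecycle.on(:restart) { :never }
rescue ArgumentError => e
  e.message  # => "Invalid event name: restart"
end
```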

.producer_nsqds ⇒ Array<String>

Returns an array of NSQD TCP addresses for NSQ producers. Defaults to the value of ENV['NSQD_PRODUCERS'], which must be a comma- or space-separated string of NSQD addresses.

Returns:

  • (Array<String>)

    list of nsqd addresses



# File 'lib/fastly_nsq.rb', line 158

def producer_nsqds
  @producer_nsqds ||= ENV.fetch("NSQD_PRODUCERS", "").split(/, ?|\s+/).map(&:strip)
end

.producer_nsqds=(nsqd_addresses) ⇒ Object

Set the producer_nsqd addresses

Parameters:

  • nsqd_addresses (Array)

    List of producer nsqd addresses to use



# File 'lib/fastly_nsq.rb', line 165

def producer_nsqds=(nsqd_addresses)
  @producer_nsqds = nsqd_addresses.nil? ? nil : Array(nsqd_addresses)
end

.tracer ⇒ FastlyNsq::NewRelic

Returns the memoized instance of FastlyNsq::NewRelic.

Returns:

  • (FastlyNsq::NewRelic)


# File 'lib/fastly_nsq.rb', line 200

def tracer
  @tracer ||= FastlyNsq::NewRelic.new
end