Module: FastlyNsq
- Defined in:
  lib/fastly_nsq.rb,
  lib/fastly_nsq/testing.rb,
  lib/fastly_nsq/version.rb,
  lib/fastly_nsq/tls_options.rb
Defined Under Namespace
Modules: ConsumerTesting, ListenerTesting, Messages, Messenger, ProducerTesting, SafeThread
Classes: CLI, Consumer, FakeConnection, Feeder, Http, Launcher, Listener, Manager, Message, NewRelic, PriorityQueue, PriorityThreadPool, Producer, TestMessage, Testing, TlsOptions
Constant Summary
- NotConnectedError = Class.new(StandardError)
- ConnectionFailed = Class.new(StandardError)
- LIFECYCLE_EVENTS = %i[startup shutdown heartbeat].freeze
- VERSION = "1.18.1"
Class Attribute Summary
-
.channel ⇒ String
NSQ Channel.
-
.max_attempts ⇒ Integer
Maximum number of times an NSQ message will be attempted.
-
.max_processing_pool_threads ⇒ Integer
Maximum number of threads for FastlyNsq::PriorityThreadPool. The default is 5; it can be overridden with the MAX_PROCESSING_POOL_THREADS environment variable.
-
.max_req_timeout ⇒ Integer
Maximum requeue timeout in milliseconds.
-
.preprocessor ⇒ Proc
Global preprocessor.
Class Method Summary
-
.configure {|_self| ... } ⇒ Object
Configuration for FastlyNsq.
-
.consumer_nsqds ⇒ Array<String>
Return an array of NSQD TCP addresses for NSQ consumers.
-
.consumer_nsqds=(nsqd_addresses) ⇒ Object
Set the consumer_nsqd addresses.
-
.events ⇒ Hash
Map of lifecycle events.
-
.fire_event(event) ⇒ Object
Execute Procs assigned for the lifecycle event.
-
.listen(topic, processor, **options) ⇒ Object
Create a FastlyNsq::Listener.
-
.logger ⇒ Logger
Return logger or set logger to default.
-
.logger=(new_logger) ⇒ Object
Set the logger and also set Nsq.logger.
-
.lookupd_http_addresses ⇒ Array<String>
Return an array of NSQ lookupd http addresses sourced from ENV.
-
.lookupd_http_addresses=(lookups) ⇒ Object
Set the lookupd_http_addresses.
-
.manager ⇒ FastlyNsq::Manager
Returns a new FastlyNsq::Manager or the memoized instance @manager.
-
.manager=(manager) ⇒ FastlyNsq::Manager
Set a new manager and transfer listeners to the new manager.
-
.on(event) { ... } ⇒ Object
Register a block to run at a point in the lifecycle.
-
.producer_nsqds ⇒ Array<String>
Return an array of NSQD TCP addresses for NSQ producers.
-
.producer_nsqds=(nsqd_addresses) ⇒ Object
Set the producer_nsqd addresses.
-
.tracer ⇒ FastlyNsq::NewRelic
Instance of FastlyNsq::NewRelic.
Class Attribute Details
.channel ⇒ String
Returns NSQ Channel.
# File 'lib/fastly_nsq.rb', line 19
def channel
  @channel
end
.max_attempts ⇒ Integer
Maximum number of times an NSQ message will be attempted. When set to nil, the number of attempts will be unlimited.
# File 'lib/fastly_nsq.rb', line 27
def max_attempts
  @max_attempts
end
.max_processing_pool_threads ⇒ Integer
Maximum number of threads for FastlyNsq::PriorityThreadPool. The default is 5; it can be overridden with the MAX_PROCESSING_POOL_THREADS environment variable.
# File 'lib/fastly_nsq.rb', line 121
def max_processing_pool_threads
  @max_processing_pool_threads ||= ENV.fetch("MAX_PROCESSING_POOL_THREADS", 5).to_i
end
.max_req_timeout ⇒ Integer
Maximum requeue timeout in milliseconds. This setting caps the value sent by FastlyNsq::Message#requeue and should be less than or equal to the nsqd command line option max-req-timeout. The default is 1 hour; it can be overridden with the MAX_REQ_TIMEOUT environment variable.
# File 'lib/fastly_nsq.rb', line 114
def max_req_timeout
  @max_req_timeout ||= ENV.fetch("MAX_REQ_TIMEOUT", 60 * 60 * 1_000).to_i
end
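As a rough illustration of how such a cap could be applied, the sketch below reads the limit the same way as the method above and clamps a requested requeue delay to it. The module `TimeoutSketch`, the helper `clamp_requeue_timeout`, and the injectable `env` argument are hypothetical and not part of FastlyNsq; the real capping happens inside FastlyNsq::Message#requeue and may differ.

```ruby
# Hypothetical stand-in; it mirrors only the ENV lookup shown above.
module TimeoutSketch
  DEFAULT_MAX_REQ_TIMEOUT_MS = 60 * 60 * 1_000 # 1 hour in milliseconds

  # ENV values are Strings and the default is an Integer; to_i normalizes both.
  def self.max_req_timeout(env = ENV)
    env.fetch("MAX_REQ_TIMEOUT", DEFAULT_MAX_REQ_TIMEOUT_MS).to_i
  end

  # Assumed behavior: never request more than the configured maximum.
  def self.clamp_requeue_timeout(requested_ms, env = ENV)
    [requested_ms, max_req_timeout(env)].min
  end
end

TimeoutSketch.max_req_timeout({})                                          # 3_600_000
TimeoutSketch.clamp_requeue_timeout(7_200_000, {})                         # 3_600_000
TimeoutSketch.clamp_requeue_timeout(5_000, {"MAX_REQ_TIMEOUT" => "1000"})  # 1_000
```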
.preprocessor ⇒ Proc
Returns global preprocessor.
# File 'lib/fastly_nsq.rb', line 22
def preprocessor
  @preprocessor
end
Class Method Details
.configure {|_self| ... } ⇒ Object
Configuration for FastlyNsq.
# File 'lib/fastly_nsq.rb', line 86
def configure
  yield self
end
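Because `configure` yields the module itself, the block can call any writer the module exposes. The sketch below reproduces that `yield self` pattern on a stand-in module so it runs without the gem; `ConfigSketch` and the channel name are hypothetical, and it assumes the `channel` and `max_attempts` attributes are writable.

```ruby
# Stand-in module (not the real FastlyNsq) showing the `yield self` pattern.
module ConfigSketch
  class << self
    attr_accessor :channel, :max_attempts

    # Yield the module itself so the block can call its setters.
    def configure
      yield self
    end
  end
end

ConfigSketch.configure do |config|
  config.channel = "my_channel" # hypothetical channel name
  config.max_attempts = 20      # nil would mean unlimited attempts
end

ConfigSketch.channel      # "my_channel"
ConfigSketch.max_attempts # 20
```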
.consumer_nsqds ⇒ Array<String>
# File 'lib/fastly_nsq.rb', line 143
def consumer_nsqds
  @consumer_nsqds ||= ENV.fetch("NSQD_CONSUMERS", "").split(/, ?|\s+/).map(&:strip)
end
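The separator pattern above accepts several spellings of the NSQD_CONSUMERS value. A minimal sketch of the parsing step on its own, using a hypothetical `AddressSketch.parse` helper (not part of FastlyNsq) and placeholder host names:

```ruby
# Hypothetical helper that isolates the split logic used by the getter.
module AddressSketch
  # A comma with an optional trailing space, or any run of whitespace.
  SEPARATOR = /, ?|\s+/

  def self.parse(raw)
    raw.to_s.split(SEPARATOR).map(&:strip)
  end
end

AddressSketch.parse("nsqd1:4150,nsqd2:4150")  # ["nsqd1:4150", "nsqd2:4150"]
AddressSketch.parse("nsqd1:4150, nsqd2:4150") # ["nsqd1:4150", "nsqd2:4150"]
AddressSketch.parse("nsqd1:4150 nsqd2:4150")  # ["nsqd1:4150", "nsqd2:4150"]
AddressSketch.parse("")                       # []
```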
.consumer_nsqds=(nsqd_addresses) ⇒ Object
Set the consumer_nsqd addresses.
# File 'lib/fastly_nsq.rb', line 150
def consumer_nsqds=(nsqd_addresses)
  @consumer_nsqds = nsqd_addresses.nil? ? nil : Array(nsqd_addresses)
end
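Two consequences of this setter are easy to miss: a single String is wrapped into a one-element Array, and assigning nil clears the memoized value so the next read falls back to the environment. The sketch below shows both on a stand-in module; `NsqdListSketch` and the `env` argument are hypothetical additions for illustration, and the real getter always reads ENV.

```ruby
# Stand-in for the getter/setter pair (not the real FastlyNsq module).
module NsqdListSketch
  class << self
    # Memoized read from a caller-supplied environment hash (ENV by default).
    def consumer_nsqds(env = ENV)
      @consumer_nsqds ||= env.fetch("NSQD_CONSUMERS", "").split(/, ?|\s+/).map(&:strip)
    end

    # nil clears the memoized value; anything else is wrapped with Array().
    def consumer_nsqds=(nsqd_addresses)
      @consumer_nsqds = nsqd_addresses.nil? ? nil : Array(nsqd_addresses)
    end
  end
end

env = {"NSQD_CONSUMERS" => "nsqd1:4150,nsqd2:4150"}

NsqdListSketch.consumer_nsqds = "localhost:4150"
NsqdListSketch.consumer_nsqds(env) # ["localhost:4150"] (a String is wrapped)

NsqdListSketch.consumer_nsqds = nil
NsqdListSketch.consumer_nsqds(env) # ["nsqd1:4150", "nsqd2:4150"] (re-read)
```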
.events ⇒ Hash
Map of lifecycle events.
# File 'lib/fastly_nsq.rb', line 40
def events
  @events ||= LIFECYCLE_EVENTS.each_with_object({}) { |e, a| a[e] = [] }
end
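The `each_with_object` call gives every lifecycle event its own empty Array, so handlers registered for one event never appear under another. A small sketch of the resulting structure, using a stand-in module named `EventsSketch` (hypothetical, not part of FastlyNsq):

```ruby
# Stand-in module reproducing only the events map.
module EventsSketch
  LIFECYCLE_EVENTS = %i[startup shutdown heartbeat].freeze

  # One fresh Array per event, built once and memoized.
  def self.events
    @events ||= LIFECYCLE_EVENTS.each_with_object({}) { |e, a| a[e] = [] }
  end
end

EventsSketch.events.keys                       # [:startup, :shutdown, :heartbeat]
EventsSketch.events[:startup] << -> { :boot }  # placeholder handler
EventsSketch.events[:startup].size             # 1
EventsSketch.events[:shutdown].size            # 0
```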
.fire_event(event) ⇒ Object
Execute Procs assigned for the lifecycle event.
# File 'lib/fastly_nsq.rb', line 188
def fire_event(event)
  blocks = FastlyNsq.events.fetch(event)
  blocks.each do |block|
    block.call
  rescue => e
    logger.error "[#{event}] #{e.inspect}"
  end
end
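Since the `rescue` sits inside the `each` block, a handler that raises is logged and the remaining handlers still run. The sketch below demonstrates that isolation on a stand-in module; `FireSketch`, the StringIO-backed logger, and the handlers are hypothetical and exist only to make the behavior observable.

```ruby
require "logger"
require "stringio"

# Stand-in module (not the real FastlyNsq) with the same fire_event body.
module FireSketch
  class << self
    attr_accessor :logger

    def events
      @events ||= {startup: [], shutdown: [], heartbeat: []}
    end

    # Run each handler; log and continue if one of them raises.
    def fire_event(event)
      events.fetch(event).each do |block|
        block.call
      rescue => e
        logger.error "[#{event}] #{e.inspect}"
      end
    end
  end
end

log = StringIO.new
FireSketch.logger = Logger.new(log)

calls = []
FireSketch.events[:startup] << -> { calls << :first }
FireSketch.events[:startup] << -> { raise "boom" }
FireSketch.events[:startup] << -> { calls << :third }

FireSketch.fire_event(:startup)
calls                        # [:first, :third]
log.string.include?("boom")  # true
```

Note that `fetch` raises KeyError for an event name that is not in the map, so firing an unknown event fails loudly rather than silently doing nothing.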
.listen(topic, processor, **options) ⇒ Object
Create a FastlyNsq::Listener.
# File 'lib/fastly_nsq.rb', line 51
def listen(topic, processor, **)
  FastlyNsq::Listener.new(topic: topic, processor: processor, **)
end
.logger ⇒ Logger
Return logger or set logger to default.
# File 'lib/fastly_nsq.rb', line 58
def logger
  return @logger if @logger

  self.logger = Logger.new($stderr)
end
.logger=(new_logger) ⇒ Object
Set the logger and also set Nsq.logger.
# File 'lib/fastly_nsq.rb', line 67
def logger=(new_logger)
  @logger = Nsq.logger = new_logger
end
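Taken together, the reader and writer form a lazy default: the first read installs a Logger on $stderr through the setter, and later reads return the same object until a new logger is assigned. A minimal sketch on a stand-in module follows; `LoggerSketch` is hypothetical, and the assignment to `Nsq.logger` is left out so the example runs without the nsq-ruby dependency.

```ruby
require "logger"

# Stand-in module (not the real FastlyNsq) showing the lazy default logger.
module LoggerSketch
  class << self
    # Return the configured logger, or lazily install a default on $stderr.
    def logger
      return @logger if @logger

      self.logger = Logger.new($stderr)
    end

    # The real setter also assigns Nsq.logger; that part is omitted here.
    def logger=(new_logger)
      @logger = new_logger
    end
  end
end

default = LoggerSketch.logger
default.equal?(LoggerSketch.logger) # true: the default is memoized

custom = Logger.new($stdout)
LoggerSketch.logger = custom
LoggerSketch.logger.equal?(custom)  # true
```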
.lookupd_http_addresses ⇒ Array<String>
Return an array of NSQ lookupd http addresses sourced from ENV.
# File 'lib/fastly_nsq.rb', line 128
def lookupd_http_addresses
  @lookups ||= ENV.fetch("NSQLOOKUPD_HTTP_ADDRESS", "").split(/, ?|\s+/).map(&:strip)
end
.lookupd_http_addresses=(lookups) ⇒ Object
Set the lookupd_http_addresses.
# File 'lib/fastly_nsq.rb', line 135
def lookupd_http_addresses=(lookups)
  @lookups = lookups.nil? ? nil : Array(lookups)
end
.manager ⇒ FastlyNsq::Manager
Returns a new FastlyNsq::Manager or the memoized instance @manager.
# File 'lib/fastly_nsq.rb', line 94
def manager
  @manager ||= FastlyNsq::Manager.new
end
.manager=(manager) ⇒ FastlyNsq::Manager
Set a new manager and transfer listeners to the new manager.
# File 'lib/fastly_nsq.rb', line 102
def manager=(manager)
  @manager&.transfer(manager)
  @manager = manager
end
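The safe-navigation call means a transfer only happens when a manager already exists; the very first assignment simply stores the new one. The sketch below illustrates that flow with a deliberately simplified manager. `SketchManager`, its `listeners` array, and its `transfer` body are assumptions made for illustration; the real FastlyNsq::Manager#transfer may do considerably more.

```ruby
# Hypothetical manager; the real FastlyNsq::Manager#transfer may do more.
class SketchManager
  attr_reader :listeners

  def initialize
    @listeners = []
  end

  # Move every listener to the other manager.
  def transfer(other)
    other.listeners.concat(listeners)
    listeners.clear
  end
end

# Stand-in module with the same reader and writer bodies as above.
module ManagerSketch
  class << self
    def manager
      @manager ||= SketchManager.new
    end

    # &. skips the transfer when no manager has been created yet.
    def manager=(manager)
      @manager&.transfer(manager)
      @manager = manager
    end
  end
end

ManagerSketch.manager.listeners << :orders_listener # placeholder listener
replacement = SketchManager.new
ManagerSketch.manager = replacement

ManagerSketch.manager.equal?(replacement) # true
replacement.listeners                     # [:orders_listener]
```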
.on(event) { ... } ⇒ Object
Register a block to run at a point in the lifecycle.
179 180 181 182 183 |
# File 'lib/fastly_nsq.rb', line 179
def on(event, &block)
  event = event.to_sym
  raise ArgumentError, "Invalid event name: #{event}" unless LIFECYCLE_EVENTS.include?(event)
  events[event] << block
end
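Registration accepts either a String or a Symbol, because the name is converted with `to_sym` before it is checked against LIFECYCLE_EVENTS; anything outside that list raises ArgumentError. A short sketch of both paths on a stand-in module (`OnSketch` and the handler bodies are hypothetical):

```ruby
# Stand-in module (not the real FastlyNsq) with the same `on` body.
module OnSketch
  LIFECYCLE_EVENTS = %i[startup shutdown heartbeat].freeze

  class << self
    def events
      @events ||= LIFECYCLE_EVENTS.each_with_object({}) { |e, a| a[e] = [] }
    end

    # Validate the event name, then store the block for later firing.
    def on(event, &block)
      event = event.to_sym
      raise ArgumentError, "Invalid event name: #{event}" unless LIFECYCLE_EVENTS.include?(event)
      events[event] << block
    end
  end
end

OnSketch.on("startup") { :warm_cache } # String names are converted to Symbols
OnSketch.events[:startup].size         # 1

begin
  OnSketch.on(:restart) { :never_stored }
rescue ArgumentError => e
  e.message                            # "Invalid event name: restart"
end
```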
.producer_nsqds ⇒ Array<String>
# File 'lib/fastly_nsq.rb', line 158
def producer_nsqds
  @producer_nsqds ||= ENV.fetch("NSQD_PRODUCERS", "").split(/, ?|\s+/).map(&:strip)
end
.producer_nsqds=(nsqd_addresses) ⇒ Object
Set the producer_nsqd addresses.
# File 'lib/fastly_nsq.rb', line 165
def producer_nsqds=(nsqd_addresses)
  @producer_nsqds = nsqd_addresses.nil? ? nil : Array(nsqd_addresses)
end
.tracer ⇒ FastlyNsq::NewRelic
Instance of FastlyNsq::NewRelic.
# File 'lib/fastly_nsq.rb', line 200
def tracer
  @tracer ||= FastlyNsq::NewRelic.new
end