Module: MessageBus::Implementation
- Included in:
- MessageBus, Instance
- Defined in:
- lib/message_bus.rb
Overview
The main server-side interface to a message bus for the purposes of configuration, publishing and subscribing
Defined Under Namespace
Classes: Synchronizer
Instance Attribute Summary collapse
-
#config ⇒ Hash<Symbol => Object>
(also: #redis_config)
readonly
Configuration options hash.
Instance Method Summary collapse
-
#after_fork ⇒ void
Performs routines that are necessary after a process fork, typically triggered by a forking webserver.
- #allow_broadcast=(val) ⇒ void
-
#allow_broadcast? ⇒ Boolean
Whether or not broadcasting is allowed.
-
#backend ⇒ Symbol
The name of the backend implementation configured.
-
#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog since the last ID specified, filtered by site.
-
#blocking_subscribe(channel = nil) {|message| ... } ⇒ void
Subscribe to messages.
-
#cache_assets ⇒ Boolean
Whether or not to cache static assets for the diagnostics pages.
- #cache_assets=(val) ⇒ void
- #chunked_encoding_enabled=(val) ⇒ void
-
#chunked_encoding_enabled? ⇒ Boolean
Whether or not chunked encoding is enabled.
-
#configure(config) ⇒ void
Overrides existing configuration.
-
#destroy ⇒ void
Stops listening for publications and stops executing scheduled tasks.
-
#enable_diagnostics ⇒ void
Enables diagnostics tracking.
- #extra_response_headers_lookup {|env| ... } ⇒ void
-
#global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>
Get messages from the global backlog since the last ID specified.
- #group_ids_lookup {|env| ... } ⇒ void
- #initialize ⇒ Object
- #is_admin_lookup {|env| ... } ⇒ void
-
#keepalive_interval ⇒ Integer
The keepalive interval in seconds.
- #keepalive_interval=(interval) ⇒ Object
-
#last_id(channel, site_id = nil) ⇒ Integer
Get the ID of the last message published on a channel, filtered by site.
-
#last_message(channel) ⇒ MessageBus::Message
Get the last message published on a channel.
-
#listening? ⇒ Boolean
Whether or not the server is actively listening for publications on the bus.
-
#local_subscribe(channel = nil, last_id = -1,) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel, filtered by the current site (@see #site_id_lookup).
-
#local_unsubscribe(channel = nil, &blk) ⇒ void
Removes a subscription to a particular channel, filtered by the current site (@see #site_id_lookup).
-
#logger ⇒ Logger
The logger used by the bus.
- #logger=(logger) ⇒ void
- #long_polling_enabled=(val) ⇒ void
-
#long_polling_enabled? ⇒ Boolean
Whether or not long polling is enabled.
-
#long_polling_interval ⇒ Integer
The long-polling interval in milliseconds.
- #long_polling_interval=(millisecs) ⇒ void
-
#max_active_clients ⇒ Integer
The number of simultanuous clients we can service; will revert to polling if we are out of slots.
- #max_active_clients=(val) ⇒ void
-
#off ⇒ void
Disables publication to the bus.
-
#off? ⇒ Boolean
Whether the bus is disabled or not.
-
#on ⇒ void
Enables publication to the bus.
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #on_middleware_error {|env, e| ... } ⇒ void
-
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel.
- #rack_hijack_enabled=(val) ⇒ void
-
#rack_hijack_enabled? ⇒ Boolean
Whether or not Rack Hijack is enabled.
-
#redis_config=(config) ⇒ void
Overrides existing configuration, explicitly enabling the redis backend.
-
#reliable_pub_sub ⇒ MessageBus::Backend::Base
The configured backend.
- #reliable_pub_sub=(pub_sub) ⇒ void
- #reset! ⇒ Object
- #site_id_lookup {|env| ... } ⇒ void
-
#subscribe(channel = nil, last_id = -1,) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel.
-
#timer ⇒ MessageBus::TimerThread
The timer thread used for triggering scheduled routines at specific times/intervals.
-
#unsubscribe(channel = nil, &blk) ⇒ void
Removes a subscription to a particular channel.
- #user_id_lookup {|env| ... } ⇒ void
Instance Attribute Details
#config ⇒ Hash<Symbol => Object> (readonly) Also known as: redis_config
Returns Configuration options hash.
29 30 31 |
# File 'lib/message_bus.rb', line 29 def config @config end |
Instance Method Details
#after_fork ⇒ void
This method returns an undefined value.
Performs routines that are necessary after a process fork, typically
triggered by a forking webserver. Performs whatever the backend requires
and ensures the server is listening for publications and running
scheduled tasks.
512 513 514 515 516 517 |
# File 'lib/message_bus.rb', line 512 def after_fork reliable_pub_sub.after_fork ensure_subscriber_thread # will ensure timer is running timer.queue {} end |
#allow_broadcast=(val) ⇒ void
This method returns an undefined value.
252 253 254 |
# File 'lib/message_bus.rb', line 252 def allow_broadcast=(val) configure(allow_broadcast: val) end |
#allow_broadcast? ⇒ Boolean
Returns whether or not broadcasting is allowed. If not explicitly set, defaults to false unless we’re in Rails test or development mode.
258 259 260 261 262 263 264 265 |
# File 'lib/message_bus.rb', line 258 def allow_broadcast? @config[:allow_broadcast] ||= if defined? ::Rails ::Rails.env.test? || ::Rails.env.development? else false end end |
#backend ⇒ Symbol
Returns the name of the backend implementation configured.
292 293 294 |
# File 'lib/message_bus.rb', line 292 def backend @config[:backend] || :redis end |
#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog since the last ID specified, filtered by site
451 452 453 454 455 456 457 458 459 460 461 462 463 |
# File 'lib/message_bus.rb', line 451 def backlog(channel = nil, last_id = nil, site_id = nil) old = if channel reliable_pub_sub.backlog(encode_channel_name(channel, site_id), last_id) else reliable_pub_sub.global_backlog(last_id) end old.each do |m| (m) end old end |
#blocking_subscribe(channel = nil) {|message| ... } ⇒ void
This method returns an undefined value.
Subscribe to messages. Each message will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
371 372 373 374 375 376 377 |
# File 'lib/message_bus.rb', line 371 def blocking_subscribe(channel = nil, &blk) if channel reliable_pub_sub.subscribe(encode_channel_name(channel), &blk) else reliable_pub_sub.global_subscribe(&blk) end end |
#cache_assets ⇒ Boolean
Returns whether or not to cache static assets for the diagnostics pages.
48 49 50 51 52 53 54 |
# File 'lib/message_bus.rb', line 48 def cache_assets if defined? @config[:cache_assets] @config[:cache_assets] else true end end |
#cache_assets=(val) ⇒ void
This method returns an undefined value.
43 44 45 |
# File 'lib/message_bus.rb', line 43 def cache_assets=(val) configure(cache_assets: val) end |
#chunked_encoding_enabled=(val) ⇒ void
This method returns an undefined value.
82 83 84 |
# File 'lib/message_bus.rb', line 82 def chunked_encoding_enabled=(val) configure(chunked_encoding_enabled: val) end |
#chunked_encoding_enabled? ⇒ Boolean
Returns whether or not chunked encoding is enabled. If not explicitly set, defaults to true.
76 77 78 |
# File 'lib/message_bus.rb', line 76 def chunked_encoding_enabled? @config[:chunked_encoding_enabled] == false ? false : true end |
#configure(config) ⇒ void
This method returns an undefined value.
Overrides existing configuration
172 173 174 |
# File 'lib/message_bus.rb', line 172 def configure(config) @config.merge!(config) end |
#destroy ⇒ void
This method returns an undefined value.
Stops listening for publications and stops executing scheduled tasks. Mostly used in tests to detroy entire bus.
492 493 494 495 496 497 498 499 500 501 502 503 504 505 |
# File 'lib/message_bus.rb', line 492 def destroy return if @destroyed reliable_pub_sub.global_unsubscribe @mutex.synchronize do return if @destroyed @subscriptions ||= {} @destroyed = true end @subscriber_thread.join if @subscriber_thread timer.stop end |
#enable_diagnostics ⇒ void
This method returns an undefined value.
Enables diagnostics tracking
298 299 300 |
# File 'lib/message_bus.rb', line 298 def enable_diagnostics MessageBus::Diagnostics.enable(self) end |
#extra_response_headers_lookup {|env| ... } ⇒ void
This method returns an undefined value.
235 236 237 238 |
# File 'lib/message_bus.rb', line 235 def extra_response_headers_lookup(&blk) configure(extra_response_headers_lookup: blk) if blk @config[:extra_response_headers_lookup] end |
#global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>
Get messages from the global backlog since the last ID specified
440 441 442 |
# File 'lib/message_bus.rb', line 440 def global_backlog(last_id = nil) backlog(nil, last_id) end |
#group_ids_lookup {|env| ... } ⇒ void
This method returns an undefined value.
207 208 209 210 |
# File 'lib/message_bus.rb', line 207 def group_ids_lookup(&blk) configure(group_ids_lookup: blk) if blk @config[:group_ids_lookup] end |
#initialize ⇒ Object
36 37 38 39 |
# File 'lib/message_bus.rb', line 36 def initialize @config = {} @mutex = Synchronizer.new end |
#is_admin_lookup {|env| ... } ⇒ void
This method returns an undefined value.
216 217 218 219 |
# File 'lib/message_bus.rb', line 216 def is_admin_lookup(&blk) configure(is_admin_lookup: blk) if blk @config[:is_admin_lookup] end |
#keepalive_interval ⇒ Integer
Returns the keepalive interval in seconds. If not explicitly set, defaults to ‘60`.
553 554 555 |
# File 'lib/message_bus.rb', line 553 def keepalive_interval @config[:keepalive_interval] || 60 end |
#keepalive_interval=(interval) ⇒ Object
547 548 549 |
# File 'lib/message_bus.rb', line 547 def keepalive_interval=(interval) configure(keepalive_interval: interval) end |
#last_id(channel, site_id = nil) ⇒ Integer
Get the ID of the last message published on a channel, filtered by site
471 472 473 |
# File 'lib/message_bus.rb', line 471 def last_id(channel, site_id = nil) reliable_pub_sub.last_id(encode_channel_name(channel, site_id)) end |
#last_message(channel) ⇒ MessageBus::Message
Get the last message published on a channel
480 481 482 483 484 485 486 487 |
# File 'lib/message_bus.rb', line 480 def (channel) if last_id = last_id(channel) = backlog(channel, last_id - 1) if [0] end end end |
#listening? ⇒ Boolean
Returns whether or not the server is actively listening for publications on the bus.
521 522 523 |
# File 'lib/message_bus.rb', line 521 def listening? @subscriber_thread && @subscriber_thread.alive? end |
#local_subscribe(channel = nil, last_id = -1,) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel, filtered by the current site (@see #site_id_lookup). Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.
408 409 410 411 |
# File 'lib/message_bus.rb', line 408 def local_subscribe(channel = nil, last_id = -1, &blk) site_id = site_id_lookup.call if site_id_lookup && !global?(channel) subscribe_impl(channel, site_id, last_id, &blk) end |
#local_unsubscribe(channel = nil, &blk) ⇒ void
This method returns an undefined value.
Removes a subscription to a particular channel, filtered by the current site (@see #site_id_lookup).
430 431 432 433 |
# File 'lib/message_bus.rb', line 430 def local_unsubscribe(channel = nil, &blk) site_id = site_id_lookup.call if site_id_lookup unsubscribe_impl(channel, site_id, &blk) end |
#logger ⇒ Logger
Returns the logger used by the bus. If not explicitly set, is configured to log to STDOUT at INFO level.
64 65 66 67 68 69 70 71 72 |
# File 'lib/message_bus.rb', line 64 def logger return @config[:logger] if @config[:logger] require 'logger' logger = Logger.new(STDOUT) logger.level = Logger::INFO configure(logger: logger) logger end |
#logger=(logger) ⇒ void
This method returns an undefined value.
58 59 60 |
# File 'lib/message_bus.rb', line 58 def logger=(logger) configure(logger: logger) end |
#long_polling_enabled=(val) ⇒ void
This method returns an undefined value.
94 95 96 |
# File 'lib/message_bus.rb', line 94 def long_polling_enabled=(val) configure(long_polling_enabled: val) end |
#long_polling_enabled? ⇒ Boolean
Returns whether or not long polling is enabled. If not explicitly set, defaults to true.
88 89 90 |
# File 'lib/message_bus.rb', line 88 def long_polling_enabled? @config[:long_polling_enabled] == false ? false : true end |
#long_polling_interval ⇒ Integer
Returns the long-polling interval in milliseconds. If not explicitly set, defaults to 25,000.
148 149 150 |
# File 'lib/message_bus.rb', line 148 def long_polling_interval @config[:long_polling_interval] || 25 * 1000 end |
#long_polling_interval=(millisecs) ⇒ void
This method returns an undefined value.
142 143 144 |
# File 'lib/message_bus.rb', line 142 def long_polling_interval=(millisecs) configure(long_polling_interval: millisecs) end |
#max_active_clients ⇒ Integer
Returns The number of simultanuous clients we can service; will revert to polling if we are out of slots. Defaults to 1000 if not explicitly set.
108 109 110 |
# File 'lib/message_bus.rb', line 108 def max_active_clients @config[:max_active_clients] || 1000 end |
#max_active_clients=(val) ⇒ void
This method returns an undefined value.
101 102 103 |
# File 'lib/message_bus.rb', line 101 def max_active_clients=(val) configure(max_active_clients: val) end |
#off ⇒ void
This method returns an undefined value.
Disables publication to the bus
159 160 161 |
# File 'lib/message_bus.rb', line 159 def off @off = true end |
#off? ⇒ Boolean
Returns whether the bus is disabled or not.
153 154 155 |
# File 'lib/message_bus.rb', line 153 def off? @off end |
#on ⇒ void
This method returns an undefined value.
Enables publication to the bus
165 166 167 |
# File 'lib/message_bus.rb', line 165 def on @destroyed = @off = false end |
#on_connect(&blk) ⇒ Object
240 241 242 243 |
# File 'lib/message_bus.rb', line 240 def on_connect(&blk) configure(on_connect: blk) if blk @config[:on_connect] end |
#on_disconnect(&blk) ⇒ Object
245 246 247 248 |
# File 'lib/message_bus.rb', line 245 def on_disconnect(&blk) configure(on_disconnect: blk) if blk @config[:on_disconnect] end |
#on_middleware_error {|env, e| ... } ⇒ void
This method returns an undefined value.
226 227 228 229 |
# File 'lib/message_bus.rb', line 226 def on_middleware_error(&blk) configure(on_middleware_error: blk) if blk @config[:on_middleware_error] end |
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel
319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 |
# File 'lib/message_bus.rb', line 319 def publish(channel, data, opts = nil) return if @off @mutex.synchronize do raise ::MessageBus::BusDestroyed if @destroyed end user_ids = nil group_ids = nil client_ids = nil site_id = nil if opts user_ids = opts[:user_ids] group_ids = opts[:group_ids] client_ids = opts[:client_ids] site_id = opts[:site_id] end raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel) encoded_data = JSON.dump( data: data, user_ids: user_ids, group_ids: group_ids, client_ids: client_ids ) channel_opts = nil if opts && ((age = opts[:max_backlog_age]) || (size = opts[:max_backlog_size])) channel_opts = { max_backlog_size: size, max_backlog_age: age } end encoded_channel_name = encode_channel_name(channel, site_id) reliable_pub_sub.publish(encoded_channel_name, encoded_data, channel_opts) end |
#rack_hijack_enabled=(val) ⇒ void
This method returns an undefined value.
136 137 138 |
# File 'lib/message_bus.rb', line 136 def rack_hijack_enabled=(val) configure(rack_hijack_enabled: val) end |
#rack_hijack_enabled? ⇒ Boolean
Returns whether or not Rack Hijack is enabled. If not explicitly set, will default to true, unless we’re on Passenger without the ability to set the advertised_concurrency_level to 0.
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/message_bus.rb', line 115 def rack_hijack_enabled? if @config[:rack_hijack_enabled].nil? enable = true # without this switch passenger will explode # it will run out of connections after about 10 if defined? PhusionPassenger enable = false if PhusionPassenger.respond_to? :advertised_concurrency_level PhusionPassenger.advertised_concurrency_level = 0 enable = true end end configure(rack_hijack_enabled: enable) end @config[:rack_hijack_enabled] end |
#redis_config=(config) ⇒ void
This method returns an undefined value.
Overrides existing configuration, explicitly enabling the redis backend
179 180 181 |
# File 'lib/message_bus.rb', line 179 def redis_config=(config) configure(config.merge(backend: :redis)) end |
#reliable_pub_sub ⇒ MessageBus::Backend::Base
Returns the configured backend. If not explicitly set, will be loaded based on the configuration provided.
275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 |
# File 'lib/message_bus.rb', line 275 def reliable_pub_sub @mutex.synchronize do return nil if @destroyed # Make sure logger is loaded before config is # passed to backend. logger @config[:reliable_pub_sub] ||= begin @config[:backend_options] ||= {} require "message_bus/backends/#{backend}" MessageBus::BACKENDS[backend].new @config end end end |
#reliable_pub_sub=(pub_sub) ⇒ void
This method returns an undefined value.
269 270 271 |
# File 'lib/message_bus.rb', line 269 def reliable_pub_sub=(pub_sub) configure(reliable_pub_sub: pub_sub) end |
#reset! ⇒ Object
526 527 528 |
# File 'lib/message_bus.rb', line 526 def reset! reliable_pub_sub.reset! if reliable_pub_sub end |
#site_id_lookup {|env| ... } ⇒ void
This method returns an undefined value.
189 190 191 192 |
# File 'lib/message_bus.rb', line 189 def site_id_lookup(&blk) configure(site_id_lookup: blk) if blk @config[:site_id_lookup] end |
#subscribe(channel = nil, last_id = -1,) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.
391 392 393 |
# File 'lib/message_bus.rb', line 391 def subscribe(channel = nil, last_id = -1, &blk) subscribe_impl(channel, nil, last_id, &blk) end |
#timer ⇒ MessageBus::TimerThread
Returns the timer thread used for triggering scheduled routines at specific times/intervals.
532 533 534 535 536 537 538 539 540 541 542 |
# File 'lib/message_bus.rb', line 532 def timer return @timer_thread if @timer_thread @timer_thread ||= begin t = MessageBus::TimerThread.new t.on_error do |e| logger.warn "Failed to process job: #{e} #{e.backtrace}" end t end end |
#unsubscribe(channel = nil, &blk) ⇒ void
This method returns an undefined value.
Removes a subscription to a particular channel.
419 420 421 |
# File 'lib/message_bus.rb', line 419 def unsubscribe(channel = nil, &blk) unsubscribe_impl(channel, nil, &blk) end |
#user_id_lookup {|env| ... } ⇒ void
This method returns an undefined value.
198 199 200 201 |
# File 'lib/message_bus.rb', line 198 def user_id_lookup(&blk) configure(user_id_lookup: blk) if blk @config[:user_id_lookup] end |