Module: MessageBus::Implementation
Included in: MessageBus, Instance
Defined in: lib/message_bus.rb
Overview
The main server-side interface to a message bus, used for configuration, publishing and subscribing.
Defined Under Namespace
Classes: Synchronizer
Instance Attribute Summary
-
#config ⇒ Hash<Symbol => Object>
(also: #redis_config)
readonly
Configuration options hash.
Instance Method Summary
-
#after_fork ⇒ void
Performs routines that are necessary after a process fork, typically triggered by a forking webserver.
- #allow_broadcast=(val) ⇒ void
-
#allow_broadcast? ⇒ Boolean
Whether or not broadcasting is allowed.
-
#backend ⇒ Symbol
The name of the backend implementation configured.
-
#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog since the last ID specified, filtered by site.
-
#base_route ⇒ String
The route that message bus will respond to.
- #base_route=(route) ⇒ void
-
#blocking_subscribe(channel = nil) {|message| ... } ⇒ void
Subscribe to messages.
-
#cache_assets ⇒ Boolean
Whether or not to cache static assets for the diagnostics pages.
- #cache_assets=(val) ⇒ void
- #chunked_encoding_enabled=(val) ⇒ void
-
#chunked_encoding_enabled? ⇒ Boolean
Whether or not chunked encoding is enabled.
-
#client_message_filters ⇒ Array
Returns the list of message filters that have been registered.
-
#configure(config) ⇒ void
Overrides existing configuration.
-
#destroy ⇒ void
Stops listening for publications and stops executing scheduled tasks.
-
#enable_diagnostics ⇒ void
Enables diagnostics tracking.
- #extra_response_headers_lookup {|env| ... } ⇒ void
-
#global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>
Get messages from the global backlog since the last ID specified.
- #group_ids_lookup {|env| ... } ⇒ void
- #initialize ⇒ Object
- #is_admin_lookup {|env| ... } ⇒ void
-
#keepalive_interval ⇒ Integer
The keepalive interval in seconds.
- #keepalive_interval=(interval) ⇒ Object
-
#last_id(channel, site_id = nil) ⇒ Integer
Get the ID of the last message published on a channel, filtered by site.
-
#last_message(channel) ⇒ MessageBus::Message
Get the last message published on a channel.
-
#listening? ⇒ Boolean
Whether or not the server is actively listening for publications on the bus.
-
#local_subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel, filtered by the current site (see #site_id_lookup).
-
#local_unsubscribe(channel = nil, &blk) ⇒ void
Removes a subscription to a particular channel, filtered by the current site (see #site_id_lookup).
-
#logger ⇒ Logger
The logger used by the bus.
- #logger=(logger) ⇒ void
- #long_polling_enabled=(val) ⇒ void
-
#long_polling_enabled? ⇒ Boolean
Whether or not long polling is enabled.
-
#long_polling_interval ⇒ Integer
The long-polling interval in milliseconds.
- #long_polling_interval=(millisecs) ⇒ void
-
#max_active_clients ⇒ Integer
The number of simultaneous clients we can service; will revert to polling if we are out of slots.
- #max_active_clients=(val) ⇒ void
-
#off ⇒ void
Disables publication to the bus.
-
#off? ⇒ Boolean
Whether the bus is disabled or not.
-
#on ⇒ void
Enables publication to the bus.
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #on_middleware_error {|env, e| ... } ⇒ void
-
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel.
- #rack_hijack_enabled=(val) ⇒ void
-
#rack_hijack_enabled? ⇒ Boolean
Whether or not Rack Hijack is enabled.
-
#redis_config=(config) ⇒ void
Overrides existing configuration, explicitly enabling the redis backend.
-
#register_client_message_filter(channel_prefix) {|message| ... } ⇒ void
Registers a client message filter that allows messages to be filtered from the client.
-
#reliable_pub_sub ⇒ MessageBus::Backend::Base
The configured backend.
- #reliable_pub_sub=(pub_sub) ⇒ void
- #reset! ⇒ Object
- #site_id_lookup {|env| ... } ⇒ void
-
#subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel.
-
#timer ⇒ MessageBus::TimerThread
The timer thread used for triggering scheduled routines at specific times/intervals.
-
#transport_codec ⇒ MessageBus::Codec::Base
Codec used to encode and decode Message payloads.
- #transport_codec=(codec) ⇒ void
-
#unsubscribe(channel = nil, &blk) ⇒ void
Removes a subscription to a particular channel.
- #user_id_lookup {|env| ... } ⇒ void
Instance Attribute Details
#config ⇒ Hash<Symbol => Object> (readonly) Also known as: redis_config
Returns Configuration options hash.
    # File 'lib/message_bus.rb', line 34
    def config
      @config
    end
Instance Method Details
#after_fork ⇒ void
This method returns an undefined value.
Performs routines that are necessary after a process fork, typically
triggered by a forking webserver. Performs whatever the backend requires
and ensures the server is listening for publications and running
scheduled tasks.
    # File 'lib/message_bus.rb', line 557
    def after_fork
      reliable_pub_sub.after_fork
      ensure_subscriber_thread
      # will ensure timer is running
      timer.queue {}
    end
#allow_broadcast=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 274
    def allow_broadcast=(val)
      configure(allow_broadcast: val)
    end
#allow_broadcast? ⇒ Boolean
Returns whether or not broadcasting is allowed. If not explicitly set, defaults to false unless we’re in Rails test or development mode.
    # File 'lib/message_bus.rb', line 280
    def allow_broadcast?
      @config[:allow_broadcast] ||=
        if defined? ::Rails.env
          ::Rails.env.test? || ::Rails.env.development?
        else
          false
        end
    end
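The ||= in this listing only keeps a stored value when that value is truthy, so a stored false falls through to the environment default on every call. The following is a minimal sketch of that behaviour, assuming a plain Hash in place of @config and a boolean parameter dev_or_test standing in for the Rails environment check. Both names are illustrative and not part of MessageBus.

```ruby
# Hypothetical stand-in for allow_broadcast?: `config` replaces @config and
# `dev_or_test` replaces the ::Rails.env.test? || ::Rails.env.development? check.
def broadcast_setting(config, dev_or_test)
  config[:allow_broadcast] ||= dev_or_test
end

# Nothing stored, not in test/development: the default is false.
p broadcast_setting({}, false)

# An explicit true is kept regardless of environment.
p broadcast_setting({ allow_broadcast: true }, false)

# An explicit false is falsy, so ||= replaces it with the environment default.
p broadcast_setting({ allow_broadcast: false }, true)
```

Read this way, setting allow_broadcast to false has no lasting effect while the environment default evaluates to true.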
#backend ⇒ Symbol
Returns the name of the backend implementation configured.
    # File 'lib/message_bus.rb', line 325
    def backend
      @config[:backend] || :redis
    end
#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>
Get messages from a channel backlog since the last ID specified, filtered by site
    # File 'lib/message_bus.rb', line 495
    def backlog(channel = nil, last_id = nil, site_id = nil)
      old =
        if channel
          reliable_pub_sub.backlog(encode_channel_name(channel, site_id), last_id)
        else
          reliable_pub_sub.global_backlog(last_id)
        end

      old.each do |m|
        decode_message!(m)
      end
      old
    end
#base_route ⇒ String
Returns the route that message bus will respond to. If not explicitly set, defaults to “/”. Requests to “#{base_route}message-bus/*” will be handled by the message bus server.
    # File 'lib/message_bus.rb', line 170
    def base_route
      @config[:base_route] || "/"
    end
#base_route=(route) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 163
    def base_route=(route)
      configure(base_route: route.gsub(Regexp.new('\A(?!/)|(?<!/)\Z|//+'), "/"))
    end
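To see what the substitution in base_route= does, the pattern can be tried on its own. The sketch below copies the regular expression from the listing. The constant ROUTE_PATTERN and the helper normalize_route are hypothetical names used only for illustration.

```ruby
# Same pattern as in base_route=: insert a leading slash if missing, insert a
# trailing slash if missing, and collapse any run of slashes into one.
ROUTE_PATTERN = Regexp.new('\A(?!/)|(?<!/)\Z|//+')

# Hypothetical helper, not part of MessageBus.
def normalize_route(route)
  route.gsub(ROUTE_PATTERN, "/")
end

puts normalize_route("custom")    # prints /custom/
puts normalize_route("/custom/")  # prints /custom/
puts normalize_route("//a//b")    # prints /a/b/
```

Whatever form the route is given in, the stored value starts and ends with a single slash.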
#blocking_subscribe(channel = nil) {|message| ... } ⇒ void
This method returns an undefined value.
Subscribe to messages. Each message will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.
    # File 'lib/message_bus.rb', line 415
    def blocking_subscribe(channel = nil, &blk)
      if channel
        reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
      else
        reliable_pub_sub.global_subscribe(&blk)
      end
    end
#cache_assets ⇒ Boolean
Returns whether or not to cache static assets for the diagnostics pages.
    # File 'lib/message_bus.rb', line 57
    def cache_assets
      if defined? @config[:cache_assets]
        @config[:cache_assets]
      else
        true
      end
    end
#cache_assets=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 52
    def cache_assets=(val)
      configure(cache_assets: val)
    end
#chunked_encoding_enabled=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 91
    def chunked_encoding_enabled=(val)
      configure(chunked_encoding_enabled: val)
    end
#chunked_encoding_enabled? ⇒ Boolean
Returns whether or not chunked encoding is enabled. If not explicitly set, defaults to true.
    # File 'lib/message_bus.rb', line 85
    def chunked_encoding_enabled?
      @config[:chunked_encoding_enabled] == false ? false : true
    end
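The comparison against false is what makes this flag default to true: an unset (nil) value counts as enabled, and only an explicit false disables it. The following is a small sketch of the idiom, assuming a plain Hash in place of @config. The helper names are hypothetical.

```ruby
# Hypothetical helper mirroring the "== false ? false : true" idiom.
def enabled_by_default?(config, key)
  config[key] == false ? false : true
end

# For contrast: "||" cannot express a default of true, because an explicit
# false is falsy and would be replaced by the default.
def enabled_with_or?(config, key)
  config[key] || true
end

p enabled_by_default?({}, :chunked_encoding_enabled)
p enabled_by_default?({ chunked_encoding_enabled: false }, :chunked_encoding_enabled)
p enabled_with_or?({ chunked_encoding_enabled: false }, :chunked_encoding_enabled)
```

The same idiom appears in long_polling_enabled? on this page.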
#client_message_filters ⇒ Array
Returns the list of message filters that have been registered.
    # File 'lib/message_bus.rb', line 617
    def client_message_filters
      configure(client_message_filters: []) if !@config[:client_message_filters]
      @config[:client_message_filters]
    end
#configure(config) ⇒ void
This method returns an undefined value.
Overrides existing configuration
    # File 'lib/message_bus.rb', line 194
    def configure(config)
      @config.merge!(config)
    end
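Because configure delegates to Hash#merge!, it behaves as a shallow, in-place merge. The sketch below illustrates this with a plain Hash. The helper configure_demo and the option names :host and :port are hypothetical and chosen only to show the behaviour.

```ruby
# Hypothetical stand-in for configure: merge overrides into an existing hash.
def configure_demo(config, overrides)
  config.merge!(overrides)
end

config = { backend: :redis, long_polling_interval: 25_000 }

# Existing keys are overwritten, new keys are added, other keys are kept.
configure_demo(config, { long_polling_interval: 10_000, base_route: "/bus/" })
p config

# The merge is shallow: a nested hash is replaced as a whole, not combined.
configure_demo(config, { backend_options: { host: "localhost", port: 6379 } })
configure_demo(config, { backend_options: { port: 6380 } })
p config[:backend_options]
```

A caller who wants to change one nested option therefore needs to pass the complete nested hash.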
#destroy ⇒ void
This method returns an undefined value.
Stops listening for publications and stops executing scheduled tasks. Mostly used in tests to destroy entire bus.
    # File 'lib/message_bus.rb', line 536
    def destroy
      return if @destroyed

      reliable_pub_sub.global_unsubscribe
      reliable_pub_sub.destroy

      @mutex.synchronize do
        return if @destroyed

        @subscriptions ||= {}
        @destroyed = true
      end
      @subscriber_thread.join if @subscriber_thread
      timer.stop
    end
#enable_diagnostics ⇒ void
This method returns an undefined value.
Enables diagnostics tracking
    # File 'lib/message_bus.rb', line 331
    def enable_diagnostics
      MessageBus::Diagnostics.enable(self)
    end
#extra_response_headers_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 257
    def extra_response_headers_lookup(&blk)
      configure(extra_response_headers_lookup: blk) if blk
      @config[:extra_response_headers_lookup]
    end
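This method doubles as setter and getter: called with a block it stores the block, and in either case it returns whatever is currently stored. The other *_lookup methods on this page follow the same shape. The following is a minimal sketch of the pattern, assuming a stand-in class LookupDemo, an env key "demo.value" and a header name "X-Demo", all of which are hypothetical.

```ruby
# Stand-in class reproducing only the block setter/getter pattern.
class LookupDemo
  def initialize
    @config = {}
  end

  def configure(config)
    @config.merge!(config)
  end

  # With a block: store it. With or without: return the stored block.
  def extra_response_headers_lookup(&blk)
    configure(extra_response_headers_lookup: blk) if blk
    @config[:extra_response_headers_lookup]
  end
end

demo = LookupDemo.new
p demo.extra_response_headers_lookup # nothing registered yet, so nil

# Register a block; returning a Hash of headers is an assumption of this sketch.
demo.extra_response_headers_lookup { |env| { "X-Demo" => env["demo.value"] } }

# Later, retrieve the stored block and call it with a request environment.
lookup = demo.extra_response_headers_lookup
p lookup.call({ "demo.value" => "1" })
```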
#global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>
Get messages from the global backlog since the last ID specified
    # File 'lib/message_bus.rb', line 484
    def global_backlog(last_id = nil)
      backlog(nil, last_id)
    end
#group_ids_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 229
    def group_ids_lookup(&blk)
      configure(group_ids_lookup: blk) if blk
      @config[:group_ids_lookup]
    end
#initialize ⇒ Object
    # File 'lib/message_bus.rb', line 41
    def initialize
      @config = {}
      @mutex = Synchronizer.new
      @off = false
      @destroyed = false
      @timer_thread = nil
      @subscriber_thread = nil
    end
#is_admin_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 238
    def is_admin_lookup(&blk)
      configure(is_admin_lookup: blk) if blk
      @config[:is_admin_lookup]
    end
#keepalive_interval ⇒ Integer
Returns the keepalive interval in seconds. If not explicitly set, defaults to 60.
    # File 'lib/message_bus.rb', line 598
    def keepalive_interval
      @config[:keepalive_interval] || 60
    end
#keepalive_interval=(interval) ⇒ Object
    # File 'lib/message_bus.rb', line 592
    def keepalive_interval=(interval)
      configure(keepalive_interval: interval)
    end
#last_id(channel, site_id = nil) ⇒ Integer
Get the ID of the last message published on a channel, filtered by site
    # File 'lib/message_bus.rb', line 515
    def last_id(channel, site_id = nil)
      reliable_pub_sub.last_id(encode_channel_name(channel, site_id))
    end
#last_message(channel) ⇒ MessageBus::Message
Get the last message published on a channel
    # File 'lib/message_bus.rb', line 524
    def last_message(channel)
      if last_id = last_id(channel)
        messages = backlog(channel, last_id - 1)
        if messages
          messages[0]
        end
      end
    end
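The listing finds the last message by asking the backlog for everything after last_id - 1, which leaves exactly the newest message. The sketch below reproduces that idea on an in-memory array. DemoMessage and DemoChannel are hypothetical stand-ins, and two assumptions are made: a backlog "since" an ID means IDs strictly greater than it, and an empty channel reports a last ID of 0.

```ruby
DemoMessage = Struct.new(:id, :data)

# In-memory stand-in for one channel's backlog; not a MessageBus class.
class DemoChannel
  def initialize(messages)
    @messages = messages
  end

  # ASSUMPTION: an empty channel reports 0 as its last ID.
  def last_id
    @messages.empty? ? 0 : @messages.last.id
  end

  # ASSUMPTION: "since last_id" means IDs strictly greater than last_id.
  def backlog(last_id)
    @messages.select { |m| m.id > last_id }
  end

  # Same trick as the listing: everything after (last_id - 1) is the last message.
  def last_message
    messages = backlog(last_id - 1)
    messages[0] if messages
  end
end

channel = DemoChannel.new([DemoMessage.new(1, "a"), DemoMessage.new(2, "b"), DemoMessage.new(3, "c")])
p channel.last_message.data        # prints "c"
p DemoChannel.new([]).last_message # prints nil
```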
#listening? ⇒ Boolean
Returns whether or not the server is actively listening for publications on the bus.
    # File 'lib/message_bus.rb', line 566
    def listening?
      @subscriber_thread && @subscriber_thread.alive?
    end
#local_subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel, filtered by the current site (see #site_id_lookup). Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.
    # File 'lib/message_bus.rb', line 452
    def local_subscribe(channel = nil, last_id = -1, &blk)
      site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
      subscribe_impl(channel, site_id, last_id, &blk)
    end
#local_unsubscribe(channel = nil, &blk) ⇒ void
This method returns an undefined value.
Removes a subscription to a particular channel, filtered by the current site (see #site_id_lookup).
    # File 'lib/message_bus.rb', line 474
    def local_unsubscribe(channel = nil, &blk)
      site_id = site_id_lookup.call if site_id_lookup
      unsubscribe_impl(channel, site_id, &blk)
    end
#logger ⇒ Logger
Returns the logger used by the bus. If not explicitly set, is configured to log to STDOUT at INFO level.
    # File 'lib/message_bus.rb', line 73
    def logger
      return @config[:logger] if @config[:logger]

      require 'logger'
      logger = Logger.new(STDOUT)
      logger.level = Logger::INFO
      configure(logger: logger)
      logger
    end
#logger=(logger) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 67
    def logger=(logger)
      configure(logger: logger)
    end
#long_polling_enabled=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 103
    def long_polling_enabled=(val)
      configure(long_polling_enabled: val)
    end
#long_polling_enabled? ⇒ Boolean
Returns whether or not long polling is enabled. If not explicitly set, defaults to true.
    # File 'lib/message_bus.rb', line 97
    def long_polling_enabled?
      @config[:long_polling_enabled] == false ? false : true
    end
#long_polling_interval ⇒ Integer
Returns the long-polling interval in milliseconds. If not explicitly set, defaults to 25,000.
    # File 'lib/message_bus.rb', line 157
    def long_polling_interval
      @config[:long_polling_interval] || 25 * 1000
    end
#long_polling_interval=(millisecs) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 151
    def long_polling_interval=(millisecs)
      configure(long_polling_interval: millisecs)
    end
#max_active_clients ⇒ Integer
Returns the number of simultaneous clients we can service; will revert to polling if we are out of slots. Defaults to 1000 if not explicitly set.
    # File 'lib/message_bus.rb', line 117
    def max_active_clients
      @config[:max_active_clients] || 1000
    end
#max_active_clients=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 110
    def max_active_clients=(val)
      configure(max_active_clients: val)
    end
#off ⇒ void
This method returns an undefined value.
Disables publication to the bus
    # File 'lib/message_bus.rb', line 181
    def off
      @off = true
    end
#off? ⇒ Boolean
Returns whether the bus is disabled or not.
    # File 'lib/message_bus.rb', line 175
    def off?
      @off
    end
#on ⇒ void
This method returns an undefined value.
Enables publication to the bus
    # File 'lib/message_bus.rb', line 187
    def on
      @destroyed = @off = false
    end
#on_connect(&blk) ⇒ Object
    # File 'lib/message_bus.rb', line 262
    def on_connect(&blk)
      configure(on_connect: blk) if blk
      @config[:on_connect]
    end
#on_disconnect(&blk) ⇒ Object
    # File 'lib/message_bus.rb', line 267
    def on_disconnect(&blk)
      configure(on_disconnect: blk) if blk
      @config[:on_disconnect]
    end
#on_middleware_error {|env, e| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 248
    def on_middleware_error(&blk)
      configure(on_middleware_error: blk) if blk
      @config[:on_middleware_error]
    end
#publish(channel, data, opts = nil) ⇒ Integer
Publishes a message to a channel
    # File 'lib/message_bus.rb', line 353
    def publish(channel, data, opts = nil)
      return if @off

      @mutex.synchronize do
        raise ::MessageBus::BusDestroyed if @destroyed
      end

      user_ids = nil
      group_ids = nil
      client_ids = nil

      site_id = nil
      if opts
        user_ids = opts[:user_ids]
        group_ids = opts[:group_ids]
        client_ids = opts[:client_ids]
        site_id = opts[:site_id]
      end

      if (user_ids || group_ids) && global?(channel)
        raise ::MessageBus::InvalidMessage
      end

      if (user_ids == []) || (group_ids == []) || (client_ids == [])
        raise ::MessageBus::InvalidMessageTarget
      end

      encoded_data = transport_codec.encode({
        "data" => data,
        "user_ids" => user_ids,
        "group_ids" => group_ids,
        "client_ids" => client_ids
      })

      channel_opts = {}

      if opts
        if ((age = opts[:max_backlog_age]) || (size = opts[:max_backlog_size]))
          channel_opts[:max_backlog_size] = size
          channel_opts[:max_backlog_age] = age
        end

        if opts.has_key?(:queue_in_memory)
          channel_opts[:queue_in_memory] = opts[:queue_in_memory]
        end
      end

      encoded_channel_name = encode_channel_name(channel, site_id)
      reliable_pub_sub.publish(encoded_channel_name, encoded_data, channel_opts)
    end
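Two parts of publish are easy to misread: the target validation and the handling of the backlog options. The sketch below mirrors both in isolation. All names in it are hypothetical stand-ins. In particular, global? is not shown on this page, so the sketch assumes a channel counts as global when its name starts with "/global/".

```ruby
class DemoInvalidMessage < StandardError; end
class DemoInvalidMessageTarget < StandardError; end

# ASSUMPTION: stand-in for global?, which is not shown on this page.
def demo_global?(channel)
  channel.start_with?("/global/")
end

# Mirrors the two validation checks in publish.
def check_publish_targets!(channel, opts = nil)
  user_ids = group_ids = client_ids = nil
  if opts
    user_ids = opts[:user_ids]
    group_ids = opts[:group_ids]
    client_ids = opts[:client_ids]
  end

  # Global channels cannot be restricted to users or groups.
  raise DemoInvalidMessage if (user_ids || group_ids) && demo_global?(channel)

  # An empty target list is rejected; omit the option (nil) to target everyone.
  if user_ids == [] || group_ids == [] || client_ids == []
    raise DemoInvalidMessageTarget
  end
  true
end

# Mirrors the backlog option handling in publish.
def demo_channel_options(opts)
  channel_opts = {}
  if opts
    # "||" short-circuits: when max_backlog_age is truthy, the assignment to
    # size never runs and size stays nil.
    if (age = opts[:max_backlog_age]) || (size = opts[:max_backlog_size])
      channel_opts[:max_backlog_size] = size
      channel_opts[:max_backlog_age] = age
    end
  end
  channel_opts
end

p check_publish_targets!("/chat", { user_ids: [1, 2] })
p demo_channel_options({ max_backlog_size: 100 })
p demo_channel_options({ max_backlog_age: 60, max_backlog_size: 100 })
```

As the listing is written, passing both max_backlog_age and max_backlog_size leaves the size entry nil, because the second assignment is skipped once the first one yields a truthy value.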
#rack_hijack_enabled=(val) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 145
    def rack_hijack_enabled=(val)
      configure(rack_hijack_enabled: val)
    end
#rack_hijack_enabled? ⇒ Boolean
Returns whether or not Rack Hijack is enabled. If not explicitly set, will default to true, unless we’re on Passenger without the ability to set the advertised_concurrency_level to 0.
    # File 'lib/message_bus.rb', line 124
    def rack_hijack_enabled?
      if @config[:rack_hijack_enabled].nil?
        enable = true

        # without this switch passenger will explode
        # it will run out of connections after about 10
        if defined? PhusionPassenger
          enable = false
          if PhusionPassenger.respond_to? :advertised_concurrency_level
            PhusionPassenger.advertised_concurrency_level = 0
            enable = true
          end
        end
        configure(rack_hijack_enabled: enable)
      end

      @config[:rack_hijack_enabled]
    end
#redis_config=(config) ⇒ void
This method returns an undefined value.
Overrides existing configuration, explicitly enabling the redis backend
    # File 'lib/message_bus.rb', line 201
    def redis_config=(config)
      configure(config.merge(backend: :redis))
    end
#register_client_message_filter(channel_prefix) {|message| ... } ⇒ void
This method returns an undefined value.
Registers a client message filter that allows messages to be filtered from the client.
    # File 'lib/message_bus.rb', line 609
    def register_client_message_filter(channel_prefix, &blk)
      if blk
        configure(client_message_filters: []) if !@config[:client_message_filters]
        @config[:client_message_filters] << [channel_prefix, blk]
      end
    end
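Each registered filter is stored as a two-element array of channel prefix and block. How the library later applies those pairs is not shown on this page, so the sketch below only illustrates one plausible reading: a message is delivered when every filter whose prefix matches the channel returns a truthy value. FilterRegistryDemo and its allowed? method are hypothetical.

```ruby
# Stand-in registry that stores filters the same way as the listing.
class FilterRegistryDemo
  attr_reader :filters

  def initialize
    @filters = []
  end

  def register(channel_prefix, &blk)
    @filters << [channel_prefix, blk] if blk
  end

  # ASSUMPTION: illustrative matching rule, not taken from MessageBus.
  def allowed?(channel, message)
    @filters.all? do |prefix, blk|
      !channel.start_with?(prefix) || blk.call(message)
    end
  end
end

registry = FilterRegistryDemo.new
registry.register("/private/") { |message| message[:public] }

p registry.filters.length                            # prints 1
p registry.allowed?("/chat", { public: false })      # prints true (prefix does not match)
p registry.allowed?("/private/a", { public: false }) # prints false
p registry.allowed?("/private/a", { public: true })  # prints true
```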
#reliable_pub_sub ⇒ MessageBus::Backend::Base
Returns the configured backend. If not explicitly set, will be loaded based on the configuration provided.
    # File 'lib/message_bus.rb', line 308
    def reliable_pub_sub
      @mutex.synchronize do
        return nil if @destroyed

        # Make sure logger is loaded before config is
        # passed to backend.
        logger

        @config[:reliable_pub_sub] ||= begin
          @config[:backend_options] ||= {}
          require "message_bus/backends/#{backend}"
          MessageBus::BACKENDS[backend].new @config
        end
      end
    end
#reliable_pub_sub=(pub_sub) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 302
    def reliable_pub_sub=(pub_sub)
      configure(reliable_pub_sub: pub_sub)
    end
#reset! ⇒ Object
    # File 'lib/message_bus.rb', line 571
    def reset!
      reliable_pub_sub.reset! if reliable_pub_sub
    end
#site_id_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 211
    def site_id_lookup(&blk)
      configure(site_id_lookup: blk) if blk
      @config[:site_id_lookup]
    end
#subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc
Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.
    # File 'lib/message_bus.rb', line 435
    def subscribe(channel = nil, last_id = -1, &blk)
      subscribe_impl(channel, nil, last_id, &blk)
    end
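The description says that subscribe returns immediately and that callbacks run on a dedicated subscriber thread. The sketch below shows that general arrangement with a toy in-memory bus. TinyBus is a hypothetical class written for this illustration. It does not reproduce subscribe_impl, backlogs or message IDs.

```ruby
# Toy bus: publishing enqueues, a single background thread delivers.
class TinyBus
  def initialize
    @queue = Queue.new
    @subscriptions = Hash.new { |hash, key| hash[key] = [] }
    @mutex = Mutex.new
    @thread = Thread.new { dispatch_loop }
  end

  # Non-blocking: registers the callback and returns it.
  def subscribe(channel, &blk)
    @mutex.synchronize { @subscriptions[channel] << blk }
    blk
  end

  def publish(channel, data)
    @queue << [channel, data]
  end

  # Delivers everything queued so far, then stops the dispatcher thread.
  def stop
    @queue << :stop
    @thread.join
  end

  private

  def dispatch_loop
    while (item = @queue.pop) != :stop
      channel, data = item
      callbacks = @mutex.synchronize { @subscriptions[channel].dup }
      callbacks.each { |callback| callback.call(data) }
    end
  end
end

bus = TinyBus.new
received = []
bus.subscribe("/chat") { |data| received << data } # returns at once
bus.publish("/chat", "hello")
bus.publish("/chat", "world")
bus.stop
p received # prints ["hello", "world"]
```

Because delivery happens on another thread, callbacks that touch shared state need their own synchronization. The example avoids the issue by reading the results only after the dispatcher thread has been joined.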
#timer ⇒ MessageBus::TimerThread
Returns the timer thread used for triggering scheduled routines at specific times/intervals.
    # File 'lib/message_bus.rb', line 577
    def timer
      return @timer_thread if @timer_thread

      @timer_thread ||= begin
        t = MessageBus::TimerThread.new
        t.on_error do |e|
          logger.warn "Failed to process job: #{e} #{e.backtrace}"
        end
        t
      end
    end
#transport_codec ⇒ MessageBus::Codec::Base
Returns codec used to encode and decode Message payloads.
    # File 'lib/message_bus.rb', line 296
    def transport_codec
      @config[:transport_codec] ||= MessageBus::Codec::Json.new
    end
#transport_codec=(codec) ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 291
    def transport_codec=(codec)
      configure(transport_codec: codec)
    end
#unsubscribe(channel = nil, &blk) ⇒ void
This method returns an undefined value.
Removes a subscription to a particular channel.
    # File 'lib/message_bus.rb', line 463
    def unsubscribe(channel = nil, &blk)
      unsubscribe_impl(channel, nil, &blk)
    end
#user_id_lookup {|env| ... } ⇒ void
This method returns an undefined value.
    # File 'lib/message_bus.rb', line 220
    def user_id_lookup(&blk)
      configure(user_id_lookup: blk) if blk
      @config[:user_id_lookup]
    end