Module: MessageBus::Implementation
- Included in:
- MessageBus, Instance
- Defined in:
- lib/message_bus.rb
Defined Under Namespace
Classes: Synchronizer
Constant Summary collapse
- ENCODE_SITE_TOKEN =
"$|$"
Instance Attribute Summary collapse
-
#config ⇒ Object
(also: #redis_config)
readonly
Configuration options hash.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #allow_broadcast=(val) ⇒ Object
- #allow_broadcast? ⇒ Boolean
- #backend ⇒ Object
- #backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object
- #blocking_subscribe(channel = nil, &blk) ⇒ Object
- #cache_assets ⇒ Object
- #cache_assets=(val) ⇒ Object
- #chunked_encoding_enabled=(val) ⇒ Object
- #chunked_encoding_enabled? ⇒ Boolean
- #configure(config) ⇒ Object
- #decode_channel_name(channel) ⇒ Object
-
#destroy ⇒ Object
mostly used in tests to detroy entire bus.
- #enable_diagnostics ⇒ Object
-
#encode_channel_name(channel, site_id = nil) ⇒ Object
encode channel name to include site.
- #extra_response_headers_lookup(&blk) ⇒ Object
- #global_backlog(last_id = nil) ⇒ Object
- #group_ids_lookup(&blk) ⇒ Object
- #initialize ⇒ Object
- #is_admin_lookup(&blk) ⇒ Object
- #keepalive_interval ⇒ Object
-
#keepalive_interval=(interval) ⇒ Object
set to 0 to disable, anything higher and a keepalive will run every N seconds, if it fails process is killed.
- #last_id(channel, site_id = nil) ⇒ Object
- #last_message(channel) ⇒ Object
- #listening? ⇒ Boolean
-
#local_subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
subscribe only on current site.
- #local_unsubscribe(channel = nil, &blk) ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
- #long_polling_enabled=(val) ⇒ Object
- #long_polling_enabled? ⇒ Boolean
- #long_polling_interval ⇒ Object
- #long_polling_interval=(millisecs) ⇒ Object
- #max_active_clients ⇒ Object
-
#max_active_clients=(val) ⇒ Object
The number of simultanuous clients we can service will revert to polling if we are out of slots.
- #off ⇒ Object
- #on ⇒ Object
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #on_middleware_error(&blk) ⇒ Object
- #publish(channel, data, opts = nil) ⇒ Object
- #rack_hijack_enabled=(val) ⇒ Object
- #rack_hijack_enabled? ⇒ Boolean
-
#redis_config=(config) ⇒ Object
Allow us to inject a redis db.
- #reliable_pub_sub ⇒ Object
- #reliable_pub_sub=(pub_sub) ⇒ Object
-
#reset! ⇒ Object
will reset all keys.
- #site_id_lookup(&blk) ⇒ Object
- #subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
- #timer ⇒ Object
- #unsubscribe(channel = nil, &blk) ⇒ Object
- #user_id_lookup(&blk) ⇒ Object
Instance Attribute Details
#config ⇒ Object (readonly) Also known as: redis_config
Configuration options hash
25 26 27 |
# File 'lib/message_bus.rb', line 25 def config @config end |
Instance Method Details
#after_fork ⇒ Object
329 330 331 332 333 334 |
# File 'lib/message_bus.rb', line 329 def after_fork reliable_pub_sub.after_fork ensure_subscriber_thread # will ensure timer is running timer.queue {} end |
#allow_broadcast=(val) ⇒ Object
178 179 180 |
# File 'lib/message_bus.rb', line 178 def allow_broadcast=(val) configure(allow_broadcast: val) end |
#allow_broadcast? ⇒ Boolean
182 183 184 185 186 187 188 189 |
# File 'lib/message_bus.rb', line 182 def allow_broadcast? @config[:allow_broadcast] ||= if defined? ::Rails ::Rails.env.test? || ::Rails.env.development? else false end end |
#backend ⇒ Object
206 207 208 |
# File 'lib/message_bus.rb', line 206 def backend @config[:backend] || :redis end |
#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object
289 290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/message_bus.rb', line 289 def backlog(channel = nil, last_id = nil, site_id = nil) old = if channel reliable_pub_sub.backlog(encode_channel_name(channel, site_id), last_id) else reliable_pub_sub.global_backlog(last_id) end old.each do |m| (m) end old end |
#blocking_subscribe(channel = nil, &blk) ⇒ Object
242 243 244 245 246 247 248 |
# File 'lib/message_bus.rb', line 242 def blocking_subscribe(channel = nil, &blk) if channel reliable_pub_sub.subscribe(encode_channel_name(channel), &blk) else reliable_pub_sub.global_subscribe(&blk) end end |
#cache_assets ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/message_bus.rb', line 41 def cache_assets if defined? @config[:cache_assets] @config[:cache_assets] else true end end |
#cache_assets=(val) ⇒ Object
37 38 39 |
# File 'lib/message_bus.rb', line 37 def cache_assets=(val) configure(cache_assets: val) end |
#chunked_encoding_enabled=(val) ⇒ Object
66 67 68 |
# File 'lib/message_bus.rb', line 66 def chunked_encoding_enabled=(val) configure(chunked_encoding_enabled: val) end |
#chunked_encoding_enabled? ⇒ Boolean
62 63 64 |
# File 'lib/message_bus.rb', line 62 def chunked_encoding_enabled? @config[:chunked_encoding_enabled] == false ? false : true end |
#configure(config) ⇒ Object
127 128 129 |
# File 'lib/message_bus.rb', line 127 def configure(config) @config.merge!(config) end |
#decode_channel_name(channel) ⇒ Object
262 263 264 |
# File 'lib/message_bus.rb', line 262 def decode_channel_name(channel) channel.split(ENCODE_SITE_TOKEN) end |
#destroy ⇒ Object
mostly used in tests to detroy entire bus
317 318 319 320 321 322 323 324 325 326 327 |
# File 'lib/message_bus.rb', line 317 def destroy return if @destroyed reliable_pub_sub.global_unsubscribe @mutex.synchronize do @subscriptions ||= {} @destroyed = true end @subscriber_thread.join if @subscriber_thread timer.stop end |
#enable_diagnostics ⇒ Object
210 211 212 |
# File 'lib/message_bus.rb', line 210 def enable_diagnostics MessageBus::Diagnostics.enable end |
#encode_channel_name(channel, site_id = nil) ⇒ Object
encode channel name to include site
253 254 255 256 257 258 259 260 |
# File 'lib/message_bus.rb', line 253 def encode_channel_name(channel, site_id = nil) if (site_id || site_id_lookup) && !global?(channel) raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN "#{channel}#{ENCODE_SITE_TOKEN}#{site_id || site_id_lookup.call}" else channel end end |
#extra_response_headers_lookup(&blk) ⇒ Object
163 164 165 166 |
# File 'lib/message_bus.rb', line 163 def extra_response_headers_lookup(&blk) configure(extra_response_headers_lookup: blk) if blk @config[:extra_response_headers_lookup] end |
#global_backlog(last_id = nil) ⇒ Object
285 286 287 |
# File 'lib/message_bus.rb', line 285 def global_backlog(last_id = nil) backlog(nil, last_id) end |
#group_ids_lookup(&blk) ⇒ Object
148 149 150 151 |
# File 'lib/message_bus.rb', line 148 def group_ids_lookup(&blk) configure(group_ids_lookup: blk) if blk @config[:group_ids_lookup] end |
#initialize ⇒ Object
32 33 34 35 |
# File 'lib/message_bus.rb', line 32 def initialize @config = {} @mutex = Synchronizer.new end |
#is_admin_lookup(&blk) ⇒ Object
153 154 155 156 |
# File 'lib/message_bus.rb', line 153 def is_admin_lookup(&blk) configure(is_admin_lookup: blk) if blk @config[:is_admin_lookup] end |
#keepalive_interval ⇒ Object
363 364 365 |
# File 'lib/message_bus.rb', line 363 def keepalive_interval @config[:keepalive_interval] || 60 end |
#keepalive_interval=(interval) ⇒ Object
set to 0 to disable, anything higher and a keepalive will run every N seconds, if it fails process is killed
359 360 361 |
# File 'lib/message_bus.rb', line 359 def keepalive_interval=(interval) configure(keepalive_interval: interval) end |
#last_id(channel, site_id = nil) ⇒ Object
303 304 305 |
# File 'lib/message_bus.rb', line 303 def last_id(channel , site_id = nil) reliable_pub_sub.last_id(encode_channel_name(channel, site_id)) end |
#last_message(channel) ⇒ Object
307 308 309 310 311 312 313 314 |
# File 'lib/message_bus.rb', line 307 def (channel) if last_id = last_id(channel) = backlog(channel, last_id - 1) if [0] end end end |
#listening? ⇒ Boolean
336 337 338 |
# File 'lib/message_bus.rb', line 336 def listening? @subscriber_thread && @subscriber_thread.alive? end |
#local_subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
subscribe only on current site
271 272 273 274 |
# File 'lib/message_bus.rb', line 271 def local_subscribe(channel = nil, last_id = -1, &blk) site_id = site_id_lookup.call if site_id_lookup && ! global?(channel) subscribe_impl(channel, site_id, last_id, &blk) end |
#local_unsubscribe(channel = nil, &blk) ⇒ Object
280 281 282 283 |
# File 'lib/message_bus.rb', line 280 def local_unsubscribe(channel = nil, &blk) site_id = site_id_lookup.call if site_id_lookup unsubscribe_impl(channel, site_id, &blk) end |
#logger ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/message_bus.rb', line 53 def logger return @config[:logger] if @config[:logger] require 'logger' logger = Logger.new(STDOUT) logger.level = Logger::INFO configure(logger: logger) logger end |
#logger=(logger) ⇒ Object
49 50 51 |
# File 'lib/message_bus.rb', line 49 def logger=(logger) configure(logger: logger) end |
#long_polling_enabled=(val) ⇒ Object
74 75 76 |
# File 'lib/message_bus.rb', line 74 def long_polling_enabled=(val) configure(long_polling_enabled: val) end |
#long_polling_enabled? ⇒ Boolean
70 71 72 |
# File 'lib/message_bus.rb', line 70 def long_polling_enabled? @config[:long_polling_enabled] == false ? false : true end |
#long_polling_interval ⇒ Object
115 116 117 |
# File 'lib/message_bus.rb', line 115 def long_polling_interval @config[:long_polling_interval] || 25 * 1000 end |
#long_polling_interval=(millisecs) ⇒ Object
111 112 113 |
# File 'lib/message_bus.rb', line 111 def long_polling_interval=(millisecs) configure(long_polling_interval: millisecs) end |
#max_active_clients ⇒ Object
84 85 86 |
# File 'lib/message_bus.rb', line 84 def max_active_clients @config[:max_active_clients] || 1000 end |
#max_active_clients=(val) ⇒ Object
The number of simultanuous clients we can service
will revert to polling if we are out of slots
80 81 82 |
# File 'lib/message_bus.rb', line 80 def max_active_clients=(val) configure(max_active_clients: val) end |
#off ⇒ Object
119 120 121 |
# File 'lib/message_bus.rb', line 119 def off @off = true end |
#on ⇒ Object
123 124 125 |
# File 'lib/message_bus.rb', line 123 def on @off = false end |
#on_connect(&blk) ⇒ Object
168 169 170 171 |
# File 'lib/message_bus.rb', line 168 def on_connect(&blk) configure(on_connect: blk) if blk @config[:on_connect] end |
#on_disconnect(&blk) ⇒ Object
173 174 175 176 |
# File 'lib/message_bus.rb', line 173 def on_disconnect(&blk) configure(on_disconnect: blk) if blk @config[:on_disconnect] end |
#on_middleware_error(&blk) ⇒ Object
158 159 160 161 |
# File 'lib/message_bus.rb', line 158 def on_middleware_error(&blk) configure(on_middleware_error: blk) if blk @config[:on_middleware_error] end |
#publish(channel, data, opts = nil) ⇒ Object
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 |
# File 'lib/message_bus.rb', line 214 def publish(channel, data, opts = nil) return if @off @mutex.synchronize do raise ::MessageBus::BusDestroyed if @destroyed end user_ids = nil group_ids = nil client_ids = nil if opts user_ids = opts[:user_ids] group_ids = opts[:group_ids] client_ids = opts[:client_ids] end raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel) encoded_data = JSON.dump( data: data, user_ids: user_ids, group_ids: group_ids, client_ids: client_ids ) reliable_pub_sub.publish(encode_channel_name(channel), encoded_data) end |
#rack_hijack_enabled=(val) ⇒ Object
107 108 109 |
# File 'lib/message_bus.rb', line 107 def rack_hijack_enabled=(val) configure(rack_hijack_enabled: val) end |
#rack_hijack_enabled? ⇒ Boolean
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/message_bus.rb', line 88 def rack_hijack_enabled? if @config[:rack_hijack_enabled].nil? enable = true # without this switch passenger will explode # it will run out of connections after about 10 if defined? PhusionPassenger enable = false if PhusionPassenger.respond_to? :advertised_concurrency_level PhusionPassenger.advertised_concurrency_level = 0 enable = true end end configure(rack_hijack_enabled: enable) end @config[:rack_hijack_enabled] end |
#redis_config=(config) ⇒ Object
Allow us to inject a redis db
132 133 134 |
# File 'lib/message_bus.rb', line 132 def redis_config=(config) configure(config.merge(backend: :redis)) end |
#reliable_pub_sub ⇒ Object
195 196 197 198 199 200 201 202 203 204 |
# File 'lib/message_bus.rb', line 195 def reliable_pub_sub @mutex.synchronize do return nil if @destroyed @config[:reliable_pub_sub] ||= begin @config[:backend_options] ||= {} require "message_bus/backends/#{backend}" MessageBus::BACKENDS[backend].new @config end end end |
#reliable_pub_sub=(pub_sub) ⇒ Object
191 192 193 |
# File 'lib/message_bus.rb', line 191 def reliable_pub_sub=(pub_sub) configure(reliable_pub_sub: pub_sub) end |
#reset! ⇒ Object
will reset all keys
341 342 343 |
# File 'lib/message_bus.rb', line 341 def reset! reliable_pub_sub.reset! end |
#site_id_lookup(&blk) ⇒ Object
138 139 140 141 |
# File 'lib/message_bus.rb', line 138 def site_id_lookup(&blk) configure(site_id_lookup: blk) if blk @config[:site_id_lookup] end |
#subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
266 267 268 |
# File 'lib/message_bus.rb', line 266 def subscribe(channel = nil, last_id = -1, &blk) subscribe_impl(channel, nil, last_id, &blk) end |
#timer ⇒ Object
345 346 347 348 349 350 351 352 353 354 |
# File 'lib/message_bus.rb', line 345 def timer return @timer_thread if @timer_thread @timer_thread ||= begin t = MessageBus::TimerThread.new t.on_error do |e| logger.warn "Failed to process job: #{e} #{e.backtrace}" end t end end |
#unsubscribe(channel = nil, &blk) ⇒ Object
276 277 278 |
# File 'lib/message_bus.rb', line 276 def unsubscribe(channel = nil, &blk) unsubscribe_impl(channel, nil, &blk) end |
#user_id_lookup(&blk) ⇒ Object
143 144 145 146 |
# File 'lib/message_bus.rb', line 143 def user_id_lookup(&blk) configure(user_id_lookup: blk) if blk @config[:user_id_lookup] end |