Module: MessageBus::Implementation
- Included in:
- MessageBus, Instance
- Defined in:
- lib/message_bus.rb
Defined Under Namespace
Classes: Synchronizer
Constant Summary collapse
- ENCODE_SITE_TOKEN =
"$|$"
Instance Attribute Summary collapse
-
#config ⇒ Object
(also: #redis_config)
readonly
Configuration options hash.
Instance Method Summary collapse
- #after_fork ⇒ Object
- #allow_broadcast=(val) ⇒ Object
- #allow_broadcast? ⇒ Boolean
- #backend ⇒ Object
- #backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object
- #blocking_subscribe(channel = nil, &blk) ⇒ Object
- #cache_assets ⇒ Object
- #cache_assets=(val) ⇒ Object
- #chunked_encoding_enabled=(val) ⇒ Object
- #chunked_encoding_enabled? ⇒ Boolean
- #configure(config) ⇒ Object
- #decode_channel_name(channel) ⇒ Object
- #destroy ⇒ Object
- #enable_diagnostics ⇒ Object
-
#encode_channel_name(channel, site_id = nil) ⇒ Object
encode channel name to include site.
- #extra_response_headers_lookup(&blk) ⇒ Object
- #global_backlog(last_id = nil) ⇒ Object
- #group_ids_lookup(&blk) ⇒ Object
- #initialize ⇒ Object
- #is_admin_lookup(&blk) ⇒ Object
- #keepalive_interval ⇒ Object
-
#keepalive_interval=(interval) ⇒ Object
set to 0 to disable, anything higher and a keepalive will run every N seconds, if it fails process is killed.
- #last_id(channel, site_id = nil) ⇒ Object
- #last_message(channel) ⇒ Object
- #listening? ⇒ Boolean
-
#local_subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
subscribe only on current site.
- #local_unsubscribe(channel = nil, &blk) ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
- #long_polling_enabled=(val) ⇒ Object
- #long_polling_enabled? ⇒ Boolean
- #long_polling_interval ⇒ Object
- #long_polling_interval=(millisecs) ⇒ Object
- #max_active_clients ⇒ Object
-
#max_active_clients=(val) ⇒ Object
The number of simultanuous clients we can service will revert to polling if we are out of slots.
- #off ⇒ Object
- #on ⇒ Object
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #publish(channel, data, opts = nil) ⇒ Object
- #rack_hijack_enabled=(val) ⇒ Object
- #rack_hijack_enabled? ⇒ Boolean
-
#redis_config=(config) ⇒ Object
Allow us to inject a redis db.
- #reliable_pub_sub ⇒ Object
- #reliable_pub_sub=(pub_sub) ⇒ Object
-
#reset! ⇒ Object
will reset all keys.
- #site_id_lookup(&blk) ⇒ Object
- #subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
- #timer ⇒ Object
- #unsubscribe(channel = nil, &blk) ⇒ Object
- #user_id_lookup(&blk) ⇒ Object
Instance Attribute Details
#config ⇒ Object (readonly) Also known as: redis_config
Configuration options hash
24 25 26 |
# File 'lib/message_bus.rb', line 24 def config @config end |
Instance Method Details
#after_fork ⇒ Object
320 321 322 323 324 325 |
# File 'lib/message_bus.rb', line 320 def after_fork reliable_pub_sub.after_fork ensure_subscriber_thread # will ensure timer is running timer.queue{} end |
#allow_broadcast=(val) ⇒ Object
172 173 174 |
# File 'lib/message_bus.rb', line 172 def allow_broadcast=(val) configure(allow_broadcast: val) end |
#allow_broadcast? ⇒ Boolean
176 177 178 179 180 181 182 183 |
# File 'lib/message_bus.rb', line 176 def allow_broadcast? @config[:allow_broadcast] ||= if defined? ::Rails ::Rails.env.test? || ::Rails.env.development? else false end end |
#backend ⇒ Object
200 201 202 |
# File 'lib/message_bus.rb', line 200 def backend @config[:backend] || :redis end |
#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object
283 284 285 286 287 288 289 290 291 292 293 294 295 |
# File 'lib/message_bus.rb', line 283 def backlog(channel=nil, last_id=nil, site_id=nil) old = if channel reliable_pub_sub.backlog(encode_channel_name(channel,site_id), last_id) else reliable_pub_sub.global_backlog(last_id) end old.each{ |m| (m) } old end |
#blocking_subscribe(channel = nil, &blk) ⇒ Object
236 237 238 239 240 241 242 |
# File 'lib/message_bus.rb', line 236 def blocking_subscribe(channel=nil, &blk) if channel reliable_pub_sub.subscribe(encode_channel_name(channel), &blk) else reliable_pub_sub.global_subscribe(&blk) end end |
#cache_assets ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/message_bus.rb', line 40 def cache_assets if defined? @config[:cache_assets] @config[:cache_assets] else true end end |
#cache_assets=(val) ⇒ Object
36 37 38 |
# File 'lib/message_bus.rb', line 36 def cache_assets=(val) configure(cache_assets: val) end |
#chunked_encoding_enabled=(val) ⇒ Object
65 66 67 |
# File 'lib/message_bus.rb', line 65 def chunked_encoding_enabled=(val) configure(chunked_encoding_enabled: val) end |
#chunked_encoding_enabled? ⇒ Boolean
61 62 63 |
# File 'lib/message_bus.rb', line 61 def chunked_encoding_enabled? @config[:chunked_encoding_enabled] == false ? false : true end |
#configure(config) ⇒ Object
126 127 128 |
# File 'lib/message_bus.rb', line 126 def configure(config) @config.merge!(config) end |
#decode_channel_name(channel) ⇒ Object
256 257 258 |
# File 'lib/message_bus.rb', line 256 def decode_channel_name(channel) channel.split(ENCODE_SITE_TOKEN) end |
#destroy ⇒ Object
310 311 312 313 314 315 316 317 318 |
# File 'lib/message_bus.rb', line 310 def destroy @mutex.synchronize do @subscriptions ||= {} reliable_pub_sub.global_unsubscribe @destroyed = true end @subscriber_thread.join if @subscriber_thread timer.stop end |
#enable_diagnostics ⇒ Object
204 205 206 |
# File 'lib/message_bus.rb', line 204 def enable_diagnostics MessageBus::Diagnostics.enable end |
#encode_channel_name(channel, site_id = nil) ⇒ Object
encode channel name to include site
247 248 249 250 251 252 253 254 |
# File 'lib/message_bus.rb', line 247 def encode_channel_name(channel, site_id=nil) if (site_id || site_id_lookup) && !global?(channel) raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN "#{channel}#{ENCODE_SITE_TOKEN}#{site_id || site_id_lookup.call}" else channel end end |
#extra_response_headers_lookup(&blk) ⇒ Object
157 158 159 160 |
# File 'lib/message_bus.rb', line 157 def extra_response_headers_lookup(&blk) configure(extra_response_headers_lookup: blk) if blk @config[:extra_response_headers_lookup] end |
#global_backlog(last_id = nil) ⇒ Object
279 280 281 |
# File 'lib/message_bus.rb', line 279 def global_backlog(last_id=nil) backlog(nil, last_id) end |
#group_ids_lookup(&blk) ⇒ Object
147 148 149 150 |
# File 'lib/message_bus.rb', line 147 def group_ids_lookup(&blk) configure(group_ids_lookup: blk) if blk @config[:group_ids_lookup] end |
#initialize ⇒ Object
31 32 33 34 |
# File 'lib/message_bus.rb', line 31 def initialize @config = {} @mutex = Synchronizer.new end |
#is_admin_lookup(&blk) ⇒ Object
152 153 154 155 |
# File 'lib/message_bus.rb', line 152 def is_admin_lookup(&blk) configure(is_admin_lookup: blk) if blk @config[:is_admin_lookup] end |
#keepalive_interval ⇒ Object
354 355 356 |
# File 'lib/message_bus.rb', line 354 def keepalive_interval @config[:keepalive_interval] || 60 end |
#keepalive_interval=(interval) ⇒ Object
set to 0 to disable, anything higher and a keepalive will run every N seconds, if it fails process is killed
350 351 352 |
# File 'lib/message_bus.rb', line 350 def keepalive_interval=(interval) configure(keepalive_interval: interval) end |
#last_id(channel, site_id = nil) ⇒ Object
297 298 299 |
# File 'lib/message_bus.rb', line 297 def last_id(channel,site_id=nil) reliable_pub_sub.last_id(encode_channel_name(channel,site_id)) end |
#last_message(channel) ⇒ Object
301 302 303 304 305 306 307 308 |
# File 'lib/message_bus.rb', line 301 def (channel) if last_id = last_id(channel) = backlog(channel, last_id-1) if [0] end end end |
#listening? ⇒ Boolean
327 328 329 |
# File 'lib/message_bus.rb', line 327 def listening? @subscriber_thread && @subscriber_thread.alive? end |
#local_subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
subscribe only on current site
265 266 267 268 |
# File 'lib/message_bus.rb', line 265 def local_subscribe(channel=nil, last_id=-1, &blk) site_id = site_id_lookup.call if site_id_lookup && ! global?(channel) subscribe_impl(channel, site_id, last_id, &blk) end |
#local_unsubscribe(channel = nil, &blk) ⇒ Object
274 275 276 277 |
# File 'lib/message_bus.rb', line 274 def local_unsubscribe(channel=nil, &blk) site_id = site_id_lookup.call if site_id_lookup unsubscribe_impl(channel, site_id, &blk) end |
#logger ⇒ Object
52 53 54 55 56 57 58 59 |
# File 'lib/message_bus.rb', line 52 def logger return @config[:logger] if @config[:logger] require 'logger' logger = Logger.new(STDOUT) logger.level = Logger::INFO configure(logger: logger) logger end |
#logger=(logger) ⇒ Object
48 49 50 |
# File 'lib/message_bus.rb', line 48 def logger=(logger) configure(logger: logger) end |
#long_polling_enabled=(val) ⇒ Object
73 74 75 |
# File 'lib/message_bus.rb', line 73 def long_polling_enabled=(val) configure(long_polling_enabled: val) end |
#long_polling_enabled? ⇒ Boolean
69 70 71 |
# File 'lib/message_bus.rb', line 69 def long_polling_enabled? @config[:long_polling_enabled] == false ? false : true end |
#long_polling_interval ⇒ Object
114 115 116 |
# File 'lib/message_bus.rb', line 114 def long_polling_interval @config[:long_polling_interval] || 25 * 1000 end |
#long_polling_interval=(millisecs) ⇒ Object
110 111 112 |
# File 'lib/message_bus.rb', line 110 def long_polling_interval=(millisecs) configure(long_polling_interval: millisecs) end |
#max_active_clients ⇒ Object
83 84 85 |
# File 'lib/message_bus.rb', line 83 def max_active_clients @config[:max_active_clients] || 1000 end |
#max_active_clients=(val) ⇒ Object
The number of simultanuous clients we can service
will revert to polling if we are out of slots
79 80 81 |
# File 'lib/message_bus.rb', line 79 def max_active_clients=(val) configure(max_active_clients: val) end |
#off ⇒ Object
118 119 120 |
# File 'lib/message_bus.rb', line 118 def off @off = true end |
#on ⇒ Object
122 123 124 |
# File 'lib/message_bus.rb', line 122 def on @off = false end |
#on_connect(&blk) ⇒ Object
162 163 164 165 |
# File 'lib/message_bus.rb', line 162 def on_connect(&blk) configure(on_connect: blk) if blk @config[:on_connect] end |
#on_disconnect(&blk) ⇒ Object
167 168 169 170 |
# File 'lib/message_bus.rb', line 167 def on_disconnect(&blk) configure(on_disconnect: blk) if blk @config[:on_disconnect] end |
#publish(channel, data, opts = nil) ⇒ Object
208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 |
# File 'lib/message_bus.rb', line 208 def publish(channel, data, opts = nil) return if @off @mutex.synchronize do raise ::MessageBus::BusDestroyed if @destroyed end user_ids = nil group_ids = nil client_ids = nil if opts user_ids = opts[:user_ids] group_ids = opts[:group_ids] client_ids = opts[:client_ids] end raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel) encoded_data = JSON.dump({ data: data, user_ids: user_ids, group_ids: group_ids, client_ids: client_ids }) reliable_pub_sub.publish(encode_channel_name(channel), encoded_data) end |
#rack_hijack_enabled=(val) ⇒ Object
106 107 108 |
# File 'lib/message_bus.rb', line 106 def rack_hijack_enabled=(val) configure(rack_hijack_enabled: val) end |
#rack_hijack_enabled? ⇒ Boolean
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/message_bus.rb', line 87 def rack_hijack_enabled? if @config[:rack_hijack_enabled].nil? enable = true # without this switch passenger will explode # it will run out of connections after about 10 if defined? PhusionPassenger enable = false if PhusionPassenger.respond_to? :advertised_concurrency_level PhusionPassenger.advertised_concurrency_level = 0 enable = true end end configure(rack_hijack_enabled: enable) end @config[:rack_hijack_enabled] end |
#redis_config=(config) ⇒ Object
Allow us to inject a redis db
131 132 133 |
# File 'lib/message_bus.rb', line 131 def redis_config=(config) configure(config.merge(:backend=>:redis)) end |
#reliable_pub_sub ⇒ Object
189 190 191 192 193 194 195 196 197 198 |
# File 'lib/message_bus.rb', line 189 def reliable_pub_sub @mutex.synchronize do return nil if @destroyed @config[:reliable_pub_sub] ||= begin @config[:backend_options] ||= {} require "message_bus/backends/#{backend}" MessageBus::BACKENDS[backend].new @config end end end |
#reliable_pub_sub=(pub_sub) ⇒ Object
185 186 187 |
# File 'lib/message_bus.rb', line 185 def reliable_pub_sub=(pub_sub) configure(reliable_pub_sub: pub_sub) end |
#reset! ⇒ Object
will reset all keys
332 333 334 |
# File 'lib/message_bus.rb', line 332 def reset! reliable_pub_sub.reset! end |
#site_id_lookup(&blk) ⇒ Object
137 138 139 140 |
# File 'lib/message_bus.rb', line 137 def site_id_lookup(&blk) configure(site_id_lookup: blk) if blk @config[:site_id_lookup] end |
#subscribe(channel = nil, last_id = -1,, &blk) ⇒ Object
260 261 262 |
# File 'lib/message_bus.rb', line 260 def subscribe(channel=nil, last_id=-1, &blk) subscribe_impl(channel, nil, last_id, &blk) end |
#timer ⇒ Object
336 337 338 339 340 341 342 343 344 345 |
# File 'lib/message_bus.rb', line 336 def timer return @timer_thread if @timer_thread @timer_thread ||= begin t = MessageBus::TimerThread.new t.on_error do |e| logger.warn "Failed to process job: #{e} #{e.backtrace}" end t end end |
#unsubscribe(channel = nil, &blk) ⇒ Object
270 271 272 |
# File 'lib/message_bus.rb', line 270 def unsubscribe(channel=nil, &blk) unsubscribe_impl(channel, nil, &blk) end |
#user_id_lookup(&blk) ⇒ Object
142 143 144 145 |
# File 'lib/message_bus.rb', line 142 def user_id_lookup(&blk) configure(user_id_lookup: blk) if blk @config[:user_id_lookup] end |