Module: MessageBus::Implementation
- Included in:
- MessageBus, Instance
- Defined in:
- lib/message_bus.rb
Defined Under Namespace
Classes: Synchronizer
Constant Summary collapse
- ENCODE_SITE_TOKEN =
"$|$"
Instance Method Summary collapse
- #after_fork ⇒ Object
- #allow_broadcast=(val) ⇒ Object
- #allow_broadcast? ⇒ Boolean
- #around_client_batch(channel, &blk) ⇒ Object
- #backlog(channel = nil, last_id) ⇒ Object
- #blocking_subscribe(channel = nil, &blk) ⇒ Object
- #cache_assets ⇒ Object
- #cache_assets=(val) ⇒ Object
- #client_filter(channel, &blk) ⇒ Object
- #decode_channel_name(channel) ⇒ Object
- #destroy ⇒ Object
- #enable_diagnostics ⇒ Object
-
#encode_channel_name(channel) ⇒ Object
encode channel name to include site.
- #extra_response_headers_lookup(&blk) ⇒ Object
- #group_ids_lookup(&blk) ⇒ Object
- #initialize ⇒ Object
- #is_admin_lookup(&blk) ⇒ Object
- #last_id(channel) ⇒ Object
- #last_message(channel) ⇒ Object
- #listening? ⇒ Boolean
-
#local_subscribe(channel = nil, &blk) ⇒ Object
subscribe only on current site.
- #local_unsubscribe(channel = nil, &blk) ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
- #long_polling_enabled=(val) ⇒ Object
- #long_polling_enabled? ⇒ Boolean
- #long_polling_interval ⇒ Object
- #long_polling_interval=(millisecs) ⇒ Object
- #max_active_clients ⇒ Object
-
#max_active_clients=(val) ⇒ Object
The number of simultaneous clients we can service; clients will revert to polling if we are out of slots.
- #off ⇒ Object
- #on ⇒ Object
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #publish(channel, data, opts = nil) ⇒ Object
- #rack_hijack_enabled=(val) ⇒ Object
- #rack_hijack_enabled? ⇒ Boolean
- #redis_config ⇒ Object
-
#redis_config=(config) ⇒ Object
Allow us to inject a redis db.
- #reliable_pub_sub ⇒ Object
- #reliable_pub_sub=(pub_sub) ⇒ Object
-
#reset! ⇒ Object
will reset all keys.
- #site_id_lookup(&blk) ⇒ Object
- #subscribe(channel = nil, &blk) ⇒ Object
- #timer ⇒ Object
- #unsubscribe(channel = nil, &blk) ⇒ Object
- #user_id_lookup(&blk) ⇒ Object
Instance Method Details
#after_fork ⇒ Object
304 305 306 307 308 309 |
# File 'lib/message_bus.rb', line 304
# Runs the post-fork steps in order: forwards +after_fork+ to the pub/sub
# backend, calls +ensure_subscriber_thread+, then queues an empty job on
# the timer.
#
# @return [Object] whatever +timer.queue+ returns
def after_fork
  reliable_pub_sub.after_fork
  ensure_subscriber_thread
  # will ensure timer is running
  timer.queue { }
end
#allow_broadcast=(val) ⇒ Object
168 169 170 |
# File 'lib/message_bus.rb', line 168
# Stores the broadcast permission flag.
#
# @param val [Boolean] value later consulted by #allow_broadcast?
def allow_broadcast=(val)
  @allow_broadcast = val
end
#allow_broadcast? ⇒ Boolean
172 173 174 175 176 177 178 179 |
# File 'lib/message_bus.rb', line 172
# Reports whether broadcasting is allowed.
#
# An explicitly assigned value (including +false+) always wins. Only when
# nothing has been assigned is a default computed and cached: +true+ under
# Rails in the test or development environment, +false+ otherwise.
#
# @return [Boolean]
def allow_broadcast?
  # Check for nil rather than falsiness so that an explicit `false` is
  # respected instead of being recomputed from the Rails environment.
  return @allow_broadcast unless @allow_broadcast.nil?

  @allow_broadcast =
    if defined?(::Rails)
      ::Rails.env.test? || ::Rails.env.development?
    else
      false
    end
end
#around_client_batch(channel, &blk) ⇒ Object
152 153 154 155 156 |
# File 'lib/message_bus.rb', line 152
# Registers (when a block is given) and returns the block stored for
# +channel+ in the around-client-batch table.
#
# @param channel [Object] key the block is stored under
# @return [Proc, nil] the block registered for +channel+, if any
def around_client_batch(channel, &blk)
  batches = (@around_client_batches ||= {})
  batches[channel] = blk unless blk.nil?
  batches[channel]
end
#backlog(channel = nil, last_id) ⇒ Object
267 268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/message_bus.rb', line 267 def backlog(channel=nil, last_id) old = if channel reliable_pub_sub.backlog(encode_channel_name(channel), last_id) else reliable_pub_sub.global_backlog(last_id) end old.each{ |m| (m) } old end |
#blocking_subscribe(channel = nil, &blk) ⇒ Object
224 225 226 227 228 229 230 |
# File 'lib/message_bus.rb', line 224
# Subscribes directly through the pub/sub backend.
#
# @param channel [String, nil] channel to subscribe to; when nil the
#   backend's +global_subscribe+ is used instead
# @return [Object] whatever the backend call returns
def blocking_subscribe(channel = nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end
#cache_assets ⇒ Object
38 39 40 41 42 43 44 |
# File 'lib/message_bus.rb', line 38
# Returns the asset-caching setting.
#
# @return [Object] the assigned value if one was ever assigned (even nil or
#   false); otherwise +true+
def cache_assets
  defined?(@cache_assets) ? @cache_assets : true
end
#cache_assets=(val) ⇒ Object
34 35 36 |
# File 'lib/message_bus.rb', line 34
# Stores the asset-caching setting read back by #cache_assets.
#
# @param val [Object] new setting
def cache_assets=(val)
  @cache_assets = val
end
#client_filter(channel, &blk) ⇒ Object
146 147 148 149 150 |
# File 'lib/message_bus.rb', line 146
# Registers (when a block is given) and returns the filter block stored
# for +channel+.
#
# @param channel [Object] key the filter is stored under
# @return [Proc, nil] the filter registered for +channel+, if any
def client_filter(channel, &blk)
  filters = (@client_filters ||= {})
  filters[channel] = blk unless blk.nil?
  filters[channel]
end
#decode_channel_name(channel) ⇒ Object
244 245 246 |
# File 'lib/message_bus.rb', line 244
# Splits an encoded channel name on ENCODE_SITE_TOKEN.
#
# @param channel [String] channel name, possibly carrying a site suffix
# @return [Array<String>] the pieces produced by the split
def decode_channel_name(channel)
  channel.split(ENCODE_SITE_TOKEN)
end
#destroy ⇒ Object
294 295 296 297 298 299 300 301 302 |
# File 'lib/message_bus.rb', line 294 def destroy @mutex.synchronize do @subscriptions ||= {} reliable_pub_sub.global_unsubscribe @destroyed = true end @subscriber_thread.join if @subscriber_thread timer.stop end |
#enable_diagnostics ⇒ Object
192 193 194 |
# File 'lib/message_bus.rb', line 192
# Delegates to MessageBus::Diagnostics.enable.
#
# @return [Object] whatever the delegate returns
def enable_diagnostics
  MessageBus::Diagnostics.enable
end
#encode_channel_name(channel) ⇒ Object
encode channel name to include site
235 236 237 238 239 240 241 242 |
# File 'lib/message_bus.rb', line 235
# Appends the current site id to a channel name.
#
# The name is returned untouched when no site lookup is configured or when
# +global?(channel)+ is true. Otherwise the result is
# "<channel><ENCODE_SITE_TOKEN><site id>".
#
# @param channel [String] plain channel name
# @return [String] encoded (or unchanged) channel name
# @raise [ArgumentError] if a site-scoped channel already contains
#   ENCODE_SITE_TOKEN; the message is the channel itself
def encode_channel_name(channel)
  return channel unless site_id_lookup && !global?(channel)

  raise ArgumentError, channel if channel.include?(ENCODE_SITE_TOKEN)

  "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}"
end
#extra_response_headers_lookup(&blk) ⇒ Object
141 142 143 144 |
# File 'lib/message_bus.rb', line 141
# Stores the given block (if any) as the extra-response-headers lookup and
# returns the currently stored block.
#
# @return [Proc, nil]
def extra_response_headers_lookup(&blk)
  @extra_response_headers_lookup = blk unless blk.nil?
  @extra_response_headers_lookup
end
#group_ids_lookup(&blk) ⇒ Object
131 132 133 134 |
# File 'lib/message_bus.rb', line 131
# Stores the given block (if any) as the group-ids lookup and returns the
# currently stored block.
#
# @return [Proc, nil]
def group_ids_lookup(&blk)
  @group_ids_lookup = blk unless blk.nil?
  @group_ids_lookup
end
#initialize ⇒ Object
30 31 32 |
# File 'lib/message_bus.rb', line 30
# Creates the Synchronizer stored in @mutex, which the other methods use
# via +@mutex.synchronize+.
def initialize
  @mutex = Synchronizer.new
end
#is_admin_lookup(&blk) ⇒ Object
136 137 138 139 |
# File 'lib/message_bus.rb', line 136
# Stores the given block (if any) as the is-admin lookup and returns the
# currently stored block.
#
# @return [Proc, nil]
def is_admin_lookup(&blk)
  @is_admin_lookup = blk unless blk.nil?
  @is_admin_lookup
end
#last_id(channel) ⇒ Object
281 282 283 |
# File 'lib/message_bus.rb', line 281
# Asks the pub/sub backend for the last id on the encoded channel.
#
# @param channel [String] plain channel name
# @return [Object] whatever the backend's +last_id+ returns
def last_id(channel)
  backend = reliable_pub_sub
  backend.last_id(encode_channel_name(channel))
end
#last_message(channel) ⇒ Object
285 286 287 288 289 290 291 292 |
# File 'lib/message_bus.rb', line 285
# Returns the most recent message on +channel+.
#
# Looks up the channel's last id and reads the backlog starting just before
# it, so the first backlog entry is the latest message.
#
# @param channel [String] plain channel name
# @return [Object, nil] first backlog entry, or nil when the channel has no
#   last id or the backlog call returns nil
def last_message(channel)
  newest_id = last_id(channel)
  return unless newest_id

  messages = backlog(channel, newest_id - 1)
  messages[0] if messages
end
#listening? ⇒ Boolean
311 312 313 |
# File 'lib/message_bus.rb', line 311
# Reports whether a subscriber thread exists and is alive.
#
# @return [Boolean, nil] nil when no subscriber thread has been stored
def listening?
  worker = @subscriber_thread
  worker && worker.alive?
end
#local_subscribe(channel = nil, &blk) ⇒ Object
subscribe only on current site
262 263 264 265 |
# File 'lib/message_bus.rb', line 262
# subscribe only on current site
#
# The site id is resolved only when a site lookup is configured and
# +global?(channel)+ is false; otherwise nil is passed on.
#
# @param channel [String, nil] channel to subscribe to
# @return [Object] whatever +subscribe_impl+ returns
def local_subscribe(channel = nil, &blk)
  site_scoped = site_id_lookup && !global?(channel)
  site_id = site_scoped ? site_id_lookup.call : nil
  subscribe_impl(channel, site_id, &blk)
end
#local_unsubscribe(channel = nil, &blk) ⇒ Object
256 257 258 259 |
# File 'lib/message_bus.rb', line 256
# Unsubscribes on the current site: resolves the site id whenever a site
# lookup is configured and forwards to +unsubscribe_impl+.
#
# NOTE(review): unlike #local_subscribe, this does not skip the site id for
# channels where +global?+ is true -- confirm the asymmetry is intended.
#
# @param channel [String, nil] channel to unsubscribe from
# @return [Object] whatever +unsubscribe_impl+ returns
def local_unsubscribe(channel = nil, &blk)
  site_id = site_id_lookup ? site_id_lookup.call : nil
  unsubscribe_impl(channel, site_id, &blk)
end
#logger ⇒ Object
50 51 52 53 54 |
# File 'lib/message_bus.rb', line 50
# Returns the configured logger, creating and caching a STDOUT Logger on
# first use when none has been assigned.
#
# @return [Object] the assigned logger, or a Logger writing to STDOUT
def logger
  @logger ||= begin
    # Loaded lazily so the stdlib logger is only required when needed.
    require 'logger'
    Logger.new(STDOUT)
  end
end
#logger=(logger) ⇒ Object
46 47 48 |
# File 'lib/message_bus.rb', line 46
# Replaces the logger returned by #logger.
#
# @param logger [Object] logger to use
def logger=(logger)
  @logger = logger
end
#long_polling_enabled=(val) ⇒ Object
60 61 62 |
# File 'lib/message_bus.rb', line 60
# Stores the long-polling switch read by #long_polling_enabled?.
#
# @param val [Boolean] only an explicit +false+ disables long polling
def long_polling_enabled=(val)
  @long_polling_enabled = val
end
#long_polling_enabled? ⇒ Boolean
56 57 58 |
# File 'lib/message_bus.rb', line 56
# Reports whether long polling is enabled. It is on unless the setting was
# explicitly assigned +false+ (an unset or nil setting counts as enabled).
#
# @return [Boolean]
def long_polling_enabled?
  @long_polling_enabled != false
end
#long_polling_interval ⇒ Object
100 101 102 |
# File 'lib/message_bus.rb', line 100
# Returns the long-polling interval.
#
# @return [Integer] the assigned interval, or 25000 when none is set
def long_polling_interval
  @long_polling_interval || 25_000
end
#long_polling_interval=(millisecs) ⇒ Object
96 97 98 |
# File 'lib/message_bus.rb', line 96
# Sets the long-polling interval.
#
# @param millisecs [Integer] interval, in milliseconds per the parameter name
def long_polling_interval=(millisecs)
  @long_polling_interval = millisecs
end
#max_active_clients ⇒ Object
70 71 72 |
# File 'lib/message_bus.rb', line 70
# Returns the client cap.
#
# @return [Integer] the assigned cap, or 1000 when none is set
def max_active_clients
  @max_active_clients || 1_000
end
#max_active_clients=(val) ⇒ Object
The number of simultaneous clients we can service;
clients will revert to polling if we are out of slots
66 67 68 |
# File 'lib/message_bus.rb', line 66
# Sets the cap returned by #max_active_clients.
#
# @param val [Integer] maximum number of clients
def max_active_clients=(val)
  @max_active_clients = val
end
#off ⇒ Object
104 105 106 |
# File 'lib/message_bus.rb', line 104
# Sets the @off flag to true. While it is set, #publish returns
# immediately without publishing.
#
# @return [true]
def off
  @off = true
end
#on ⇒ Object
108 109 110 |
# File 'lib/message_bus.rb', line 108
# Clears the @off flag so #publish no longer returns early.
#
# @return [false]
def on
  @off = false
end
#on_connect(&blk) ⇒ Object
158 159 160 161 |
# File 'lib/message_bus.rb', line 158
# Stores the given block (if any) as the on-connect callback and returns
# the currently stored block.
#
# @return [Proc, nil]
def on_connect(&blk)
  @on_connect = blk unless blk.nil?
  @on_connect
end
#on_disconnect(&blk) ⇒ Object
163 164 165 166 |
# File 'lib/message_bus.rb', line 163
# Stores the given block (if any) as the on-disconnect callback and returns
# the currently stored block.
#
# @return [Proc, nil]
def on_disconnect(&blk)
  @on_disconnect = blk unless blk.nil?
  @on_disconnect
end
#publish(channel, data, opts = nil) ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 |
# File 'lib/message_bus.rb', line 196
# Publishes +data+ on +channel+ as a JSON envelope.
#
# The envelope has the keys data, user_ids, group_ids and client_ids; the
# last three come from +opts+ and are nil when absent.
#
# @param channel [String] plain channel name (encoded before publishing)
# @param data [Object] JSON-serialisable payload
# @param opts [Hash, nil] optional :user_ids, :group_ids, :client_ids
# @return [Object, nil] the backend's publish result; nil when the bus is off
# @raise [MessageBus::BusDestroyed] if the bus has been destroyed
# @raise [MessageBus::InvalidMessage] if user or group targeting is combined
#   with a channel for which +global?+ is true
def publish(channel, data, opts = nil)
  return if @off

  @mutex.synchronize do
    raise ::MessageBus::BusDestroyed if @destroyed
  end

  targeting = opts || {}
  user_ids = targeting[:user_ids]
  group_ids = targeting[:group_ids]
  client_ids = targeting[:client_ids]

  raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel)

  envelope = {
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  }
  encoded_data = JSON.dump(envelope)

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end
#rack_hijack_enabled=(val) ⇒ Object
92 93 94 |
# File 'lib/message_bus.rb', line 92
# Stores the rack-hijack switch read by #rack_hijack_enabled?.
#
# @param val [Boolean, nil] nil lets the reader compute a default
def rack_hijack_enabled=(val)
  @rack_hijack_enabled = val
end
#rack_hijack_enabled? ⇒ Boolean
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
# File 'lib/message_bus.rb', line 74
# Reports whether rack hijack is enabled, computing and caching a default
# the first time it is asked while the setting is nil.
#
# The default is true, except under PhusionPassenger, where it is false
# unless Passenger responds to +advertised_concurrency_level+; in that case
# the level is set to 0 and hijack stays enabled.
#
# @return [Boolean]
def rack_hijack_enabled?
  return @rack_hijack_enabled unless @rack_hijack_enabled.nil?

  @rack_hijack_enabled = true

  # without this switch passenger will explode
  # it will run out of connections after about 10
  if defined? PhusionPassenger
    @rack_hijack_enabled = false
    if PhusionPassenger.respond_to? :advertised_concurrency_level
      PhusionPassenger.advertised_concurrency_level = 0
      @rack_hijack_enabled = true
    end
  end

  @rack_hijack_enabled
end
#redis_config ⇒ Object
117 118 119 |
# File 'lib/message_bus.rb', line 117
# Returns the redis configuration, lazily defaulting to an empty hash that
# is cached for later calls.
#
# @return [Hash]
def redis_config
  @redis_config ||= {}
end
#redis_config=(config) ⇒ Object
Allow us to inject a redis db
113 114 115 |
# File 'lib/message_bus.rb', line 113 def redis_config=(config) @redis_config = config end |
#reliable_pub_sub ⇒ Object
185 186 187 188 189 190 |
# File 'lib/message_bus.rb', line 185
# Returns the pub/sub backend, building a
# MessageBus::Redis::ReliablePubSub from #redis_config on first use.
# The check and the lazy construction both happen under @mutex.
#
# @return [Object, nil] the backend, or nil once @destroyed is set
def reliable_pub_sub
  @mutex.synchronize do
    if @destroyed
      nil
    else
      @reliable_pub_sub ||= MessageBus::Redis::ReliablePubSub.new(redis_config)
    end
  end
end
#reliable_pub_sub=(pub_sub) ⇒ Object
181 182 183 |
# File 'lib/message_bus.rb', line 181
# Injects the pub/sub backend returned by #reliable_pub_sub.
#
# @param pub_sub [Object] backend instance
def reliable_pub_sub=(pub_sub)
  @reliable_pub_sub = pub_sub
end
#reset! ⇒ Object
will reset all keys
316 317 318 |
# File 'lib/message_bus.rb', line 316
# will reset all keys
#
# Delegates to the pub/sub backend's +reset!+.
#
# @return [Object] whatever the backend returns
def reset!
  backend = reliable_pub_sub
  backend.reset!
end
#site_id_lookup(&blk) ⇒ Object
121 122 123 124 |
# File 'lib/message_bus.rb', line 121
# Stores the given block (if any) as the site-id lookup and returns the
# currently stored block.
#
# @return [Proc, nil]
def site_id_lookup(&blk)
  @site_id_lookup = blk unless blk.nil?
  @site_id_lookup
end
#subscribe(channel = nil, &blk) ⇒ Object
248 249 250 |
# File 'lib/message_bus.rb', line 248
# Subscribes without a site id by forwarding to +subscribe_impl+ with nil
# as the site.
#
# @param channel [String, nil] channel to subscribe to
# @return [Object] whatever +subscribe_impl+ returns
def subscribe(channel = nil, &blk)
  subscribe_impl(channel, nil, &blk)
end
#timer ⇒ Object
320 321 322 323 324 325 326 327 328 329 |
# File 'lib/message_bus.rb', line 320
# Returns the shared MessageBus::TimerThread, creating and caching it on
# first use. The new timer gets an error handler that logs a warning with
# the exception and its backtrace.
#
# @return [Object] the cached timer
def timer
  @timer_thread ||= MessageBus::TimerThread.new.tap do |t|
    t.on_error do |e|
      logger.warn "Failed to process job: #{e} #{e.backtrace}"
    end
  end
end
#unsubscribe(channel = nil, &blk) ⇒ Object
252 253 254 |
# File 'lib/message_bus.rb', line 252
# Unsubscribes without a site id by forwarding to +unsubscribe_impl+ with
# nil as the site.
#
# @param channel [String, nil] channel to unsubscribe from
# @return [Object] whatever +unsubscribe_impl+ returns
def unsubscribe(channel = nil, &blk)
  unsubscribe_impl(channel, nil, &blk)
end
#user_id_lookup(&blk) ⇒ Object
126 127 128 129 |
# File 'lib/message_bus.rb', line 126
# Stores the given block (if any) as the user-id lookup and returns the
# currently stored block.
#
# @return [Proc, nil]
def user_id_lookup(&blk)
  @user_id_lookup = blk unless blk.nil?
  @user_id_lookup
end