Module: MessageBus::Implementation
- Included in:
- MessageBus, Instance
- Defined in:
- lib/message_bus.rb
Defined Under Namespace
Classes: Synchronizer
Constant Summary collapse
- ENCODE_SITE_TOKEN =
"$|$"
Instance Method Summary collapse
- #access_control_allow_origin ⇒ Object
- #access_control_allow_origin=(origin) ⇒ Object
- #after_fork ⇒ Object
- #allow_broadcast=(val) ⇒ Object
- #allow_broadcast? ⇒ Boolean
- #around_client_batch(channel, &blk) ⇒ Object
- #backlog(channel = nil, last_id) ⇒ Object
- #blocking_subscribe(channel = nil, &blk) ⇒ Object
- #cache_assets ⇒ Object
- #cache_assets=(val) ⇒ Object
- #client_filter(channel, &blk) ⇒ Object
- #decode_channel_name(channel) ⇒ Object
- #destroy ⇒ Object
- #enable_diagnostics ⇒ Object
-
#encode_channel_name(channel) ⇒ Object
encode channel name to include site.
- #group_ids_lookup(&blk) ⇒ Object
- #initialize ⇒ Object
- #is_admin_lookup(&blk) ⇒ Object
- #last_id(channel) ⇒ Object
- #last_message(channel) ⇒ Object
- #listening? ⇒ Boolean
-
#local_subscribe(channel = nil, &blk) ⇒ Object
subscribe only on current site.
- #local_unsubscribe(channel = nil, &blk) ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
- #long_polling_enabled=(val) ⇒ Object
- #long_polling_enabled? ⇒ Boolean
- #long_polling_interval ⇒ Object
- #long_polling_interval=(millisecs) ⇒ Object
- #max_active_clients ⇒ Object
-
#max_active_clients=(val) ⇒ Object
The number of simultaneous clients we can service; will revert to polling if we are out of slots.
- #off ⇒ Object
- #on ⇒ Object
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #publish(channel, data, opts = nil) ⇒ Object
- #rack_hijack_enabled=(val) ⇒ Object
- #rack_hijack_enabled? ⇒ Boolean
- #redis_config ⇒ Object
-
#redis_config=(config) ⇒ Object
Allow us to inject a redis db.
- #reliable_pub_sub ⇒ Object
-
#reset! ⇒ Object
will reset all keys.
- #site_id_lookup(&blk) ⇒ Object
- #subscribe(channel = nil, &blk) ⇒ Object
- #unsubscribe(channel = nil, &blk) ⇒ Object
- #user_id_lookup(&blk) ⇒ Object
Instance Method Details
#access_control_allow_origin ⇒ Object
148 149 150 |
# File 'lib/message_bus.rb', line 148 def access_control_allow_origin @access_control_allow_origin end |
#access_control_allow_origin=(origin) ⇒ Object
144 145 146 |
# File 'lib/message_bus.rb', line 144 def access_control_allow_origin=(origin) @access_control_allow_origin = origin end |
#after_fork ⇒ Object
301 302 303 304 |
# File 'lib/message_bus.rb', line 301 def after_fork reliable_pub_sub.after_fork ensure_subscriber_thread end |
#allow_broadcast=(val) ⇒ Object
174 175 176 |
# File 'lib/message_bus.rb', line 174 def allow_broadcast=(val) @allow_broadcast = val end |
#allow_broadcast? ⇒ Boolean
178 179 180 181 182 183 184 185 |
# File 'lib/message_bus.rb', line 178 def allow_broadcast? @allow_broadcast ||= if defined? ::Rails ::Rails.env.test? || ::Rails.env.development? else false end end |
#around_client_batch(channel, &blk) ⇒ Object
158 159 160 161 162 |
# File 'lib/message_bus.rb', line 158 def around_client_batch(channel, &blk) @around_client_batches ||= {} @around_client_batches[channel] = blk if blk @around_client_batches[channel] end |
#backlog(channel = nil, last_id) ⇒ Object
265 266 267 268 269 270 271 272 273 274 275 276 277 |
# File 'lib/message_bus.rb', line 265 def backlog(channel=nil, last_id) old = if channel reliable_pub_sub.backlog(encode_channel_name(channel), last_id) else reliable_pub_sub.global_backlog(last_id) end old.each{ |m| (m) } old end |
#blocking_subscribe(channel = nil, &blk) ⇒ Object
222 223 224 225 226 227 228 |
# File 'lib/message_bus.rb', line 222 def blocking_subscribe(channel=nil, &blk) if channel reliable_pub_sub.subscribe(encode_channel_name(channel), &blk) else reliable_pub_sub.global_subscribe(&blk) end end |
#cache_assets ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/message_bus.rb', line 41 def cache_assets if defined? @cache_assets @cache_assets else true end end |
#cache_assets=(val) ⇒ Object
37 38 39 |
# File 'lib/message_bus.rb', line 37 def cache_assets=(val) @cache_assets = val end |
#client_filter(channel, &blk) ⇒ Object
152 153 154 155 156 |
# File 'lib/message_bus.rb', line 152 def client_filter(channel, &blk) @client_filters ||= {} @client_filters[channel] = blk if blk @client_filters[channel] end |
#decode_channel_name(channel) ⇒ Object
242 243 244 |
# File 'lib/message_bus.rb', line 242 def decode_channel_name(channel) channel.split(ENCODE_SITE_TOKEN) end |
#destroy ⇒ Object
292 293 294 295 296 297 298 299 |
# File 'lib/message_bus.rb', line 292 def destroy @mutex.synchronize do @subscriptions ||= {} reliable_pub_sub.global_unsubscribe @destroyed = true end @subscriber_thread.join if @subscriber_thread end |
#enable_diagnostics ⇒ Object
194 195 196 |
# File 'lib/message_bus.rb', line 194 def enable_diagnostics MessageBus::Diagnostics.enable end |
#encode_channel_name(channel) ⇒ Object
encode channel name to include site
233 234 235 236 237 238 239 240 |
# File 'lib/message_bus.rb', line 233 def encode_channel_name(channel) if site_id_lookup && !global?(channel) raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}" else channel end end |
#group_ids_lookup(&blk) ⇒ Object
134 135 136 137 |
# File 'lib/message_bus.rb', line 134 def group_ids_lookup(&blk) @group_ids_lookup = blk if blk @group_ids_lookup end |
#initialize ⇒ Object
33 34 35 |
# File 'lib/message_bus.rb', line 33 def initialize @mutex = Synchronizer.new end |
#is_admin_lookup(&blk) ⇒ Object
139 140 141 142 |
# File 'lib/message_bus.rb', line 139 def is_admin_lookup(&blk) @is_admin_lookup = blk if blk @is_admin_lookup end |
#last_id(channel) ⇒ Object
279 280 281 |
# File 'lib/message_bus.rb', line 279 def last_id(channel) reliable_pub_sub.last_id(encode_channel_name(channel)) end |
#last_message(channel) ⇒ Object
283 284 285 286 287 288 289 290 |
# File 'lib/message_bus.rb', line 283 def last_message(channel) if last_id = last_id(channel) messages = backlog(channel, last_id-1) if messages messages[0] end end end |
#listening? ⇒ Boolean
306 307 308 |
# File 'lib/message_bus.rb', line 306 def listening? @subscriber_thread && @subscriber_thread.alive? end |
#local_subscribe(channel = nil, &blk) ⇒ Object
subscribe only on current site
260 261 262 263 |
# File 'lib/message_bus.rb', line 260 def local_subscribe(channel=nil, &blk) site_id = site_id_lookup.call if site_id_lookup && ! global?(channel) subscribe_impl(channel, site_id, &blk) end |
#local_unsubscribe(channel = nil, &blk) ⇒ Object
254 255 256 257 |
# File 'lib/message_bus.rb', line 254 def local_unsubscribe(channel=nil, &blk) site_id = site_id_lookup.call if site_id_lookup unsubscribe_impl(channel, site_id, &blk) end |
#logger ⇒ Object
53 54 55 56 57 |
# File 'lib/message_bus.rb', line 53 def logger return @logger if @logger require 'logger' @logger = Logger.new(STDOUT) end |
#logger=(logger) ⇒ Object
49 50 51 |
# File 'lib/message_bus.rb', line 49 def logger=(logger) @logger = logger end |
#long_polling_enabled=(val) ⇒ Object
63 64 65 |
# File 'lib/message_bus.rb', line 63 def long_polling_enabled=(val) @long_polling_enabled = val end |
#long_polling_enabled? ⇒ Boolean
59 60 61 |
# File 'lib/message_bus.rb', line 59 def long_polling_enabled? @long_polling_enabled == false ? false : true end |
#long_polling_interval ⇒ Object
103 104 105 |
# File 'lib/message_bus.rb', line 103 def long_polling_interval @long_polling_interval || 25 * 1000 end |
#long_polling_interval=(millisecs) ⇒ Object
99 100 101 |
# File 'lib/message_bus.rb', line 99 def long_polling_interval=(millisecs) @long_polling_interval = millisecs end |
#max_active_clients ⇒ Object
73 74 75 |
# File 'lib/message_bus.rb', line 73 def max_active_clients @max_active_clients || 1000 end |
#max_active_clients=(val) ⇒ Object
The number of simultaneous clients we can service;
will revert to polling if we are out of slots
69 70 71 |
# File 'lib/message_bus.rb', line 69 def max_active_clients=(val) @max_active_clients = val end |
#off ⇒ Object
107 108 109 |
# File 'lib/message_bus.rb', line 107 def off @off = true end |
#on ⇒ Object
111 112 113 |
# File 'lib/message_bus.rb', line 111 def on @off = false end |
#on_connect(&blk) ⇒ Object
164 165 166 167 |
# File 'lib/message_bus.rb', line 164 def on_connect(&blk) @on_connect = blk if blk @on_connect end |
#on_disconnect(&blk) ⇒ Object
169 170 171 172 |
# File 'lib/message_bus.rb', line 169 def on_disconnect(&blk) @on_disconnect = blk if blk @on_disconnect end |
#publish(channel, data, opts = nil) ⇒ Object
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 |
# File 'lib/message_bus.rb', line 198 def publish(channel, data, opts = nil) return if @off @mutex.synchronize do raise ::MessageBus::BusDestroyed if @destroyed end user_ids = nil group_ids = nil if opts user_ids = opts[:user_ids] group_ids = opts[:group_ids] end raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel) encoded_data = JSON.dump({ data: data, user_ids: user_ids, group_ids: group_ids }) reliable_pub_sub.publish(encode_channel_name(channel), encoded_data) end |
#rack_hijack_enabled=(val) ⇒ Object
95 96 97 |
# File 'lib/message_bus.rb', line 95 def rack_hijack_enabled=(val) @rack_hijack_enabled = val end |
#rack_hijack_enabled? ⇒ Boolean
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/message_bus.rb', line 77 def rack_hijack_enabled? if @rack_hijack_enabled.nil? @rack_hijack_enabled = true # without this switch passenger will explode # it will run out of connections after about 10 if defined? PhusionPassenger @rack_hijack_enabled = false if PhusionPassenger.respond_to? :advertised_concurrency_level PhusionPassenger.advertised_concurrency_level = 0 @rack_hijack_enabled = true end end end @rack_hijack_enabled end |
#redis_config ⇒ Object
120 121 122 |
# File 'lib/message_bus.rb', line 120 def redis_config @redis_config ||= {} end |
#redis_config=(config) ⇒ Object
Allow us to inject a redis db
116 117 118 |
# File 'lib/message_bus.rb', line 116 def redis_config=(config) @redis_config = config end |
#reliable_pub_sub ⇒ Object
187 188 189 190 191 192 |
# File 'lib/message_bus.rb', line 187 def reliable_pub_sub @mutex.synchronize do return nil if @destroyed @reliable_pub_sub ||= MessageBus::ReliablePubSub.new redis_config end end |
#reset! ⇒ Object
will reset all keys
311 312 313 |
# File 'lib/message_bus.rb', line 311 def reset! reliable_pub_sub.reset! end |
#site_id_lookup(&blk) ⇒ Object
124 125 126 127 |
# File 'lib/message_bus.rb', line 124 def site_id_lookup(&blk) @site_id_lookup = blk if blk @site_id_lookup end |
#subscribe(channel = nil, &blk) ⇒ Object
246 247 248 |
# File 'lib/message_bus.rb', line 246 def subscribe(channel=nil, &blk) subscribe_impl(channel, nil, &blk) end |
#unsubscribe(channel = nil, &blk) ⇒ Object
250 251 252 |
# File 'lib/message_bus.rb', line 250 def unsubscribe(channel=nil, &blk) unsubscribe_impl(channel, nil, &blk) end |
#user_id_lookup(&blk) ⇒ Object
129 130 131 132 |
# File 'lib/message_bus.rb', line 129 def user_id_lookup(&blk) @user_id_lookup = blk if blk @user_id_lookup end |