Module: MessageBus::Implementation
- Included in:
- MessageBus, Instance
- Defined in:
- lib/message_bus.rb
Defined Under Namespace
Classes: Synchronizer
Constant Summary collapse
- ENCODE_SITE_TOKEN =
"$|$"
Instance Method Summary collapse
- #after_fork ⇒ Object
- #allow_broadcast=(val) ⇒ Object
- #allow_broadcast? ⇒ Boolean
- #around_client_batch(channel, &blk) ⇒ Object
- #backlog(channel = nil, last_id) ⇒ Object
- #blocking_subscribe(channel = nil, &blk) ⇒ Object
- #cache_assets ⇒ Object
- #cache_assets=(val) ⇒ Object
- #client_filter(channel, &blk) ⇒ Object
- #decode_channel_name(channel) ⇒ Object
- #destroy ⇒ Object
- #enable_diagnostics ⇒ Object
-
#encode_channel_name(channel) ⇒ Object
encode channel name to include site.
- #group_ids_lookup(&blk) ⇒ Object
- #initialize ⇒ Object
- #is_admin_lookup(&blk) ⇒ Object
- #last_id(channel) ⇒ Object
- #last_message(channel) ⇒ Object
- #listening? ⇒ Boolean
-
#local_subscribe(channel = nil, &blk) ⇒ Object
subscribe only on current site.
- #local_unsubscribe(channel = nil, &blk) ⇒ Object
- #logger ⇒ Object
- #logger=(logger) ⇒ Object
- #long_polling_enabled=(val) ⇒ Object
- #long_polling_enabled? ⇒ Boolean
- #long_polling_interval ⇒ Object
- #long_polling_interval=(millisecs) ⇒ Object
- #max_active_clients ⇒ Object
-
#max_active_clients=(val) ⇒ Object
The number of simultanuous clients we can service will revert to polling if we are out of slots.
- #off ⇒ Object
- #on ⇒ Object
- #on_connect(&blk) ⇒ Object
- #on_disconnect(&blk) ⇒ Object
- #publish(channel, data, opts = nil) ⇒ Object
- #rack_hijack_enabled=(val) ⇒ Object
- #rack_hijack_enabled? ⇒ Boolean
- #redis_config ⇒ Object
-
#redis_config=(config) ⇒ Object
Allow us to inject a redis db.
- #reliable_pub_sub ⇒ Object
-
#reset! ⇒ Object
will reset all keys.
- #site_id_lookup(&blk) ⇒ Object
- #subscribe(channel = nil, &blk) ⇒ Object
- #unsubscribe(channel = nil, &blk) ⇒ Object
- #user_id_lookup(&blk) ⇒ Object
Instance Method Details
#after_fork ⇒ Object
293 294 295 296 |
# File 'lib/message_bus.rb', line 293 def after_fork reliable_pub_sub.after_fork ensure_subscriber_thread end |
#allow_broadcast=(val) ⇒ Object
166 167 168 |
# File 'lib/message_bus.rb', line 166 def allow_broadcast=(val) @allow_broadcast = val end |
#allow_broadcast? ⇒ Boolean
170 171 172 173 174 175 176 177 |
# File 'lib/message_bus.rb', line 170 def allow_broadcast? @allow_broadcast ||= if defined? ::Rails ::Rails.env.test? || ::Rails.env.development? else false end end |
#around_client_batch(channel, &blk) ⇒ Object
150 151 152 153 154 |
# File 'lib/message_bus.rb', line 150 def around_client_batch(channel, &blk) @around_client_batches ||= {} @around_client_batches[channel] = blk if blk @around_client_batches[channel] end |
#backlog(channel = nil, last_id) ⇒ Object
257 258 259 260 261 262 263 264 265 266 267 268 269 |
# File 'lib/message_bus.rb', line 257 def backlog(channel=nil, last_id) old = if channel reliable_pub_sub.backlog(encode_channel_name(channel), last_id) else reliable_pub_sub.global_backlog(last_id) end old.each{ |m| (m) } old end |
#blocking_subscribe(channel = nil, &blk) ⇒ Object
214 215 216 217 218 219 220 |
# File 'lib/message_bus.rb', line 214 def blocking_subscribe(channel=nil, &blk) if channel reliable_pub_sub.subscribe(encode_channel_name(channel), &blk) else reliable_pub_sub.global_subscribe(&blk) end end |
#cache_assets ⇒ Object
41 42 43 44 45 46 47 |
# File 'lib/message_bus.rb', line 41 def cache_assets if defined? @cache_assets @cache_assets else true end end |
#cache_assets=(val) ⇒ Object
37 38 39 |
# File 'lib/message_bus.rb', line 37 def cache_assets=(val) @cache_assets = val end |
#client_filter(channel, &blk) ⇒ Object
144 145 146 147 148 |
# File 'lib/message_bus.rb', line 144 def client_filter(channel, &blk) @client_filters ||= {} @client_filters[channel] = blk if blk @client_filters[channel] end |
#decode_channel_name(channel) ⇒ Object
234 235 236 |
# File 'lib/message_bus.rb', line 234 def decode_channel_name(channel) channel.split(ENCODE_SITE_TOKEN) end |
#destroy ⇒ Object
284 285 286 287 288 289 290 291 |
# File 'lib/message_bus.rb', line 284 def destroy @mutex.synchronize do @subscriptions ||= {} reliable_pub_sub.global_unsubscribe @destroyed = true end @subscriber_thread.join if @subscriber_thread end |
#enable_diagnostics ⇒ Object
186 187 188 |
# File 'lib/message_bus.rb', line 186 def enable_diagnostics MessageBus::Diagnostics.enable end |
#encode_channel_name(channel) ⇒ Object
encode channel name to include site
225 226 227 228 229 230 231 232 |
# File 'lib/message_bus.rb', line 225 def encode_channel_name(channel) if site_id_lookup && !global?(channel) raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}" else channel end end |
#group_ids_lookup(&blk) ⇒ Object
134 135 136 137 |
# File 'lib/message_bus.rb', line 134 def group_ids_lookup(&blk) @group_ids_lookup = blk if blk @group_ids_lookup end |
#initialize ⇒ Object
33 34 35 |
# File 'lib/message_bus.rb', line 33 def initialize @mutex = Synchronizer.new end |
#is_admin_lookup(&blk) ⇒ Object
139 140 141 142 |
# File 'lib/message_bus.rb', line 139 def is_admin_lookup(&blk) @is_admin_lookup = blk if blk @is_admin_lookup end |
#last_id(channel) ⇒ Object
271 272 273 |
# File 'lib/message_bus.rb', line 271 def last_id(channel) reliable_pub_sub.last_id(encode_channel_name(channel)) end |
#last_message(channel) ⇒ Object
275 276 277 278 279 280 281 282 |
# File 'lib/message_bus.rb', line 275 def (channel) if last_id = last_id(channel) = backlog(channel, last_id-1) if [0] end end end |
#listening? ⇒ Boolean
298 299 300 |
# File 'lib/message_bus.rb', line 298 def listening? @subscriber_thread && @subscriber_thread.alive? end |
#local_subscribe(channel = nil, &blk) ⇒ Object
subscribe only on current site
252 253 254 255 |
# File 'lib/message_bus.rb', line 252 def local_subscribe(channel=nil, &blk) site_id = site_id_lookup.call if site_id_lookup && ! global?(channel) subscribe_impl(channel, site_id, &blk) end |
#local_unsubscribe(channel = nil, &blk) ⇒ Object
246 247 248 249 |
# File 'lib/message_bus.rb', line 246 def local_unsubscribe(channel=nil, &blk) site_id = site_id_lookup.call if site_id_lookup unsubscribe_impl(channel, site_id, &blk) end |
#logger ⇒ Object
53 54 55 56 57 |
# File 'lib/message_bus.rb', line 53 def logger return @logger if @logger require 'logger' @logger = Logger.new(STDOUT) end |
#logger=(logger) ⇒ Object
49 50 51 |
# File 'lib/message_bus.rb', line 49 def logger=(logger) @logger = logger end |
#long_polling_enabled=(val) ⇒ Object
63 64 65 |
# File 'lib/message_bus.rb', line 63 def long_polling_enabled=(val) @long_polling_enabled = val end |
#long_polling_enabled? ⇒ Boolean
59 60 61 |
# File 'lib/message_bus.rb', line 59 def long_polling_enabled? @long_polling_enabled == false ? false : true end |
#long_polling_interval ⇒ Object
103 104 105 |
# File 'lib/message_bus.rb', line 103 def long_polling_interval @long_polling_interval || 30 * 1000 end |
#long_polling_interval=(millisecs) ⇒ Object
99 100 101 |
# File 'lib/message_bus.rb', line 99 def long_polling_interval=(millisecs) @long_polling_interval = millisecs end |
#max_active_clients ⇒ Object
73 74 75 |
# File 'lib/message_bus.rb', line 73 def max_active_clients @max_active_clients || 1000 end |
#max_active_clients=(val) ⇒ Object
The number of simultanuous clients we can service
will revert to polling if we are out of slots
69 70 71 |
# File 'lib/message_bus.rb', line 69 def max_active_clients=(val) @max_active_clients = val end |
#off ⇒ Object
107 108 109 |
# File 'lib/message_bus.rb', line 107 def off @off = true end |
#on ⇒ Object
111 112 113 |
# File 'lib/message_bus.rb', line 111 def on @off = false end |
#on_connect(&blk) ⇒ Object
156 157 158 159 |
# File 'lib/message_bus.rb', line 156 def on_connect(&blk) @on_connect = blk if blk @on_connect end |
#on_disconnect(&blk) ⇒ Object
161 162 163 164 |
# File 'lib/message_bus.rb', line 161 def on_disconnect(&blk) @on_disconnect = blk if blk @on_disconnect end |
#publish(channel, data, opts = nil) ⇒ Object
190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
# File 'lib/message_bus.rb', line 190 def publish(channel, data, opts = nil) return if @off @mutex.synchronize do raise ::MessageBus::BusDestroyed if @destroyed end user_ids = nil group_ids = nil if opts user_ids = opts[:user_ids] group_ids = opts[:group_ids] end raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel) encoded_data = JSON.dump({ data: data, user_ids: user_ids, group_ids: group_ids }) reliable_pub_sub.publish(encode_channel_name(channel), encoded_data) end |
#rack_hijack_enabled=(val) ⇒ Object
95 96 97 |
# File 'lib/message_bus.rb', line 95 def rack_hijack_enabled=(val) @rack_hijack_enabled = val end |
#rack_hijack_enabled? ⇒ Boolean
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/message_bus.rb', line 77 def rack_hijack_enabled? if @rack_hijack_enabled.nil? @rack_hijack_enabled = true # without this switch passenger will explode # it will run out of connections after about 10 if defined? PhusionPassenger @rack_hijack_enabled = false if PhusionPassenger.respond_to? :advertised_concurrency_level PhusionPassenger.advertised_concurrency_level = 0 @rack_hijack_enabled = true end end end @rack_hijack_enabled end |
#redis_config ⇒ Object
120 121 122 |
# File 'lib/message_bus.rb', line 120 def redis_config @redis_config ||= {} end |
#redis_config=(config) ⇒ Object
Allow us to inject a redis db
116 117 118 |
# File 'lib/message_bus.rb', line 116 def redis_config=(config) @redis_config = config end |
#reliable_pub_sub ⇒ Object
179 180 181 182 183 184 |
# File 'lib/message_bus.rb', line 179 def reliable_pub_sub @mutex.synchronize do return nil if @destroyed @reliable_pub_sub ||= MessageBus::ReliablePubSub.new redis_config end end |
#reset! ⇒ Object
will reset all keys
303 304 305 |
# File 'lib/message_bus.rb', line 303 def reset! reliable_pub_sub.reset! end |
#site_id_lookup(&blk) ⇒ Object
124 125 126 127 |
# File 'lib/message_bus.rb', line 124 def site_id_lookup(&blk) @site_id_lookup = blk if blk @site_id_lookup end |
#subscribe(channel = nil, &blk) ⇒ Object
238 239 240 |
# File 'lib/message_bus.rb', line 238 def subscribe(channel=nil, &blk) subscribe_impl(channel, nil, &blk) end |
#unsubscribe(channel = nil, &blk) ⇒ Object
242 243 244 |
# File 'lib/message_bus.rb', line 242 def unsubscribe(channel=nil, &blk) unsubscribe_impl(channel, nil, &blk) end |
#user_id_lookup(&blk) ⇒ Object
129 130 131 132 |
# File 'lib/message_bus.rb', line 129 def user_id_lookup(&blk) @user_id_lookup = blk if blk @user_id_lookup end |