Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

ENCODE_SITE_TOKEN =
"$|$"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#config ⇒ Object (readonly) Also known as: redis_config

Configuration options hash



24
25
26
# File 'lib/message_bus.rb', line 24

# Returns the configuration options hash held in @config.
def config
  @config
end

Instance Method Details

#after_fork ⇒ Object



320
321
322
323
324
325
# File 'lib/message_bus.rb', line 320

# Runs the bus's after-fork steps, in order: the backend's own after_fork
# hook, ensure_subscriber_thread, and a no-op timer job.
#
# Returns whatever timer.queue returns.
def after_fork
  pub_sub = reliable_pub_sub
  pub_sub.after_fork
  ensure_subscriber_thread
  # Queuing an empty job makes sure the timer is running.
  timer.queue { }
end

#allow_broadcast=(val) ⇒ Object



172
173
174
# File 'lib/message_bus.rb', line 172

# Records +val+ as the :allow_broadcast configuration option.
def allow_broadcast=(val)
  configure({ allow_broadcast: val })
end

#allow_broadcast? ⇒ Boolean

Returns:

  • (Boolean)


176
177
178
179
180
181
182
183
# File 'lib/message_bus.rb', line 176

# Reports whether broadcasting is allowed.
#
# A value configured under :allow_broadcast is returned as-is, including an
# explicit +false+. Only when nothing is configured (nil) does the default
# apply: true in the Rails test and development environments, false otherwise
# (including when Rails is not loaded).
#
# Returns the configured value, or the Boolean default.
def allow_broadcast?
  configured = @config[:allow_broadcast]
  # Test for nil rather than falsiness, so an explicit `false` is not
  # overridden by the Rails-derived default.
  return configured unless configured.nil?

  default =
    if defined? ::Rails
      ::Rails.env.test? || ::Rails.env.development?
    else
      false
    end

  # A positive default is remembered in the config; a negative one is
  # re-evaluated on every call.
  @config[:allow_broadcast] = default if default
  default
end

#backend ⇒ Object



200
201
202
# File 'lib/message_bus.rb', line 200

# Returns the configured :backend, falling back to :redis when none is set.
def backend
  configured = @config[:backend]
  configured || :redis
end

#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object



283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/message_bus.rb', line 283

# Fetches messages from the backend and runs decode_message! on each one.
#
# channel - channel to read; when nil the backend's global backlog is used.
# last_id - passed through to the backend unchanged.
# site_id - passed to encode_channel_name together with channel.
#
# Returns the collection handed back by the backend.
def backlog(channel = nil, last_id = nil, site_id = nil)
  pub_sub = reliable_pub_sub
  messages =
    if channel
      encoded = encode_channel_name(channel, site_id)
      pub_sub.backlog(encoded, last_id)
    else
      pub_sub.global_backlog(last_id)
    end

  messages.each { |message| decode_message!(message) }
  messages
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



236
237
238
239
240
241
242
# File 'lib/message_bus.rb', line 236

# Subscribes directly on the backend with the given block.
#
# With a channel, subscribes to its encoded name; without one, uses the
# backend's global subscription.
#
# Returns whatever the backend call returns.
def blocking_subscribe(channel = nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assets ⇒ Object



40
41
42
43
44
45
46
# File 'lib/message_bus.rb', line 40

# Returns the :cache_assets configuration option, defaulting to true when the
# option has never been set.
#
# The previous `defined? @config[:cache_assets]` test was always truthy
# (`defined?` on a method-call expression only checks that the call is
# possible), so the default branch could never run and an unset option came
# back as nil. Hash#fetch applies the default only when the key is absent, so
# an explicitly configured value (including false) is returned unchanged.
def cache_assets
  @config.fetch(:cache_assets, true)
end

#cache_assets=(val) ⇒ Object



36
37
38
# File 'lib/message_bus.rb', line 36

# Records +val+ as the :cache_assets configuration option.
def cache_assets=(val)
  configure({ cache_assets: val })
end

#chunked_encoding_enabled=(val) ⇒ Object



65
66
67
# File 'lib/message_bus.rb', line 65

# Records +val+ as the :chunked_encoding_enabled configuration option.
def chunked_encoding_enabled=(val)
  configure({ chunked_encoding_enabled: val })
end

#chunked_encoding_enabled? ⇒ Boolean

Returns:

  • (Boolean)


61
62
63
# File 'lib/message_bus.rb', line 61

# Reports whether chunked encoding is enabled.
#
# It is enabled unless :chunked_encoding_enabled was set to exactly false;
# an unset (nil) option counts as enabled.
def chunked_encoding_enabled?
  @config[:chunked_encoding_enabled] != false
end

#configure(config) ⇒ Object



126
127
128
# File 'lib/message_bus.rb', line 126

# Merges the given options into the configuration hash in place; keys already
# present are overwritten.
#
# Returns the result of merge!, i.e. the updated configuration hash.
def configure(config)
  @config.merge!(config)
end

#decode_channel_name(channel) ⇒ Object



256
257
258
# File 'lib/message_bus.rb', line 256

# Splits an encoded channel name on ENCODE_SITE_TOKEN.
#
# Returns the Array of parts; a name without the token yields one element.
def decode_channel_name(channel)
  separator = ENCODE_SITE_TOKEN
  channel.split(separator)
end

#destroy ⇒ Object



310
311
312
313
314
315
316
317
318
# File 'lib/message_bus.rb', line 310

# Shuts the bus down: unsubscribes globally on the backend, marks the bus as
# destroyed, waits for the subscriber thread (if there is one) and stops the
# timer.
#
# Calling it again on an already destroyed bus is a no-op that returns nil.
def destroy
  @mutex.synchronize do
    # reliable_pub_sub returns nil once @destroyed is set, so without this
    # guard a repeated call would fail calling global_unsubscribe on nil.
    return if @destroyed

    @subscriptions ||= {}
    reliable_pub_sub.global_unsubscribe
    @destroyed = true
  end
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end

#enable_diagnostics ⇒ Object



204
205
206
# File 'lib/message_bus.rb', line 204

# Turns diagnostics on by delegating to MessageBus::Diagnostics.enable.
def enable_diagnostics
  MessageBus::Diagnostics.enable
end

#encode_channel_name(channel, site_id = nil) ⇒ Object

encode channel name to include site



247
248
249
250
251
252
253
254
# File 'lib/message_bus.rb', line 247

# Encodes a channel name so that it includes the site.
#
# channel - the channel name.
# site_id - explicit site; when nil the site_id_lookup block (if one is
#           configured) supplies it.
#
# The name is returned untouched when no site can be determined or when the
# channel is global. Otherwise the result is the channel, ENCODE_SITE_TOKEN
# and the site joined together.
#
# Raises ArgumentError (with the channel as message) if the channel already
# contains ENCODE_SITE_TOKEN.
def encode_channel_name(channel, site_id = nil)
  return channel unless site_id || site_id_lookup
  return channel if global?(channel)

  raise ArgumentError, channel if channel.include?(ENCODE_SITE_TOKEN)

  site = site_id || site_id_lookup.call
  "#{channel}#{ENCODE_SITE_TOKEN}#{site}"
end

#extra_response_headers_lookup(&blk) ⇒ Object



157
158
159
160
# File 'lib/message_bus.rb', line 157

# Reads or sets the :extra_response_headers_lookup callback.
#
# When a block is given it is stored in the configuration first.
#
# Returns the currently configured block, or nil if none was set.
def extra_response_headers_lookup(&blk)
  configure(extra_response_headers_lookup: blk) unless blk.nil?
  @config[:extra_response_headers_lookup]
end

#global_backlog(last_id = nil) ⇒ Object



279
280
281
# File 'lib/message_bus.rb', line 279

# Returns the global backlog: delegates to backlog with no channel, passing
# last_id through.
def global_backlog(last_id = nil)
  no_channel = nil
  backlog(no_channel, last_id)
end

#group_ids_lookup(&blk) ⇒ Object



147
148
149
150
# File 'lib/message_bus.rb', line 147

# Reads or sets the :group_ids_lookup callback.
#
# When a block is given it is stored in the configuration first.
#
# Returns the currently configured block, or nil if none was set.
def group_ids_lookup(&blk)
  configure(group_ids_lookup: blk) if block_given?
  @config[:group_ids_lookup]
end

#initialize ⇒ Object



31
32
33
34
# File 'lib/message_bus.rb', line 31

# Sets up initial state: an empty configuration hash and the Synchronizer
# that the other methods use through @mutex.synchronize.
def initialize
  @config = {}
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



152
153
154
155
# File 'lib/message_bus.rb', line 152

# Reads or sets the :is_admin_lookup callback.
#
# When a block is given it is stored in the configuration first.
#
# Returns the currently configured block, or nil if none was set.
def is_admin_lookup(&blk)
  configure(is_admin_lookup: blk) unless blk.nil?
  @config[:is_admin_lookup]
end

#keepalive_interval ⇒ Object



354
355
356
# File 'lib/message_bus.rb', line 354

# Returns the configured :keepalive_interval, or 60 when none is set.
# A configured 0 is returned as 0 (it is truthy in Ruby).
def keepalive_interval
  interval = @config[:keepalive_interval]
  interval || 60
end

#keepalive_interval=(interval) ⇒ Object

Set to 0 to disable. With any higher value a keepalive runs every N seconds; if it fails, the process is killed.



350
351
352
# File 'lib/message_bus.rb', line 350

# Records +interval+ as the :keepalive_interval configuration option.
def keepalive_interval=(interval)
  configure({ keepalive_interval: interval })
end

#last_id(channel, site_id = nil) ⇒ Object



297
298
299
# File 'lib/message_bus.rb', line 297

# Asks the backend for the last id of a channel.
#
# The channel name is encoded with encode_channel_name (using site_id)
# before it is handed to the backend.
def last_id(channel, site_id = nil)
  pub_sub = reliable_pub_sub
  encoded = encode_channel_name(channel, site_id)
  pub_sub.last_id(encoded)
end

#last_message(channel) ⇒ Object



301
302
303
304
305
306
307
308
# File 'lib/message_bus.rb', line 301

# Returns the first message of the backlog that starts just before the
# channel's last id, or nil when last_id or backlog yields nothing.
def last_message(channel)
  latest = last_id(channel)
  return unless latest

  messages = backlog(channel, latest - 1)
  messages[0] if messages
end

#listening? ⇒ Boolean

Returns:

  • (Boolean)


327
328
329
# File 'lib/message_bus.rb', line 327

# Reports whether a subscriber thread exists and is alive.
#
# Returns nil when there is no subscriber thread, otherwise the thread's
# alive? status.
def listening?
  thread = @subscriber_thread
  thread && thread.alive?
end

#local_subscribe(channel = nil, last_id = -1, &blk) ⇒ Object

subscribe only on current site



265
266
267
268
# File 'lib/message_bus.rb', line 265

# Subscribes only on the current site.
#
# The site comes from the site_id_lookup block when one is configured and the
# channel is not global; otherwise no site (nil) is used. Everything is then
# handed to subscribe_impl.
def local_subscribe(channel = nil, last_id = -1, &blk)
  lookup = site_id_lookup
  site_id = lookup.call if lookup && !global?(channel)
  subscribe_impl(channel, site_id, last_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



274
275
276
277
# File 'lib/message_bus.rb', line 274

# Unsubscribes on the current site; the counterpart of local_subscribe.
#
# The site is resolved exactly as local_subscribe resolves it: from the
# site_id_lookup block when one is configured and the channel is not global,
# otherwise nil. Previously the global check was missing here, so for a
# global channel this method passed a site id that local_subscribe never
# used when subscribing.
def local_unsubscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  unsubscribe_impl(channel, site_id, &blk)
end

#logger ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/message_bus.rb', line 52

# Returns the configured logger.
#
# When none is configured, a Logger writing to STDOUT at INFO level is
# created, stored in the configuration and returned.
def logger
  existing = @config[:logger]
  return existing if existing

  # Loaded lazily: only needed when no logger was supplied.
  require 'logger'
  Logger.new(STDOUT).tap do |fresh|
    fresh.level = Logger::INFO
    configure(logger: fresh)
  end
end

#logger=(logger) ⇒ Object



48
49
50
# File 'lib/message_bus.rb', line 48

# Records +logger+ as the :logger configuration option.
def logger=(logger)
  configure({ logger: logger })
end

#long_polling_enabled=(val) ⇒ Object



73
74
75
# File 'lib/message_bus.rb', line 73

# Records +val+ as the :long_polling_enabled configuration option.
def long_polling_enabled=(val)
  configure({ long_polling_enabled: val })
end

#long_polling_enabled? ⇒ Boolean

Returns:

  • (Boolean)


69
70
71
# File 'lib/message_bus.rb', line 69

# Reports whether long polling is enabled.
#
# It is enabled unless :long_polling_enabled was set to exactly false; an
# unset (nil) option counts as enabled.
def long_polling_enabled?
  @config[:long_polling_enabled] != false
end

#long_polling_interval ⇒ Object



114
115
116
# File 'lib/message_bus.rb', line 114

# Returns the configured :long_polling_interval, or 25000 when none is set.
# NOTE(review): presumably milliseconds, going by the setter's parameter
# name (millisecs) - confirm.
def long_polling_interval
  interval = @config[:long_polling_interval]
  interval || 25_000
end

#long_polling_interval=(millisecs) ⇒ Object



110
111
112
# File 'lib/message_bus.rb', line 110

# Records +millisecs+ as the :long_polling_interval configuration option.
def long_polling_interval=(millisecs)
  configure({ long_polling_interval: millisecs })
end

#max_active_clients ⇒ Object



83
84
85
# File 'lib/message_bus.rb', line 83

# Returns the configured :max_active_clients, or 1000 when none is set.
def max_active_clients
  limit = @config[:max_active_clients]
  limit || 1000
end

#max_active_clients=(val) ⇒ Object

The number of simultaneous clients we can service.

We will revert to polling if we are out of slots.


79
80
81
# File 'lib/message_bus.rb', line 79

# Records +val+ as the :max_active_clients configuration option.
def max_active_clients=(val)
  configure({ max_active_clients: val })
end

#off ⇒ Object



118
119
120
# File 'lib/message_bus.rb', line 118

# Sets the @off flag. While it is set, publish returns immediately without
# publishing anything.
def off
  @off = true
end

#on ⇒ Object



122
123
124
# File 'lib/message_bus.rb', line 122

# Clears the @off flag, so publish no longer returns early because of it.
def on
  @off = false
end

#on_connect(&blk) ⇒ Object



162
163
164
165
# File 'lib/message_bus.rb', line 162

# Reads or sets the :on_connect callback.
#
# When a block is given it is stored in the configuration first.
#
# Returns the currently configured block, or nil if none was set.
def on_connect(&blk)
  configure(on_connect: blk) if block_given?
  @config[:on_connect]
end

#on_disconnect(&blk) ⇒ Object



167
168
169
170
# File 'lib/message_bus.rb', line 167

# Reads or sets the :on_disconnect callback.
#
# When a block is given it is stored in the configuration first.
#
# Returns the currently configured block, or nil if none was set.
def on_disconnect(&blk)
  configure(on_disconnect: blk) unless blk.nil?
  @config[:on_disconnect]
end

#publish(channel, data, opts = nil) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# File 'lib/message_bus.rb', line 208

# Publishes data on a channel through the backend.
#
# channel - channel name; it is encoded with encode_channel_name first.
# data    - payload, serialized to JSON under the "data" key.
# opts    - optional hash; :user_ids, :group_ids and :client_ids are read
#           from it and serialized alongside the data.
#
# Returns nil without publishing while the bus is switched off; otherwise
# returns whatever the backend's publish returns.
#
# Raises MessageBus::BusDestroyed if the bus has been destroyed.
# Raises MessageBus::InvalidMessage if user_ids or group_ids are given for a
# global channel.
def publish(channel, data, opts = nil)
  return if @off

  @mutex.synchronize { raise ::MessageBus::BusDestroyed if @destroyed }

  # Without opts all three target lists stay nil.
  user_ids, group_ids, client_ids =
    [:user_ids, :group_ids, :client_ids].map { |key| opts[key] if opts }

  targeted = user_ids || group_ids
  raise ::MessageBus::InvalidMessage if targeted && global?(channel)

  payload = {
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  }
  encoded_data = JSON.dump(payload)

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end

#rack_hijack_enabled=(val) ⇒ Object



106
107
108
# File 'lib/message_bus.rb', line 106

# Records +val+ as the :rack_hijack_enabled configuration option.
def rack_hijack_enabled=(val)
  configure({ rack_hijack_enabled: val })
end

#rack_hijack_enabled? ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/message_bus.rb', line 87

# Reports whether rack hijack is enabled.
#
# A configured (non-nil) :rack_hijack_enabled value is returned as-is. When
# nothing is configured the default is computed once and stored:
#   * true when PhusionPassenger is not defined;
#   * under PhusionPassenger, true only if it responds to
#     advertised_concurrency_level, in which case that level is set to 0;
#   * false otherwise.
def rack_hijack_enabled?
  return @config[:rack_hijack_enabled] unless @config[:rack_hijack_enabled].nil?

  # Without this switch Passenger will explode: it runs out of connections
  # after about 10.
  enable =
    if !defined?(PhusionPassenger)
      true
    elsif PhusionPassenger.respond_to?(:advertised_concurrency_level)
      PhusionPassenger.advertised_concurrency_level = 0
      true
    else
      false
    end

  configure(rack_hijack_enabled: enable)
  @config[:rack_hijack_enabled]
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



131
132
133
# File 'lib/message_bus.rb', line 131

# Merges the given settings into the configuration with :backend forced to
# :redis. The hash passed in is not modified.
def redis_config=(config)
  redis_settings = config.merge(backend: :redis)
  configure(redis_settings)
end

#reliable_pub_sub ⇒ Object



189
190
191
192
193
194
195
196
197
198
# File 'lib/message_bus.rb', line 189

# Returns the backend instance, creating it on first use.
#
# Runs under @mutex. Returns nil once the bus has been destroyed. Otherwise
# the instance stored under :reliable_pub_sub is returned; when there is
# none, the file for the selected backend is required, the class registered
# in MessageBus::BACKENDS is instantiated with the configuration, and the
# result is stored.
def reliable_pub_sub
  @mutex.synchronize do
    return if @destroyed

    unless @config[:reliable_pub_sub]
      @config[:backend_options] ||= {}
      require "message_bus/backends/#{backend}"
      @config[:reliable_pub_sub] = MessageBus::BACKENDS[backend].new(@config)
    end

    @config[:reliable_pub_sub]
  end
end

#reliable_pub_sub=(pub_sub) ⇒ Object



185
186
187
# File 'lib/message_bus.rb', line 185

# Records +pub_sub+ as the :reliable_pub_sub configuration option, i.e. the
# backend instance that reliable_pub_sub hands out.
def reliable_pub_sub=(pub_sub)
  configure({ reliable_pub_sub: pub_sub })
end

#reset! ⇒ Object

will reset all keys



332
333
334
# File 'lib/message_bus.rb', line 332

# Resets all keys by delegating to the backend's reset!.
def reset!
  reliable_pub_sub.reset!
end

#site_id_lookup(&blk) ⇒ Object



137
138
139
140
# File 'lib/message_bus.rb', line 137

# Reads or sets the :site_id_lookup callback.
#
# When a block is given it is stored in the configuration first.
#
# Returns the currently configured block, or nil if none was set.
def site_id_lookup(&blk)
  configure(site_id_lookup: blk) if block_given?
  @config[:site_id_lookup]
end

#subscribe(channel = nil, last_id = -1, &blk) ⇒ Object



260
261
262
# File 'lib/message_bus.rb', line 260

# Subscribes the block without a site: delegates to subscribe_impl with a nil
# site id, passing channel and last_id through.
def subscribe(channel = nil, last_id = -1, &blk)
  site_id = nil
  subscribe_impl(channel, site_id, last_id, &blk)
end

#timer ⇒ Object



336
337
338
339
340
341
342
343
344
345
# File 'lib/message_bus.rb', line 336

# Returns the bus's MessageBus::TimerThread, creating it on first use.
#
# The new timer gets an error handler that logs failures as warnings through
# logger.
def timer
  @timer_thread ||= MessageBus::TimerThread.new.tap do |thread|
    thread.on_error do |e|
      logger.warn "Failed to process job: #{e} #{e.backtrace}"
    end
  end
end

#unsubscribe(channel = nil, &blk) ⇒ Object



270
271
272
# File 'lib/message_bus.rb', line 270

# Unsubscribes without a site: delegates to unsubscribe_impl with a nil site
# id, passing the channel and block through.
def unsubscribe(channel = nil, &blk)
  site_id = nil
  unsubscribe_impl(channel, site_id, &blk)
end

#user_id_lookup(&blk) ⇒ Object



142
143
144
145
# File 'lib/message_bus.rb', line 142

# Reads or sets the :user_id_lookup callback.
#
# When a block is given it is stored in the configuration first.
#
# Returns the currently configured block, or nil if none was set.
def user_id_lookup(&blk)
  configure(user_id_lookup: blk) unless blk.nil?
  @config[:user_id_lookup]
end