Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

ENCODE_SITE_TOKEN =
"$|$"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#configObject (readonly) Also known as: redis_config

Configuration options hash



25
26
27
# File 'lib/message_bus.rb', line 25

# Returns the configuration options hash stored in @config.
def config
  @config
end

Instance Method Details

#after_forkObject



346
347
348
349
350
351
# File 'lib/message_bus.rb', line 346

# Hook to run after the process forks: notifies the backend, makes sure the
# subscriber thread exists, then queues an empty job on the timer.
def after_fork
  pub_sub = reliable_pub_sub
  pub_sub.after_fork
  ensure_subscriber_thread
  # queuing an empty job makes sure the timer is running
  timer.queue {}
end

#allow_broadcast=(val) ⇒ Object



178
179
180
# File 'lib/message_bus.rb', line 178

# Stores +val+ under the :allow_broadcast configuration key.
def allow_broadcast=(val)
  settings = { allow_broadcast: val }
  configure(settings)
end

#allow_broadcast?Boolean

Returns:

  • (Boolean)


182
183
184
185
186
187
188
189
# File 'lib/message_bus.rb', line 182

# Tells whether broadcasting is allowed.
#
# An explicitly configured value (including +false+) is returned as is.
# When nothing is configured the default is computed once and stored:
# true under Rails in the test or development environment, false otherwise.
#
# @return [Boolean]
def allow_broadcast?
  # Check for nil rather than using ||=, so an explicit false is not
  # silently replaced by the environment-based default.
  if @config[:allow_broadcast].nil?
    @config[:allow_broadcast] =
      if defined? ::Rails
        ::Rails.env.test? || ::Rails.env.development?
      else
        false
      end
  end

  @config[:allow_broadcast]
end

#backendObject



211
212
213
# File 'lib/message_bus.rb', line 211

# Returns the configured :backend, falling back to :redis when none is set.
def backend
  configured = @config[:backend]
  configured || :redis
end

#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object



306
307
308
309
310
311
312
313
314
315
316
317
318
# File 'lib/message_bus.rb', line 306

# Fetches backlog messages from the backend and decodes each one in place.
#
# With a +channel+ the channel-specific backlog is read (the channel name is
# encoded with +site_id+ first); without one the global backlog is read.
# Returns the collection handed back by the backend.
def backlog(channel = nil, last_id = nil, site_id = nil)
  pub_sub = reliable_pub_sub
  messages =
    channel ?
      pub_sub.backlog(encode_channel_name(channel, site_id), last_id) :
      pub_sub.global_backlog(last_id)

  messages.each { |message| decode_message!(message) }
  messages
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



259
260
261
262
263
264
265
# File 'lib/message_bus.rb', line 259

# Subscribes directly on the backend, passing +blk+ through.
# Without a +channel+ the backend's global subscription is used; otherwise
# the subscription is made on the encoded channel name.
def blocking_subscribe(channel = nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assetsObject



41
42
43
44
45
46
47
# File 'lib/message_bus.rb', line 41

# Returns the configured :cache_assets value, defaulting to true when the
# option has never been set.
#
# The previous `defined? @config[:cache_assets]` check was always truthy
# once @config existed, so the true default could never be reached; the
# presence of the key is tested instead.
def cache_assets
  return true unless @config.key?(:cache_assets)

  @config[:cache_assets]
end

#cache_assets=(val) ⇒ Object



37
38
39
# File 'lib/message_bus.rb', line 37

# Stores +val+ under the :cache_assets configuration key.
def cache_assets=(val)
  settings = { cache_assets: val }
  configure(settings)
end

#chunked_encoding_enabled=(val) ⇒ Object



66
67
68
# File 'lib/message_bus.rb', line 66

# Stores +val+ under the :chunked_encoding_enabled configuration key.
def chunked_encoding_enabled=(val)
  settings = { chunked_encoding_enabled: val }
  configure(settings)
end

#chunked_encoding_enabled?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/message_bus.rb', line 62

# True unless :chunked_encoding_enabled has been explicitly set to false.
#
# @return [Boolean]
def chunked_encoding_enabled?
  @config[:chunked_encoding_enabled] != false
end

#configure(config) ⇒ Object



127
128
129
# File 'lib/message_bus.rb', line 127

# Merges +config+ into the existing configuration hash in place and
# returns that hash.
def configure(config)
  @config.update(config)
end

#decode_channel_name(channel) ⇒ Object



279
280
281
# File 'lib/message_bus.rb', line 279

# Splits an encoded channel name on ENCODE_SITE_TOKEN and returns the
# resulting parts as an array.
def decode_channel_name(channel)
  separator = ENCODE_SITE_TOKEN
  channel.split(separator)
end

#destroyObject

Mostly used in tests to destroy the entire bus.



334
335
336
337
338
339
340
341
342
343
344
# File 'lib/message_bus.rb', line 334

# Shuts the bus down: unsubscribes from the backend, flags the bus as
# destroyed, waits for the subscriber thread and stops the timer.
# Does nothing when the bus is already flagged as destroyed.
def destroy
  return if @destroyed
  reliable_pub_sub.global_unsubscribe

  @mutex.synchronize do
    # make sure the subscriptions hash exists before flagging destruction
    @subscriptions ||= {}
    @destroyed = true
  end
  # wait for the subscriber thread, if one was ever started
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end

#enable_diagnosticsObject



215
216
217
# File 'lib/message_bus.rb', line 215

# Enables diagnostics for this bus by handing it to
# MessageBus::Diagnostics.enable.
def enable_diagnostics
  MessageBus::Diagnostics.enable(self)
end

#encode_channel_name(channel, site_id = nil) ⇒ Object

encode channel name to include site



270
271
272
273
274
275
276
277
# File 'lib/message_bus.rb', line 270

# Encodes a channel name so that it includes the site.
#
# When a site is known (explicit +site_id+, or a configured site_id_lookup)
# and the channel is not global, returns "<channel><token><site>" where the
# token is ENCODE_SITE_TOKEN; otherwise returns +channel+ unchanged.
#
# Raises ArgumentError (with the channel as message) if the channel itself
# already contains the token.
def encode_channel_name(channel, site_id = nil)
  site_scoped = (site_id || site_id_lookup) && !global?(channel)
  return channel unless site_scoped

  raise ArgumentError, channel if channel.include?(ENCODE_SITE_TOKEN)

  site = site_id || site_id_lookup.call
  "#{channel}#{ENCODE_SITE_TOKEN}#{site}"
end

#extra_response_headers_lookup(&blk) ⇒ Object



163
164
165
166
# File 'lib/message_bus.rb', line 163

# Gets or sets the :extra_response_headers_lookup callback. When a block is
# given it is stored first; the currently stored callback is returned.
def extra_response_headers_lookup(&blk)
  configure(extra_response_headers_lookup: blk) unless blk.nil?
  @config[:extra_response_headers_lookup]
end

#global_backlog(last_id = nil) ⇒ Object



302
303
304
# File 'lib/message_bus.rb', line 302

# Convenience wrapper: calls #backlog with no channel and the given +last_id+.
def global_backlog(last_id = nil)
  no_channel = nil
  backlog(no_channel, last_id)
end

#group_ids_lookup(&blk) ⇒ Object



148
149
150
151
# File 'lib/message_bus.rb', line 148

# Gets or sets the :group_ids_lookup callback. When a block is given it is
# stored first; the currently stored callback is returned.
def group_ids_lookup(&blk)
  configure(group_ids_lookup: blk) unless blk.nil?
  @config[:group_ids_lookup]
end

#initializeObject



32
33
34
35
# File 'lib/message_bus.rb', line 32

# Starts with an empty configuration hash and a new Synchronizer, kept in
# @mutex and used by other methods to guard shared state.
def initialize
  @config = {}
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



153
154
155
156
# File 'lib/message_bus.rb', line 153

# Gets or sets the :is_admin_lookup callback. When a block is given it is
# stored first; the currently stored callback is returned.
def is_admin_lookup(&blk)
  configure(is_admin_lookup: blk) unless blk.nil?
  @config[:is_admin_lookup]
end

#keepalive_intervalObject



380
381
382
# File 'lib/message_bus.rb', line 380

# Returns the configured :keepalive_interval, or 60 when none is set.
def keepalive_interval
  interval = @config[:keepalive_interval]
  interval || 60
end

#keepalive_interval=(interval) ⇒ Object

Set to 0 to disable. With any higher value a keepalive runs every N seconds; if it fails, the process is killed.



376
377
378
# File 'lib/message_bus.rb', line 376

# Stores +interval+ under the :keepalive_interval configuration key.
def keepalive_interval=(interval)
  settings = { keepalive_interval: interval }
  configure(settings)
end

#last_id(channel, site_id = nil) ⇒ Object



320
321
322
# File 'lib/message_bus.rb', line 320

# Asks the backend for the last id on the channel, after encoding the
# channel name with +site_id+.
def last_id(channel, site_id = nil)
  encoded = encode_channel_name(channel, site_id)
  reliable_pub_sub.last_id(encoded)
end

#last_message(channel) ⇒ Object



324
325
326
327
328
329
330
331
# File 'lib/message_bus.rb', line 324

# Returns the first message of the backlog read from just before the
# channel's last id, or nil when there is no last id or no backlog.
def last_message(channel)
  newest_id = last_id(channel)
  return unless newest_id

  # start one id earlier so the backlog includes the newest message
  messages = backlog(channel, newest_id - 1)
  messages[0] if messages
end

#listening?Boolean

Returns:

  • (Boolean)


353
354
355
# File 'lib/message_bus.rb', line 353

# Tells whether a subscriber thread exists and is still alive.
#
# Always returns a strict boolean; previously nil was returned when no
# subscriber thread had been created.
#
# @return [Boolean]
def listening?
  !!(@subscriber_thread && @subscriber_thread.alive?)
end

#local_subscribe(channel = nil, last_id = -1, &blk) ⇒ Object

subscribe only on current site



288
289
290
291
# File 'lib/message_bus.rb', line 288

# Subscribes only on the current site.
#
# The site id comes from the configured site_id_lookup; it is left nil when
# no lookup is configured or the channel is global.
def local_subscribe(channel = nil, last_id = -1, &blk)
  lookup = site_id_lookup
  site_id = (lookup && !global?(channel)) ? lookup.call : nil
  subscribe_impl(channel, site_id, last_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



297
298
299
300
# File 'lib/message_bus.rb', line 297

# Unsubscribes on the current site only.
#
# The site id is resolved exactly as in #local_subscribe: it comes from the
# configured site_id_lookup and is left nil when no lookup is configured or
# the channel is global. Without the global check, a subscription made with
# local_subscribe on a global channel would be looked up under a different
# site id here.
def local_unsubscribe(channel = nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  unsubscribe_impl(channel, site_id, &blk)
end

#loggerObject



53
54
55
56
57
58
59
60
# File 'lib/message_bus.rb', line 53

# Returns the configured logger. When none is configured, builds a Logger
# writing to STDOUT at INFO level, stores it in the configuration and
# returns it.
def logger
  existing = @config[:logger]
  return existing if existing

  # loaded lazily: only needed when no logger was supplied
  require 'logger'
  Logger.new(STDOUT).tap do |fallback|
    fallback.level = Logger::INFO
    configure(logger: fallback)
  end
end

#logger=(logger) ⇒ Object



49
50
51
# File 'lib/message_bus.rb', line 49

# Stores +logger+ under the :logger configuration key.
def logger=(logger)
  settings = { logger: logger }
  configure(settings)
end

#long_polling_enabled=(val) ⇒ Object



74
75
76
# File 'lib/message_bus.rb', line 74

# Stores +val+ under the :long_polling_enabled configuration key.
def long_polling_enabled=(val)
  settings = { long_polling_enabled: val }
  configure(settings)
end

#long_polling_enabled?Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/message_bus.rb', line 70

# True unless :long_polling_enabled has been explicitly set to false.
#
# @return [Boolean]
def long_polling_enabled?
  @config[:long_polling_enabled] != false
end

#long_polling_intervalObject



115
116
117
# File 'lib/message_bus.rb', line 115

# Returns the configured :long_polling_interval, or 25000 when none is set.
def long_polling_interval
  interval = @config[:long_polling_interval]
  interval || 25_000
end

#long_polling_interval=(millisecs) ⇒ Object



111
112
113
# File 'lib/message_bus.rb', line 111

# Stores +millisecs+ under the :long_polling_interval configuration key.
def long_polling_interval=(millisecs)
  settings = { long_polling_interval: millisecs }
  configure(settings)
end

#max_active_clientsObject



84
85
86
# File 'lib/message_bus.rb', line 84

# Returns the configured :max_active_clients, or 1000 when none is set.
def max_active_clients
  limit = @config[:max_active_clients]
  limit || 1_000
end

#max_active_clients=(val) ⇒ Object

The number of simultaneous clients we can service

will revert to polling if we are out of slots


80
81
82
# File 'lib/message_bus.rb', line 80

# Stores +val+ under the :max_active_clients configuration key.
def max_active_clients=(val)
  settings = { max_active_clients: val }
  configure(settings)
end

#offObject



119
120
121
# File 'lib/message_bus.rb', line 119

# Sets the @off flag; #publish returns immediately while it is set.
def off
  @off = true
end

#onObject



123
124
125
# File 'lib/message_bus.rb', line 123

# Clears the @off flag so that #publish no longer returns early because of it.
def on
  @off = false
end

#on_connect(&blk) ⇒ Object



168
169
170
171
# File 'lib/message_bus.rb', line 168

# Gets or sets the :on_connect callback. When a block is given it is stored
# first; the currently stored callback is returned.
def on_connect(&blk)
  configure(on_connect: blk) unless blk.nil?
  @config[:on_connect]
end

#on_disconnect(&blk) ⇒ Object



173
174
175
176
# File 'lib/message_bus.rb', line 173

# Gets or sets the :on_disconnect callback. When a block is given it is
# stored first; the currently stored callback is returned.
def on_disconnect(&blk)
  configure(on_disconnect: blk) unless blk.nil?
  @config[:on_disconnect]
end

#on_middleware_error(&blk) ⇒ Object



158
159
160
161
# File 'lib/message_bus.rb', line 158

# Gets or sets the :on_middleware_error callback. When a block is given it
# is stored first; the currently stored callback is returned.
def on_middleware_error(&blk)
  configure(on_middleware_error: blk) unless blk.nil?
  @config[:on_middleware_error]
end

#publish(channel, data, opts = nil) ⇒ Object



219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/message_bus.rb', line 219

# Publishes +data+ on +channel+ through the backend.
#
# Recognised keys in +opts+ (all optional):
#   :user_ids, :group_ids, :client_ids - embedded in the JSON payload
#   :site_id                           - used when encoding the channel name
#   :max_backlog_age, :max_backlog_size - passed to the backend as channel options
#
# Returns nil without publishing while the bus is switched off; otherwise
# returns whatever the backend's publish returns.
#
# Raises MessageBus::BusDestroyed when the bus has been destroyed, and
# MessageBus::InvalidMessage when user or group ids are given for a
# global channel.
def publish(channel, data, opts = nil)
  return if @off
  @mutex.synchronize do
    raise ::MessageBus::BusDestroyed if @destroyed
  end

  opts ||= {}
  user_ids = opts[:user_ids]
  group_ids = opts[:group_ids]
  client_ids = opts[:client_ids]
  site_id = opts[:site_id]

  raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel)

  encoded_data = JSON.dump(
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  )

  # Read both limits unconditionally: assigning them inside an `||`
  # condition short-circuits and drops the size whenever an age is given.
  age = opts[:max_backlog_age]
  size = opts[:max_backlog_size]
  channel_opts = { max_backlog_size: size, max_backlog_age: age } if age || size

  encoded_channel_name = encode_channel_name(channel, site_id)
  reliable_pub_sub.publish(encoded_channel_name, encoded_data, channel_opts)
end

#rack_hijack_enabled=(val) ⇒ Object



107
108
109
# File 'lib/message_bus.rb', line 107

# Stores +val+ under the :rack_hijack_enabled configuration key.
def rack_hijack_enabled=(val)
  settings = { rack_hijack_enabled: val }
  configure(settings)
end

#rack_hijack_enabled?Boolean

Returns:

  • (Boolean)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/message_bus.rb', line 88

# Tells whether rack hijack is enabled.
#
# A configured (non-nil) value is returned as is. Otherwise the default is
# worked out once and stored: enabled, except under PhusionPassenger when it
# does not respond to advertised_concurrency_level.
#
# @return [Boolean]
def rack_hijack_enabled?
  return @config[:rack_hijack_enabled] unless @config[:rack_hijack_enabled].nil?

  # without this switch passenger will explode
  # it will run out of connections after about 10
  hijack =
    if !defined?(PhusionPassenger)
      true
    elsif PhusionPassenger.respond_to?(:advertised_concurrency_level)
      PhusionPassenger.advertised_concurrency_level = 0
      true
    else
      false
    end

  configure(rack_hijack_enabled: hijack)
  @config[:rack_hijack_enabled]
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



132
133
134
# File 'lib/message_bus.rb', line 132

# Applies +config+ to the bus configuration with the backend forced to
# :redis. The hash passed in is not modified.
def redis_config=(config)
  redis_settings = config.merge(backend: :redis)
  configure(redis_settings)
end

#reliable_pub_subObject



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/message_bus.rb', line 195

# Returns the backend instance, building it on first use.
#
# Runs under @mutex. Returns nil once the bus is destroyed. When no backend
# instance is configured yet, the file for the selected backend is required
# and the class registered in MessageBus::BACKENDS is instantiated with the
# configuration hash.
def reliable_pub_sub
  @mutex.synchronize do
    return nil if @destroyed

    # Make sure logger is loaded before config is
    # passed to backend.
    logger

    unless @config[:reliable_pub_sub]
      @config[:backend_options] ||= {}
      require "message_bus/backends/#{backend}"
      @config[:reliable_pub_sub] = MessageBus::BACKENDS[backend].new(@config)
    end

    @config[:reliable_pub_sub]
  end
end

#reliable_pub_sub=(pub_sub) ⇒ Object



191
192
193
# File 'lib/message_bus.rb', line 191

# Stores +pub_sub+ under the :reliable_pub_sub configuration key.
def reliable_pub_sub=(pub_sub)
  settings = { reliable_pub_sub: pub_sub }
  configure(settings)
end

#reset!Object

will reset all keys



358
359
360
# File 'lib/message_bus.rb', line 358

# Delegates to the backend's reset! and returns its result.
def reset!
  pub_sub = reliable_pub_sub
  pub_sub.reset!
end

#site_id_lookup(&blk) ⇒ Object



138
139
140
141
# File 'lib/message_bus.rb', line 138

# Gets or sets the :site_id_lookup callback. When a block is given it is
# stored first; the currently stored callback is returned.
def site_id_lookup(&blk)
  configure(site_id_lookup: blk) unless blk.nil?
  @config[:site_id_lookup]
end

#subscribe(channel = nil, last_id = -1, &blk) ⇒ Object



283
284
285
# File 'lib/message_bus.rb', line 283

# Subscribes with no site id by delegating to subscribe_impl.
def subscribe(channel = nil, last_id = -1, &blk)
  site_id = nil
  subscribe_impl(channel, site_id, last_id, &blk)
end

#timerObject



362
363
364
365
366
367
368
369
370
371
# File 'lib/message_bus.rb', line 362

# Returns the memoized MessageBus::TimerThread, creating it on first use
# with an error handler that logs failed jobs as warnings.
def timer
  return @timer_thread if @timer_thread

  thread = MessageBus::TimerThread.new
  thread.on_error { |e| logger.warn "Failed to process job: #{e} #{e.backtrace}" }
  @timer_thread = thread
end

#unsubscribe(channel = nil, &blk) ⇒ Object



293
294
295
# File 'lib/message_bus.rb', line 293

# Unsubscribes with no site id by delegating to unsubscribe_impl.
def unsubscribe(channel = nil, &blk)
  site_id = nil
  unsubscribe_impl(channel, site_id, &blk)
end

#user_id_lookup(&blk) ⇒ Object



143
144
145
146
# File 'lib/message_bus.rb', line 143

# Gets or sets the :user_id_lookup callback. When a block is given it is
# stored first; the currently stored callback is returned.
def user_id_lookup(&blk)
  configure(user_id_lookup: blk) unless blk.nil?
  @config[:user_id_lookup]
end