Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

# Separator that encode_channel_name places between a channel name and a
# site id; decode_channel_name splits an encoded name on it.
ENCODE_SITE_TOKEN =
"$|$"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#config ⇒ Object (readonly) Also known as: redis_config

Configuration options hash



25
26
27
# File 'lib/message_bus.rb', line 25

# Returns the configuration options hash held in @config.
def config
  @config
end

Instance Method Details

#after_fork ⇒ Object



338
339
340
341
342
343
# File 'lib/message_bus.rb', line 338

# Hook to run after the process has forked.
#
# Calls after_fork on the pub/sub backend, then ensure_subscriber_thread,
# and finally queues an empty job on the timer.
def after_fork
  pub_sub = reliable_pub_sub
  pub_sub.after_fork
  ensure_subscriber_thread
  # will ensure timer is running
  timer.queue {}
end

#allow_broadcast=(val) ⇒ Object



178
179
180
# File 'lib/message_bus.rb', line 178

# Stores +val+ under the :allow_broadcast configuration key via configure.
def allow_broadcast=(val)
  setting = { allow_broadcast: val }
  configure(setting)
end

#allow_broadcast? ⇒ Boolean

Returns:

  • (Boolean)


182
183
184
185
186
187
188
189
# File 'lib/message_bus.rb', line 182

# Whether broadcasting is allowed.
#
# An explicitly configured value (true or false) is returned as is. Only when
# :allow_broadcast is unset (nil) is a default computed and stored: true when
# ::Rails is defined and its env is test or development, false otherwise.
#
# The previous `||=` form treated an explicit false as "unset", so under
# Rails test/development a configured false was silently replaced by true.
#
# @return [Boolean]
def allow_broadcast?
  if @config[:allow_broadcast].nil?
    @config[:allow_broadcast] =
      if defined? ::Rails
        ::Rails.env.test? || ::Rails.env.development?
      else
        false
      end
  end

  @config[:allow_broadcast]
end

#backend ⇒ Object



206
207
208
# File 'lib/message_bus.rb', line 206

# Returns the configured :backend, falling back to :redis when none is set.
def backend
  configured = @config[:backend]
  configured || :redis
end

#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object



298
299
300
301
302
303
304
305
306
307
308
309
310
# File 'lib/message_bus.rb', line 298

# Fetches backlog messages from the pub/sub backend and decodes them.
#
# With a +channel+, asks the backend for that channel's backlog (the name is
# passed through encode_channel_name with +site_id+); without one, asks for
# the global backlog. Every returned message is passed to decode_message!.
#
# @param channel [String, nil]
# @param last_id [Object, nil] forwarded unchanged to the backend
# @param site_id [Object, nil] forwarded to encode_channel_name
# @return the collection returned by the backend, after decoding
def backlog(channel = nil, last_id = nil, site_id = nil)
  pub_sub = reliable_pub_sub
  messages =
    if channel
      pub_sub.backlog(encode_channel_name(channel, site_id), last_id)
    else
      pub_sub.global_backlog(last_id)
    end

  messages.each { |message| decode_message!(message) }
  messages
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



251
252
253
254
255
256
257
# File 'lib/message_bus.rb', line 251

# Subscribes directly on the pub/sub backend.
#
# Without a +channel+ the block is handed to the backend's global_subscribe;
# otherwise it is handed to subscribe for the encoded channel name.
def blocking_subscribe(channel = nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assets ⇒ Object



41
42
43
44
45
46
47
# File 'lib/message_bus.rb', line 41

# Returns the :cache_assets setting, defaulting to true when it has never
# been configured.
#
# The previous `defined? @config[:cache_assets]` test only checked that the
# `[]` call was possible, which is always the case for a Hash, so the true
# default was unreachable and an unset option came back as nil.
def cache_assets
  return true unless @config && @config.key?(:cache_assets)

  @config[:cache_assets]
end

#cache_assets=(val) ⇒ Object



37
38
39
# File 'lib/message_bus.rb', line 37

# Stores +val+ under the :cache_assets configuration key via configure.
def cache_assets=(val)
  configure({ cache_assets: val })
end

#chunked_encoding_enabled=(val) ⇒ Object



66
67
68
# File 'lib/message_bus.rb', line 66

# Stores +val+ under the :chunked_encoding_enabled configuration key.
def chunked_encoding_enabled=(val)
  setting = { chunked_encoding_enabled: val }
  configure(setting)
end

#chunked_encoding_enabled? ⇒ Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/message_bus.rb', line 62

# True unless :chunked_encoding_enabled has been set to exactly false.
#
# @return [Boolean]
def chunked_encoding_enabled?
  @config[:chunked_encoding_enabled] != false
end

#configure(config) ⇒ Object



127
128
129
# File 'lib/message_bus.rb', line 127

# Merges +config+ into the existing configuration hash in place.
#
# @param config [Hash] options to add or overwrite
# @return [Hash] the updated configuration hash
def configure(config)
  @config.update(config)
end

#decode_channel_name(channel) ⇒ Object



271
272
273
# File 'lib/message_bus.rb', line 271

# Splits an encoded channel name on ENCODE_SITE_TOKEN.
#
# @param channel [String]
# @return [Array<String>] the parts on either side of the token
def decode_channel_name(channel)
  separator = ENCODE_SITE_TOKEN
  channel.split(separator)
end

#destroy ⇒ Object

Mostly used in tests to destroy the entire bus.



326
327
328
329
330
331
332
333
334
335
336
# File 'lib/message_bus.rb', line 326

# Tears the bus down (mostly used in tests).
#
# Unsubscribes from the backend's global subscription, marks the bus as
# destroyed under the mutex, waits for the subscriber thread (if any) to
# finish, and stops the timer. Does nothing when already destroyed.
def destroy
  # Already destroyed: nothing left to do.
  return if @destroyed
  reliable_pub_sub.global_unsubscribe

  @mutex.synchronize do
    @subscriptions ||= {}
    @destroyed = true
  end
  # Only join when a subscriber thread was actually created.
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end

#enable_diagnostics ⇒ Object



210
211
212
# File 'lib/message_bus.rb', line 210

# Turns diagnostics on by calling MessageBus::Diagnostics.enable.
def enable_diagnostics
  diagnostics = MessageBus::Diagnostics
  diagnostics.enable
end

#encode_channel_name(channel, site_id = nil) ⇒ Object

encode channel name to include site



262
263
264
265
266
267
268
269
# File 'lib/message_bus.rb', line 262

# Encodes a channel name so that it includes a site id.
#
# The channel is returned unchanged when neither +site_id+ nor a
# site_id_lookup is available, or when the channel is global. Otherwise the
# result is the channel, ENCODE_SITE_TOKEN and the site id (the explicit
# +site_id+, or the value returned by calling site_id_lookup).
#
# @param channel [String]
# @param site_id [Object, nil]
# @return [String]
# @raise [ArgumentError] if the channel already contains ENCODE_SITE_TOKEN
def encode_channel_name(channel, site_id = nil)
  return channel unless site_id || site_id_lookup
  return channel if global?(channel)

  raise ArgumentError.new(channel) if channel.include?(ENCODE_SITE_TOKEN)

  site = site_id || site_id_lookup.call
  "#{channel}#{ENCODE_SITE_TOKEN}#{site}"
end

#extra_response_headers_lookup(&blk) ⇒ Object



163
164
165
166
# File 'lib/message_bus.rb', line 163

# Gets or sets the :extra_response_headers_lookup callback.
#
# When a block is given it is stored via configure; the value currently
# stored under the key is returned either way.
def extra_response_headers_lookup(&blk)
  configure(extra_response_headers_lookup: blk) unless blk.nil?
  @config[:extra_response_headers_lookup]
end

#global_backlog(last_id = nil) ⇒ Object



294
295
296
# File 'lib/message_bus.rb', line 294

# Returns the backlog across all channels by calling backlog with no channel.
#
# @param last_id [Object, nil] forwarded to backlog
def global_backlog(last_id = nil)
  no_channel = nil
  backlog(no_channel, last_id)
end

#group_ids_lookup(&blk) ⇒ Object



148
149
150
151
# File 'lib/message_bus.rb', line 148

# Gets or sets the :group_ids_lookup callback.
#
# A given block is stored via configure; the stored value is then returned.
def group_ids_lookup(&blk)
  configure({ group_ids_lookup: blk }) if blk
  @config[:group_ids_lookup]
end

#initialize ⇒ Object



32
33
34
35
# File 'lib/message_bus.rb', line 32

# Sets up an empty configuration hash and the Synchronizer used as @mutex.
def initialize
  @config = {}
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



153
154
155
156
# File 'lib/message_bus.rb', line 153

# Gets or sets the :is_admin_lookup callback.
#
# A given block is stored via configure; the stored value is then returned.
def is_admin_lookup(&blk)
  configure(is_admin_lookup: blk) unless blk.nil?
  @config[:is_admin_lookup]
end

#keepalive_interval ⇒ Object



372
373
374
# File 'lib/message_bus.rb', line 372

# Returns the configured :keepalive_interval, or 60 when none is set.
def keepalive_interval
  configured = @config[:keepalive_interval]
  configured || 60
end

#keepalive_interval=(interval) ⇒ Object

Set to 0 to disable. With any higher value a keepalive runs every N seconds; if it fails, the process is killed.



368
369
370
# File 'lib/message_bus.rb', line 368

# Stores +interval+ under the :keepalive_interval configuration key.
def keepalive_interval=(interval)
  configure({ keepalive_interval: interval })
end

#last_id(channel, site_id = nil) ⇒ Object



312
313
314
# File 'lib/message_bus.rb', line 312

# Asks the pub/sub backend for the last id of +channel+.
#
# The channel name is passed through encode_channel_name with +site_id+
# before being handed to the backend.
def last_id(channel, site_id = nil)
  pub_sub = reliable_pub_sub
  pub_sub.last_id(encode_channel_name(channel, site_id))
end

#last_message(channel) ⇒ Object



316
317
318
319
320
321
322
323
# File 'lib/message_bus.rb', line 316

# Returns the most recent message on +channel+.
#
# Looks up the channel's last id and requests the backlog starting one id
# earlier, returning the first message of that backlog. Returns nil when the
# channel has no last id or the backlog call returns nothing.
def last_message(channel)
  newest_id = last_id(channel)
  return nil unless newest_id

  recent = backlog(channel, newest_id - 1)
  recent[0] if recent
end

#listening? ⇒ Boolean

Returns:

  • (Boolean)


345
346
347
# File 'lib/message_bus.rb', line 345

# Truthy when a subscriber thread exists and is still alive.
#
# Returns nil when no subscriber thread has been created.
def listening?
  return @subscriber_thread unless @subscriber_thread

  @subscriber_thread.alive?
end

#local_subscribe(channel = nil, last_id = -1, &blk) ⇒ Object

subscribe only on current site



280
281
282
283
# File 'lib/message_bus.rb', line 280

# Subscribes only on the current site.
#
# The site id comes from calling site_id_lookup, and is left nil when no
# lookup is configured or the channel is global. The work is delegated to
# subscribe_impl.
def local_subscribe(channel = nil, last_id = -1, &blk)
  site_id =
    if site_id_lookup && !global?(channel)
      site_id_lookup.call
    end
  subscribe_impl(channel, site_id, last_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



289
290
291
292
# File 'lib/message_bus.rb', line 289

# Unsubscribes a subscription made on the current site.
#
# The site id is resolved exactly as local_subscribe resolves it: taken from
# site_id_lookup, and left nil when no lookup is configured or the channel is
# global. Previously the global-channel check was missing here, so a global
# channel was subscribed with a nil site id but unsubscribed with the current
# one.
# NOTE(review): assumes unsubscribe_impl must receive the same site id that
# subscribe_impl was given - confirm against those implementations.
def local_unsubscribe(channel = nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  unsubscribe_impl(channel, site_id, &blk)
end

#logger ⇒ Object



53
54
55
56
57
58
59
60
# File 'lib/message_bus.rb', line 53

# Returns the configured logger.
#
# When none is configured, a Logger writing to STDOUT at INFO level is
# created, stored in the configuration, and returned.
def logger
  configured = @config[:logger]
  return configured if configured

  require 'logger'
  Logger.new(STDOUT).tap do |fallback|
    fallback.level = Logger::INFO
    configure(logger: fallback)
  end
end

#logger=(logger) ⇒ Object



49
50
51
# File 'lib/message_bus.rb', line 49

# Stores +logger+ under the :logger configuration key via configure.
def logger=(logger)
  configure({ logger: logger })
end

#long_polling_enabled=(val) ⇒ Object



74
75
76
# File 'lib/message_bus.rb', line 74

# Stores +val+ under the :long_polling_enabled configuration key.
def long_polling_enabled=(val)
  setting = { long_polling_enabled: val }
  configure(setting)
end

#long_polling_enabled? ⇒ Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/message_bus.rb', line 70

# True unless :long_polling_enabled has been set to exactly false.
#
# @return [Boolean]
def long_polling_enabled?
  @config[:long_polling_enabled] != false
end

#long_polling_interval ⇒ Object



115
116
117
# File 'lib/message_bus.rb', line 115

# Returns the configured :long_polling_interval, or 25_000 when none is set.
def long_polling_interval
  configured = @config[:long_polling_interval]
  configured || 25_000
end

#long_polling_interval=(millisecs) ⇒ Object



111
112
113
# File 'lib/message_bus.rb', line 111

# Stores +millisecs+ under the :long_polling_interval configuration key.
def long_polling_interval=(millisecs)
  configure({ long_polling_interval: millisecs })
end

#max_active_clients ⇒ Object



84
85
86
# File 'lib/message_bus.rb', line 84

# Returns the configured :max_active_clients, or 1000 when none is set.
def max_active_clients
  configured = @config[:max_active_clients]
  configured || 1000
end

#max_active_clients=(val) ⇒ Object

The number of simultaneous clients we can service.

Clients will revert to polling if we are out of slots.


80
81
82
# File 'lib/message_bus.rb', line 80

# Stores +val+ under the :max_active_clients configuration key.
def max_active_clients=(val)
  setting = { max_active_clients: val }
  configure(setting)
end

#off ⇒ Object



119
120
121
# File 'lib/message_bus.rb', line 119

# Switches the bus off by setting @off to true; publish returns early
# while @off is set.
def off
  @off = true
end

#on ⇒ Object



123
124
125
# File 'lib/message_bus.rb', line 123

# Switches the bus back on by setting @off to false.
def on
  @off = false
end

#on_connect(&blk) ⇒ Object



168
169
170
171
# File 'lib/message_bus.rb', line 168

# Gets or sets the :on_connect callback.
#
# A given block is stored via configure; the stored value is then returned.
def on_connect(&blk)
  configure({ on_connect: blk }) if blk
  @config[:on_connect]
end

#on_disconnect(&blk) ⇒ Object



173
174
175
176
# File 'lib/message_bus.rb', line 173

# Gets or sets the :on_disconnect callback.
#
# A given block is stored via configure; the stored value is then returned.
def on_disconnect(&blk)
  configure(on_disconnect: blk) unless blk.nil?
  @config[:on_disconnect]
end

#on_middleware_error(&blk) ⇒ Object



158
159
160
161
# File 'lib/message_bus.rb', line 158

# Gets or sets the :on_middleware_error callback.
#
# A given block is stored via configure; the stored value is then returned.
def on_middleware_error(&blk)
  configure({ on_middleware_error: blk }) if blk
  @config[:on_middleware_error]
end

#publish(channel, data, opts = nil) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/message_bus.rb', line 214

# Publishes +data+ on +channel+ through the pub/sub backend.
#
# Returns nil without publishing while the bus is switched off (@off).
#
# @param channel [String] channel name; passed through encode_channel_name
# @param data [Object] payload, serialised with JSON.dump together with the
#   targeting ids
# @param opts [Hash, nil] optional settings:
#   :user_ids, :group_ids, :client_ids   - embedded in the JSON envelope
#   :max_backlog_age, :max_backlog_size  - forwarded to the backend as
#                                          channel options when either is set
# @return the value returned by the backend's publish
# @raise [MessageBus::BusDestroyed] if the bus has been destroyed
# @raise [MessageBus::InvalidMessage] if user_ids or group_ids are given for
#   a global channel
def publish(channel, data, opts = nil)
  return if @off
  @mutex.synchronize do
    raise ::MessageBus::BusDestroyed if @destroyed
  end

  options = opts || {}
  user_ids = options[:user_ids]
  group_ids = options[:group_ids]
  client_ids = options[:client_ids]

  raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel)

  envelope = {
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  }
  encoded_data = JSON.dump(envelope)

  # Read both limits up front. The earlier `(age = ...) || (size = ...)` form
  # short-circuited, so :max_backlog_size was dropped whenever
  # :max_backlog_age was also supplied.
  max_backlog_size = options[:max_backlog_size]
  max_backlog_age = options[:max_backlog_age]

  channel_opts = nil
  if max_backlog_size || max_backlog_age
    channel_opts = {
      max_backlog_size: max_backlog_size,
      max_backlog_age: max_backlog_age
    }
  end

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data, channel_opts)
end

#rack_hijack_enabled=(val) ⇒ Object



107
108
109
# File 'lib/message_bus.rb', line 107

# Stores +val+ under the :rack_hijack_enabled configuration key.
def rack_hijack_enabled=(val)
  configure({ rack_hijack_enabled: val })
end

#rack_hijack_enabled? ⇒ Boolean

Returns:

  • (Boolean)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/message_bus.rb', line 88

# Whether rack hijack is enabled.
#
# An explicitly configured value is returned as is. When the option is unset
# (nil) a default is computed once and stored via configure:
# * true when PhusionPassenger is not defined;
# * true when PhusionPassenger responds to advertised_concurrency_level,
#   which is then set to 0;
# * false for any other PhusionPassenger.
#
# @return [Boolean]
def rack_hijack_enabled?
  if @config[:rack_hijack_enabled].nil?
    hijack =
      if !defined?(PhusionPassenger)
        true
      elsif PhusionPassenger.respond_to?(:advertised_concurrency_level)
        # without this switch passenger will explode
        # it will run out of connections after about 10
        PhusionPassenger.advertised_concurrency_level = 0
        true
      else
        false
      end
    configure(rack_hijack_enabled: hijack)
  end

  @config[:rack_hijack_enabled]
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



132
133
134
# File 'lib/message_bus.rb', line 132

# Merges +config+ into the configuration and forces :backend to :redis.
#
# @param config [Hash] options to merge; any :backend entry in it is overridden
def redis_config=(config)
  redis_settings = config.merge(backend: :redis)
  configure(redis_settings)
end

#reliable_pub_sub ⇒ Object



195
196
197
198
199
200
201
202
203
204
# File 'lib/message_bus.rb', line 195

# Returns the pub/sub backend instance, building it on first use.
#
# Runs inside @mutex.synchronize. Returns nil once the bus is destroyed.
# Otherwise returns the object stored under @config[:reliable_pub_sub]; when
# nothing is stored yet, the file "message_bus/backends/<backend>" is
# required and MessageBus::BACKENDS[backend] is instantiated with @config.
def reliable_pub_sub
  @mutex.synchronize do
    return nil if @destroyed
    @config[:reliable_pub_sub] ||= begin
      # Default :backend_options to an empty hash before building the backend.
      @config[:backend_options] ||= {}
      require "message_bus/backends/#{backend}"
      MessageBus::BACKENDS[backend].new @config
    end
  end
end

#reliable_pub_sub=(pub_sub) ⇒ Object



191
192
193
# File 'lib/message_bus.rb', line 191

# Stores +pub_sub+ under the :reliable_pub_sub configuration key.
def reliable_pub_sub=(pub_sub)
  configure({ reliable_pub_sub: pub_sub })
end

#reset! ⇒ Object

will reset all keys



350
351
352
# File 'lib/message_bus.rb', line 350

# Resets all keys by calling reset! on the pub/sub backend.
def reset!
  pub_sub = reliable_pub_sub
  pub_sub.reset!
end

#site_id_lookup(&blk) ⇒ Object



138
139
140
141
# File 'lib/message_bus.rb', line 138

# Gets or sets the :site_id_lookup callback.
#
# A given block is stored via configure; the stored value is then returned.
def site_id_lookup(&blk)
  configure(site_id_lookup: blk) unless blk.nil?
  @config[:site_id_lookup]
end

#subscribe(channel = nil, last_id = -1, &blk) ⇒ Object



275
276
277
# File 'lib/message_bus.rb', line 275

# Subscribes +blk+ to +channel+ by delegating to subscribe_impl with no
# site id.
def subscribe(channel = nil, last_id = -1, &blk)
  site_id = nil
  subscribe_impl(channel, site_id, last_id, &blk)
end

#timer ⇒ Object



354
355
356
357
358
359
360
361
362
363
# File 'lib/message_bus.rb', line 354

# Returns the MessageBus::TimerThread, creating it on first use.
#
# The new timer gets an on_error handler that logs the error and its
# backtrace as a warning.
def timer
  @timer_thread ||= MessageBus::TimerThread.new.tap do |thread|
    thread.on_error do |e|
      logger.warn "Failed to process job: #{e} #{e.backtrace}"
    end
  end
end

#unsubscribe(channel = nil, &blk) ⇒ Object



285
286
287
# File 'lib/message_bus.rb', line 285

# Removes a subscription by delegating to unsubscribe_impl with no site id.
def unsubscribe(channel = nil, &blk)
  site_id = nil
  unsubscribe_impl(channel, site_id, &blk)
end

#user_id_lookup(&blk) ⇒ Object



143
144
145
146
# File 'lib/message_bus.rb', line 143

# Gets or sets the :user_id_lookup callback.
#
# A given block is stored via configure; the stored value is then returned.
def user_id_lookup(&blk)
  configure({ user_id_lookup: blk }) if blk
  @config[:user_id_lookup]
end