Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

ENCODE_SITE_TOKEN =
"$|$"

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#configObject (readonly) Also known as: redis_config

Configuration options hash



25
26
27
# File 'lib/message_bus.rb', line 25

# Returns the configuration options hash held in @config.
def config
  @config
end

Instance Method Details

#after_forkObject



329
330
331
332
333
334
# File 'lib/message_bus.rb', line 329

# Hook meant to be run after a fork (per its name): calls after_fork on the
# pub/sub backend, then ensure_subscriber_thread, then queues an empty job
# on the timer.
def after_fork
  reliable_pub_sub.after_fork
  ensure_subscriber_thread
  # will ensure timer is running
  timer.queue {}
end

#allow_broadcast=(val) ⇒ Object



178
179
180
# File 'lib/message_bus.rb', line 178

# Stores +val+ under the :allow_broadcast configuration key.
#
# @param val [Boolean] the flag to store
def allow_broadcast=(val)
  configure({ allow_broadcast: val })
end

#allow_broadcast?Boolean

Returns:

  • (Boolean)


182
183
184
185
186
187
188
189
# File 'lib/message_bus.rb', line 182

# Whether broadcasting is allowed.
#
# An explicitly configured value (including false) is returned as-is. When
# nothing has been configured the default is computed and stored: true
# under Rails in the test or development environment, false otherwise.
#
# @return [Boolean]
def allow_broadcast?
  # A nil check is used instead of ||= so that an explicit `false` is not
  # replaced by the environment-derived default.
  if @config[:allow_broadcast].nil?
    @config[:allow_broadcast] =
      if defined?(::Rails)
        ::Rails.env.test? || ::Rails.env.development?
      else
        false
      end
  end
  @config[:allow_broadcast]
end

#backendObject



206
207
208
# File 'lib/message_bus.rb', line 206

# Name of the backend to use.
#
# @return the :backend configuration entry, or :redis when it is unset
def backend
  configured = @config[:backend]
  configured ? configured : :redis
end

#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Object



289
290
291
292
293
294
295
296
297
298
299
300
301
# File 'lib/message_bus.rb', line 289

# Fetches backlog messages from the pub/sub backend and runs
# decode_message! on each of them.
#
# @param channel [String, nil] channel to read; when nil the backend's
#   global backlog is read instead
# @param last_id passed through to the backend
# @param site_id passed to encode_channel_name together with +channel+
# @return the collection returned by the backend, after decoding
def backlog(channel = nil, last_id = nil, site_id = nil)
  pub_sub = reliable_pub_sub
  messages =
    if channel
      pub_sub.backlog(encode_channel_name(channel, site_id), last_id)
    else
      pub_sub.global_backlog(last_id)
    end

  messages.each { |message| decode_message!(message) }
  messages
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



242
243
244
245
246
247
248
# File 'lib/message_bus.rb', line 242

# Subscribes directly on the pub/sub backend, forwarding the block.
#
# @param channel [String, nil] channel to subscribe to (encoded with
#   encode_channel_name); when nil the backend's global_subscribe is used
# @return whatever the backend's subscribe call returns
def blocking_subscribe(channel = nil, &blk)
  pub_sub = reliable_pub_sub
  return pub_sub.global_subscribe(&blk) unless channel

  pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assetsObject



41
42
43
44
45
46
47
# File 'lib/message_bus.rb', line 41

# Whether assets should be cached.
#
# The previous `defined? @config[:cache_assets]` check tested whether the
# `[]` call is defined, not whether the key is present, so it was always
# truthy and the documented default of true was never returned.
#
# @return the configured :cache_assets value, or true when the key is absent
def cache_assets
  @config.fetch(:cache_assets, true)
end

#cache_assets=(val) ⇒ Object



37
38
39
# File 'lib/message_bus.rb', line 37

# Stores +val+ under the :cache_assets configuration key.
#
# @param val the value to store
def cache_assets=(val)
  configure({ cache_assets: val })
end

#chunked_encoding_enabled=(val) ⇒ Object



66
67
68
# File 'lib/message_bus.rb', line 66

# Stores +val+ under the :chunked_encoding_enabled configuration key.
#
# @param val the value to store
def chunked_encoding_enabled=(val)
  configure({ chunked_encoding_enabled: val })
end

#chunked_encoding_enabled?Boolean

Returns:

  • (Boolean)


62
63
64
# File 'lib/message_bus.rb', line 62

# Chunked encoding is on unless it was explicitly configured as false.
#
# @return [Boolean]
def chunked_encoding_enabled?
  @config[:chunked_encoding_enabled] != false
end

#configure(config) ⇒ Object



127
128
129
# File 'lib/message_bus.rb', line 127

# Merges +config+ into the stored configuration hash in place.
#
# @param config [Hash] entries to add or overwrite
# @return [Hash] the updated configuration hash
def configure(config)
  @config.update(config)
end

#decode_channel_name(channel) ⇒ Object



262
263
264
# File 'lib/message_bus.rb', line 262

# Splits a channel name on ENCODE_SITE_TOKEN.
#
# @param channel [String] a channel name, possibly produced by
#   encode_channel_name
# @return [Array<String>] the pieces around the token
def decode_channel_name(channel)
  separator = ENCODE_SITE_TOKEN
  channel.split(separator)
end

#destroyObject

Mostly used in tests to destroy the entire bus.



317
318
319
320
321
322
323
324
325
326
327
# File 'lib/message_bus.rb', line 317

# Tears the bus down. Does nothing when @destroyed is already set.
def destroy
  return if @destroyed
  reliable_pub_sub.global_unsubscribe

  # Set the destroyed flag under the mutex; publish and reliable_pub_sub
  # check the flag while holding the same mutex.
  @mutex.synchronize do
    @subscriptions ||= {}
    @destroyed = true
  end
  # Wait for the subscriber thread, if there is one, to finish.
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end

#enable_diagnosticsObject



210
211
212
# File 'lib/message_bus.rb', line 210

# Turns diagnostics on by delegating to MessageBus::Diagnostics.enable.
#
# @return whatever MessageBus::Diagnostics.enable returns
def enable_diagnostics
  diagnostics = MessageBus::Diagnostics
  diagnostics.enable
end

#encode_channel_name(channel, site_id = nil) ⇒ Object

encode channel name to include site



253
254
255
256
257
258
259
260
# File 'lib/message_bus.rb', line 253

# Encodes a channel name so that it includes the site.
#
# The name is returned untouched when there is neither a +site_id+ nor a
# site_id_lookup callback, or when the channel is global.
#
# @param channel [String] the channel name
# @param site_id explicit site id; when nil the site_id_lookup callback
#   is called to obtain one
# @return [String] "<channel><ENCODE_SITE_TOKEN><site>" or +channel+
# @raise [ArgumentError] if +channel+ already contains ENCODE_SITE_TOKEN
def encode_channel_name(channel, site_id = nil)
  return channel unless (site_id || site_id_lookup) && !global?(channel)

  raise ArgumentError, channel if channel.include?(ENCODE_SITE_TOKEN)

  site = site_id || site_id_lookup.call
  "#{channel}#{ENCODE_SITE_TOKEN}#{site}"
end

#extra_response_headers_lookup(&blk) ⇒ Object



163
164
165
166
# File 'lib/message_bus.rb', line 163

# Gets or sets the callback stored under :extra_response_headers_lookup.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def extra_response_headers_lookup(&blk)
  configure(extra_response_headers_lookup: blk) unless blk.nil?
  @config[:extra_response_headers_lookup]
end

#global_backlog(last_id = nil) ⇒ Object



285
286
287
# File 'lib/message_bus.rb', line 285

# Shortcut for calling backlog without a channel.
#
# @param last_id passed through to backlog
# @return whatever backlog returns
def global_backlog(last_id = nil)
  no_channel = nil
  backlog(no_channel, last_id)
end

#group_ids_lookup(&blk) ⇒ Object



148
149
150
151
# File 'lib/message_bus.rb', line 148

# Gets or sets the callback stored under :group_ids_lookup.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def group_ids_lookup(&blk)
  configure(group_ids_lookup: blk) unless blk.nil?
  @config[:group_ids_lookup]
end

#initializeObject



32
33
34
35
# File 'lib/message_bus.rb', line 32

# Starts with an empty configuration hash and a new Synchronizer, which is
# kept in @mutex and used through @mutex.synchronize by other methods.
def initialize
  @config = {}
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



153
154
155
156
# File 'lib/message_bus.rb', line 153

# Gets or sets the callback stored under :is_admin_lookup.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def is_admin_lookup(&blk)
  configure(is_admin_lookup: blk) unless blk.nil?
  @config[:is_admin_lookup]
end

#keepalive_intervalObject



363
364
365
# File 'lib/message_bus.rb', line 363

# Keepalive interval.
#
# @return the :keepalive_interval configuration entry, or 60 when unset
def keepalive_interval
  configured = @config[:keepalive_interval]
  configured ? configured : 60
end

#keepalive_interval=(interval) ⇒ Object

Set to 0 to disable. Any higher value runs a keepalive every N seconds; if the keepalive fails, the process is killed.



359
360
361
# File 'lib/message_bus.rb', line 359

# Stores +interval+ under the :keepalive_interval configuration key.
#
# @param interval the value to store
def keepalive_interval=(interval)
  configure({ keepalive_interval: interval })
end

#last_id(channel, site_id = nil) ⇒ Object



303
304
305
# File 'lib/message_bus.rb', line 303

# Asks the pub/sub backend for the last id on a channel.
#
# @param channel [String] channel name, encoded with encode_channel_name
# @param site_id passed to encode_channel_name together with +channel+
# @return whatever the backend's last_id returns
def last_id(channel, site_id = nil)
  pub_sub = reliable_pub_sub
  pub_sub.last_id(encode_channel_name(channel, site_id))
end

#last_message(channel) ⇒ Object



307
308
309
310
311
312
313
314
# File 'lib/message_bus.rb', line 307

# Returns the first backlog entry found after (last id - 1) on +channel+.
#
# @param channel [String] the channel to inspect
# @return the first element of that backlog, or nil when last_id or
#   backlog return nothing
def last_message(channel)
  newest_id = last_id(channel)
  return unless newest_id

  messages = backlog(channel, newest_id - 1)
  messages[0] if messages
end

#listening?Boolean

Returns:

  • (Boolean)


336
337
338
# File 'lib/message_bus.rb', line 336

# Whether a subscriber thread exists and is alive.
#
# @return [Boolean, nil] nil when no subscriber thread has been stored
def listening?
  thread = @subscriber_thread
  thread && thread.alive?
end

#local_subscribe(channel = nil, last_id = -1, &blk) ⇒ Object

subscribe only on current site



271
272
273
274
# File 'lib/message_bus.rb', line 271

# Subscribes only on the current site.
#
# The site id comes from the site_id_lookup callback and is looked up only
# when such a callback exists and the channel is not global; otherwise
# nil is passed to subscribe_impl.
#
# @param channel [String, nil] channel to subscribe to
# @param last_id passed through to subscribe_impl (defaults to -1)
# @return whatever subscribe_impl returns
def local_subscribe(channel = nil, last_id = -1, &blk)
  site_id =
    if site_id_lookup && !global?(channel)
      site_id_lookup.call
    end
  subscribe_impl(channel, site_id, last_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



280
281
282
283
# File 'lib/message_bus.rb', line 280

# Unsubscribes on the current site only.
#
# The site id is resolved the same way local_subscribe resolves it: the
# site_id_lookup callback is consulted only for non-global channels, so a
# subscription made through local_subscribe on a global channel (which is
# registered with a nil site id) is addressed with the same nil site id.
#
# @param channel [String, nil] channel to unsubscribe from
# @return whatever unsubscribe_impl returns
def local_unsubscribe(channel = nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  unsubscribe_impl(channel, site_id, &blk)
end

#loggerObject



53
54
55
56
57
58
59
60
# File 'lib/message_bus.rb', line 53

# Returns the configured logger, creating a default one on first use.
#
# The default writes to STDOUT at INFO level and is stored in the
# configuration so later calls return the same object.
#
# @return the configured logger, or the newly created Logger
def logger
  existing = @config[:logger]
  return existing if existing

  require 'logger'
  Logger.new(STDOUT).tap do |default_logger|
    default_logger.level = Logger::INFO
    configure(logger: default_logger)
  end
end

#logger=(logger) ⇒ Object



49
50
51
# File 'lib/message_bus.rb', line 49

# Stores +logger+ under the :logger configuration key.
#
# @param logger the logger object to use
def logger=(logger)
  configure({ logger: logger })
end

#long_polling_enabled=(val) ⇒ Object



74
75
76
# File 'lib/message_bus.rb', line 74

# Stores +val+ under the :long_polling_enabled configuration key.
#
# @param val the value to store
def long_polling_enabled=(val)
  configure({ long_polling_enabled: val })
end

#long_polling_enabled?Boolean

Returns:

  • (Boolean)


70
71
72
# File 'lib/message_bus.rb', line 70

# Long polling is on unless it was explicitly configured as false.
#
# @return [Boolean]
def long_polling_enabled?
  @config[:long_polling_enabled] != false
end

#long_polling_intervalObject



115
116
117
# File 'lib/message_bus.rb', line 115

# Long polling interval; the setter names its argument +millisecs+.
#
# @return the :long_polling_interval configuration entry, or 25000 when unset
def long_polling_interval
  configured = @config[:long_polling_interval]
  configured || 25_000
end

#long_polling_interval=(millisecs) ⇒ Object



111
112
113
# File 'lib/message_bus.rb', line 111

# Stores +millisecs+ under the :long_polling_interval configuration key.
#
# @param millisecs the value to store
def long_polling_interval=(millisecs)
  configure({ long_polling_interval: millisecs })
end

#max_active_clientsObject



84
85
86
# File 'lib/message_bus.rb', line 84

# Maximum number of active clients.
#
# @return the :max_active_clients configuration entry, or 1000 when unset
def max_active_clients
  configured = @config[:max_active_clients]
  configured || 1_000
end

#max_active_clients=(val) ⇒ Object

The number of simultaneous clients we can service

will revert to polling if we are out of slots


80
81
82
# File 'lib/message_bus.rb', line 80

# Stores +val+ under the :max_active_clients configuration key.
#
# @param val the value to store
def max_active_clients=(val)
  configure({ max_active_clients: val })
end

#offObject



119
120
121
# File 'lib/message_bus.rb', line 119

# Sets @off to true; publish returns immediately while @off is set.
def off
  @off = true
end

#onObject



123
124
125
# File 'lib/message_bus.rb', line 123

# Sets @off to false so that publish no longer returns early.
def on
  @off = false
end

#on_connect(&blk) ⇒ Object



168
169
170
171
# File 'lib/message_bus.rb', line 168

# Gets or sets the callback stored under :on_connect.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def on_connect(&blk)
  configure(on_connect: blk) unless blk.nil?
  @config[:on_connect]
end

#on_disconnect(&blk) ⇒ Object



173
174
175
176
# File 'lib/message_bus.rb', line 173

# Gets or sets the callback stored under :on_disconnect.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def on_disconnect(&blk)
  configure(on_disconnect: blk) unless blk.nil?
  @config[:on_disconnect]
end

#on_middleware_error(&blk) ⇒ Object



158
159
160
161
# File 'lib/message_bus.rb', line 158

# Gets or sets the callback stored under :on_middleware_error.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def on_middleware_error(&blk)
  configure(on_middleware_error: blk) unless blk.nil?
  @config[:on_middleware_error]
end

#publish(channel, data, opts = nil) ⇒ Object



214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/message_bus.rb', line 214

# Publishes +data+ on +channel+ through the pub/sub backend.
#
# The payload sent to the backend is a JSON document with the keys data,
# user_ids, group_ids and client_ids.
#
# @param channel [String] channel name; encoded with encode_channel_name
# @param data the message body (must be JSON-serialisable)
# @param opts [Hash, nil] optional :user_ids, :group_ids and :client_ids
# @return nil when the bus is switched off, otherwise the backend's result
# @raise [MessageBus::BusDestroyed] if the bus has been destroyed
# @raise [MessageBus::InvalidMessage] if user or group ids are given for a
#   global channel
def publish(channel, data, opts = nil)
  return if @off

  @mutex.synchronize { raise ::MessageBus::BusDestroyed if @destroyed }

  user_ids   = opts ? opts[:user_ids] : nil
  group_ids  = opts ? opts[:group_ids] : nil
  client_ids = opts ? opts[:client_ids] : nil

  if (user_ids || group_ids) && global?(channel)
    raise ::MessageBus::InvalidMessage
  end

  payload = {
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  }
  # Serialise before touching the backend, as the original ordering did.
  encoded_data = JSON.dump(payload)

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end

#rack_hijack_enabled=(val) ⇒ Object



107
108
109
# File 'lib/message_bus.rb', line 107

# Stores +val+ under the :rack_hijack_enabled configuration key.
#
# @param val the value to store
def rack_hijack_enabled=(val)
  configure({ rack_hijack_enabled: val })
end

#rack_hijack_enabled?Boolean

Returns:

  • (Boolean)


88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/message_bus.rb', line 88

# Whether rack hijack is enabled.
#
# A configured (non-nil) value is returned as-is. Otherwise a default is
# worked out, stored in the configuration and returned: true, except under
# PhusionPassenger builds that do not respond to
# advertised_concurrency_level.
#
# @return [Boolean]
def rack_hijack_enabled?
  return @config[:rack_hijack_enabled] unless @config[:rack_hijack_enabled].nil?

  # without this switch passenger will explode
  # it will run out of connections after about 10
  enable =
    if !defined?(PhusionPassenger)
      true
    elsif PhusionPassenger.respond_to?(:advertised_concurrency_level)
      PhusionPassenger.advertised_concurrency_level = 0
      true
    else
      false
    end

  configure(rack_hijack_enabled: enable)
  @config[:rack_hijack_enabled]
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



132
133
134
# File 'lib/message_bus.rb', line 132

# Allows a redis configuration to be injected: merges +config+ into the
# stored configuration and sets :backend to :redis.
#
# @param config [Hash] redis options; not modified
def redis_config=(config)
  redis_options = config.merge(backend: :redis)
  configure(redis_options)
end

#reliable_pub_subObject



195
196
197
198
199
200
201
202
203
204
# File 'lib/message_bus.rb', line 195

# Returns the pub/sub backend instance, building it on first use.
#
# Runs under @mutex. When no instance is stored under :reliable_pub_sub,
# the backend file for the configured backend name is required and the
# class registered in MessageBus::BACKENDS is instantiated with the
# configuration hash.
#
# @return the backend instance, or nil once the bus has been destroyed
def reliable_pub_sub
  @mutex.synchronize do
    return nil if @destroyed

    unless @config[:reliable_pub_sub]
      @config[:backend_options] ||= {}
      require "message_bus/backends/#{backend}"
      @config[:reliable_pub_sub] = MessageBus::BACKENDS[backend].new(@config)
    end
    @config[:reliable_pub_sub]
  end
end

#reliable_pub_sub=(pub_sub) ⇒ Object



191
192
193
# File 'lib/message_bus.rb', line 191

# Stores +pub_sub+ under the :reliable_pub_sub configuration key.
#
# @param pub_sub the backend instance to use
def reliable_pub_sub=(pub_sub)
  configure({ reliable_pub_sub: pub_sub })
end

#reset!Object

will reset all keys



341
342
343
# File 'lib/message_bus.rb', line 341

# Will reset all keys: delegates to reset! on the pub/sub backend.
#
# @return whatever the backend's reset! returns
def reset!
  pub_sub = reliable_pub_sub
  pub_sub.reset!
end

#site_id_lookup(&blk) ⇒ Object



138
139
140
141
# File 'lib/message_bus.rb', line 138

# Gets or sets the callback stored under :site_id_lookup.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def site_id_lookup(&blk)
  configure(site_id_lookup: blk) unless blk.nil?
  @config[:site_id_lookup]
end

#subscribe(channel = nil, last_id = -1, &blk) ⇒ Object



266
267
268
# File 'lib/message_bus.rb', line 266

# Subscribes without a site id: forwards to subscribe_impl with nil as the
# site id.
#
# @param channel [String, nil] channel to subscribe to
# @param last_id passed through to subscribe_impl (defaults to -1)
# @return whatever subscribe_impl returns
def subscribe(channel = nil, last_id = -1, &blk)
  site_id = nil
  subscribe_impl(channel, site_id, last_id, &blk)
end

#timerObject



345
346
347
348
349
350
351
352
353
354
# File 'lib/message_bus.rb', line 345

# Returns the memoized MessageBus::TimerThread, creating it on first use.
#
# A newly created timer gets an error handler that logs the failure and
# its backtrace as a warning.
#
# @return the timer thread object
def timer
  @timer_thread ||= MessageBus::TimerThread.new.tap do |timer_thread|
    timer_thread.on_error do |e|
      logger.warn "Failed to process job: #{e} #{e.backtrace}"
    end
  end
end

#unsubscribe(channel = nil, &blk) ⇒ Object



276
277
278
# File 'lib/message_bus.rb', line 276

# Unsubscribes without a site id: forwards to unsubscribe_impl with nil as
# the site id.
#
# @param channel [String, nil] channel to unsubscribe from
# @return whatever unsubscribe_impl returns
def unsubscribe(channel = nil, &blk)
  site_id = nil
  unsubscribe_impl(channel, site_id, &blk)
end

#user_id_lookup(&blk) ⇒ Object



143
144
145
146
# File 'lib/message_bus.rb', line 143

# Gets or sets the callback stored under :user_id_lookup.
#
# A given block replaces the stored callback; the currently stored
# callback (possibly nil) is returned either way.
def user_id_lookup(&blk)
  configure(user_id_lookup: blk) unless blk.nil?
  @config[:user_id_lookup]
end