Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Overview

The main server-side interface to a message bus for the purposes of configuration, publishing and subscribing

Defined Under Namespace

Classes: Synchronizer

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#config ⇒ Hash<Symbol => Object> (readonly) Also known as: redis_config



30
31
32
# File 'lib/message_bus.rb', line 30

# Returns the configuration hash held in +@config+.
#
# @return [Hash<Symbol => Object>] the live configuration object (not a copy)
def config
  @config
end

Instance Method Details

#after_fork ⇒ void

This method returns an undefined value.

Performs routines that are necessary after a process fork, typically
triggered by a forking webserver. Performs whatever the backend requires
and ensures the server is listening for publications and running
scheduled tasks.


537
538
539
540
541
542
# File 'lib/message_bus.rb', line 537

# Re-establishes the bus after a process fork.
#
# Runs the backend's own post-fork routine, makes sure a subscriber thread
# exists, and pokes the timer so that it is running.
#
# @return [void]
def after_fork
  # let the backend do whatever it needs after a fork
  reliable_pub_sub.after_fork
  ensure_subscriber_thread
  # will ensure timer is running
  timer.queue {}
end

#allow_broadcast=(val) ⇒ void



266
267
268
# File 'lib/message_bus.rb', line 266

# Stores the given value under the +:allow_broadcast+ configuration key.
#
# @param val [Boolean] the value to store
# @return [void]
def allow_broadcast=(val)
  configure({ allow_broadcast: val })
end

#allow_broadcast? ⇒ Boolean



272
273
274
275
276
277
278
279
# File 'lib/message_bus.rb', line 272

# Reports whether broadcasting is allowed.
#
# A value that has been configured explicitly (including +false+) is
# returned unchanged. When nothing has been configured, the default is
# computed once and stored in the configuration: +true+ when running under
# Rails in the test or development environment, +false+ otherwise.
#
# @return [Boolean]
def allow_broadcast?
  configured = @config[:allow_broadcast]
  # `||=` would treat an explicit `false` as "unset" and overwrite it with
  # the Rails-derived default, so only fall back when nothing is stored.
  return configured unless configured.nil?

  @config[:allow_broadcast] =
    if defined? ::Rails
      ::Rails.env.test? || ::Rails.env.development?
    else
      false
    end
end

#backend ⇒ Symbol



306
307
308
# File 'lib/message_bus.rb', line 306

# Returns the configured backend name, falling back to +:redis+ when the
# +:backend+ configuration key is unset.
#
# @return [Symbol]
def backend
  configured = @config[:backend]
  configured ? configured : :redis
end

#backlog(channel = nil, last_id = nil, site_id = nil) ⇒ Array<MessageBus::Message>

Get messages from a channel backlog since the last ID specified, filtered by site



476
477
478
479
480
481
482
483
484
485
486
487
488
# File 'lib/message_bus.rb', line 476

# Fetches backlog messages from the backend and decodes each one in place.
#
# With a channel, the channel name is encoded together with +site_id+ and
# that channel's backlog is read; without one, the global backlog is read.
#
# @param channel [String, nil] the channel to read, or nil for the global backlog
# @param last_id [Integer, nil] the ID passed to the backend as the starting point
# @param site_id [String, nil] the site ID used when encoding the channel name
# @return [Array<MessageBus::Message>] the decoded messages
def backlog(channel = nil, last_id = nil, site_id = nil)
  messages = channel ? reliable_pub_sub.backlog(encode_channel_name(channel, site_id), last_id) : reliable_pub_sub.global_backlog(last_id)
  messages.each { |message| decode_message!(message) }
  messages
end

#base_route ⇒ String



162
163
164
# File 'lib/message_bus.rb', line 162

# Returns the configured base route, or "/" when the +:base_route+
# configuration key is unset.
#
# @return [String]
def base_route
  return "/" unless @config[:base_route]

  @config[:base_route]
end

#base_route=(route) ⇒ void



155
156
157
# File 'lib/message_bus.rb', line 155

# Normalizes the given route and stores it under the +:base_route+
# configuration key.
#
# Normalization inserts a "/" at the start and end where one is missing and
# collapses any run of consecutive slashes into a single "/".
#
# @param route [String] the route to normalize and store
# @return [void]
def base_route=(route)
  slash_fixups = Regexp.new('\A(?!/)|(?<!/)\Z|//+')
  normalized = route.gsub(slash_fixups, "/")
  configure(base_route: normalized)
end

#blocking_subscribe(channel = nil) {|message| ... } ⇒ void

This method returns an undefined value.

Subscribe to messages. Each message will be delivered by yielding to the passed block as soon as it is available. This will block until subscription is terminated.

Yields:

  • (message)

    a message-handler block

Yield Parameters:



396
397
398
399
400
401
402
# File 'lib/message_bus.rb', line 396

# Subscribes the given block through the backend.
#
# Without a channel the block is handed to the backend's global
# subscription; with one, the channel name is encoded first and the block is
# subscribed to that channel.
#
# @param channel [String, nil] the channel to subscribe to, or nil for all channels
# @yield [message] a message-handler block
# @return [void]
def blocking_subscribe(channel = nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assets ⇒ Boolean



49
50
51
52
53
54
55
# File 'lib/message_bus.rb', line 49

# Returns the +:cache_assets+ configuration value, defaulting to +true+
# when the option has never been set.
#
# `defined? @config[:cache_assets]` cannot be used for this: it only checks
# that `[]` can be called on the hash, which is always the case, so the
# default branch would never be taken. The key's presence is checked instead.
#
# @return [Boolean]
def cache_assets
  @config.fetch(:cache_assets, true)
end

#cache_assets=(val) ⇒ void



44
45
46
# File 'lib/message_bus.rb', line 44

# Stores the given value under the +:cache_assets+ configuration key.
#
# @param val [Boolean] the value to store
# @return [void]
def cache_assets=(val)
  setting = { cache_assets: val }
  configure(setting)
end

#chunked_encoding_enabled=(val) ⇒ void



83
84
85
# File 'lib/message_bus.rb', line 83

# Stores the given value under the +:chunked_encoding_enabled+
# configuration key.
#
# @param val [Boolean] the value to store
# @return [void]
def chunked_encoding_enabled=(val)
  configure({ chunked_encoding_enabled: val })
end

#chunked_encoding_enabled? ⇒ Boolean



77
78
79
# File 'lib/message_bus.rb', line 77

# Reports whether chunked encoding is enabled.
#
# Only an explicit +false+ disables it; an unset (or any other) value
# counts as enabled.
#
# @return [Boolean]
def chunked_encoding_enabled?
  @config[:chunked_encoding_enabled] != false
end

#client_message_filters ⇒ Array



597
598
599
600
# File 'lib/message_bus.rb', line 597

# Returns the list of registered client message filters, first storing an
# empty list in the configuration if none exists yet.
#
# @return [Array] the filter list held under +:client_message_filters+
def client_message_filters
  unless @config[:client_message_filters]
    configure(client_message_filters: [])
  end

  @config[:client_message_filters]
end

#configure(config) ⇒ void

This method returns an undefined value.

Overrides existing configuration



186
187
188
# File 'lib/message_bus.rb', line 186

# Merges the given settings into the existing configuration in place;
# keys already present are overwritten.
#
# @param config [Hash<Symbol => Object>] the settings to merge in
# @return [void]
def configure(config)
  @config.update(config)
end

#destroy ⇒ void

This method returns an undefined value.

Stops listening for publications and stops executing scheduled tasks. Mostly used in tests to destroy the entire bus.



517
518
519
520
521
522
523
524
525
526
527
528
529
530
# File 'lib/message_bus.rb', line 517

# Shuts the bus down: unsubscribes from the backend's global subscription,
# marks the bus as destroyed, waits for the subscriber thread (if any) and
# stops the timer. Does nothing if the bus is already marked destroyed.
#
# @return [void]
def destroy
  # fast path: already destroyed
  return if @destroyed

  reliable_pub_sub.global_unsubscribe

  @mutex.synchronize do
    # re-check under the lock; this `return` exits the whole method
    return if @destroyed

    @subscriptions ||= {}
    @destroyed = true
  end
  # wait for the subscriber thread to finish, if one was ever started
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end

#enable_diagnostics ⇒ void

This method returns an undefined value.

Enables diagnostics tracking



312
313
314
# File 'lib/message_bus.rb', line 312

# Enables diagnostics tracking by passing this bus to
# MessageBus::Diagnostics.enable.
#
# @return [void]
def enable_diagnostics
  MessageBus::Diagnostics.enable(self)
end

#extra_response_headers_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine extra headers to be set on a subscriber response

Yield Parameters:

  • env (Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (Hash<String => String>)

    the extra headers to set on the response



249
250
251
252
# File 'lib/message_bus.rb', line 249

# Stores the given block (when one is passed) under the
# +:extra_response_headers_lookup+ configuration key, then returns whatever
# is currently stored there.
#
# @yield [env] a routine to determine extra headers to be set on a subscriber response
# @return [Proc, nil] the configured lookup, or nil if none has been set
def extra_response_headers_lookup(&blk)
  configure(extra_response_headers_lookup: blk) unless blk.nil?

  @config[:extra_response_headers_lookup]
end

#global_backlog(last_id = nil) ⇒ Array<MessageBus::Message>

Get messages from the global backlog since the last ID specified



465
466
467
# File 'lib/message_bus.rb', line 465

# Fetches the global backlog by calling #backlog with no channel.
#
# @param last_id [Integer, nil] the ID passed on to #backlog
# @return [Array<MessageBus::Message>]
def global_backlog(last_id = nil)
  no_channel = nil
  backlog(no_channel, last_id)
end

#group_ids_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine the group IDs for a subscriber

Yield Parameters:

  • env (optional, Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (optional, Array<String,Integer>)

    the group IDs for the subscriber



221
222
223
224
# File 'lib/message_bus.rb', line 221

# Stores the given block (when one is passed) under the +:group_ids_lookup+
# configuration key, then returns whatever is currently stored there.
#
# @yield [env] a routine to determine the group IDs for a subscriber
# @return [Proc, nil] the configured lookup, or nil if none has been set
def group_ids_lookup(&blk)
  if blk
    configure(group_ids_lookup: blk)
  end

  @config[:group_ids_lookup]
end

#initialize ⇒ Object



37
38
39
40
# File 'lib/message_bus.rb', line 37

# Sets up a fresh bus: an empty configuration hash and a new Synchronizer
# stored in +@mutex+.
def initialize
  @config = Hash.new
  @mutex = Synchronizer.new
end

#is_admin_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine if a request comes from an admin user

Yield Parameters:

  • env (Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (Boolean)

    whether or not the request is from an admin user



230
231
232
233
# File 'lib/message_bus.rb', line 230

# Stores the given block (when one is passed) under the +:is_admin_lookup+
# configuration key, then returns whatever is currently stored there.
#
# @yield [env] a routine to determine if a request comes from an admin user
# @return [Proc, nil] the configured lookup, or nil if none has been set
def is_admin_lookup(&blk)
  configure(is_admin_lookup: blk) unless blk.nil?

  @config[:is_admin_lookup]
end

#keepalive_interval ⇒ Integer



578
579
580
# File 'lib/message_bus.rb', line 578

# Returns the configured keepalive interval, or 60 when the
# +:keepalive_interval+ configuration key is unset.
#
# @return [Integer]
def keepalive_interval
  interval = @config[:keepalive_interval]
  interval ? interval : 60
end

#keepalive_interval=(interval) ⇒ Object



572
573
574
# File 'lib/message_bus.rb', line 572

# Stores the given value under the +:keepalive_interval+ configuration key.
#
# @param interval [Integer] the value to store
def keepalive_interval=(interval)
  configure({ keepalive_interval: interval })
end

#last_id(channel, site_id = nil) ⇒ Integer

Get the ID of the last message published on a channel, filtered by site



496
497
498
# File 'lib/message_bus.rb', line 496

# Asks the backend for the last ID on a channel, after encoding the
# channel name together with the site ID.
#
# @param channel [String] the channel to look up
# @param site_id [String, nil] the site ID used when encoding the channel name
# @return [Integer] whatever the backend reports as the channel's last ID
def last_id(channel, site_id = nil)
  encoded_channel = encode_channel_name(channel, site_id)
  reliable_pub_sub.last_id(encoded_channel)
end

#last_message(channel) ⇒ MessageBus::Message

Get the last message published on a channel



505
506
507
508
509
510
511
512
# File 'lib/message_bus.rb', line 505

# Returns the most recent message on a channel.
#
# Looks up the channel's last ID, then reads the backlog starting one ID
# earlier and returns its first entry. Returns nil when there is no last ID
# or no backlog.
#
# @param channel [String] the channel to inspect
# @return [MessageBus::Message, nil]
def last_message(channel)
  newest_id = last_id(channel)
  return unless newest_id

  messages = backlog(channel, newest_id - 1)
  messages[0] if messages
end

#listening? ⇒ Boolean



546
547
548
# File 'lib/message_bus.rb', line 546

# Reports whether a subscriber thread exists and is still alive.
#
# @return [Boolean, nil] nil when no subscriber thread has been stored
def listening?
  thread = @subscriber_thread
  thread ? thread.alive? : thread
end

#local_subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc

Subscribe to messages on a particular channel, filtered by the current site (@see #site_id_lookup). Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.

Yields:

  • (message)

    a message-handler block

Yield Parameters:



433
434
435
436
# File 'lib/message_bus.rb', line 433

# Subscribes the given block to a channel, scoped to the current site.
#
# The site ID is taken from the configured site-ID lookup, but only when a
# lookup exists and the channel is not global; otherwise it is nil.
#
# @param channel [String, nil] the channel to subscribe to
# @param last_id [Integer] the ID passed on to the subscription
# @yield [message] a message-handler block
# @return [Proc] whatever the underlying subscription returns
def local_subscribe(channel = nil, last_id = -1, &blk)
  site_id =
    if site_id_lookup && !global?(channel)
      site_id_lookup.call
    end

  subscribe_impl(channel, site_id, last_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ void

This method returns an undefined value.

Removes a subscription to a particular channel, filtered by the current site (@see #site_id_lookup).



455
456
457
458
# File 'lib/message_bus.rb', line 455

# Removes a subscription to a channel, scoped to the current site.
#
# The site ID comes from the configured site-ID lookup when one exists;
# otherwise it is nil.
#
# @param channel [String, nil] the channel to unsubscribe from
# @return [void]
def local_unsubscribe(channel = nil, &blk)
  site_id =
    if site_id_lookup
      site_id_lookup.call
    end

  unsubscribe_impl(channel, site_id, &blk)
end

#logger ⇒ Logger



65
66
67
68
69
70
71
72
73
# File 'lib/message_bus.rb', line 65

# Returns the configured logger.
#
# When none is configured, a Logger writing to STDOUT at INFO level is
# created, stored in the configuration and returned.
#
# @return [Logger]
def logger
  existing = @config[:logger]
  return existing if existing

  # the standard library logger is loaded lazily, only when the default is needed
  require 'logger'
  Logger.new(STDOUT).tap do |default_logger|
    default_logger.level = Logger::INFO
    configure(logger: default_logger)
  end
end

#logger=(logger) ⇒ void



59
60
61
# File 'lib/message_bus.rb', line 59

# Stores the given logger under the +:logger+ configuration key.
#
# @param logger [Logger] the logger to store
# @return [void]
def logger=(logger)
  configure({ logger: logger })
end

#long_polling_enabled=(val) ⇒ void



95
96
97
# File 'lib/message_bus.rb', line 95

# Stores the given value under the +:long_polling_enabled+ configuration key.
#
# @param val [Boolean] the value to store
# @return [void]
def long_polling_enabled=(val)
  setting = { long_polling_enabled: val }
  configure(setting)
end

#long_polling_enabled? ⇒ Boolean



89
90
91
# File 'lib/message_bus.rb', line 89

# Reports whether long polling is enabled.
#
# Only an explicit +false+ disables it; an unset (or any other) value
# counts as enabled.
#
# @return [Boolean]
def long_polling_enabled?
  @config[:long_polling_enabled] != false
end

#long_polling_interval ⇒ Integer



149
150
151
# File 'lib/message_bus.rb', line 149

# Returns the configured long-polling interval, or 25 * 1000 when the
# +:long_polling_interval+ configuration key is unset.
#
# @return [Integer]
def long_polling_interval
  configured = @config[:long_polling_interval]
  return configured if configured

  25 * 1000
end

#long_polling_interval=(millisecs) ⇒ void



143
144
145
# File 'lib/message_bus.rb', line 143

# Stores the given value under the +:long_polling_interval+ configuration key.
#
# @param millisecs [Integer] the value to store
# @return [void]
def long_polling_interval=(millisecs)
  configure({ long_polling_interval: millisecs })
end

#max_active_clients ⇒ Integer



109
110
111
# File 'lib/message_bus.rb', line 109

# Returns the configured maximum number of active clients, or 1000 when the
# +:max_active_clients+ configuration key is unset.
#
# @return [Integer]
def max_active_clients
  limit = @config[:max_active_clients]
  limit ? limit : 1000
end

#max_active_clients=(val) ⇒ void



102
103
104
# File 'lib/message_bus.rb', line 102

# Stores the given value under the +:max_active_clients+ configuration key.
#
# @param val [Integer] the value to store
# @return [void]
def max_active_clients=(val)
  setting = { max_active_clients: val }
  configure(setting)
end

#off ⇒ void

This method returns an undefined value.

Disables publication to the bus



173
174
175
# File 'lib/message_bus.rb', line 173

# Disables publication to the bus by setting the +@off+ flag, which
# #publish checks before doing anything.
#
# @return [void]
def off
  @off = true
end

#off? ⇒ Boolean



167
168
169
# File 'lib/message_bus.rb', line 167

# Reports whether publication has been switched off.
#
# @return [Boolean, nil] the +@off+ flag; nil if it has never been assigned
def off?
  @off
end

#on ⇒ void

This method returns an undefined value.

Enables publication to the bus



179
180
181
# File 'lib/message_bus.rb', line 179

# Enables publication to the bus by clearing the +@off+ flag; also clears
# the +@destroyed+ flag.
#
# @return [void]
def on
  @off = false
  @destroyed = false
end

#on_connect(&blk) ⇒ Object



254
255
256
257
# File 'lib/message_bus.rb', line 254

# Stores the given block (when one is passed) under the +:on_connect+
# configuration key, then returns whatever is currently stored there.
#
# @return [Proc, nil] the configured callback, or nil if none has been set
def on_connect(&blk)
  if blk
    configure(on_connect: blk)
  end

  @config[:on_connect]
end

#on_disconnect(&blk) ⇒ Object



259
260
261
262
# File 'lib/message_bus.rb', line 259

# Stores the given block (when one is passed) under the +:on_disconnect+
# configuration key, then returns whatever is currently stored there.
#
# @return [Proc, nil] the configured callback, or nil if none has been set
def on_disconnect(&blk)
  configure(on_disconnect: blk) unless blk.nil?

  @config[:on_disconnect]
end

#on_middleware_error {|env, e| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env, e)

    a routine to handle exceptions raised when handling a subscriber request

Yield Parameters:

  • env (Rack::Request::Env)

    the subscriber request environment

  • e (Exception)

    the exception that was raised

Yield Returns:

  • (optional, Array<(Integer,Hash,Array)>)

    a Rack response to be delivered



240
241
242
243
# File 'lib/message_bus.rb', line 240

# Stores the given block (when one is passed) under the
# +:on_middleware_error+ configuration key, then returns whatever is
# currently stored there.
#
# @yield [env, e] a routine to handle exceptions raised when handling a subscriber request
# @return [Proc, nil] the configured handler, or nil if none has been set
def on_middleware_error(&blk)
  if blk
    configure(on_middleware_error: blk)
  end

  @config[:on_middleware_error]
end

#publish(channel, data, opts = nil) ⇒ Integer

Publishes a message to a channel

Options Hash (opts):

  • :client_ids (Array<String>) — default: `nil`

    the unique client IDs to which the message should be available. If nil, available to all.

  • :user_ids (Array<String,Integer>) — default: `nil`

    the user IDs to which the message should be available. If nil, available to all.

  • :group_ids (Array<String,Integer>) — default: `nil`

    the group IDs to which the message should be available. If nil, available to all.

  • :site_id (String) — default: `nil`

    the site ID to scope the message to; used for hosting multiple applications or instances of an application against a single message_bus

  • :max_backlog_age (nil, Integer)

    the longest amount of time a message may live in a backlog before being removed, in seconds

  • :max_backlog_size (nil, Integer)

    the largest permitted size (number of messages) for the channel backlog; beyond this capacity, old messages will be dropped

Raises:



334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
# File 'lib/message_bus.rb', line 334

# Publishes a message to a channel.
#
# The payload and its targeting information (user, group and client IDs)
# are serialized to JSON, the channel name is encoded together with the
# site ID, and both are handed to the backend along with any per-channel
# options. Does nothing and returns nil while the bus is switched off.
#
# @param channel [String] the channel to publish to
# @param data [Object] the payload; it is serialized with JSON.dump
# @param opts [Hash, nil] publishing options
# @option opts [Array<String>] :client_ids the client IDs the message is restricted to
# @option opts [Array<String,Integer>] :user_ids the user IDs the message is restricted to
# @option opts [Array<String,Integer>] :group_ids the group IDs the message is restricted to
# @option opts [String] :site_id the site ID used when encoding the channel name
# @option opts [Integer, nil] :max_backlog_age forwarded to the backend as a channel option
# @option opts [Integer, nil] :max_backlog_size forwarded to the backend as a channel option
# @option opts [Object] :queue_in_memory forwarded to the backend when the key is present
#
# @raise [MessageBus::BusDestroyed] if the bus has been destroyed
# @raise [MessageBus::InvalidMessage] if user or group IDs are given for a global channel
# @raise [MessageBus::InvalidMessageTarget] if user, group or client IDs is an empty array
#
# @return [Integer, nil] the backend's publish result, or nil when the bus is off
def publish(channel, data, opts = nil)
  return if @off

  @mutex.synchronize do
    raise ::MessageBus::BusDestroyed if @destroyed
  end

  opts ||= {}

  user_ids = opts[:user_ids]
  group_ids = opts[:group_ids]
  client_ids = opts[:client_ids]
  site_id = opts[:site_id]

  if (user_ids || group_ids) && global?(channel)
    raise ::MessageBus::InvalidMessage
  end

  # An empty target list would address nobody; reject it rather than guess.
  if (user_ids == []) || (group_ids == []) || (client_ids == [])
    raise ::MessageBus::InvalidMessageTarget
  end

  encoded_data = JSON.dump(
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  )

  channel_opts = {}

  # Read both limits before testing them: assigning inside an `||`
  # condition short-circuits and silently drops :max_backlog_size
  # whenever :max_backlog_age is also supplied.
  age = opts[:max_backlog_age]
  size = opts[:max_backlog_size]
  if age || size
    channel_opts[:max_backlog_size] = size
    channel_opts[:max_backlog_age] = age
  end

  if opts.has_key?(:queue_in_memory)
    channel_opts[:queue_in_memory] = opts[:queue_in_memory]
  end

  encoded_channel_name = encode_channel_name(channel, site_id)
  reliable_pub_sub.publish(encoded_channel_name, encoded_data, channel_opts)
end

#rack_hijack_enabled=(val) ⇒ void



137
138
139
# File 'lib/message_bus.rb', line 137

# Stores the given value under the +:rack_hijack_enabled+ configuration key.
#
# @param val [Boolean] the value to store
# @return [void]
def rack_hijack_enabled=(val)
  configure({ rack_hijack_enabled: val })
end

#rack_hijack_enabled?Boolean



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/message_bus.rb', line 116

# Reports whether rack hijack is enabled.
#
# A configured value is returned as-is. When nothing has been configured,
# the default is worked out once and stored: enabled, unless
# PhusionPassenger is defined and does not respond to
# +advertised_concurrency_level+. When it does respond, that level is set
# to 0 and hijack stays enabled.
#
# @return [Boolean]
def rack_hijack_enabled?
  return @config[:rack_hijack_enabled] unless @config[:rack_hijack_enabled].nil?

  # without this switch passenger will explode
  # it will run out of connections after about 10
  enable =
    if !defined?(PhusionPassenger)
      true
    elsif PhusionPassenger.respond_to?(:advertised_concurrency_level)
      PhusionPassenger.advertised_concurrency_level = 0
      true
    else
      false
    end

  configure(rack_hijack_enabled: enable)
  @config[:rack_hijack_enabled]
end

#redis_config=(config) ⇒ void

This method returns an undefined value.

Overrides existing configuration, explicitly enabling the redis backend



193
194
195
# File 'lib/message_bus.rb', line 193

# Merges the given settings into the configuration with +:backend+ forced
# to +:redis+. The hash passed in is not modified.
#
# @param config [Hash<Symbol => Object>] the settings to merge in
# @return [void]
def redis_config=(config)
  redis_settings = config.merge({ backend: :redis })
  configure(redis_settings)
end

#register_client_message_filter(channel_prefix) {|message| ... } ⇒ void

This method returns an undefined value.

Registers a client message filter that allows messages to be filtered from the client.

Yield Parameters:

Yield Returns:

  • (Boolean)

    whether the message should be published to the client



589
590
591
592
593
594
# File 'lib/message_bus.rb', line 589

# Registers a client message filter as a +[channel_prefix, block]+ pair
# appended to the +:client_message_filters+ list, creating that list first
# if needed. Does nothing when no block is given.
#
# @param channel_prefix [String] the channel prefix stored alongside the block
# @yield [message] the filter block
# @yieldreturn [Boolean] whether the message should be published to the client
# @return [void]
def register_client_message_filter(channel_prefix, &blk)
  return unless blk

  configure(client_message_filters: []) unless @config[:client_message_filters]
  @config[:client_message_filters].push([channel_prefix, blk])
end

#reliable_pub_sub ⇒ MessageBus::Backend::Base



289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/message_bus.rb', line 289

# Returns the backend instance, creating it on first use.
#
# Runs under +@mutex+. Returns nil once the bus has been destroyed.
# Otherwise, if no backend is stored under +:reliable_pub_sub+ yet, the
# backend file named after #backend is required and an instance is built
# from the current configuration and stored.
#
# @return [MessageBus::Backend::Base, nil]
def reliable_pub_sub
  @mutex.synchronize do
    if @destroyed
      nil
    else
      # Make sure logger is loaded before config is
      # passed to backend.
      logger

      unless @config[:reliable_pub_sub]
        @config[:backend_options] ||= {}
        require "message_bus/backends/#{backend}"
        @config[:reliable_pub_sub] = MessageBus::BACKENDS[backend].new(@config)
      end

      @config[:reliable_pub_sub]
    end
  end
end

#reliable_pub_sub=(pub_sub) ⇒ void



283
284
285
# File 'lib/message_bus.rb', line 283

# Stores the given backend instance under the +:reliable_pub_sub+
# configuration key.
#
# @param pub_sub [MessageBus::Backend::Base] the backend instance to store
# @return [void]
def reliable_pub_sub=(pub_sub)
  configure({ reliable_pub_sub: pub_sub })
end

#reset! ⇒ Object



551
552
553
# File 'lib/message_bus.rb', line 551

# Resets the backend, if one is available.
#
# The backend is looked up exactly once. #reliable_pub_sub returns nil once
# the bus has been destroyed, so looking it up a second time after the nil
# check could raise NoMethodError if the bus is destroyed in between.
#
# @return [Object, nil] the backend's reset! result, or nil when there is no backend
def reset!
  pub_sub = reliable_pub_sub
  pub_sub.reset! if pub_sub
end

#site_id_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine the site ID for a subscriber

Yield Parameters:

  • env (optional, Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (optional, String)

    the site ID for the subscriber



203
204
205
206
# File 'lib/message_bus.rb', line 203

# Stores the given block (when one is passed) under the +:site_id_lookup+
# configuration key, then returns whatever is currently stored there.
#
# @yield [env] a routine to determine the site ID for a subscriber
# @return [Proc, nil] the configured lookup, or nil if none has been set
def site_id_lookup(&blk)
  configure(site_id_lookup: blk) unless blk.nil?

  @config[:site_id_lookup]
end

#subscribe(channel = nil, last_id = -1) {|message| ... } ⇒ Proc

Subscribe to messages on a particular channel. Each message since the last ID specified will be delivered by yielding to the passed block as soon as it is available. This will not block, but instead the callbacks will be executed asynchronously in a dedicated subscriber thread.

Yields:

  • (message)

    a message-handler block

Yield Parameters:



416
417
418
# File 'lib/message_bus.rb', line 416

# Subscribes the given block to a channel without any site scoping
# (a nil site ID is passed to the underlying subscription).
#
# @param channel [String, nil] the channel to subscribe to
# @param last_id [Integer] the ID passed on to the subscription
# @yield [message] a message-handler block
# @return [Proc] whatever the underlying subscription returns
def subscribe(channel = nil, last_id = -1, &blk)
  site_id = nil
  subscribe_impl(channel, site_id, last_id, &blk)
end

#timer ⇒ MessageBus::TimerThread



557
558
559
560
561
562
563
564
565
566
567
# File 'lib/message_bus.rb', line 557

# Returns the bus's timer thread, creating it on first use.
#
# A newly created timer gets an error handler that logs a warning with the
# exception and its backtrace.
#
# @return [MessageBus::TimerThread]
def timer
  @timer_thread ||= MessageBus::TimerThread.new.tap do |thread|
    thread.on_error do |e|
      logger.warn "Failed to process job: #{e} #{e.backtrace}"
    end
  end
end

#unsubscribe(channel = nil, &blk) ⇒ void

This method returns an undefined value.

Removes a subscription to a particular channel.



444
445
446
# File 'lib/message_bus.rb', line 444

# Removes a subscription to a channel without any site scoping
# (a nil site ID is passed to the underlying unsubscription).
#
# @param channel [String, nil] the channel to unsubscribe from
# @return [void]
def unsubscribe(channel = nil, &blk)
  site_id = nil
  unsubscribe_impl(channel, site_id, &blk)
end

#user_id_lookup {|env| ... } ⇒ void

This method returns an undefined value.

Yields:

  • (env)

    a routine to determine the user ID for a subscriber (authenticate)

Yield Parameters:

  • env (optional, Rack::Request::Env)

    the subscriber request environment

Yield Returns:

  • (optional, String, Integer)

    the user ID for the subscriber



212
213
214
215
# File 'lib/message_bus.rb', line 212

# Stores the given block (when one is passed) under the +:user_id_lookup+
# configuration key, then returns whatever is currently stored there.
#
# @yield [env] a routine to determine the user ID for a subscriber (authenticate)
# @return [Proc, nil] the configured lookup, or nil if none has been set
def user_id_lookup(&blk)
  if blk
    configure(user_id_lookup: blk)
  end

  @config[:user_id_lookup]
end