Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

ENCODE_SITE_TOKEN =
"$|$"

Instance Method Summary collapse

Instance Method Details

#after_fork ⇒ Object



304
305
306
307
308
309
# File 'lib/message_bus.rb', line 304

# Runs the bus's after-fork steps, in order: forwards the after_fork
# notification to the pub/sub backend, makes sure the subscriber thread
# exists (via ensure_subscriber_thread), and pokes the timer.
# NOTE(review): presumably meant to be called in a child process right after
# forking -- confirm against callers.
def after_fork
  reliable_pub_sub.after_fork
  ensure_subscriber_thread
  # will ensure timer is running
  # (an empty job is queued purely for that side effect)
  timer.queue{}
end

#allow_broadcast=(val) ⇒ Object



168
169
170
# File 'lib/message_bus.rb', line 168

# Stores +val+ in @allow_broadcast, the flag that allow_broadcast? reads.
def allow_broadcast=(val)
  @allow_broadcast = val
end

#allow_broadcast? ⇒ Boolean

Returns:

  • (Boolean)


172
173
174
175
176
177
178
179
# File 'lib/message_bus.rb', line 172

# Reports whether broadcasting is allowed.
#
# A value assigned through allow_broadcast= always wins, including an
# explicit +false+ (a plain `||=` would treat +false+ as "unset" and fall
# back to the default). When nothing was assigned, the default is true only
# when Rails is loaded and running in the test or development environment.
#
# @return [Boolean]
def allow_broadcast?
  return @allow_broadcast unless @allow_broadcast.nil?

  if defined?(::Rails) && (::Rails.env.test? || ::Rails.env.development?)
    # Only a positive default is cached; a negative one is re-evaluated on
    # the next call, so Rails being loaded later is still picked up.
    @allow_broadcast = true
  else
    false
  end
end

#around_client_batch(channel, &blk) ⇒ Object



152
153
154
155
156
# File 'lib/message_bus.rb', line 152

# Registers (when a block is given) and returns the around-client-batch
# callback stored for +channel+. Returns nil when none is stored.
def around_client_batch(channel, &blk)
  callbacks = (@around_client_batches ||= {})
  callbacks[channel] = blk unless blk.nil?
  callbacks[channel]
end

#backlog(channel = nil, last_id) ⇒ Object



267
268
269
270
271
272
273
274
275
276
277
278
279
# File 'lib/message_bus.rb', line 267

# Fetches backlog messages from the pub/sub backend and decodes each one in
# place with decode_message!.
#
# With a +channel+, the channel name is encoded via encode_channel_name and
# that channel's backlog is requested; without one, the global backlog is
# requested. +last_id+ is passed through to the backend unchanged.
#
# Returns the collection handed back by the backend.
def backlog(channel=nil, last_id)
  pub_sub = reliable_pub_sub
  messages =
    channel ? pub_sub.backlog(encode_channel_name(channel), last_id) : pub_sub.global_backlog(last_id)

  messages.each { |message| decode_message!(message) }
  messages
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



224
225
226
227
228
229
230
# File 'lib/message_bus.rb', line 224

# Subscribes +blk+ directly on the pub/sub backend: to the encoded channel
# when +channel+ is given, otherwise to the backend's global subscription.
def blocking_subscribe(channel=nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assets ⇒ Object



38
39
40
41
42
43
44
# File 'lib/message_bus.rb', line 38

# Returns the cache_assets setting. Defaults to true until a value (even nil
# or false) has been assigned through cache_assets=.
def cache_assets
  defined?(@cache_assets) ? @cache_assets : true
end

#cache_assets=(val) ⇒ Object



34
35
36
# File 'lib/message_bus.rb', line 34

# Stores +val+ in @cache_assets; once assigned, cache_assets returns it
# instead of its default of true.
def cache_assets=(val)
  @cache_assets = val
end

#client_filter(channel, &blk) ⇒ Object



146
147
148
149
150
# File 'lib/message_bus.rb', line 146

# Registers (when a block is given) and returns the client filter stored for
# +channel+. Returns nil when no filter is stored for that channel.
def client_filter(channel, &blk)
  filters = (@client_filters ||= {})
  filters[channel] = blk unless blk.nil?
  filters[channel]
end

#decode_channel_name(channel) ⇒ Object



244
245
246
# File 'lib/message_bus.rb', line 244

# Splits +channel+ on ENCODE_SITE_TOKEN and returns the resulting parts.
# For a name produced by encode_channel_name that is the channel followed by
# the site id; a name without the token comes back as a single part.
# (assumes +channel+ is a String -- TODO confirm)
def decode_channel_name(channel)
  channel.split(ENCODE_SITE_TOKEN)
end

#destroy ⇒ Object



294
295
296
297
298
299
300
301
302
# File 'lib/message_bus.rb', line 294

# Shuts the bus down: unsubscribes from the backend's global subscription,
# flags the instance as destroyed, waits for the subscriber thread (if any)
# to finish, and stops the timer.
def destroy
  @mutex.synchronize do
    @subscriptions ||= {}
    reliable_pub_sub.global_unsubscribe
    # From here on publish raises BusDestroyed and reliable_pub_sub returns nil.
    @destroyed = true
  end
  # The join happens after the lock has been released.
  # NOTE(review): presumably so the subscriber thread can still take the lock
  # while winding down -- confirm.
  @subscriber_thread.join if @subscriber_thread
  timer.stop
end

#enable_diagnostics ⇒ Object



192
193
194
# File 'lib/message_bus.rb', line 192

# Turns diagnostics on by delegating to MessageBus::Diagnostics.enable.
def enable_diagnostics
  MessageBus::Diagnostics.enable
end

#encode_channel_name(channel) ⇒ Object

encode channel name to include site



235
236
237
238
239
240
241
242
# File 'lib/message_bus.rb', line 235

# Encodes a channel name so that it includes the current site.
#
# When a site_id_lookup is configured and the channel is not global, the
# result is "<channel><ENCODE_SITE_TOKEN><site id>". Otherwise the channel
# is returned unchanged.
#
# Raises ArgumentError (with the channel as message) if a channel that needs
# encoding already contains ENCODE_SITE_TOKEN.
def encode_channel_name(channel)
  needs_site = site_id_lookup && !global?(channel)
  return channel unless needs_site

  raise ArgumentError, channel if channel.include?(ENCODE_SITE_TOKEN)

  "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}"
end

#extra_response_headers_lookup(&blk) ⇒ Object



141
142
143
144
# File 'lib/message_bus.rb', line 141

# Getter/setter for the extra-response-headers callback: with a block, the
# block is stored and returned; without one, the stored block (or nil) is
# returned.
def extra_response_headers_lookup(&blk)
  return @extra_response_headers_lookup if blk.nil?

  @extra_response_headers_lookup = blk
end

#group_ids_lookup(&blk) ⇒ Object



131
132
133
134
# File 'lib/message_bus.rb', line 131

# Getter/setter for the group-ids callback: with a block, the block is
# stored and returned; without one, the stored block (or nil) is returned.
def group_ids_lookup(&blk)
  return @group_ids_lookup if blk.nil?

  @group_ids_lookup = blk
end

#initialize ⇒ Object



30
31
32
# File 'lib/message_bus.rb', line 30

# Creates the Synchronizer stored in @mutex, which publish, destroy and
# reliable_pub_sub use to guard their critical sections.
def initialize
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



136
137
138
139
# File 'lib/message_bus.rb', line 136

# Getter/setter for the is-admin callback: with a block, the block is stored
# and returned; without one, the stored block (or nil) is returned.
def is_admin_lookup(&blk)
  return @is_admin_lookup if blk.nil?

  @is_admin_lookup = blk
end

#last_id(channel) ⇒ Object



281
282
283
# File 'lib/message_bus.rb', line 281

# Returns whatever the pub/sub backend reports as the last id for +channel+.
# The channel name is first passed through encode_channel_name.
def last_id(channel)
  reliable_pub_sub.last_id(encode_channel_name(channel))
end

#last_message(channel) ⇒ Object



285
286
287
288
289
290
291
292
# File 'lib/message_bus.rb', line 285

# Returns the first message of the backlog requested from just before the
# channel's last id (i.e. backlog(channel, last_id - 1)), or nil when the
# channel has no last id or no backlog is returned.
def last_message(channel)
  newest_id = last_id(channel)
  return unless newest_id

  messages = backlog(channel, newest_id - 1)
  messages[0] if messages
end

#listening? ⇒ Boolean

Returns:

  • (Boolean)


311
312
313
# File 'lib/message_bus.rb', line 311

# Tells whether a subscriber thread exists and is alive. When no subscriber
# thread has been stored, the (falsy) stored value itself is returned.
def listening?
  thread = @subscriber_thread
  thread ? thread.alive? : thread
end

#local_subscribe(channel = nil, &blk) ⇒ Object

subscribe only on current site



262
263
264
265
# File 'lib/message_bus.rb', line 262

# Subscribes +blk+ to +channel+ on the current site only.
#
# The site id comes from site_id_lookup; it is left nil when no lookup is
# configured or when the channel is global.
def local_subscribe(channel=nil, &blk)
  lookup = site_id_lookup
  site_id = (lookup && !global?(channel)) ? lookup.call : nil
  subscribe_impl(channel, site_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



256
257
258
259
# File 'lib/message_bus.rb', line 256

# Removes a subscription made with local_subscribe.
#
# The site id is resolved exactly as local_subscribe resolves it: taken from
# site_id_lookup, but left nil when no lookup is configured or when the
# channel is global. Using the same rule on both sides makes the
# unsubscribe target the same (channel, site id) pair that the subscribe
# registered; previously a global channel was looked up under the current
# site id here although local_subscribe had registered it with nil.
def local_unsubscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  unsubscribe_impl(channel, site_id, &blk)
end

#logger ⇒ Object



50
51
52
53
54
# File 'lib/message_bus.rb', line 50

# Returns the configured logger. When none has been set, a Logger writing to
# STDOUT is created on first use (the 'logger' library is only required at
# that point) and remembered for later calls.
def logger
  @logger ||= begin
    require 'logger'
    Logger.new(STDOUT)
  end
end

#logger=(logger) ⇒ Object



46
47
48
# File 'lib/message_bus.rb', line 46

# Stores +logger+ in @logger; the logger reader returns it instead of
# building its default STDOUT logger.
def logger=(logger)
  @logger = logger
end

#long_polling_enabled=(val) ⇒ Object



60
61
62
# File 'lib/message_bus.rb', line 60

# Stores +val+ in @long_polling_enabled. Only an exact +false+ makes
# long_polling_enabled? report false.
def long_polling_enabled=(val)
  @long_polling_enabled = val
end

#long_polling_enabled? ⇒ Boolean

Returns:

  • (Boolean)


56
57
58
# File 'lib/message_bus.rb', line 56

# Long polling is enabled unless it was explicitly set to false; an unset
# (nil) or any other value counts as enabled.
def long_polling_enabled?
  !(@long_polling_enabled == false)
end

#long_polling_interval ⇒ Object



100
101
102
# File 'lib/message_bus.rb', line 100

# Returns the configured long-polling interval, falling back to 25_000
# (25 * 1000; the setter names its argument millisecs) when none is set.
def long_polling_interval
  return @long_polling_interval if @long_polling_interval

  25_000
end

#long_polling_interval=(millisecs) ⇒ Object



96
97
98
# File 'lib/message_bus.rb', line 96

# Stores +millisecs+ as the long-polling interval returned by
# long_polling_interval (which falls back to 25 * 1000 when unset).
def long_polling_interval=(millisecs)
  @long_polling_interval = millisecs
end

#max_active_clients ⇒ Object



70
71
72
# File 'lib/message_bus.rb', line 70

# Returns the configured maximum number of active clients, falling back to
# 1000 when none is set.
def max_active_clients
  return @max_active_clients if @max_active_clients

  1000
end

#max_active_clients=(val) ⇒ Object

The number of simultaneous clients we can service

will revert to polling if we are out of slots


66
67
68
# File 'lib/message_bus.rb', line 66

# Stores +val+ as the limit returned by max_active_clients (which falls back
# to 1000 when unset).
def max_active_clients=(val)
  @max_active_clients = val
end

#off ⇒ Object



104
105
106
# File 'lib/message_bus.rb', line 104

# Switches the bus off by setting @off to true; while it is set, publish
# returns immediately without publishing.
def off
  @off = true
end

#on ⇒ Object



108
109
110
# File 'lib/message_bus.rb', line 108

# Switches the bus back on by setting @off to false, so publish no longer
# returns early.
def on
  @off = false
end

#on_connect(&blk) ⇒ Object



158
159
160
161
# File 'lib/message_bus.rb', line 158

# Getter/setter for the on-connect callback: with a block, the block is
# stored and returned; without one, the stored block (or nil) is returned.
def on_connect(&blk)
  return @on_connect if blk.nil?

  @on_connect = blk
end

#on_disconnect(&blk) ⇒ Object



163
164
165
166
# File 'lib/message_bus.rb', line 163

# Getter/setter for the on-disconnect callback: with a block, the block is
# stored and returned; without one, the stored block (or nil) is returned.
def on_disconnect(&blk)
  return @on_disconnect if blk.nil?

  @on_disconnect = blk
end

#publish(channel, data, opts = nil) ⇒ Object



196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
# File 'lib/message_bus.rb', line 196

# Publishes +data+ on +channel+ through the pub/sub backend.
#
# channel - channel name; encoded with encode_channel_name before publishing.
# data    - payload, serialised with JSON.dump together with the targeting
#           lists below.
# opts    - optional hash-like object; :user_ids, :group_ids and :client_ids
#           are read from it and embedded in the serialised message.
#
# Returns nil without publishing while the bus is switched off (@off).
# Raises ::MessageBus::BusDestroyed when the bus has been destroyed, and
# ::MessageBus::InvalidMessage when user or group ids are given for a global
# channel.
def publish(channel, data, opts = nil)
  return if @off

  @mutex.synchronize { raise ::MessageBus::BusDestroyed if @destroyed }

  # Multiple assignment from nil leaves all three targets nil when no
  # options were passed.
  user_ids, group_ids, client_ids =
    if opts
      [opts[:user_ids], opts[:group_ids], opts[:client_ids]]
    end

  targeted = user_ids || group_ids
  raise ::MessageBus::InvalidMessage if targeted && global?(channel)

  payload = {
    data: data,
    user_ids: user_ids,
    group_ids: group_ids,
    client_ids: client_ids
  }
  encoded_data = JSON.dump(payload)

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end

#rack_hijack_enabled=(val) ⇒ Object



92
93
94
# File 'lib/message_bus.rb', line 92

# Stores +val+ in @rack_hijack_enabled. Any non-nil value suppresses the
# auto-detection performed by rack_hijack_enabled?.
def rack_hijack_enabled=(val)
  @rack_hijack_enabled = val
end

#rack_hijack_enabled? ⇒ Boolean

Returns:

  • (Boolean)


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/message_bus.rb', line 74

# Reports whether rack hijack may be used.
#
# An explicitly assigned (non-nil) value is returned as is. Otherwise the
# answer is detected once and remembered:
#   * no PhusionPassenger constant             -> true
#   * Passenger without
#     advertised_concurrency_level             -> false
#   * Passenger with advertised_concurrency_level
#     (which is set to 0 as a side effect)     -> true
def rack_hijack_enabled?
  return @rack_hijack_enabled unless @rack_hijack_enabled.nil?

  passenger_loaded = defined?(PhusionPassenger) ? true : false

  # without this switch passenger will explode
  # it will run out of connections after about 10
  @rack_hijack_enabled = !passenger_loaded
  if passenger_loaded && PhusionPassenger.respond_to?(:advertised_concurrency_level)
    PhusionPassenger.advertised_concurrency_level = 0
    @rack_hijack_enabled = true
  end

  @rack_hijack_enabled
end

#redis_config ⇒ Object



117
118
119
# File 'lib/message_bus.rb', line 117

# Returns the redis configuration, initialising it to an empty hash (and
# remembering that hash) when nothing has been configured yet.
def redis_config
  @redis_config = {} unless @redis_config
  @redis_config
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



113
114
115
# File 'lib/message_bus.rb', line 113

# Stores +config+ as the redis configuration; reliable_pub_sub passes it to
# MessageBus::Redis::ReliablePubSub.new when it builds the backend.
def redis_config=(config)
  @redis_config = config
end

#reliable_pub_sub ⇒ Object



185
186
187
188
189
190
# File 'lib/message_bus.rb', line 185

# Returns the pub/sub backend, or nil once the bus has been destroyed.
#
# Under @mutex, the backend is created on first use as a
# MessageBus::Redis::ReliablePubSub built from redis_config (unless one was
# injected through reliable_pub_sub=) and then reused.
def reliable_pub_sub
  @mutex.synchronize do
    if @destroyed
      nil
    else
      @reliable_pub_sub ||= MessageBus::Redis::ReliablePubSub.new(redis_config)
    end
  end
end

#reliable_pub_sub=(pub_sub) ⇒ Object



181
182
183
# File 'lib/message_bus.rb', line 181

# Injects +pub_sub+ as the backend; reliable_pub_sub then returns it instead
# of building a MessageBus::Redis::ReliablePubSub.
# NOTE(review): unlike the reader, this assignment does not take @mutex.
def reliable_pub_sub=(pub_sub)
  @reliable_pub_sub = pub_sub
end

#reset! ⇒ Object

will reset all keys



316
317
318
# File 'lib/message_bus.rb', line 316

# Delegates to the pub/sub backend's reset!.
# NOTE(review): what exactly gets reset is decided by the backend, which is
# not shown here.
def reset!
  reliable_pub_sub.reset!
end

#site_id_lookup(&blk) ⇒ Object



121
122
123
124
# File 'lib/message_bus.rb', line 121

# Getter/setter for the site-id callback: with a block, the block is stored
# and returned; without one, the stored block (or nil) is returned.
def site_id_lookup(&blk)
  return @site_id_lookup if blk.nil?

  @site_id_lookup = blk
end

#subscribe(channel = nil, &blk) ⇒ Object



248
249
250
# File 'lib/message_bus.rb', line 248

# Subscribes +blk+ to +channel+ by delegating to subscribe_impl with a nil
# site id (local_subscribe is the variant that passes the current site id).
def subscribe(channel=nil, &blk)
  subscribe_impl(channel, nil, &blk)
end

#timer ⇒ Object



320
321
322
323
324
325
326
327
328
329
# File 'lib/message_bus.rb', line 320

# Returns the bus's MessageBus::TimerThread, creating it on first use.
# The new timer gets an error handler that logs the exception and its
# backtrace as a warning through logger.
def timer
  @timer_thread ||= MessageBus::TimerThread.new.tap { |thread|
    thread.on_error { |e| logger.warn("Failed to process job: #{e} #{e.backtrace}") }
  }
end

#unsubscribe(channel = nil, &blk) ⇒ Object



252
253
254
# File 'lib/message_bus.rb', line 252

# Removes the subscription of +blk+ to +channel+ by delegating to
# unsubscribe_impl with a nil site id (the counterpart of subscribe).
def unsubscribe(channel=nil, &blk)
  unsubscribe_impl(channel, nil, &blk)
end

#user_id_lookup(&blk) ⇒ Object



126
127
128
129
# File 'lib/message_bus.rb', line 126

# Getter/setter for the user-id callback: with a block, the block is stored
# and returned; without one, the stored block (or nil) is returned.
def user_id_lookup(&blk)
  return @user_id_lookup if blk.nil?

  @user_id_lookup = blk
end