Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

# Separator that encode_channel_name places between a channel name and a
# site id; decode_channel_name splits on it.
ENCODE_SITE_TOKEN =
"$|$"

Instance Method Summary collapse

Instance Method Details

#access_control_allow_origin ⇒ Object



148
149
150
# File 'lib/message_bus.rb', line 148

# Reader for the value stored by #access_control_allow_origin=.
#
# @return [Object, nil] the stored origin, or nil if none has been assigned
# NOTE(review): presumably feeds the Access-Control-Allow-Origin response
# header; the consumer is not visible here — confirm.
def access_control_allow_origin
  @access_control_allow_origin
end

#access_control_allow_origin=(origin) ⇒ Object



144
145
146
# File 'lib/message_bus.rb', line 144

# Stores the value that #access_control_allow_origin returns.
#
# @param origin [Object] value to remember; it is stored as given
def access_control_allow_origin=(origin)
  @access_control_allow_origin = origin
end

#after_fork ⇒ Object



301
302
303
304
# File 'lib/message_bus.rb', line 301

# Calls after_fork on the pub/sub backend, then ensure_subscriber_thread.
#
# NOTE(review): presumably meant to be invoked in a child process right
# after forking so the backend and subscriber thread are re-established —
# confirm with callers.
def after_fork
  reliable_pub_sub.after_fork
  ensure_subscriber_thread
end

#allow_broadcast=(val) ⇒ Object



174
175
176
# File 'lib/message_bus.rb', line 174

# Stores the flag consulted by #allow_broadcast?.
#
# @param val [Object] value to remember; it is stored as given
def allow_broadcast=(val)
  @allow_broadcast = val
end

#allow_broadcast? ⇒ Boolean

Returns:

  • (Boolean)


178
179
180
181
182
183
184
185
# File 'lib/message_bus.rb', line 178

# Whether broadcasting is allowed.
#
# A value assigned through #allow_broadcast= always wins, including an
# explicit +false+. (A plain `||=` would treat +false+ as "unset" and
# silently re-enable broadcasting in Rails test/development.)
#
# When nothing has been assigned (or nil was assigned), the default is
# derived on each call: true only when ::Rails is defined and its env is
# test or development, false otherwise.
#
# @return [Boolean, Object] the assigned value, or the derived default
def allow_broadcast?
  return @allow_broadcast unless @allow_broadcast.nil?
  return false unless defined? ::Rails

  ::Rails.env.test? || ::Rails.env.development?
end

#around_client_batch(channel, &blk) ⇒ Object



158
159
160
161
162
# File 'lib/message_bus.rb', line 158

# Registers and/or looks up the "around client batch" block for a channel.
#
# With a block, stores it for +channel+ (replacing any earlier one).
# Always returns whatever is currently registered for +channel+.
#
# @param channel [Object] key under which the block is kept
# @return [Proc, nil] the registered block, or nil when none exists
def around_client_batch(channel, &blk)
  registry = (@around_client_batches ||= {})
  registry.store(channel, blk) if blk
  registry[channel]
end

#backlog(channel = nil, last_id) ⇒ Object



265
266
267
268
269
270
271
272
273
274
275
276
277
# File 'lib/message_bus.rb', line 265

# Fetches messages newer than +last_id+ from the pub/sub backend and runs
# decode_message! on each of them.
#
# @param channel [String, nil] channel to read (it is passed through
#   encode_channel_name); when nil the backend's global backlog is read
# @param last_id [Object] id handed to the backend as the lower bound
# @return [Object] the collection returned by the backend, after decoding
def backlog(channel=nil, last_id)
  messages =
    channel ? reliable_pub_sub.backlog(encode_channel_name(channel), last_id) : reliable_pub_sub.global_backlog(last_id)

  messages.each do |message|
    decode_message!(message)
  end
  messages
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



222
223
224
225
226
227
228
# File 'lib/message_bus.rb', line 222

# Subscribes +blk+ directly on the pub/sub backend.
#
# @param channel [String, nil] channel to subscribe to (it is passed through
#   encode_channel_name); when nil the backend's global_subscribe is used
# @return [Object] whatever the backend call returns
def blocking_subscribe(channel=nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assets ⇒ Object



41
42
43
44
45
46
47
# File 'lib/message_bus.rb', line 41

# Returns the cache_assets setting.
#
# Defaults to true until #cache_assets= has been called; after that the
# assigned value is returned as-is, even when it is false or nil.
def cache_assets
  return true unless defined? @cache_assets

  @cache_assets
end

#cache_assets=(val) ⇒ Object



37
38
39
# File 'lib/message_bus.rb', line 37

# Stores the value returned by #cache_assets (which is true until this
# setter has been called).
#
# @param val [Object] value to remember; it is stored as given
def cache_assets=(val)
  @cache_assets = val
end

#client_filter(channel, &blk) ⇒ Object



152
153
154
155
156
# File 'lib/message_bus.rb', line 152

# Registers and/or looks up the client filter block for a channel.
#
# With a block, stores it for +channel+ (replacing any earlier one).
# Always returns whatever is currently registered for +channel+.
#
# @param channel [Object] key under which the block is kept
# @return [Proc, nil] the registered block, or nil when none exists
def client_filter(channel, &blk)
  filters = (@client_filters ||= {})
  filters.store(channel, blk) if blk
  filters[channel]
end

#decode_channel_name(channel) ⇒ Object



242
243
244
# File 'lib/message_bus.rb', line 242

# Splits an encoded channel name on ENCODE_SITE_TOKEN.
#
# For a name built by encode_channel_name with a site id this normally
# yields [channel, site_id]; a name without the token yields a
# one-element array.
#
# @param channel [String] encoded channel name
# @return [Array<String>] the parts around the token
def decode_channel_name(channel)
  channel.split(ENCODE_SITE_TOKEN)
end

#destroy ⇒ Object



292
293
294
295
296
297
298
299
# File 'lib/message_bus.rb', line 292

# Shuts the bus down: unsubscribes the pub/sub backend, marks the bus as
# destroyed, then waits for the subscriber thread (if any) to finish.
#
# Once @destroyed is set, publish raises MessageBus::BusDestroyed and
# reliable_pub_sub returns nil.
def destroy
  @mutex.synchronize do
    @subscriptions ||= {}
    # Must happen before @destroyed is set: reliable_pub_sub returns nil
    # once the bus is marked destroyed.
    # NOTE(review): reliable_pub_sub also takes @mutex, so this relies on
    # Synchronizer being re-entrant — confirm.
    reliable_pub_sub.global_unsubscribe
    @destroyed = true
  end
  # Joined outside the lock.
  # NOTE(review): presumably so the subscriber thread can still acquire
  # @mutex while it winds down — confirm.
  @subscriber_thread.join if @subscriber_thread
end

#enable_diagnostics ⇒ Object



194
195
196
# File 'lib/message_bus.rb', line 194

# Turns diagnostics on by delegating to MessageBus::Diagnostics.enable.
def enable_diagnostics
  MessageBus::Diagnostics.enable
end

#encode_channel_name(channel) ⇒ Object

encode channel name to include site



233
234
235
236
237
238
239
240
# File 'lib/message_bus.rb', line 233

# Encodes a channel name so that it carries the current site id.
#
# The name is returned untouched when no site_id_lookup is configured or
# when global?(channel) is true. Otherwise the result is the channel,
# ENCODE_SITE_TOKEN and the value of site_id_lookup.call, concatenated.
#
# @param channel [String] plain channel name
# @return [String] the encoded (or unchanged) channel name
# @raise [ArgumentError] if a name that needs encoding already contains
#   ENCODE_SITE_TOKEN (the message is the channel itself)
def encode_channel_name(channel)
  return channel if !site_id_lookup || global?(channel)

  raise ArgumentError, channel if channel.include?(ENCODE_SITE_TOKEN)

  "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}"
end

#group_ids_lookup(&blk) ⇒ Object



134
135
136
137
# File 'lib/message_bus.rb', line 134

# Gets or sets the group-ids lookup block.
#
# With a block, stores it; in every case the currently stored block
# (or nil when none was ever given) is returned.
def group_ids_lookup(&blk)
  @group_ids_lookup = blk unless blk.nil?
  @group_ids_lookup
end

#initialize ⇒ Object



33
34
35
# File 'lib/message_bus.rb', line 33

# Creates the Synchronizer stored in @mutex, which publish, destroy and
# reliable_pub_sub use through @mutex.synchronize.
def initialize
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



139
140
141
142
# File 'lib/message_bus.rb', line 139

# Gets or sets the is-admin lookup block.
#
# With a block, stores it; in every case the currently stored block
# (or nil when none was ever given) is returned.
def is_admin_lookup(&blk)
  if blk
    @is_admin_lookup = blk
  end
  @is_admin_lookup
end

#last_id(channel) ⇒ Object



279
280
281
# File 'lib/message_bus.rb', line 279

# Asks the pub/sub backend for the last id of +channel+.
#
# @param channel [String] plain channel name; it is passed through
#   encode_channel_name before being handed to the backend
# @return [Object] whatever the backend's last_id returns
def last_id(channel)
  backend = reliable_pub_sub
  backend.last_id(encode_channel_name(channel))
end

#last_message(channel) ⇒ Object



283
284
285
286
287
288
289
290
# File 'lib/message_bus.rb', line 283

# Returns the first message of the backlog that starts just before the
# channel's last id, i.e. backlog(channel, last_id - 1)[0].
#
# @param channel [String] channel to inspect
# @return [Object, nil] that message; nil when last_id(channel) is falsy,
#   when backlog returns a falsy value, or when the backlog is empty
def last_message(channel)
  newest_id = last_id(channel)
  return unless newest_id

  recent = backlog(channel, newest_id - 1)
  recent[0] if recent
end

#listening? ⇒ Boolean

Returns:

  • (Boolean)


306
307
308
# File 'lib/message_bus.rb', line 306

# Whether a subscriber thread exists and is still alive.
#
# @return [Boolean, nil] nil when no subscriber thread has been stored,
#   otherwise the thread's alive? status
def listening?
  worker = @subscriber_thread
  return worker unless worker

  worker.alive?
end

#local_subscribe(channel = nil, &blk) ⇒ Object

subscribe only on current site



260
261
262
263
# File 'lib/message_bus.rb', line 260

# Subscribes +blk+ on the current site only.
#
# The site id comes from site_id_lookup.call; it stays nil when no
# site_id_lookup is configured or when global?(channel) is true. The
# actual registration is delegated to subscribe_impl.
#
# @param channel [String, nil] channel to subscribe to
# @return [Object] whatever subscribe_impl returns
def local_subscribe(channel=nil, &blk)
  site_id = nil
  if site_id_lookup && !global?(channel)
    site_id = site_id_lookup.call
  end
  subscribe_impl(channel, site_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



254
255
256
257
# File 'lib/message_bus.rb', line 254

# Removes a subscription that was made with local_subscribe.
#
# The site id is resolved exactly as local_subscribe resolves it: taken
# from site_id_lookup.call, but left nil when no site_id_lookup is
# configured or when global?(channel) is true. Mirroring that rule matters
# because local_subscribe registers global-channel handlers with a nil
# site id; passing the current site id here would address a different
# subscription than the one that was registered.
#
# @param channel [String, nil] channel the block was subscribed to
# @return [Object] whatever unsubscribe_impl returns
def local_unsubscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  unsubscribe_impl(channel, site_id, &blk)
end

#logger ⇒ Object



53
54
55
56
57
# File 'lib/message_bus.rb', line 53

# Returns the configured logger.
#
# When none has been assigned, the stdlib 'logger' library is loaded on
# demand and a Logger writing to STDOUT is created and remembered.
#
# @return [Object] the assigned logger, or the lazily built Logger
def logger
  @logger ||= begin
    require 'logger'
    Logger.new(STDOUT)
  end
end

#logger=(logger) ⇒ Object



49
50
51
# File 'lib/message_bus.rb', line 49

# Stores the logger returned by #logger. Assigning nil or false makes
# #logger fall back to building a STDOUT Logger on its next call.
#
# @param logger [Object] logger to use
def logger=(logger)
  @logger = logger
end

#long_polling_enabled=(val) ⇒ Object



63
64
65
# File 'lib/message_bus.rb', line 63

# Stores the flag consulted by #long_polling_enabled?. Only a value equal
# to false turns long polling off there; anything else counts as enabled.
#
# @param val [Object] value to remember; it is stored as given
def long_polling_enabled=(val)
  @long_polling_enabled = val
end

#long_polling_enabled? ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/message_bus.rb', line 59

# Whether long polling is enabled.
#
# Enabled by default: only a stored value equal to false disables it, so
# an unset or nil setting still counts as enabled.
#
# @return [Boolean]
def long_polling_enabled?
  @long_polling_enabled != false
end

#long_polling_interval ⇒ Object



103
104
105
# File 'lib/message_bus.rb', line 103

# Returns the configured long polling interval, falling back to 25_000
# when none (or a falsy value) has been assigned.
#
# NOTE(review): the setter names its argument "millisecs", so the default
# is presumably 25 seconds expressed in milliseconds — confirm.
def long_polling_interval
  return @long_polling_interval if @long_polling_interval

  25_000
end

#long_polling_interval=(millisecs) ⇒ Object



99
100
101
# File 'lib/message_bus.rb', line 99

# Stores the interval returned by #long_polling_interval (which falls
# back to 25 * 1000 when nothing truthy is stored).
#
# @param millisecs [Object] interval to remember; the name suggests
#   milliseconds, and it is stored as given
def long_polling_interval=(millisecs)
  @long_polling_interval = millisecs
end

#max_active_clients ⇒ Object



73
74
75
# File 'lib/message_bus.rb', line 73

# Returns the configured maximum number of active clients, falling back
# to 1000 when none (or a falsy value) has been assigned.
def max_active_clients
  return 1000 unless @max_active_clients

  @max_active_clients
end

#max_active_clients=(val) ⇒ Object

The number of simultaneous clients we can service

will revert to polling if we are out of slots


69
70
71
# File 'lib/message_bus.rb', line 69

# Stores the limit returned by #max_active_clients (which falls back to
# 1000 when nothing truthy is stored).
#
# @param val [Object] limit to remember; it is stored as given
def max_active_clients=(val)
  @max_active_clients = val
end

#off ⇒ Object



107
108
109
# File 'lib/message_bus.rb', line 107

# Switches the bus off by setting @off; while it is set, publish returns
# immediately without publishing anything.
def off
  @off = true
end

#on ⇒ Object



111
112
113
# File 'lib/message_bus.rb', line 111

# Switches the bus back on by clearing @off, so publish no longer returns
# early.
def on
  @off = false
end

#on_connect(&blk) ⇒ Object



164
165
166
167
# File 'lib/message_bus.rb', line 164

# Gets or sets the on-connect callback.
#
# With a block, stores and returns it; without one, returns the block
# stored earlier (or nil when none was ever given).
def on_connect(&blk)
  blk ? (@on_connect = blk) : @on_connect
end

#on_disconnect(&blk) ⇒ Object



169
170
171
172
# File 'lib/message_bus.rb', line 169

# Gets or sets the on-disconnect callback.
#
# With a block, stores and returns it; without one, returns the block
# stored earlier (or nil when none was ever given).
def on_disconnect(&blk)
  return @on_disconnect unless blk

  @on_disconnect = blk
end

#publish(channel, data, opts = nil) ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/message_bus.rb', line 198

# Publishes +data+ on +channel+ through the pub/sub backend.
#
# Returns nil without doing anything while the bus is switched off (#off).
# The payload sent to the backend is a JSON document with the keys
# "data", "user_ids" and "group_ids".
#
# @param channel [String] channel name; it is passed through
#   encode_channel_name before publishing
# @param data [Object] payload, serialized with JSON.dump
# @param opts [Hash, nil] optional :user_ids and :group_ids entries
# @return [Object, nil] whatever the backend's publish returns
# @raise [MessageBus::BusDestroyed] if the bus has been destroyed
# @raise [MessageBus::InvalidMessage] if user or group ids are given for
#   a channel that global? reports as global
def publish(channel, data, opts = nil)
  return if @off

  @mutex.synchronize { raise ::MessageBus::BusDestroyed if @destroyed }

  user_ids = opts ? opts[:user_ids] : nil
  group_ids = opts ? opts[:group_ids] : nil

  targeted = user_ids || group_ids
  raise ::MessageBus::InvalidMessage if targeted && global?(channel)

  # Serialize before touching the backend, so a payload that cannot be
  # dumped fails without any backend call.
  payload = { data: data, user_ids: user_ids, group_ids: group_ids }
  encoded_data = JSON.dump(payload)

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end

#rack_hijack_enabled=(val) ⇒ Object



95
96
97
# File 'lib/message_bus.rb', line 95

# Stores the flag returned by #rack_hijack_enabled?. Any non-nil value
# suppresses the automatic detection done there.
#
# @param val [Object] value to remember; it is stored as given
def rack_hijack_enabled=(val)
  @rack_hijack_enabled = val
end

#rack_hijack_enabled? ⇒ Boolean

Returns:

  • (Boolean)


77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/message_bus.rb', line 77

# Whether rack hijack may be used.
#
# A non-nil value assigned through #rack_hijack_enabled= is returned
# as-is. Otherwise the answer is worked out once and remembered:
# true by default; under Phusion Passenger it is false unless Passenger
# responds to advertised_concurrency_level, in which case that level is
# set to 0 and hijack stays enabled.
#
# @return [Boolean, Object] the assigned or detected setting
def rack_hijack_enabled?
  return @rack_hijack_enabled unless @rack_hijack_enabled.nil?

  @rack_hijack_enabled = true
  return @rack_hijack_enabled unless defined? PhusionPassenger

  # without this switch passenger will explode
  # it will run out of connections after about 10
  @rack_hijack_enabled = false
  if PhusionPassenger.respond_to?(:advertised_concurrency_level)
    PhusionPassenger.advertised_concurrency_level = 0
    @rack_hijack_enabled = true
  end

  @rack_hijack_enabled
end

#redis_config ⇒ Object



120
121
122
# File 'lib/message_bus.rb', line 120

# Returns the redis configuration, creating and remembering an empty
# Hash when none (or a falsy value) has been assigned.
#
# @return [Object] the assigned config, or the memoized empty Hash
def redis_config
  @redis_config = {} unless @redis_config
  @redis_config
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



116
117
118
# File 'lib/message_bus.rb', line 116

# Stores the configuration returned by #redis_config, which
# reliable_pub_sub passes to MessageBus::ReliablePubSub.new when it first
# builds the backend.
#
# @param config [Object] configuration to remember; it is stored as given
def redis_config=(config)
  @redis_config = config
end

#reliable_pub_sub ⇒ Object



187
188
189
190
191
192
# File 'lib/message_bus.rb', line 187

# Returns the pub/sub backend, building it on first use.
#
# Under @mutex: yields nil once the bus has been destroyed; otherwise
# creates (once) and returns a MessageBus::ReliablePubSub configured
# with redis_config.
#
# @return [MessageBus::ReliablePubSub, nil]
def reliable_pub_sub
  @mutex.synchronize do
    if @destroyed
      nil
    else
      @reliable_pub_sub ||= MessageBus::ReliablePubSub.new(redis_config)
    end
  end
end

#reset! ⇒ Object

will reset all keys



311
312
313
# File 'lib/message_bus.rb', line 311

# Delegates to reset! on the pub/sub backend.
#
# NOTE(review): reliable_pub_sub returns nil once the bus has been
# destroyed, so calling this after destroy raises NoMethodError.
def reset!
  reliable_pub_sub.reset!
end

#site_id_lookup(&blk) ⇒ Object



124
125
126
127
# File 'lib/message_bus.rb', line 124

# Gets or sets the site-id lookup block.
#
# With a block, stores and returns it; without one, returns the block
# stored earlier (or nil when none was ever given). encode_channel_name
# and local_subscribe invoke the stored block with #call.
def site_id_lookup(&blk)
  return @site_id_lookup unless blk

  @site_id_lookup = blk
end

#subscribe(channel = nil, &blk) ⇒ Object



246
247
248
# File 'lib/message_bus.rb', line 246

# Registers +blk+ for +channel+ by delegating to subscribe_impl with a
# nil site id.
#
# NOTE(review): presumably a nil site id means the subscription is not
# tied to one site (compare local_subscribe) — confirm against
# subscribe_impl.
def subscribe(channel=nil, &blk)
  subscribe_impl(channel, nil, &blk)
end

#unsubscribe(channel = nil, &blk) ⇒ Object



250
251
252
# File 'lib/message_bus.rb', line 250

# Removes a subscription made with subscribe by delegating to
# unsubscribe_impl with a nil site id, the same site id subscribe uses.
def unsubscribe(channel=nil, &blk)
  unsubscribe_impl(channel, nil, &blk)
end

#user_id_lookup(&blk) ⇒ Object



129
130
131
132
# File 'lib/message_bus.rb', line 129

# Gets or sets the user-id lookup block.
#
# With a block, stores it; in every case the currently stored block
# (or nil when none was ever given) is returned.
def user_id_lookup(&blk)
  @user_id_lookup = blk unless blk.nil?
  @user_id_lookup
end