Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

ENCODE_SITE_TOKEN =
"$|$"

Instance Method Summary collapse

Instance Method Details

#access_control_allow_origin_lookup(&blk) ⇒ Object



144
145
146
147
# File 'lib/message_bus.rb', line 144

def access_control_allow_origin_lookup(&blk)
  @access_control_allow_origin_lookup = blk if blk
  @access_control_allow_origin_lookup
end

#after_forkObject



298
299
300
301
# File 'lib/message_bus.rb', line 298

def after_fork
  reliable_pub_sub.after_fork
  ensure_subscriber_thread
end

#allow_broadcast=(val) ⇒ Object



171
172
173
# File 'lib/message_bus.rb', line 171

def allow_broadcast=(val)
  @allow_broadcast = val
end

#allow_broadcast?Boolean



175
176
177
178
179
180
181
182
# File 'lib/message_bus.rb', line 175

def allow_broadcast?
  @allow_broadcast ||=
    if defined? ::Rails
      ::Rails.env.test? || ::Rails.env.development?
    else
      false
    end
end

#around_client_batch(channel, &blk) ⇒ Object



155
156
157
158
159
# File 'lib/message_bus.rb', line 155

def around_client_batch(channel, &blk)
  @around_client_batches ||= {}
  @around_client_batches[channel] = blk if blk
  @around_client_batches[channel]
end

#backlog(channel = nil, last_id) ⇒ Object



262
263
264
265
266
267
268
269
270
271
272
273
274
# File 'lib/message_bus.rb', line 262

def backlog(channel=nil, last_id)
  old =
    if channel
      reliable_pub_sub.backlog(encode_channel_name(channel), last_id)
    else
      reliable_pub_sub.global_backlog(last_id)
    end

  old.each{ |m|
    decode_message!(m)
  }
  old
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



219
220
221
222
223
224
225
# File 'lib/message_bus.rb', line 219

def blocking_subscribe(channel=nil, &blk)
  if channel
    reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
  else
    reliable_pub_sub.global_subscribe(&blk)
  end
end

#cache_assetsObject



41
42
43
44
45
46
47
# File 'lib/message_bus.rb', line 41

def cache_assets
  if defined? @cache_assets
    @cache_assets
  else
    true
  end
end

#cache_assets=(val) ⇒ Object



37
38
39
# File 'lib/message_bus.rb', line 37

def cache_assets=(val)
  @cache_assets = val
end

#client_filter(channel, &blk) ⇒ Object



149
150
151
152
153
# File 'lib/message_bus.rb', line 149

def client_filter(channel, &blk)
  @client_filters ||= {}
  @client_filters[channel] = blk if blk
  @client_filters[channel]
end

#decode_channel_name(channel) ⇒ Object



239
240
241
# File 'lib/message_bus.rb', line 239

def decode_channel_name(channel)
  channel.split(ENCODE_SITE_TOKEN)
end

#destroyObject



289
290
291
292
293
294
295
296
# File 'lib/message_bus.rb', line 289

def destroy
  @mutex.synchronize do
    @subscriptions ||= {}
    reliable_pub_sub.global_unsubscribe
    @destroyed = true
  end
  @subscriber_thread.join if @subscriber_thread
end

#enable_diagnosticsObject



191
192
193
# File 'lib/message_bus.rb', line 191

def enable_diagnostics
  MessageBus::Diagnostics.enable
end

#encode_channel_name(channel) ⇒ Object

encode channel name to include site



230
231
232
233
234
235
236
237
# File 'lib/message_bus.rb', line 230

def encode_channel_name(channel)
  if site_id_lookup && !global?(channel)
    raise ArgumentError.new channel if channel.include? ENCODE_SITE_TOKEN
    "#{channel}#{ENCODE_SITE_TOKEN}#{site_id_lookup.call}"
  else
    channel
  end
end

#group_ids_lookup(&blk) ⇒ Object



134
135
136
137
# File 'lib/message_bus.rb', line 134

def group_ids_lookup(&blk)
  @group_ids_lookup = blk if blk
  @group_ids_lookup
end

#initializeObject



33
34
35
# File 'lib/message_bus.rb', line 33

def initialize
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



139
140
141
142
# File 'lib/message_bus.rb', line 139

def is_admin_lookup(&blk)
  @is_admin_lookup = blk if blk
  @is_admin_lookup
end

#last_id(channel) ⇒ Object



276
277
278
# File 'lib/message_bus.rb', line 276

def last_id(channel)
  reliable_pub_sub.last_id(encode_channel_name(channel))
end

#last_message(channel) ⇒ Object



280
281
282
283
284
285
286
287
# File 'lib/message_bus.rb', line 280

def last_message(channel)
  if last_id = last_id(channel)
    messages = backlog(channel, last_id-1)
    if messages
      messages[0]
    end
  end
end

#listening?Boolean



303
304
305
# File 'lib/message_bus.rb', line 303

def listening?
  @subscriber_thread && @subscriber_thread.alive?
end

#local_subscribe(channel = nil, &blk) ⇒ Object

subscribe only on current site



257
258
259
260
# File 'lib/message_bus.rb', line 257

def local_subscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && ! global?(channel)
  subscribe_impl(channel, site_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



251
252
253
254
# File 'lib/message_bus.rb', line 251

def local_unsubscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup
  unsubscribe_impl(channel, site_id, &blk)
end

#loggerObject



53
54
55
56
57
# File 'lib/message_bus.rb', line 53

def logger
  return @logger if @logger
  require 'logger'
  @logger = Logger.new(STDOUT)
end

#logger=(logger) ⇒ Object



49
50
51
# File 'lib/message_bus.rb', line 49

def logger=(logger)
  @logger = logger
end

#long_polling_enabled=(val) ⇒ Object



63
64
65
# File 'lib/message_bus.rb', line 63

def long_polling_enabled=(val)
  @long_polling_enabled = val
end

#long_polling_enabled?Boolean



59
60
61
# File 'lib/message_bus.rb', line 59

def long_polling_enabled?
  @long_polling_enabled == false ? false : true
end

#long_polling_intervalObject



103
104
105
# File 'lib/message_bus.rb', line 103

def long_polling_interval
  @long_polling_interval || 25 * 1000
end

#long_polling_interval=(millisecs) ⇒ Object



99
100
101
# File 'lib/message_bus.rb', line 99

def long_polling_interval=(millisecs)
  @long_polling_interval = millisecs
end

#max_active_clientsObject



73
74
75
# File 'lib/message_bus.rb', line 73

def max_active_clients
  @max_active_clients || 1000
end

#max_active_clients=(val) ⇒ Object

The number of simultanuous clients we can service

will revert to polling if we are out of slots


69
70
71
# File 'lib/message_bus.rb', line 69

def max_active_clients=(val)
  @max_active_clients = val
end

#offObject



107
108
109
# File 'lib/message_bus.rb', line 107

def off
  @off = true
end

#onObject



111
112
113
# File 'lib/message_bus.rb', line 111

def on
  @off = false
end

#on_connect(&blk) ⇒ Object



161
162
163
164
# File 'lib/message_bus.rb', line 161

def on_connect(&blk)
  @on_connect = blk if blk
  @on_connect
end

#on_disconnect(&blk) ⇒ Object



166
167
168
169
# File 'lib/message_bus.rb', line 166

def on_disconnect(&blk)
  @on_disconnect = blk if blk
  @on_disconnect
end

#publish(channel, data, opts = nil) ⇒ Object



195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
# File 'lib/message_bus.rb', line 195

def publish(channel, data, opts = nil)
  return if @off
  @mutex.synchronize do
    raise ::MessageBus::BusDestroyed if @destroyed
  end

  user_ids = nil
  group_ids = nil
  if opts
    user_ids = opts[:user_ids]
    group_ids = opts[:group_ids]
  end

  raise ::MessageBus::InvalidMessage if (user_ids || group_ids) && global?(channel)

  encoded_data = JSON.dump({
    data: data,
    user_ids: user_ids,
    group_ids: group_ids
  })

  reliable_pub_sub.publish(encode_channel_name(channel), encoded_data)
end

#rack_hijack_enabled=(val) ⇒ Object



95
96
97
# File 'lib/message_bus.rb', line 95

def rack_hijack_enabled=(val)
  @rack_hijack_enabled = val
end

#rack_hijack_enabled?Boolean



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/message_bus.rb', line 77

def rack_hijack_enabled?
  if @rack_hijack_enabled.nil?
    @rack_hijack_enabled = true

    # without this switch passenger will explode
    # it will run out of connections after about 10
    if defined? PhusionPassenger
      @rack_hijack_enabled = false
      if PhusionPassenger.respond_to? :advertised_concurrency_level
        PhusionPassenger.advertised_concurrency_level = 0
        @rack_hijack_enabled = true
      end
    end
  end

  @rack_hijack_enabled
end

#redis_configObject



120
121
122
# File 'lib/message_bus.rb', line 120

def redis_config
  @redis_config ||= {}
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



116
117
118
# File 'lib/message_bus.rb', line 116

def redis_config=(config)
  @redis_config = config
end

#reliable_pub_subObject



184
185
186
187
188
189
# File 'lib/message_bus.rb', line 184

def reliable_pub_sub
  @mutex.synchronize do
    return nil if @destroyed
    @reliable_pub_sub ||= MessageBus::ReliablePubSub.new redis_config
  end
end

#reset!Object

will reset all keys



308
309
310
# File 'lib/message_bus.rb', line 308

def reset!
  reliable_pub_sub.reset!
end

#site_id_lookup(&blk) ⇒ Object



124
125
126
127
# File 'lib/message_bus.rb', line 124

def site_id_lookup(&blk)
  @site_id_lookup = blk if blk
  @site_id_lookup
end

#subscribe(channel = nil, &blk) ⇒ Object



243
244
245
# File 'lib/message_bus.rb', line 243

def subscribe(channel=nil, &blk)
  subscribe_impl(channel, nil, &blk)
end

#unsubscribe(channel = nil, &blk) ⇒ Object



247
248
249
# File 'lib/message_bus.rb', line 247

def unsubscribe(channel=nil, &blk)
  unsubscribe_impl(channel, nil, &blk)
end

#user_id_lookup(&blk) ⇒ Object



129
130
131
132
# File 'lib/message_bus.rb', line 129

def user_id_lookup(&blk)
  @user_id_lookup = blk if blk
  @user_id_lookup
end