Module: MessageBus::Implementation

Included in:
MessageBus, Instance
Defined in:
lib/message_bus.rb

Defined Under Namespace

Classes: Synchronizer

Constant Summary collapse

ENCODE_SITE_TOKEN =
"$|$"

Instance Method Summary collapse

Instance Method Details

#after_fork ⇒ Object



293
294
295
296
# File 'lib/message_bus.rb', line 293

# Post-fork hook (per its name): forwards after_fork to the reliable pub/sub
# backend, then calls ensure_subscriber_thread and returns its result.
def after_fork
  backend = reliable_pub_sub
  backend.after_fork
  ensure_subscriber_thread
end

#allow_broadcast=(val) ⇒ Object



166
167
168
# File 'lib/message_bus.rb', line 166

# Stores val as the broadcast flag that allow_broadcast? reads back.
def allow_broadcast=(val)
  @allow_broadcast = val
end

#allow_broadcast? ⇒ Boolean



170
171
172
173
174
175
176
177
# File 'lib/message_bus.rb', line 170

# Reports whether broadcasting is allowed.
#
# A value assigned through allow_broadcast= always wins, including an
# explicit false. (A plain `||=` would treat a stored false as "unset" and
# silently re-enable broadcasting under Rails test/development.)
#
# When nothing has been assigned, the default is true only if ::Rails is
# defined and its env is test or development; otherwise false. As before,
# only a truthy default is memoized, so a false default is recomputed on the
# next call.
def allow_broadcast?
  return @allow_broadcast unless @allow_broadcast.nil?

  default =
    if defined?(::Rails)
      ::Rails.env.test? || ::Rails.env.development?
    else
      false
    end

  @allow_broadcast = default if default
  default
end

#around_client_batch(channel, &blk) ⇒ Object



150
151
152
153
154
# File 'lib/message_bus.rb', line 150

# Per-channel registry of "around client batch" blocks.
#
# With a block: stores it under channel (replacing any previous one).
# Always returns the block currently stored for channel, or nil.
def around_client_batch(channel, &blk)
  registry = (@around_client_batches ||= {})
  registry[channel] = blk unless blk.nil?
  registry[channel]
end

#backlog(channel = nil, last_id) ⇒ Object



257
258
259
260
261
262
263
264
265
266
267
268
269
# File 'lib/message_bus.rb', line 257

# Fetches backlog messages from the reliable pub/sub backend.
#
# With a channel, asks the backend for that channel's backlog (using the
# encoded channel name); without one, asks for the global backlog. last_id is
# passed through to the backend unchanged. Every returned message is run
# through decode_message! before the collection is returned.
def backlog(channel=nil, last_id)
  pub_sub = reliable_pub_sub
  messages =
    if channel
      pub_sub.backlog(encode_channel_name(channel), last_id)
    else
      pub_sub.global_backlog(last_id)
    end

  messages.each { |message| decode_message!(message) }
  messages
end

#blocking_subscribe(channel = nil, &blk) ⇒ Object



214
215
216
217
218
219
220
# File 'lib/message_bus.rb', line 214

# Subscribes blk directly on the reliable pub/sub backend: to the encoded
# channel when one is given, otherwise via the backend's global_subscribe.
# Returns whatever the backend call returns.
def blocking_subscribe(channel=nil, &blk)
  return reliable_pub_sub.global_subscribe(&blk) unless channel

  reliable_pub_sub.subscribe(encode_channel_name(channel), &blk)
end

#cache_assets ⇒ Object



41
42
43
44
45
46
47
# File 'lib/message_bus.rb', line 41

# Returns the cache_assets setting. Defaults to true until a value has been
# assigned; once assigned, the stored value is returned as-is (even nil/false).
def cache_assets
  return true unless defined?(@cache_assets)

  @cache_assets
end

#cache_assets=(val) ⇒ Object



37
38
39
# File 'lib/message_bus.rb', line 37

# Stores val as the cache_assets setting returned by cache_assets.
def cache_assets=(val)
  @cache_assets = val
end

#client_filter(channel, &blk) ⇒ Object



144
145
146
147
148
# File 'lib/message_bus.rb', line 144

# Per-channel registry of client filter blocks.
#
# With a block: stores it under channel (replacing any previous one).
# Always returns the block currently stored for channel, or nil.
def client_filter(channel, &blk)
  filters = (@client_filters ||= {})
  filters[channel] = blk unless blk.nil?
  filters[channel]
end

#decode_channel_name(channel) ⇒ Object



234
235
236
# File 'lib/message_bus.rb', line 234

# Splits channel on ENCODE_SITE_TOKEN and returns the resulting array of
# parts. A name without the token comes back as a one-element array.
def decode_channel_name(channel)
  separator = ENCODE_SITE_TOKEN
  channel.split(separator)
end

#destroy ⇒ Object



284
285
286
287
288
289
290
291
# File 'lib/message_bus.rb', line 284

# Shuts the bus down.
#
# Under @mutex: makes sure @subscriptions is initialized, calls
# global_unsubscribe on the reliable pub/sub backend, and sets @destroyed
# (which publish and reliable_pub_sub check). After releasing the lock, joins
# the subscriber thread if there is one.
def destroy
  @mutex.synchronize do
    @subscriptions ||= {}
    reliable_pub_sub.global_unsubscribe
    @destroyed = true
  end

  worker = @subscriber_thread
  worker.join if worker
end

#enable_diagnostics ⇒ Object



186
187
188
# File 'lib/message_bus.rb', line 186

# Turns diagnostics on by delegating to MessageBus::Diagnostics.enable and
# returning its result.
def enable_diagnostics
  MessageBus::Diagnostics.enable
end

#encode_channel_name(channel) ⇒ Object

encode channel name to include site



225
226
227
228
229
230
231
232
# File 'lib/message_bus.rb', line 225

# Encodes the channel name so that it includes the current site.
#
# When no site_id_lookup is configured, or the channel is global, the name is
# returned unchanged. Otherwise the result is
# "<channel><ENCODE_SITE_TOKEN><site id>", with the site id obtained by
# calling the lookup.
#
# Raises ArgumentError (message: the channel) if a name that is about to be
# encoded already contains ENCODE_SITE_TOKEN.
def encode_channel_name(channel)
  lookup = site_id_lookup
  return channel if !lookup || global?(channel)

  raise ArgumentError, channel if channel.include?(ENCODE_SITE_TOKEN)

  "#{channel}#{ENCODE_SITE_TOKEN}#{lookup.call}"
end

#group_ids_lookup(&blk) ⇒ Object



134
135
136
137
# File 'lib/message_bus.rb', line 134

# Combined getter/setter for the group-ids lookup block: stores blk when one
# is given, and returns the currently stored block (nil if never set).
def group_ids_lookup(&blk)
  return @group_ids_lookup unless blk

  @group_ids_lookup = blk
end

#initialize ⇒ Object



33
34
35
# File 'lib/message_bus.rb', line 33

# Creates the Synchronizer kept in @mutex; other methods here guard shared
# state with @mutex.synchronize.
def initialize
  @mutex = Synchronizer.new
end

#is_admin_lookup(&blk) ⇒ Object



139
140
141
142
# File 'lib/message_bus.rb', line 139

# Combined getter/setter for the is-admin lookup block: stores blk when one
# is given, and returns the currently stored block (nil if never set).
def is_admin_lookup(&blk)
  return @is_admin_lookup unless blk

  @is_admin_lookup = blk
end

#last_id(channel) ⇒ Object



271
272
273
# File 'lib/message_bus.rb', line 271

# Asks the reliable pub/sub backend for the last id of channel, using the
# encoded channel name, and returns the backend's answer.
def last_id(channel)
  backend = reliable_pub_sub
  backend.last_id(encode_channel_name(channel))
end

#last_message(channel) ⇒ Object



275
276
277
278
279
280
281
282
# File 'lib/message_bus.rb', line 275

# Returns the first message of the backlog requested with (last id - 1) for
# channel.
#
# Returns nil when last_id(channel) is falsy or when backlog returns a falsy
# value.
def last_message(channel)
  newest_id = last_id(channel)
  return unless newest_id

  messages = backlog(channel, newest_id - 1)
  messages[0] if messages
end

#listening? ⇒ Boolean



298
299
300
# File 'lib/message_bus.rb', line 298

# Truthy when a subscriber thread exists and is alive. Returns nil (not
# false) when no subscriber thread has been set.
def listening?
  thread = @subscriber_thread
  thread && thread.alive?
end

#local_subscribe(channel = nil, &blk) ⇒ Object

subscribe only on current site



252
253
254
255
# File 'lib/message_bus.rb', line 252

# Subscribes only on the current site.
#
# The site id comes from calling site_id_lookup, and is used only when a
# lookup is configured and the channel is not global; otherwise nil is passed
# to subscribe_impl.
def local_subscribe(channel=nil, &blk)
  lookup = site_id_lookup
  site_id = (lookup && !global?(channel)) ? lookup.call : nil
  subscribe_impl(channel, site_id, &blk)
end

#local_unsubscribe(channel = nil, &blk) ⇒ Object



246
247
248
249
# File 'lib/message_bus.rb', line 246

# Unsubscribes only on the current site.
#
# The site id is resolved exactly as local_subscribe resolves it: the result
# of calling site_id_lookup, but only when a lookup is configured and the
# channel is not global. Global channels are subscribed with a nil site id,
# so they must be unsubscribed with nil as well; applying the site id
# unconditionally would target a different site id than the one used at
# subscribe time.
def local_unsubscribe(channel=nil, &blk)
  site_id = site_id_lookup.call if site_id_lookup && !global?(channel)
  unsubscribe_impl(channel, site_id, &blk)
end

#logger ⇒ Object



53
54
55
56
57
# File 'lib/message_bus.rb', line 53

# Returns the configured logger. If none has been set, lazily requires the
# standard 'logger' library, creates a Logger writing to STDOUT, remembers it,
# and returns it.
def logger
  @logger ||= begin
    require 'logger'
    Logger.new(STDOUT)
  end
end

#logger=(logger) ⇒ Object



49
50
51
# File 'lib/message_bus.rb', line 49

# Replaces the logger returned by the logger reader.
def logger=(logger)
  @logger = logger
end

#long_polling_enabled=(val) ⇒ Object



63
64
65
# File 'lib/message_bus.rb', line 63

# Stores val as the long-polling flag read by long_polling_enabled?.
def long_polling_enabled=(val)
  @long_polling_enabled = val
end

#long_polling_enabled? ⇒ Boolean



59
60
61
# File 'lib/message_bus.rb', line 59

# Long polling is on unless it has been explicitly set to false; any other
# stored value (including nil / never set) yields true.
def long_polling_enabled?
  @long_polling_enabled != false
end

#long_polling_interval ⇒ Object



103
104
105
# File 'lib/message_bus.rb', line 103

# Returns the configured long-polling interval, or 30_000 when none is set
# (presumably milliseconds, going by the setter's `millisecs` parameter).
def long_polling_interval
  @long_polling_interval || 30_000
end

#long_polling_interval=(millisecs) ⇒ Object



99
100
101
# File 'lib/message_bus.rb', line 99

# Stores the long-polling interval (named as milliseconds) returned by
# long_polling_interval.
def long_polling_interval=(millisecs)
  @long_polling_interval = millisecs
end

#max_active_clients ⇒ Object



73
74
75
# File 'lib/message_bus.rb', line 73

# Returns the configured maximum number of active clients, or 1_000 when
# none has been set.
def max_active_clients
  @max_active_clients || 1_000
end

#max_active_clients=(val) ⇒ Object

The number of simultaneous clients we can service.

Will revert to polling if we are out of slots.


69
70
71
# File 'lib/message_bus.rb', line 69

# Sets the number of simultaneous clients we can service (the value returned
# by max_active_clients).
def max_active_clients=(val)
  @max_active_clients = val
end

#off ⇒ Object



107
108
109
# File 'lib/message_bus.rb', line 107

# Switches the bus off by setting @off to true; publish returns early
# without publishing while @off is truthy.
def off
  @off = true
end

#on ⇒ Object



111
112
113
# File 'lib/message_bus.rb', line 111

# Switches the bus back on by setting @off to false, so publish no longer
# returns early.
def on
  @off = false
end

#on_connect(&blk) ⇒ Object



156
157
158
159
# File 'lib/message_bus.rb', line 156

# Combined getter/setter for the on-connect callback: stores blk when one is
# given, and returns the currently stored block (nil if never set).
def on_connect(&blk)
  return @on_connect unless blk

  @on_connect = blk
end

#on_disconnect(&blk) ⇒ Object



161
162
163
164
# File 'lib/message_bus.rb', line 161

# Combined getter/setter for the on-disconnect callback: stores blk when one
# is given, and returns the currently stored block (nil if never set).
def on_disconnect(&blk)
  return @on_disconnect unless blk

  @on_disconnect = blk
end

#publish(channel, data, opts = nil) ⇒ Object



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/message_bus.rb', line 190

# Publishes data on channel through the reliable pub/sub backend.
#
# - Does nothing (returns nil) while the bus is switched off.
# - Raises ::MessageBus::BusDestroyed if the bus has been destroyed.
# - opts may carry :user_ids and/or :group_ids; targeting either on a global
#   channel raises ::MessageBus::InvalidMessage.
#
# The payload is a JSON document with the keys data, user_ids and group_ids,
# sent under the encoded channel name. Returns the backend's publish result.
def publish(channel, data, opts = nil)
  return if @off

  @mutex.synchronize { raise ::MessageBus::BusDestroyed if @destroyed }

  user_ids = opts ? opts[:user_ids] : nil
  group_ids = opts ? opts[:group_ids] : nil

  targeted = user_ids || group_ids
  raise ::MessageBus::InvalidMessage if targeted && global?(channel)

  payload = JSON.dump({ data: data, user_ids: user_ids, group_ids: group_ids })
  reliable_pub_sub.publish(encode_channel_name(channel), payload)
end

#rack_hijack_enabled=(val) ⇒ Object



95
96
97
# File 'lib/message_bus.rb', line 95

# Stores val as the rack-hijack flag; a non-nil value stops
# rack_hijack_enabled? from auto-detecting.
def rack_hijack_enabled=(val)
  @rack_hijack_enabled = val
end

#rack_hijack_enabled? ⇒ Boolean



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/message_bus.rb', line 77

# Reports whether rack hijack is enabled, auto-detecting (and remembering)
# the answer the first time it is asked while the flag is still nil.
#
# Auto-detection: enabled by default. Under PhusionPassenger it is disabled,
# unless PhusionPassenger responds to advertised_concurrency_level, in which
# case that level is set to 0 and hijack is enabled.
def rack_hijack_enabled?
  return @rack_hijack_enabled unless @rack_hijack_enabled.nil?
  return @rack_hijack_enabled = true unless defined?(PhusionPassenger)

  # without this switch passenger will explode
  # it will run out of connections after about 10
  @rack_hijack_enabled = false
  if PhusionPassenger.respond_to?(:advertised_concurrency_level)
    PhusionPassenger.advertised_concurrency_level = 0
    @rack_hijack_enabled = true
  end

  @rack_hijack_enabled
end

#redis_config ⇒ Object



120
121
122
# File 'lib/message_bus.rb', line 120

# Returns the redis configuration, initializing it to an empty Hash the first
# time it is read while unset. reliable_pub_sub passes this value to
# MessageBus::ReliablePubSub.new.
def redis_config
  @redis_config ||= {}
end

#redis_config=(config) ⇒ Object

Allow us to inject a redis db



116
117
118
# File 'lib/message_bus.rb', line 116

# Allows a redis configuration to be injected; it is what redis_config
# returns afterwards.
def redis_config=(config)
  @redis_config = config
end

#reliable_pub_sub ⇒ Object



179
180
181
182
183
184
# File 'lib/message_bus.rb', line 179

# Returns the shared MessageBus::ReliablePubSub backend, creating it from
# redis_config on first use. The check and creation run under @mutex.
# Returns nil once the bus has been destroyed.
def reliable_pub_sub
  @mutex.synchronize do
    unless @destroyed
      @reliable_pub_sub ||= MessageBus::ReliablePubSub.new(redis_config)
    end
  end
end

#reset! ⇒ Object

will reset all keys



303
304
305
# File 'lib/message_bus.rb', line 303

# Will reset all keys: delegates to reset! on the reliable pub/sub backend
# and returns its result.
def reset!
  backend = reliable_pub_sub
  backend.reset!
end

#site_id_lookup(&blk) ⇒ Object



124
125
126
127
# File 'lib/message_bus.rb', line 124

# Combined getter/setter for the site-id lookup block: stores blk when one is
# given, and returns the currently stored block (nil if never set). Other
# methods here call the stored block to obtain the current site id.
def site_id_lookup(&blk)
  return @site_id_lookup unless blk

  @site_id_lookup = blk
end

#subscribe(channel = nil, &blk) ⇒ Object



238
239
240
# File 'lib/message_bus.rb', line 238

# Subscribes blk to channel (nil when omitted) without a site id: delegates
# to subscribe_impl with nil as the site id and returns its result.
def subscribe(channel=nil, &blk)
  subscribe_impl(channel, nil, &blk)
end

#unsubscribe(channel = nil, &blk) ⇒ Object



242
243
244
# File 'lib/message_bus.rb', line 242

# Unsubscribes blk from channel (nil when omitted) without a site id:
# delegates to unsubscribe_impl with nil as the site id and returns its
# result.
def unsubscribe(channel=nil, &blk)
  unsubscribe_impl(channel, nil, &blk)
end

#user_id_lookup(&blk) ⇒ Object



129
130
131
132
# File 'lib/message_bus.rb', line 129

# Combined getter/setter for the user-id lookup block: stores blk when one is
# given, and returns the currently stored block (nil if never set).
def user_id_lookup(&blk)
  return @user_id_lookup unless blk

  @user_id_lookup = blk
end