Module: ResqueBus

Defined in:
lib/resque_bus/server.rb,
lib/resque-bus.rb,
lib/resque_bus/util.rb,
lib/resque_bus/local.rb,
lib/resque_bus/rider.rb,
lib/resque_bus/driver.rb,
lib/resque_bus/matcher.rb,
lib/resque_bus/dispatch.rb,
lib/resque_bus/heartbeat.rb,
lib/resque_bus/publisher.rb,
lib/resque_bus/subscriber.rb,
lib/resque_bus/application.rb,
lib/resque_bus/subscription.rb,
lib/resque_bus/task_manager.rb,
lib/resque_bus/subscription_list.rb

Overview

Creates a DSL for apps to define their blocks to run for event_types

Defined Under Namespace

Modules: Server, Subscriber, Util Classes: Application, Dispatch, Driver, Heartbeat, Local, Matcher, Publisher, Rider, Subscription, SubscriptionList, TaskManager

Class Method Summary collapse

Class Method Details

.before_publish=(proc) ⇒ Object



161
162
163
# File 'lib/resque-bus.rb', line 161

def before_publish=(proc)
  @before_publish_callback = proc
end

.before_publish_callback(attributes) ⇒ Object



165
166
167
168
169
# File 'lib/resque-bus.rb', line 165

def before_publish_callback(attributes)
  if @before_publish_callback
    @before_publish_callback.call(attributes)
  end
end

.default_app_keyObject



32
33
34
# File 'lib/resque-bus.rb', line 32

def default_app_key
  @default_app_key
end

.default_app_key=(val) ⇒ Object



28
29
30
# File 'lib/resque-bus.rb', line 28

def default_app_key=val
  @default_app_key = Application.normalize(val)
end

.default_queueObject



40
41
42
# File 'lib/resque-bus.rb', line 40

def default_queue
  @default_queue
end

.default_queue=(val) ⇒ Object



36
37
38
# File 'lib/resque-bus.rb', line 36

def default_queue=val
  @default_queue = val
end

.dispatch(app_key = nil, &block) ⇒ Object



48
49
50
51
52
# File 'lib/resque-bus.rb', line 48

def dispatch(app_key=nil, &block)
  dispatcher = dispatcher_by_key(app_key)
  dispatcher.instance_eval(&block)
  dispatcher
end

.dispatcher_by_key(app_key) ⇒ Object



59
60
61
62
63
# File 'lib/resque-bus.rb', line 59

def dispatcher_by_key(app_key)
  app_key = Application.normalize(app_key || default_app_key)
  @dispatchers ||= {}
  @dispatchers[app_key] ||= Dispatch.new(app_key)
end

.dispatcher_execute(app_key, key, attributes) ⇒ Object



65
66
67
68
69
# File 'lib/resque-bus.rb', line 65

def dispatcher_execute(app_key, key, attributes)
  @dispatchers ||= {}
  dispatcher = @dispatchers[app_key]
  dispatcher.execute(key, attributes) if dispatcher
end

.dispatchersObject



54
55
56
57
# File 'lib/resque-bus.rb', line 54

def dispatchers
  @dispatchers ||= {}
  @dispatchers.values
end

.enqueue_to(queue, klass, *args) ⇒ Object



217
218
219
# File 'lib/resque-bus.rb', line 217

def enqueue_to(queue, klass, *args)
  push(queue, :class => klass.to_s, :args => args)
end

.generate_uuidObject



184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/resque-bus.rb', line 184

def generate_uuid
  require 'securerandom' unless defined?(SecureRandom)
  return SecureRandom.uuid
  
  rescue Exception => e
    # secure random not there
    # big random number a few times
    n_bytes = [42].pack('i').size
    n_bits = n_bytes * 8
    max = 2 ** (n_bits - 2) - 1
    return "#{rand(max)}-#{rand(max)}-#{rand(max)}"
end

.heartbeat!Object



79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/resque-bus.rb', line 79

def heartbeat!
  # turn on the heartbeat
  # should be down after loading scheduler yml if you do that
  # otherwise, anytime
  require 'resque/scheduler'
  name     = 'resquebus_hearbeat'
  schedule = { 'class' => '::ResqueBus::Heartbeat',
               'cron'  => '* * * * *',   # every minute
               'queue' => incoming_queue,
               'description' => 'I publish a heartbeat_minutes event every minute'
             }
  if Resque::Scheduler.dynamic
    Resque.set_schedule(name, schedule)
  end
  Resque.schedule[name] = schedule
end

.hostnameObject



44
45
46
# File 'lib/resque-bus.rb', line 44

def hostname
  @hostname ||= `hostname 2>&1`.strip.sub(/.local/,'')
end

.local_modeObject



75
76
77
# File 'lib/resque-bus.rb', line 75

def local_mode
  @local_mode
end

.local_mode=(value) ⇒ Object



71
72
73
# File 'lib/resque-bus.rb', line 71

def local_mode=value
  @local_mode = value
end

.log_application(message) ⇒ Object



229
230
231
232
233
234
# File 'lib/resque-bus.rb', line 229

def log_application(message)
  if logger
    time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
    logger.info("** [#{time}] #$$: ResqueBus #{message}")
  end
end

.log_worker(message) ⇒ Object



236
237
238
239
240
241
# File 'lib/resque-bus.rb', line 236

def log_worker(message)
  if ENV['LOGGING'] || ENV['VERBOSE'] || ENV['VVERBOSE']
    time = Time.now.strftime('%H:%M:%S %Y-%m-%d')
    puts "** [#{time}] #$$: #{message}"
  end
end

.loggerObject



221
222
223
# File 'lib/resque-bus.rb', line 221

def logger
  @logger
end

.logger=(val) ⇒ Object



225
226
227
# File 'lib/resque-bus.rb', line 225

def logger=val
  @logger = val
end

.original_redisObject



137
138
139
# File 'lib/resque-bus.rb', line 137

def original_redis
  @original_redis
end

.original_redis=(server) ⇒ Object



134
135
136
# File 'lib/resque-bus.rb', line 134

def original_redis=(server)
  @original_redis = server
end

.publish(event_type, attributes = {}) ⇒ Object



197
198
199
200
201
202
203
204
205
# File 'lib/resque-bus.rb', line 197

def publish(event_type, attributes = {})
  to_publish = (event_type, attributes)
  ResqueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
  if local_mode
    ResqueBus::Local.perform(to_publish)
  else
    enqueue_to(incoming_queue, Driver, to_publish)
  end
end

.publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object



207
208
209
210
211
212
213
214
215
# File 'lib/resque-bus.rb', line 207

def publish_at(timestamp_or_epoch, event_type, attributes = {})
  to_publish = (event_type, attributes)
  to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i
  to_publish.delete("bus_published_at") unless attributes["bus_published_at"] # will be put on when it actually does it
  
  ResqueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
  item = delayed_job_to_hash_with_queue(incoming_queue, Publisher, [event_type, to_publish])
  delayed_push(timestamp_or_epoch, item)
end

.publish_metadata(event_type, attributes = {}) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
# File 'lib/resque-bus.rb', line 172

def (event_type, attributes={})
  # TODO: "bus_app_key" => application.app_key ?
  bus_attr = {"bus_published_at" => Time.now.to_i, "bus_event_type" => event_type}
  bus_attr["bus_id"]           = "#{Time.now.to_i}-#{generate_uuid}"
  bus_attr["bus_app_hostname"] = hostname
  bus_attr["bus_locale"]       = I18n.locale.to_s if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale
  bus_attr["bus_timezone"]     = Time.zone.name   if defined?(Time) && Time.respond_to?(:zone)   && Time.zone
  out = bus_attr.merge(attributes || {})
  ResqueBus.before_publish_callback(out)
  out
end

.redisObject

Returns the current Redis connection. If none has been created, will create a new one from the Reqsue one (with a different namespace)



126
127
128
129
130
131
132
# File 'lib/resque-bus.rb', line 126

def redis
  return @redis if @redis
  copy = Resque.redis.clone
  copy.namespace = default_namespace
  self.redis = copy
  self.redis
end

.redis=(server) ⇒ Object

Accepts:

1. A 'hostname:port' String
2. A 'hostname:port:db' String (to select the Redis db)
3. A 'hostname:port/namespace' String (to set the Redis namespace)
4. A Redis URL String 'redis://host:port'
5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
   or `Redis::Namespace`.


103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/resque-bus.rb', line 103

def redis=(server)
  case server
  when String
    if server =~ /redis\:\/\//
      redis = Redis.connect(:url => server, :thread_safe => true)
    else
      server, namespace = server.split('/', 2)
      host, port, db = server.split(':')
      redis = Redis.new(:host => host, :port => port,
        :thread_safe => true, :db => db)
    end
    namespace ||= default_namespace

    @redis = Redis::Namespace.new(namespace, :redis => redis)
  when Redis::Namespace
    @redis = server
  else
    @redis = Redis::Namespace.new(default_namespace, :redis => server)
  end
end

.with_global_attributes(attributes) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/resque-bus.rb', line 141

def with_global_attributes(attributes)
  original_timezone = false
  original_locale   = false
  
  if attributes["bus_locale"] && defined?(I18n) && I18n.respond_to?(:locale=)
    original_locale = I18n.locale if I18n.respond_to?(:locale)
    I18n.locale = attributes["bus_locale"]
  end
  
  if attributes["bus_timezone"] && defined?(Time) && Time.respond_to?(:zone=)
    original_timezone = Time.zone if Time.respond_to?(:zone)
    Time.zone = attributes["bus_timezone"]
  end

  yield
ensure
  I18n.locale = original_locale   unless original_locale   == false
  Time.zone   = original_timezone unless original_timezone == false
end