Module: ResqueBus

Defined in:
lib/resque_bus/server.rb,
lib/resque-bus.rb,
lib/resque_bus/util.rb,
lib/resque_bus/local.rb,
lib/resque_bus/rider.rb,
lib/resque_bus/driver.rb,
lib/resque_bus/matcher.rb,
lib/resque_bus/dispatch.rb,
lib/resque_bus/heartbeat.rb,
lib/resque_bus/publisher.rb,
lib/resque_bus/subscriber.rb,
lib/resque_bus/application.rb,
lib/resque_bus/subscription.rb,
lib/resque_bus/task_manager.rb,
lib/resque_bus/subscription_list.rb

Overview

Creates a DSL for apps to define their blocks to run for event_types

Defined Under Namespace

Modules: Server, Subscriber, Util

Classes: Application, Dispatch, Driver, Heartbeat, Local, Matcher, Publisher, Rider, Subscription, SubscriptionList, TaskManager

Class Method Summary collapse

Class Method Details

.default_app_key ⇒ Object



28
29
30
# File 'lib/resque-bus.rb', line 28

# Reader for the default application key.
#
# @return [Object, nil] whatever is stored in @default_app_key, or nil when
#   nothing has been assigned yet
def default_app_key
  instance_variable_get(:@default_app_key)
end

.default_app_key=(val) ⇒ Object



24
25
26
# File 'lib/resque-bus.rb', line 24

# Writer for the default application key.
#
# The value is passed through Application.normalize before it is stored in
# @default_app_key.
#
# @param val [Object] the raw application key
def default_app_key=(val)
  normalized = Application.normalize(val)
  instance_variable_set(:@default_app_key, normalized)
end

.default_queue ⇒ Object



36
37
38
# File 'lib/resque-bus.rb', line 36

# Reader for the default queue.
#
# @return [Object, nil] whatever is stored in @default_queue, or nil when
#   nothing has been assigned yet
def default_queue
  instance_variable_get(:@default_queue)
end

.default_queue=(val) ⇒ Object



32
33
34
# File 'lib/resque-bus.rb', line 32

# Writer for the default queue; stores the value unchanged in @default_queue.
#
# @param val [Object] the queue to use by default
def default_queue=(val)
  instance_variable_set(:@default_queue, val)
end

.dispatch(app_key = nil, &block) ⇒ Object



44
45
46
47
48
# File 'lib/resque-bus.rb', line 44

# Looks up the dispatcher for +app_key+ and evaluates the given block in the
# context of that dispatcher (via instance_eval).
#
# @param app_key [Object, nil] key handed to dispatcher_by_key
# @yield block evaluated with the dispatcher as +self+
# @return [Object] the dispatcher returned by dispatcher_by_key
def dispatch(app_key = nil, &block)
  dispatcher_by_key(app_key).tap { |target| target.instance_eval(&block) }
end

.dispatcher_by_key(app_key) ⇒ Object



55
56
57
58
59
# File 'lib/resque-bus.rb', line 55

# Returns the dispatcher registered for an application key, creating and
# caching a new Dispatch when none is stored yet.
#
# The key (or default_app_key when +app_key+ is nil/false) is normalized with
# Application.normalize before being used as the cache key.
#
# @param app_key [Object, nil] application key
# @return [Object] the cached or newly created dispatcher
def dispatcher_by_key(app_key)
  normalized = Application.normalize(app_key || default_app_key)
  registry = (@dispatchers ||= {})
  registry[normalized] ||= Dispatch.new(normalized)
end

.dispatcher_execute(app_key, key, attributes) ⇒ Object



61
62
63
64
65
# File 'lib/resque-bus.rb', line 61

# Runs +key+ with +attributes+ on the dispatcher stored under +app_key+.
#
# The lookup uses +app_key+ exactly as given (it is not normalized here).
#
# @param app_key [Object] key of the dispatcher in the registry
# @param key [Object] forwarded to the dispatcher's +execute+
# @param attributes [Object] forwarded to the dispatcher's +execute+
# @return [Object, nil] the result of +execute+, or nil when no dispatcher
#   is registered under +app_key+
def dispatcher_execute(app_key, key, attributes)
  found = (@dispatchers ||= {})[app_key]
  return unless found

  found.execute(key, attributes)
end

.dispatchers ⇒ Object



50
51
52
53
# File 'lib/resque-bus.rb', line 50

# Lists every dispatcher currently held in the registry.
#
# @return [Array] the values of @dispatchers (empty when none were created)
def dispatchers
  (@dispatchers ||= {}).values
end

.enqueue_to(queue, klass, *args) ⇒ Object



193
194
195
# File 'lib/resque-bus.rb', line 193

# Pushes a job description onto +queue+.
#
# The payload handed to +push+ carries the class name as a String under
# :class and the remaining arguments as an Array under :args.
#
# @param queue [Object] destination queue, forwarded to +push+
# @param klass [#to_s] job class; only its string form is sent
# @param args [Array] job arguments
# @return [Object] whatever +push+ returns
def enqueue_to(queue, klass, *args)
  class_name = klass.to_s
  push(queue, class: class_name, args: args)
end

.generate_uuid ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
# File 'lib/resque-bus.rb', line 160

# Generates a unique identifier string.
#
# Uses SecureRandom.uuid when available. If SecureRandom cannot be loaded or
# fails, falls back to three random integers joined by dashes.
#
# Only load failures, NotImplementedError and StandardError trigger the
# fallback; signals and exit requests (Interrupt, SystemExit, ...) are no
# longer swallowed.
#
# @return [String] a UUID, or "<int>-<int>-<int>" from the fallback
def generate_uuid
  require 'securerandom' unless defined?(SecureRandom)
  SecureRandom.uuid
rescue LoadError, NotImplementedError, StandardError
  # SecureRandom is not usable: build the id from a few large random numbers.
  # The upper bound is derived from the platform's native int size.
  n_bits = [42].pack('i').size * 8
  max = 2**(n_bits - 2) - 1
  "#{rand(max)}-#{rand(max)}-#{rand(max)}"
end

.heartbeat! ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
# File 'lib/resque-bus.rb', line 75

# Turns on the heartbeat by registering a ::ResqueBus::Heartbeat job that is
# scheduled every minute on the incoming queue.
#
# If a scheduler YAML file is loaded, call this afterwards; otherwise it can
# be called at any time.
#
# @return [Hash] the schedule entry that was registered
def heartbeat!
  require 'resque/scheduler'

  job_name = 'resquebus_hearbeat'
  job_spec = {
    'class'       => '::ResqueBus::Heartbeat',
    'cron'        => '* * * * *', # every minute
    'queue'       => incoming_queue,
    'description' => 'I publish a heartbeat_minutes event every minute'
  }

  # With dynamic scheduling enabled the entry is also stored via set_schedule.
  Resque.set_schedule(job_name, job_spec) if Resque::Scheduler.dynamic
  Resque.schedule[job_name] = job_spec
end

.hostname ⇒ Object



40
41
42
# File 'lib/resque-bus.rb', line 40

# Returns this machine's host name, memoized in @hostname.
#
# The name comes from the `hostname` shell command (stderr folded into
# stdout), with surrounding whitespace removed and a trailing ".local" suffix
# dropped. The suffix pattern is escaped and anchored to the end, so names
# that merely contain "local" (e.g. "prod-local-01") are left intact.
#
# @return [String] the host name without a trailing ".local"
def hostname
  @hostname ||= `hostname 2>&1`.strip.sub(/\.local\z/, '')
end

.local_mode ⇒ Object



71
72
73
# File 'lib/resque-bus.rb', line 71

# Reader for the local-mode setting.
#
# @return [Object, nil] whatever is stored in @local_mode, or nil when
#   nothing has been assigned yet
def local_mode
  instance_variable_get(:@local_mode)
end

.local_mode=(value) ⇒ Object



67
68
69
# File 'lib/resque-bus.rb', line 67

# Writer for the local-mode setting; stores the value unchanged in
# @local_mode.
#
# @param value [Object] the new local-mode setting
def local_mode=(value)
  instance_variable_set(:@local_mode, value)
end

.log_application(message) ⇒ Object



205
206
207
208
209
210
# File 'lib/resque-bus.rb', line 205

# Writes an application-level message to the configured logger at info level.
#
# The line is prefixed with the current time, the process id and "ResqueBus".
# Nothing happens when no logger is configured.
#
# @param message [Object] text to log (interpolated into the line)
# @return [Object, nil] the result of +logger.info+, or nil without a logger
def log_application(message)
  return unless logger

  stamp = Time.now.strftime('%H:%M:%S %Y-%m-%d')
  logger.info("** [#{stamp}] #{Process.pid}: ResqueBus #{message}")
end

.log_worker(message) ⇒ Object



212
213
214
215
216
217
# File 'lib/resque-bus.rb', line 212

# Prints a worker-level message to standard output, but only when one of the
# LOGGING, VERBOSE or VVERBOSE environment variables is set.
#
# The line is prefixed with the current time and the process id.
#
# @param message [Object] text to print (interpolated into the line)
# @return [nil]
def log_worker(message)
  return unless %w[LOGGING VERBOSE VVERBOSE].any? { |flag| ENV[flag] }

  stamp = Time.now.strftime('%H:%M:%S %Y-%m-%d')
  puts "** [#{stamp}] #{Process.pid}: #{message}"
end

.logger ⇒ Object



197
198
199
# File 'lib/resque-bus.rb', line 197

# Reader for the configured logger.
#
# @return [Object, nil] whatever is stored in @logger, or nil when nothing
#   has been assigned yet
def logger
  instance_variable_get(:@logger)
end

.logger=(val) ⇒ Object



201
202
203
# File 'lib/resque-bus.rb', line 201

# Writer for the logger; stores the value unchanged in @logger.
#
# @param val [Object] the logger to use (log_application calls +info+ on it)
def logger=(val)
  instance_variable_set(:@logger, val)
end

.original_redis ⇒ Object



133
134
135
# File 'lib/resque-bus.rb', line 133

# Reader for the value stored by original_redis=.
#
# @return [Object, nil] whatever is stored in @original_redis, or nil when
#   nothing has been assigned yet
def original_redis
  instance_variable_get(:@original_redis)
end

.original_redis=(server) ⇒ Object



130
131
132
# File 'lib/resque-bus.rb', line 130

# Writer counterpart of original_redis; stores the value unchanged in
# @original_redis.
#
# @param server [Object] the value to remember
def original_redis=(server)
  instance_variable_set(:@original_redis, server)
end

.publish(event_type, attributes = {}) ⇒ Object



173
174
175
176
177
178
179
180
181
# File 'lib/resque-bus.rb', line 173

# Publishes an event.
#
# The attributes are first decorated by publish_metadata. In local mode the
# resulting hash is handed straight to ResqueBus::Local.perform; otherwise it
# is enqueued for Driver on the incoming queue.
#
# @param event_type [Object] the event type, passed to publish_metadata
# @param attributes [Hash] event attributes, passed to publish_metadata
# @return [Object] the result of Local.perform or enqueue_to
def publish(event_type, attributes = {})
  to_publish = publish_metadata(event_type, attributes)
  ResqueBus.log_application("Event published: #{event_type} #{to_publish.inspect}")
  if local_mode
    ResqueBus::Local.perform(to_publish)
  else
    enqueue_to(incoming_queue, Driver, to_publish)
  end
end

.publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object



183
184
185
186
187
188
189
190
191
# File 'lib/resque-bus.rb', line 183

# Schedules an event to be published at a later time.
#
# The attributes are decorated by publish_metadata, "bus_delayed_until" is
# set to the integer form of the timestamp unless already present, and the
# resulting Publisher job is handed to delayed_push.
#
# @param timestamp_or_epoch [#to_i] when to publish
# @param event_type [Object] the event type
# @param attributes [Hash] event attributes
# @return [Object] whatever delayed_push returns
def publish_at(timestamp_or_epoch, event_type, attributes = {})
  to_publish = publish_metadata(event_type, attributes)
  to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i
  # Drop the generated publish time unless the caller supplied one; it will
  # be put on when the event is actually published.
  to_publish.delete("bus_published_at") unless attributes["bus_published_at"]

  ResqueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
  item = delayed_job_to_hash_with_queue(incoming_queue, Publisher, [event_type, to_publish])
  delayed_push(timestamp_or_epoch, item)
end

.publish_metadata(event_type, attributes = {}) ⇒ Object



150
151
152
153
154
155
156
157
158
# File 'lib/resque-bus.rb', line 150

# Builds the attribute hash for an event by adding bus metadata.
#
# Adds "bus_published_at", "bus_event_type", "bus_id" and "bus_app_hostname",
# plus "bus_locale" / "bus_timezone" when I18n.locale / Time.zone are
# available and set. Keys supplied in +attributes+ take precedence over the
# generated ones.
#
# @param event_type [Object] stored under "bus_event_type"
# @param attributes [Hash, nil] caller attributes; nil is treated as empty
# @return [Hash] a new hash with the metadata and caller attributes merged
def publish_metadata(event_type, attributes={})
  # TODO: "bus_app_key" => application.app_key ?
  # Read the clock once so "bus_published_at" and the "bus_id" prefix agree.
  now = Time.now.to_i
  bus_attr = {"bus_published_at" => now, "bus_event_type" => event_type}
  bus_attr["bus_id"]           = "#{now}-#{generate_uuid}"
  bus_attr["bus_app_hostname"] = hostname
  bus_attr["bus_locale"]       = I18n.locale.to_s if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale
  bus_attr["bus_timezone"]     = Time.zone.name   if defined?(Time) && Time.respond_to?(:zone)   && Time.zone
  bus_attr.merge(attributes || {})
end

.redis ⇒ Object

Returns the current Redis connection. If none has been created, creates a new one from the Resque connection (with a different namespace).



122
123
124
125
126
127
128
# File 'lib/resque-bus.rb', line 122

# Returns the current Redis connection.
#
# When none is stored in @redis yet, clones Resque.redis, switches the clone
# to default_namespace, stores it through redis= and returns it.
#
# @return [Object] the stored connection
def redis
  @redis || begin
    namespaced = Resque.redis.clone
    namespaced.namespace = default_namespace
    self.redis = namespaced
    redis
  end
end

.redis=(server) ⇒ Object

Accepts:

1. A 'hostname:port' String
2. A 'hostname:port:db' String (to select the Redis db)
3. A 'hostname:port/namespace' String (to set the Redis namespace)
4. A Redis URL String 'redis://host:port'
5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
   or `Redis::Namespace`.


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/resque-bus.rb', line 99

# Sets the Redis connection stored in @redis.
#
# Accepts:
#   1. A 'hostname:port' String
#   2. A 'hostname:port:db' String (to select the Redis db)
#   3. A 'hostname:port/namespace' String (to set the Redis namespace)
#   4. A Redis URL String 'redis://host:port'
#   5. A Redis::Namespace (stored as is), or any other object, which is
#      wrapped in a Redis::Namespace using default_namespace.
#
# String forms without an explicit namespace use default_namespace.
#
# @param server [String, Object] connection description or connection object
def redis=(server)
  @redis =
    case server
    when String
      if server =~ %r{redis://}
        connection = Redis.connect(:url => server, :thread_safe => true)
      else
        # Split an optional "/namespace" suffix off "host:port[:db]".
        address, namespace = server.split('/', 2)
        host, port, db = address.split(':')
        connection = Redis.new(:host => host, :port => port,
                               :thread_safe => true, :db => db)
      end
      Redis::Namespace.new(namespace || default_namespace, :redis => connection)
    when Redis::Namespace
      server
    else
      Redis::Namespace.new(default_namespace, :redis => server)
    end
end

.with_global_attributes(attributes) ⇒ Object



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/resque-bus.rb', line 137

# Runs the block with I18n.locale and Time.zone set from the event
# attributes, then restores the previous values.
#
# The previous locale / time zone are captured before being overwritten, so
# the +ensure+ clause can put them back even when the block raises. +false+
# is the "nothing captured" marker: a setting is restored only when its
# previous value was actually read.
#
# @param attributes [Hash] event attributes; "bus_locale" and "bus_timezone"
#   are applied when I18n / Time expose the matching writers
# @yield the work to perform under the event's locale and time zone
# @return [Object] the block's return value
def with_global_attributes(attributes)
  original_timezone = false
  original_locale   = false

  if defined?(I18n) && I18n.respond_to?(:locale=)
    original_locale = I18n.locale if I18n.respond_to?(:locale)
    I18n.locale = attributes["bus_locale"]
  end

  if defined?(Time) && Time.respond_to?(:zone=)
    original_timezone = Time.zone if Time.respond_to?(:zone)
    Time.zone = attributes["bus_timezone"]
  end

  yield
ensure
  I18n.locale = original_locale   unless original_locale   == false
  Time.zone   = original_timezone unless original_timezone == false
end