Module: ResqueBus
- Defined in:
- lib/resque_bus/server.rb,
lib/resque-bus.rb,
lib/resque_bus/util.rb,
lib/resque_bus/local.rb,
lib/resque_bus/rider.rb,
lib/resque_bus/driver.rb,
lib/resque_bus/matcher.rb,
lib/resque_bus/dispatch.rb,
lib/resque_bus/heartbeat.rb,
lib/resque_bus/publisher.rb,
lib/resque_bus/subscriber.rb,
lib/resque_bus/application.rb,
lib/resque_bus/subscription.rb,
lib/resque_bus/task_manager.rb,
lib/resque_bus/subscription_list.rb
Overview
Creates a DSL for apps to define their blocks to run for event_types
Defined Under Namespace
Modules: Server, Subscriber, Util
Classes: Application, Dispatch, Driver, Heartbeat, Local, Matcher, Publisher, Rider, Subscription, SubscriptionList, TaskManager
Class Method Summary
collapse
Class Method Details
.default_app_key ⇒ Object
28
29
30
|
# File 'lib/resque-bus.rb', line 28
# Reader for the module-level default application key.
#
# @return [Object, nil] the value held in @default_app_key (nil until assigned)
def default_app_key
  @default_app_key
end
|
.default_app_key=(val) ⇒ Object
24
25
26
|
# File 'lib/resque-bus.rb', line 24
# Stores the default application key after passing it through
# Application.normalize.
#
# @param val [Object] the raw key supplied by the caller
# @return [Object] whatever Application.normalize returned
def default_app_key=(val)
  @default_app_key = Application.normalize(val)
end
|
.default_queue ⇒ Object
36
37
38
|
# File 'lib/resque-bus.rb', line 36
# Reader for the module-level default queue.
#
# @return [Object, nil] the value held in @default_queue (nil until assigned)
def default_queue
  @default_queue
end
|
.default_queue=(val) ⇒ Object
32
33
34
|
# File 'lib/resque-bus.rb', line 32
# Writer for the module-level default queue. The value is stored as given.
#
# @param val [Object] the queue to remember
# @return [Object] val
def default_queue=(val)
  @default_queue = val
end
|
.dispatch(app_key = nil, &block) ⇒ Object
44
45
46
47
48
|
# File 'lib/resque-bus.rb', line 44
# Evaluates the given block in the context of the dispatcher registered for
# app_key, so calls inside the block resolve against that dispatcher.
#
# @param app_key [Object, nil] key handed to dispatcher_by_key
# @yield the block run via instance_eval on the dispatcher
# @return [Object] the dispatcher the block was evaluated against
def dispatch(app_key=nil, &block)
  dispatcher_by_key(app_key).tap do |dispatcher|
    dispatcher.instance_eval(&block)
  end
end
|
.dispatcher_by_key(app_key) ⇒ Object
55
56
57
58
59
|
# File 'lib/resque-bus.rb', line 55
# Returns the dispatcher stored for an application key, creating and caching
# a new Dispatch on first use.
#
# @param app_key [Object, nil] key to look up; default_app_key is used when
#   this is nil or false
# @return [Object] the cached or newly built Dispatch for the normalized key
def dispatcher_by_key(app_key)
  normalized_key = Application.normalize(app_key || default_app_key)
  registry = (@dispatchers ||= {})
  registry[normalized_key] ||= Dispatch.new(normalized_key)
end
|
.dispatcher_execute(app_key, key, attributes) ⇒ Object
61
62
63
64
65
|
# File 'lib/resque-bus.rb', line 61
# Runs key/attributes through the dispatcher stored under app_key, if any.
# The lookup uses app_key exactly as given (no normalization happens here).
#
# @param app_key [Object] registry key of the dispatcher
# @param key [Object] forwarded to the dispatcher's execute
# @param attributes [Object] forwarded to the dispatcher's execute
# @return [Object, nil] the result of execute, or nil when no dispatcher is
#   registered under app_key
def dispatcher_execute(app_key, key, attributes)
  target = (@dispatchers ||= {})[app_key]
  return unless target

  target.execute(key, attributes)
end
|
.dispatchers ⇒ Object
50
51
52
53
|
# File 'lib/resque-bus.rb', line 50
# Lists every dispatcher currently held in the registry.
#
# @return [Array] the registry's values (empty when nothing is registered)
def dispatchers
  (@dispatchers ||= {}).values
end
|
.enqueue_to(queue, klass, *args) ⇒ Object
193
194
195
|
# File 'lib/resque-bus.rb', line 193
# Pushes a job description onto the given queue.
#
# @param queue [Object] destination queue, passed straight to push
# @param klass [#to_s] job class; its string form is stored under :class
# @param args [Array] remaining arguments, stored under :args
# @return [Object] whatever push returns
def enqueue_to(queue, klass, *args)
  class_name = klass.to_s
  push(queue, :class => class_name, :args => args)
end
|
.generate_uuid ⇒ Object
160
161
162
163
164
165
166
167
168
169
170
171
|
# File 'lib/resque-bus.rb', line 160
# Produces a unique identifier string.
#
# Uses SecureRandom.uuid when available. If loading or calling SecureRandom
# fails, falls back to three dash-separated pseudo-random integers.
#
# Only StandardError and ScriptError (which covers LoadError and
# NotImplementedError) trigger the fallback; signals, SystemExit and
# NoMemoryError are allowed to propagate instead of being swallowed.
#
# @return [String] a UUID, or "<int>-<int>-<int>" from the fallback path
def generate_uuid
  require 'securerandom' unless defined?(SecureRandom)
  SecureRandom.uuid
rescue StandardError, ScriptError
  # Upper bound is derived from the platform's native int width.
  n_bytes = [42].pack('i').size
  n_bits = n_bytes * 8
  max = 2 ** (n_bits - 2) - 1
  "#{rand(max)}-#{rand(max)}-#{rand(max)}"
end
|
.heartbeat! ⇒ Object
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
# File 'lib/resque-bus.rb', line 75
# Registers the heartbeat job with resque-scheduler.
#
# Builds a schedule entry that runs '::ResqueBus::Heartbeat' on the cron
# expression '* * * * *' on incoming_queue. When the scheduler is in dynamic
# mode the entry is also handed to Resque.set_schedule; in every case it is
# written into Resque.schedule under the job name.
#
# @return [Hash] the schedule entry that was assigned
def heartbeat!
  require 'resque/scheduler'

  job_name = 'resquebus_hearbeat'
  job_config = {
    'class' => '::ResqueBus::Heartbeat',
    'cron' => '* * * * *',
    'queue' => incoming_queue,
    'description' => 'I publish a heartbeat_minutes event every minute'
  }

  Resque.set_schedule(job_name, job_config) if Resque::Scheduler.dynamic
  Resque.schedule[job_name] = job_config
end
|
.hostname ⇒ Object
40
41
42
|
# File 'lib/resque-bus.rb', line 40
# Name of the machine this process runs on, memoized in @hostname.
#
# Shells out to `hostname` (stderr merged into stdout), strips surrounding
# whitespace, and removes a trailing ".local" suffix. The pattern escapes the
# dot and is anchored to the end of the string so that names that merely
# contain "local" (e.g. "mylocalbox") are left intact.
#
# @return [String] the cleaned host name
def hostname
  @hostname ||= `hostname 2>&1`.strip.sub(/\.local\z/, '')
end
|
.local_mode ⇒ Object
71
72
73
|
# File 'lib/resque-bus.rb', line 71
# Reader for the local-mode setting.
#
# @return [Object, nil] the value held in @local_mode (nil until assigned)
def local_mode
  @local_mode
end
|
.local_mode=(value) ⇒ Object
67
68
69
|
# File 'lib/resque-bus.rb', line 67
# Writer for the local-mode setting. The value is stored as given.
#
# @param value [Object] the new local-mode setting
# @return [Object] value
def local_mode=(value)
  @local_mode = value
end
|
.log_application(message) ⇒ Object
205
206
207
208
209
210
|
# File 'lib/resque-bus.rb', line 205
# Writes an info-level line to the configured logger, prefixed with the
# current time and process id. Does nothing when no logger is set.
#
# @param message [Object] text appended after the "ResqueBus" tag
# @return [Object, nil] the result of logger.info, or nil without a logger
def log_application(message)
  sink = logger
  return unless sink

  stamp = Time.now.strftime('%H:%M:%S %Y-%m-%d')
  sink.info("** [#{stamp}] #$$: ResqueBus #{message}")
end
|
.log_worker(message) ⇒ Object
212
213
214
215
216
217
|
# File 'lib/resque-bus.rb', line 212
# Prints a timestamped, pid-tagged line to standard output, but only when one
# of the LOGGING, VERBOSE or VVERBOSE environment variables is set.
#
# @param message [Object] text to print
# @return [nil]
def log_worker(message)
  enabled = %w[LOGGING VERBOSE VVERBOSE].any? { |flag| ENV[flag] }
  return unless enabled

  stamp = Time.now.strftime('%H:%M:%S %Y-%m-%d')
  puts "** [#{stamp}] #$$: #{message}"
end
|
.logger ⇒ Object
197
198
199
|
# File 'lib/resque-bus.rb', line 197
# Reader for the configured logger.
#
# @return [Object, nil] the value held in @logger (nil until assigned)
def logger
  @logger
end
|
.logger=(val) ⇒ Object
201
202
203
|
# File 'lib/resque-bus.rb', line 201
# Writer for the logger used by log_application. The value is stored as given.
#
# @param val [Object] the logger to remember
# @return [Object] val
def logger=(val)
  @logger = val
end
|
.original_redis ⇒ Object
133
134
135
|
# File 'lib/resque-bus.rb', line 133
# Reader for the remembered "original" Redis value.
#
# @return [Object, nil] the value held in @original_redis (nil until assigned)
def original_redis
  @original_redis
end
|
.original_redis=(server) ⇒ Object
130
131
132
|
# File 'lib/resque-bus.rb', line 130
# Writer for the remembered "original" Redis value. Stored as given.
#
# @param server [Object] the value to remember
# @return [Object] server
def original_redis=(server)
  @original_redis = server
end
|
.publish(event_type, attributes = {}) ⇒ Object
173
174
175
176
177
178
179
180
181
|
# File 'lib/resque-bus.rb', line 173
# Publishes an event immediately.
#
# The attributes are wrapped with bus metadata, the event is logged, and the
# payload is then either handled in-process (local mode) or enqueued for the
# Driver on the incoming queue.
#
# @param event_type [Object] the event name
# @param attributes [Hash, nil] caller-supplied event attributes
# @return [Object] the result of Local.perform or enqueue_to
def publish(event_type, attributes = {})
  payload = publish_metadata(event_type, attributes)
  ResqueBus.log_application("Event published: #{event_type} #{payload.inspect}")

  return ResqueBus::Local.perform(payload) if local_mode

  enqueue_to(incoming_queue, Driver, payload)
end
|
.publish_at(timestamp_or_epoch, event_type, attributes = {}) ⇒ Object
183
184
185
186
187
188
189
190
191
|
# File 'lib/resque-bus.rb', line 183
# Schedules an event to be published at a later time.
#
# The attributes are wrapped with bus metadata and "bus_delayed_until" is set
# to the target time unless the caller already supplied it. The
# "bus_published_at" stamp is dropped unless the caller passed one
# explicitly. The payload is then wrapped as a delayed Publisher job on the
# incoming queue.
#
# @param timestamp_or_epoch [#to_i] when the event should be published
# @param event_type [Object] the event name
# @param attributes [Hash, nil] caller-supplied event attributes; nil is
#   treated like an empty hash, matching publish_metadata
# @return [Object] the result of delayed_push
def publish_at(timestamp_or_epoch, event_type, attributes = {})
  to_publish = publish_metadata(event_type, attributes)
  to_publish["bus_delayed_until"] ||= timestamp_or_epoch.to_i

  # publish_metadata accepts nil attributes, so guard the lookup here as well
  # instead of raising NoMethodError on nil.
  caller_published_at = attributes && attributes["bus_published_at"]
  to_publish.delete("bus_published_at") unless caller_published_at

  ResqueBus.log_application("Event published:#{event_type} #{to_publish.inspect} publish_at: #{timestamp_or_epoch.to_i}")
  item = delayed_job_to_hash_with_queue(incoming_queue, Publisher, [event_type, to_publish])
  delayed_push(timestamp_or_epoch, item)
end
|
.publish_metadata(event_type, attributes = {}) ⇒ Object
150
151
152
153
154
155
156
157
158
|
# File 'lib/resque-bus.rb', line 150
# Builds the hash that is actually published for an event: bus bookkeeping
# fields first, then the caller's attributes merged on top (so caller keys
# win on conflict).
#
# "bus_locale" and "bus_timezone" are only added when I18n.locale and
# Time.zone, respectively, are available and set.
#
# @param event_type [Object] stored under "bus_event_type"
# @param attributes [Hash, nil] caller-supplied attributes; nil is treated as
#   an empty hash
# @return [Hash] a new hash combining metadata and attributes
def publish_metadata(event_type, attributes={})
  metadata = {
    "bus_published_at" => Time.now.to_i,
    "bus_event_type" => event_type,
    "bus_id" => "#{Time.now.to_i}-#{generate_uuid}",
    "bus_app_hostname" => hostname
  }

  if defined?(I18n) && I18n.respond_to?(:locale) && I18n.locale
    metadata["bus_locale"] = I18n.locale.to_s
  end
  if defined?(Time) && Time.respond_to?(:zone) && Time.zone
    metadata["bus_timezone"] = Time.zone.name
  end

  metadata.merge(attributes || {})
end
|
.redis ⇒ Object
Returns the current Redis connection. If none has been created yet, a new one is created from the Resque connection (with a different namespace).
122
123
124
125
126
127
128
|
# File 'lib/resque-bus.rb', line 122
# Returns the bus's Redis connection, building it on first use.
#
# When nothing is stored in @redis yet, Resque.redis is cloned, the clone's
# namespace is set to default_namespace, and the result is handed to the
# redis= writer before being read back.
#
# @return [Object] the stored connection
def redis
  if @redis
    @redis
  else
    namespaced = Resque.redis.clone
    namespaced.namespace = default_namespace
    self.redis = namespaced
    self.redis
  end
end
|
.redis=(server) ⇒ Object
Accepts:
1. A 'hostname:port' String
2. A 'hostname:port:db' String (to select the Redis db)
3. A 'hostname:port/namespace' String (to set the Redis namespace)
4. A Redis URL String 'redis://host:port'
5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
or `Redis::Namespace`.
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
|
# File 'lib/resque-bus.rb', line 99
# Sets the bus's Redis connection, always ending up with a namespaced client
# in @redis.
#
# * A Redis::Namespace is stored as-is.
# * A String matching "redis://" is opened via Redis.connect and wrapped in
#   default_namespace.
# * Any other String is split as "host:port[:db][/namespace]", opened via
#   Redis.new and wrapped in the given namespace (or default_namespace).
# * Anything else is wrapped in a Redis::Namespace using default_namespace.
#
# @param server [String, Object] connection description or client object
# @return [Object] the value stored in @redis
def redis=(server)
  @redis =
    case server
    when Redis::Namespace
      server
    when String
      namespace = nil
      client =
        if server =~ /redis\:\/\//
          Redis.connect(:url => server, :thread_safe => true)
        else
          address, namespace = server.split('/', 2)
          host, port, db = address.split(':')
          Redis.new(:host => host, :port => port,
                    :thread_safe => true, :db => db)
        end
      Redis::Namespace.new(namespace || default_namespace, :redis => client)
    else
      Redis::Namespace.new(default_namespace, :redis => server)
    end
end
|
.with_global_attributes(attributes) ⇒ Object
137
138
139
140
141
142
143
144
145
146
147
148
|
# File 'lib/resque-bus.rb', line 137
# Runs the block with I18n.locale and Time.zone temporarily set from the
# event attributes, then restores the previous values.
#
# Each setting is only touched when the corresponding setter exists
# (I18n.locale= / Time.zone=). The prior values are captured before being
# overridden so that the ensure block can put them back, even when the block
# raises. The sentinel `false` means "nothing captured, nothing to restore".
#
# @param attributes [Hash] event attributes; "bus_locale" and "bus_timezone"
#   are read from it
# @yield the work to perform under the event's locale and time zone
# @return [Object] the block's return value
def with_global_attributes(attributes)
  original_timezone = false
  original_locale = false

  if defined?(I18n) && I18n.respond_to?(:locale=)
    original_locale = I18n.locale if I18n.respond_to?(:locale)
    I18n.locale = attributes["bus_locale"]
  end

  if defined?(Time) && Time.respond_to?(:zone=)
    original_timezone = Time.zone if Time.respond_to?(:zone)
    Time.zone = attributes["bus_timezone"]
  end

  yield
ensure
  I18n.locale = original_locale unless original_locale == false
  Time.zone = original_timezone unless original_timezone == false
end
|