Module: Asyncron
- Extended by:
- Asyncron
- Included in:
- Asyncron
- Defined in:
- lib/asyncron.rb,
lib/asyncron/cron.rb,
lib/asyncron/version.rb,
lib/asyncron/schedule.rb
Defined Under Namespace
Constant Summary collapse
- DEFAULT_OPTS =
{ redis: Redis.new, key: "sorted_set_asyncron/%{callback_str}" }
- VERSION =
"0.1"
Instance Method Summary collapse
- #callback(callback_str) ⇒ Object
- #due(opts = {}) ⇒ Object
- #insert(opts = {}, callback_str, payload) ⇒ Object
- #key(opts, callback_str) ⇒ Object
- #redis(opts) ⇒ Object
- #remove(opts = {}, callback_str, payload) ⇒ Object
Instance Method Details
#callback(callback_str) ⇒ Object
56 57 58 59 60 61 |
# File 'lib/asyncron.rb', line 56 def callback(callback_str) mod, method = callback_str.split(".") mods = mod.split("::") ref = mods.reduce(Object) { |acc, m| acc.const_get(m) } return ref.method(method) end |
#due(opts = {}) ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/asyncron.rb', line 35 def due(opts = {}) t = Time.now redis(opts).keys(key(opts, "*")).each do |set_key| cb = callback(set_key.split("/").last) redis(opts).zrangebyscore(set_key, 0, t.to_i).each do |payload| parsed_payload = JSON.parse(payload, symbolize_names: true) cb.call(parsed_payload) redis(opts).zrem(set_key, payload) insert(opts, set_key.split("/").last, parsed_payload) end end end |
#insert(opts = {}, callback_str, payload) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/asyncron.rb', line 17 def insert(opts = {}, callback_str, payload) unless payload.key?(:expr) raise RuntimeError.new("#{payload.inspect} has no :expr key") end time = Schedule.next(payload[:expr]) if time.nil? raise RuntimeError.new("#{payload[:expr]} for #{callback_str} and " \ "#{payload.inspect} has no future execution time") end set_key = key(opts, callback_str) return if redis(opts).zscore(set_key, payload.to_json) return redis(opts).zadd(set_key, time.to_i, payload.to_json) end |
#key(opts, callback_str) ⇒ Object
52 53 54 |
# File 'lib/asyncron.rb', line 52 def key(opts, callback_str) (opts[:key] || DEFAULT_OPTS[:key]) % {callback_str: callback_str} end |
#redis(opts) ⇒ Object
48 49 50 |
# File 'lib/asyncron.rb', line 48 def redis(opts) opts[:redis] || DEFAULT_OPTS[:redis] end |
#remove(opts = {}, callback_str, payload) ⇒ Object
31 32 33 |
# File 'lib/asyncron.rb', line 31 def remove(opts = {}, callback_str, payload) redis(opts).zrem(key(opts, callback_str), payload.to_json) end |