Module: Asyncron

Extended by:
Asyncron
Included in:
Asyncron
Defined in:
lib/asyncron.rb,
lib/asyncron/cron.rb,
lib/asyncron/version.rb,
lib/asyncron/schedule.rb

Defined Under Namespace

Modules: Cron, Schedule

Constant Summary collapse

# Fallback values consulted when a caller's opts hash has no :redis or :key
# entry (see #redis and #key).
# NOTE(review): Redis.new is evaluated when this constant is defined, so a
# client object is built at load time — confirm that is intended.
DEFAULT_OPTS =
{
  # Client returned by #redis when opts[:redis] is not set.
  redis: Redis.new,
  # Format template for set keys; #key substitutes %{callback_str}.
  key: "sorted_set_asyncron/%{callback_str}"
}
# Version string of the Asyncron module.
VERSION =
"0.1"

Instance Method Summary collapse

Instance Method Details

#callback(callback_str) ⇒ Object



56
57
58
59
60
61
# File 'lib/asyncron.rb', line 56

# Resolves a callback string such as "Outer::Inner.method_name" into a
# Method object bound to the named constant.
#
# Only the text before the first "." is treated as the constant path and the
# text between the first and second "." as the method name.
#
# @param callback_str [String] constant path and method name joined by "."
# @return [Method] the method looked up on the resolved constant
# @raise [NameError] if a constant in the path or the method does not exist
def callback(callback_str)
  const_path, method_name = callback_str.split(".")

  # Walk the namespace one segment at a time, starting from Object.
  receiver = Object
  const_path.split("::").each do |const_name|
    receiver = receiver.const_get(const_name)
  end

  receiver.method(method_name)
end

#due(opts = {}) ⇒ Object



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/asyncron.rb', line 35

# Runs every payload whose score is at or before the current time, then
# re-inserts it via #insert.
#
# For each key matching the key pattern (built with "*" as the callback
# string), the callback is derived from the last "/"-separated segment of the
# key. Each due payload is parsed as JSON with symbolized names, passed to the
# callback, removed from the set, and handed back to #insert.
#
# @param opts [Hash] may carry :redis and :key overrides (see #redis, #key)
# @return [Object] whatever +each+ returns for the list of matching keys
def due(opts = {})
  cutoff = Time.now.to_i

  redis(opts).keys(key(opts, "*")).each do |set_key|
    # The callback string is the part of the key after the final "/".
    callback_str = set_key.split("/").last
    handler = callback(callback_str)

    due_payloads = redis(opts).zrangebyscore(set_key, 0, cutoff)
    due_payloads.each do |raw|
      data = JSON.parse(raw, symbolize_names: true)
      handler.call(data)
      # Remove the entry that just ran before inserting it again.
      redis(opts).zrem(set_key, raw)
      insert(opts, callback_str, data)
    end
  end
end

#insert(opts = {}, callback_str, payload) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/asyncron.rb', line 17

# Adds a payload to the set for +callback_str+, scored by the next execution
# time that Schedule.next returns for payload[:expr].
#
# Nothing is written when the JSON form of the payload is already a member of
# the set.
#
# @param opts [Hash] may carry :redis and :key overrides (see #redis, #key)
# @param callback_str [String] callback identifier used to build the set key
# @param payload [Hash] must contain an :expr key
# @return [Object, nil] result of +zadd+, or nil when the payload already exists
# @raise [RuntimeError] if the payload has no :expr key, or Schedule.next
#   returns nil for the expression
def insert(opts = {}, callback_str, payload)
  raise RuntimeError, "#{payload.inspect} has no :expr key" unless payload.key?(:expr)

  next_run = Schedule.next(payload[:expr])
  if next_run.nil?
    raise RuntimeError, "#{payload[:expr]} for #{callback_str} and " \
      "#{payload.inspect} has no future execution time"
  end

  set_key = key(opts, callback_str)
  member = payload.to_json

  # Skip the write when this exact payload is already in the set.
  redis(opts).zadd(set_key, next_run.to_i, member) unless redis(opts).zscore(set_key, member)
end

#key(opts, callback_str) ⇒ Object



52
53
54
# File 'lib/asyncron.rb', line 52

# Builds the set key for a callback by filling %{callback_str} into the key
# template.
#
# @param opts [Hash] opts[:key] overrides the DEFAULT_OPTS[:key] template
# @param callback_str [String] value substituted for %{callback_str}
# @return [String] the formatted key
def key(opts, callback_str)
  template = opts[:key] || DEFAULT_OPTS[:key]
  template % { callback_str: callback_str }
end

#redis(opts) ⇒ Object



48
49
50
# File 'lib/asyncron.rb', line 48

# Picks the client to use for a call.
#
# @param opts [Hash] opts[:redis], when truthy, takes precedence
# @return [Object] opts[:redis] if set, otherwise DEFAULT_OPTS[:redis]
def redis(opts)
  client = opts[:redis]
  return client if client

  DEFAULT_OPTS[:redis]
end

#remove(opts = {}, callback_str, payload) ⇒ Object



31
32
33
# File 'lib/asyncron.rb', line 31

# Removes the JSON form of +payload+ from the set belonging to +callback_str+.
#
# @param opts [Hash] may carry :redis and :key overrides (see #redis, #key)
# @param callback_str [String] callback identifier used to build the set key
# @param payload [Object] serialized with +to_json+ to find the member
# @return [Object] whatever the client's +zrem+ returns
def remove(opts = {}, callback_str, payload)
  client = redis(opts)
  set_key = key(opts, callback_str)
  client.zrem(set_key, payload.to_json)
end