Module: SayWhen::Poller::BasePoller

Included in:
CelluloidPoller, ConcurrentPoller, SimplePoller
Defined in:
lib/say_when/poller/base_poller.rb

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(mod) ⇒ Object



8
9
10
11
# File 'lib/say_when/poller/base_poller.rb', line 8

# Module#included hook: invoked with the class or module that is mixing
# this module in.
#
# Also mixes SayWhen::Utils into +mod+, then declares the +reset_next_at+
# reader/writer pair. The accessor is declared on the receiver of this hook
# (this module), not on +mod+.
#
# @param mod [Module] the class or module performing the include
def self.included(mod)
  mod.send(:include, SayWhen::Utils)
  attr_accessor(:reset_next_at)
end

Instance Method Details

#acquire(time_now) ⇒ Object



54
55
56
57
58
59
60
61
62
# File 'lib/say_when/poller/base_poller.rb', line 54

# Asks the storage strategy for its next job via +acquire_next(time_now)+,
# writing a debug line before the lookup and another describing the outcome.
#
# @param time_now [Object] value handed to +storage.acquire_next+ and
#   interpolated into the log message
# @return [Object, nil] whatever +storage.acquire_next+ returned
def acquire(time_now)
  logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}"
  job = storage.acquire_next(time_now)
  outcome = job ? "SayWhen:: got a job: #{job.inspect}" : "SayWhen:: no jobs to acquire"
  logger.debug outcome
  job
end

#error_tick_length ⇒ Object



87
88
89
# File 'lib/say_when/poller/base_poller.rb', line 87

# Length of the pause taken after a failed polling pass (see the
# +tick(error_tick_length)+ call in +process_jobs+). Memoized.
#
# Reads +SayWhen.options[:error_tick_length]+; when that option is absent
# (nil) it falls back to +tick_length+. The nil check must happen before
# the conversion: +nil.to_f+ is 0.0, which is truthy in Ruby, so a plain
# +value.to_f || tick_length+ can never reach the fallback.
#
# @return [Float, Object] the configured value converted with +to_f+, or
#   whatever +tick_length+ returns when nothing is configured
def error_tick_length
  @error_tick_length ||= begin
    configured = SayWhen.options[:error_tick_length]
    configured.nil? ? tick_length : configured.to_f
  end
end

#job_error(msg, job, ex) ⇒ Object



30
31
32
33
34
# File 'lib/say_when/poller/base_poller.rb', line 30

# Logs an error raised while polling, then releases the job involved.
#
# The log line carries the class name, +msg+, the inspected job (when one
# is given), the exception message and its backtrace with one frame per
# line, each indented by a tab.
#
# @param msg [String] short label for the failure (e.g. "Error!")
# @param job [Object, nil] the job being handled when the error occurred
# @param ex [Exception] the rescued exception
# @return [Object] whatever +release(job)+ returns
def job_error(msg, job, ex)
  job_msg = job && " job:'#{job.inspect}'"
  # Exception#backtrace is nil for an exception that was never raised;
  # Array() turns that into an empty list instead of failing on nil.join.
  trace = Array(ex.backtrace).join("\n\t")
  logger.error "#{self.class.name} #{msg}#{job_msg}: #{ex.message}\n\t#{trace}"
  release(job)
end

#logger ⇒ Object



103
104
105
# File 'lib/say_when/poller/base_poller.rb', line 103

# Convenience accessor for the logger used by the methods in this module.
#
# @return [Object] whatever +SayWhen.logger+ returns
def logger
  SayWhen.logger
end

#process(job, time_now) ⇒ Object



64
65
66
67
68
69
70
71
72
# File 'lib/say_when/poller/base_poller.rb', line 64

# Runs one acquired job and then reports it as fired.
#
# Step 1 hands the job to the processor strategy. Step 2 tells storage the
# job fired at +time_now+; per the original author's note, that call is
# expected to update the next fire time and put the job back among the
# scheduled jobs. A debug line is written after each step.
#
# @param job [Object] the job returned by +acquire+
# @param time_now [Object] timestamp forwarded to +storage.fired+
# @return [Object] the result of the final +logger.debug+ call
def process(job, time_now)
  processor.process(job)
  logger.debug("SayWhen:: job processed: #{job.inspect}")

  storage.fired(job, time_now)
  logger.debug("SayWhen:: job fired: #{job.inspect}")
end

#process_jobs ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/say_when/poller/base_poller.rb', line 36

# Runs one polling pass: calls +reset_acquired+, then acquires and
# processes jobs until +acquire+ returns nothing.
#
# Error handling:
# * StandardError - reported through +job_error+, then the poller pauses
#   for +error_tick_length+ and the method returns normally.
# * Interrupt and any other Exception - reported through +job_error+ and
#   re-raised to the caller.
#
# +job+ is cleared as soon as a job has been processed, so that a failure
# in a later +acquire+ (or anywhere outside +process+) is not attributed to
# the job that already finished and does not release it.
#
# @raise [Exception] re-raises anything that is not a StandardError
def process_jobs
  job = nil
  reset_acquired
  time_now = Time.now
  while (job = acquire(time_now))
    process(job, time_now)
    # Finished with this job; do not let the rescue clauses below see it.
    job = nil
    time_now = Time.now
  end
rescue StandardError => ex
  job_error("Error!", job, ex)
  tick(error_tick_length)
rescue Interrupt => ex
  job_error("Interrupt!", job, ex)
  raise ex
rescue Exception => ex
  # Broad rescue is only for reporting; the exception is always re-raised.
  job_error("Exception!", job, ex)
  raise ex
end

#processor ⇒ Object



95
96
97
# File 'lib/say_when/poller/base_poller.rb', line 95

# Processor strategy, loaded on first use and then cached.
#
# @return [Object] the result of
#   +load_strategy(:processor, SayWhen.options[:processor_strategy])+
def processor
  return @processor if @processor

  @processor = load_strategy(:processor, SayWhen.options[:processor_strategy])
end

#release(job) ⇒ Object



74
75
76
77
# File 'lib/say_when/poller/base_poller.rb', line 74

# Logs the release and, when a job is actually given, calls +release+ on it.
# The log line is written even when +job+ is nil or false.
#
# @param job [Object, nil] the job to release
# @return [Object, nil] the result of +job.release+, or nil without a job
def release(job)
  logger.info("SayWhen::Scheduler release: #{job.inspect}")
  return unless job

  job.release
end

#reset_acquired ⇒ Object



19
20
21
22
23
24
25
26
27
28
# File 'lib/say_when/poller/base_poller.rb', line 19

# Periodically asks storage to reset acquired jobs.
#
# +reset_next_at+ holds the time of the next reset and is initialised to
# the current time on first use. When +reset_acquired_length+ is positive
# and that time has been reached, the next reset is scheduled
# +reset_acquired_length+ seconds ahead, a debug line is written and
# +storage.reset_acquired+ is called with the same length. Otherwise
# nothing else happens.
#
# @return [Object, nil] the result of +storage.reset_acquired+, or nil when
#   no reset was performed
def reset_acquired
  now = Time.now
  self.reset_next_at ||= now

  due = reset_acquired_length > 0 && reset_next_at <= now
  return unless due

  self.reset_next_at = now + reset_acquired_length
  logger.debug "SayWhen:: reset acquired at #{now}, try again at #{reset_next_at}"
  storage.reset_acquired(reset_acquired_length)
end

#reset_acquired_length ⇒ Object



91
92
93
# File 'lib/say_when/poller/base_poller.rb', line 91

# Value of +SayWhen.options[:reset_acquired_length]+ converted with +to_f+,
# read on first use and then cached. A missing (nil) option yields 0.0.
#
# @return [Float]
def reset_acquired_length
  return @reset_acquired_length if @reset_acquired_length

  @reset_acquired_length = SayWhen.options[:reset_acquired_length].to_f
end

#start ⇒ Object



16
17
# File 'lib/say_when/poller/base_poller.rb', line 16

# Intentionally empty in this module; returns nil.
# NOTE(review): presumably a lifecycle hook that the including poller
# classes override - confirm against those classes.
def start
end

#stop ⇒ Object



13
14
# File 'lib/say_when/poller/base_poller.rb', line 13

# Intentionally empty in this module; returns nil.
# NOTE(review): presumably a lifecycle hook that the including poller
# classes override - confirm against those classes.
def stop
end

#storage ⇒ Object



99
100
101
# File 'lib/say_when/poller/base_poller.rb', line 99

# Storage strategy, loaded on first use and then cached.
#
# @return [Object] the result of
#   +load_strategy(:storage, SayWhen.options[:storage_strategy])+
def storage
  return @storage if @storage

  @storage = load_strategy(:storage, SayWhen.options[:storage_strategy])
end

#tick(t = tick_length) ⇒ Object



79
80
81
# File 'lib/say_when/poller/base_poller.rb', line 79

# Pauses by calling +sleep+ with +t+ converted via +to_f+.
#
# The call is deliberately the bare +sleep+ so that an including class
# which supplies its own +sleep+ keeps control of how the pause happens.
#
# @param t [#to_f] pause length; defaults to +tick_length+
# @return [Object] whatever +sleep+ returns
def tick(t = tick_length)
  seconds = t.to_f
  sleep(seconds)
end

#tick_length ⇒ Object



83
84
85
# File 'lib/say_when/poller/base_poller.rb', line 83

# Value of +SayWhen.options[:tick_length]+ converted with +to_f+, read on
# first use and then cached. A missing (nil) option yields 0.0.
#
# @return [Float]
def tick_length
  return @tick_length if @tick_length

  @tick_length = SayWhen.options[:tick_length].to_f
end