Class: LogStash::PluginMixins::Jdbc::Scheduler

Inherits:
Rufus::Scheduler
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/plugin_mixins/jdbc/scheduler.rb

Defined Under Namespace

Modules: JobDecorator

Constant Summary collapse

TimeImpl =

Rufus::Scheduler >= 3.4 moved the Time impl into a gem EoTime = ::EtOrbi::EoTime` Rufus::Scheduler 3.1 - 3.3 using it’s own Time impl ‘Rufus::Scheduler::ZoTime`

defined?(Rufus::Scheduler::EoTime) ? Rufus::Scheduler::EoTime :
(defined?(Rufus::Scheduler::ZoTime) ? Rufus::Scheduler::ZoTime : ::Time)

Instance Method Summary collapse

Instance Method Details

#on_error(job, err) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 52

def on_error(job, err)
  details = { exception: err.class, message: err.message, backtrace: err.backtrace }
  details[:cause] = err.cause if err.cause

  details[:now] = debug_format_time(TimeImpl.now)
  details[:last_time] = (debug_format_time(job.last_time) rescue nil)
  details[:next_time] = (debug_format_time(job.next_time) rescue nil)
  details[:job] = job

  details[:opts] = @opts
  details[:started_at] = started_at
  details[:thread] = thread.inspect
  details[:jobs_size] = @jobs.size
  details[:work_threads_size] = work_threads.size
  details[:work_queue_size] = work_queue.size

  logger.error("Scheduler intercepted an error:", details)

rescue => e
  logger.error("Scheduler failed in #on_error #{e.inspect}")
end

#timeout_jobsObject



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 16

def timeout_jobs
  # Rufus relies on `Thread.list` which is a blocking operation and with many schedulers
  # (and threads) within LS will have a negative impact on performance as scheduler
  # threads will end up waiting to obtain the `Thread.list` lock.
  #
  # However, this isn't necessary we can easily detect whether there are any jobs
  # that might need to timeout: only when `@opts[:timeout]` is set causes worker thread(s)
  # to have a `Thread.current[:rufus_scheduler_timeout]` that is not nil
  return unless @opts[:timeout]
  super
end

#work_thread_name_prefixObject



81
82
83
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 81

def work_thread_name_prefix
  ( @opts[:thread_name] || "#{@thread_key}_scheduler" ) + '_worker-'
end

#work_threads(query = :all) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 29

def work_threads(query = :all)
  if query == :__all_no_cache__ # special case from JobDecorator#start_work_thread
    @_work_threads = nil # when a new worker thread is being added reset
    return super(:all)
  end

  # Gets executed every time a job is triggered, we're going to cache the
  # worker threads for this scheduler (to avoid `Thread.list`) - they only
  # change when a new thread is being started from #start_work_thread ...
  work_threads = @_work_threads
  if work_threads.nil?
    work_threads = threads.select { |t| t[:rufus_scheduler_work_thread] }
    @_work_threads = work_threads
  end

  case query
  when :active then work_threads.select { |t| t[:rufus_scheduler_job] }
  when :vacant then work_threads.reject { |t| t[:rufus_scheduler_job] }
  else work_threads
  end
end