Class: LogStash::PluginMixins::Jdbc::Scheduler

Inherits:
Rufus::Scheduler
  • Object
show all
Includes:
Util::Loggable
Defined in:
lib/logstash/plugin_mixins/jdbc/scheduler.rb

Defined Under Namespace

Modules: JobDecorator

Constant Summary collapse

TimeImpl =

Rufus::Scheduler >= 3.4 moved the Time impl into a gem EoTime = ::EtOrbi::EoTime` Rufus::Scheduler 3.1 - 3.3 using it’s own Time impl ‘Rufus::Scheduler::ZoTime`

defined?(Rufus::Scheduler::EoTime) ? Rufus::Scheduler::EoTime :
(defined?(Rufus::Scheduler::ZoTime) ? Rufus::Scheduler::ZoTime : ::Time)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.new_scheduler(opts) ⇒ Object

Returns scheduler instance.

Parameters:

  • opts (Hash)

    scheduler options

Returns:

  • scheduler instance



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 29

def self.new_scheduler(opts)
  unless opts.key?(:thread_name)
    raise ArgumentError, 'thread_name: option is required to be able to distinguish multiple scheduler threads'
  end
  opts[:max_work_threads] ||= 1
  # amount the scheduler thread sleeps between checking whether jobs
  # should trigger, default is 0.3 which is a bit too often ...
  # in theory the cron expression '* * * * * *' supports running jobs
  # every second but this is very rare, we could potentially go higher
  opts[:frequency] ||= 1.0

  new(opts)
end

.start_cron_scheduler(cron, opts = {}, &block) ⇒ Object

Returns scheduler instance.

Parameters:

  • cron (String)

    cron-line

  • opts (Hash) (defaults to: {})

    scheduler options

Returns:

  • scheduler instance



18
19
20
21
22
23
24
25
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 18

def self.start_cron_scheduler(cron, opts = {}, &block)
  unless block_given?
    raise ArgumentError, 'missing (cron scheduler) block - worker task to execute'
  end
  scheduler = new_scheduler(opts)
  scheduler.schedule_cron(cron, &block)
  scheduler
end

Instance Method Details

#on_error(job, err) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 80

def on_error(job, err)
  details = { exception: err.class, message: err.message, backtrace: err.backtrace }
  details[:cause] = err.cause if err.cause

  details[:now] = debug_format_time(TimeImpl.now)
  details[:last_time] = (debug_format_time(job.last_time) rescue nil)
  details[:next_time] = (debug_format_time(job.next_time) rescue nil)
  details[:job] = job

  details[:opts] = @opts
  details[:started_at] = started_at
  details[:thread] = thread.inspect
  details[:jobs_size] = @jobs.size
  details[:work_threads_size] = work_threads.size
  details[:work_queue_size] = work_queue.size

  logger.error("Scheduler intercepted an error:", details)

rescue => e
  logger.error("Scheduler failed in #on_error #{e.inspect}")
end

#timeout_jobsObject



44
45
46
47
48
49
50
51
52
53
54
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 44

def timeout_jobs
  # Rufus relies on `Thread.list` which is a blocking operation and with many schedulers
  # (and threads) within LS will have a negative impact on performance as scheduler
  # threads will end up waiting to obtain the `Thread.list` lock.
  #
  # However, this isn't necessary we can easily detect whether there are any jobs
  # that might need to timeout: only when `@opts[:timeout]` is set causes worker thread(s)
  # to have a `Thread.current[:rufus_scheduler_timeout]` that is not nil
  return unless @opts[:timeout]
  super
end

#work_thread_name_prefixObject



109
110
111
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 109

def work_thread_name_prefix
  ( @opts[:thread_name] || "#{@thread_key}_scheduler" ) + '_worker-'
end

#work_threads(query = :all) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 57

def work_threads(query = :all)
  if query == :__all_no_cache__ # special case from JobDecorator#start_work_thread
    @_work_threads = nil # when a new worker thread is being added reset
    return super(:all)
  end

  # Gets executed every time a job is triggered, we're going to cache the
  # worker threads for this scheduler (to avoid `Thread.list`) - they only
  # change when a new thread is being started from #start_work_thread ...
  work_threads = @_work_threads
  if work_threads.nil?
    work_threads = threads.select { |t| t[:rufus_scheduler_work_thread] }
    @_work_threads = work_threads
  end

  case query
  when :active then work_threads.select { |t| t[:rufus_scheduler_job] }
  when :vacant then work_threads.reject { |t| t[:rufus_scheduler_job] }
  else work_threads
  end
end