Class: LogStash::PluginMixins::Jdbc::Scheduler
- Inherits:
-
Rufus::Scheduler
- Object
- Rufus::Scheduler
- LogStash::PluginMixins::Jdbc::Scheduler
- Includes:
- Util::Loggable
- Defined in:
- lib/logstash/plugin_mixins/jdbc/scheduler.rb
Defined Under Namespace
Modules: JobDecorator
Constant Summary collapse
- TimeImpl =
Rufus::Scheduler >= 3.4 moved the Time impl into a gem: `EoTime = ::EtOrbi::EoTime`. Rufus::Scheduler 3.1 - 3.3 uses its own Time impl, `Rufus::Scheduler::ZoTime`.
defined?(Rufus::Scheduler::EoTime) ? Rufus::Scheduler::EoTime : (defined?(Rufus::Scheduler::ZoTime) ? Rufus::Scheduler::ZoTime : ::Time)
Class Method Summary collapse
-
.new_scheduler(opts) ⇒ Object
Scheduler instance.
-
.start_cron_scheduler(cron, opts = {}, &block) ⇒ Object
Scheduler instance.
Instance Method Summary collapse
- #on_error(job, err) ⇒ Object
- #timeout_jobs ⇒ Object
- #work_thread_name_prefix ⇒ Object
- #work_threads(query = :all) ⇒ Object
Class Method Details
.new_scheduler(opts) ⇒ Object
Returns scheduler instance.
29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 29

# Fills in default options (in place, on the given hash) and instantiates
# a scheduler with them.
#
# @param opts [Hash] scheduler options; must contain the +:thread_name+ key
# @return [Object] the result of +new(opts)+
# @raise [ArgumentError] when +opts+ has no +:thread_name+ key
def self.new_scheduler(opts)
  missing_name = 'thread_name: option is required to be able to distinguish multiple scheduler threads'
  raise ArgumentError, missing_name unless opts.key?(:thread_name)

  opts[:max_work_threads] = 1 unless opts[:max_work_threads]
  # How long the scheduler thread sleeps between checks for jobs that should
  # trigger. The default of 0.3 is a bit too often; in theory the cron
  # expression '* * * * * *' supports running jobs every second but that is
  # very rare, so we could potentially go even higher.
  opts[:frequency] = 1.0 unless opts[:frequency]

  new(opts)
end
.start_cron_scheduler(cron, opts = {}, &block) ⇒ Object
Returns scheduler instance.
18 19 20 21 22 23 24 25 |
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 18

# Creates a scheduler via +new_scheduler+ and registers the given block on
# it as a cron job.
#
# @param cron [Object] cron expression, passed to +schedule_cron+ as-is
# @param opts [Hash] options forwarded to +new_scheduler+
# @yield the worker task, passed to +schedule_cron+ as its block
# @return [Object] the scheduler instance
# @raise [ArgumentError] when no block is given
def self.start_cron_scheduler(cron, opts = {}, &block)
  raise ArgumentError, 'missing (cron scheduler) block - worker task to execute' unless block_given?

  new_scheduler(opts).tap { |scheduler| scheduler.schedule_cron(cron, &block) }
end
Instance Method Details
#on_error(job, err) ⇒ Object
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 80

# Logs, at error level, a details hash describing +err+, the +job+ and the
# scheduler's current state.
#
# @param job [Object] the job; its +last_time+ / +next_time+ are read
# @param err [Exception] the error to report
def on_error(job, err)
  details = { exception: err.class, message: err.message, backtrace: err.backtrace }
  details[:cause] = err.cause if err.cause

  details[:now] = debug_format_time(TimeImpl.now)
  # best-effort: a failure to read/format a job time must not hide the error
  details[:last_time] = (debug_format_time(job.last_time) rescue nil)
  details[:next_time] = (debug_format_time(job.next_time) rescue nil)
  details[:job] = job

  details[:opts] = @opts
  details[:started_at] = started_at
  details[:thread] = thread.inspect
  details[:jobs_size] = @jobs.size
  details[:work_threads_size] = work_threads.size
  details[:work_queue_size] = work_queue.size

  logger.error("Scheduler intercepted an error:", details)
rescue => e
  # anything raised while collecting details or logging is logged, not re-raised
  logger.error("Scheduler failed in #on_error #{e.inspect}")
end
#timeout_jobs ⇒ Object
44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 44

# Delegates to the parent implementation only when a +:timeout+ option is set.
#
# @return [Object, nil] the result of +super+, or nil when no timeout is set
def timeout_jobs
  # Rufus relies on `Thread.list`, a blocking operation; with many schedulers
  # (and threads) within LS this hurts performance, as scheduler threads end
  # up waiting to obtain the `Thread.list` lock.
  #
  # That is avoidable: jobs can only need a timeout when `@opts[:timeout]` is
  # set, since only then do worker thread(s) have a non-nil
  # `Thread.current[:rufus_scheduler_timeout]`.
  super if @opts[:timeout]
end
#work_thread_name_prefix ⇒ Object
109 110 111 |
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 109

# Builds the name prefix for worker threads: the +:thread_name+ option when
# set, otherwise "<@thread_key>_scheduler", followed by '_worker-'.
#
# @return [String] the worker thread name prefix
def work_thread_name_prefix
  base_name = @opts[:thread_name]
  base_name = "#{@thread_key}_scheduler" unless base_name
  base_name + '_worker-'
end
#work_threads(query = :all) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/logstash/plugin_mixins/jdbc/scheduler.rb', line 57

# Returns this scheduler's worker threads, served from a per-instance cache.
#
# @param query [Symbol] +:active+ (threads with a +:rufus_scheduler_job+),
#   +:vacant+ (threads without one), or anything else for all of them;
#   +:__all_no_cache__+ clears the cache and delegates to +super(:all)+
# @return [Array] the matching worker threads
def work_threads(query = :all)
  # special case from JobDecorator#start_work_thread: a new worker thread is
  # being added, so drop the cache
  if query == :__all_no_cache__
    @_work_threads = nil
    return super(:all)
  end

  # This runs every time a job is triggered, so the worker threads of this
  # scheduler are cached (to avoid `Thread.list`) - they only change when a
  # new thread is being started from #start_work_thread ...
  cached = @_work_threads
  if cached.nil?
    cached = threads.select { |thread| thread[:rufus_scheduler_work_thread] }
    @_work_threads = cached
  end

  case query
  when :active
    cached.select { |thread| thread[:rufus_scheduler_job] }
  when :vacant
    cached.reject { |thread| thread[:rufus_scheduler_job] }
  else
    cached
  end
end