Class: Skywalking::Reporter::Scheduler
- Inherits:
-
Object
- Object
- Skywalking::Reporter::Scheduler
show all
- Includes:
- Log::Logging
- Defined in:
- lib/skywalking/reporter/scheduler.rb
Defined Under Namespace
Classes: Timer
Instance Method Summary
collapse
#debug, #error, #info, #log, #warn
Constructor Details
Returns a new instance of Scheduler.
21
22
23
24
25
26
27
28
29
|
# File 'lib/skywalking/reporter/scheduler.rb', line 21
# Sets up an idle scheduler.
#
# State created here:
# - a pipe (@read_side / @write_side) used to signal the scheduler,
# - @queue, the queue of pending [job, args] entries,
# - @workers, an initially empty Hash,
# - @jobs, a Hash whose missing keys default to a fresh empty Array, pre-seeded
#   with two built-in handlers:
#   * :timer_job  - passes its argument to #add_worker,
#   * :event_job  - appends the given callable to the handler list of the
#     given event name.
def initialize
  @read_side, @write_side = IO.pipe
  @queue = Queue.new
  @workers = {}
  @jobs = Hash.new { |registry, name| registry[name] = [] }
  @jobs[:timer_job].push(proc { |timer| add_worker(timer) })
  @jobs[:event_job].push(proc { |name, handler| @jobs[name] << handler })
  @running = true
end
|
Instance Method Details
#add_worker(worker) ⇒ Object
83
84
85
86
87
88
89
|
# File 'lib/skywalking/reporter/scheduler.rb', line 83
# Registers +worker+ under its job name, replacing any worker already stored
# under that name, then hands it to #trigger_worker.
#
# When a previous worker exists, it is first passed the time elapsed since its
# +latest_trigger+ via +adjust_next_trigger_time+.
# NOTE(review): the adjustment is applied to the worker that is about to be
# replaced; what it achieves depends on Timer, which is not visible here.
#
# @param worker [#job_name] the worker to register
# @return [Object] whatever #trigger_worker returns
def add_worker(worker)
  previous = @workers[worker.job_name]
  if previous
    elapsed = Process.clock_gettime(Process::CLOCK_REALTIME) - previous.latest_trigger
    previous.adjust_next_trigger_time(elapsed)
  end

  @workers[worker.job_name] = worker
  trigger_worker(worker)
end
|
#dispatch(job, args) ⇒ Object
52
53
54
55
56
57
58
59
60
|
# File 'lib/skywalking/reporter/scheduler.rb', line 52
# Invokes every handler stored in @jobs[job], splatting +args+ into each call.
#
# A handler that raises a StandardError is reported through +warn+ and the
# remaining handlers still run. Exceptions outside StandardError (Interrupt,
# SystemExit, NoMemoryError, ...) are deliberately NOT rescued, so signals and
# process exit requests raised while a job runs are not silently swallowed.
#
# @param job [Object] key into @jobs
# @param args [Object, nil] arguments splatted into each handler; nil means
#   the handler is called with no arguments
# @return [Array] the handler list for +job+
def dispatch(job, args)
  @jobs[job].each do |handler|
    begin
      handler.call(*args)
    rescue StandardError => e
      warn "Error in job #{job}: #{e.message}"
    end
  end
end
|
#find_next_trigger ⇒ Object
72
73
74
75
76
77
|
# File 'lib/skywalking/reporter/scheduler.rb', line 72
# Computes how long the scheduler may sleep before the earliest worker is due.
#
# @return [nil] when no workers are registered
# @return [Numeric] seconds until the smallest +next_trigger_time+ among the
#   workers, or 0 when that time is not in the future
def find_next_trigger
  return nil if @workers.empty?

  earliest = @workers.each_value.map(&:next_trigger_time).min
  remaining = earliest - Process.clock_gettime(Process::CLOCK_REALTIME)
  return 0 unless remaining.positive?

  remaining
end
|
#notify ⇒ Object
79
80
81
|
# File 'lib/skywalking/reporter/scheduler.rb', line 79
# Signals the scheduler by writing a single byte to the write end of its pipe.
#
# The write never blocks and never raises when the pipe buffer is full: in that
# case unread bytes are already sitting in the pipe, so the reader is
# guaranteed to see it as readable and the extra byte can safely be dropped.
#
# @return [Integer, Symbol] number of bytes written, or :wait_writable when the
#   pipe was full
def notify
  @write_side.write_nonblock('n', exception: false)
end
|
#run ⇒ Object
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
# File 'lib/skywalking/reporter/scheduler.rb', line 31
# Main scheduler loop; keeps iterating while #running? is truthy.
#
# Each iteration:
# 1. waits on the read end of the pipe for at most #find_next_trigger seconds
#    and consumes one byte if the pipe became readable,
# 2. queues every worker that reports +need_trigger?+ and stamps its latest
#    trigger time,
# 3. drains the queue, dispatching each entry and then asking the worker
#    registered under that job name (if any) to compute its next trigger time.
#
# @return [nil]
def run
  while running?
    readable, = IO.select([@read_side], nil, nil, find_next_trigger)
    @read_side.read(1) if readable&.first == @read_side

    @workers.each_value do |timer|
      next unless timer.need_trigger?

      @queue << [timer.job_name]
      timer.set_latest_trigger_time
    end

    until @queue.empty?
      name, payload = @queue.pop
      dispatch(name, payload)
      @workers[name]&.init_next_trigger_time
    end
  end
end
|
#running? ⇒ Boolean
98
99
100
|
# File 'lib/skywalking/reporter/scheduler.rb', line 98
# Reports the current value of the @running flag, which #initialize sets to
# true and #shutdown sets to false.
#
# @return [Boolean]
def running?
  @running
end
|
#shutdown ⇒ Object
102
103
104
105
106
107
|
# File 'lib/skywalking/reporter/scheduler.rb', line 102
# Stops the scheduler: clears the running flag and writes one byte to the pipe
# so that a reader waiting on the other end wakes up.
#
# Does nothing when the scheduler is not running. The wake-up write is
# non-blocking and does not raise when the pipe buffer is full: a full pipe
# already holds unread bytes, so the reader will see it as readable anyway.
#
# @return [Integer, Symbol, nil] bytes written, :wait_writable when the pipe
#   was full, or nil when the scheduler was already stopped
def shutdown
  return unless running?

  @running = false
  @write_side.write_nonblock('s', exception: false)
end
|
#subscribe(job_name, job_interval, &job_func) ⇒ Object
62
63
64
65
|
# File 'lib/skywalking/reporter/scheduler.rb', line 62
# Subscribes a periodic job by enqueueing two requests through #trigger:
# first an :event_job carrying the job name and its block, then a :timer_job
# carrying a new Timer built from the name and interval.
#
# @param job_name [Object] name the block and the timer are registered under
# @param job_interval [Object] interval handed to Timer.new
# @param job_func [Proc] block to associate with +job_name+
# @return [Object] the result of the second #trigger call
def subscribe(job_name, job_interval, &job_func)
  handler_entry = [job_name, job_func]
  trigger(:event_job, handler_entry)

  timer = Timer.new(job_name, job_interval)
  trigger(:timer_job, timer)
end
|
#trigger(job_type, *args) ⇒ Object
67
68
69
70
|
# File 'lib/skywalking/reporter/scheduler.rb', line 67
# Enqueues a job request and then calls #notify.
#
# The queued entry is an Array whose first element is +job_type+ followed by
# the extra arguments.
#
# @param job_type [Object] job key
# @param args [Array] extra elements appended to the queued entry
# @return [Object] whatever #notify returns
def trigger(job_type, *args)
  @queue << [job_type].concat(args)
  notify
end
|
#trigger_worker(worker) ⇒ Object
91
92
93
94
95
96
|
# File 'lib/skywalking/reporter/scheduler.rb', line 91
# Queues +worker+'s job if the worker reports that it needs to be triggered,
# and stamps the worker's latest trigger time.
#
# @param worker [#need_trigger?, #job_name, #set_latest_trigger_time]
# @return [Object, nil] the result of +set_latest_trigger_time+, or nil when
#   the worker did not need triggering
def trigger_worker(worker)
  return unless worker.need_trigger?

  @queue << [worker.job_name]
  worker.set_latest_trigger_time
end
|