Class: Skywalking::Reporter::Scheduler

Inherits:
Object
  • Object
show all
Includes:
Log::Logging
Defined in:
lib/skywalking/reporter/scheduler.rb

Defined Under Namespace

Classes: Timer

Instance Method Summary collapse

Methods included from Log::Logging

#debug, #error, #info, #log, #warn

Constructor Details

#initialize ⇒ Scheduler

Returns a new instance of Scheduler.



21
22
23
24
25
26
27
28
29
# File 'lib/skywalking/reporter/scheduler.rb', line 21

# Builds an idle scheduler: a pipe pair, an empty work queue, no registered
# workers, and the two built-in job handlers.
#
# Built-in handlers stored in +@jobs+:
# * +:timer_job+ hands its argument to #add_worker.
# * +:event_job+ appends the given callable to the handler list kept under
#   the given job name.
def initialize
  @read_side, @write_side = IO.pipe
  @queue = Queue.new
  @workers = {}

  # Unknown keys lazily receive their own empty handler list.
  @jobs = Hash.new { |registry, name| registry[name] = [] }
  @jobs[:timer_job].push(proc { |timer| add_worker(timer) })
  @jobs[:event_job].push(proc { |name, handler| @jobs[name].push(handler) })

  @running = true
end

Instance Method Details

#add_worker(worker) ⇒ Object



83
84
85
86
87
88
89
# File 'lib/skywalking/reporter/scheduler.rb', line 83

# Registers +worker+ under its job name, replacing any worker already stored
# under that name, then queues it right away if it reports it is due.
#
# When a worker was already registered under the same name, that previous
# worker is first told how much wall-clock time has passed since its
# +latest_trigger+ (via +adjust_next_trigger_time+).
#
# @param worker [#job_name, #need_trigger?, #set_latest_trigger_time]
# @return [Object] whatever #trigger_worker returns
def add_worker(worker)
  name = worker.job_name
  previous = @workers[name]

  if previous
    elapsed = Process.clock_gettime(Process::CLOCK_REALTIME) - previous.latest_trigger
    previous.adjust_next_trigger_time(elapsed)
  end

  @workers[name] = worker
  trigger_worker(worker)
end

#dispatch(job, args) ⇒ Object



52
53
54
55
56
57
58
59
60
# File 'lib/skywalking/reporter/scheduler.rb', line 52

# Invokes every callable registered under +job+ in +@jobs+, splatting +args+
# into each call.
#
# An ordinary failure in one callable is logged through +warn+ and does not
# prevent the remaining callables from running.
#
# @param job [Object] key into +@jobs+
# @param args [Array, Object, nil] arguments splatted into each callable
#   (+nil+ results in a call with no arguments)
# @return [Array] the handler list that was iterated
def dispatch(job, args)
  @jobs[job].each do |handler|
    begin
      handler.call(*args)
    rescue StandardError => e
      # Deliberately not `rescue Exception`: SystemExit, Interrupt,
      # NoMemoryError and similar must propagate instead of being reduced
      # to a log line.
      warn "Error in job #{job}: #{e.message}"
    end
  end
end

#find_next_trigger ⇒ Object



72
73
74
75
76
77
# File 'lib/skywalking/reporter/scheduler.rb', line 72

# Computes how long the run loop may wait before the earliest registered
# worker is due.
#
# @return [nil] when no workers are registered
# @return [Numeric] time remaining until the smallest +next_trigger_time+,
#   measured against CLOCK_REALTIME; 0 when that moment is now or already past
def find_next_trigger
  return nil if @workers.empty?

  earliest = @workers.each_value.map(&:next_trigger_time).min
  remaining = earliest - Process.clock_gettime(Process::CLOCK_REALTIME)
  remaining.positive? ? remaining : 0
end

#notify ⇒ Object



79
80
81
# File 'lib/skywalking/reporter/scheduler.rb', line 79

# Writes a single byte to the write end of the pipe so that a run loop
# blocked in +IO.select+ on the read end wakes up.
#
# The write is non-blocking. If the pipe buffer is already full the byte is
# dropped instead of raising IO::WaitWritable: unread bytes in the pipe
# already make the read end readable, so the wake-up is not lost.
#
# @return [Integer, Symbol] bytes written, or +:wait_writable+ when the pipe
#   is full
def notify
  @write_side.write_nonblock('n', exception: false)
end

#run ⇒ Object



31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/skywalking/reporter/scheduler.rb', line 31

# Scheduler loop; keeps going for as long as #running? is truthy.
#
# Each pass:
# 1. waits on the read end of the pipe, for at most the timeout returned by
#    #find_next_trigger (+nil+ means no timeout);
# 2. consumes one byte from the pipe if it became readable;
# 3. enqueues every registered worker that reports +need_trigger?+;
# 4. drains the queue through #dispatch, and afterwards calls
#    +init_next_trigger_time+ on the worker registered under the dispatched
#    job name, if there is one.
#
# @return [nil]
def run
  while running?
    ready, = IO.select([@read_side], nil, nil, find_next_trigger)
    @read_side.read(1) if ready&.first == @read_side

    @workers.each_value do |timer|
      next unless timer.need_trigger?

      @queue << [timer.job_name]
      timer.set_latest_trigger_time
    end

    until @queue.empty?
      name, payload = @queue.pop
      dispatch(name, payload)

      # Queue entries that do not correspond to a registered worker
      # have nothing to reschedule.
      due = @workers[name]
      due.init_next_trigger_time unless due.nil?
    end
  end
end

#running? ⇒ Boolean

Returns:

  • (Boolean)


98
99
100
# File 'lib/skywalking/reporter/scheduler.rb', line 98

# Reports whether the scheduler is still marked as running.
#
# @return [Boolean] the current value of +@running+ (set to +true+ by
#   #initialize and to +false+ by #shutdown)
def running?
  @running
end

#shutdown ⇒ Object



102
103
104
105
106
107
# File 'lib/skywalking/reporter/scheduler.rb', line 102

# Marks the scheduler as stopped and writes one byte to the pipe so that a
# run loop blocked in +IO.select+ wakes up and re-checks #running?.
#
# Does nothing when the scheduler is already stopped.
#
# The write is non-blocking. If the pipe buffer is already full the byte is
# dropped instead of raising IO::WaitWritable: unread bytes in the pipe
# already make the read end readable, so shutting down must not fail here.
#
# @return [nil, Integer, Symbol] +nil+ when already stopped; otherwise the
#   number of bytes written, or +:wait_writable+ when the pipe is full
def shutdown
  return unless running?

  @running = false
  @write_side.write_nonblock('s', exception: false)
end

#subscribe(job_name, job_interval, &job_func) ⇒ Object



62
63
64
65
# File 'lib/skywalking/reporter/scheduler.rb', line 62

# Registers a periodic job by queueing two requests through #trigger:
# an +:event_job+ carrying the job name with its block, followed by a
# +:timer_job+ carrying a new Timer built from the name and interval.
#
# @param job_name [Object] name the block and the timer are registered under
# @param job_interval [Object] passed unchanged to +Timer.new+
# @yield the work to run for this job
# @return [Object] the result of the second #trigger call
def subscribe(job_name, job_interval, &job_func)
  registration = [job_name, job_func]
  trigger(:event_job, registration)

  timer = Timer.new(job_name, job_interval)
  trigger(:timer_job, timer)
end

#trigger(job_type, *args) ⇒ Object



67
68
69
70
# File 'lib/skywalking/reporter/scheduler.rb', line 67

# Queues a request for the run loop and then wakes it up via #notify.
#
# The queued entry is an array with +job_type+ first, followed by +args+.
#
# @param job_type [Object] first element of the queued entry
# @param args [Array] remaining elements of the queued entry
# @return [Object] whatever #notify returns
def trigger(job_type, *args)
  entry = [job_type].concat(args)
  @queue << entry
  notify
end

#trigger_worker(worker) ⇒ Object



91
92
93
94
95
96
# File 'lib/skywalking/reporter/scheduler.rb', line 91

# Queues +worker+'s job name and records the trigger on the worker, but only
# when the worker reports +need_trigger?+.
#
# @param worker [#need_trigger?, #job_name, #set_latest_trigger_time]
# @return [nil, Object] +nil+ when nothing was queued; otherwise the result
#   of +worker.set_latest_trigger_time+
def trigger_worker(worker)
  return unless worker.need_trigger?

  @queue << [worker.job_name]
  worker.set_latest_trigger_time
end