Class: OneApm::Support::EventLoop

Inherits:
Object
  • Object
show all
Defined in:
lib/one_apm/support/event/timer.rb,
lib/one_apm/support/event/event_loop.rb

Defined Under Namespace

Classes: Timer

Instance Method Summary collapse

Constructor Details

#initialize ⇒ EventLoop

Returns a new instance of EventLoop.



10
11
12
13
14
15
16
17
18
19
# File 'lib/one_apm/support/event/event_loop.rb', line 10

# Builds an idle event loop: not stopped, no timers, an empty event queue,
# and a pipe whose read end the loop selects on and whose write end is used
# to wake it up.
#
# Two internal subscriptions are installed up front:
# * :__add_timer registers a timer through set_timer
# * :__add_event appends a callback to the subscription list of an event
def initialize
  @stopped     = false
  @timers      = {}
  @event_queue = Queue.new
  @self_pipe_rd, @self_pipe_wr = IO.pipe

  # Looking up an unknown event lazily creates (and stores) an empty list.
  @subscriptions = Hash.new { |subs, name| subs[name] = [] }
  @subscriptions[:__add_timer].push(proc { |timer| set_timer(timer) })
  @subscriptions[:__add_event].push(proc { |name, callback| @subscriptions[name] << callback })
end

Instance Method Details

#dispatch_event(event, args) ⇒ Object



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# File 'lib/one_apm/support/event/event_loop.rb', line 95

# Invokes every callback subscribed to +event+, splatting +args+ into each.
#
# A StandardError raised by one callback does not stop the remaining ones:
# such errors are collected and reported together in a single error log
# entry after all callbacks have run. ForceRestartException and
# ForceDisconnectException are re-raised immediately.
#
# @param event [Object] key used to look up callbacks in @subscriptions
# @param args [Array, nil] arguments splatted into each callback
# @return [Object, nil] nil when no callback failed, otherwise whatever the
#   logger's error call returns
def dispatch_event(event, args)
  callbacks = @subscriptions[event]
  OneApm::Manager.logger.debug("EventLoop: Dispatching event '#{event}' with #{callbacks.size} callback(s).")

  failures = callbacks.each_with_object([]) do |callback, collected|
    begin
      callback.call(*args)
    rescue OneApm::ForceRestartException, OneApm::ForceDisconnectException
      raise
    rescue => error
      collected << error
    end
  end

  return if failures.empty?

  OneApm::Manager.logger.error "#{failures.size} error(s) running task for event '#{event}' in Agent Event Loop:", *failures
end

#fire(event, *args) ⇒ Object



122
123
124
125
# File 'lib/one_apm/support/event/event_loop.rb', line 122

# Queues +event+ together with its arguments, then calls wakeup.
#
# @param event [Object] event identifier
# @param args [Array] arguments stored alongside the event
# @return [Object] whatever wakeup returns
def fire(event, *args)
  @event_queue.push([event, args])
  wakeup
end

#fire_after(interval, event) ⇒ Object



132
133
134
135
# File 'lib/one_apm/support/event/event_loop.rb', line 132

# Requests a timer for +event+ built with a false third argument, by firing
# the internal :__add_timer event.
#
# @param interval [Numeric] delay, logged as seconds
# @param event [Object] event the timer is created for
# @return [Object] whatever fire returns
def fire_after(interval, event)
  OneApm::Manager.logger.debug "Firing event #{event} after #{interval} seconds."
  one_shot = Timer.new(interval, event, false)
  fire(:__add_timer, one_shot)
end

#fire_every(interval, event) ⇒ Object



127
128
129
130
# File 'lib/one_apm/support/event/event_loop.rb', line 127

# Requests a timer for +event+ built with a true third argument, by firing
# the internal :__add_timer event.
#
# @param interval [Numeric] period, logged as seconds
# @param event [Object] event the timer is created for
# @return [Object] whatever fire returns
def fire_every(interval, event)
  OneApm::Manager.logger.debug "Firing event #{event} every #{interval} seconds."
  repeating = Timer.new(interval, event, true)
  fire(:__add_timer, repeating)
end

#fire_timer(timer) ⇒ Object



84
85
86
87
88
89
# File 'lib/one_apm/support/event/event_loop.rb', line 84

# Queues the timer's event when the timer reports itself due, then records
# the firing on the timer. Does nothing otherwise.
#
# The queued entry holds only the event, so it is later dispatched without
# arguments.
#
# @param timer [Timer] timer to check
# @return [Object, nil] result of timer.set_fired_time, or nil if not due
def fire_timer(timer)
  return unless timer.due?

  @event_queue.push([timer.event])
  timer.set_fired_time
end

#fire_timers ⇒ Object



78
79
80
81
82
# File 'lib/one_apm/support/event/event_loop.rb', line 78

# Runs fire_timer on every registered timer.
#
# @return [Hash] the @timers hash itself
def fire_timers
  @timers.each_value { |timer| fire_timer(timer) }
end

#next_timeout ⇒ Object



34
35
36
37
38
# File 'lib/one_apm/support/event/event_loop.rb', line 34

# Computes how long the loop may wait before the earliest timer fires.
#
# @return [nil] when no timers are registered
# @return [Numeric] seconds until the earliest next_fire_time, or 0 when that
#   time has already passed
def next_timeout
  return nil if @timers.empty?

  soonest = @timers.each_value.map(&:next_fire_time).min
  remaining = soonest - Time.now
  return 0 if remaining < 0

  remaining
end

#on(event, &blk) ⇒ Object



118
119
120
# File 'lib/one_apm/support/event/event_loop.rb', line 118

# Subscribes +blk+ to +event+.
#
# The subscription is not applied directly: it is handed to fire as an
# :__add_event event carrying the event and the block.
#
# @param event [Object] event to subscribe to
# @param blk [Proc] callback passed along with the event
# @return [Object] whatever fire returns
def on(event, &blk)
  fire(:__add_event, event, blk)
end

#prune_timers ⇒ Object



91
92
93
# File 'lib/one_apm/support/event/event_loop.rb', line 91

# Removes every timer that reports itself finished.
#
# @return [Hash] the @timers hash itself, after removal
def prune_timers
  @timers.keep_if { |_event, timer| !timer.finished? }
end

#reschedule_timer_for_event(e) ⇒ Object



114
115
116
# File 'lib/one_apm/support/event/event_loop.rb', line 114

# Reschedules the timer registered for event +e+, if there is one.
#
# @param e [Object] event whose timer should be rescheduled
# @return [Object, nil] result of the timer's reschedule, or nil when no
#   timer is registered for +e+
def reschedule_timer_for_event(e)
  timer = @timers[e]
  timer.reschedule if timer
end

#run ⇒ Object



49
50
51
52
53
54
# File 'lib/one_apm/support/event/event_loop.rb', line 49

# Logs that the loop is starting, then keeps calling run_once until
# stopped? returns true. The stop condition is checked before each pass.
#
# @return [nil]
def run
  OneApm::Manager.logger.debug "Running event loop"
  run_once until stopped?
end

#run_once(nonblock = false) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/one_apm/support/event/event_loop.rb', line 56

# Performs one pass of the loop: waits (or polls when +nonblock+ is true),
# prunes finished timers, fires due timers, then drains the event queue,
# dispatching each event and rescheduling the timer registered for it.
#
# @param nonblock [Boolean] passed through to wait_to_run
# @return [nil]
def run_once(nonblock=false)
  wait_to_run(nonblock)

  # Finished timers are dropped before the remaining ones are checked.
  prune_timers
  fire_timers

  while !@event_queue.empty?
    # Timer entries are queued as [event] only, leaving payload nil.
    name, payload = @event_queue.pop
    dispatch_event(name, payload)
    reschedule_timer_for_event(name)
  end
end

#set_timer(timer) ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/one_apm/support/event/event_loop.rb', line 21

# Registers +timer+ under its event, replacing any timer already registered
# for that event, and then fires it via fire_timer.
#
# When a timer is being replaced, the new timer is first advanced by the
# time elapsed since the old timer's last_interval_start.
#
# @param timer [Timer] timer to register
# @return [Object, nil] whatever fire_timer returns
def set_timer(timer)
  key = timer.event
  previous = @timers[key]

  timer.advance(Time.now - previous.last_interval_start) if previous

  @timers[key] = timer
  fire_timer(timer)
end

#stop ⇒ Object



44
45
46
47
# File 'lib/one_apm/support/event/event_loop.rb', line 44

# Marks the loop as stopped and then calls wakeup.
#
# The flag is set before wakeup is called, so stopped? already returns true
# by the time the wakeup byte is written.
#
# @return [Object] whatever wakeup returns
def stop
  @stopped = true
  wakeup
end

#stopped? ⇒ Boolean

Returns:

  • (Boolean)


40
41
42
# File 'lib/one_apm/support/event/event_loop.rb', line 40

# Reports whether the loop has been asked to stop.
#
# @return [Boolean] the current value of @stopped
def stopped?
  @stopped
end

#wait_to_run(nonblock) ⇒ Object



69
70
71
72
73
74
75
76
# File 'lib/one_apm/support/event/event_loop.rb', line 69

# Waits until the self-pipe becomes readable or the timeout expires, and
# consumes one byte from the pipe if it was readable.
#
# @param nonblock [Boolean] when true, poll with a zero timeout instead of
#   waiting for next_timeout (a nil timeout makes IO.select wait indefinitely)
# @return [String, nil] the byte read from the pipe, or nil if the select
#   timed out
def wait_to_run(nonblock)
  timeout = if nonblock then 0 else next_timeout end
  readable, = IO.select([@self_pipe_rd], nil, nil, timeout)

  signalled = readable && readable[0]
  @self_pipe_rd.read(1) if signalled && signalled == @self_pipe_rd
end

#wakeup ⇒ Object



137
138
139
140
141
# File 'lib/one_apm/support/event/event_loop.rb', line 137

# Writes a single byte to the self-pipe so that an IO.select waiting on the
# read end returns.
#
# The write is non-blocking: if the pipe cannot accept the byte right now,
# the byte is dropped and a debug message is logged instead of blocking the
# caller.
#
# @return [Integer, Object] number of bytes written, or the logger's return
#   value when the write would have blocked
def wakeup
  @self_pipe_wr.write_nonblock '.'
rescue IO::WaitWritable
  # write_nonblock tags both EAGAIN and EWOULDBLOCK failures with
  # IO::WaitWritable; rescuing the module also covers platforms where the
  # two errno values differ.
  OneApm::Manager.logger.debug "Failed to wakeup event loop"
end