Module: Einhorn::Event

Defined in:
lib/einhorn/event/loop_breaker.rb,
lib/einhorn/event.rb,
lib/einhorn/event/timer.rb,
lib/einhorn/event/ack_timer.rb,
lib/einhorn/event/connection.rb,
lib/einhorn/event/persistent.rb,
lib/einhorn/event/command_server.rb,
lib/einhorn/event/abstract_text_descriptor.rb

Overview

TODO: set lots of cloexecs

Defined Under Namespace

Modules: Persistent Classes: ACKTimer, AbstractTextDescriptor, CommandServer, Connection, LoopBreaker, Timer

Constant Summary collapse

@@loopbreak_reader =
nil
@@loopbreak_writer =
nil
@@readable =
{}
@@writeable =
{}
@@timers =
{}

Class Method Summary collapse

Class Method Details

.break_loopObject



134
135
136
137
138
139
140
141
# File 'lib/einhorn/event.rb', line 134

def self.break_loop
  Einhorn.log_debug("Breaking the loop")
  begin
    @@loopbreak_writer.write_nonblock('a')
  rescue Errno::EWOULDBLOCK, Errno::EAGAIN
    Einhorn.log_error("Loop break pipe is full -- probably means that we are quite backlogged")
  end
end

.cloexec!(fd) ⇒ Object



11
12
13
# File 'lib/einhorn/event.rb', line 11

def self.cloexec!(fd)
  fd.fcntl(Fcntl::F_SETFD, fd.fcntl(Fcntl::F_GETFD) | Fcntl::FD_CLOEXEC)
end

.close_allObject



24
25
26
27
28
29
30
31
32
# File 'lib/einhorn/event.rb', line 24

def self.close_all
  @@loopbreak_reader.close
  @@loopbreak_writer.close
  (@@readable.values + @@writeable.values).each do |descriptors|
    descriptors.each do |descriptor|
      descriptor.close
    end
  end
end

.close_all_for_workerObject



34
35
36
# File 'lib/einhorn/event.rb', line 34

def self.close_all_for_worker
  close_all
end

.deregister_readable(reader) ⇒ Object



55
56
57
58
59
# File 'lib/einhorn/event.rb', line 55

def self.deregister_readable(reader)
  readers = @@readable[reader.to_io]
  readers.delete(reader)
  @@readable.delete(reader.to_io) if readers.length == 0
end

.deregister_timer(timer) ⇒ Object



91
92
93
94
95
# File 'lib/einhorn/event.rb', line 91

def self.deregister_timer(timer)
  timers = @@timers[timer.expires_at]
  timers.delete(timer)
  @@timers.delete(timer.expires_at) if timers.length == 0
end

.deregister_writeable(writer) ⇒ Object



72
73
74
75
76
# File 'lib/einhorn/event.rb', line 72

def self.deregister_writeable(writer)
  writers = @@writeable[writer.to_io]
  writers.delete(writer)
  @@readable.delete(writer.to_io) if writers.length == 0
end

.initObject



15
16
17
18
19
20
21
22
# File 'lib/einhorn/event.rb', line 15

def self.init
  readable, writeable = IO.pipe
  @@loopbreak_reader = LoopBreaker.open(readable)
  @@loopbreak_writer = writeable

  cloexec!(readable)
  cloexec!(writeable)
end

.loop_onceObject



97
98
99
100
# File 'lib/einhorn/event.rb', line 97

def self.loop_once
  run_selectables
  run_timers
end

.persistent_descriptorsObject



38
39
40
41
42
# File 'lib/einhorn/event.rb', line 38

def self.persistent_descriptors
  descriptor_sets = @@readable.values + @@writeable.values + @@timers.values
  descriptors = descriptor_sets.inject {|a, b| a | b}
  descriptors.select {|descriptor| Einhorn::Event::Persistent.persistent?(descriptor)}
end

.readable_fdsObject



61
62
63
64
65
# File 'lib/einhorn/event.rb', line 61

def self.readable_fds
  readers = @@readable.keys
  Einhorn.log_debug("Readable fds are #{readers.inspect}")
  readers
end

.register_readable(reader) ⇒ Object



50
51
52
53
# File 'lib/einhorn/event.rb', line 50

def self.register_readable(reader)
  @@readable[reader.to_io] ||= Set.new
  @@readable[reader.to_io] << reader
end

.register_timer(timer) ⇒ Object



86
87
88
89
# File 'lib/einhorn/event.rb', line 86

def self.register_timer(timer)
  @@timers[timer.expires_at] ||= Set.new
  @@timers[timer.expires_at] << timer
end

.register_writeable(writer) ⇒ Object



67
68
69
70
# File 'lib/einhorn/event.rb', line 67

def self.register_writeable(writer)
  @@writeable[writer.to_io] ||= Set.new
  @@writeable[writer.to_io] << writer
end

.restore_persistent_descriptors(persistent_descriptors) ⇒ Object



44
45
46
47
48
# File 'lib/einhorn/event.rb', line 44

def self.restore_persistent_descriptors(persistent_descriptors)
  persistent_descriptors.each do |descriptor_state|
    Einhorn::Event::Persistent.from_state(descriptor_state)
  end
end

.run_selectablesObject



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/einhorn/event.rb', line 111

def self.run_selectables
  time = timeout
  Einhorn.log_debug("Loop timeout is #{time.inspect}")
  # Time's already up
  return if time && time < 0

  readable, writeable, _ = IO.select(readable_fds, writeable_fds, nil, time)
  (readable || []).each do |io|
    @@readable[io].each {|reader| reader.notify_readable}
  end

  (writeable || []).each do |io|
    @@writeable[io].each {|writer| writer.notify_writeable}
  end
end

.run_timersObject



127
128
129
130
131
132
# File 'lib/einhorn/event.rb', line 127

def self.run_timers
  @@timers.select {|expires_at, _| expires_at <= Time.now}.each do |expires_at, timers|
    # Going to be modifying the set, so let's dup it.
    timers.dup.each {|timer| timer.ring!}
  end
end

.timeoutObject



102
103
104
105
106
107
108
109
# File 'lib/einhorn/event.rb', line 102

def self.timeout
  # (expires_at of the next timer) - now
  if expires_at = @@timers.keys.sort[0]
    expires_at - Time.now
  else
    nil
  end
end

.writeable_fdsObject



78
79
80
81
82
83
84
# File 'lib/einhorn/event.rb', line 78

def self.writeable_fds
  writers = @@writeable.select do |io, writers|
    writers.any? {|writer| writer.write_pending?}
  end.map {|io, writers| io}
  Einhorn.log_debug("Writeable fds are #{writers.inspect}")
  writers
end