Module: Einhorn::Event
- Defined in:
- lib/einhorn/event/loop_breaker.rb,
lib/einhorn/event.rb,
lib/einhorn/event/timer.rb,
lib/einhorn/event/ack_timer.rb,
lib/einhorn/event/connection.rb,
lib/einhorn/event/persistent.rb,
lib/einhorn/event/command_server.rb,
lib/einhorn/event/abstract_text_descriptor.rb
Overview
TODO: set lots of cloexecs
Defined Under Namespace
Modules: Persistent
Classes: ACKTimer, AbstractTextDescriptor, CommandServer, Connection, LoopBreaker, Timer
Constant Summary
collapse
- @@loopbreak_reader =
nil
- @@loopbreak_writer =
nil
- @@signal_actions =
[]
- @@readable =
{}
- @@writeable =
{}
- @@connections =
{}
- @@timers =
{}
Class Method Summary
collapse
Class Method Details
.break_loop ⇒ Object
163
164
165
166
167
168
169
170
|
# File 'lib/einhorn/event.rb', line 163
def self.break_loop
Einhorn.log_debug("Breaking the loop")
begin
@@loopbreak_writer.write_nonblock('a')
rescue Errno::EWOULDBLOCK, Errno::EAGAIN
Einhorn.log_error("Loop break pipe is full -- probably means that we are quite backlogged")
end
end
|
.cloexec!(fd) ⇒ Object
13
14
15
|
# File 'lib/einhorn/event.rb', line 13
def self.cloexec!(fd)
fd.fcntl(Fcntl::F_SETFD, fd.fcntl(Fcntl::F_GETFD) | Fcntl::FD_CLOEXEC)
end
|
.close_all ⇒ Object
26
27
28
29
30
31
32
33
34
|
# File 'lib/einhorn/event.rb', line 26
def self.close_all
@@loopbreak_reader.close
@@loopbreak_writer.close
(@@readable.values + @@writeable.values).each do |descriptors|
descriptors.each do |descriptor|
descriptor.close
end
end
end
|
.close_all_for_worker ⇒ Object
36
37
38
|
# File 'lib/einhorn/event.rb', line 36
def self.close_all_for_worker
close_all
end
|
.connections ⇒ Object
100
101
102
|
# File 'lib/einhorn/event.rb', line 100
def self.connections
@@connections.values
end
|
.deregister_connection(fd) ⇒ Object
96
97
98
|
# File 'lib/einhorn/event.rb', line 96
def self.deregister_connection(fd)
@@connections.delete(fd)
end
|
.deregister_readable(reader) ⇒ Object
61
62
63
64
65
|
# File 'lib/einhorn/event.rb', line 61
def self.deregister_readable(reader)
readers = @@readable[reader.to_io]
readers.delete(reader)
@@readable.delete(reader.to_io) if readers.length == 0
end
|
.deregister_timer(timer) ⇒ Object
109
110
111
112
113
|
# File 'lib/einhorn/event.rb', line 109
def self.deregister_timer(timer)
timers = @@timers[timer.expires_at]
timers.delete(timer)
@@timers.delete(timer.expires_at) if timers.length == 0
end
|
.deregister_writeable(writer) ⇒ Object
78
79
80
81
82
|
# File 'lib/einhorn/event.rb', line 78
def self.deregister_writeable(writer)
writers = @@writeable[writer.to_io]
writers.delete(writer)
@@readable.delete(writer.to_io) if writers.length == 0
end
|
.init ⇒ Object
17
18
19
20
21
22
23
24
|
# File 'lib/einhorn/event.rb', line 17
def self.init
readable, writeable = IO.pipe
@@loopbreak_reader = LoopBreaker.open(readable)
@@loopbreak_writer = writeable
cloexec!(readable)
cloexec!(writeable)
end
|
.loop_once ⇒ Object
115
116
117
118
119
|
# File 'lib/einhorn/event.rb', line 115
def self.loop_once
run_signal_actions
run_selectables
run_timers
end
|
.persistent_descriptors ⇒ Object
40
41
42
43
44
|
# File 'lib/einhorn/event.rb', line 40
def self.persistent_descriptors
descriptor_sets = @@readable.values + @@writeable.values + @@timers.values
descriptors = descriptor_sets.inject {|a, b| a | b}
descriptors.select {|descriptor| Einhorn::Event::Persistent.persistent?(descriptor)}
end
|
.readable_fds ⇒ Object
67
68
69
70
71
|
# File 'lib/einhorn/event.rb', line 67
def self.readable_fds
readers = @@readable.keys
Einhorn.log_debug("Readable fds are #{readers.inspect}")
readers
end
|
.register_connection(connection, fd) ⇒ Object
92
93
94
|
# File 'lib/einhorn/event.rb', line 92
def self.register_connection(connection, fd)
@@connections[fd] = connection
end
|
.register_readable(reader) ⇒ Object
56
57
58
59
|
# File 'lib/einhorn/event.rb', line 56
def self.register_readable(reader)
@@readable[reader.to_io] ||= Set.new
@@readable[reader.to_io] << reader
end
|
.register_signal_action(&blk) ⇒ Object
52
53
54
|
# File 'lib/einhorn/event.rb', line 52
def self.register_signal_action(&blk)
@@signal_actions << blk
end
|
.register_timer(timer) ⇒ Object
104
105
106
107
|
# File 'lib/einhorn/event.rb', line 104
def self.register_timer(timer)
@@timers[timer.expires_at] ||= Set.new
@@timers[timer.expires_at] << timer
end
|
.register_writeable(writer) ⇒ Object
73
74
75
76
|
# File 'lib/einhorn/event.rb', line 73
def self.register_writeable(writer)
@@writeable[writer.to_io] ||= Set.new
@@writeable[writer.to_io] << writer
end
|
.restore_persistent_descriptors(persistent_descriptors) ⇒ Object
46
47
48
49
50
|
# File 'lib/einhorn/event.rb', line 46
def self.restore_persistent_descriptors(persistent_descriptors)
persistent_descriptors.each do |descriptor_state|
Einhorn::Event::Persistent.from_state(descriptor_state)
end
end
|
.run_selectables ⇒ Object
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# File 'lib/einhorn/event.rb', line 140
def self.run_selectables
time = timeout
Einhorn.log_debug("Loop timeout is #{time.inspect}")
return if time && time < 0
readable, writeable, _ = IO.select(readable_fds, writeable_fds, nil, time)
(readable || []).each do |io|
@@readable[io].each {|reader| reader.notify_readable}
end
(writeable || []).each do |io|
@@writeable[io].each {|writer| writer.notify_writeable}
end
end
|
.run_signal_actions ⇒ Object
130
131
132
133
134
135
136
137
138
|
# File 'lib/einhorn/event.rb', line 130
def self.run_signal_actions
while blk = @@signal_actions.shift
blk.call
end
end
|
.run_timers ⇒ Object
156
157
158
159
160
161
|
# File 'lib/einhorn/event.rb', line 156
def self.run_timers
@@timers.select {|expires_at, _| expires_at <= Time.now}.each do |expires_at, timers|
timers.dup.each {|timer| timer.ring!}
end
end
|
.timeout ⇒ Object
121
122
123
124
125
126
127
128
|
# File 'lib/einhorn/event.rb', line 121
def self.timeout
if expires_at = @@timers.keys.sort[0]
expires_at - Time.now
else
nil
end
end
|
.writeable_fds ⇒ Object
84
85
86
87
88
89
90
|
# File 'lib/einhorn/event.rb', line 84
def self.writeable_fds
writers = @@writeable.select do |io, writers|
writers.any? {|writer| writer.write_pending?}
end.map {|io, writers| io}
Einhorn.log_debug("Writeable fds are #{writers.inspect}")
writers
end
|