Class: Arf::Reactor
- Inherits:
-
Object
- Object
- Arf::Reactor
- Defined in:
- lib/arf/reactor.rb
Constant Summary collapse
- BEAT_INTERVAL =
3
Class Method Summary collapse
- .attach_server(server, handler_class, *handler_args) ⇒ Object
- .client_writes_pending(client) ⇒ Object
- .connect(host, port, handler) ⇒ Object
- .detach(io) ⇒ Object
- .detach_client(client) ⇒ Object
- .instance ⇒ Object
- .post(task = nil) ⇒ Object
Instance Method Summary collapse
- #attach_server(server, handler_class, *handler_args) ⇒ Object
- #client_writes_pending(client) ⇒ Object
- #connect(host, port, handler) ⇒ Object
- #detach(io) ⇒ Object
- #detach_client(client) ⇒ Object
-
#initialize ⇒ Reactor
constructor
A new instance of Reactor.
- #io_for_client(client) ⇒ Object
- #post(task = nil, &block) ⇒ Object
- #timer(interval, &block) ⇒ Object
Constructor Details
#initialize ⇒ Reactor
Returns a new instance of Reactor.
18 19 20 21 22 23 24 25 26 |
# File 'lib/arf/reactor.rb', line 18
#
# Builds an idle reactor. Nothing is started here: the selector, executor
# and thread slots are left nil and are filled in elsewhere.
#
# State set up:
# * @todo          - Queue of lambdas pushed by the public methods of this class
# * @map           - IO => monitor returned by @nio.register
# * @socket_source - IO => origin tag (connect stores :client)
# * @spawn_lock    - Monitor; presumably guards lazy start-up - confirm
# * @log           - logger tagged with subsystem "Reactor"
def initialize
  @nio = nil
  @executor = nil
  @thread = nil
  @stopping = false

  @todo = Queue.new
  @spawn_lock = Monitor.new

  @map = {}
  @socket_source = {}

  @log = Arf.configuration.logger.with_fields(subsystem: "Reactor")
end
Class Method Details
.attach_server(server, handler_class, *handler_args) ⇒ Object
14 15 16 |
# File 'lib/arf/reactor.rb', line 14
#
# Class-level shortcut: forwards to #attach_server on the shared instance.
def self.attach_server(server, handler_class, *handler_args)
  reactor = instance
  reactor.attach_server(server, handler_class, *handler_args)
end
.client_writes_pending(client) ⇒ Object
10 |
# File 'lib/arf/reactor.rb', line 10
#
# Class-level shortcut: forwards to #client_writes_pending on the shared instance.
def self.client_writes_pending(client)
  instance.client_writes_pending(client)
end
.connect(host, port, handler) ⇒ Object
8 |
# File 'lib/arf/reactor.rb', line 8
#
# Class-level shortcut: forwards to #connect on the shared instance and
# returns whatever it returns.
def self.connect(host, port, handler)
  instance.connect(host, port, handler)
end
.detach(io) ⇒ Object
12 |
# File 'lib/arf/reactor.rb', line 12
#
# Class-level shortcut: forwards to #detach on the shared instance.
def self.detach(io)
  instance.detach(io)
end
.detach_client(client) ⇒ Object
11 |
# File 'lib/arf/reactor.rb', line 11
#
# Class-level shortcut: forwards to #detach_client on the shared instance.
def self.detach_client(client)
  instance.detach_client(client)
end
.instance ⇒ Object
7 |
# File 'lib/arf/reactor.rb', line 7
#
# Returns the shared reactor, creating it on first use and memoizing it in
# a class-level instance variable.
#
# NOTE(review): the lazy initialisation is not synchronized; confirm that
# the first call cannot race between threads.
def self.instance
  @instance ||= new
end
.post(task = nil) ⇒ Object
9 |
# File 'lib/arf/reactor.rb', line 9
#
# Class-level shortcut: forwards the task and/or block to #post on the
# shared instance.
def self.post(task = nil, &block)
  instance.post(task, &block)
end
Instance Method Details
#attach_server(server, handler_class, *handler_args) ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/arf/reactor.rb', line 61
#
# Queues registration of +server+ with the selector for read interest.
# The resulting monitor is stored in @map under +server+, and its value is
# set to the pair [handler_class, handler_args].
#
# The registration itself happens when the queued lambda is run; this
# method only enqueues it and then calls +wakeup+ (defined elsewhere,
# presumably nudges the selector loop - confirm).
#
# @param server [IO] listening socket to watch
# @param handler_class [Class] stored alongside the monitor
# @param handler_args [Array] extra arguments stored with the handler class
def attach_server(server, handler_class, *handler_args)
  @log.debug("Attach server", server: server, handler_class: handler_class)

  register_server = lambda do
    monitor = @nio.register(server, :r)
    @map[server] = monitor
    monitor.value = [handler_class, handler_args]
  end

  @todo << register_server
  wakeup
end
#client_writes_pending(client) ⇒ Object
113 114 115 116 117 118 119 120 121 122 123 124 |
# File 'lib/arf/reactor.rb', line 113
#
# Queues a switch of the client's monitor to read+write interest (:rw).
# The monitor is looked up through #io_for_client when the queued lambda
# runs; if none is found a warning is logged and nothing else happens.
#
# @param client [Object] handler whose monitor should gain write interest
def client_writes_pending(client)
  @log.debug("Writes pending", client: client.class.name, id: client.object_id)

  enable_writes = lambda do
    monitor = @map[io_for_client(client)]
    next @log.warn("No monitor for client", id: client.object_id) unless monitor

    monitor.interests = :rw
  end

  @todo << enable_writes
  wakeup
  @log.debug("Writes pending registered")
end
#connect(host, port, handler) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/arf/reactor.rb', line 33
#
# Instantiates +handler+ and queues a non-blocking TCP connection to
# +host+:+port+. The handler instance is returned immediately, before the
# connection attempt has run.
#
# When the queued lambda runs:
# * if the connect completes at once, the socket is registered for read
#   interest with the handler as the monitor value, and registered! (plus
#   post_init, when the handler responds to it) is dispatched via #post;
# * if the connect is still in progress (EINPROGRESS), the socket is
#   registered for write interest with [host, port, handler] as the
#   monitor value, and nothing is dispatched from here;
# * on any other error the socket is closed and the error is re-raised.
#
# @param host [String] host name or address to connect to
# @param port [Integer, String] port passed to getaddrinfo / sockaddr_in
# @param handler [Class] instantiated with .new (no arguments)
# @return [Object] the newly created handler instance
def connect(host, port, handler)
  @log.debug("Establishing connection", host:, port:, handler:)
  handler = handler.new
  @todo << lambda {
    addr_info = Socket.getaddrinfo(host, port, nil, Socket::SOCK_STREAM).first
    family = addr_info[4]
    # Reuse the numeric address from the lookup above. Passing +host+ to
    # sockaddr_in would resolve it a second time, which can block again and
    # may yield an address of a different family than the socket's.
    numeric_host = addr_info[3]
    io = Socket.new(family, Socket::SOCK_STREAM, 0)
    begin
      io.connect_nonblock Socket.sockaddr_in(port, numeric_host)
    rescue Errno::EINPROGRESS
      @log.debug("Connection in progress", id: handler.object_id)
      @map[io] = @nio.register(io, :w)
      @map[io].value = [host, port, handler]
      @socket_source[io] = :client
      next
    rescue StandardError
      # The socket was never registered anywhere, so close it here instead
      # of leaking the descriptor, then let the error propagate as before.
      io.close
      raise
    end
    @log.debug("Connection succeeded", id: handler.object_id)
    @map[io] = @nio.register(io, :r)
    @map[io].value = handler
    @socket_source[io] = :client
    @log.debug("Async post_init dispatch", id: handler.object_id)
    post { handler.registered! }
    post { handler.post_init } if handler.respond_to?(:post_init)
  }
  wakeup
  handler
end
#detach(io) ⇒ Object
70 71 72 73 74 75 76 77 78 79 |
# File 'lib/arf/reactor.rb', line 70
#
# Queues removal of +io+ from the reactor: the queued lambda deregisters it
# from the selector, drops its entries from @map and @socket_source, and
# closes it.
#
# @param io [IO] the IO object to stop watching and close
def detach(io)
  @log.debug("Detach IO", io: io)

  release = lambda do
    @nio.deregister(io)
    [@map, @socket_source].each { |table| table.delete(io) }
    io.close
  end

  @todo << release
  wakeup
end
#detach_client(client) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 |
# File 'lib/arf/reactor.rb', line 81
#
# Queues removal of the IO associated with +client+. The IO is looked up
# through #io_for_client when the queued lambda runs; if no IO is found the
# lambda does nothing. Otherwise the IO is deregistered from the selector,
# removed from @map and @socket_source, and closed.
#
# @param client [Object] handler whose IO should be torn down
def detach_client(client)
  @log.debug("Detach client", client: client.class.name, id: client.object_id)

  teardown = lambda do
    io = io_for_client(client)
    if io
      @nio.deregister(io)
      @map.delete(io)
      @socket_source.delete(io)
      io.close
    end
  end

  @todo << teardown
  wakeup
end
#io_for_client(client) ⇒ Object
106 107 108 109 110 111 |
# File 'lib/arf/reactor.rb', line 106
#
# Finds the IO whose monitor value is +client+.
#
# Only monitors whose value compares equal (==) to +client+ match. Monitors
# registered by #attach_server (value [handler_class, handler_args]) and by
# #connect while the connection is still in progress (value
# [host, port, handler]) hold arrays, so a handler lookup does not match them.
#
# @param client [Object] handler to look up
# @return [IO, nil] the first matching key of @map, or nil when none matches
def io_for_client(client)
  @map.find { |_io, monitor| monitor.value == client }&.first
end
#post(task = nil, &block) ⇒ Object
95 96 97 98 99 100 101 102 103 104 |
# File 'lib/arf/reactor.rb', line 95
#
# Schedules +task+ (or the given block when +task+ is nil) on @executor.
# Any exception raised by the task is caught and logged together with the
# call site that posted it; it is not re-raised.
#
# @param task [#call, nil] callable to run; falls back to the block
# @yield the work to run when no +task+ is given
# @return [Object] whatever +@executor <<+ returns
def post(task = nil, &block)
  task ||= block
  # Defined elsewhere in this class; presumably starts the executor on
  # first use - confirm.
  spawn
  # Only the immediate caller is logged, so capture a single frame instead
  # of materialising the whole backtrace on every post.
  source = caller(1, 1)&.first
  @executor << lambda do
    task.call
  rescue Exception => e
    # NOTE(review): rescuing Exception also traps non-StandardError
    # failures; kept as-is since it looks like deliberate best-effort.
    @log.error("Async post execution failed", e, source:)
  end
end
#timer(interval, &block) ⇒ Object
28 29 30 31 |
# File 'lib/arf/reactor.rb', line 28
#
# Creates a Concurrent::TimerTask that runs +block+ every +interval+,
# starts it, and returns it.
#
# @param interval [Numeric] passed as the task's execution_interval
# @yield the work to run on each tick
# @return [Concurrent::TimerTask] the started task
def timer(interval, &block)
  @log.debug("Attached timer", interval: interval, callable: block)

  task = Concurrent::TimerTask.new(execution_interval: interval, &block)
  task.execute
  task
end