Class: Arf::Reactor

Inherits:
Object
  • Object
show all
Defined in:
lib/arf/reactor.rb

Constant Summary collapse

BEAT_INTERVAL =
3

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize ⇒ Reactor

Returns a new instance of Reactor.



18
19
20
21
22
23
24
25
26
# File 'lib/arf/reactor.rb', line 18

# Sets up an idle reactor. The selector, executor and thread slots start
# out as nil; only the bookkeeping structures and the logger are created.
def initialize
  # Runtime resources start unset.
  @thread = nil
  @executor = nil
  @nio = nil
  @stopping = false

  # Lambdas queued by the public methods.
  @todo = Queue.new
  # NOTE(review): presumably guards the lazy start-up done by `spawn` — confirm.
  @spawn_lock = Monitor.new

  # IO object => monitor returned by @nio.register.
  @map = {}
  # IO object => origin tag (:client for sockets opened by #connect).
  @socket_source = {}

  @log = Arf.configuration.logger.with_fields(subsystem: "Reactor")
end

Class Method Details

.attach_server(server, handler_class, *handler_args) ⇒ Object



14
15
16
# File 'lib/arf/reactor.rb', line 14

# Forwards to #attach_server on the shared reactor instance.
#
# @param server the listening IO to hand to the reactor
# @param handler_class class stored alongside the server registration
# @param handler_args extra arguments stored with the handler class
# @return whatever the instance-level #attach_server returns
def self.attach_server(server, handler_class, *handler_args)
  reactor = instance
  reactor.attach_server(server, handler_class, *handler_args)
end

.client_writes_pending(client) ⇒ Object



10
# File 'lib/arf/reactor.rb', line 10

# Forwards to #client_writes_pending on the shared reactor instance.
def self.client_writes_pending(client)
  instance.client_writes_pending(client)
end

.connect(host, port, handler) ⇒ Object



8
# File 'lib/arf/reactor.rb', line 8

# Forwards to #connect on the shared reactor instance.
def self.connect(host, port, handler)
  instance.connect(host, port, handler)
end

.detach(io) ⇒ Object



12
# File 'lib/arf/reactor.rb', line 12

# Forwards to #detach on the shared reactor instance.
def self.detach(io)
  instance.detach(io)
end

.detach_client(client) ⇒ Object



11
# File 'lib/arf/reactor.rb', line 11

# Forwards to #detach_client on the shared reactor instance.
def self.detach_client(client)
  instance.detach_client(client)
end

.instance ⇒ Object



7
# File 'lib/arf/reactor.rb', line 7

# Returns the shared reactor, building it with `new` on first use and
# reusing the memoized object afterwards. No locking is performed here.
def self.instance
  @instance ||= new
end

.post(task = nil) ⇒ Object



9
# File 'lib/arf/reactor.rb', line 9

# Forwards to #post on the shared reactor instance, passing along either
# the explicit task or the given block.
def self.post(task = nil, &block)
  instance.post(task, &block)
end

Instance Method Details

#attach_server(server, handler_class, *handler_args) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/arf/reactor.rb', line 61

# Queues the registration of a listening server with the selector.
#
# Nothing is registered immediately: a lambda is pushed onto @todo and the
# reactor is woken. When that lambda runs, the server is registered for
# read readiness and the handler class plus its arguments are stored as
# the monitor's value.
#
# @param server the IO to register
# @param handler_class class stored in the monitor value
# @param handler_args extra arguments stored next to the handler class
# @return the result of `wakeup`
def attach_server(server, handler_class, *handler_args)
  @log.debug("Attach server", server:, handler_class:)
  registration = lambda do
    monitor = @nio.register(server, :r)
    @map[server] = monitor
    monitor.value = [handler_class, handler_args]
  end
  @todo << registration
  wakeup
end

#client_writes_pending(client) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/arf/reactor.rb', line 113

# Queues a request to watch the client's socket for writability.
#
# The queued lambda looks up the client's IO via #io_for_client and
# switches the matching monitor's interests to :rw; if no monitor is
# found a warning is logged instead.
#
# @param client handler object whose IO should become write-monitored
# @return the result of the final debug log call
def client_writes_pending(client)
  @log.debug("Writes pending", client: client.class.name, id: client.object_id)
  enable_writes = lambda do
    monitor = @map[io_for_client(client)]
    next @log.warn("No monitor for client", id: client.object_id) unless monitor

    monitor.interests = :rw
  end
  @todo << enable_writes
  wakeup
  @log.debug("Writes pending registered")
end

#connect(host, port, handler) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/arf/reactor.rb', line 33

# Schedules a non-blocking outbound TCP connection.
#
# The handler class is instantiated right away and returned to the caller;
# the actual socket work is queued on @todo and runs later. When the queued
# lambda runs it resolves the host, opens a socket of the resolved family
# and starts a non-blocking connect:
#
# * connection still in progress: the socket is registered for write
#   readiness with `[host, port, handler]` as the monitor value;
# * connection completed immediately: the socket is registered for read
#   readiness with the handler as the monitor value, then
#   `handler.registered!` and (if defined) `handler.post_init` are posted;
# * any other failure: the socket is closed and the error is re-raised
#   from the queued lambda.
#
# @param host [String] host name or address to connect to
# @param port port number or service name
# @param handler class that responds to `new`; its instance handles the connection
# @return the freshly created handler instance
def connect(host, port, handler)
  @log.debug("Establishing connection", host:, port:, handler:)
  handler = handler.new
  @todo << lambda {
    addr_info = Socket.getaddrinfo(host, port, nil, Socket::SOCK_STREAM).first
    family = addr_info[4]
    io = Socket.new(family, Socket::SOCK_STREAM, 0)
    begin
      # Connect to the numeric address that was just resolved (index 3), so
      # the target always matches the socket family chosen above and the
      # host name is not looked up a second time.
      io.connect_nonblock Socket.sockaddr_in(port, addr_info[3])
    rescue Errno::EINPROGRESS
      @log.debug("Connection in progress", id: handler.object_id)
      @map[io] = @nio.register(io, :w)
      @map[io].value = [host, port, handler]
      @socket_source[io] = :client
      next
    rescue StandardError
      # The socket was never registered, so nothing else would close it.
      io.close
      raise
    end
    @log.debug("Connection succeeded", id: handler.object_id)
    @map[io] = @nio.register(io, :r)
    @map[io].value = handler
    @socket_source[io] = :client
    @log.debug("Async post_init dispatch", id: handler.object_id)
    post { handler.registered! }
    post { handler.post_init } if handler.respond_to?(:post_init)
  }
  wakeup
  handler
end

#detach(io) ⇒ Object



70
71
72
73
74
75
76
77
78
79
# File 'lib/arf/reactor.rb', line 70

# Queues the removal of an IO object from the reactor.
#
# The queued lambda deregisters the IO from the selector, forgets it in
# both bookkeeping tables and finally closes it.
#
# @param io the IO object to drop
# @return the result of `wakeup`
def detach(io)
  @log.debug("Detach IO", io:)
  teardown = lambda do
    @nio.deregister(io)
    [@map, @socket_source].each { |table| table.delete(io) }
    io.close
  end
  @todo << teardown
  wakeup
end

#detach_client(client) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/arf/reactor.rb', line 81

# Queues the removal of a client's socket from the reactor.
#
# The queued lambda resolves the client's IO through #io_for_client; when
# one is found it is deregistered, dropped from both bookkeeping tables
# and closed. When none is found the lambda does nothing.
#
# @param client handler object whose socket should be dropped
# @return the result of `wakeup`
def detach_client(client)
  @log.debug("Detach client", client: client.class.name, id: client.object_id)
  teardown = lambda do
    socket = io_for_client(client)
    if socket
      @nio.deregister(socket)
      @map.delete(socket)
      @socket_source.delete(socket)
      socket.close
    end
  end
  @todo << teardown
  wakeup
end

#io_for_client(client) ⇒ Object



106
107
108
109
110
111
# File 'lib/arf/reactor.rb', line 106

# Finds the IO whose monitor value equals the given client.
#
# Scans @map in insertion order and returns the key of the first entry
# whose monitor value == client.
#
# @param client object to look for among the monitor values
# @return the matching IO key, or nil when no entry matches
def io_for_client(client)
  entry = @map.find { |_io, monitor| monitor.value == client }
  entry&.first
end

#post(task = nil, &block) ⇒ Object



95
96
97
98
99
100
101
102
103
104
# File 'lib/arf/reactor.rb', line 95

# Hands a unit of work to the executor.
#
# Either an explicit callable or a block may be given; the explicit task
# wins when both are present. `spawn` is called first, then the work is
# wrapped so that any exception it raises is logged together with the
# call site that posted it.
#
# @param task [#call, nil] callable to run; falls back to the block
# @return the result of `@executor << ...`
def post(task = nil, &block)
  job = task || block
  spawn
  # Must be captured here, at method level, so it names post's caller.
  origin = caller.first
  guarded = lambda do
    begin
      job.call
    # NOTE(review): rescues Exception, not only StandardError — confirm
    # that swallowing everything on the executor is intended.
    rescue Exception => e
      @log.error("Async post execution failed", e, source: origin)
    end
  end
  @executor << guarded
end

#timer(interval, &block) ⇒ Object



28
29
30
31
# File 'lib/arf/reactor.rb', line 28

# Creates and starts a repeating timer.
#
# Builds a Concurrent::TimerTask with the given execution interval and
# block, calls `execute` on it and returns the task itself.
#
# @param interval execution interval handed to the timer task
# @return [Concurrent::TimerTask] the started task
def timer(interval, &block)
  @log.debug("Attached timer", interval:, callable: block)
  task = Concurrent::TimerTask.new(execution_interval: interval, &block)
  task.execute
  task
end