Class: DTAS::UNIXServer

Inherits:
Object
  • Object
show all
Defined in:
lib/dtas/unix_server.rb

Overview

The programming model for the event loop here aims to be compatible with EPOLLONESHOT use with epoll, since that fits my brain far better than existing evented APIs/frameworks. If we cared about scalability to thousands of clients, we'd really use epoll, but IO.select can be just as fast (or faster) with few descriptors and is obviously more portable.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(path) ⇒ UNIXServer

Returns a new instance of UNIXServer.


25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/dtas/unix_server.rb', line 25

# Creates a SOCK_SEQPACKET UNIX-domain listening socket bound to +path+.
#
# The process umask is tightened to 0077 while the socket is created and
# is always restored afterwards. If +path+ is already bound, a probe
# connection decides whether a live server owns it (raise) or whether it
# is a stale leftover (unlink and bind again).
#
# @param path [String] filesystem path to bind the socket to
# @raise [RuntimeError] if another process is accepting connections on +path+
# @raise [SystemCallError] for any other bind/connect/unlink/listen failure
def initialize(path)
  @path = path
  # lock down access by default, arbitrary commands may run as the
  # same user dtas-player runs as:
  old_umask = File.umask(0077)
  @to_io = Socket.new(:UNIX, :SEQPACKET, 0)
  addr = Socket.pack_sockaddr_un(path)
  begin
    @to_io.bind(addr)
  rescue Errno::EADDRINUSE
    # maybe we have an old path leftover from a killed process
    tmp = Socket.new(:UNIX, :SEQPACKET, 0)
    begin
      tmp.connect(addr)
      # the connect succeeded, so somebody is really listening there
      raise RuntimeError, "socket `#{path}' is in use", []
    rescue Errno::ECONNREFUSED
      # ok, leftover socket, unlink and rebind anyways
      File.unlink(path)
      @to_io.bind(addr)
    ensure
      tmp.close
    end
  end
  @to_io.listen(1024)
  # the server itself is the first entry in the read watchlist
  @readers = { self => true }
  @writers = {}
rescue StandardError
  # construction failed: release the descriptor now instead of leaving
  # it open until the half-built object is garbage collected
  @to_io.close if @to_io && !@to_io.closed?
  raise
ensure
  File.umask(old_umask)
end

Instance Attribute Details

#to_ioObject (readonly)

:nodoc:


18
19
20
# File 'lib/dtas/unix_server.rb', line 18

# Returns the listening socket held in @to_io.
# Responding to #to_io is what allows this object itself to be used as
# a key of @readers and handed to IO.select in #run_once.
def to_io
  @to_io
end

Instance Method Details

#accept_nonblockObject


119
120
121
# File 'lib/dtas/unix_server.rb', line 119

# Delegates to the listening socket's accept_nonblock with
# +exception: false+, so a would-block condition is reported as a return
# value rather than an exception. #readable_iter stops on a
# :wait_readable return and takes element 0 of any other return value
# as the accepted connection.
def accept_nonblock
  @to_io.accept_nonblock(exception: false)
end

#closeObject


20
21
22
23
# File 'lib/dtas/unix_server.rb', line 20

# Removes the socket path from the filesystem and closes the listening
# socket.
#
# The socket is closed even when removing the path fails (for example
# because it was already deleted), so the descriptor is never leaked.
# The unlink error still propagates to the caller.
#
# @return [Object] whatever closing the underlying socket returns
# @raise [SystemCallError] if the path cannot be unlinked
def close
  begin
    File.unlink(@path)
  ensure
    # runs on both the success and the failure path of the unlink
    result = @to_io.close
  end
  result
end

#readable_iterObject


60
61
62
63
64
65
66
67
# File 'lib/dtas/unix_server.rb', line 60

# Accepts every connection that is currently pending and registers each
# one, wrapped in DTAS::UNIXAccepted, in the read watchlist. Returns
# :wait_readable once there is nothing left to accept.
#
# Any block given by the caller is deliberately unused.
def readable_iter
  until (accepted = accept_nonblock) == :wait_readable
    # element 0 of the accept result is the connected socket
    @readers[DTAS::UNIXAccepted.new(accepted[0])] = true
  end
  accepted
end

#run_onceObject


97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/dtas/unix_server.rb', line 97

# Performs one pass of the event loop: waits in IO.select on the current
# watchlists, then services ready writers followed by ready readers.
#
# Each ready object is dropped from its watchlist before being serviced
# (one-shot behavior); the value its iterator returns is passed to
# #wait_ctl, which decides whether and where it is watched again.
# Messages produced by readers are yielded to the caller's block as
# (io, msg).
def run_once
  begin
    ready = IO.select(@readers.keys, @writers.keys)
  rescue IOError
    # this only happens when sinks error out
    @writers.delete_if { |io, _flag| io.to_io.closed? }
    retry
  end
  return unless ready

  readable, writable = ready

  # while writers are being serviced, wait_ctl(:hot_read) may append to
  # this very array, which is iterated right afterwards
  @hot_read = readable
  writable.each do |sink|
    @writers.delete(sink)
    wait_ctl(sink, sink.writable_iter)
  end
  @hot_read = nil

  readable.each do |source|
    @readers.delete(source)
    wait_ctl(source, source.readable_iter { |client, msg| yield(client, msg) })
  end
end

#wait_ctl(io, err) ⇒ Object


69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/dtas/unix_server.rb', line 69

# Applies the outcome +err+ of servicing +io+ to the watchlists.
#
# +err+ is one of:
# - :wait_readable / :wait_writable - watch +io+ for that condition
# - :hot_read - queue +io+ on the list of readers currently being run
# - :delete   - stop watching +io+ entirely
# - :ignore   - leave the watchlists untouched
# - nil or a StandardError - close +io+
# Anything else raises a RuntimeError flagged as a bug.
def wait_ctl(io, err)
  case err
  when :wait_readable then @readers[io] = true
  when :wait_writable then @writers[io] = true
  when :hot_read
    # Only valid while ready writers are being iterated, since that is
    # the only time @hot_read holds an array. The linear include? scan
    # is cheap because there are usually few sinks.
    @hot_read.push(io) unless @hot_read.include?(io)
  when :delete
    @readers.delete(io)
    @writers.delete(io)
  when :ignore
    # Nothing to do. Two situations end up here:
    # - a DTAS::Buffer was readable, but every destination (e.g. sinks)
    #   was blocked, so producer readability is no longer of interest;
    # - a consumer (e.g. DTAS::Sink) just became writable, but its
    #   DTAS::Buffer had already been seen readable in an earlier call.
  when nil, StandardError
    io.close
  else
    raise "BUG: wait_ctl invalid: #{io} #{err.inspect}"
  end
end

#write_failed(client, e) ⇒ Object


55
56
57
58
# File 'lib/dtas/unix_server.rb', line 55

# Reports a failed write to +client+ via Kernel#warn, including the
# message and class of the error +e+, then closes the client.
def write_failed(client, e)
  reason = "#{e.message} (#{e.class})"
  warn("failed to write to #{client}: #{reason}")
  client.close
end