Class: LivereloadRails::Stream

Inherits:
Object
Defined in:
lib/livereload_rails/stream.rb

Overview

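A non-blocking connection. It wraps an IO object, yields incoming data to a block as it arrives, and queues outgoing messages to be written from its event loop.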

Constant Summary

READ_CHUNK = 1024 * 10
EMPTY = "".freeze
SWALLOW_ERRORS = [EOFError, IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::EPROTOTYPE]
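The read handler that uses these constants is not shown on this page. As a rough sketch only, assuming READ_CHUNK is the maximum size of a single non-blocking read and SWALLOW_ERRORS lists the exceptions treated as "the peer went away", a read step could look like the following. The helper name `read_available` is hypothetical and not part of the library.

```ruby
READ_CHUNK = 1024 * 10
SWALLOW_ERRORS = [EOFError, IOError, Errno::EPIPE, Errno::ECONNRESET, Errno::EPROTOTYPE].freeze

# Hypothetical helper (not part of LivereloadRails::Stream).
# Reads up to READ_CHUNK bytes without blocking. Returns a String when data
# is available, :wait_readable when nothing is ready yet, and nil when the
# connection has ended or one of the swallowed errors was raised.
def read_available(io)
  io.read_nonblock(READ_CHUNK, exception: false)
rescue *SWALLOW_ERRORS
  nil
end
```

With `exception: false`, `read_nonblock` signals "no data yet" and end of file through return values, so the rescue clause only has to cover failures such as reading from a closed or reset connection.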

Constructor Details

#initialize(io) {|input| ... } ⇒ Stream

Returns a new instance of Stream.

Examples:

stream = Stream.new(io) do |input|
  # handle input
end
stream.loop

Parameters:

  • io (#read_nonblock, #write_nonblock)

Yields:

  • (input)

    whenever there is input to be consumed.

Yield Parameters:

  • input (String)

    streaming input data.



# File 'lib/livereload_rails/stream.rb', line 21

def initialize(io, &read_block)
  @io = io
  @io_writer = @io.dup

  # Capture the block explicitly; a bare Proc.new raises on Ruby 3.0 and later.
  @read_block = read_block

  @input_buffer = "".b
  @output_buffer = "".b
  @output_queue = []
  @mutex = Mutex.new

  @wakeup, @waker = IO.pipe
end

Instance Method Details

#close ⇒ Object

Close the connection immediately.

TODO: SO_LINGER, close before or after sending outgoing data?



# File 'lib/livereload_rails/stream.rb', line 51

def close
  @io.close unless @io.closed?
  @io_writer.close unless @io_writer.closed?
  @waker.close unless @waker.closed?
  @wakeup.close unless @wakeup.closed?
end

#closed? ⇒ Boolean

Returns true if stream is closed.

Returns:

  • (Boolean)

    true if stream is closed.



# File 'lib/livereload_rails/stream.rb', line 59

def closed?
  @io.closed?
end

#loop(selector = NIO::Selector.new) ⇒ Object

Run the event loop. Registers the stream with the selector, then dispatches each ready monitor to its handler for as long as the stream is looping. Returns immediately if the stream is already closed, and deregisters the stream's IO objects from the selector on exit.

Parameters:

  • selector (NIO::Selector) (defaults to: NIO::Selector.new)


# File 'lib/livereload_rails/stream.rb', line 64

def loop(selector = NIO::Selector.new)
  @looping = ! closed?
  return unless @looping

  wakeup_monitor = selector.register(@wakeup, :r)
  wakeup_monitor.value = handler_for(:wakeup_handler)

  read_monitor = selector.register(@io, :r)
  read_monitor.value = handler_for(:read_handler)

  register_writer(selector)

  while @looping
    selector.select { |monitor| monitor.value.call(monitor) }
  end
ensure
  selector.deregister(@io)
  selector.deregister(@io_writer)
  selector.deregister(@wakeup)
end
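The dispatch pattern above (each monitor carries a callable in `monitor.value`, and the loop calls whatever is ready) can be illustrated without nio4r. The sketch below is an analogy, not the library's implementation: `IO.select` stands in for `NIO::Selector`, a Hash of io to callable stands in for the monitors, and `run_loop` and the `:done` return value are hypothetical names chosen for the example.

```ruby
# Stdlib-only analogy of the select-and-dispatch loop.
# handlers maps each IO to a callable that is invoked when that IO is readable.
# A handler returns :done to unregister itself; the loop ends when none remain.
def run_loop(handlers)
  until handlers.empty?
    readable, = IO.select(handlers.keys)
    readable.each do |io|
      handlers.delete(io) if handlers.fetch(io).call(io) == :done
    end
  end
end

reader, writer = IO.pipe
received = +""

handlers = {
  reader => lambda do |io|
    chunk = io.read_nonblock(1024, exception: false)
    if chunk.nil?
      :done               # end of file: stop watching this IO
    elsif chunk == :wait_readable
      nil                 # spurious wakeup: keep waiting
    else
      received << chunk   # consume the data
      nil
    end
  end
}

writer.write("hello")
writer.close
run_loop(handlers)
```

Keeping the handler on the registered object means the loop itself needs no knowledge of what each IO is for, which is the same reason the method above stores handlers in `monitor.value`.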

#write(message) ⇒ Object

Queue a message to be sent later on the stream.

There is no guarantee that the message will arrive. If you want a receipt of any kind you will need to wait for a reply.

Parameters:

  • message (String)


# File 'lib/livereload_rails/stream.rb', line 41

def write(message)
  @mutex.synchronize do
    @output_queue.push(message)
    @waker.write("\0")
  end
end
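Writing a byte to `@waker` after queueing the message is the self-pipe technique: the byte makes the read end of the pipe readable, which wakes a thread blocked in a select call. The wakeup handler itself is not shown on this page, so the consumer side below is an assumption about how such a handler could drain the pipe and take the queued messages. It uses only the standard library.

```ruby
queue = []
mutex = Mutex.new
wakeup, waker = IO.pipe

# Producer: mirrors #write by queueing a message and poking the pipe.
producer = Thread.new do
  mutex.synchronize do
    queue.push("reload")
    waker.write("\0")
  end
end

# Consumer (assumed shape of a wakeup handler): block until the pipe is
# readable, drain the wakeup bytes, then take everything queued so far.
IO.select([wakeup])
wakeup.read_nonblock(1024)
pending = mutex.synchronize { queue.shift(queue.length) }

producer.join
```

Because the message is pushed before the byte is written, and both happen under the mutex, the consumer never wakes up to find the queue missing the message that triggered the wakeup.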