Class: Ffmprb::Util::ThreadedIoBuffer

Inherits:
Object
Includes:
ProcVis::Node
Defined in:
lib/ffmprb/util/threaded_io_buffer.rb

Overview

TODO: the events mechanism is currently unused (and commented out), so the synchronization mechanism is not needed.

Defined Under Namespace

Classes: AllOutputsBrokenError, Stats

Class Attribute Summary

Instance Attribute Summary

Attributes included from ProcVis::Node

#_proc_vis

Instance Method Summary

Methods included from ProcVis::Node

#proc_vis_edge, #proc_vis_name, #proc_vis_node

Constructor Details

#initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil) ⇒ ThreadedIoBuffer

NOTE: the input and each output can be given as a lambda, which is evaluated once, asynchronously, to obtain the io.

The lambdas must be timeout-interrupt-safe, since they are wrapped in timeout blocks.

NOTE: all ios are opened as late as possible and closed as soon as possible.
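The lambda note above can be illustrated with a minimal sketch. It assumes a hypothetical helper, `resolve_io`, and a hypothetical `limit:` keyword; neither is part of Ffmprb, and the library's own timeout handling may differ. It only shows why a lambda wrapped in a timeout block has to tolerate being interrupted.

```ruby
require 'timeout'
require 'stringio'

# Hypothetical helper (not Ffmprb's API): returns an io as-is, or calls
# the lambda once, inside a timeout block, to obtain the io.
def resolve_io(io_or_lambda, limit: 1)
  return io_or_lambda unless io_or_lambda.respond_to?(:call)

  # The lambda may be interrupted here, so it must not leave
  # half-opened resources behind when Timeout::Error is raised.
  Timeout.timeout(limit) { io_or_lambda.call }
end

ready = resolve_io(StringIO.new("plain io"))
lazy  = resolve_io(-> { StringIO.new("opened late") })

slow = begin
  resolve_io(-> { sleep 1; StringIO.new }, limit: 0.05)
rescue Timeout::Error
  :timed_out
end
```

Here `ready` is passed through untouched, `lazy` is opened only when it is needed, and the slow lambda is cut off by the timeout.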



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 29

def initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil)
  super()  # NOTE for the monitor, apparently

  Ffmprb.logger.debug{"ThreadedIoBuffer initializing with (#{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size})"}

  @input = input
  @outputs = outputs.map do |outp|
    OpenStruct.new _io: outp, q: SizedQueue.new(ThreadedIoBuffer.blocks_max)
  end
  @stats = Stats.new(self)
  @keep_outputs_open_on_input_idle_limit = keep_outputs_open_on_input_idle_limit
  # @events = {}

  Thread.new "io buffer main" do
    init_reader!
    @outputs.each do |output|
      init_writer_output! output
      init_writer! output
    end

    Thread.join_children!.tap do
      Ffmprb.logger.debug{"ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})"}
    end
  end
end
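The constructor above gives every output its own `SizedQueue` and starts a reader plus one writer per output. The following is a simplified, standalone sketch of that fan-out pattern, not the library's implementation: `fan_out`, its keyword defaults, and the `nil` end-of-input marker are assumptions made for illustration.

```ruby
require 'stringio'

# Hypothetical helper, not part of Ffmprb: copies `input` to every output
# through one bounded queue per output.
def fan_out(input, outputs, block_size: 4, blocks_max: 2)
  queues = outputs.map { SizedQueue.new(blocks_max) }

  # One writer thread per output drains its own queue until it pops nil.
  writers = outputs.zip(queues).map do |out, q|
    Thread.new do
      while (block = q.pop)
        out.write(block)
      end
    end
  end

  # The reader blocks when a queue is full, so a slow output
  # applies back-pressure instead of growing memory without bound.
  reader = Thread.new do
    while (block = input.read(block_size))
      queues.each { |q| q.push(block) }
    end
    queues.each { |q| q.push(nil) } # nil marks the end of input
  end

  ([reader] + writers).each(&:join)
  outputs
end

src = StringIO.new("hello, buffered world")
out_a = StringIO.new
out_b = StringIO.new
fan_out(src, [out_a, out_b])
```

After the call, both `out_a.string` and `out_b.string` hold the full input. The bounded queue is the design point: it caps buffered data at roughly `blocks_max * block_size` bytes per output.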

Class Attribute Details

.block_size ⇒ Object

Returns the value of attribute block_size.



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 15

def block_size
  @block_size
end

.blocks_max ⇒ Object

Returns the value of attribute blocks_max.



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 14

def blocks_max
  @blocks_max
end

.io_wait_timeout ⇒ Object

Returns the value of attribute io_wait_timeout.



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 18

def io_wait_timeout
  @io_wait_timeout
end

.timeout ⇒ Object

Returns the value of attribute timeout.



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 16

def timeout
  @timeout
end

.timeout_limit ⇒ Object

Returns the value of attribute timeout_limit.



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 17

def timeout_limit
  @timeout_limit
end
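The class-level readers above follow the usual Ruby pattern of accessors defined on the singleton class. A minimal sketch of that pattern follows, using a hypothetical stand-in class `BufferConfig`; the values are illustrative only and are not Ffmprb's defaults, and the existence of writers is an assumption, since only the readers are documented here.

```ruby
# Hypothetical stand-in class, not part of Ffmprb.
class BufferConfig
  class << self
    # Defines class-level readers and writers backed by class-level
    # instance variables (@blocks_max, @block_size, ...).
    attr_accessor :blocks_max, :block_size, :timeout, :timeout_limit, :io_wait_timeout
  end
end

BufferConfig.blocks_max = 1024       # illustrative value only
BufferConfig.block_size = 64 * 1024  # illustrative value only

# Upper bound on bytes buffered per output queue under these settings.
capacity_bytes = BufferConfig.blocks_max * BufferConfig.block_size
```

Attributes that were never assigned, such as `BufferConfig.timeout` here, simply return `nil`.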

Instance Attribute Details

#stats ⇒ Object (readonly)

Returns the value of attribute stats.



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 23

def stats
  @stats
end

Instance Method Details

#label ⇒ Object

The source comment above this method holds the disabled events code:

protected

def fire!(event)
  wait_for_handler!
  Ffmprb.logger.debug{"ThreadedIoBuffer firing #{event}"}
  if blk = @events.to_h[event.to_sym]
    @handler_thr = Util::Thread.new "#{event} handler", &blk
  end
  @events[event.to_sym] = true
end
handle_synchronously :fire!



# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 97

def label
  "IObuff: Curr/Peak/Max=#{stats.blocks_buff}/#{stats.blocks_max}/#{ThreadedIoBuffer.blocks_max} In/Out=#{stats.bytes_in}/#{stats.bytes_out}"
end