Class: Ffmprb::Util::ThreadedIoBuffer
- Inherits:
-
Object
- Object
- Ffmprb::Util::ThreadedIoBuffer
- Includes:
- ProcVis::Node
- Defined in:
- lib/ffmprb/util/threaded_io_buffer.rb
Overview
TODO the events mechanism is currently unused (and commented out) => synchro mechanism not needed
Defined Under Namespace
Classes: AllOutputsBrokenError, Stats
Class Attribute Summary collapse
-
.block_size ⇒ Object
Returns the value of attribute block_size.
-
.blocks_max ⇒ Object
Returns the value of attribute blocks_max.
-
.io_wait_timeout ⇒ Object
Returns the value of attribute io_wait_timeout.
-
.timeout ⇒ Object
Returns the value of attribute timeout.
-
.timeout_limit ⇒ Object
Returns the value of attribute timeout_limit.
Instance Attribute Summary collapse
-
#stats ⇒ Object
readonly
Returns the value of attribute stats.
Attributes included from ProcVis::Node
Instance Method Summary collapse
-
#initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil) ⇒ ThreadedIoBuffer
constructor
NOTE input/output can be lambdas for single asynchronic io evaluation the lambdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks) NOTE all ios are being opened and closed as soon as possible.
-
#label ⇒ Object
protected.
Methods included from ProcVis::Node
#proc_vis_edge, #proc_vis_name, #proc_vis_node
Constructor Details
#initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil) ⇒ ThreadedIoBuffer
NOTE input/output can be lambdas for single asynchronic io evaluation
the lambdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks)
NOTE all ios are being opened and closed as soon as possible
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 29 def initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil) super() # NOTE for the monitor, apparently Ffmprb.logger.debug{"ThreadedIoBuffer initializing with (#{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size})"} @input = input @outputs = outputs.map do |outp| OpenStruct.new _io: outp, q: SizedQueue.new(ThreadedIoBuffer.blocks_max) end @stats = Stats.new(self) @keep_outputs_open_on_input_idle_limit = keep_outputs_open_on_input_idle_limit # @events = {} Thread.new "io buffer main" do init_reader! @outputs.each do |output| init_writer_output! output init_writer! output end Thread.join_children!.tap do Ffmprb.logger.debug{"ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})"} end end end |
Class Attribute Details
.block_size ⇒ Object
Returns the value of attribute block_size.
15 16 17 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 15 def block_size @block_size end |
.blocks_max ⇒ Object
Returns the value of attribute blocks_max.
14 15 16 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 14 def blocks_max @blocks_max end |
.io_wait_timeout ⇒ Object
Returns the value of attribute io_wait_timeout.
18 19 20 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 18 def io_wait_timeout @io_wait_timeout end |
.timeout ⇒ Object
Returns the value of attribute timeout.
16 17 18 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 16 def timeout @timeout end |
.timeout_limit ⇒ Object
Returns the value of attribute timeout_limit.
17 18 19 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 17 def timeout_limit @timeout_limit end |
Instance Attribute Details
#stats ⇒ Object (readonly)
Returns the value of attribute stats.
23 24 25 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 23 def stats @stats end |
Instance Method Details
#label ⇒ Object
protected
def fire!(event)
wait_for_handler!
Ffmprb.logger.debug{"ThreadedIoBuffer firing #{event}"}
if blk = @events.to_h[event.to_sym]
@handler_thr = Util::Thread.new "#{event} handler", &blk
end
@events[event.to_sym] = true
end handle_synchronously :fire!
97 98 99 |
# File 'lib/ffmprb/util/threaded_io_buffer.rb', line 97 def label "IObuff: Curr/Peak/Max=#{stats.blocks_buff}/#{stats.blocks_max}/#{ThreadedIoBuffer.blocks_max} In/Out=#{stats.bytes_in}/#{stats.bytes_out}" end |