Class: SeeingIsBelieving::EventStream::Consumer
- Inherits:
-
Object
- Object
- SeeingIsBelieving::EventStream::Consumer
- Defined in:
- lib/seeing_is_believing/event_stream/consumer.rb
Defined Under Namespace
Classes: FinishCriteria
Class Method Summary collapse
Instance Method Summary collapse
- #call(n = 1) ⇒ Object
- #each ⇒ Object
-
#initialize(streams) ⇒ Consumer
constructor
A new instance of Consumer.
- #join ⇒ Object
-
#process_exitstatus(status) ⇒ Object
NOTE: Note it’s probably a bad plan to call these methods from within the same thread as the consumer, because if it blocks, who will remove items from the queue?.
- #process_timeout(seconds) ⇒ Object
Constructor Details
#initialize(streams) ⇒ Consumer
Returns a new instance of Consumer.
70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 70 def initialize(streams) @finished = false self.finish_criteria = FinishCriteria.new self.queue = Queue.new event_stream = streams.fetch :events stdout_stream = streams.fetch :stdout stderr_stream = streams.fetch :stderr self.threads = [ Thread.new do begin stdout_stream.each_line { |line| queue << Events::Stdout.new(value: line) } queue << Events::StdoutClosed.new(side: :producer) rescue IOError queue << Events::StdoutClosed.new(side: :consumer) ensure queue << lambda { finish_criteria.stdout_thread_finished! } end end, Thread.new do begin stderr_stream.each_line { |line| queue << Events::Stderr.new(value: line) } queue << Events::StderrClosed.new(side: :producer) rescue IOError queue << Events::StderrClosed.new(side: :consumer) ensure queue << lambda { finish_criteria.stderr_thread_finished! } end end, Thread.new do begin event_stream.each_line { |line| queue << line } queue << Events::EventStreamClosed.new(side: :producer) rescue IOError queue << Events::EventStreamClosed.new(side: :consumer) ensure queue << lambda { finish_criteria.event_thread_finished! } end end, ] end |
Class Method Details
.fix_encoding(str) ⇒ Object
61 62 63 64 65 66 67 68 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 61 def self.fix_encoding(str) begin str.encode! Encoding::UTF_8 rescue EncodingError str = str.force_encoding(Encoding::UTF_8) end str.scrub('�') end |
Instance Method Details
#call(n = 1) ⇒ Object
113 114 115 116 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 113 def call(n=1) return next_event if n == 1 Array.new(n) { next_event } end |
#each ⇒ Object
118 119 120 121 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 118 def each return to_enum :each unless block_given? yield call 1 until @finished end |
#join ⇒ Object
140 141 142 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 140 def join threads.each(&:join) end |
#process_exitstatus(status) ⇒ Object
NOTE: Note it’s probably a bad plan to call these methods from within the same thread as the consumer, because if it blocks, who will remove items from the queue?
126 127 128 129 130 131 132 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 126 def process_exitstatus(status) status ||= 1 # see #100 queue << lambda { queue << Events::Exitstatus.new(value: status) finish_criteria.received_exitstatus! } end |