Class: SeeingIsBelieving::EventStream::Consumer
- Inherits:
-
Object
- Object
- SeeingIsBelieving::EventStream::Consumer
- Defined in:
- lib/seeing_is_believing/event_stream/consumer.rb
Defined Under Namespace
Classes: FinishCriteria
Class Method Summary collapse
Instance Method Summary collapse
- #call(n = 1) ⇒ Object
- #each ⇒ Object
-
#initialize(streams) ⇒ Consumer
constructor
A new instance of Consumer.
- #join ⇒ Object
-
#process_exitstatus(status) ⇒ Object
NOTE: Note it’s probably a bad plan to call these methods from within the same thread as the consumer, because if it blocks, who will remove items from the queue?.
- #process_timeout(seconds) ⇒ Object
Constructor Details
#initialize(streams) ⇒ Consumer
Returns a new instance of Consumer.
74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 74 def initialize(streams) @finished = false self.finish_criteria = FinishCriteria.new self.queue = Queue.new event_stream = streams.fetch :events stdout_stream = streams.fetch :stdout stderr_stream = streams.fetch :stderr self.threads = [ Thread.new do begin stdout_stream.each_line { |line| queue << Events::Stdout.new(value: line) } queue << Events::StdoutClosed.new(side: :producer) rescue IOError queue << Events::StdoutClosed.new(side: :consumer) ensure queue << lambda { finish_criteria.stdout_thread_finished! } end end, Thread.new do begin stderr_stream.each_line { |line| queue << Events::Stderr.new(value: line) } queue << Events::StderrClosed.new(side: :producer) rescue IOError queue << Events::StderrClosed.new(side: :consumer) ensure queue << lambda { finish_criteria.stderr_thread_finished! } end end, Thread.new do begin event_stream.each_line { |line| queue << line } queue << Events::EventStreamClosed.new(side: :producer) rescue IOError queue << Events::EventStreamClosed.new(side: :consumer) ensure queue << lambda { finish_criteria.event_thread_finished! } end end, ] end |
Class Method Details
.fix_encoding(str) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 57 def self.fix_encoding(str) begin str.encode! Encoding::UTF_8 rescue EncodingError str = str.force_encoding(Encoding::UTF_8) end return str.scrub('�') if str.respond_to? :scrub # basically reimplement scrub, b/c it's not implemented on 2.0.0 str.each_char.inject("") do |new_str, char| if char.valid_encoding? new_str << char else new_str << '�' end end end |
Instance Method Details
#call(n = 1) ⇒ Object
117 118 119 120 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 117 def call(n=1) return next_event if n == 1 Array.new(n) { next_event } end |
#each ⇒ Object
122 123 124 125 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 122 def each return to_enum :each unless block_given? yield call 1 until @finished end |
#join ⇒ Object
144 145 146 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 144 def join threads.each(&:join) end |
#process_exitstatus(status) ⇒ Object
NOTE: Note it’s probably a bad plan to call these methods from within the same thread as the consumer, because if it blocks, who will remove items from the queue?
130 131 132 133 134 135 136 |
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 130 def process_exitstatus(status) status ||= 1 # see #100 queue << lambda { queue << Events::Exitstatus.new(value: status) finish_criteria.received_exitstatus! } end |