Class: SeeingIsBelieving::EventStream::Consumer
- Inherits: Object
  - Object
  - SeeingIsBelieving::EventStream::Consumer
- Defined in: lib/seeing_is_believing/event_stream/consumer.rb
Defined Under Namespace
Classes: FinishCriteria
Class Method Summary
- .fix_encoding(str) ⇒ Object
Instance Method Summary
- #call(n = 1) ⇒ Object
- #each ⇒ Object
- #initialize(streams) ⇒ Consumer (constructor): Returns a new instance of Consumer.
- #join ⇒ Object
- #process_exitstatus(status) ⇒ Object: NOTE: It is probably a bad idea to call these methods from the same thread as the consumer, because if the call blocks, nothing is left to remove items from the queue.
- #process_timeout(seconds) ⇒ Object
Constructor Details
#initialize(streams) ⇒ Consumer
Returns a new instance of Consumer.
    # File 'lib/seeing_is_believing/event_stream/consumer.rb', line 70

    def initialize(streams)
      @finished            = false
      self.finish_criteria = FinishCriteria.new
      self.queue           = Queue.new
      event_stream         = streams.fetch :events
      stdout_stream        = streams.fetch :stdout
      stderr_stream        = streams.fetch :stderr

      self.threads = [
        Thread.new do
          begin
            stdout_stream.each_line { |line| queue << Events::Stdout.new(value: line) }
            queue << Events::StdoutClosed.new(side: :producer)
          rescue IOError
            queue << Events::StdoutClosed.new(side: :consumer)
          ensure
            queue << lambda { finish_criteria.stdout_thread_finished! }
          end
        end,

        Thread.new do
          begin
            stderr_stream.each_line { |line| queue << Events::Stderr.new(value: line) }
            queue << Events::StderrClosed.new(side: :producer)
          rescue IOError
            queue << Events::StderrClosed.new(side: :consumer)
          ensure
            queue << lambda { finish_criteria.stderr_thread_finished! }
          end
        end,

        Thread.new do
          begin
            event_stream.each_line { |line| queue << line }
            queue << Events::EventStreamClosed.new(side: :producer)
          rescue IOError
            queue << Events::EventStreamClosed.new(side: :consumer)
          ensure
            queue << lambda { finish_criteria.event_thread_finished! }
          end
        end,
      ]
    end
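The constructor starts one reader thread per stream, and every thread pushes onto a single shared queue. The following is a minimal sketch of that pattern using only the Ruby standard library. The `start_readers` helper and the array-based event tuples are hypothetical stand-ins for illustration and are not part of SeeingIsBelieving.

```ruby
# Sketch only: one reader thread per stream, all feeding one Queue.
# `start_readers` and the [name, line] tuples are hypothetical.
def start_readers(streams, queue)
  streams.map do |name, stream|
    Thread.new do
      begin
        stream.each_line { |line| queue << [name, line] }
        queue << [:closed, name]              # writer closed its end (EOF)
      rescue IOError
        queue << [:closed_by_consumer, name]  # our end was closed mid-read
      end
    end
  end
end

read_end, write_end = IO.pipe
queue   = Queue.new
threads = start_readers({ stdout: read_end }, queue)

write_end.puts "hello"
write_end.close           # EOF lets each_line return
threads.each(&:join)
read_end.close

events = []
events << queue.pop until queue.empty?
# events is now [[:stdout, "hello\n"], [:closed, :stdout]]
```

Because each reader only appends to the queue, the consuming side can treat all three streams as one ordered sequence of items.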
Class Method Details
.fix_encoding(str) ⇒ Object
    # File 'lib/seeing_is_believing/event_stream/consumer.rb', line 53

    def self.fix_encoding(str)
      begin
        str.encode! Encoding::UTF_8
      rescue EncodingError
        str = str.force_encoding(Encoding::UTF_8)
      end
      return str.scrub('�') if str.respond_to? :scrub
      # basically reimplement scrub, b/c it's not implemented on 1.9.3
      str.each_char.inject("") do |new_str, char|
        if char.valid_encoding?
          new_str << char
        else
          new_str << '�'
        end
      end
    end
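To illustrate what this method does with awkward input, here is a standalone sketch that copies the same logic into a hypothetical top-level method, `fix_encoding_sketch`. It assumes a Ruby version where `String#scrub` exists, so the 1.9.3 fallback branch is omitted.

```ruby
# Standalone copy of the logic above for experimentation.
# `fix_encoding_sketch` is a hypothetical name, not the library's API.
def fix_encoding_sketch(str)
  begin
    str.encode! Encoding::UTF_8
  rescue EncodingError
    # Transcoding failed, so reinterpret the raw bytes as UTF-8 instead.
    str = str.force_encoding(Encoding::UTF_8)
  end
  str.scrub("\uFFFD") # replace any remaining invalid bytes with U+FFFD
end

# Case 1: a UTF-8 string containing an invalid byte.
invalid = "abc\xFF".dup.force_encoding(Encoding::UTF_8)
fixed_invalid = fix_encoding_sketch(invalid)

# Case 2: binary (ASCII-8BIT) bytes that happen to be valid UTF-8.
binary = "caf\xC3\xA9".b
fixed_binary = fix_encoding_sketch(binary)
```

In the first case the invalid byte is replaced with U+FFFD. In the second, transcoding from binary raises an `EncodingError`, so the bytes are relabelled as UTF-8 and come through unchanged.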
Instance Method Details
#call(n = 1) ⇒ Object
    # File 'lib/seeing_is_believing/event_stream/consumer.rb', line 113

    def call(n=1)
      return next_event if n == 1
      Array.new(n) { next_event }
    end
#each ⇒ Object
    # File 'lib/seeing_is_believing/event_stream/consumer.rb', line 118

    def each
      return to_enum :each unless block_given?
      yield call 1 until @finished
    end
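The relationship between `#call` and `#each` can be tried out in isolation. The sketch below uses a hypothetical `TinyConsumer` class backed by a plain array. Its `next_event` is an assumption made for illustration, since the library's own `next_event` is not shown on this page.

```ruby
# Hypothetical stand-in that mirrors the shape of #call and #each above.
class TinyConsumer
  def initialize(items)
    @items    = items.dup
    @finished = @items.empty?
  end

  # One event when n == 1, otherwise an array of n events.
  def call(n = 1)
    return next_event if n == 1
    Array.new(n) { next_event }
  end

  # Without a block, returns an Enumerator; with one, yields until finished.
  def each
    return to_enum :each unless block_given?
    yield call 1 until @finished
  end

  private

  # Assumed behaviour: hand out items in order and flag when none remain.
  def next_event
    raise "no more events" if @finished
    event     = @items.shift
    @finished = @items.empty?
    event
  end
end

consumer = TinyConsumer.new([:a, :b, :c])
first    = consumer.call      # a single event
rest     = consumer.call(2)   # an array of two events
all      = TinyConsumer.new([1, 2]).each.to_a
```

Note the asymmetry: `call` with the default argument returns a bare event, while `call(n)` for any other `n` returns an array.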
#join ⇒ Object
    # File 'lib/seeing_is_believing/event_stream/consumer.rb', line 139

    def join
      threads.each(&:join)
    end
#process_exitstatus(status) ⇒ Object
NOTE: It is probably a bad idea to call these methods from the same thread as the consumer, because if the call blocks, nothing is left to remove items from the queue.

    # File 'lib/seeing_is_believing/event_stream/consumer.rb', line 126

    def process_exitstatus(status)
      queue << lambda {
        queue << Events::Exitstatus.new(value: status)
        finish_criteria.received_exitstatus!
      }
    end
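This method does not enqueue the event directly. It enqueues a lambda that enqueues the event once the consumer reaches it. A plausible reading, offered here as an assumption and not as the author's stated intent, is that this keeps the side effect on the consuming side and orders it after everything already queued. The sketch below shows the general pattern with a plain `Queue` and a hypothetical drain loop.

```ruby
# Sketch of "callables in the queue": the drain loop runs lambdas and
# collects everything else. The loop and symbols are illustrative only.
queue    = Queue.new
received = []
marked   = false

queue << :first_event
queue << lambda {
  queue << :exitstatus_event  # enqueued only when the loop reaches the lambda
  marked = true               # stands in for finish_criteria.received_exitstatus!
}

until queue.empty?
  item = queue.pop
  if item.is_a?(Proc)
    item.call                 # deferred work runs in the consuming loop
  else
    received << item
  end
end
```

After the loop, `received` holds the ordinary events in the order the loop saw them, and the lambda's bookkeeping has run exactly once.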