Class: SeeingIsBelieving::EventStream::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/seeing_is_believing/event_stream/consumer.rb

Defined Under Namespace

Classes: FinishCriteria

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(streams) ⇒ Consumer

Returns a new instance of Consumer.



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 74

def initialize(streams)
  @finished            = false
  self.finish_criteria = FinishCriteria.new
  self.queue           = Queue.new
  event_stream         = streams.fetch :events
  stdout_stream        = streams.fetch :stdout
  stderr_stream        = streams.fetch :stderr
  self.threads         = [
    Thread.new do
      begin
        stdout_stream.each_line { |line| queue << Events::Stdout.new(value: line) }
        queue << Events::StdoutClosed.new(side: :producer)
      rescue IOError
        queue << Events::StdoutClosed.new(side: :consumer)
      ensure
        queue << lambda { finish_criteria.stdout_thread_finished! }
      end
    end,

    Thread.new do
      begin
        stderr_stream.each_line { |line| queue << Events::Stderr.new(value: line) }
        queue << Events::StderrClosed.new(side: :producer)
      rescue IOError
        queue << Events::StderrClosed.new(side: :consumer)
      ensure
        queue << lambda { finish_criteria.stderr_thread_finished! }
      end
    end,

    Thread.new do
      begin
        event_stream.each_line { |line| queue << line }
        queue << Events::EventStreamClosed.new(side: :producer)
      rescue IOError
        queue << Events::EventStreamClosed.new(side: :consumer)
      ensure
        queue << lambda { finish_criteria.event_thread_finished! }
      end
    end,
  ]
end

Class Method Details

.fix_encoding(str) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 57

def self.fix_encoding(str)
  begin
    str.encode! Encoding::UTF_8
  rescue EncodingError
    str = str.force_encoding(Encoding::UTF_8)
  end
  return str.scrub('') if str.respond_to? :scrub
  # basically reimplement scrub, b/c it's not implemented on 2.0.0
  str.each_char.inject("") do |new_str, char|
    if char.valid_encoding?
      new_str << char
    else
      new_str << ''
    end
  end
end

Instance Method Details

#call(n = 1) ⇒ Object



117
118
119
120
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 117

def call(n=1)
  return next_event if n == 1
  Array.new(n) { next_event }
end

#eachObject



122
123
124
125
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 122

def each
  return to_enum :each unless block_given?
  yield call 1 until @finished
end

#joinObject



144
145
146
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 144

def join
  threads.each(&:join)
end

#process_exitstatus(status) ⇒ Object

NOTE: Note it’s probably a bad plan to call these methods from within the same thread as the consumer, because if it blocks, who will remove items from the queue?



130
131
132
133
134
135
136
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 130

def process_exitstatus(status)
  status ||= 1 # see #100
  queue << lambda {
    queue << Events::Exitstatus.new(value: status)
    finish_criteria.received_exitstatus!
  }
end

#process_timeout(seconds) ⇒ Object



137
138
139
140
141
142
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 137

def process_timeout(seconds)
  queue << lambda {
    queue << Events::Timeout.new(seconds: seconds)
    finish_criteria.received_timeout!
  }
end