Class: SeeingIsBelieving::EventStream::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/seeing_is_believing/event_stream/consumer.rb

Defined Under Namespace

Classes: FinishCriteria

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(streams) ⇒ Consumer

Returns a new instance of Consumer.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 70

def initialize(streams)
  @finished            = false
  self.finish_criteria = FinishCriteria.new
  self.queue           = Queue.new
  event_stream         = streams.fetch :events
  stdout_stream        = streams.fetch :stdout
  stderr_stream        = streams.fetch :stderr

  Thread.new do
    begin
      stdout_stream.each_line { |line| queue << Events::Stdout.new(value: line) }
      queue << Events::StdoutClosed.new(side: :producer)
    rescue IOError
      queue << Events::StdoutClosed.new(side: :consumer)
    ensure
      queue << lambda { finish_criteria.stdout_thread_finished! }
    end
  end

  Thread.new do
    begin
      stderr_stream.each_line { |line| queue << Events::Stderr.new(value: line) }
      queue << Events::StderrClosed.new(side: :producer)
    rescue IOError
      queue << Events::StderrClosed.new(side: :consumer)
    ensure
      queue << lambda { finish_criteria.stderr_thread_finished! }
    end
  end

  Thread.new do
    begin
      event_stream.each_line { |line| queue << line }
      queue << Events::EventStreamClosed.new(side: :producer)
    rescue IOError
      queue << Events::EventStreamClosed.new(side: :consumer)
    ensure
      queue << lambda { finish_criteria.event_thread_finished! }
    end
  end
end

Class Method Details

.fix_encoding(str) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 53

def self.fix_encoding(str)
  begin
    str.encode! Encoding::UTF_8
  rescue EncodingError
    str = str.force_encoding(Encoding::UTF_8)
  end
  return str.scrub('') if str.respond_to? :scrub
  # basically reimplement scrub, b/c it's not implemented on 1.9.3
  str.each_char.inject("") do |new_str, char|
    if char.valid_encoding?
      new_str << char
    else
      new_str << ''
    end
  end
end

Instance Method Details

#call(n = 1) ⇒ Object



112
113
114
115
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 112

def call(n=1)
  return next_event if n == 1
  Array.new(n) { next_event }
end

#eachObject



117
118
119
120
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 117

def each
  return to_enum :each unless block_given?
  yield call 1 until @finished
end

#process_exitstatus(status) ⇒ Object

NOTE: Note it’s probably a bad plan to call these methods from within the same thread as the consumer, because if it blocks, who will remove items from the queue?



125
126
127
128
129
130
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 125

def process_exitstatus(status)
  queue << lambda {
    queue << Events::Exitstatus.new(value: status)
    finish_criteria.received_exitstatus!
  }
end

#process_timeout(seconds) ⇒ Object



131
132
133
134
135
136
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 131

def process_timeout(seconds)
  queue << lambda {
    queue << Events::Timeout.new(seconds: seconds)
    finish_criteria.received_timeout!
  }
end