Class: SeeingIsBelieving::EventStream::Consumer

Inherits:
Object
  • Object
show all
Defined in:
lib/seeing_is_believing/event_stream/consumer.rb

Defined Under Namespace

Classes: FinishCriteria

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(streams) ⇒ Consumer

Returns a new instance of Consumer.



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 70

# Builds a consumer over three input streams and immediately starts one
# reader thread per stream. Everything the threads read is funnelled into a
# single queue.
#
# streams - must respond to +fetch+ with the keys :events, :stdout and
#           :stderr (fetch is used, so a Hash without one of them raises).
#           Each fetched value must respond to +each_line+.
#
# What each thread enqueues:
# * stdout / stderr lines are wrapped in Events::Stdout / Events::Stderr
# * event-stream lines are enqueued as-is (raw line)
# * after the last line, the stream's *Closed event with side: :producer
# * if an IOError is raised instead, the *Closed event with side: :consumer
# * always, last, a lambda that calls the stream's *_thread_finished!
#   method on finish_criteria
def initialize(streams)
  @finished            = false
  self.finish_criteria = FinishCriteria.new
  self.queue           = Queue.new

  events_io = streams.fetch :events
  stdout_io = streams.fetch :stdout
  stderr_io = streams.fetch :stderr

  # Starts a thread that drains +io+ into the queue.
  #   wrap_line  - turns a line into the item that gets enqueued
  #   closed     - builds the "closed" event for the given side
  #   after_done - lambda enqueued last, whether or not reading succeeded
  start_reader = lambda do |io, wrap_line, closed, after_done|
    Thread.new do
      begin
        io.each_line { |line| queue << wrap_line.call(line) }
        queue << closed.call(:producer)
      rescue IOError
        queue << closed.call(:consumer)
      ensure
        queue << after_done
      end
    end
  end

  self.threads = [
    start_reader.call(
      stdout_io,
      lambda { |line| Events::Stdout.new(value: line) },
      lambda { |side| Events::StdoutClosed.new(side: side) },
      lambda { finish_criteria.stdout_thread_finished! }
    ),

    start_reader.call(
      stderr_io,
      lambda { |line| Events::Stderr.new(value: line) },
      lambda { |side| Events::StderrClosed.new(side: side) },
      lambda { finish_criteria.stderr_thread_finished! }
    ),

    start_reader.call(
      events_io,
      lambda { |line| line }, # event-stream lines go onto the queue untouched
      lambda { |side| Events::EventStreamClosed.new(side: side) },
      lambda { finish_criteria.event_thread_finished! }
    ),
  ]
end

Class Method Details

.fix_encoding(str) ⇒ Object



61
62
63
64
65
66
67
68
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 61

# Returns +str+ as valid UTF-8.
#
# The string is first transcoded to UTF-8. If transcoding raises an
# EncodingError, the bytes are kept and merely relabelled as UTF-8. Either
# way, any byte sequences that are invalid in the result are removed.
#
# str - the String to fix. An unfrozen string is modified in place by the
#       transcode/relabel step; a frozen string is duplicated first so that
#       it can be processed instead of raising FrozenError.
#
# Returns the scrubbed String (a new object, as produced by String#scrub).
def self.fix_encoding(str)
  # encode! / force_encoding mutate the receiver, which a frozen string forbids
  str = str.dup if str.frozen?
  begin
    str.encode! Encoding::UTF_8
  rescue EncodingError
    str.force_encoding Encoding::UTF_8
  end
  str.scrub('')
end

Instance Method Details

#call(n = 1) ⇒ Object



113
114
115
116
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 113

# Fetches events via next_event.
#
# n - how many events to fetch (default: 1).
#
# Returns the single event itself when n == 1, otherwise an Array holding
# n events in the order they were fetched.
def call(n=1)
  n == 1 ? next_event : Array.new(n) { next_event }
end

#each ⇒ Object



118
119
120
121
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 118

# Yields events one at a time, obtained through call(1), and keeps going
# until @finished is truthy.
#
# Without a block, returns an Enumerator over the same sequence.
def each
  return to_enum(:each) unless block_given?
  until @finished
    yield call(1)
  end
end

#join ⇒ Object



140
141
142
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 140

# Blocks until every thread in +threads+ has finished.
#
# Returns the threads collection.
def join
  threads.each { |thread| thread.join }
end

#process_exitstatus(status) ⇒ Object

NOTE: It’s probably a bad plan to call these methods from within the same thread as the consumer, because if the call blocks, who will remove items from the queue?



126
127
128
129
130
131
132
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 126

# Schedules the exit status to be reported through the queue.
#
# Nothing is reported right away: a lambda is pushed onto the queue, and
# only when that lambda is called does it push an Events::Exitstatus and
# then call finish_criteria.received_exitstatus!.
#
# status - the exit status; nil or false is replaced with 1 (see #100).
#
# Returns whatever `queue << ...` returns.
def process_exitstatus(status)
  exit_code = status || 1 # see #100
  report = lambda do
    queue << Events::Exitstatus.new(value: exit_code)
    finish_criteria.received_exitstatus!
  end
  queue << report
end

#process_timeout(seconds) ⇒ Object



133
134
135
136
137
138
# File 'lib/seeing_is_believing/event_stream/consumer.rb', line 133

# Schedules a timeout to be reported through the queue.
#
# Nothing is reported right away: a lambda is pushed onto the queue, and
# only when that lambda is called does it push an Events::Timeout and then
# call finish_criteria.received_timeout!.
#
# seconds - passed through as the Timeout event's +seconds+.
#
# Returns whatever `queue << ...` returns.
def process_timeout(seconds)
  report = lambda do
    queue << Events::Timeout.new(seconds: seconds)
    finish_criteria.received_timeout!
  end
  queue << report
end