Module: ETLProducer

Includes:
ETLBase
Included in:
EventDestination, TeacherSource, TeacherTimetableSlotSource, TeacherTimetableSlotTransformer
Defined in:
lib/actors/etl_producer.rb

Overview

Producer module implementing the producer side of the ETL consumer-producer actor protocol.

The ETL consumer-producer protocol with back-pressure is simple at its core. There are two parties, a producer and a consumer, and both are actor instances.

  • Producer produces (by loading, computing, generating, etc.) data (called rows) and sends them asynchronously to the consumer.

  • Consumer accepts rows from the producer and does something with them (saves them, sends them somewhere else, etc.).

Because rows are exchanged asynchronously and consuming can be slower than producing, the data flow between the actors needs to be limited.

This is solved with back-pressure: the producer must only produce and send rows when the consumer can accept them. The consumer signals that it can receive a row by sending a message to the producer, and in response the producer may send a single row. If the producer has no row available at that moment, it should store the state of its consumer internally and send a row as soon as one becomes available. Once the consumer has finished processing the received row, it requests another row from the producer and the cycle repeats.
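The request/response cycle described above can be sketched without Celluloid. This is a minimal, synchronous illustration only: SketchProducer, SketchConsumer, request_row and add_row are hypothetical names that do not appear in the module, and direct method calls stand in for the asynchronous actor messages.

```ruby
# Hypothetical, synchronous stand-ins for the two actors. Direct method
# calls replace the asynchronous messages the real actors exchange.
class SketchConsumer
  attr_reader :received
  attr_writer :producer

  def initialize
    @received = []
  end

  # Called by the producer with exactly one row per request.
  def consume_row(row)
    @received << row
  end

  # Signals readiness for one more row.
  def request_row
    @producer.receive_hungry
  end
end

class SketchProducer
  def initialize(rows, consumer)
    @rows = rows.dup
    @consumer = consumer
    @output_state = :stuffed
  end

  # The consumer announces that it can accept one row.
  def receive_hungry
    if @rows.empty?
      @output_state = :hungry # remember the request until a row arrives
    else
      @consumer.consume_row(@rows.shift)
    end
  end

  # A row becomes available later; deliver it only if it was asked for.
  def add_row(row)
    if @output_state == :hungry
      @output_state = :stuffed
      @consumer.consume_row(row)
    else
      @rows << row
    end
  end
end

consumer = SketchConsumer.new
producer = SketchProducer.new([:a], consumer)
consumer.producer = producer

consumer.request_row # one request, one row: :a is delivered
producer.add_row(:b) # not requested yet, so :b waits in the producer
consumer.request_row # :b is delivered
consumer.request_row # nothing available; the producer records :hungry
producer.add_row(:c) # the pending request is served immediately
```

The point of the sketch is that a row only ever moves in response to a request, so a slow consumer limits the producer instead of being flooded by it.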

Instance Method Summary

Methods included from ETLBase

#empty?, #eof_received?, #logger, #mark_empty!, #receive_eof, #unmark_empty!

Instance Method Details

#buffer_empty? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/actors/etl_producer.rb', line 59

def buffer_empty?
  @_buffer.nil?
end

#buffer_full? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/actors/etl_producer.rb', line 63

def buffer_full?
  !@_buffer.nil?
end

#buffer_pop ⇒ Object


# File 'lib/actors/etl_producer.rb', line 67

def buffer_pop
  raise "Output buffer empty." if buffer_empty?
  row = @_buffer
  @_buffer = nil
  row
end

#buffer_put(row) ⇒ Object


# File 'lib/actors/etl_producer.rb', line 74

def buffer_put(row)
  raise "Output buffer full." if buffer_full?
  @_buffer = row
end
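Taken together, the four buffer methods above behave as a single-slot output buffer. The following sketch exercises them in isolation; SingleSlotBuffer is a hypothetical host class introduced only for illustration, while the method bodies are copied from the listings above.

```ruby
# Hypothetical host class; the four method bodies match the listings above.
class SingleSlotBuffer
  def buffer_empty?
    @_buffer.nil?
  end

  def buffer_full?
    !@_buffer.nil?
  end

  def buffer_pop
    raise "Output buffer empty." if buffer_empty?
    row = @_buffer
    @_buffer = nil
    row
  end

  def buffer_put(row)
    raise "Output buffer full." if buffer_full?
    @_buffer = row
  end
end

buffer = SingleSlotBuffer.new
buffer.buffer_put({ id: 1 })     # the slot is now occupied
begin
  buffer.buffer_put({ id: 2 })   # a second row does not fit
rescue RuntimeError => e
  overflow_message = e.message
end
row = buffer.buffer_pop          # returns the stored row and frees the slot
```

Because emptiness is tested with nil?, a nil row would be indistinguishable from an empty slot, so rows are presumably expected to be non-nil.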

#emit_eof ⇒ Object

Sends EOF signal to actor's output (if it has one).


# File 'lib/actors/etl_producer.rb', line 42

def emit_eof
  logger.debug "Emitting EOF: #{self.class.name} -> #{@_output}"
  Celluloid::Actor[@_output].async.receive_eof if @_output
end

#emit_row(row) ⇒ Object

Sends a single row to actor's output asynchronously.


# File 'lib/actors/etl_producer.rb', line 36

def emit_row(row)
  logger.debug "Emitting row: #{self.class.name} -> #{@_output}"
  Celluloid::Actor[@_output].async.consume_row(row)
end

#output=(output) ⇒ Object

Sets the actor's output, to which produced rows and the EOF signal are sent.

Parameters:

  • output (Symbol)

    actor name in Celluloid actor registry


# File 'lib/actors/etl_producer.rb', line 31

def output=(output)
  @_output = output
end

#output_hungry? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/actors/etl_producer.rb', line 95

def output_hungry?
  @_output_state == :hungry
end

#output_row(row) ⇒ Object

Stores a single row in a local output buffer (in case output is stuffed) or sends it to the output directly.


# File 'lib/actors/etl_producer.rb', line 49

def output_row(row)
  if output_hungry?
    @_output_state = :stuffed
    emit_row(row)
    produce_row() unless empty?
  else
    buffer_put(row)
  end
end

#process_eof ⇒ Object


# File 'lib/actors/etl_producer.rb', line 90

def process_eof
  @_eof_received = true
  emit_eof if empty?
end

#produce_row ⇒ Object

Tries to generate a single row and, if one was generated, sends it to the output or to the output buffer. If no row was generated, it sets the empty flag and notifies its input (if it has one).

You should define a #generate_row method on your producer actor. It returns either a single row (pretty much anything) or raises an EndOfData or StopIteration error when there are no more rows to generate. (Hint: Ruby's Enumerator#next behaves exactly like that.)
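As a hedged illustration of the #generate_row contract, the sketch below backs the method with an Enumerator. NumberSource is a hypothetical class that is not part of this codebase; a real producer would also include ETLProducer and run as a Celluloid actor, both of which are left out here so that the snippet stands alone.

```ruby
# Hypothetical source class; only #generate_row matters for the contract.
class NumberSource
  def initialize(values)
    @rows = values.each # Array#each without a block returns an Enumerator
  end

  # Returns the next row, or raises StopIteration once the data is exhausted.
  def generate_row
    @rows.next
  end
end

source = NumberSource.new([1, 2])
first  = source.generate_row
second = source.generate_row

begin
  source.generate_row
rescue StopIteration
  # This is the point at which #produce_row marks the producer as empty.
  exhausted = true
end
```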


# File 'lib/actors/etl_producer.rb', line 107

def produce_row
  unmark_empty!
  begin
    output_row(generate_row())
    logger.debug "Generating a row."
  rescue StopIteration, EndOfData
    logger.debug "All pending rows processed."
    mark_empty!
    notify_hungry if respond_to? :notify_hungry
    emit_eof if eof_received?
  end
end

#receive_hungry ⇒ Object

Receives a work request from its output.


# File 'lib/actors/etl_producer.rb', line 80

def receive_hungry
  return if output_hungry?
  if buffer_empty?
    @_output_state = :hungry
  else
    emit_row(buffer_pop())
    produce_row() unless empty?
  end
end