Module: ETLConsumer

Includes:
ETLBase
Included in:
EventDestination, EventsCleaner, TeacherTimetableSlotSource, TeacherTimetableSlotTransformer
Defined in:
lib/actors/etl_consumer.rb

Overview

Consumer module for implementing ETL consumer - producer Actor protocol.

For details about the protocol see comments on ETLProducer.

Instance Method Summary collapse

Methods included from ETLBase

#empty?, #eof_received?, #logger, #mark_empty!, #receive_eof, #unmark_empty!

Instance Method Details

#consume_row(row) ⇒ Object

Receive a single row from actor's source(s).

Actor specific #process_row implementation is called with received row and the return value is stored in processed_row.


20
21
22
23
24
# File 'lib/actors/etl_consumer.rb', line 20

def consume_row(row)
  raise "#{self.class.name}: Received row when not empty!" unless empty?
  self.processed_row = process_row(row)
  produce_row() if respond_to?(:produce_row)
end

#input=(input) ⇒ Object

Sets actor input for sending hungry notifications.

Parameters:

  • input (Symbol)

    actor name in Celluloid actor registry


12
13
14
# File 'lib/actors/etl_consumer.rb', line 12

def input=(input)
  @_input = input
end

#notify_hungryObject

Send asynchronous "hungry" notification to it's input (if it has one).

Hungry notification means that more work should be sent to this actor.


29
30
31
32
33
34
35
36
# File 'lib/actors/etl_consumer.rb', line 29

def notify_hungry
  if @_input
    logger.debug "Sending hungry notification to #{@_input}."
    Celluloid::Actor[@_input].async.receive_hungry
  else
    logger.debug "Nowhere to send hungry notification. No input specified."
  end
end

#pop_processed_rowObject


47
48
49
50
51
# File 'lib/actors/etl_consumer.rb', line 47

def pop_processed_row
  row = @_processed_row
  @_processed_row = nil
  row
end

#processed_rowObject


53
54
55
# File 'lib/actors/etl_consumer.rb', line 53

def processed_row
  @_processed_row
end

#processed_row=(row) ⇒ Object


43
44
45
# File 'lib/actors/etl_consumer.rb', line 43

def processed_row=(row)
  @_processed_row = row
end

#start!Object


38
39
40
41
# File 'lib/actors/etl_consumer.rb', line 38

def start!
  mark_empty!
  notify_hungry
end