Class: RFlow::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/rflow/message.rb

Defined Under Namespace

Classes: Data, ProcessingEvent

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil) ⇒ Message

When creating a new message as a transformation of an existing message, it's encouraged to copy the provenance and properties of the original message into the new message. This allows downstream components to potentially use these fields.



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# File 'lib/rflow/message.rb', line 43

# Builds a message of a registered data type.
#
# @param data_type_name [#to_s] name of a data type registered in
#   RFlow::Configuration.available_data_types (stored as a String)
# @param provenance [Array<ProcessingEvent, Hash>, nil] processing history;
#   entries that are not already ProcessingEvents are read with String keys
#   ('component_instance_uuid', 'started_at', 'completed_at', 'context') and
#   converted; nil is treated as []
# @param properties [Hash, nil] message properties; nil is treated as {}
# @param serialization_type [#to_s] key used to find the registered schema
#   for the data type (default 'avro')
# @param schema [Object, nil] optional schema; when given it must equal the
#   registered schema
# @param serialized_data [Object, nil] payload passed through to Data.new
# @raise [ArgumentError] if no schema is registered for the data type and
#   serialization type, or if +schema+ differs from the registered schema
def initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)
  @data_type_name = data_type_name.to_s
  serialization_type_name = serialization_type.to_s

  # Turn the provenance array of Hashes into an array of
  # ProcessingEvents for easier access and time validation.
  # TODO: do this lazily so as not to create/destroy objects that are
  # never used
  @provenance = (provenance || []).map do |event|
    if event.is_a? ProcessingEvent
      event
    else
      ProcessingEvent.new(event['component_instance_uuid'],
                          event['started_at'], event['completed_at'],
                          event['context'])
    end
  end

  @properties = properties || {}

  # TODO: Make this better.  This check is technically
  # unnecessary, as we are able to completely deserialize the
  # message without needing to resort to the registered schema.
  # The `|| {}` guard makes a completely unregistered data type fall
  # through to the ArgumentError below instead of a NoMethodError on nil.
  schemas_for_type = RFlow::Configuration.available_data_types[@data_type_name] || {}
  registered_schema = schemas_for_type[serialization_type_name]
  unless registered_schema
    raise ArgumentError, "Data type '#{@data_type_name}' with serialization_type '#{serialization_type}' not found"
  end

  # TODO: think about registering the schemas automatically if not
  # found in Configuration
  if schema && (registered_schema != schema)
    raise ArgumentError, "Passed schema ('#{schema}') does not equal registered schema ('#{registered_schema}') for data type '#{@data_type_name}' with serialization_type '#{serialization_type}'"
  end

  @data = Data.new(registered_schema, serialization_type_name, serialized_data)

  # Get the extensions and apply them to the data object to add capability.
  # A data type with no registered extensions is treated as having none.
  extensions = RFlow::Configuration.available_data_extensions[@data_type_name] || []
  extensions.each do |e|
    RFlow.logger.debug "Extending '#{@data_type_name}' with extension '#{e}'"
    @data.extend e
  end
end

Instance Attribute Details

#data ⇒ Object (readonly)

Returns the value of attribute data.



37
38
39
# File 'lib/rflow/message.rb', line 37

# Reader for the message's data payload object (@data).
def data; @data; end

#data_type_name ⇒ Object (readonly)

Returns the value of attribute data_type_name.



37
38
39
# File 'lib/rflow/message.rb', line 37

# Reader for the name of this message's data type (@data_type_name).
def data_type_name; @data_type_name; end

#properties ⇒ Object

Returns the value of attribute properties.



36
37
38
# File 'lib/rflow/message.rb', line 36

# Reader for the message's properties (@properties).
def properties; @properties; end

#provenance ⇒ Object

Returns the value of attribute provenance.



36
37
38
# File 'lib/rflow/message.rb', line 36

# Reader for the message's provenance list (@provenance).
def provenance; @provenance; end

Class Method Details

.encode(message) ⇒ Object



24
# File 'lib/rflow/message.rb', line 24

# Encodes +message+ with RFlow::Avro using the memoized message writer.
def encode(message)
  writer = message_writer
  RFlow::Avro.encode(writer, message)
end

.from_avro(bytes) ⇒ Object

Take in an Avro serialization of a message and return a new Message object. Assumes the org.rflow.Message Avro schema.



28
29
30
31
32
33
# File 'lib/rflow/message.rb', line 28

# Decodes +bytes+ with the message reader and builds a Message from the
# decoded envelope fields, passed to Message.new in constructor order.
def from_avro(bytes)
  decoded = RFlow::Avro.decode(message_reader, bytes)
  constructor_args = %w[data_type_name provenance properties
                        data_serialization_type data_schema data].map { |field| decoded[field] }
  Message.new(*constructor_args)
end

.message_reader ⇒ Object



22
# File 'lib/rflow/message.rb', line 22

# Memoized Avro datum reader for the message envelope; #schema is passed as
# both constructor arguments.
def message_reader
  return @message_reader if @message_reader
  @message_reader = ::Avro::IO::DatumReader.new(schema, schema)
end

.message_writer ⇒ Object



23
# File 'lib/rflow/message.rb', line 23

# Memoized Avro datum writer for the message envelope, built from #schema.
def message_writer
  return @message_writer if @message_writer
  @message_writer = ::Avro::IO::DatumWriter.new(schema)
end

.schema ⇒ Object



21
# File 'lib/rflow/message.rb', line 21

# Memoized parse of the message envelope schema, read from
# schema/message.avsc two directories above this source file.
def schema
  @schema ||= begin
    avsc_path = File.join(File.dirname(__FILE__), '..', '..', 'schema', 'message.avsc')
    ::Avro::Schema.parse(File.read(avsc_path))
  end
end

Instance Method Details

#to_avroObject

Serialize the current message object to Avro using the org.rflow.Message Avro schema. Note that the encoding must be set manually for Ruby 1.9; otherwise the StringIO would default to UTF-8, which would not work correctly, because a serialized Avro string is BINARY, not UTF-8.



90
91
92
93
94
95
96
97
98
99
100
# File 'lib/rflow/message.rb', line 90

# Serialize the current message object with Message.encode, using the
# org.rflow.Message envelope fields.
#
# Property keys and values are converted with #to_s. If +properties+ or
# +provenance+ has been set to nil, it is encoded as an empty map / array.
# This matches how #initialize normalizes nil arguments, and avoids a
# NoMethodError on nil.
#
# @return the result of Message.encode (presumably the Avro-encoded bytes —
#   TODO confirm against RFlow::Avro.encode)
def to_avro
  # stringify all the properties
  string_properties = Hash[(properties || {}).map { |k, v| [k.to_s, v.to_s] }]

  Message.encode('data_type_name' => data_type_name.to_s,
                 'provenance' => (provenance || []).map(&:to_hash),
                 'properties' => string_properties,
                 'data_serialization_type' => data.serialization_type.to_s,
                 'data_schema' => data.schema_string,
                 'data' => data.to_avro)
end