Class: RFlow::Message
- Inherits: Object
  - Object
  - RFlow::Message
- Defined in: lib/rflow/message.rb
Defined Under Namespace
Classes: Data, ProcessingEvent
Instance Attribute Summary
- #data ⇒ Object (readonly)
  Returns the value of attribute data.
- #data_type_name ⇒ Object (readonly)
  Returns the value of attribute data_type_name.
- #properties ⇒ Object
  Returns the value of attribute properties.
- #provenance ⇒ Object
  Returns the value of attribute provenance.
Class Method Summary
- .encode(message) ⇒ Object
- .from_avro(bytes) ⇒ Object
  Take in an Avro serialization of a message and return a new Message object.
- .message_reader ⇒ Object
- .message_writer ⇒ Object
- .schema ⇒ Object
Instance Method Summary
- #initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil) ⇒ Message (constructor)
  When creating a new message as a transformation of an existing message, it's encouraged to copy the provenance and properties of the original message into the new message.
- #to_avro ⇒ Object
  Serialize the current message object to Avro using the org.rflow.Message Avro schema.
Constructor Details
#initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil) ⇒ Message
When creating a new message as a transformation of an existing message, it's encouraged to copy the provenance and properties of the original message into the new message. This allows downstream components to use these fields.
# File 'lib/rflow/message.rb', line 43
def initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)
  @data_type_name = data_type_name.to_s

  # Turn the provenance array of Hashes into an array of
  # ProcessingEvents for easier access and time validation.
  # TODO: do this lazily so as not to create/destroy objects that are
  # never used
  @provenance = (provenance || []).map do |event|
    if event.is_a? ProcessingEvent
      event
    else
      ProcessingEvent.new(event['component_instance_uuid'],
                          event['started_at'], event['completed_at'],
                          event['context'])
    end
  end

  @properties = properties || {}

  # TODO: Make this better. This check is technically
  # unnecessary, as we are able to completely deserialize the
  # message without needing to resort to the registered schema.
  registered_schema = RFlow::Configuration.available_data_types[@data_type_name][serialization_type.to_s]
  unless registered_schema
    raise ArgumentError, "Data type '#{@data_type_name}' with serialization_type '#{serialization_type}' not found"
  end

  # TODO: think about registering the schemas automatically if not
  # found in Configuration
  if schema && (registered_schema != schema)
    raise ArgumentError, "Passed schema ('#{schema}') does not equal registered schema ('#{registered_schema}') for data type '#{@data_type_name}' with serialization_type '#{serialization_type}'"
  end

  @data = Data.new(registered_schema, serialization_type.to_s, serialized_data)

  # Get the extensions and apply them to the data object to add capability
  RFlow::Configuration.available_data_extensions[@data_type_name].each do |e|
    RFlow.logger.debug "Extending '#{data_type_name}' with extension '#{e}'"
    @data.extend e
  end
end
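The advice to carry provenance and properties forward can be sketched without the RFlow runtime. In the example below, `SketchEvent` and `SketchMessage` are hypothetical stand-ins, not part of RFlow; they mimic only the provenance normalization shown in the constructor. Real code would call `RFlow::Message.new`, which additionally requires the data type to be registered in `RFlow::Configuration`.

```ruby
# Hypothetical stand-ins for illustration only; not part of RFlow.
SketchEvent = Struct.new(:component_instance_uuid, :started_at, :completed_at, :context)

class SketchMessage
  attr_reader :data_type_name
  attr_accessor :provenance, :properties

  def initialize(data_type_name, provenance = [], properties = {})
    @data_type_name = data_type_name.to_s
    # Same normalization idea as the constructor above: accept either
    # event objects or plain Hashes keyed by string field names.
    @provenance = (provenance || []).map do |event|
      if event.is_a?(SketchEvent)
        event
      else
        SketchEvent.new(event['component_instance_uuid'], event['started_at'],
                        event['completed_at'], event['context'])
      end
    end
    @properties = properties || {}
  end
end

original = SketchMessage.new(
  'Raw',
  [{ 'component_instance_uuid' => 'uuid-1', 'started_at' => nil,
     'completed_at' => nil, 'context' => 'ingest' }],
  { 'source' => 'sensor-7' }
)

# Derive a new message, carrying the original's history and metadata forward.
# The properties Hash is duplicated so later changes do not leak back.
derived = SketchMessage.new('Parsed', original.provenance, original.properties.dup)
derived.properties['parsed'] = 'true'
```

Duplicating the properties Hash is a choice made for this sketch; whether sharing or copying is appropriate depends on how the component uses the original message afterwards.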
Instance Attribute Details
#data ⇒ Object (readonly)
Returns the value of attribute data.
# File 'lib/rflow/message.rb', line 37
def data
  @data
end
#data_type_name ⇒ Object (readonly)
Returns the value of attribute data_type_name.
# File 'lib/rflow/message.rb', line 37
def data_type_name
  @data_type_name
end
#properties ⇒ Object
Returns the value of attribute properties.
# File 'lib/rflow/message.rb', line 36
def properties
  @properties
end
#provenance ⇒ Object
Returns the value of attribute provenance.
# File 'lib/rflow/message.rb', line 36
def provenance
  @provenance
end
Class Method Details
.encode(message) ⇒ Object
# File 'lib/rflow/message.rb', line 24
def encode(message); RFlow::Avro.encode(message_writer, message); end
.from_avro(bytes) ⇒ Object
Take in an Avro serialization of a message and return a new Message object. Assumes the org.rflow.Message Avro schema.
# File 'lib/rflow/message.rb', line 28
def from_avro(bytes)
  message = RFlow::Avro.decode(message_reader, bytes)
  Message.new(message['data_type_name'], message['provenance'], message['properties'],
              message['data_serialization_type'], message['data_schema'],
              message['data'])
end
.message_reader ⇒ Object
# File 'lib/rflow/message.rb', line 22
def message_reader; @message_reader ||= ::Avro::IO::DatumReader.new(schema, schema); end
.message_writer ⇒ Object
# File 'lib/rflow/message.rb', line 23
def message_writer; @message_writer ||= ::Avro::IO::DatumWriter.new(schema); end
.schema ⇒ Object
# File 'lib/rflow/message.rb', line 21
def schema; @schema ||= ::Avro::Schema.parse(File.read(File.join(File.dirname(__FILE__), '..', '..', 'schema', 'message.avsc'))); end
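The class-level readers above rely on `||=` memoization, so the schema file is parsed only on first use. A minimal sketch of that pattern follows; `SketchSchema`, its `SOURCE` string, and `parse_count` are hypothetical and use JSON parsing in place of the Avro schema parser.

```ruby
require 'json'

# Hypothetical class for illustration; it memoizes a parsed JSON document
# in the same way .schema memoizes the parsed Avro schema.
class SketchSchema
  SOURCE = '{"type": "record", "name": "Message"}'

  class << self
    attr_reader :parse_count

    def schema
      # The right-hand side runs only while @schema is nil or false.
      @schema ||= begin
        @parse_count = (@parse_count || 0) + 1
        JSON.parse(SOURCE)
      end
    end
  end
end

first = SketchSchema.schema
second = SketchSchema.schema # returns the cached object, no second parse
```

Note that `||=` on its own is not synchronized, so two threads racing on the first call could each run the parse once.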
Instance Method Details
#to_avro ⇒ Object
Serialize the current message object to Avro using the org.rflow.Message Avro schema. Note that the encoding has to be set manually for Ruby 1.9; otherwise the StringIO would use UTF-8 by default, which would not work correctly, because a serialized Avro string is BINARY, not UTF-8.
# File 'lib/rflow/message.rb', line 90
def to_avro
  # stringify all the properties
  string_properties = Hash[properties.map { |k,v| [k.to_s, v.to_s] }]

  Message.encode('data_type_name' => data_type_name.to_s,
                 'provenance' => provenance.map(&:to_hash),
                 'properties' => string_properties.to_hash,
                 'data_serialization_type' => data.serialization_type.to_s,
                 'data_schema' => data.schema_string,
                 'data' => data.to_avro)
end
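The two details mentioned for to_avro, stringifying the properties and keeping the output buffer BINARY, can be tried in isolation with the standard library alone. This is a sketch under assumptions: the property values and the two sample bytes are made up, and the buffer handling illustrates the encoding concern rather than reproducing how RFlow::Avro.encode is implemented.

```ruby
require 'stringio'

# Hypothetical property values; keys and values may be symbols or numbers.
properties = { retries: 3, 'source' => :sensor }

# Same stringification as in to_avro: every key and value becomes a String.
string_properties = Hash[properties.map { |k, v| [k.to_s, v.to_s] }]

# A buffer explicitly tagged BINARY, so arbitrary bytes are never treated as UTF-8.
buffer = String.new(encoding: Encoding::BINARY)
io = StringIO.new(buffer, 'w')
io.write([0xC3, 0x28].pack('C*')) # two bytes that do not form valid UTF-8

# The same bytes tagged as UTF-8 would be an invalid string.
as_utf8 = buffer.dup.force_encoding(Encoding::UTF_8)
```

Here `buffer.valid_encoding?` holds because any byte sequence is valid as BINARY, while `as_utf8.valid_encoding?` does not, which is why serialized Avro output should not be left tagged as UTF-8.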