Class: RFlow::Message
- Inherits: Object
  - Object
  - RFlow::Message
- Defined in:
  lib/rflow/message.rb,
  lib/rflow/components/log.rb,
  lib/rflow/components/raw.rb,
  lib/rflow/components/clock.rb,
  lib/rflow/components/integer.rb
Overview
A message to be sent around in the RFlow framework.
Defined Under Namespace
Modules: Clock
Classes: Data, ProcessingEvent
Instance Attribute Summary
-
#data ⇒ Data
readonly
The message’s data, held in a Data object.
-
#data_type_name ⇒ String
readonly
The data type name of the message.
-
#properties ⇒ Hash
The message’s properties information.
-
#provenance ⇒ Array<ProcessingEvent>
The message’s provenance information.
Instance Method Summary
-
#initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil) ⇒ Message
constructor
When creating a new message as a transformation of an existing message, it’s encouraged to copy the provenance and properties of the original message into the new message.
-
#to_avro ⇒ String
Serialize the current message object to Avro using the org.rflow.Message Avro schema.
Constructor Details
#initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil) ⇒ Message
When creating a new message as a transformation of an existing message, it’s encouraged to copy the provenance and properties of the original message into the new message. This allows downstream components to potentially use these fields.
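As a rough illustration of that advice, the sketch below derives a new message from an existing one and carries the provenance and properties across. SketchMessage and derive are hypothetical stand-ins that mirror only the first three constructor arguments; they are not part of RFlow, and the data type names are illustrative. A real RFlow::Message also needs a data type registered in RFlow::Configuration.

```ruby
# Hypothetical stand-in that mirrors only the first three arguments of
# RFlow::Message#initialize; it is not the RFlow class itself.
SketchMessage = Struct.new(:data_type_name, :provenance, :properties)

# Build a new message of a different data type from an existing one,
# copying provenance and properties so downstream components can use them.
def derive(original, new_data_type_name)
  SketchMessage.new(
    new_data_type_name,
    original.provenance.dup, # shallow copy: same events, separate array
    original.properties.dup  # shallow copy: same values, separate hash
  )
end

original = SketchMessage.new(
  'RFlow::Message::Data::Raw',                            # illustrative name
  [{ 'component_instance_uuid' => 'hypothetical-uuid' }], # illustrative event
  { 'source' => 'sensor-1' }
)

derived = derive(original, 'RFlow::Message::Data::Integer')
derived.properties['stage'] = 'parsed' # leaves the original's hash untouched
```

Copying with dup rather than passing the same objects is a choice made for this sketch, so that later changes to the derived message do not leak back into the original.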
# File 'lib/rflow/message.rb', line 71

def initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)
  @data_type_name = data_type_name.to_s

  # Turn the provenance array of Hashes into an array of
  # ProcessingEvents for easier access and time validation.
  # TODO: do this lazily so as not to create/destroy objects that are
  # never used
  @provenance = (provenance || []).map do |event|
    if event.is_a? ProcessingEvent
      event
    else
      ProcessingEvent.new(event['component_instance_uuid'],
                          event['started_at'], event['completed_at'],
                          event['context'])
    end
  end

  @properties = properties || {}

  # TODO: Make this better. This check is technically
  # unnecessary, as we are able to completely deserialize the
  # message without needing to resort to the registered schema.
  registered_schema = RFlow::Configuration.available_data_types[@data_type_name][serialization_type.to_s]
  unless registered_schema
    raise ArgumentError, "Data type '#{@data_type_name}' with serialization_type '#{serialization_type}' not found"
  end

  # TODO: think about registering the schemas automatically if not
  # found in Configuration
  if schema && (registered_schema != schema)
    raise ArgumentError, "Passed schema ('#{schema}') does not equal registered schema ('#{registered_schema}') for data type '#{@data_type_name}' with serialization_type '#{serialization_type}'"
  end

  @data = Data.new(registered_schema, serialization_type.to_s, serialized_data)

  # Get the extensions and apply them to the data object to add capability
  RFlow::Configuration.available_data_extensions[@data_type_name].each do |e|
    RFlow.logger.debug "Extending '#{data_type_name}' with extension '#{e}'"
    @data.extend e
  end
end
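The schema lookup and the two ArgumentError checks in the constructor can be tried in isolation. The following is a minimal sketch, not RFlow code: resolve_schema, the registry variable, and the Example::Integer type name are hypothetical, and the default block on the Hash is an assumption made here so that an unknown type name yields an empty inner hash instead of nil.

```ruby
# Hypothetical registry shaped like the lookup in the constructor:
# data type name => { serialization type => schema string }.
# The default block is an assumption so unknown type names yield {}.
registry = Hash.new { |hash, key| hash[key] = {} }
registry['Example::Integer']['avro'] = '{"type": "long"}'

# Return the registered schema, or raise ArgumentError when the type is
# unregistered or a caller-supplied schema disagrees with the registered one.
def resolve_schema(registry, data_type_name, serialization_type = 'avro', schema = nil)
  registered = registry[data_type_name.to_s][serialization_type.to_s]
  unless registered
    raise ArgumentError,
          "Data type '#{data_type_name}' with serialization_type " \
          "'#{serialization_type}' not found"
  end
  if schema && registered != schema
    raise ArgumentError,
          "Passed schema does not equal registered schema for '#{data_type_name}'"
  end
  registered
end

found = resolve_schema(registry, 'Example::Integer')                 # registered schema
from_symbols = resolve_schema(registry, :'Example::Integer', :avro)  # symbols are stringified first
```

Passing nil for schema skips the equality check entirely, which matches the `schema &&` guard in the constructor source.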
Instance Attribute Details
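#data ⇒ Data (readonly)
The message’s data, held in a Data object that the constructor builds from the registered schema, the serialization type, and the serialized data.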
# File 'lib/rflow/message.rb', line 65

def data
  @data
end
#data_type_name ⇒ String (readonly)
The data type name of the message.
# File 'lib/rflow/message.rb', line 61

def data_type_name
  @data_type_name
end
#properties ⇒ Hash
The message’s properties information.
# File 'lib/rflow/message.rb', line 57

def properties
  @properties
end
#provenance ⇒ Array<ProcessingEvent>
The message’s provenance information.
# File 'lib/rflow/message.rb', line 53

def provenance
  @provenance
end
Instance Method Details
#to_avro ⇒ String
Serialize the current message object to Avro using the org.rflow.Message Avro schema. Note that the encoding has to be set manually for Ruby 1.9; otherwise the StringIO would default to UTF-8, which would not work correctly because a serialized Avro string is BINARY, not UTF-8.
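The encoding concern can be reproduced with plain Ruby, independent of RFlow. The sketch below only illustrates the general technique of giving a StringIO a BINARY backing string; binary_buffer is a hypothetical helper, and this page does not show how RFlow itself sets the encoding.

```ruby
require 'stringio'

# Hypothetical helper: a StringIO whose backing string is tagged BINARY
# (ASCII-8BIT), so arbitrary bytes are stored without UTF-8 interpretation.
def binary_buffer
  StringIO.new(String.new.force_encoding('BINARY'))
end

io = binary_buffer
io.write([0xC3, 0x28].pack('C*')) # two bytes that do not form valid UTF-8

io.string.encoding        # Encoding::BINARY
io.string.bytes           # [195, 40]
io.string.valid_encoding? # true, since any byte sequence is valid BINARY
```

The same two bytes tagged as UTF-8 would report valid_encoding? as false, which is the kind of mismatch the note above warns about.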
# File 'lib/rflow/message.rb', line 119

def to_avro
  # stringify all the properties
  string_properties = Hash[properties.map { |k,v| [k.to_s, v.to_s] }]

  Message.encode('data_type_name' => data_type_name.to_s,
                 'provenance' => provenance.map(&:to_hash),
                 'properties' => string_properties.to_hash,
                 'data_serialization_type' => data.serialization_type.to_s,
                 'data_schema' => data.schema_string,
                 'data' => data.to_avro)
end
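The property stringification in the first statement of to_avro can be tried on its own. This is a small sketch using the same expression as the source; the wrapper name stringify_properties and the sample hash are hypothetical.

```ruby
# Convert every key and value to a String, using the same expression as
# the first statement of to_avro.
def stringify_properties(properties)
  Hash[properties.map { |k, v| [k.to_s, v.to_s] }]
end

result = stringify_properties({ priority: 3, 'source' => :sensor, retry: nil })

result['priority'] # "3": the Integer is now a String
result['source']   # "sensor": the Symbol value is now a String
result['retry']    # "": nil.to_s is the empty string
```

One consequence worth keeping in mind is that the original value types are not recoverable after serialization: numbers, symbols, and nil all arrive downstream as strings.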