Class: RFlow::Message

Inherits:
Object
  • Object
show all
Defined in:
lib/rflow/message.rb,
lib/rflow/components/log.rb,
lib/rflow/components/raw.rb,
lib/rflow/components/clock.rb,
lib/rflow/components/integer.rb

Overview

A message to be sent around in the RFlow framework.

Defined Under Namespace

Modules: Clock
Classes: Data, ProcessingEvent

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil) ⇒ Message

When creating a new message as a transformation of an existing message, it’s encouraged to copy the provenance and properties of the original message into the new message. This allows downstream components to potentially use these fields.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/rflow/message.rb', line 75

# Create a new message of the given data type.
#
# When creating a new message as a transformation of an existing message,
# it's encouraged to copy the provenance and properties of the original
# message into the new one so downstream components can use those fields.
#
# @param data_type_name [#to_s] data type name; a schema must be registered
#   for it (and +serialization_type+) in RFlow::Configuration.available_data_types
# @param provenance [Array<ProcessingEvent, Hash>, nil] processing history;
#   non-ProcessingEvent entries are read with the string keys
#   'component_instance_uuid', 'started_at', 'completed_at' and 'context'
#   and converted into ProcessingEvents. nil is treated as [].
# @param properties [Hash, nil] message properties; nil is treated as {}
# @param serialization_type [#to_s] serialization type of the data
# @param schema [Object, nil] if given, must equal the registered schema
# @param serialized_data [Object, nil] passed through to Data.new
# @raise [ArgumentError] if no schema is registered for the data type and
#   serialization type, or if +schema+ differs from the registered one
def initialize(data_type_name, provenance = [], properties = {}, serialization_type = 'avro', schema = nil, serialized_data = nil)
  @data_type_name = data_type_name.to_s
  serialization = serialization_type.to_s

  # Turn the provenance array of Hashes into an array of
  # ProcessingEvents for easier access and time validation.
  # TODO: do this lazily so as not to create/destroy objects that are
  # never used
  @provenance = (provenance || []).map do |event|
    if event.is_a? ProcessingEvent
      event
    else
      ProcessingEvent.new(event['component_instance_uuid'],
                          event['started_at'], event['completed_at'],
                          event['context'])
    end
  end

  @properties = properties || {}

  # TODO: Make this better.  This check is technically
  # unnecessary, as we are able to completely deserialize the
  # message without needing to resort to the registered schema.
  #
  # The outer lookup is nil-guarded so an unknown data type raises the
  # ArgumentError below rather than a NoMethodError on nil, regardless of
  # whether the registry supplies a default for missing keys.
  schemas_for_type = RFlow::Configuration.available_data_types[@data_type_name]
  registered_schema = schemas_for_type && schemas_for_type[serialization]
  unless registered_schema
    raise ArgumentError, "Data type '#{@data_type_name}' with serialization_type '#{serialization_type}' not found"
  end

  # TODO: think about registering the schemas automatically if not
  # found in Configuration
  if schema && (registered_schema != schema)
    raise ArgumentError, "Passed schema ('#{schema}') does not equal registered schema ('#{registered_schema}') for data type '#{@data_type_name}' with serialization_type '#{serialization_type}'"
  end

  @data = Data.new(registered_schema, serialization, serialized_data)

  # Get the extensions and apply them to the data object to add capability.
  # A nil lookup result (no extensions for this type) is treated as none.
  extensions = RFlow::Configuration.available_data_extensions[@data_type_name] || []
  extensions.each do |e|
    RFlow.logger.debug "Extending '#{data_type_name}' with extension '#{e}'"
    @data.extend e
  end
end

Instance Attribute Details

#data ⇒ Data (readonly)

The message’s data payload: a Data object built from the registered schema, the serialization type and any serialized data.

Returns:

  • (Data)


69
70
71
# File 'lib/rflow/message.rb', line 69

# The message's data payload: the Data object built at construction from
# the registered schema, serialization type and serialized data.
#
# @return [Data]
def data; @data; end

#data_type_name ⇒ String (readonly)

The data type name of the message.

Returns:

  • (String)


65
66
67
# File 'lib/rflow/message.rb', line 65

# Name of this message's data type, stringified at construction.
#
# @return [String]
def data_type_name; @data_type_name; end

#properties ⇒ Hash

The message’s properties information.

Returns:

  • (Hash)


61
62
63
# File 'lib/rflow/message.rb', line 61

# Arbitrary properties attached to the message; an empty Hash at
# construction when none (or nil) were supplied.
#
# @return [Hash]
def properties; @properties; end

#provenance ⇒ Array<ProcessingEvent>

The message’s provenance information.

Returns:

  • (Array<ProcessingEvent>)

57
58
59
# File 'lib/rflow/message.rb', line 57

# The message's processing history, in the order it was supplied at
# construction, with every entry converted to a ProcessingEvent.
#
# @return [Array<ProcessingEvent>]
def provenance; @provenance; end

Instance Method Details

#to_avro ⇒ String

Serialize the current message object to Avro using the org.rflow.Message Avro schema. Note that the encoding has to be set manually for Ruby 1.9; otherwise the StringIO would default to UTF-8, which would not work correctly, because a serialized Avro string is binary, not UTF-8.

Returns:

  • (String)


123
124
125
126
127
128
129
130
131
132
133
# File 'lib/rflow/message.rb', line 123

# Serialize this message to Avro using the org.rflow.Message Avro schema.
#
# Every property key and value is converted with #to_s before encoding.
# If two keys stringify to the same string (e.g. :a and 'a'), the one
# iterated last wins.
#
# A nil +properties+ or +provenance+ is encoded as empty, mirroring the
# nil-coalescing done in #initialize, instead of raising NoMethodError.
#
# @return [String] the encoded message, as produced by Message.encode
def to_avro
  # stringify all the properties
  string_properties = Hash[(properties || {}).map { |k, v| [k.to_s, v.to_s] }]

  Message.encode('data_type_name' => data_type_name.to_s,
                 'provenance' => (provenance || []).map(&:to_hash),
                 'properties' => string_properties,
                 'data_serialization_type' => data.serialization_type.to_s,
                 'data_schema' => data.schema_string,
                 'data' => data.to_avro)
end