Class: RFlow::Message::Data

Inherits:
Object
Defined in:
lib/rflow/message.rb,
lib/rflow/components/log.rb,
lib/rflow/components/raw.rb,
lib/rflow/components/integer.rb

Overview

Proxies most methods to #data_object, which can be serialized to Avro using the schema. Extensions should use the extended hook to apply immediate changes.
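As a rough illustration of the extended hook mentioned above, the sketch below uses plain Ruby only. SketchData is a stand-in for this class and CounterExtension is a hypothetical extension module; neither is part of RFlow, and how RFlow itself registers and applies extensions is not shown here.

```ruby
# Stand-in for RFlow::Message::Data (assumption: only #data_object matters here).
class SketchData
  attr_accessor :data_object

  def initialize
    @data_object = nil
  end
end

# Hypothetical extension module, named for illustration only.
module CounterExtension
  # Ruby invokes this hook once, immediately after `obj.extend(CounterExtension)`,
  # so it is a natural place to apply immediate changes such as defaults.
  def self.extended(base)
    base.data_object ||= { 'count' => 0 }
  end

  # Extra behaviour layered on top of the underlying data object.
  def increment
    data_object['count'] += 1
  end
end

data = SketchData.new
data.extend(CounterExtension)  # triggers CounterExtension.extended(data)
data.increment
puts data.data_object['count']
```

Because the hook runs at extend time, the data object already has its default shape before any extension method is called.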

Defined Under Namespace

Classes: Integer, Log, Raw

Constructor Details

#initialize(schema_string, serialization_type = 'avro', serialized_data = nil) ⇒ Data

Returns a new instance of Data.

Raises:

  • (ArgumentError) if serialization_type is not 'avro' or the schema cannot be parsed


# File 'lib/rflow/message.rb', line 192

def initialize(schema_string, serialization_type = 'avro', serialized_data = nil)
  raise ArgumentError, 'Only Avro serialization_type supported at the moment' unless serialization_type.to_s == 'avro'

  @schema_string = schema_string
  @serialization_type = serialization_type.to_s

  begin
    @schema = ::Avro::Schema.parse(schema_string)
    @writer = ::Avro::IO::DatumWriter.new(@schema)
  rescue Exception => e
    raise ArgumentError, "Invalid schema '#{@schema_string}': #{e}: #{e.message}"
  end

  if serialized_data
    serialized_data.force_encoding 'BINARY'
    @data_object = RFlow::Avro.decode(::Avro::IO::DatumReader.new(schema, schema), serialized_data)
  end
end
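One detail of the constructor worth noting is that force_encoding relabels the string it is called on rather than returning a copy, so the serialized_data passed in is modified in place. The sketch below shows that behaviour using plain Ruby strings only; the variable names are illustrative, and the copy-first pattern at the end is a suggestion for callers, not something the class itself does.

```ruby
# force_encoding changes the encoding label of the receiver; bytes are untouched.
payload = String.new("caf\u00e9")   # mutable UTF-8 string
bytes_before = payload.bytes
payload.force_encoding('BINARY')
same_bytes = payload.bytes == bytes_before
relabeled  = payload.encoding == Encoding::BINARY

# A frozen string cannot be relabeled in place.
frozen = 'abc'.freeze
frozen_rejected = begin
  frozen.force_encoding('BINARY')
  false
rescue FrozenError
  true
end

# String#b returns a binary-encoded copy and leaves the original alone,
# which is one way for a caller to avoid both effects.
copy = frozen.b
copy_is_binary = copy.encoding == Encoding::BINARY

puts same_bytes, relabeled, frozen_rejected, copy_is_binary
```

Callers that hold frozen strings, or that need to keep the original encoding, may therefore want to pass a binary copy.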

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(method_sym, *args, &block) ⇒ void

This method returns an undefined value.

Proxies methods down to the underlying #data_object, which is probably a Hash. Ideally an extension provides any additional functionality, so this is called only when needed.



# File 'lib/rflow/message.rb', line 227

def method_missing(method_sym, *args, &block)
  @data_object.send(method_sym, *args, &block)
end
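The proxying described above can be sketched in isolation with a stand-in class. HashProxy is hypothetical and not part of RFlow; unlike the method shown above, it also defines respond_to_missing? and falls back to super for unknown methods, which is a common Ruby convention rather than something this class is documented to do.

```ruby
# Hypothetical stand-in that forwards unknown methods to a wrapped object.
class HashProxy
  def initialize(data_object)
    @data_object = data_object
  end

  # Forward anything the wrapped object understands; otherwise let Ruby
  # raise the usual NoMethodError via super.
  def method_missing(method_sym, *args, &block)
    if @data_object.respond_to?(method_sym)
      @data_object.send(method_sym, *args, &block)
    else
      super
    end
  end

  # Keep respond_to? consistent with what method_missing will accept.
  def respond_to_missing?(method_sym, include_private = false)
    @data_object.respond_to?(method_sym, include_private) || super
  end
end

proxy = HashProxy.new({ 'a' => 1 })
proxy['b'] = 2                       # forwarded to Hash#[]=
puts proxy.keys.join(',')            # forwarded to Hash#keys
puts proxy.respond_to?(:fetch)
```

Note that in the constructor shown earlier, @data_object is assigned only when serialized_data is given, so a proxied call on a freshly built instance would be sent to nil.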

Instance Attribute Details

#data_object ⇒ Object

The data object for the message.

Returns:

  • (Object)


# File 'lib/rflow/message.rb', line 190

def data_object
  @data_object
end

#schema ⇒ ::Avro::Schema (readonly)

Avro-parsed version of the schema the data follows.

Returns:

  • (::Avro::Schema)


# File 'lib/rflow/message.rb', line 184

def schema
  @schema
end

#schema_string ⇒ String (readonly)

The string form of the schema the data follows.

Returns:

  • (String)


# File 'lib/rflow/message.rb', line 181

def schema_string
  @schema_string
end

#serialization_type ⇒ String (readonly)

Serialization type. Currently always 'avro'.

Returns:

  • (String)


# File 'lib/rflow/message.rb', line 187

def serialization_type
  @serialization_type
end

Instance Method Details

#to_avro ⇒ String

Encode the message out to real Avro.

Returns:

  • (String)


# File 'lib/rflow/message.rb', line 219

def to_avro
  RFlow::Avro.encode @writer, @data_object
end

#valid? ⇒ boolean

Is the message valid per the Avro schema?

Returns:

  • (boolean)


# File 'lib/rflow/message.rb', line 213

def valid?
  ::Avro::Schema.validate @schema, @data_object
end