Class: KinesisClient

Inherits:
Object
  • Object
show all
Defined in:
lib/kinesis_client.rb

Overview

Model representing the result message posted to Kinesis stream about everything that has gone on here – good, bad, or otherwise.

Direct Known Subclasses

NYPLRubyUtil::KinesisClient

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ KinesisClient

Returns a new instance of KinesisClient.



10
11
12
13
14
15
16
17
18
# File 'lib/kinesis_client.rb', line 10

def initialize(config)
  @config = config
  @avro = nil

  if config[:schema_string]
    @avro = NYPLAvro.by_name(config[:schema_string])
  end

end

Instance Attribute Details

#avroObject (readonly)

Returns the value of attribute avro.



8
9
10
# File 'lib/kinesis_client.rb', line 8

def avro
  @avro
end

#configObject (readonly)

Returns the value of attribute config.



8
9
10
# File 'lib/kinesis_client.rb', line 8

def config
  @config
end

Instance Method Details

#<<(json_message) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/kinesis_client.rb', line 20

def <<(json_message)
  if config[:schema_string]
    message = avro.encode(json_message, false)
  else
    message = json_message
  end

  client = Aws::Kinesis::Client.new
  partition_key = (config[:partition_key] ? json_message[config[:partition_key]] : SecureRandom.hex(20)).hash.to_s

  resp = client.put_record({
    stream_name: config[:stream_name],
    data: message,
    partition_key: partition_key
    })

    return_hash = {}

    if resp.successful?
      return_hash["code"] = "200"
      return_hash["message"] = json_message, resp
      $logger.info("Message sent to HoldRequestResult #{json_message}, #{resp}") if $logger
    else
      $logger.error("message" => "FAILED to send message to HoldRequestResult #{json_message}, #{resp}.") if $logger
      raise NYPLError.new json_message, resp
    end
    return_hash
end