Class: KinesisProducer::Library

Inherits:
Object
  • Object
show all
Defined in:
lib/kinesis_producer/library.rb

Defined Under Namespace

Classes: Handler

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Library

Returns a new instance of Library.



70
71
72
73
74
75
76
# File 'lib/kinesis_producer/library.rb', line 70

# Builds the library wrapper and starts the child daemon.
#
# The caller's hash is not modified: +:binary_path+ is removed from a shallow
# copy, and that copy (minus +:binary_path+) is what Daemon receives.
#
# @param options [Hash] settings; +:binary_path+ overrides
#   +default_binary_path+, every other entry is passed through to Daemon.new
def initialize(options)
  daemon_options = options.dup
  @binary_path = daemon_options.delete(:binary_path) || self.class.default_binary_path
  @message_id = Concurrent::AtomicFixnum.new(1)
  @futures = Concurrent::Map.new
  # The handler is given the same map that put_record/get_metrics store their futures in.
  @child = Daemon.new(@binary_path, Handler.new(@futures), daemon_options)
  @child.start
end

Class Method Details

.binary ⇒ Object



55
56
57
58
59
60
61
62
# File 'lib/kinesis_producer/library.rb', line 55

# Looks up the bundled binary entry for the current operating system.
#
# @return [Object] the Binary::Files entry stored under 'linux', 'osx' or
#   'windows', depending on which OS predicate is true
# @raise [RuntimeError] when none of the three OS predicates is true
def binary
  if OS.linux?
    Binary::Files['linux']
  elsif OS.osx?
    Binary::Files['osx']
  elsif OS.windows?
    Binary::Files['windows']
  else
    # A bare `raise` here would surface only as "unhandled exception".
    raise 'Unsupported platform: no bundled binary is available for this operating system'
  end
end

.default_binary_path ⇒ Object



64
65
66
67
# File 'lib/kinesis_producer/library.rb', line 64

# Default location of the platform binary.
#
# @return [String] the value of +binary+ joined onto the directory two levels
#   above the directory containing this file
def default_binary_path
  File.join(File.expand_path('../../..', __FILE__), binary)
end

Instance Method Details

#destroy ⇒ Object



78
79
80
81
# File 'lib/kinesis_producer/library.rb', line 78

# Flushes outstanding work, then destroys the child daemon.
#
# The child is destroyed even when +flush_sync+ raises, so a failed flush does
# not leave the child behind; the flush error still propagates to the caller.
#
# @return [Object] whatever +@child.destroy+ returns
def destroy
  begin
    flush_sync
  ensure
    result = @child.destroy
  end
  result
end

#flush(options = {}) ⇒ Object



98
99
100
# File 'lib/kinesis_producer/library.rb', line 98

# Sends a flush message built from the given options.
#
# @param options [Hash] attributes passed to KinesisProducer::Protobuf::Flush.new
# @return [Object] the result of +add_message+
def flush(options = {})
  request = KinesisProducer::Protobuf::Flush.new(options)
  add_message(:flush, request)
end

#flush_sync ⇒ Object



102
103
104
105
106
107
# File 'lib/kinesis_producer/library.rb', line 102

# Blocks until no futures remain registered in +@futures+.
#
# Each pass issues a flush and then pauses for half a second before
# re-checking. There is no upper bound on the number of passes.
#
# @return [nil]
def flush_sync
  until @futures.size.zero?
    flush
    sleep 0.5
  end
end

#get_metrics(options = {}) ⇒ Object



90
91
92
93
94
95
96
# File 'lib/kinesis_producer/library.rb', line 90

# Sends a metrics request and blocks until its future is resolved.
#
# @param options [Hash] attributes passed to
#   KinesisProducer::Protobuf::MetricsRequest.new
# @return [Object] the value held by the future once +wait+ returns
def get_metrics(options = {})
  pending = Concurrent::IVar.new
  request = KinesisProducer::Protobuf::MetricsRequest.new(options)
  # NOTE(review): the future is registered only after add_message returns the
  # id; confirm a reply cannot be handled before this assignment happens.
  @futures[add_message(:metrics_request, request)] = pending
  pending.wait
  pending.value
end

#put_record(options) ⇒ Object



83
84
85
86
87
88
# File 'lib/kinesis_producer/library.rb', line 83

# Sends a put-record message and returns a future for its outcome.
#
# @param options [Hash] attributes passed to
#   KinesisProducer::Protobuf::PutRecord.new
# @return [Concurrent::IVar] the future stored in +@futures+ under the id
#   returned by +add_message+; this method does not wait on it
def put_record(options)
  Concurrent::IVar.new.tap do |pending|
    record = KinesisProducer::Protobuf::PutRecord.new(options)
    # NOTE(review): the future is registered only after add_message returns the
    # id; confirm a reply cannot be handled before this assignment happens.
    @futures[add_message(:put_record, record)] = pending
  end
end