Class: KinesisProducer::Daemon

Inherits:
Object
  • Object
show all
Defined in:
lib/kinesis_producer/daemon.rb

Defined Under Namespace

Classes: Meter

Constant Summary collapse

FixnumMax =
(2 ** (64 - 2)) - 1

Instance Method Summary collapse

Constructor Details

#initialize(binary, handler, options) ⇒ Daemon

Returns a new instance of Daemon.



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/kinesis_producer/daemon.rb', line 22

def initialize(binary, handler, options)
  @binary  = binary
  @handler = handler

  @configuration             = options[:configuration] || {}
  @credentials               = options[:credentials]
  @metrics_credentials       = options[:metrics_credentials]
  @credentials_refresh_delay = options[:credentials_refresh_delay] || 5000
  @logger                    = options[:logger]
  @debug                     = options[:debug]

  @executor = Concurrent::CachedThreadPool.new
  @shutdown = Concurrent::AtomicBoolean.new(false)
  @outgoing_messages = Queue.new
  @incoming_messages = Queue.new

  if debug?
    @meters = {
      add_message:     Meter.new,
      send_message:    Meter.new,
      receive_message: Meter.new,
      return_message:  Meter.new,
    }
  end
end

Instance Method Details

#add(message) ⇒ Object



65
66
67
68
# File 'lib/kinesis_producer/daemon.rb', line 65

def add(message)
  @outgoing_messages.push(message)
  @meters[:add_message].mark if debug?
end

#destroyObject



55
56
57
58
59
60
61
62
63
# File 'lib/kinesis_producer/daemon.rb', line 55

def destroy
  @shutdown.make_true
  if @pid
    Process.kill("TERM", @pid)
    Process.waitpid(@pid)
    sleep 1 # TODO
  end
  delete_pipes
end

#startObject



48
49
50
51
52
53
# File 'lib/kinesis_producer/daemon.rb', line 48

def start
  @executor.post do
    create_pipes
    start_child
  end
end