Class: KinesisProducer::Daemon
- Inherits:
-
Object
- Object
- KinesisProducer::Daemon
- Defined in:
- lib/kinesis_producer/daemon.rb
Defined Under Namespace
Classes: Meter
Constant Summary collapse
- FixnumMax =
(2 ** (64 - 2)) - 1
Instance Method Summary collapse
- #add(message) ⇒ Object
- #destroy ⇒ Object
-
#initialize(binary, handler, options) ⇒ Daemon
constructor
A new instance of Daemon.
- #start ⇒ Object
Constructor Details
#initialize(binary, handler, options) ⇒ Daemon
Returns a new instance of Daemon.
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/kinesis_producer/daemon.rb', line 22 def initialize(binary, handler, ) @binary = binary @handler = handler @configuration = [:configuration] || {} @credentials = [:credentials] @metrics_credentials = [:metrics_credentials] @credentials_refresh_delay = [:credentials_refresh_delay] || 5000 @logger = [:logger] @debug = [:debug] @executor = Concurrent::CachedThreadPool.new @shutdown = Concurrent::AtomicBoolean.new(false) @outgoing_messages = Queue.new @incoming_messages = Queue.new if debug? @meters = { add_message: Meter.new, send_message: Meter.new, receive_message: Meter.new, return_message: Meter.new, } end end |
Instance Method Details
#add(message) ⇒ Object
65 66 67 68 |
# File 'lib/kinesis_producer/daemon.rb', line 65 def add() @outgoing_messages.push() @meters[:add_message].mark if debug? end |
#destroy ⇒ Object
55 56 57 58 59 60 61 62 63 |
# File 'lib/kinesis_producer/daemon.rb', line 55 def destroy @shutdown.make_true if @pid Process.kill("TERM", @pid) Process.waitpid(@pid) sleep 1 # TODO end delete_pipes end |
#start ⇒ Object
48 49 50 51 52 53 |
# File 'lib/kinesis_producer/daemon.rb', line 48 def start @executor.post do create_pipes start_child end end |