Class: Observability::Collector::Timescale
- Inherits:
-
Observability::Collector
- Object
- Observability::Collector
- Observability::Collector::Timescale
- Extended by:
- Configurability, Loggability
- Defined in:
- lib/observability/collector/timescale.rb
Constant Summary collapse
- MAX_EVENT_BYTES =
The maximum size of event messages
64 * 1024
- LOOP_TIMER =
The number of seconds to wait between IO loops
0.25- JSON_CONFIG =
The config to pass to JSON.parse
{ object_class: Sequel::Postgres::JSONHash, array_class: Sequel::Postgres::JSONArray }.freeze
Instance Method Summary collapse
-
#initialize ⇒ Timescale
constructor
Create a new UDP collector.
-
#read_next_event ⇒ Object
Read the next event from the socket.
-
#start ⇒ Object
Start receiving events.
-
#start_processing ⇒ Object
Start consuming incoming events and storing them.
-
#stop ⇒ Object
Stop receiving events.
-
#stop_processing ⇒ Object
Stop consuming events.
-
#store_event(event) ⇒ Object
Store the specified
event.
Methods inherited from Observability::Collector
configured_type, inherited, start
Constructor Details
#initialize ⇒ Timescale
Create a new UDP collector
49 50 51 52 53 54 55 56 |
# File 'lib/observability/collector/timescale.rb', line 49 def initialize super @socket = UDPSocket.new @db = nil @cursor = nil @processing = false end |
Instance Method Details
#read_next_event ⇒ Object
Read the next event from the socket
108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/observability/collector/timescale.rb', line 108 def read_next_event data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false ) if data == :wait_readable IO.select( [@socket], nil, nil, LOOP_TIMER ) return nil elsif data.empty? return nil else self.log.info "Read %d bytes" % [ data.bytesize ] return JSON.parse( data ) end end |
#start ⇒ Object
Start receiving events.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/observability/collector/timescale.rb', line 64 def start self.log.info "Starting up." @db = Sequel.connect( self.class.db ) @db.extension( :pg_json ) # @cursor = @db[ :events ].prepare( :insert, :insert_new_event, # time: :$time, # type: :$type, # version: :$version, # data: :$data # ) @socket.bind( self.class.host, self.class.port ) self.start_processing end |
#start_processing ⇒ Object
Start consuming incoming events and storing them.
91 92 93 94 95 96 97 98 |
# File 'lib/observability/collector/timescale.rb', line 91 def start_processing @processing = true while @processing event = self.read_next_event or next self.log.debug "Read event: %p" % [ event ] self.store_event( event ) end end |
#stop ⇒ Object
Stop receiving events.
82 83 84 85 86 87 |
# File 'lib/observability/collector/timescale.rb', line 82 def stop self.stop_processing @cursor = nil @db.disconnect end |
#stop_processing ⇒ Object
Stop consuming events.
102 103 104 |
# File 'lib/observability/collector/timescale.rb', line 102 def stop_processing @processing = false end |
#store_event(event) ⇒ Object
Store the specified event.
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/observability/collector/timescale.rb', line 124 def store_event( event ) self.log.debug "Storing event: %p" % [ event ] time = event.delete('@timestamp') type = event.delete('@type') version = event.delete('@version') # @cursor.call( time: time, type: type, version: version, data: event ) @db[ :events ].insert( time: time, type: type, version: version, data: Sequel.pg_json( event ) ) end |