Class: Observability::Collector::Timescale

Inherits:
Observability::Collector show all
Extended by:
Configurability, Loggability
Defined in:
lib/observability/collector/timescale.rb

Constant Summary collapse

MAX_EVENT_BYTES =

The maximum size of event messages

64 * 1024
LOOP_TIMER =

The number of seconds to wait between IO loops

0.25
JSON_CONFIG =

The config to pass to JSON.parse

{
	object_class: Sequel::Postgres::JSONHash,
	array_class: Sequel::Postgres::JSONArray
}.freeze

Instance Method Summary collapse

Methods inherited from Observability::Collector

configured_type, inherited, start

Constructor Details

#initializeTimescale

Create a new UDP collector



49
50
51
52
53
54
55
56
# File 'lib/observability/collector/timescale.rb', line 49

def initialize
	super

	@socket     = UDPSocket.new
	@db         = nil
	@cursor     = nil
	@processing = false
end

Instance Method Details

#read_next_eventObject

Read the next event from the socket



108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/observability/collector/timescale.rb', line 108

def read_next_event
	data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

	if data == :wait_readable
		IO.select( [@socket], nil, nil, LOOP_TIMER )
		return nil
	elsif data.empty?
		return nil
	else
		self.log.info "Read %d bytes" % [ data.bytesize ]
		return JSON.parse( data )
	end
end

#startObject

Start receiving events.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/observability/collector/timescale.rb', line 64

def start
	self.log.info "Starting up."
	@db = Sequel.connect( self.class.db )
	@db.extension( :pg_json )
	# @cursor = @db[ :events ].prepare( :insert, :insert_new_event,
	# 	 time: :$time,
	# 	 type: :$type,
	# 	 version: :$version,
	# 	 data: :$data
	# )

	@socket.bind( self.class.host, self.class.port )

	self.start_processing
end

#start_processingObject

Start consuming incoming events and storing them.



91
92
93
94
95
96
97
98
# File 'lib/observability/collector/timescale.rb', line 91

def start_processing
	@processing = true
	while @processing
		event = self.read_next_event or next
		self.log.debug "Read event: %p" % [ event ]
		self.store_event( event )
	end
end

#stopObject

Stop receiving events.



82
83
84
85
86
87
# File 'lib/observability/collector/timescale.rb', line 82

def stop
	self.stop_processing

	@cursor = nil
	@db.disconnect
end

#stop_processingObject

Stop consuming events.



102
103
104
# File 'lib/observability/collector/timescale.rb', line 102

def stop_processing
	@processing = false
end

#store_event(event) ⇒ Object

Store the specified event.



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/observability/collector/timescale.rb', line 124

def store_event( event )
	self.log.debug "Storing event: %p" % [ event ]

	time    = event.delete('@timestamp')
	type    = event.delete('@type')
	version = event.delete('@version')

	# @cursor.call( time: time, type: type, version: version, data: event )
	@db[ :events ].insert(
		 time: time,
		 type: type,
		 version: version,
		 data: Sequel.pg_json( event )
	)
end