Class: Observability::Collector::Timescale

Inherits:
Observability::Collector show all
Extended by:
Configurability, Loggability
Defined in:
lib/observability/collector/timescale.rb

Constant Summary collapse

MAX_EVENT_BYTES =

The maximum size of event messages

64 * 1024
LOOP_TIMER =

The number of seconds to wait between IO loops

0.25
JSON_CONFIG =

The config to pass to JSON.parse

{
  object_class: Sequel::Postgres::JSONHash,
  array_class: Sequel::Postgres::JSONArray
}.freeze

Instance Method Summary collapse

Methods inherited from Observability::Collector

configured_type, inherited, start

Constructor Details

#initializeTimescale

Create a new UDP collector



49
50
51
52
53
54
55
56
# File 'lib/observability/collector/timescale.rb', line 49

def initialize
  super

  @socket     = UDPSocket.new
  @db         = nil
  @cursor     = nil
  @processing = false
end

Instance Method Details

#read_next_eventObject

Read the next event from the socket



108
109
110
111
112
113
114
115
116
117
118
119
120
121
# File 'lib/observability/collector/timescale.rb', line 108

def read_next_event
  self.log.debug "Reading next event."
  data = @socket.recv_nonblock( MAX_EVENT_BYTES, exception: false )

  if data == :wait_readable
    IO.select( [@socket], nil, nil, LOOP_TIMER )
    return nil
  elsif data.empty?
    return nil
  else
    self.log.info "Read %d bytes" % [ data.bytesize ]
    return JSON.parse( data )
  end
end

#startObject

Start receiving events.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/observability/collector/timescale.rb', line 64

def start
  self.log.info "Starting up."
  @db = Sequel.connect( self.class.db )
  @db.extension( :pg_json )
  # @cursor = @db[ :events ].prepare( :insert, :insert_new_event,
  #   time: :$time,
  #   type: :$type,
  #   version: :$version,
  #   data: :$data
  # )

  @socket.bind( self.class.host, self.class.port )

  self.start_processing
end

#start_processingObject

Start consuming incoming events and storing them.



91
92
93
94
95
96
97
98
# File 'lib/observability/collector/timescale.rb', line 91

def start_processing
  @processing = true
  while @processing
    event = self.read_next_event or next
    self.log.debug "Read event: %p" % [ event ]
    self.store_event( event )
  end
end

#stopObject

Stop receiving events.



82
83
84
85
86
87
# File 'lib/observability/collector/timescale.rb', line 82

def stop
  self.stop_processing

  @cursor = nil
  @db.disconnct
end

#stop_processingObject

Stop consuming events.



102
103
104
# File 'lib/observability/collector/timescale.rb', line 102

def stop_processing
  @processing = false
end

#store_event(event) ⇒ Object

Store the specified event.



125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/observability/collector/timescale.rb', line 125

def store_event( event )
  time    = event.delete('@timestamp')
  type    = event.delete('@type')
  version = event.delete('@version')

  # @cursor.call( time: time, type: type, version: version, data: event )
  @db[ :events ].insert(
     time: time,
     type: type,
     version: version,
     data: Sequel.pg_json( event )
  )
end