Class: Observability::Sender::UDP

Inherits:
Observability::Sender show all
Extended by:
Configurability
Defined in:
lib/observability/sender/udp.rb

Overview

A sender that sends events as JSON over UDP.

Constant Summary collapse

RETRY_INTERVAL =

Number of seconds to wait between retrying a blocked write

0.25
SERIALIZE_PIPELINE =

The pipeline to use for turning events into network data

:resolve.to_proc >> JSON.method(:generate)

Instance Attribute Summary collapse

Attributes inherited from Observability::Sender

#executor

Instance Method Summary collapse

Methods inherited from Observability::Sender

configured_type, #enqueue, inherited

Constructor Details

#initialize ⇒ UDP

Create a new UDP sender



37
38
39
# File 'lib/observability/sender/udp.rb', line 37

# Create a new UDP sender.
#
# Any positional arguments are accepted and ignored. The only setup done
# here is creating the datagram socket; it is not connected to anything
# at this point.
def initialize( * )
	# AF_INET is the family UDPSocket.new uses when none is given; it is
	# spelled out here so the choice is visible.
	@socket = UDPSocket.new( Socket::AF_INET )
end

Instance Attribute Details

#socket ⇒ Object (readonly)

The socket to send events over



48
49
50
# File 'lib/observability/sender/udp.rb', line 48

# Read-only accessor: returns whatever is stored in @socket (the socket
# events are sent over), or nil if it has not been set.
def socket
  return @socket
end

Instance Method Details

#send_event(data) ⇒ Object

Send the specified event.



74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/observability/sender/udp.rb', line 74

# Send the specified event +data+ (a String) over the sender's socket.
#
# The payload is written with non-blocking sends. When the socket answers
# :wait_writable, the method waits until it is writable and tries again. If a
# send accepts only part of the payload, the remaining bytes are passed to
# further sends until everything has been written.
#
# The caller's string is never modified, so frozen strings are accepted.
# An empty string results in no send at all. Returns nil.
def send_event( data )
	total  = data.bytesize
	offset = 0

	while offset < total
		# sendmsg_nonblock reports a count of *bytes*, so progress is tracked
		# with byte offsets; character indexes would be wrong for multibyte
		# text.
		pending = data.byteslice( offset, total - offset )
		bytes = self.socket.sendmsg_nonblock( pending, 0, exception: false )

		if bytes == :wait_writable
			# Block until the socket can take more data, then retry the send.
			IO.select( nil, [self.socket], nil )
		else
			self.log.debug "Sent: %p" % [ pending.byteslice(0, bytes) ]
			offset += bytes
		end
	end
end

#serialize_events(events) ⇒ Object

Serialize each of the given events and return the results.



68
69
70
# File 'lib/observability/sender/udp.rb', line 68

# Run every element of +events+ through SERIALIZE_PIPELINE and return the
# results, in the same order, as the value of the +map+ call.
def serialize_events( events )
	events.map do |event|
		SERIALIZE_PIPELINE.call( event )
	end
end

#start ⇒ Object

Start sending queued events.



52
53
54
55
56
# File 'lib/observability/sender/udp.rb', line 52

# Start sending queued events.
#
# Connects the socket to the host and port read from the sender class, then
# hands off to the inherited #start and returns its result.
def start
	host = self.class.host
	port = self.class.port
	self.socket.connect( host, port )

	super
end

#stop ⇒ Object

Stop the sender’s executor.



60
61
62
63
64
# File 'lib/observability/sender/udp.rb', line 60

# Stop the sender.
#
# The inherited #stop runs first; afterwards the write side of the socket is
# shut down. The return value is that of the shutdown call.
def stop
	super

	sock = self.socket
	sock.shutdown( :WR )
end