Class: Observability::Sender::UdpMulticast

Inherits:
Observability::Sender
Extended by:
Configurability
Defined in:
lib/observability/sender/udp_multicast.rb

Overview

A sender that sends events as JSON over multicast UDP.

Constant Summary

RETRY_INTERVAL = 0.25

Number of seconds to wait between retries of a blocked write.

SERIALIZE_PIPELINE = :resolve.to_proc >> JSON.method( :generate )

The pipeline used to turn events into network data.

Instance Attribute Summary

Attributes inherited from Observability::Sender

#executor

Class Method Summary

Instance Method Summary

Methods inherited from Observability::Sender

configured_type, #enqueue, inherited, #start

Constructor Details

#initialize ⇒ UdpMulticast

Create a new UDP multicast sender, using the address and port configured on the class.



# File 'lib/observability/sender/udp_multicast.rb', line 50

def initialize( * )
	@multicast_address = self.class.multicast_address
	@port              = self.class.port
	@socket            = self.create_socket
end

Instance Attribute Details

#multicast_address ⇒ Object (readonly)

The address of the multicast group to send to



# File 'lib/observability/sender/udp_multicast.rb', line 63

def multicast_address
  @multicast_address
end

#port ⇒ Object (readonly)

The port to send on



# File 'lib/observability/sender/udp_multicast.rb', line 67

def port
  @port
end

#socket ⇒ Object (readonly)

The socket to send events over



# File 'lib/observability/sender/udp_multicast.rb', line 71

def socket
  @socket
end

Class Method Details

.bind_addr ⇒ Object

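Return a binary String containing the local bind address followed by the multicast group address, each packed in network byte order. #create_socket passes this value to the IP_ADD_MEMBERSHIP socket option.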



# File 'lib/observability/sender/udp_multicast.rb', line 44

def self::bind_addr
	return IPAddr.new( self.bind_address ).hton + IPAddr.new( self.multicast_address ).hton
end
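
As a minimal sketch of what the concatenation above produces, the following packs two IPv4 addresses with IPAddr#hton and inspects the result. The address values are assumptions chosen for illustration, not the library's defaults.

```ruby
require 'ipaddr'

# Assumed values, for illustration only.
bind_address      = '0.0.0.0'
multicast_address = '225.4.5.6'

# IPAddr#hton returns the address as a binary String in network byte order,
# so adding two of them yields one 8-byte String for a pair of IPv4 addresses.
packed = IPAddr.new( bind_address ).hton + IPAddr.new( multicast_address ).hton

packed_class = packed.class          # String, not IPAddr
packed_size  = packed.bytesize       # 4 bytes per IPv4 address
packed_bytes = packed.unpack( 'C*' ) # the individual octets, in order
```

The result is a plain String, which is the form setsockopt accepts for a structured option value.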

Instance Method Details

#create_socket ⇒ Object

Create and return a UDPSocket after setting it up for multicast.



# File 'lib/observability/sender/udp_multicast.rb', line 101

def create_socket
	iaddr = self.class.bind_addr
	socket = UDPSocket.new

	socket.setsockopt( :IPPROTO_IP, :IP_ADD_MEMBERSHIP, iaddr )
	socket.setsockopt( :IPPROTO_IP, :IP_MULTICAST_TTL, self.class.multicast_ttl )
	socket.setsockopt( :IPPROTO_IP, :IP_MULTICAST_LOOP, 1 )
	socket.setsockopt( :SOL_SOCKET, :SO_REUSEPORT, 1 )

	return socket
end
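
The multicast options set above can be read back to confirm they took effect. This is a rough sketch that sets only the TTL and loopback options on a fresh socket; the TTL value is an assumption, and the group membership step is left out so the example does not depend on the host's network configuration.

```ruby
require 'socket'

# Assumed TTL, for illustration only.
ttl = 4

socket = UDPSocket.new
socket.setsockopt( :IPPROTO_IP, :IP_MULTICAST_TTL, ttl )
socket.setsockopt( :IPPROTO_IP, :IP_MULTICAST_LOOP, 1 )

# Socket::Option provides readers that decode these options as Integers.
current_ttl  = socket.getsockopt( :IPPROTO_IP, :IP_MULTICAST_TTL ).ipv4_multicast_ttl
loop_enabled = socket.getsockopt( :IPPROTO_IP, :IP_MULTICAST_LOOP ).ipv4_multicast_loop

socket.close
```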

#send_event(data) ⇒ Object

Send the specified serialized event data to the multicast group, looping until every byte has been written.



# File 'lib/observability/sender/udp_multicast.rb', line 89

def send_event( data )

	until data.empty?
		bytes = self.socket.send( data, 0, self.multicast_address, self.port )

		self.log.debug "Sent: %p" % [ data[0, bytes] ]
		data[ 0, bytes ] = ''
	end
end
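
To illustrate how the loop above drains its buffer when a write is only partially accepted, here is a sketch that uses a stand-in socket instead of a real one. ChunkingSocket and drain are hypothetical names introduced for this example; they are not part of the library.

```ruby
# Hypothetical stand-in for a socket that accepts at most +limit+ bytes per call.
class ChunkingSocket
  attr_reader :writes

  def initialize( limit )
    @limit  = limit
    @writes = []
  end

  # Same argument shape as the four-argument UDPSocket#send call used above;
  # returns the number of bytes "sent".
  def send( data, _flags, _host, _port )
    chunk = data[ 0, @limit ]
    @writes << chunk
    chunk.bytesize
  end
end

# Same loop shape as #send_event: send, then strip what was sent.
def drain( socket, data, host, port )
  data = data.dup # work on a mutable copy
  until data.empty?
    bytes = socket.send( data, 0, host, port )
    data[ 0, bytes ] = ''
  end
end

socket = ChunkingSocket.new( 4 )
drain( socket, '{"a":1}', '225.4.5.6', 7890 ) # assumed group address and port
writes = socket.writes
```

With a four-byte limit, the seven-byte payload goes out as two writes. With a real UDP socket each call produces a separate datagram.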

#serialize_events(events) ⇒ Object

Serialize each of the given events and return the results.



# File 'lib/observability/sender/udp_multicast.rb', line 83

def serialize_events( events  )
	return events.map( &SERIALIZE_PIPELINE )
end
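
The composition in SERIALIZE_PIPELINE can be tried in isolation. In the sketch below, DemoEvent is a hypothetical event type whose #resolve returns a Hash; the real event class is not shown on this page and may behave differently.

```ruby
require 'json'

# Hypothetical event type, for illustration only.
DemoEvent = Struct.new( :type, :id ) do
  def resolve
    { type: type, id: id }
  end
end

# Proc#>> composes left to right: call #resolve on the event first,
# then pass the result to JSON.generate.
pipeline = :resolve.to_proc >> JSON.method( :generate )

events     = [ DemoEvent.new('demo.start', 1), DemoEvent.new('demo.stop', 2) ]
serialized = events.map( &pipeline )
```

Each element of the result is a JSON String, ready to be handed to #send_event.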

#stop ⇒ Object

Stop the sender’s executor, then shut down the write side of the socket.



# File 'lib/observability/sender/udp_multicast.rb', line 75

def stop
	super

	self.socket.shutdown( :WR )
end