Class: Pants::Writers::UDPWriterConnection

Inherits:
EM::Connection
  • Object
show all
Includes:
LogSwitch::Mixin, NetworkHelpers
Defined in:
lib/pants/writers/udp_writer.rb

Overview

This is the EventMachine connection that connects the data from the data channel (put there by the reader you’re using) to the IP and UDP port you want to send it to.

Constant Summary collapse

PACKET_SPLIT_THRESHOLD =

Packets get split up before writing if they’re over this size.

1400
PACKET_SPLIT_SIZE =

Packets get split up to this size before writing.

1300

Instance Method Summary collapse

Constructor Details

#initialize(read_from_channel, dest_ip, dest_port) ⇒ UDPWriterConnection

Returns a new instance of UDPWriterConnection.

Parameters:

  • read_from_channel (EventMachine::Channel)

    The channel to expect data on and write to the socket.

  • dest_ip (String)

    The IP address to send data to. Can be unicast or multicast.

  • dest_port (Fixnum)

    The UDP port to send data to.



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/pants/writers/udp_writer.rb', line 29

def initialize(read_from_channel, dest_ip, dest_port)
  @read_from_channel = read_from_channel
  @dest_ip = dest_ip
  @dest_port = dest_port

  if Addrinfo.ip(@dest_ip).ipv4_multicast? || Addrinfo.ip(@dest_ip).ipv6_multicast?
    log "Got a multicast address: #{@dest_ip}:#{@dest_port}"
    setup_multicast_socket(@dest_ip)
  else
    log "Got a unicast address: #{@dest_ip}:#{@dest_port}"
  end
end

Instance Method Details

#post_initObject

Sends data received on the data channel to the destination IP and port. Since data may have been put in to the channel by a File reader (and will therefore be larger chunks of data than you’ll want to send in a packet over the wire), it will split packets into PACKET_SPLIT_SIZE sized packets before sending.



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pants/writers/udp_writer.rb', line 47

def post_init
  @read_from_channel.subscribe do |data|
    if data.size > PACKET_SPLIT_THRESHOLD
      log "#{__id__} Got big data: #{data.size}.  Splitting..."
      io = StringIO.new(data)
      io.binmode

      begin
        log "#{__id__} Spliced #{PACKET_SPLIT_SIZE} bytes to socket packet"

        while true
          new_packet = io.read_nonblock(PACKET_SPLIT_SIZE)
          send_datagram(new_packet, @dest_ip, @dest_port)
          new_packet = nil
        end
      rescue EOFError
        send_datagram(new_packet, @dest_ip, @dest_port) if new_packet
        io.close
      end
    else
      log "Sending data to #{@dest_ip}:#{@dest_port}"
      send_datagram(data, @dest_ip, @dest_port)
    end
  end
end

#receive_data(data) ⇒ Object



73
74
75
# File 'lib/pants/writers/udp_writer.rb', line 73

def receive_data(data)
  log "Got data (should I?): #{data.size}, port #{@dest_port}, peer: #{get_peername}"
end