Class: Cosmos::UnpackingInterface

Inherits:
TcpipServerInterface
  • Object
show all
Defined in:
lib/cosmos/unpacking_interface/unpacking_interface.rb

Overview

we look for new aggregate packets

Instance Method Summary collapse

Instance Method Details

#agg_pkt_mapObject

Maps aggregate packets (packets with many granules) to simple packets (packets with a single granule) should have the form ‘AggregatePacket’ => ‘SimplePacket’ For example: ‘Science’ => ‘Science2’



28
29
30
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 28

def agg_pkt_map
  {}
end

#connectObject



44
45
46
47
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 44

def connect
  @derived_queue = []
  super()
end

#packet_mapperObject

Create and return a new instance of your own custom PacketMapper



40
41
42
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 40

def packet_mapper
  PacketMapper.new
end

#process(packet:, target:, agg_packet:, simple_packet:) ⇒ Object

Unpack an aggregate packet if necessary and add resulting packets to the internal FIFO queue



52
53
54
55
56
57
58
59
60
61
62
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 52

def process(packet:, target:, agg_packet:, simple_packet:)   
   agg_pkt = System.telemetry.packet(target, agg_packet)
   if(agg_pkt.identify?(packet.buffer))
     processor = AggregatePacketProcesser.new(packet_mapper, transforms)
     agg_pkt.buffer = packet.buffer.clone
     result = processor.unpack(agg_pkt, target, simple_packet)

     @derived_queue.concat(result)
   end
   return packet
end

#readObject

This method is called by COSMOS internally, essentially if we have packets on the queue read those otherwise wait for a packet, and unpack if necessary



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 67

def read
  if !@derived_queue.empty?
    p1 = @derived_queue.shift
    return p1
  else

   # This is a blocking call to wait for an actual packet to come over the wire
   packet = super()

   agg_pkt_map.select do |agg_packet, simple_packet|
     agg_pkt = System.telemetry.packet(target, agg_packet)
     if(agg_pkt.identify?(packet.buffer))      
       process(packet: packet, target: target, agg_packet: agg_packet, simple_packet: simple_packet)
     end
   end
 
   return packet
  end 
end

#targetObject

Cosmos target interface is being used with



34
35
36
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 34

def target
 ""
end

#transformsObject

Transformations defined on an item in a simple packet. Entries should be of the form: “<TARGET>-<PACKET_NAME>-<ITEM_NAME>” => Proc where the proc can accept the following parameters TargetName, ItemName, key, value, index index is the index of the granule in the aggregate packet that was transformed into the simple packet More info about procs can be found here: ruby-doc.org/core-2.4.1/Proc.html



20
21
22
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 20

def transforms
  {}
end