Class: Cosmos::UnpackingInterface
- Inherits:
-
TcpipServerInterface
- Object
- TcpipServerInterface
- Cosmos::UnpackingInterface
- Defined in:
- lib/cosmos/unpacking_interface/unpacking_interface.rb
Overview
we look for new aggregate packets
Instance Method Summary collapse
-
#agg_pkt_map ⇒ Object
Maps aggregate packets (packets with many granules) to simple packets (packets with a single granule) should have the form ‘AggregatePacket’ => ‘SimplePacket’ For example: ‘Science’ => ‘Science2’.
- #connect ⇒ Object
-
#packet_mapper ⇒ Object
Create and return a new instance of your own custom PacketMapper.
-
#process(packet:, target:, agg_packet:, simple_packet:) ⇒ Object
Unpack an aggregate packet if necessary and add resulting packets to the internal FIFO queue.
-
#read ⇒ Object
This method is called by COSMOS internally, essentially if we have packets on the queue read those otherwise wait for a packet, and unpack if necessary.
-
#target ⇒ Object
Cosmos target interface is being used with.
-
#transforms ⇒ Object
Transformations defined on an item in a simple packet.
Instance Method Details
#agg_pkt_map ⇒ Object
Maps aggregate packets (packets with many granules) to simple packets (packets with a single granule) should have the form ‘AggregatePacket’ => ‘SimplePacket’ For example: ‘Science’ => ‘Science2’
28 29 30 |
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 28 def agg_pkt_map {} end |
#connect ⇒ Object
44 45 46 47 |
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 44 def connect @derived_queue = [] super() end |
#packet_mapper ⇒ Object
Create and return a new instance of your own custom PacketMapper
40 41 42 |
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 40 def packet_mapper PacketMapper.new end |
#process(packet:, target:, agg_packet:, simple_packet:) ⇒ Object
Unpack an aggregate packet if necessary and add resulting packets to the internal FIFO queue
52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 52 def process(packet:, target:, agg_packet:, simple_packet:) agg_pkt = System.telemetry.packet(target, agg_packet) if(agg_pkt.identify?(packet.buffer)) processor = AggregatePacketProcesser.new(packet_mapper, transforms) agg_pkt.buffer = packet.buffer.clone result = processor.unpack(agg_pkt, target, simple_packet) @derived_queue.concat(result) end return packet end |
#read ⇒ Object
This method is called by COSMOS internally, essentially if we have packets on the queue read those otherwise wait for a packet, and unpack if necessary
67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 67 def read if !@derived_queue.empty? p1 = @derived_queue.shift return p1 else # This is a blocking call to wait for an actual packet to come over the wire packet = super() agg_pkt_map.select do |agg_packet, simple_packet| agg_pkt = System.telemetry.packet(target, agg_packet) if(agg_pkt.identify?(packet.buffer)) process(packet: packet, target: target, agg_packet: agg_packet, simple_packet: simple_packet) end end return packet end end |
#target ⇒ Object
Cosmos target interface is being used with
34 35 36 |
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 34 def target "" end |
#transforms ⇒ Object
Transformations defined on an item in a simple packet. Entries should be of the form: “<TARGET>-<PACKET_NAME>-<ITEM_NAME>” => Proc where the proc can accept the following parameters TargetName, ItemName, key, value, index index is the index of the granule in the aggregate packet that was transformed into the simple packet More info about procs can be found here: ruby-doc.org/core-2.4.1/Proc.html
20 21 22 |
# File 'lib/cosmos/unpacking_interface/unpacking_interface.rb', line 20 def transforms {} end |