Module: BinaryMessageParser
- Defined in:
- lib/signalfx/signalflow/binary.rb
Overview
Converts binary WebSocket messages into a hash
Class Method Summary collapse
-
.parse(data) ⇒ Object
data should be a raw string.
- .parse_data_payload(payload) ⇒ Object
Class Method Details
.parse(data) ⇒ Object
data should be a raw string
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/signalfx/signalflow/binary.rb', line 9 def parse(data) # See https://developers.signalfx.com/v2/reference#section-binary-encoding-of-websocket-messages version, , flags, _, channel, payload = data.unpack("CCb8CZ16a*") compressed = flags[0] == "1" is_json = flags[1] == "1" if version != 1 raise "Unsupported SignalFlow version #{version}" end if compressed payload = Zlib::Inflate.new(16+Zlib::MAX_WBITS).inflate(payload) end raise "Unknown binary message type #{}" if !is_json && != 5 = is_json ? JSON.parse(payload, {:symbolize_names => true}) : parse_data_payload(payload) .merge({:channel => channel}) end |
.parse_data_payload(payload) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/signalfx/signalflow/binary.rb', line 33 def parse_data_payload(payload) # See https://developers.signalfx.com/v2/reference#section-binary-encoding-used-for-the-websocket , element_count, tuples_raw = payload.unpack("Q>L>a*") data_hash = (0..element_count-1).map do |i| type, tsid, value_raw = tuples_raw[i*17..i*17+16].unpack("CQ>a8") value = case type when 1 # long value_raw.unpack("q>") when 2 # double value_raw.unpack("G") when 3 # int (32 bit) value_raw.unpack("l>") end [ Base64.urlsafe_encode64([tsid].pack("Q>")).gsub("=", ""), value[0], ] end.to_h { :type => "data", :logicalTimestampMs => , :logicalTimestamp => Time.at( / 1000.0), :data => data_hash, } end |