Module: BinaryMessageParser

Defined in:
lib/signalfx/signalflow/binary.rb

Overview

Converts binary WebSocket messages into a hash

Class Method Summary collapse

Class Method Details

.parse(data) ⇒ Object

data should be a raw string



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/signalfx/signalflow/binary.rb', line 9

def parse(data)
  # See https://developers.signalfx.com/v2/reference#section-binary-encoding-of-websocket-messages
  version, message_type, flags, _, channel, payload = data.unpack("CCb8CZ16a*")
  compressed = flags[0] == "1"
  is_json = flags[1] == "1"

  if version != 1
    raise "Unsupported SignalFlow version #{version}"
  end

  if compressed
    payload = Zlib::Inflate.new(16+Zlib::MAX_WBITS).inflate(payload)
  end

  raise "Unknown binary message type #{message_type}" if !is_json && message_type != 5

  message = is_json ?
              JSON.parse(payload, {:symbolize_names => true}) :
              parse_data_payload(payload)

  message.merge({:channel => channel})
end

.parse_data_payload(payload) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/signalfx/signalflow/binary.rb', line 33

def parse_data_payload(payload)
  # See https://developers.signalfx.com/v2/reference#section-binary-encoding-used-for-the-websocket
  timestamp, element_count, tuples_raw = payload.unpack("Q>L>a*")
  data_hash = (0..element_count-1).map do |i|
    type, tsid, value_raw = tuples_raw[i*17..i*17+16].unpack("CQ>a8")

    value = case type
            when 1  # long
              value_raw.unpack("q>")
            when 2  # double
              value_raw.unpack("G")
            when 3  # int (32 bit)
              value_raw.unpack("l>")
            end

    [
      Base64.urlsafe_encode64([tsid].pack("Q>")).gsub("=", ""),
      value[0],
    ]
  end.to_h

  {
    :type => "data",
    :logicalTimestampMs => timestamp,
    :logicalTimestamp => Time.at(timestamp / 1000.0),
    :data => data_hash,
  }
end