Class: LaunchDarkly::Impl::DataSystem::StreamingDataSource Private

Inherits:
Object
Includes:
LaunchDarkly::Interfaces::DataSystem::Synchronizer
Defined in:
lib/ldclient-rb/impl/data_system/streaming.rb

Overview

This class is part of a private API. You should avoid using this class if possible, as it may be removed or be changed in the future.

StreamingDataSource is a Synchronizer that uses Server-Sent Events (SSE) to receive real-time updates from LaunchDarkly’s Flag Delivery services.

Since:

  • 5.5.0

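Instance Attribute Summary

  • #name ⇒ Object (readonly)

Instance Method Summary

  • #initialize(sdk_key, http_config, initial_reconnect_delay, config) ⇒ StreamingDataSource (constructor)
  • #set_diagnostic_accumulator(diagnostic_accumulator) ⇒ Object
  • #stop ⇒ Object
  • #sync(ss) {|update| ... } ⇒ Object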

Constructor Details

#initialize(sdk_key, http_config, initial_reconnect_delay, config) ⇒ StreamingDataSource

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of StreamingDataSource.

Parameters:

  • sdk_key (String)
  • http_config (HttpConfigOptions)

    HTTP connection settings

  • initial_reconnect_delay (Float)

    Initial delay before reconnecting after an error

  • config (LaunchDarkly::Config)

    Used for global header settings

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_system/streaming.rb', line 39

def initialize(sdk_key, http_config, initial_reconnect_delay, config)
  @sdk_key = sdk_key
  @http_config = http_config
  @initial_reconnect_delay = initial_reconnect_delay
  @config = config
  @logger = config.logger
  @name = "StreamingDataSourceV2"
  @sse = nil
  @stopped = Concurrent::Event.new
  @diagnostic_accumulator = nil
  @connection_attempt_start_time = 0
end

Instance Attribute Details

#name ⇒ Object (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_system/streaming.rb', line 31

def name
  @name
end

Instance Method Details

#set_diagnostic_accumulator(diagnostic_accumulator) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Sets the diagnostic accumulator for streaming initialization metrics.

Parameters:

  • diagnostic_accumulator

    Accumulator that receives streaming initialization metrics

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_system/streaming.rb', line 57

def set_diagnostic_accumulator(diagnostic_accumulator)
  @diagnostic_accumulator = diagnostic_accumulator
end

#stop ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Stops the streaming synchronizer.

Since:

  • 5.5.0



# File 'lib/ldclient-rb/impl/data_system/streaming.rb', line 184

def stop
  @logger.info { "[LDClient] Stopping StreamingDataSourceV2 synchronizer" }
  @sse&.close
  @stopped.set
end
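
The interplay between #stop and the blocking wait at the end of #sync can be illustrated with a small standalone sketch. MiniEvent, SketchSource, and run_sketch are hypothetical names used only for this illustration; MiniEvent is a simplified stand-in for Concurrent::Event built from the standard library, not the SDK's implementation.

```ruby
# Hypothetical minimal latch standing in for Concurrent::Event.
class MiniEvent
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @set = false
  end

  # Marks the event as set and wakes every waiting thread.
  def set
    @mutex.synchronize do
      @set = true
      @cond.broadcast
    end
  end

  # Blocks the calling thread until #set has been called.
  def wait
    @mutex.synchronize do
      @cond.wait(@mutex) until @set
    end
    true
  end
end

# Hypothetical source that mimics only the start/block/stop shape.
class SketchSource
  attr_reader :log

  def initialize
    @stopped = MiniEvent.new
    @log = Queue.new
  end

  # Blocks until #stop is called, as the real #sync does on @stopped.wait.
  def sync
    @log << :started
    @stopped.wait
    @log << :finished
  end

  def stop
    @stopped.set
  end
end

# Runs sync on a worker thread, stops it, and returns the two log entries.
def run_sketch
  source = SketchSource.new
  worker = Thread.new { source.sync }
  first = source.log.pop # blocks until sync has started
  source.stop
  worker.join
  [first, source.log.pop]
end
```

Because the event stays set once triggered, calling stop before sync reaches its wait is also safe in this sketch: the wait returns immediately.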

#sync(ss) {|update| ... } ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

sync begins the synchronization process for the data source, yielding Update objects until the connection is closed or an unrecoverable error occurs.
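
As a rough illustration of the block-based contract described above, the following sketch consumes updates from a synchronizer-shaped object. FakeUpdate, FakeSynchronizer, and collect_states are hypothetical names invented for this example; the real Update class has more fields, and the real #sync blocks until #stop is called.

```ruby
# Hypothetical, simplified stand-in for the SDK's Update class.
FakeUpdate = Struct.new(:state, :revert_to_fdv1, keyword_init: true)

# Hypothetical synchronizer that yields a fixed list of updates.
class FakeSynchronizer
  def initialize(updates)
    @updates = updates
  end

  # Mirrors the shape of #sync: each update is yielded to the caller's block.
  def sync(_selector_store)
    @updates.each { |update| yield update }
  end
end

# Collects the state of each update, stopping once a fallback is requested.
def collect_states(synchronizer)
  states = []
  synchronizer.sync(nil) do |update|
    states << update.state
    break if update.revert_to_fdv1
  end
  states
end
```

Calling collect_states with a FakeSynchronizer whose second update sets revert_to_fdv1 returns only the first two states, since the caller stops consuming at that point.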



# File 'lib/ldclient-rb/impl/data_system/streaming.rb', line 69

def sync(ss)
  @logger.info { "[LDClient] Starting StreamingDataSourceV2 synchronizer" }
  log_connection_started

  change_set_builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
  envid = nil

  base_uri = @http_config.base_uri + FDV2_STREAMING_ENDPOINT
  headers = Impl::Util.default_http_headers(@sdk_key, @config)
  opts = {
    headers: headers,
    read_timeout: STREAM_READ_TIMEOUT,
    logger: @logger,
    socket_factory: @http_config.socket_factory,
    reconnect_time: @initial_reconnect_delay,
  }

  @sse = SSE::Client.new(base_uri, **opts) do |client|
    client.on_connect do |headers|
      # Extract environment ID and check for fallback on successful connection
      if headers
        envid = headers[LD_ENVID_HEADER] || envid

        # Check for fallback header on connection
        if headers[LD_FD_FALLBACK_HEADER] == 'true'
          log_connection_result(true)
          yield LaunchDarkly::Interfaces::DataSystem::Update.new(
            state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
            revert_to_fdv1: true,
            environment_id: envid
          )
          stop
        end
      end
    end

    client.on_event do |event|
      begin
        update = process_message(event, change_set_builder, envid)
        if update
          log_connection_result(true)
          @connection_attempt_start_time = 0
          yield update
        end
      rescue JSON::ParserError => e
        @logger.info { "[LDClient] Error parsing stream event; will restart stream: #{e}" }
        yield LaunchDarkly::Interfaces::DataSystem::Update.new(
          state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
            LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA,
            0,
            e.to_s,
            Time.now
          ),
          environment_id: envid
        )

        # Re-raise the exception so the SSE implementation can catch it and restart the stream.
        raise
      rescue => e
        @logger.info { "[LDClient] Error while handling stream event; will restart stream: #{e}" }
        yield LaunchDarkly::Interfaces::DataSystem::Update.new(
          state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
          error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
            LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
            0,
            e.to_s,
            Time.now
          ),
          environment_id: envid
        )

        # Re-raise the exception so the SSE implementation can catch it and restart the stream.
        raise
      end
    end

    client.on_error do |error|
      log_connection_result(false)
      fallback = false

      # Extract envid and fallback from error headers if available
      if error.respond_to?(:headers) && error.headers
        envid = error.headers[LD_ENVID_HEADER] || envid

        if error.headers[LD_FD_FALLBACK_HEADER] == 'true'
          fallback = true
        end
      end

      update = handle_error(error, envid, fallback)
      yield update if update
    end

    client.query_params do
      selector = ss.selector
      {
        "filter" => @config.payload_filter_key,
        "basis" => (selector.state if selector&.defined?),
      }.compact
    end
  end

  unless @sse
    @logger.error { "[LDClient] Failed to create SSE client for streaming updates" }
    return
  end

  # Client auto-starts in background thread. Wait here until stop() is called.
  @stopped.wait
end
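
Two small pieces of logic in the listing above can be tried in isolation: the query parameters built with Hash#compact, and the header handling shared by on_connect and on_error. The sketch below is an approximation under stated assumptions. build_query_params and read_headers are hypothetical helpers, and the header names are placeholders, since the values of LD_ENVID_HEADER and LD_FD_FALLBACK_HEADER are not shown here.

```ruby
# Placeholder header names for illustration only; the SDK defines its own
# LD_ENVID_HEADER and LD_FD_FALLBACK_HEADER constants.
ENVID_HEADER = 'x-example-envid'
FALLBACK_HEADER = 'x-example-fallback'

# Builds the query hash the way the query_params block does:
# entries whose value is nil are dropped by Hash#compact.
def build_query_params(filter_key, selector_state)
  {
    "filter" => filter_key,
    "basis" => selector_state,
  }.compact
end

# Returns [envid, fallback]. A missing environment ID header keeps the
# previously seen value; fallback is true only for the exact string 'true'.
def read_headers(headers, previous_envid)
  return [previous_envid, false] unless headers

  envid = headers[ENVID_HEADER] || previous_envid
  [envid, headers[FALLBACK_HEADER] == 'true']
end
```

With neither a filter key nor a defined selector, the query hash is empty, so no query parameters are sent in that case.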