Class: SSE::Client

Inherits:
Object
  • Object
Defined in:
lib/ld-eventsource/client.rb

Overview

A lightweight SSE client implementation. The client uses a worker thread to read from the streaming HTTP connection. Events are dispatched from the same worker thread.

The client will attempt to recover from connection failures as follows:

  • The first time the connection is dropped, it will wait about one second (or whatever value is specified for reconnect_time) before attempting to reconnect. The actual delay has a pseudo-random jitter value added.

  • If the connection fails again within the time range specified by reconnect_reset_interval, it will exponentially increase the delay between attempts (and also apply a random jitter). However, if the connection stays up for at least that amount of time, the delay will be reset to the minimum.

  • Each time a new connection is made, the client will send a Last-Event-Id header so the server can pick up where it left off (if the server has been sending ID values for events).
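
The recovery rules above can be sketched in a few lines of Ruby. This is only an illustration, not the library's own backoff code: the class name BackoffSketch, its method names, and the choice of jitter (a random value between half the target delay and the full target delay) are all assumptions made for the example.

```ruby
# Hypothetical illustration of exponential backoff with jitter and a
# reset interval. Not the library's internal implementation.
class BackoffSketch
  def initialize(base, max, reset_interval:, rng: Random.new)
    @base = base                      # initial delay in seconds (reconnect_time)
    @max = max                        # upper bound on the delay
    @reset_interval = reset_interval  # seconds a connection must stay up to reset
    @rng = rng
    @attempts = 0
    @last_good_time = nil
  end

  # Call when a connection has been successfully established.
  def mark_success(now = Time.now.to_f)
    @last_good_time = now
  end

  # Returns the number of seconds to wait before the next attempt.
  def next_delay(now = Time.now.to_f)
    # If the last connection stayed up long enough, start over at the minimum.
    if @last_good_time && (now - @last_good_time) >= @reset_interval
      @attempts = 0
    end
    @last_good_time = nil
    target = [@base * (2**@attempts), @max].min
    @attempts += 1
    # Assumed jitter strategy: somewhere between half the target and the target.
    (target / 2.0) + @rng.rand * (target / 2.0)
  end
end

backoff = BackoffSketch.new(1, 30, reset_interval: 60)
first_wait = backoff.next_delay   # between 0.5 and 1.0 seconds
second_wait = backoff.next_delay  # between 1.0 and 2.0 seconds
```

Capping the target at a maximum keeps repeated failures from producing unbounded waits, which is the role MAX_RECONNECT_TIME plays for the client.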

It is also possible to force the connection to be restarted if the server sends no data within an interval specified by read_timeout. Using a read timeout is advisable because otherwise it is possible in some circumstances for a connection failure to go undetected. To keep the connection from timing out if there are no events to send, the server could send a comment line (":") at regular intervals as a heartbeat.
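
As a rough server-side illustration of the heartbeat idea, the sketch below writes an SSE comment line to an output stream. The helper name write_heartbeat is hypothetical, and how often to call it is up to the server; the only requirement implied above is that the interval be shorter than the client's read_timeout.

```ruby
require 'stringio'

# Hypothetical server-side helper: writes an SSE comment line as a heartbeat.
# A line starting with ":" is a comment in the SSE format, so clients ignore
# it. The trailing blank line dispatches nothing because no data is pending.
def write_heartbeat(io)
  io.write(":\n\n")
  io.flush
end

# Demonstration against an in-memory stream instead of a real socket.
stream = StringIO.new
write_heartbeat(stream)
```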

Constant Summary

DEFAULT_CONNECT_TIMEOUT = 10

    The default value for connect_timeout in #initialize.

DEFAULT_READ_TIMEOUT = 300

    The default value for read_timeout in #initialize.

DEFAULT_RECONNECT_TIME = 1

    The default value for reconnect_time in #initialize.

MAX_RECONNECT_TIME = 30

    The maximum number of seconds that the client will wait before reconnecting.

DEFAULT_RECONNECT_RESET_INTERVAL = 60

    The default value for reconnect_reset_interval in #initialize.

DEFAULT_HTTP_METHOD = "GET"

    The default HTTP method for requests.

Constructor Details

#initialize(uri, headers: {}, connect_timeout: DEFAULT_CONNECT_TIMEOUT, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, last_event_id: nil, proxy: nil, logger: nil, socket_factory: nil, method: DEFAULT_HTTP_METHOD, payload: nil, retry_enabled: true, http_client_options: nil) {|client| ... } ⇒ Client

Creates a new SSE client.

Once the client is created, it immediately attempts to open the SSE connection. You will normally want to register your event handler before this happens, so that no events are missed. To do this, provide a block after the constructor; the block will be executed before opening the connection.

Examples:

Specifying an event handler at initialization time

client = SSE::Client.new(uri) do |c|
  c.on_event do |event|
    puts "I got an event: #{event.type}, #{event.data}"
  end
end

Parameters:

  • uri (String)

    the URI to connect to

  • headers (Hash) (defaults to: {})

    ({}) custom headers to send with each HTTP request

  • connect_timeout (Float) (defaults to: DEFAULT_CONNECT_TIMEOUT)

    (DEFAULT_CONNECT_TIMEOUT) maximum time to wait for a connection, in seconds

  • read_timeout (Float) (defaults to: DEFAULT_READ_TIMEOUT)

    (DEFAULT_READ_TIMEOUT) the connection will be dropped and restarted if this number of seconds elapse with no data; nil for no timeout

  • reconnect_time (Float) (defaults to: DEFAULT_RECONNECT_TIME)

    (DEFAULT_RECONNECT_TIME) the initial delay before reconnecting after a failure, in seconds; this can increase as described in SSE::Client

  • reconnect_reset_interval (Float) (defaults to: DEFAULT_RECONNECT_RESET_INTERVAL)

    (DEFAULT_RECONNECT_RESET_INTERVAL) if a connection stays alive for at least this number of seconds, the reconnect interval will return to the initial value

  • last_event_id (String) (defaults to: nil)

    (nil) the initial value that the client should send in the Last-Event-Id header, if any

  • proxy (String) (defaults to: nil)

    (nil) optional URI of a proxy server to use (you can also specify a proxy with the HTTP_PROXY or HTTPS_PROXY environment variable)

  • logger (Logger) (defaults to: nil)

    a Logger instance for the client to use for diagnostic output; defaults to a logger with WARN level that goes to standard output

  • socket_factory (#open) (defaults to: nil)

    (nil) an optional factory object for creating sockets, if you want to use something other than the default TCPSocket; it must implement open(uri, timeout) to return a connected Socket

  • method (String) (defaults to: DEFAULT_HTTP_METHOD)

    (“GET”) the HTTP method to use for requests

  • payload (String, Hash, Array, #call) (defaults to: nil)

    (nil) optional request payload. If payload is a Hash or an Array, it will be converted to JSON and sent as the request body. A string will be sent as a non-JSON request body. If payload responds to #call, it will be invoked on each request to generate the payload dynamically.

  • retry_enabled (Boolean) (defaults to: true)

    (true) whether to retry connections after failures. If false, the client will exit after the first connection failure instead of attempting to reconnect.

  • http_client_options (Hash) (defaults to: nil)

    (nil) additional options to pass to the HTTP client, such as socket_factory or proxy. These settings will override the socket factory and proxy settings.

Yield Parameters:

  • client (Client)

    the new client instance, before opening the connection
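
To make the payload rules in the parameter list concrete, here is a minimal sketch of how the three payload forms could be turned into a request body. The helper resolve_payload is hypothetical and written only for this illustration; it is not a method of the client, and the library's actual handling may differ in detail.

```ruby
require 'json'

# Hypothetical helper (not part of the library): turns a payload option
# into a request body string, following the rules described for `payload`.
def resolve_payload(payload)
  # A callable is invoked every time, so the body can differ per request.
  value = payload.respond_to?(:call) ? payload.call : payload
  case value
  when nil then nil
  when Hash, Array then JSON.generate(value) # structured data becomes JSON
  else value.to_s                            # strings are sent unchanged
  end
end

attempts = 0
dynamic_payload = -> { attempts += 1; { "attempt" => attempts } }

json_body = resolve_payload({ "filter" => "all" })
text_body = resolve_payload("raw text")
first_dynamic_body = resolve_payload(dynamic_payload)
```

A callable payload is the form to reach for when the body must reflect state that changes between reconnections.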



# File 'lib/ld-eventsource/client.rb', line 102

def initialize(uri,
      headers: {},
      connect_timeout: DEFAULT_CONNECT_TIMEOUT,
      read_timeout: DEFAULT_READ_TIMEOUT,
      reconnect_time: DEFAULT_RECONNECT_TIME,
      reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL,
      last_event_id: nil,
      proxy: nil,
      logger: nil,
      socket_factory: nil,
      method: DEFAULT_HTTP_METHOD,
      payload: nil,
      retry_enabled: true,
      http_client_options: nil)
  @uri = URI(uri)
  @stopped = Concurrent::AtomicBoolean.new(false)
  @retry_enabled = retry_enabled

  @headers = headers.clone
  @connect_timeout = connect_timeout
  @read_timeout = read_timeout
  @method = method.to_s.upcase
  @payload = payload
  @logger = logger || default_logger

  base_http_client_options = {}
  if socket_factory
    base_http_client_options["socket_class"] = socket_factory
  end

  if proxy
    @proxy = proxy
  else
    proxy_uri = @uri.find_proxy
    if !proxy_uri.nil? && (proxy_uri.scheme == 'http' || proxy_uri.scheme == 'https')
      @proxy = proxy_uri
    end
  end

  if @proxy
    base_http_client_options["proxy"] = {
      :proxy_address => @proxy.host,
      :proxy_port => @proxy.port,
    }
    base_http_client_options["proxy"][:proxy_username] = @proxy.user unless @proxy.user.nil?
    base_http_client_options["proxy"][:proxy_password] = @proxy.password unless @proxy.password.nil?
  end

  options = http_client_options.is_a?(Hash) ? base_http_client_options.merge(http_client_options) : base_http_client_options

  @http_client = HTTP::Client.new(options)
    .follow
    .timeout({
      read: read_timeout,
      connect: connect_timeout,
    })
  @cxn = nil
  @lock = Mutex.new

  @backoff = Impl::Backoff.new(reconnect_time || DEFAULT_RECONNECT_TIME, MAX_RECONNECT_TIME,
    reconnect_reset_interval: reconnect_reset_interval)
  @first_attempt = true

  @on = { event: ->(_) {}, error: ->(_) {} }
  @last_id = last_event_id
  @query_params_callback = nil

  yield self if block_given?

  Thread.new { run_stream }.name = 'LD/SSEClient'
end
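
The way http_client_options takes precedence over the socket_factory and proxy arguments follows from the Hash#merge call in the listing above. The sketch below reproduces just that step with placeholder values (the host names and the :factory_from_argument symbol are made up for the example).

```ruby
# Options derived from the socket_factory and proxy arguments.
# All values here are placeholders for illustration.
base = {
  "socket_class" => :factory_from_argument,
  "proxy" => { proxy_address: "proxy-a.example", proxy_port: 8080 },
}

# Caller-supplied http_client_options.
overrides = {
  "proxy" => { proxy_address: "proxy-b.example", proxy_port: 3128 },
}

# Same expression as in the constructor: on a key collision, the value
# from http_client_options wins.
options = overrides.is_a?(Hash) ? base.merge(overrides) : base

options["proxy"][:proxy_address]  # the override's address
options["socket_class"]           # untouched, since it was not overridden
```

Two properties of Hash#merge are worth keeping in mind here: the merge is shallow, so an overriding "proxy" entry replaces the whole nested hash rather than individual fields, and keys are compared exactly, so a symbol key such as :proxy would not replace the string key "proxy" built by the constructor.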

Instance Method Details

#close ⇒ Object

Permanently shuts down the client and its connection. No further events will be dispatched. This has no effect if called a second time.



# File 'lib/ld-eventsource/client.rb', line 244

def close
  if @stopped.make_true
    reset_http
  end
end

#closed? ⇒ Boolean

Tests whether the client has been shut down by a call to #close.

Returns:

  • (Boolean)

    true if the client has been shut down



# File 'lib/ld-eventsource/client.rb', line 255

def closed?
  @stopped.value
end

#on_error {|error| ... } ⇒ Object

Specifies a block or Proc to receive connection errors. This will be called with a single parameter that is an instance of some exception class: normally, either some I/O exception or one of the classes in Errors. It is called from the same worker thread that reads the stream, so no more events or errors will be dispatched until it returns.

If the error handler decides that this type of error is not recoverable, it can prevent any further reconnect attempts by calling #close on the Client. For instance, you might want to do this if the server returned a "401 Unauthorized" error and no other authorization credentials are available, since any further requests would presumably also receive a 401.

Any previously specified error handler will be replaced.

Yield Parameters:

  • error (StandardError)


# File 'lib/ld-eventsource/client.rb', line 206

def on_error(&action)
  @on[:error] = action
end
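
The 401 scenario described for #on_error might look roughly like the following. The predicate unrecoverable? and the wrapper start_client are hypothetical names for this sketch, and the check assumes that HTTP status errors expose a status reader; consult the Errors module for the actual classes and attributes.

```ruby
# Hypothetical predicate: treats an error as unrecoverable when it carries
# an HTTP status of 401. Assumes status errors respond to `status`.
def unrecoverable?(error)
  error.respond_to?(:status) && error.status == 401
end

# Usage sketch. It needs the ld-eventsource gem to be loaded, so it is
# wrapped in a method here and not invoked.
def start_client(uri)
  SSE::Client.new(uri) do |c|
    c.on_error do |error|
      # Closing the client stops any further reconnect attempts.
      c.close if unrecoverable?(error)
    end
    c.on_event { |event| puts event.data }
  end
end
```

Keeping the decision in a small predicate makes it easy to test without opening a connection.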

#on_event {|event| ... } ⇒ Object

Specifies a block or Proc to receive events from the stream. This will be called once for every valid event received, with a single parameter of type StreamEvent. It is called from the same worker thread that reads the stream, so no more events will be dispatched until it returns.

Any exception that propagates out of the handler will cause the stream to disconnect and reconnect, on the assumption that data may have been lost and that restarting the stream will cause it to be resent.

Any previously specified event handler will be replaced.

Yield Parameters:

  • event (StreamEvent)



# File 'lib/ld-eventsource/client.rb', line 187

def on_event(&action)
  @on[:event] = action
end
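
Because an exception escaping the event handler restarts the stream, an application may prefer to catch failures that have nothing to do with lost data. The wrapper below is one possible approach; guarded_handler is a hypothetical helper, not part of the library, and whether swallowing an error is appropriate depends on the application.

```ruby
require 'logger'

# Hypothetical wrapper: returns a lambda that runs the given handler and
# logs any StandardError instead of letting it propagate to the client.
def guarded_handler(logger, &handler)
  lambda do |event|
    begin
      handler.call(event)
    rescue StandardError => e
      logger.warn("event handler failed: #{e.class}: #{e.message}")
      nil
    end
  end
end

# The resulting lambda can be registered with on_event via `&`, e.g.
#   c.on_event(&guarded_handler(logger) { |event| process(event) })
# where `process` stands in for application code.
```

Errors that do indicate lost data should still be allowed to propagate so that the reconnect behavior described above takes effect.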

#query_params(&action) ⇒ Object

Specifies a block or Proc to generate query parameters dynamically. This will be called before each connection attempt (both initial connection and reconnections), allowing you to update query parameters based on the current client state.

The block should return a Hash with string keys and string values, which will be merged with any existing query parameters in the base URI. If the callback raises an exception, it will be logged and the connection will proceed with the base URI’s query parameters (or no query parameters if none were present).

This is useful for scenarios where query parameters need to reflect the current state of the client, such as sending a “basis” parameter that represents what data the client already has.

Examples:

Using dynamic query parameters

client = SSE::Client.new(base_uri) do |c|
  c.query_params do
    {
      "basis" => (selector.state if selector.defined?),
      "filter" => filter_key
    }.compact
  end
  c.on_event { |event| handle_event(event) }
end

Yield Returns:

  • (Hash<String, String>)

    a hash of query parameter names to values



# File 'lib/ld-eventsource/client.rb', line 236

def query_params(&action)
  @query_params_callback = action
end
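
The merging of callback results into the base URI's query string can be pictured with Ruby's standard URI helpers. This is a sketch under assumptions: merge_query is a hypothetical function, and the library's own merging code may differ, for example in how it orders or encodes parameters.

```ruby
require 'uri'

# Hypothetical helper: merges a Hash of string keys and values into the
# query string of a base URI, with the new values winning on key collisions.
def merge_query(base_uri, params)
  uri = URI(base_uri)
  existing = URI.decode_www_form(uri.query || "").to_h
  merged = existing.merge(params)
  uri.query = merged.empty? ? nil : URI.encode_www_form(merged)
  uri.to_s
end

merged_uri = merge_query("https://example.com/stream?filter=a", { "basis" => "v1" })
```

Since the callback runs before every connection attempt, a value such as "basis" can change between the initial connection and later reconnections.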