Class: SSE::Client

Inherits:
Object
Defined in:
lib/ld-eventsource/client.rb

Overview

A lightweight SSE client implementation. The client uses a worker thread to read from the streaming HTTP connection. Events are dispatched from the same worker thread.

The client will attempt to recover from connection failures as follows:

  • The first time the connection is dropped, it will wait about one second (or whatever value is specified for `reconnect_time`) before attempting to reconnect. The actual delay has a pseudo-random jitter value added.

  • If the connection fails again within the time range specified by `reconnect_reset_interval`, it will exponentially increase the delay between attempts (and also apply a random jitter). However, if the connection stays up for at least that amount of time, the delay will be reset to the minimum.

  • Each time a new connection is made, the client will send a `Last-Event-Id` header so the server can pick up where it left off (if the server has been sending ID values for events).
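
The retry schedule described above can be sketched roughly as follows. This is an illustration only, not the library's `Impl::Backoff`: the class name `BackoffSketch`, its method names, and the specific jitter formula (a random value between half the target delay and the full target delay) are all assumptions made for the example.

```ruby
# Hypothetical sketch of exponential backoff with jitter and a reset interval.
# It is NOT the library's Impl::Backoff; names and jitter formula are assumed.
class BackoffSketch
  def initialize(base, max, reset_interval:, rng: Random.new)
    @base = base                     # initial delay in seconds
    @max = max                       # upper bound on the un-jittered delay
    @reset_interval = reset_interval # uptime needed to reset the delay
    @rng = rng
    @attempts = 0
    @last_good_time = nil
  end

  # Call when a connection has been established.
  def mark_success(now = Time.now.to_f)
    @last_good_time = now
  end

  # Call when the connection fails; returns the seconds to wait before retrying.
  def next_interval(now = Time.now.to_f)
    # A connection that stayed up long enough resets the schedule.
    if @last_good_time && (now - @last_good_time) >= @reset_interval
      @attempts = 0
    end
    @last_good_time = nil

    # Double the delay on each consecutive failure, capped at the maximum.
    target = [@base * (2**@attempts), @max].min
    @attempts += 1

    # Assumed jitter: somewhere between half the target and the full target.
    (target / 2.0) + @rng.rand * (target / 2.0)
  end
end
```

With a base of 1 and a maximum of 30, consecutive failures in this sketch yield delays in the ranges 0.5 to 1, 1 to 2, 2 to 4 seconds and so on, until the cap is reached.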

It is also possible to force the connection to be restarted if the server sends no data within an interval specified by `read_timeout`. Using a read timeout is advisable because otherwise it is possible in some circumstances for a connection failure to go undetected. To keep the connection from timing out if there are no events to send, the server could send a comment line (`":"`) at regular intervals as a heartbeat.
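
To illustrate why a comment line works as a heartbeat, here is a minimal sketch of a parser for the SSE wire format. The function name `parse_sse` is hypothetical and the parser handles only the `event` and `data` fields; the client's real parsing is more thorough. The point is that a line starting with `:` produces no event, yet it is still data on the wire, so it would keep a read timeout from firing.

```ruby
require 'stringio'

# Hypothetical minimal SSE parser, for illustration only.
# Returns an array of { type:, data: } hashes read from an IO-like object.
def parse_sse(io)
  events = []
  type = nil
  data = []
  io.each_line do |raw|
    line = raw.chomp
    if line.empty?
      # A blank line ends the current event; dispatch only if it had data.
      events << { type: type || 'message', data: data.join("\n") } unless data.empty?
      type = nil
      data = []
    elsif line.start_with?(':')
      # Comment line (heartbeat): no event is produced, but bytes were received.
      next
    else
      field, value = line.split(':', 2)
      value = (value || '').sub(/\A /, '') # drop one leading space, if present
      case field
      when 'event' then type = value
      when 'data' then data << value
      end
    end
  end
  events
end

# Two heartbeats surrounding one real event.
stream = StringIO.new(":\n\nevent: put\ndata: hello\n\n:\n\n")
puts parse_sse(stream).inspect
```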

Constant Summary

DEFAULT_CONNECT_TIMEOUT = 10

The default value for `connect_timeout` in #initialize.

DEFAULT_READ_TIMEOUT = 300

The default value for `read_timeout` in #initialize.

DEFAULT_RECONNECT_TIME = 1

The default value for `reconnect_time` in #initialize.

MAX_RECONNECT_TIME = 30

The maximum number of seconds that the client will wait before reconnecting.

DEFAULT_RECONNECT_RESET_INTERVAL = 60

The default value for `reconnect_reset_interval` in #initialize.

Constructor Details

#initialize(uri, headers: {}, connect_timeout: DEFAULT_CONNECT_TIMEOUT, read_timeout: DEFAULT_READ_TIMEOUT, reconnect_time: DEFAULT_RECONNECT_TIME, reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL, last_event_id: nil, proxy: nil, logger: nil) {|client| ... } ⇒ Client

Creates a new SSE client.

Once the client is created, it immediately attempts to open the SSE connection. You will normally want to register your event handler before this happens, so that no events are missed. To do this, provide a block after the constructor; the block will be executed before opening the connection.

Examples:

Specifying an event handler at initialization time

client = SSE::Client.new(uri) do |c|
  c.on_event do |event|
    puts "I got an event: #{event.type}, #{event.data}"
  end
end

Parameters:

  • uri (String)

    the URI to connect to

  • headers (Hash) (defaults to: {})

    custom headers to send with each HTTP request

  • connect_timeout (Float) (defaults to: DEFAULT_CONNECT_TIMEOUT)

    maximum time to wait for a connection, in seconds

  • read_timeout (Float) (defaults to: DEFAULT_READ_TIMEOUT)

    the connection will be dropped and restarted if this number of seconds elapse with no data; nil for no timeout

  • reconnect_time (Float) (defaults to: DEFAULT_RECONNECT_TIME)

    the initial delay before reconnecting after a failure, in seconds; this can increase as described in SSE::Client

  • reconnect_reset_interval (Float) (defaults to: DEFAULT_RECONNECT_RESET_INTERVAL)

    if a connection stays alive for at least this number of seconds, the reconnect interval will return to the initial value

  • last_event_id (String) (defaults to: nil)

    the initial value that the client should send in the `Last-Event-Id` header, if any

  • proxy (String) (defaults to: nil)

    optional URI of a proxy server to use (you can also specify a proxy with the `HTTP_PROXY` or `HTTPS_PROXY` environment variable)

  • logger (Logger) (defaults to: nil)

    a Logger instance for the client to use for diagnostic output; defaults to a logger with WARN level that goes to standard output

Yield Parameters:

  • client (Client)

    the new client instance, before opening the connection



# File 'lib/ld-eventsource/client.rb', line 85

def initialize(uri,
      headers: {},
      connect_timeout: DEFAULT_CONNECT_TIMEOUT,
      read_timeout: DEFAULT_READ_TIMEOUT,
      reconnect_time: DEFAULT_RECONNECT_TIME,
      reconnect_reset_interval: DEFAULT_RECONNECT_RESET_INTERVAL,
      last_event_id: nil,
      proxy: nil,
      logger: nil)
  @uri = URI(uri)
  @stopped = Concurrent::AtomicBoolean.new(false)

  @headers = headers.clone
  @connect_timeout = connect_timeout
  @read_timeout = read_timeout
  @logger = logger || default_logger

  if proxy
    @proxy = proxy
  else
    proxy_uri = @uri.find_proxy
    if !proxy_uri.nil? && (proxy_uri.scheme == 'http' || proxy_uri.scheme == 'https')
      @proxy = proxy_uri
    end
  end

  @backoff = Impl::Backoff.new(reconnect_time || DEFAULT_RECONNECT_TIME, MAX_RECONNECT_TIME,
    reconnect_reset_interval: reconnect_reset_interval)

  @on = { event: ->(_) {}, error: ->(_) {} }
  @last_id = last_event_id

  yield self if block_given?

  Thread.new do
    run_stream
  end
end

Instance Method Details

#close ⇒ Object

Permanently shuts down the client and its connection. No further events will be dispatched. This has no effect if called a second time.



# File 'lib/ld-eventsource/client.rb', line 164

def close
  if @stopped.make_true
    @cxn.close if !@cxn.nil?
    @cxn = nil
  end
end
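
The "no effect if called a second time" behavior relies on `make_true` returning true only for the call that actually flips the flag. A rough stdlib-only sketch of that idea follows; `OnceFlag` and `ClosableSketch` are hypothetical stand-ins written for this example (the source above uses `Concurrent::AtomicBoolean` from the concurrent-ruby gem instead).

```ruby
# Hypothetical stdlib stand-in for an atomic boolean with a make_true method.
class OnceFlag
  def initialize
    @lock = Mutex.new
    @set = false
  end

  # Returns true only for the single caller that changes the flag to true.
  def make_true
    @lock.synchronize do
      return false if @set
      @set = true
    end
  end
end

# Hypothetical object whose cleanup must run at most once.
class ClosableSketch
  attr_reader :close_calls

  def initialize
    @stopped = OnceFlag.new
    @close_calls = 0
  end

  def close
    # Only the first caller gets past the guard; later calls are no-ops.
    @close_calls += 1 if @stopped.make_true
  end
end

resource = ClosableSketch.new
8.times.map { Thread.new { resource.close } }.each(&:join)
puts resource.close_calls
```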

#on_error {|error| ... } ⇒ Object

Specifies a block or Proc to receive connection errors. This will be called with a single parameter that is an instance of some exception class, normally either some I/O exception or one of the classes in Errors. It is called from the same worker thread that reads the stream, so no more events or errors will be dispatched until it returns.

If the error handler decides that this type of error is not recoverable, it can prevent any further reconnect attempts by calling #close on the Client. For instance, you might want to do this if the server returned a `401 Unauthorized` error and no other authorization credentials are available, since any further requests would presumably also receive a 401.

Any previously specified error handler will be replaced.

Yield Parameters:

  • error (StandardError)


# File 'lib/ld-eventsource/client.rb', line 156

def on_error(&action)
  @on[:error] = action
end
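
As a sketch of the "close on 401" idea described for #on_error, the following uses stand-in classes so that it runs without a server. `StubStatusError` and `StubClient` are hypothetical and exist only for this example; with the real client, the handler block would be passed to `on_error` in the same way, and the assumption that the error object exposes a `status` method should be checked against the classes in Errors.

```ruby
# Hypothetical stand-in for an HTTP status error (assumed to expose #status).
class StubStatusError < StandardError
  attr_reader :status

  def initialize(status)
    @status = status
    super("HTTP status #{status}")
  end
end

# Hypothetical stand-in exposing only what this sketch needs from a client.
class StubClient
  attr_reader :closed

  def initialize
    @closed = false
    @on_error = ->(_) {}
  end

  def on_error(&action)
    @on_error = action
  end

  def close
    @closed = true
  end

  # Simulates the worker thread reporting an error to the handler.
  def simulate_error(err)
    @on_error.call(err)
  end
end

client = StubClient.new
client.on_error do |error|
  # Treat 401 as unrecoverable; leave every other error to the retry logic.
  client.close if error.respond_to?(:status) && error.status == 401
end

client.simulate_error(IOError.new('connection reset'))
puts client.closed # still open after a transient I/O error
client.simulate_error(StubStatusError.new(401))
puts client.closed # closed after the 401
```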

#on_event {|event| ... } ⇒ Object

Specifies a block or Proc to receive events from the stream. This will be called once for every valid event received, with a single parameter of type StreamEvent. It is called from the same worker thread that reads the stream, so no more events will be dispatched until it returns.

Any exception that propagates out of the handler will cause the stream to disconnect and reconnect, on the assumption that data may have been lost and that restarting the stream will cause it to be resent.

Any previously specified event handler will be replaced.

Yield Parameters:

  • event (StreamEvent)


# File 'lib/ld-eventsource/client.rb', line 137

def on_event(&action)
  @on[:event] = action
end
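
Because the event handler runs on the thread that reads the stream, slow work inside it delays every later event. One possible way around that, sketched here under assumptions, is to have the handler only enqueue events and let a separate thread process them. `HandoffWorker` is a hypothetical helper, not part of the library. Note the trade-off: an exception raised in the consumer thread no longer propagates out of the handler, so it would not trigger the disconnect-and-reconnect behavior described for #on_event.

```ruby
# Hypothetical helper: moves slow work off the thread that calls the handler.
class HandoffWorker
  STOP = Object.new # sentinel telling the consumer thread to exit

  def initialize(&work)
    @queue = Queue.new # Thread::Queue, part of Ruby core
    @results = []
    @thread = Thread.new do
      # Process items in arrival order until the sentinel is seen.
      while (item = @queue.pop) != STOP
        @results << work.call(item)
      end
    end
  end

  # Cheap enough to call from an event handler: it only enqueues.
  def enqueue(event)
    @queue << event
  end

  # Drains the queue, stops the consumer, and returns what it produced.
  def finish
    @queue << STOP
    @thread.join
    @results
  end
end

worker = HandoffWorker.new { |event| event.to_s.upcase }
# With a real client, the handler might be:
#   client.on_event { |event| worker.enqueue(event) }
%w[put patch delete].each { |e| worker.enqueue(e) }
puts worker.finish.inspect
```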