Class: SplitIoClient::SSE::EventSource::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/splitclient-rb/sse/event_source/client.rb

Constant Summary collapse

DEFAULT_READ_TIMEOUT =
70
CONNECT_TIMEOUT =
30_000
OK_CODE =
200
KEEP_ALIVE_RESPONSE =
"c\r\n:keepalive\n\n\r\n".freeze
ERROR_EVENT_TYPE =
'error'.freeze

Instance Method Summary collapse

Constructor Details

#initialize(config, read_timeout: DEFAULT_READ_TIMEOUT) {|_self| ... } ⇒ Client

Returns a new instance of Client.

Yields:

  • (_self)

Yield Parameters:



16
17
18
19
20
21
22
23
24
25
26
# File 'lib/splitclient-rb/sse/event_source/client.rb', line 16

def initialize(config, read_timeout: DEFAULT_READ_TIMEOUT)
  @config = config
  @read_timeout = read_timeout
  @connected = Concurrent::AtomicBoolean.new(false)
  @first_event = Concurrent::AtomicBoolean.new(true)
  @socket = nil
  @event_parser = SSE::EventSource::EventParser.new(config)
  @on = { event: ->(_) {}, action: ->(_) {} }

  yield self if block_given?
end

Instance Method Details

#close(action = Constants::PUSH_NONRETRYABLE_ERROR) ⇒ Object



36
37
38
39
40
41
42
43
# File 'lib/splitclient-rb/sse/event_source/client.rb', line 36

def close(action = Constants::PUSH_NONRETRYABLE_ERROR)
  dispatch_action(action)
  @connected.make_false
  SplitIoClient::Helpers::ThreadHelper.stop(:connect_stream, @config)
  @socket&.close
rescue StandardError => e
  @config.logger.error("SSEClient close Error: #{e.inspect}")
end

#connected?Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/splitclient-rb/sse/event_source/client.rb', line 64

def connected?
  @connected.value
end

#on_action(&action) ⇒ Object



32
33
34
# File 'lib/splitclient-rb/sse/event_source/client.rb', line 32

def on_action(&action)
  @on[:action] = action
end

#on_event(&action) ⇒ Object



28
29
30
# File 'lib/splitclient-rb/sse/event_source/client.rb', line 28

def on_event(&action)
  @on[:event] = action
end

#start(url) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/splitclient-rb/sse/event_source/client.rb', line 45

def start(url)
  if connected?
    @config.logger.debug('SSEClient already running.')
    return true
  end

  @uri = URI(url)
  latch = Concurrent::CountDownLatch.new(1)

  connect_thread(latch)

  return false unless latch.wait(CONNECT_TIMEOUT)

  connected?
rescue StandardError => e
  @config.logger.error("SSEClient start Error: #{e.inspect}")
  connected?
end