Class: Celluloid::EventSource

Inherits:
Object
  • Object
show all
Includes:
IO
Defined in:
lib/celluloid/eventsource.rb,
lib/celluloid/eventsource/version.rb,
lib/celluloid/eventsource/response_parser.rb

Defined Under Namespace

Classes: MessageEvent, ResponseParser

Constant Summary collapse

CONNECTING =
0
OPEN =
1
CLOSED =
2
MAX_RETRIES =

Set to 2^31 because our retries library doesn’t allow for unlimited retries. At an average of 1 second per retry, we’ll still be retrying in 68 years.

2147483648
VERSION =
"0.8.4"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, options = {}) {|_self| ... } ⇒ EventSource

Returns a new instance of EventSource.

Yields:

  • (_self)

Yield Parameters:



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/celluloid/eventsource.rb', line 28

# Sets up a client for +uri+ and kicks off the listen loop asynchronously.
#
# @param uri the endpoint; handed to the +url=+ writer
# @param options [Hash] optional settings (the caller's hash is not mutated,
#   a copy is consumed instead)
# @option options :with_credentials (false)
# @option options :heartbeat_timeout (300) reported in the log line as seconds
# @option options :logger (default_logger)
# @option options :headers ({}) merged over +default_request_headers+
# @yield [_self] the instance, after defaults are in place and before
#   listening starts
def initialize(uri, options = {})
  self.url = uri
  @ready_state = CONNECTING

  # Work on a copy so deleting keys never leaks back to the caller.
  settings = options.dup
  @with_credentials  = settings.delete(:with_credentials) { false }
  @heartbeat_timeout = settings.delete(:heartbeat_timeout) { 300 }
  @logger            = settings.delete(:logger) { default_logger }
  @logger.info("[EventSource] Starting client connecting to url: #{self.url} with heartbeat timeout: #{@heartbeat_timeout} seconds")

  extra_headers = settings.fetch(:headers, {})
  @headers = default_request_headers.merge(extra_headers)

  # Per-event scratch buffers.
  @event_type_buffer = ""
  @last_event_id_buffer = ""
  @data_buffer = ""
  @last_event_id = String.new

  @reconnect_timeout = 1
  @chunked = false
  @parser = ResponseParser.new

  # No-op handlers until the caller registers real ones.
  @on = {
    open: ->{},
    message: ->(_) {},
    error: ->(_) {}
  }

  yield self if block_given?

  async.listen
end

Instance Attribute Details

#heartbeat_timeout ⇒ Object (readonly)

Returns the value of attribute heartbeat_timeout.



15
16
17
# File 'lib/celluloid/eventsource.rb', line 15

# Reader for @heartbeat_timeout, which the constructor takes from the
# :heartbeat_timeout option (300 when the option is absent). Log messages in
# this class report the value as a number of seconds.
def heartbeat_timeout
  @heartbeat_timeout
end

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



15
16
17
# File 'lib/celluloid/eventsource.rb', line 15

# Reader for @logger, which the constructor takes from the :logger option,
# falling back to default_logger when the option is absent.
def logger
  @logger
end

#ready_state ⇒ Object (readonly)

Returns the value of attribute ready_state.



16
17
18
# File 'lib/celluloid/eventsource.rb', line 16

# Reader for @ready_state. The constructor starts it at CONNECTING and #close
# sets it to CLOSED; #connected? and #closed? compare it against OPEN and
# CLOSED respectively.
def ready_state
  @ready_state
end

#url ⇒ Object

Returns the value of attribute url.



15
16
17
# File 'lib/celluloid/eventsource.rb', line 15

# Reader for @url. The constructor assigns it through the url= writer.
# NOTE(review): the writer is not visible here, so the stored type (String vs.
# parsed URI) should be confirmed against it.
def url
  @url
end

#with_credentials ⇒ Object (readonly)

Returns the value of attribute with_credentials.



15
16
17
# File 'lib/celluloid/eventsource.rb', line 15

# Reader for @with_credentials, which the constructor takes from the
# :with_credentials option (false when the option is absent).
def with_credentials
  @with_credentials
end

Instance Method Details

#close ⇒ Object



88
89
90
91
92
93
# File 'lib/celluloid/eventsource.rb', line 88

# Shuts the client down: logs the shutdown, marks the client CLOSED, cancels
# the pending heartbeat task (if any) and closes the socket (if any).
#
# The state is switched to CLOSED *before* the teardown calls so that the
# client is reliably reported as closed (and the #listen loop stops) even when
# cancelling the task or closing the socket raises. Such an error still
# propagates to the caller.
#
# @return [Integer] CLOSED
def close
  @logger.info("[EventSource] Closing client")
  @ready_state = CLOSED
  @heartbeat_task.cancel if @heartbeat_task
  @socket.close if @socket
  CLOSED
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


63
64
65
# File 'lib/celluloid/eventsource.rb', line 63

# Whether the client has been shut down.
#
# @return [Boolean] true when #ready_state equals CLOSED
def closed?
  ready_state == CLOSED
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


59
60
61
# File 'lib/celluloid/eventsource.rb', line 59

# Whether the client currently reports an open connection.
#
# @return [Boolean] true when #ready_state equals OPEN
def connected?
  ready_state == OPEN
end

#listen ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
# File 'lib/celluloid/eventsource.rb', line 67

# Connection loop: until the client is closed, establish a connection and
# consume the stream (chunked or plain, depending on #chunked?). Any ordinary
# error is logged at debug level and followed by a reconnect attempt after
# sleeping for @reconnect_timeout.
#
# Only StandardError is rescued. Process-level exceptions (SystemExit,
# Interrupt, NoMemoryError, ...) are deliberately allowed to propagate so the
# loop cannot keep a process alive that is trying to shut down.
#
# @return [nil]
def listen
  until closed?
    begin
      establish_connection
      chunked? ? process_chunked_stream : process_stream
    rescue StandardError => e
      logger.debug("[EventSource] Reconnecting after exception: #{e}")
    end
    # Pause between attempts, whether the stream ended cleanly or failed.
    sleep @reconnect_timeout
  end
end

#listen_for_heartbeats ⇒ Object



79
80
81
82
83
84
85
86
# File 'lib/celluloid/eventsource.rb', line 79

# (Re)arms the heartbeat watchdog. Any previously scheduled task is cancelled,
# then a new Concurrent::ScheduledTask is created with @heartbeat_timeout as
# its delay and started. When the task fires it logs a warning and closes the
# socket, if there is one.
#
# @return the value returned by the task's #execute, also kept in
#   @heartbeat_task
def listen_for_heartbeats
  @logger.debug("[EventSource] Starting listening for heartbeats. Reconnecting after #{@heartbeat_timeout} seconds if no comments are received")

  previous = @heartbeat_task
  previous.cancel if previous

  watchdog = Concurrent::ScheduledTask.new(@heartbeat_timeout) do
    @logger.warn("[EventSource] Didn't get heartbeat after #{@heartbeat_timeout} seconds. Reconnecting.")
    @socket.close if @socket
  end
  @heartbeat_task = watchdog.execute
end

#on(event_name, &action) ⇒ Object



95
96
97
# File 'lib/celluloid/eventsource.rb', line 95

# Registers +action+ as the handler for +event_name+, replacing any handler
# already stored under that name.
#
# @param event_name [#to_sym] event name; converted to a Symbol key
# @yield the handler block kept in the @on table
# @return [Proc, nil] the stored handler (nil when no block is given)
def on(event_name, &action)
  @on.store(event_name.to_sym, action)
end

#on_error(&action) ⇒ Object



107
108
109
# File 'lib/celluloid/eventsource.rb', line 107

# Registers the handler stored under the :error key of the @on table.
#
# @yield the handler block
# @return [Proc, nil] the stored handler (nil when no block is given)
def on_error(&action)
  @on.store(:error, action)
end

#on_message(&action) ⇒ Object



103
104
105
# File 'lib/celluloid/eventsource.rb', line 103

# Registers the handler stored under the :message key of the @on table.
#
# @yield the handler block
# @return [Proc, nil] the stored handler (nil when no block is given)
def on_message(&action)
  @on.store(:message, action)
end

#on_open(&action) ⇒ Object



99
100
101
# File 'lib/celluloid/eventsource.rb', line 99

# Registers the handler stored under the :open key of the @on table.
#
# @yield the handler block
# @return [Proc, nil] the stored handler (nil when no block is given)
def on_open(&action)
  @on.store(:open, action)
end