Class: Celluloid::EventSource
- Inherits:
-
Object
- Object
- Celluloid::EventSource
show all
- Includes:
- IO
- Defined in:
- lib/celluloid/eventsource.rb,
lib/celluloid/eventsource/version.rb,
lib/celluloid/eventsource/response_parser.rb
Defined Under Namespace
Classes: MessageEvent, ResponseParser
Constant Summary
collapse
- CONNECTING =
0
- OPEN =
1
- CLOSED =
2
- MAX_RETRIES =
2^31 since our retries library doesn’t allow for unlimited retries. At an average of 1 second per retry, we’ll still be retrying in 68 years.
2147483648
- VERSION =
"0.8.4"
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(uri, options = {}) {|_self| ... } ⇒ EventSource
Returns a new instance of EventSource.
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
# File 'lib/celluloid/eventsource.rb', line 28
# Builds the client, lets the caller configure it through an optional block,
# and then starts #listen through +async+.
#
# @param uri [Object] endpoint to connect to; stored through the +url=+ writer
# @param options [Hash] optional settings; the hash is duplicated first, so
#   the caller's copy is never mutated
# @option options [Object] :with_credentials (false)
# @option options [Numeric] :heartbeat_timeout (300) seconds without a
#   heartbeat before reconnecting (the unit is taken from the log message)
# @option options [Object] :logger (default_logger) must respond to +info+
# @option options [Hash] :headers ({}) merged over the default request headers
# @yield [_self] the new instance, before listening starts
def initialize(uri, options = {})
  self.url = uri
  options = options.dup
  @ready_state = CONNECTING
  @with_credentials = options.delete(:with_credentials) { false }
  @heartbeat_timeout = options.delete(:heartbeat_timeout) { 300 }
  @logger = options.delete(:logger) { default_logger }
  @logger.info("[EventSource] Starting client connecting to url: #{self.url} with heartbeat timeout: #{@heartbeat_timeout} seconds")
  # NOTE(review): the receiver of +merge+ was missing here (a syntax error).
  # It is presumably the class's default-header helper, which is defined
  # outside this excerpt -- confirm the name against the real source file.
  @headers = default_request_headers.merge(options.fetch(:headers, {}))
  @event_type_buffer = ""
  @last_event_id_buffer = ""
  @data_buffer = ""
  @last_event_id = String.new
  @reconnect_timeout = 1
  # No-op handlers so events can be dispatched before the caller registers any.
  @on = { open: -> {}, message: ->(_) {}, error: ->(_) {} }
  @parser = ResponseParser.new
  @chunked = false
  # Yield before listening so the block can install handlers first.
  yield self if block_given?
  async.listen
end
|
Instance Attribute Details
#heartbeat_timeout ⇒ Object
Returns the value of attribute heartbeat_timeout.
15
16
17
|
# File 'lib/celluloid/eventsource.rb', line 15
# Reader for the heartbeat timeout. The constructor fills it from the
# :heartbeat_timeout option, falling back to 300.
#
# @return [Object] the current value of @heartbeat_timeout
def heartbeat_timeout
  @heartbeat_timeout
end
|
#logger ⇒ Object
Returns the value of attribute logger.
15
16
17
|
# File 'lib/celluloid/eventsource.rb', line 15
# Reader for the logger used for this client's "[EventSource]" messages.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
|
#ready_state ⇒ Object
Returns the value of attribute ready_state.
16
17
18
|
# File 'lib/celluloid/eventsource.rb', line 16
# Reader for the connection state. The constructor sets it to CONNECTING and
# #close sets it to CLOSED; #connected? and #closed? compare it to OPEN and
# CLOSED.
#
# @return [Object] the current value of @ready_state
def ready_state
  @ready_state
end
|
#url ⇒ Object
Returns the value of attribute url.
15
16
17
|
# File 'lib/celluloid/eventsource.rb', line 15
# Reader for the endpoint URL. The constructor assigns it through the +url=+
# writer, which is not shown in this excerpt.
#
# @return [Object] the current value of @url
def url
  @url
end
|
#with_credentials ⇒ Object
Returns the value of attribute with_credentials.
15
16
17
|
# File 'lib/celluloid/eventsource.rb', line 15
# Reader for the :with_credentials option given to the constructor, which
# defaults to false.
#
# @return [Object] the current value of @with_credentials
def with_credentials
  @with_credentials
end
|
Instance Method Details
#close ⇒ Object
88
89
90
91
92
93
|
# File 'lib/celluloid/eventsource.rb', line 88
# Shuts the client down: cancels the pending heartbeat task, closes the
# socket, and marks the client CLOSED so that #listen stops looping.
#
# The state change sits in an +ensure+ clause. If cancelling the task or
# closing the socket raises, the client still ends up CLOSED instead of
# staying in a state where #listen keeps reconnecting. The exception itself
# still propagates to the caller.
#
# @return [Integer] CLOSED
def close
  @logger.info("[EventSource] Closing client")
  begin
    @heartbeat_task.cancel if @heartbeat_task
    @socket.close if @socket
  ensure
    @ready_state = CLOSED
  end
  CLOSED
end
|
#closed? ⇒ Boolean
63
64
65
|
# File 'lib/celluloid/eventsource.rb', line 63
# Tells whether the client has been shut down.
#
# @return [Boolean] true when #ready_state equals CLOSED
def closed?
  ready_state == CLOSED
end
|
#connected? ⇒ Boolean
59
60
61
|
# File 'lib/celluloid/eventsource.rb', line 59
# Tells whether the connection is currently open.
#
# @return [Boolean] true when #ready_state equals OPEN
def connected?
  ready_state == OPEN
end
|
#listen ⇒ Object
67
68
69
70
71
72
73
74
75
76
77
|
# File 'lib/celluloid/eventsource.rb', line 67
# Connection loop: connects, consumes the stream, and reconnects after a
# failure until the client is closed.
#
# Only StandardError is rescued. Rescuing Exception would also swallow
# Interrupt, SystemExit, NoMemoryError and similar, and keep the loop running
# when the process is being told to stop.
#
# @return [nil]
def listen
  until closed?
    begin
      establish_connection
      chunked? ? process_chunked_stream : process_stream
    rescue StandardError => e
      logger.debug("[EventSource] Reconnecting after exception: #{e}")
    end
    # Back off before the next attempt, but not once the client is closed,
    # because no further attempt will be made.
    sleep @reconnect_timeout unless closed?
  end
end
|
#listen_for_heartbeats ⇒ Object
79
80
81
82
83
84
85
86
|
# File 'lib/celluloid/eventsource.rb', line 79
# (Re)arms the heartbeat watchdog. Any previously scheduled task is cancelled
# and a new Concurrent::ScheduledTask is scheduled with @heartbeat_timeout as
# its delay. When that task runs, it logs a warning and closes the socket.
#
# @return [Object] whatever +execute+ returns for the new task; the same
#   value is stored in @heartbeat_task
def listen_for_heartbeats
  @logger.debug(
    "[EventSource] Starting listening for heartbeats. Reconnecting after #{@heartbeat_timeout} seconds if no comments are received"
  )
  @heartbeat_task.cancel if @heartbeat_task

  watchdog = Concurrent::ScheduledTask.new(@heartbeat_timeout) do
    @logger.warn("[EventSource] Didn't get heartbeat after #{@heartbeat_timeout} seconds. Reconnecting.")
    @socket.close if @socket
  end
  @heartbeat_task = watchdog.execute
end
|
#on(event_name, &action) ⇒ Object
95
96
97
|
# File 'lib/celluloid/eventsource.rb', line 95
# Registers a handler for the given event, replacing any handler already
# stored under that name.
#
# @param event_name [#to_sym] event name; converted to a Symbol key
# @param action [Proc] block to store (nil when no block is given)
# @return [Proc, nil] the stored block
def on(event_name, &action)
  @on.store(event_name.to_sym, action)
end
|
#on_error(&action) ⇒ Object
107
108
109
|
# File 'lib/celluloid/eventsource.rb', line 107
# Registers the :error handler, replacing the one currently stored.
#
# @param action [Proc] block to store (nil when no block is given)
# @return [Proc, nil] the stored block
def on_error(&action)
  @on.store(:error, action)
end
|
#on_message(&action) ⇒ Object
103
104
105
|
# File 'lib/celluloid/eventsource.rb', line 103
# Registers the :message handler, replacing the one currently stored.
#
# @param action [Proc] block to store (nil when no block is given)
# @return [Proc, nil] the stored block
def on_message(&action)
  @on.store(:message, action)
end
|
#on_open(&action) ⇒ Object
99
100
101
|
# File 'lib/celluloid/eventsource.rb', line 99
# Registers the :open handler, replacing the one currently stored.
#
# @param action [Proc] block to store (nil when no block is given)
# @return [Proc, nil] the stored block
def on_open(&action)
  @on.store(:open, action)
end
|