Class: Celluloid::EventSource

Inherits:
Object
  • Object
show all
Includes:
IO, Internals::Logger
Defined in:
lib/celluloid/eventsource.rb,
lib/celluloid/eventsource/version.rb,
lib/celluloid/eventsource/event_parser.rb,
lib/celluloid/eventsource/response_parser.rb

Defined Under Namespace

Classes: EventParser, ReadTimeout, ResponseParser, UnexpectedContentType

Constant Summary collapse

CONNECTING =
0
OPEN =
1
CLOSED =
2
MAX_RECONNECT_TIME =
30
VERSION =
"0.11.0"

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(uri, options = {}) {|_self| ... } ⇒ EventSource

Constructor for an EventSource.

Parameters:

  • uri (String)

    the event stream URI

  • opts (Hash)

    the configuration options

Yields:

  • (_self)

Yield Parameters:



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/celluloid/eventsource.rb', line 43

def initialize(uri, options = {})
  self.url = uri
  options  = options.dup
  @ready_state = CONNECTING
  @with_credentials = options.delete(:with_credentials) { false }
  @headers = default_request_headers.merge(options.fetch(:headers, {}))
  @read_timeout = options.fetch(:read_timeout, 0).to_i
  proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy]
  if proxy
    proxyUri = URI(proxy)
    if proxyUri.scheme == 'http' || proxyUri.scheme == 'https'
      @proxy = proxyUri
    end
  end

  @reconnect_timeout = options.fetch(:reconnect_delay, 1)
  @on = { open: ->{}, message: ->(_) {}, error: ->(_) {} }

  @chunked = false

  yield self if block_given?

  async.listen
end

Instance Attribute Details

#ready_stateObject (readonly)

Returns the value of attribute ready_state.



22
23
24
# File 'lib/celluloid/eventsource.rb', line 22

def ready_state
  @ready_state
end

#urlObject

Returns the value of attribute url.



21
22
23
# File 'lib/celluloid/eventsource.rb', line 21

def url
  @url
end

#with_credentialsObject (readonly)

Returns the value of attribute with_credentials.



21
22
23
# File 'lib/celluloid/eventsource.rb', line 21

def with_credentials
  @with_credentials
end

Instance Method Details

#closeObject



94
95
96
97
# File 'lib/celluloid/eventsource.rb', line 94

def close
  @socket.close if @socket
  @ready_state = CLOSED
end

#closed?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/celluloid/eventsource.rb', line 76

def closed?
  ready_state == CLOSED
end

#connected?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/celluloid/eventsource.rb', line 72

def connected?
  ready_state == OPEN
end

#listenObject



80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/celluloid/eventsource.rb', line 80

def listen
  while !closed?
    begin
      establish_connection
      process_stream
    rescue UnexpectedContentType
      raise  # Let these flow to the top
    rescue StandardError => e
      info "Reconnecting after exception: #{e}"
      # Just reconnect on runtime errors
    end
  end
end

#on(event_name, &action) ⇒ Object



99
100
101
# File 'lib/celluloid/eventsource.rb', line 99

def on(event_name, &action)
  @on[event_name.to_sym] = action
end

#on_error(&action) ⇒ Object



111
112
113
# File 'lib/celluloid/eventsource.rb', line 111

def on_error(&action)
  @on[:error] = action
end

#on_message(&action) ⇒ Object



107
108
109
# File 'lib/celluloid/eventsource.rb', line 107

def on_message(&action)
  @on[:message] = action
end

#on_open(&action) ⇒ Object



103
104
105
# File 'lib/celluloid/eventsource.rb', line 103

def on_open(&action)
  @on[:open] = action
end