Class: Faye::EventSource

Inherits:
Object
  • Object
show all
Includes:
WebSocket::API
Defined in:
lib/faye/eventsource.rb

Defined Under Namespace

Classes: Stream

Constant Summary collapse

DEFAULT_RETRY =
5

Constants included from WebSocket::API::ReadyStates

WebSocket::API::ReadyStates::CLOSED, WebSocket::API::ReadyStates::CLOSING, WebSocket::API::ReadyStates::CONNECTING, WebSocket::API::ReadyStates::OPEN

Instance Attribute Summary collapse

Attributes included from WebSocket::API

#buffered_amount

Attributes included from WebSocket::API::EventTarget

#onclose, #onerror, #onmessage, #onopen

Class Method Summary collapse

Instance Method Summary collapse

Methods included from WebSocket::API

#receive

Methods included from WebSocket::API::EventTarget

#add_event_listener, #dispatch_event, #remove_event_listener

Constructor Details

#initialize(env, options = {}) ⇒ EventSource

Returns a new instance of EventSource.



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/faye/eventsource.rb', line 26

# Builds an EventSource bound to the request described by +env+ and
# immediately starts the event stream on it.
#
# @param env [Hash] request environment; 'async.callback' is read from it
#   here, and the URL is derived from it via EventSource.determine_url.
# @param options [Hash]
#   :ping  - interval handed to EventMachine.add_periodic_timer; when set,
#            #ping is invoked on that interval. No timer is created if absent.
#   :retry - reconnection delay in seconds (defaults to DEFAULT_RETRY); it is
#            sent to the client in milliseconds as the "retry:" field.
def initialize(env, options = {})
  @env    = env
  @ping   = options[:ping]
  @retry  = (options[:retry] || DEFAULT_RETRY).to_f
  @url    = EventSource.determine_url(env)
  @stream = Stream.new(self)
  
  @ready_state = CONNECTING
  @send_buffer = []
  # `open` is not defined in this section; it is deferred to the next
  # reactor tick rather than being run inside the constructor.
  EventMachine.next_tick { open }
  
  # Hand the stream to the server through the async callback.
  # NOTE(review): the status given here is 101 while the bytes written below
  # announce "200 OK" -- presumably the real response head is the one written
  # by hand; confirm against the server's async handling.
  callback = @env['async.callback']
  callback.call([101, {}, @stream])
  
  # The HTTP response head is written manually, followed by the initial
  # "retry:" field expressed in milliseconds.
  @stream.write("HTTP/1.1 200 OK\r\n" +
                "Content-Type: text/event-stream\r\n" +
                "Cache-Control: no-cache, no-store\r\n" +
                "Connection: close\r\n" +
                "\r\n\r\n" +
                "retry: #{ (@retry * 1000).floor }\r\n\r\n")
  
  @ready_state = OPEN
  
  # Keep-alive pings are only scheduled when a :ping interval was supplied.
  if @ping
    @ping_timer = EventMachine.add_periodic_timer(@ping) { ping }
  end
end

Instance Attribute Details

#env ⇒ Object (readonly)

Returns the value of attribute env.



8
9
10
# File 'lib/faye/eventsource.rb', line 8

# Reader for @env, the request environment passed to the constructor.
def env
  @env
end

#ready_state ⇒ Object (readonly)

Returns the value of attribute ready_state.



8
9
10
# File 'lib/faye/eventsource.rb', line 8

# Reader for @ready_state, which the constructor and #close set to one of
# the ReadyStates constants (CONNECTING, OPEN, CLOSED).
def ready_state
  @ready_state
end

#url ⇒ Object (readonly)

Returns the value of attribute url.



8
9
10
# File 'lib/faye/eventsource.rb', line 8

# Reader for @url, the URL string computed by EventSource.determine_url
# in the constructor.
def url
  @url
end

Class Method Details

.determine_url(env) ⇒ Object



15
16
17
18
19
20
21
22
23
24
# File 'lib/faye/eventsource.rb', line 15

# Reconstructs the absolute URL of the request described by +env+.
#
# The scheme is chosen as follows: when the X-Forwarded-Proto header is
# present it alone decides (secure only if it equals 'https'); otherwise the
# request is treated as secure when the Origin header starts with "https:".
#
# @param env [Hash] request environment (HTTP_HOST and REQUEST_URI are read)
# @return [String] "<scheme>//<host><request uri>"
def self.determine_url(env)
  forwarded = env.has_key?('HTTP_X_FORWARDED_PROTO')
  is_https  = forwarded ? (env['HTTP_X_FORWARDED_PROTO'] == 'https') : (env['HTTP_ORIGIN'] =~ /^https:/i)

  protocol = is_https ? 'https:' : 'http:'
  host     = env['HTTP_HOST']
  path     = env['REQUEST_URI']

  "#{ protocol }//#{ host }#{ path }"
end

.eventsource?(env) ⇒ Boolean

Returns:

  • (Boolean)


10
11
12
13
# File 'lib/faye/eventsource.rb', line 10

# Tells whether the request's Accept header lists the text/event-stream
# media type.
#
# Each comma-separated entry is compared on its media type only: any
# parameters after ';' (for example "q=0.9") are ignored, surrounding
# whitespace is stripped, and the comparison is case-insensitive. A missing
# Accept header yields false.
#
# @param env [Hash] request environment (HTTP_ACCEPT is read)
# @return [Boolean]
def self.eventsource?(env)
  (env['HTTP_ACCEPT'] || '').split(',').any? do |entry|
    media_type = entry.split(';', 2).first.to_s.strip
    media_type.downcase == 'text/event-stream'
  end
end

Instance Method Details

#close ⇒ Object



82
83
84
85
86
87
88
89
90
# File 'lib/faye/eventsource.rb', line 82

# Closes the event stream and notifies listeners.
#
# Does nothing when the ready state is already CLOSING or CLOSED. Otherwise
# the state becomes CLOSED, the keep-alive timer (if one exists) is
# cancelled, the stream is asked to close once pending writes are flushed,
# and a 'close' event is dispatched.
#
# @return whatever dispatch_event returns, or nil when already closed
def close
  return if [CLOSING, CLOSED].include?(@ready_state)
  @ready_state = CLOSED

  # The constructor only creates @ping_timer when a :ping interval was
  # given, so avoid handing nil to EventMachine.
  EventMachine.cancel_timer(@ping_timer) if @ping_timer
  @stream.close_connection_after_writing

  event = WebSocket::API::Event.new('close')
  event.init_event('close', false, false)
  dispatch_event(event)
end

#last_event_id ⇒ Object



54
55
56
# File 'lib/faye/eventsource.rb', line 54

# Value of the request's Last-Event-ID header, read from the environment
# under 'HTTP_LAST_EVENT_ID'; an empty string when the header is absent.
def last_event_id
  header = @env['HTTP_LAST_EVENT_ID']
  header ? header : ''
end

#ping(message = nil) ⇒ Object



77
78
79
80
# File 'lib/faye/eventsource.rb', line 77

# Writes a keep-alive comment line (":" followed by a blank line) to the
# stream.
#
# Like #send, nothing is written unless the ready state is OPEN, so a ping
# issued after #close does not touch the closing stream.
#
# @param message ignored; accepted so the signature stays unchanged
# @return [Boolean] true when the comment was written, false otherwise
def ping(message = nil)
  return false unless @ready_state == OPEN

  @stream.write(":\r\n\r\n")
  true
end

#rack_response ⇒ Object



58
59
60
# File 'lib/faye/eventsource.rb', line 58

# Response triple to return from the Rack application for this connection:
# status -1 with empty headers and an empty body.
# NOTE(review): -1 is presumably the server's "response handled
# asynchronously" sentinel -- confirm against the server in use.
def rack_response
  status  = -1
  headers = {}
  body    = []
  [status, headers, body]
end

#send(message, options = {}) ⇒ Object



62
63
64
65
66
67
68
69
70
71
72
73
74
75
# File 'lib/faye/eventsource.rb', line 62

# Writes one event to the stream.
#
# The message is converted with to_s, passed through WebSocket.encode, and
# every line break in it is followed by a fresh "data: " prefix so that
# multi-line payloads stay inside a single event. Optional "event:" and
# "id:" fields precede the data when given.
#
# @param message the payload; anything responding to to_s
# @param options [Hash] :event - event name field, :id - event id field
# @return [Boolean] false when the ready state is not OPEN (nothing is
#   written), true otherwise
def send(message, options = {})
  return false if @ready_state != OPEN

  payload = WebSocket.encode(message.to_s).gsub(/(\r\n|\r|\n)/, '\1data: ')

  fields = []
  fields << "event: #{options[:event]}\r\n" if options[:event]
  fields << "id: #{options[:id]}\r\n" if options[:id]
  fields << "data: #{payload}\r\n\r\n"

  @stream.write(fields.join)
  true
end