Class: Subscriber::EventSourceClient

Inherits:
LongPollClient show all
Includes:
Celluloid::IO
Defined in:
lib/nchan_tools/pubsub.rb

Defined Under Namespace

Classes: EventSourceParser

Instance Attribute Summary

Attributes inherited from LongPollClient

#timeout

Attributes inherited from Client

#concurrency

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from LongPollClient

#close, #initialize, #listen, #request_code_ok, #run, #stop

Methods inherited from Client

#handle_bundle_error, inherited, #initialize, lookup, #poke, #provides_msgid?, #run, #stop, unique_aliases

Constructor Details

This class inherits a constructor from Subscriber::LongPollClient

Class Method Details

.aliasesObject



1159
1160
1161
# File 'lib/nchan_tools/pubsub.rb', line 1159

def self.aliases
  [:eventsource, :sse]
end

Instance Method Details

#error(c, m, cn = nil) ⇒ Object



1163
1164
1165
1166
1167
# File 'lib/nchan_tools/pubsub.rb', line 1163

def error(c,m,cn=nil)
  @error_what ||= [ "#{@http2 ? 'HTTP/2' : 'HTTP'} Request failed", "connection closed" ]
  @error_failword ||= ""
  super
end

#new_bundle(uri, opt = {}) ⇒ Object



1225
1226
1227
1228
# File 'lib/nchan_tools/pubsub.rb', line 1225

def new_bundle(uri, opt={})
  opt[:accept]="text/event-stream"
  super
end

#setup_bundle(b) ⇒ Object



1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
# File 'lib/nchan_tools/pubsub.rb', line 1230

def setup_bundle(b)
  b.on_headers do |code, headers|
    if code == 200
      @notready-=1
      @cooked_ready.signal true if @notready == 0
      b.connected = true
    end
  end
  b.buffer_body!
  b.subparser=EventSourceParser.new
  b.on_chunk do |chunk|
    while b.body_buf.slice! /^.*\n/ do
      b.subparser.parse_line $~[0]
    end
  end
  b.on_error do |msg, err|
    if EOFError === err && !b.subparser.buf_empty?
      b.subparser.parse_line "\n"
    end
    handle_bundle_error b, msg, err
  end
  
  b.on_response do |code, headers, body|
    if code != 200
      @subscriber.on_failure error(code, "", b)
      @subscriber.finished+=1
    else
      if !b.subparser.buf_empty?
        b.subparser.parse_line "\n"
      else
        @subscriber.on_failure error(0, "Response completed unexpectedly", b)
      end
      @subscriber.finished+=1
    end
    close b
  end
  
  b.subparser.on_event do |evt, data, evt_id|
    case evt 
    when :comment
      if data.match(/^(?<code>\d+): (?<message>.*)/)
        @subscriber.on_failure error($~[:code].to_i, $~[:message], b)
        @subscriber.finished+=1
        close b
      end
    else
      @timer.reset if @timer
      unless @nomsg
        msg=Message.new data.dup
        msg.id=evt_id
        msg.eventsource_event=evt
      else
        msg=data
      end
      if @subscriber.on_message(msg, b) == false
        @subscriber.finished+=1
        close b
      end
    end
  end
  b
end