Class: Subscriber::Client
- Inherits: Object
- Inheritance chain: Object → Subscriber::Client
- Defined in:
- lib/nchan_tools/pubsub.rb
Defined Under Namespace
Classes: ErrorResponse, ParserBundle
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(subscriber, arg = {}) ⇒ Client
Returns a new instance of Client.
395
396
397
398
399
|
# File 'lib/nchan_tools/pubsub.rb', line 395
# Sets up the state shared by every client implementation.
#
# @param subscriber [Object] the owning subscriber; kept in @subscriber
#   because #handle_bundle_error and #stop report failures through it
# @param arg [Hash] options; only :logger is read here
def initialize(subscriber, arg = {})
  # Previously the argument was dropped, leaving @subscriber nil unless a
  # subclass happened to assign it itself.
  @subscriber = subscriber
  # poke(:ready) keeps waiting while this is nil or positive.
  # NOTE(review): the meaning of the specific value 9000 is not visible here.
  @notready = 9000
  @cooked_ready = Celluloid::Condition.new
  @logger = arg[:logger]
end
|
Instance Attribute Details
#concurrency ⇒ Object
Returns the value of attribute concurrency.
254
255
256
|
# File 'lib/nchan_tools/pubsub.rb', line 254
# Reader for the concurrency attribute.
#
# @return [Object] the value held in @concurrency (nil if it was never set)
def concurrency; @concurrency; end
|
Class Method Details
.aliases ⇒ Object
284
285
286
|
# File 'lib/nchan_tools/pubsub.rb', line 284
# Names this client class can be looked up by. The base implementation
# advertises none; .lookup and .unique_aliases call this on each
# registered subclass.
#
# @return [Array] a new empty array on every call
def self.aliases; []; end
|
.inherited(subclass) ⇒ Object
273
274
275
276
|
# File 'lib/nchan_tools/pubsub.rb', line 273
# Ruby inheritance hook: records each new subclass in the shared
# @@inherited registry, creating the registry on first use.
#
# @param subclass [Class] the class that was just derived
# @return [Array<Class>] the registry, including +subclass+
def self.inherited(subclass)
  (@@inherited ||= []) << subclass
end
|
.lookup(name) ⇒ Object
278
279
280
281
282
283
|
# File 'lib/nchan_tools/pubsub.rb', line 278
# Finds the registered subclass that answers to the given alias.
#
# Scans the classes recorded in @@inherited in registration order and
# returns the first one whose +aliases+ list includes +name+.
#
# @param name [Object] alias to search for (matched with Array#include?)
# @return [Class, nil] the first matching subclass; nil when nothing
#   matches or when no subclass has been registered yet
def self.lookup(name)
  # @@inherited only exists once .inherited has run at least once; reading
  # it earlier raises NameError, so treat that state as an empty registry.
  return nil unless defined?(@@inherited)

  @@inherited.find { |klass| klass.aliases.include?(name) }
end
|
.unique_aliases ⇒ Object
288
289
290
291
292
293
294
|
# File 'lib/nchan_tools/pubsub.rb', line 288
# Collects one alias per registered subclass: the first entry of each
# subclass's +aliases+ list. Subclasses with no aliases are skipped.
# No de-duplication is performed across subclasses.
#
# @return [Array] first aliases in registration order; empty when no
#   subclass has been registered yet
def self.unique_aliases
  # @@inherited only exists once .inherited has run at least once; reading
  # it earlier raises NameError, so treat that state as an empty registry.
  return [] unless defined?(@@inherited)

  @@inherited
    .reject { |klass| klass.aliases.empty? }
    .map { |klass| klass.aliases.first }
end
|
Instance Method Details
#error(code, msg, bundle = nil) ⇒ Object
300
301
302
303
304
|
# File 'lib/nchan_tools/pubsub.rb', line 300
# Builds an ErrorResponse describing a failure on this client.
#
# The response is constructed from the given arguments plus this client's
# @error_what and @error_failword, and its +caller+ is set to this client.
#
# @param code [Object] error code passed through to ErrorResponse
# @param msg [Object] error message passed through to ErrorResponse
# @param bundle [Object, nil] bundle associated with the failure, if any
# @return [ErrorResponse] the populated response
def error(code, msg, bundle = nil)
  ErrorResponse.new(code, msg, bundle, @error_what, @error_failword).tap do |response|
    response.caller = self
  end
end
|
#handle_bundle_error(bundle, msg, err) ⇒ Object
374
375
376
377
378
379
380
381
|
# File 'lib/nchan_tools/pubsub.rb', line 374
# Reports a failure for +bundle+ to the subscriber, counts the bundle as
# finished, and closes it.
#
# For any error other than EOFError the message is wrapped as "<msg>"
# followed by the error's backtrace, one frame per line.
#
# @param bundle [Object] the bundle the failure belongs to; passed to
#   #error and then to +close+
# @param msg [String] human-readable failure description
# @param err [Exception, nil] the underlying error, if any
# @return [Object] whatever +close+ returns
def handle_bundle_error(bundle, msg, err)
  if err && !err.is_a?(EOFError)
    # Exception#backtrace is nil for an exception that was never raised;
    # Array() keeps this error path from failing with NoMethodError.
    trace = Array(err.backtrace).join("\n")
    msg = "<#{msg}>\n#{trace}"
  end
  @subscriber.on_failure error(0, msg, bundle)
  @subscriber.finished += 1
  close bundle
end
|
#poke(what = nil, timeout = nil) ⇒ Object
383
384
385
386
387
388
389
390
391
392
393
|
# File 'lib/nchan_tools/pubsub.rb', line 383
# Waits on one of the client's conditions, if there is anything to wait for.
#
# With +what+ == :ready, waits on @cooked_ready while @notready is nil or
# positive. Otherwise waits on @cooked while @connected is positive.
# A Celluloid::ConditionError raised during the wait is swallowed.
#
# @param what [Symbol, nil] :ready to wait for readiness; anything else
#   waits on @cooked
# @param timeout [Object, nil] passed straight to the condition's +wait+
# @return [Object, false, nil] the result of +wait+, false when no wait
#   was needed, or nil when a ConditionError was rescued
def poke(what = nil, timeout = nil)
  if what == :ready
    still_pending = @notready.nil? || @notready > 0
    still_pending && @cooked_ready.wait(timeout)
  else
    @connected > 0 && @cooked.wait(timeout)
  end
rescue Celluloid::ConditionError
  nil
end
|
#provides_msgid? ⇒ Boolean
296
297
298
|
# File 'lib/nchan_tools/pubsub.rb', line 296
# Whether this client supplies message ids. The base implementation
# always answers yes.
#
# @return [Boolean] always true
def provides_msgid?; true; end
|
#run ⇒ Object
401
402
403
|
# File 'lib/nchan_tools/pubsub.rb', line 401
# Placeholder entry point: the base implementation always raises.
#
# @raise [SubscriberError] always, with the message "Not Implemented"
def run
  message = "Not Implemented"
  raise SubscriberError, message
end
|
#stop(msg = "Stopped", src_bundle = nil) ⇒ Object
405
406
407
408
|
# File 'lib/nchan_tools/pubsub.rb', line 405
# Notifies the subscriber of a stop by reporting a code-0 failure, then
# logs the event when a logger is configured.
#
# @param msg [String] message carried by the failure report
# @param src_bundle [Object, nil] bundle passed through to #error
# @return [Object, nil] the logger's return value, or nil without a logger
def stop(msg = "Stopped", src_bundle = nil)
  failure = error(0, msg, src_bundle)
  @subscriber.on_failure(failure)
  return unless @logger

  @logger.log(:subscriber, :stop)
end
|