Class: Subscriber

Inherits:
Object
  • Object
show all
Defined in:
lib/nchan_tools/pubsub.rb

Defined Under Namespace

Classes: Client, EventSourceClient, HTTPChunkedClient, IntervalPollClient, Logger, LongPollClient, MultiparMixedClient, SubscriberError, WebSocketClient

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, concurrency = 1, opt = {}) ⇒ Subscriber

Returns a new instance of Subscriber.



1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
# File 'lib/nchan_tools/pubsub.rb', line 1523

def initialize(url, concurrency=1, opt={})
  @care_about_message_ids=opt[:use_message_id].nil? ? true : opt[:use_message_id]
  @url=url
  @quit_message = opt[:quit_message]
  opt[:timeout] ||= 30
  opt[:connect_timeout] ||= 5
  #puts "Starting subscriber on #{url}"
  @Client_Class = Client.lookup(opt[:client] || :longpoll)
  if @Client_Class.nil?
    raise SubscriberError, "unknown client type #{opt[:client]}"
  end
  
  if !opt[:nostore] && opt[:nomsg]
    opt[:nomsg] = nil
    puts "nomsg reverted to false because nostore is false"
  end
  opt[:concurrency]=concurrency
  @concurrency = opt[:concurrency]
  @opt=opt
  if opt[:log]
    @log = Subscriber::Logger.new
    opt[:logger]=@log
  end
  new_client
  reset
end

Instance Attribute Details

#clientObject

Returns the value of attribute client.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def client
  @client
end

#client_classObject

Returns the value of attribute client_class.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def client_class
  @client_class
end

#concurrencyObject

Returns the value of attribute concurrency.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def concurrency
  @concurrency
end

#errorsObject

Returns the value of attribute errors.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def errors
  @errors
end

#finishedObject

Returns the value of attribute finished.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def finished
  @finished
end

#logObject

Returns the value of attribute log.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def log
  @log
end

#max_round_tripsObject

Returns the value of attribute max_round_trips.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def max_round_trips
  @max_round_trips
end

#messagesObject

Returns the value of attribute messages.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def messages
  @messages
end

#quit_messageObject

Returns the value of attribute quit_message.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def quit_message
  @quit_message
end

#urlObject

Returns the value of attribute url.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def url
  @url
end

#waitingObject

Returns the value of attribute waiting.



1522
1523
1524
# File 'lib/nchan_tools/pubsub.rb', line 1522

def waiting
  @waiting
end

Instance Method Details

#abortObject



1563
1564
1565
# File 'lib/nchan_tools/pubsub.rb', line 1563

def abort
  @client.terminate
end

#errors?Boolean

Returns:

  • (Boolean)


1566
1567
1568
# File 'lib/nchan_tools/pubsub.rb', line 1566

def errors?
  not no_errors?
end

#make_error(client, what, code, msg, failword = " failed") ⇒ Object



1632
1633
1634
# File 'lib/nchan_tools/pubsub.rb', line 1632

def make_error(client, what, code, msg, failword=" failed")
  "#{client.class.name.split('::').last} #{what}#{failword}: #{msg} (code #{code})"
end

#match_errors(regex) ⇒ Object



1572
1573
1574
1575
1576
1577
1578
# File 'lib/nchan_tools/pubsub.rb', line 1572

def match_errors(regex)
  return false if no_errors?
  @errors.each do |err|
    return false unless err =~ regex
  end
  true
end

#new_clientObject



1549
1550
1551
# File 'lib/nchan_tools/pubsub.rb', line 1549

def new_client
  @client=@Client_Class.new self, @opt
end

#no_errors?Boolean

Returns:

  • (Boolean)


1569
1570
1571
# File 'lib/nchan_tools/pubsub.rb', line 1569

def no_errors?
  @errors.empty?
end

#on_failure(err = nil, nostore = false, &block) ⇒ Object



1636
1637
1638
1639
1640
1641
1642
1643
# File 'lib/nchan_tools/pubsub.rb', line 1636

def on_failure(err=nil, nostore=false, &block)
  if block_given?
    @on_failure=block
  else
    @errors << err.to_s unless nostore
    @on_failure.call(err.to_s, err.bundle) if @on_failure.respond_to? :call
  end
end

#on_message(msg = nil, bundle = nil, &block) ⇒ Object



1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
# File 'lib/nchan_tools/pubsub.rb', line 1618

def on_message(msg=nil, bundle=nil, &block)
  #puts "received message #{msg && msg.to_s[0..15]}"
  if block_given?
    @on_message=block
  else
    @messages << msg if @messages
    if @quit_message == msg.to_s
      @on_message.call(msg, bundle) if @on_message
      return false 
    end
    @on_message.call(msg, bundle) if @on_message
  end
end

#resetObject



1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
# File 'lib/nchan_tools/pubsub.rb', line 1552

def reset
  @errors=[]
  unless @nostore
    @messages=MessageStore.new :noid => !(client.provides_msgid? && @care_about_message_ids)
    @messages.name="sub"
  end
  @waiting=0
  @finished=0
  new_client if terminated?
  self
end

#runObject



1581
1582
1583
1584
1585
1586
1587
1588
1589
# File 'lib/nchan_tools/pubsub.rb', line 1581

def run
  begin
    client.current_actor
  rescue Celluloid::DeadActorError
    return false
  end
  @client.async.run
  self
end

#stopObject



1590
1591
1592
1593
1594
1595
1596
1597
# File 'lib/nchan_tools/pubsub.rb', line 1590

def stop
  begin
    @client.stop
  rescue Celluloid::DeadActorError
    return false
  end
  true
end

#terminateObject



1598
1599
1600
1601
1602
1603
1604
1605
# File 'lib/nchan_tools/pubsub.rb', line 1598

def terminate
  begin
    @client.terminate
  rescue Celluloid::DeadActorError
    return false
  end
  true
end

#terminated?Boolean

Returns:

  • (Boolean)


1606
1607
1608
1609
1610
1611
1612
1613
# File 'lib/nchan_tools/pubsub.rb', line 1606

def terminated?
  begin
    client.current_actor unless client == nil
  rescue Celluloid::DeadActorError
    return true
  end
  false
end

#wait(until_what = nil, timeout = nil) ⇒ Object



1614
1615
1616
# File 'lib/nchan_tools/pubsub.rb', line 1614

def wait(until_what=nil, timeout = nil)
  @client.poke until_what, timeout
end