Class: PgVersions::Connection::Subscription
- Inherits:
- 
      Object
      
        - Object
- PgVersions::Connection::Subscription
 
- Defined in:
- lib/pg_versions/pg_versions.rb
Instance Method Summary collapse
- #bump(*channels, notify: false) ⇒ Object
- #drop ⇒ Object
- 
  
    
      #initialize(inner, batch_delay)  ⇒ Subscription 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    A new instance of Subscription. 
- #notify(versions) ⇒ Object
- #read(*channels, notify: false) ⇒ Object
- #subscribe(channels, known: {}) ⇒ Object
- #unsubscribe(*channels) ⇒ Object
- #update_already_known_versions(new_already_known_versions) ⇒ Object
- 
  
    
      #wait(new_already_known_versions = {}, batch_delay: nil)  ⇒ Object 
    
    
  
  
  
  
  
  
  
  
  
    TODO: make this resumable after a forced exception. 
Constructor Details
#initialize(inner, batch_delay) ⇒ Subscription
Returns a new instance of Subscription.
# File 'lib/pg_versions/pg_versions.rb', line 497
# Builds a subscription that delegates to +inner+.
#
# @param inner the object that subscribe/unsubscribe/bump/read calls are
#   forwarded to
# @param batch_delay default delay handed to +sleep+ by #wait while it
#   collects further notifications; a nil/false value skips the sleep
def initialize(inner, batch_delay)
  @inner, @batch_delay = inner, batch_delay
  # Pending notifications; a nil entry is treated by #wait as a termination marker.
  @notifications = Queue.new
  # Per-channel subscription counter; unknown channels count as 0.
  @channels = Hash.new(0)
  # Latest version seen per channel. Looking up an unknown channel stores and
  # returns a fresh empty array for it.
  @already_known_versions = Hash.new do |known, channel|
    known[channel] = []
  end
end
Instance Method Details
#bump(*channels, notify: false) ⇒ Object
# File 'lib/pg_versions/pg_versions.rb', line 538
# Bumps channels through the inner connection and returns what it reports.
#
# @param channels channels to bump; when none are given, every channel
#   currently tracked in @channels is used
# @param notify [Boolean] when false (the default) the returned versions are
#   recorded as already known before returning; when true they are not
# @return whatever @inner.bump returns (iterated as channel/version pairs
#   when recorded)
def bump(*channels, notify: false)
  targets = channels.empty? ? @channels.keys : channels
  @inner.bump(targets).tap do |versions|
    update_already_known_versions(versions) unless notify
  end
end
#drop ⇒ Object
# File 'lib/pg_versions/pg_versions.rb', line 583
# Shuts this subscription down: queues a nil notification (which #wait treats
# as termination) and unsubscribes from every channel still tracked in
# @channels, if there are any.
#
# @return the result of @inner.unsubscribe, or nil when no channels were tracked
def drop
  @notifications.push(nil)
  tracked = @channels.keys
  @inner.unsubscribe(self, tracked) unless tracked.empty?
  #TODO: what to do if this object gets used after drop?
end
#notify(versions) ⇒ Object
# File 'lib/pg_versions/pg_versions.rb', line 578
# Queues +versions+ for a later #wait call to pick up.
#
# @param versions the notification payload; #wait iterates it as
#   channel/version pairs and treats nil as a termination marker
def notify(versions)
  @notifications.push(versions)
end
#read(*channels, notify: false) ⇒ Object
# File 'lib/pg_versions/pg_versions.rb', line 530
# Reads channel versions through the inner connection and returns them.
#
# @param channels channels to read; when none are given, every channel
#   currently tracked in @channels is used
# @param notify [Boolean] when false (the default) the returned versions are
#   recorded as already known before returning; when true they are not
# @return whatever @inner.read returns (iterated as channel/version pairs
#   when recorded)
def read(*channels, notify: false)
  targets = channels.empty? ? @channels.keys : channels
  @inner.read(targets).tap do |versions|
    update_already_known_versions(versions) unless notify
  end
end
#subscribe(channels, known: {}) ⇒ Object
# File 'lib/pg_versions/pg_versions.rb', line 506
# Adds one subscription reference per given channel and forwards to the inner
# connection only those channels whose counter just went from 0 to 1.
#
# @param channels a single channel or a (possibly nested) array of channels
# @param known versions to record as already known before subscribing
# @return the result of @inner.subscribe, or nil when no channel was newly
#   subscribed
def subscribe(channels, known: {})
  update_already_known_versions(known)

  first_time = [channels].flatten.select do |channel|
    count = @channels[channel] + 1
    @channels[channel] = count
    count == 1
  end

  @inner.subscribe(self, first_time) unless first_time.empty?
end
#unsubscribe(*channels) ⇒ Object
# File 'lib/pg_versions/pg_versions.rb', line 518
# Releases one subscription reference per given channel and forwards to the
# inner connection only those channels whose counter dropped to zero.
#
# The whole request is validated before any counter is touched, so a rejected
# call leaves @channels unchanged (no negative or partially-updated counters).
# As in #subscribe and #drop, the inner connection is not called when there is
# nothing to forward.
#
# @param channels channels (or possibly nested arrays of channels) to release
# @return the result of @inner.unsubscribe, or nil when no channel was fully
#   released
# @raise [RuntimeError] if a channel is released more times than it was
#   subscribed
def unsubscribe(*channels)
  channels = channels.flatten

  # Count how often each channel appears so duplicates within one call are
  # checked against the current reference count as a whole.
  requested = channels.each_with_object(Hash.new(0)) { |channel, counts| counts[channel] += 1 }
  requested.each do |channel, count|
    next if @channels.fetch(channel, 0) >= count
    raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to" % [channel]
  end

  released = channels.select do |channel|
    remaining = @channels[channel] - 1
    if remaining.zero?
      @channels.delete(channel)
      true
    else
      @channels[channel] = remaining
      false
    end
  end

  @inner.unsubscribe(self, released) unless released.empty?
end
#update_already_known_versions(new_already_known_versions) ⇒ Object
# File 'lib/pg_versions/pg_versions.rb', line 590
# Merges +new_already_known_versions+ into @already_known_versions, keeping
# for each channel whichever version compares greater under <=>.
#
# @param new_already_known_versions channel => version pairs
# @return the argument itself (the result of #each)
def update_already_known_versions(new_already_known_versions)
  new_already_known_versions.each do |channel, version|
    # Reading through [] deliberately goes via the hash default, so an unseen
    # channel gets its default entry stored even if no update follows.
    current = @already_known_versions[channel]
    next unless (version <=> current) == 1
    @already_known_versions[channel] = version
  end
end
#wait(new_already_known_versions = {}, batch_delay: nil) ⇒ Object
TODO: make this resumable after a forced exception
# File 'lib/pg_versions/pg_versions.rb', line 547
# Blocks until a queued notification carries a version newer than what is
# already known, then reports the changes.
#
# Each round takes one notification from the queue (blocking), optionally
# sleeps for the batch delay, then drains whatever else has been queued so
# that several notifications are reported together.
#
# TODO: make this resume-able after forced exception
#
# @param new_already_known_versions channel => version pairs recorded as
#   already known before waiting
# @param batch_delay delay passed to +sleep+ between the first notification
#   and draining the rest; nil falls back to the value given to the constructor
# @return [Notification, nil] a Notification built from the changed versions
#   and a copy of all known versions, or nil when a nil (termination) entry is
#   found in the batch
def wait(new_already_known_versions = {}, batch_delay: nil)
  batch_delay = @batch_delay if batch_delay.nil?
  update_already_known_versions(new_already_known_versions)

  loop do
    batch = [@notifications.shift]
    sleep batch_delay if batch_delay
    batch << @notifications.shift until @notifications.empty?

    changed = {}
    batch.each do |versions|
      return nil unless versions #termination

      versions.each do |channel, version|
        next unless (@already_known_versions[channel] <=> version) == -1
        @already_known_versions[channel] = version
        changed[channel] = version
      end
    end

    return Notification.new(changed, @already_known_versions.dup) unless changed.empty?
  end
end