Class: Funl::SubscriptionTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/funl/subscription-tracker.rb

Overview

Threadsafe manager for a client’s subscriptions.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client) ⇒ SubscriptionTracker

Returns a new instance of SubscriptionTracker.



14
15
16
17
18
19
20
21
22
23
# File 'lib/funl/subscription-tracker.rb', line 14

def initialize client
  @client = client

  @subscribed_tags = []
  @subscribed_all = false

  @waiters = []
  @mon = Monitor.new
  @cvar = @mon.new_cond
end

Instance Attribute Details

#clientObject (readonly)

Returns the value of attribute client.



7
8
9
# File 'lib/funl/subscription-tracker.rb', line 7

def client
  @client
end

#subscribed_allObject (readonly)

Returns the value of attribute subscribed_all.



8
9
10
# File 'lib/funl/subscription-tracker.rb', line 8

def subscribed_all
  @subscribed_all
end

Instance Method Details

#subscribe(tags) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
# File 'lib/funl/subscription-tracker.rb', line 25

def subscribe tags
  @mon.synchronize do
    if (tags - @subscribed_tags).empty?
      return false
    else
      client.seq << Message.control(SUBSCRIBE, tags)
      wait {(tags - @subscribed_tags).empty?}
      return true
    end
  end
end

#subscribe_allObject



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/funl/subscription-tracker.rb', line 37

def subscribe_all
  @mon.synchronize do
    if @subscribed_all
      return false
    else
      client.seq << Message.control(SUBSCRIBE_ALL)
      wait {@subscribed_all}
      return true
    end
  end
end

#subscribed_tagsObject



10
11
12
# File 'lib/funl/subscription-tracker.rb', line 10

def subscribed_tags
  @subscribed_tags.dup
end

#unsubscribe(tags) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
# File 'lib/funl/subscription-tracker.rb', line 49

def unsubscribe tags
  @mon.synchronize do
    if (tags & @subscribed_tags).empty?
      return false
    else
      client.seq << Message.control(UNSUBSCRIBE, tags)
      wait {(tags & @subscribed_tags).empty?}
      return true
    end
  end
end

#unsubscribe_allObject



61
62
63
64
65
66
67
68
69
70
71
# File 'lib/funl/subscription-tracker.rb', line 61

def unsubscribe_all
  @mon.synchronize do
    if !@subscribed_all
      return false
    else
      client.seq << Message.control(UNSUBSCRIBE_ALL)
      wait {!@subscribed_all}
      return true
    end
  end
end

#update(op_type, tags = nil) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/funl/subscription-tracker.rb', line 73

def update op_type, tags=nil
  @mon.synchronize do
    case op_type
    when SUBSCRIBE;       @subscribed_tags |= tags
    when SUBSCRIBE_ALL;   @subscribed_all = true
    when UNSUBSCRIBE;     @subscribed_tags -= tags
    when UNSUBSCRIBE_ALL; @subscribed_all = false
    else raise ArgumentError
    end

    @cvar.broadcast
  end
end

#waitObject



87
88
89
90
91
# File 'lib/funl/subscription-tracker.rb', line 87

def wait
  until yield
    @cvar.wait ## timeout?
  end
end