Class: TinyQ::Funnel

Inherits:
Object
  • Object
show all
Defined in:
lib/tinyq/funnel.rb

Overview

Funnel will receive messages on the internal queue and will dispatch to subscribers

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(n, b = false) ⇒ Funnel

Returns a new instance of Funnel.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/tinyq/funnel.rb', line 13

def initialize(n, b = false)
    @name = n
    @broadcaster = b
    @queue = EventMachine::Queue.new
    @subscribers = {}
    @buckets = []

    cb = Proc.new do |event|
        $LOG.debug("Funnel #{@name} - Callback")
        if !@subscribers.empty?
            # OK we can dequeue from bucket since we have somewhere to send it
            bucket = event[:Bucket]
            message,message_id = bucket.dequeue(self)

            $LOG.debug("Funnel #{@name} - Callback got '#{message_id}'")

            if message != nil
                if @broadcaster
                    $LOG.debug("Funnel #{@name} - Broadcasting #{message_id}")
                    @subscribers.each do |c,subscriber|
                        if subscriber.put_message(bucket, self, message, message_id)
                            # Subscriber is done removing
                            $LOG.debug("Funnel #{@name} - Subscriber received requested count")
                            self.remove_connection(c)
                        end
                    end
                else
                    $LOG.debug("Funnel #{@name} - Unicasting #{message_id}")
                    c = @subscribers.keys[0]
                    subscriber = @subscribers[c]
                    $LOG.debug("Funnel #{@name} - Unicasting to #{subscriber.connection.ip}:#{subscriber.connection.port}")
                    if subscriber.put_message(bucket, self, message, message_id)
                        # Subscriber is done removing
                        $LOG.debug("Funnel #{@name} - Subscriber #{subscriber.connection.ip}:#{subscriber.connection.port} received requested count")
                        self.remove_connection(c)
                    end
                end
            else
                $LOG.debug("Funnel #{@name} - Callback noop")
            end
        end

        # Wait for next event
        @queue.pop &cb
    end

    @queue.pop &cb
end

Instance Attribute Details

#broadcasterObject

Returns the value of attribute broadcaster.



11
12
13
# File 'lib/tinyq/funnel.rb', line 11

def broadcaster
  @broadcaster
end

#bucketsObject

Returns the value of attribute buckets.



9
10
11
# File 'lib/tinyq/funnel.rb', line 9

def buckets
  @buckets
end

#nameObject

Returns the value of attribute name.



7
8
9
# File 'lib/tinyq/funnel.rb', line 7

def name
  @name
end

#queueObject (readonly)

Returns the value of attribute queue.



8
9
10
# File 'lib/tinyq/funnel.rb', line 8

def queue
  @queue
end

#subscribersObject

Returns the value of attribute subscribers.



10
11
12
# File 'lib/tinyq/funnel.rb', line 10

def subscribers
  @subscribers
end

Instance Method Details

#add_connection(c, n = 1) ⇒ Object



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/tinyq/funnel.rb', line 67

def add_connection(c,n = 1)
    subscriber = @subscribers[c]
    if nil == subscriber
        subscriber = Subscriber.new(c,n)
        @subscribers[c] = subscriber
    end

    # At this point, we need to see if any buckets we are connected to
    # have pending messages
    @buckets.each do |bucket|
        if !bucket.messages.empty?
            self.notify(bucket)
        end
    end
end

#notify(bucket) ⇒ Object

Method called by bucket when messages are available



63
64
65
# File 'lib/tinyq/funnel.rb', line 63

def notify(bucket)
    @queue.push({:Event => "New Message", :Bucket => bucket})
end

#remove_connection(c) ⇒ Object



83
84
85
86
# File 'lib/tinyq/funnel.rb', line 83

def remove_connection(c)
    $LOG.debug("Funnel #{@name} - Removing subscriber #{c.ip}:#{c.port}")
    @subscribers.delete(c)
end