Class: TinyQ::Bucket

Inherits:
Object
  • Object
show all
Defined in:
lib/tinyq/bucket.rb

Overview

A Bucket receives messages and forwards each of them to every funnel connected to it.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(n, p = false) ⇒ Bucket

Returns a new instance of Bucket.



18
19
20
21
22
23
24
25
26
27
# File 'lib/tinyq/bucket.rb', line 18

# Builds an empty bucket with no messages and no connected funnels.
#
# @param n [Object] bucket name; interpolated into the log lines of the other methods
# @param p [Boolean] when truthy, put_message/message_sent also go through
#   Permanent.store / Permanent.remove for each message
def initialize(n, p = false)
    # Identity and configuration
    @name, @permanent = n, p

    # Source of message ids (put_message calls @uuid.generate)
    @uuid = UUID.new

    # Message ids in arrival order
    @message_ids = []

    # Per-bucket lookup tables, all starting empty
    @funnels = {}     # funnel name => funnel
    @messages = {}    # message id  => message
    @references = {}  # message id  => funnel names that still have to dequeue it
    @pendings = {}    # message id  => funnel names that dequeued it but did not confirm yet
end

Instance Attribute Details

#funnels ⇒ Object

Returns the value of attribute funnels.



14
15
16
# File 'lib/tinyq/bucket.rb', line 14

# Funnels connected to this bucket.
#
# @return [Hash] funnel name => funnel, as registered by feed_funnel
def funnels
  @funnels
end

#message_ids ⇒ Object (readonly)

Returns the value of attribute message_ids.



10
11
12
# File 'lib/tinyq/bucket.rb', line 10

# Ids of the messages currently held by the bucket.
#
# @return [Array] ids in the order put_message appended them; dequeue scans
#   this list front to back
def message_ids
  @message_ids
end

#messages ⇒ Object (readonly)

Returns the value of attribute messages.



9
10
11
# File 'lib/tinyq/bucket.rb', line 9

# Messages currently held by the bucket.
#
# @return [Hash] message id => message; entries are added by put_message and
#   removed by message_sent once the message is purged
def messages
  @messages
end

#name ⇒ Object

Returns the value of attribute name.



7
8
9
# File 'lib/tinyq/bucket.rb', line 7

# Name given to the bucket at construction time (first constructor argument).
#
# @return [Object] the bucket name, also used as a prefix in log lines
def name
  @name
end

#pendings ⇒ Object (readonly)

Returns the value of attribute pendings.



12
13
14
# File 'lib/tinyq/bucket.rb', line 12

# In-flight deliveries per message.
#
# @return [Hash] message id => Array of funnel names that took the message
#   via dequeue but have not yet been confirmed through message_sent
def pendings
  @pendings
end

#permanent ⇒ Object

Returns the value of attribute permanent.



16
17
18
# File 'lib/tinyq/bucket.rb', line 16

# Persistence flag (second constructor argument, false by default).
#
# @return [Boolean] when truthy, put_message calls Permanent.store and
#   message_sent calls Permanent.remove for each message
def permanent
  @permanent
end

#references ⇒ Object (readonly)

Returns the value of attribute references.



11
12
13
# File 'lib/tinyq/bucket.rb', line 11

# Outstanding deliveries per message.
#
# @return [Hash] message id => Array of funnel names that have not dequeued
#   the message yet (filled by put_message/feed_funnel, drained by dequeue)
def references
  @references
end

Instance Method Details

#dequeue(funnel) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/tinyq/bucket.rb', line 51

# Hands the oldest message that the given funnel has not taken yet.
#
# The funnel is moved from the message's reference list to its pending list;
# the message itself stays in the bucket until message_sent purges it.
#
# @param funnel [#name] funnel asking for its next message
# @return [Array] [message, message_id], or [nil, nil] when the bucket is
#   empty or holds nothing for this funnel
def dequeue(funnel)
    return [nil, nil] if @messages.empty?

    # Walk the ids in arrival order and stop at the first one still referencing the funnel
    next_id = @message_ids.find do |candidate|
        $LOG.debug("Bucket #{@name} - #{candidate} references: #{@references[candidate]}")
        waiting = @references[candidate].include?(funnel.name)
        $LOG.debug("Bucket #{@name} - #{candidate} -> #{funnel.name}") if waiting
        waiting
    end

    if next_id.nil?
        $LOG.debug("Bucket #{@name} - No more messages for funnel #{funnel.name}")
        return [nil, nil]
    end

    $LOG.debug("Bucket #{@name} - Sending #{next_id} to funnel #{funnel.name}")
    payload = @messages[next_id]

    # The funnel no longer needs the message, but delivery is not confirmed yet
    @references[next_id].delete(funnel.name)
    @pendings[next_id].push(funnel.name)

    [payload, next_id]
end

#feed_funnel(funnel) ⇒ Object



116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/tinyq/bucket.rb', line 116

# Connects a funnel to this bucket.
#
# The funnel is registered under its name, the bucket is added to the
# funnel's bucket list (once), and if messages are already cached the funnel
# is referenced on each of them and notified.
#
# @param funnel [#name, #buckets, #notify] funnel to connect
# @return [Object, nil] the result of funnel.notify when cached messages
#   exist, nil otherwise
def feed_funnel(funnel)
    @funnels[funnel.name] = funnel

    # Add ourself to the list of buckets for that funnel, only once.
    # (The previous `count self == 0` parsed as `count(self == 0)`, whose
    # Integer result is always truthy, so the bucket was pushed every time.)
    funnel.buckets.push(self) unless funnel.buckets.include?(self)

    # If we have cached messages, make them available to the new funnel
    unless @messages.empty?
        # Every cached message now also has to be delivered to this funnel
        @message_ids.each do |mid|
            @references[mid].push(funnel.name)
        end
        $LOG.debug("Bucket #{@name} - Notifying funnel #{funnel.name}")
        funnel.notify(self)
    end
end

#funnel(name, broadcaster = false) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/tinyq/bucket.rb', line 103

# Looks up the funnel with the given name, creating and connecting it first
# if the bucket does not know it yet.
#
# The broadcaster setting is applied on every call, so an existing funnel is
# updated to the value passed here (including the default).
#
# @param name [Object] funnel name used as lookup key
# @param broadcaster [Boolean] broadcaster setting to apply to the funnel
# @return [Funnel] the existing or newly created funnel
def funnel(name, broadcaster = false)
    target = @funnels[name]

    if target.nil?
        $LOG.info("Bucket #{@name} - Creating funnel #{name}")
        target = Funnel.new(name, broadcaster)
        feed_funnel(target)
    end

    # Keep the funnel in sync with the requested setting
    target.broadcaster = broadcaster
    target
end

#message_sent(funnel, message_id) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/tinyq/bucket.rb', line 84

# Confirms that a funnel finished sending a message it had dequeued.
#
# The funnel is removed from the message's pending list. Once no funnel is
# pending or still referenced for the message, the message is purged from
# the bucket (and from permanent storage when the bucket is permanent).
#
# @param funnel [#name] funnel that delivered the message
# @param message_id [Object] id returned by dequeue
# @return [Boolean] true when the message was purged, false when other
#   funnels still need it
def message_sent(funnel, message_id)
    $LOG.debug("Bucket #{@name} - Message #{message_id} sent on #{funnel.name}")
    @pendings[message_id].delete(funnel.name)

    # The message must be kept while any funnel is pending or still referenced.
    # Returning early makes the purge branch below the only path yielding true
    # (previously the `true` inside the `if` was discarded and false was
    # always returned).
    return false unless @pendings[message_id].empty? && @references[message_id].empty?

    $LOG.debug("Bucket #{@name} - Purge message #{message_id}")
    # No more references, message can be deleted
    Permanent.remove "#{message_id}.dat" if @permanent
    @messages.delete(message_id)
    @message_ids.delete(message_id)
    @references.delete(message_id)
    @pendings.delete(message_id)
    true
end

#put_message(message) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/tinyq/bucket.rb', line 29

# Drops a message into the bucket and tells every connected funnel about it.
#
# The message gets a fresh id from the bucket's UUID generator, is referenced
# by all currently connected funnels and starts with no pending deliveries.
#
# @param message [Object] payload to queue
# @return [Hash, nil] the funnels hash when at least one funnel was notified,
#   nil when no funnel is connected
def put_message(message)
    id = @uuid.generate
    #message[:__id] = id
    #message[:__sent] = Time.now.iso8601

    # Permanent buckets also persist the message
    Permanent.store(message, "#{id}.dat", { :gzip => true }) if @permanent

    @messages[id] = message
    @message_ids << id
    @references[id] = @funnels.keys
    @pendings[id] = []

    return nil if @funnels.empty?

    # Let each connected funnel know something is waiting for it
    @funnels.each_value do |connected|
        $LOG.debug("Bucket #{@name} - Notifying funnel #{connected.name}")
        connected.notify(self)
    end
end