Module: EventMachine::Bucketer::Base

Included in:
InMemory, Redis
Defined in:
lib/em-bucketer/base.rb

Instance Method Details

#add_item(bucket_id, item_id, item, &blk) ⇒ Object

Adds an item to the specified bucket and calls the block once the item has been added.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket to put the item in

  • item_id (String)

    the item_id of the item (used to ensure uniqueness within a bucket)

  • item (Object)

    the item to be placed in the bucket



# File 'lib/em-bucketer/base.rb', line 22

def add_item(bucket_id, item_id, item, &blk)
  add_timer_if_first(bucket_id)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    add_bucket_to_db(bucket_id, item_id, item).callback do
      c.succeed
    end.errback do |e|
      c.fail e
    end
    check_bucket_full(bucket_id)
  end
end
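
The wrapping pattern used by add_item (an outer completion that forwards the success or failure of an inner storage operation) can be sketched without EventMachine. This is only an illustration: SketchCompletion is a hypothetical stand-in written for this example, not the real EM::Completion, and wrap_store is a made-up helper that plays the role of add_item around add_bucket_to_db.

```ruby
# Hypothetical stand-in for EM::Completion; not the real class.
class SketchCompletion
  def initialize
    @callbacks = []
    @errbacks = []
    @state = :pending
    @args = []
  end

  # Register a success handler; runs immediately if already succeeded.
  def callback(&blk)
    if @state == :succeeded
      blk.call(*@args)
    elsif @state == :pending
      @callbacks << blk
    end
    self
  end

  # Register a failure handler; runs immediately if already failed.
  def errback(&blk)
    if @state == :failed
      blk.call(*@args)
    elsif @state == :pending
      @errbacks << blk
    end
    self
  end

  def succeed(*args)
    finish(:succeeded, @callbacks, args)
  end

  def fail(*args)
    finish(:failed, @errbacks, args)
  end

  private

  # Settle once; later calls to succeed or fail are ignored.
  def finish(state, handlers, args)
    return unless @state == :pending
    @state = state
    @args = args
    handlers.each { |handler| handler.call(*args) }
  end
end

# Hypothetical helper: wraps an inner operation the way add_item
# wraps add_bucket_to_db, forwarding success and failure outward.
def wrap_store(inner, &blk)
  SketchCompletion.new.tap do |c|
    c.callback(&blk) if block_given?
    inner.callback { c.succeed }.errback { |e| c.fail e }
  end
end

events = []

ok_store = SketchCompletion.new
wrap_store(ok_store) { events << :stored }
ok_store.succeed # the block passed to wrap_store runs

bad_store = SketchCompletion.new
wrap_store(bad_store).errback { |e| events << e }
bad_store.fail(:db_down) # the error is forwarded to the outer errback
```

Because the method returns the outer completion, a caller can either pass a block or attach callback and errback handlers to the return value.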

#empty_bucket(bucket_id, &blk) ⇒ Object

Empty a bucket.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket you want to empty



# File 'lib/em-bucketer/base.rb', line 108

def empty_bucket(bucket_id, &blk)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    empty_bucket_in_db(bucket_id).callback do
      clear_timer(bucket_id)
      c.succeed
    end.errback do |e|
      c.fail e
    end
  end
end

#get_and_empty_bucket(bucket_id) {|Array| ... } ⇒ Object

Get the contents of a bucket, then empty it.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket you want to get

Yields:

  • (Array)

    the items you put into the bucket



# File 'lib/em-bucketer/base.rb', line 89

def get_and_empty_bucket(bucket_id, &blk)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    get_bucket(bucket_id).callback do |contents|
      empty_bucket(bucket_id).callback do
        c.succeed contents
      end.errback do |e|
        c.fail e
      end
    end.errback do |e|
      c.fail e
    end
  end
end
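
The ordering in get_and_empty_bucket (read the contents first, empty the bucket second, then hand the previously read contents to the caller) can be shown with a synchronous sketch. SketchStore and its methods are hypothetical names invented for this example; the real module performs these steps asynchronously through EM::Completion.

```ruby
# Hypothetical synchronous stand-in for a bucket store.
class SketchStore
  def initialize
    # bucket_id => { item_id => item }
    @buckets = Hash.new { |hash, bucket_id| hash[bucket_id] = {} }
  end

  def add(bucket_id, item_id, item)
    @buckets[bucket_id][item_id] = item
  end

  # Yields a snapshot array of the items in the bucket.
  def get(bucket_id)
    yield @buckets[bucket_id].values
  end

  # Removes the bucket, then yields to signal completion.
  def empty(bucket_id)
    @buckets.delete(bucket_id)
    yield
  end

  # Reads first, empties second, and only then yields the contents
  # that were captured before the bucket was emptied.
  def get_and_empty(bucket_id)
    get(bucket_id) do |contents|
      empty(bucket_id) { yield contents }
    end
  end
end

store = SketchStore.new
store.add("emails", "1", "hello")
store.add("emails", "2", "world")

drained = nil
store.get_and_empty("emails") { |contents| drained = contents }

remaining = nil
store.get("emails") { |contents| remaining = contents }
```

After the call, drained holds both items and remaining is an empty array, because the snapshot was taken before the bucket was cleared.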

#get_bucket(bucket_id) {|Array| ... } ⇒ Object

Get the contents of a bucket.

Parameters:

  • bucket_id (String)

    the bucket id of the bucket you want to get

Yields:

  • (Array)

    the items you put into the bucket



# File 'lib/em-bucketer/base.rb', line 72

def get_bucket(bucket_id, &blk)
  EM::Completion.new.tap do |c|
    c.callback(&blk) if block_given?
    get_bucket_from_db(bucket_id).callback do |bucket|
      c.succeed bucket.values
    end.errback do |e|
      c.fail e
    end
  end
end
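
Since get_bucket succeeds with bucket.values, the stored bucket evidently responds to values. A minimal sketch, assuming each bucket is a plain Hash keyed by item id and that re-adding an existing item id overwrites the earlier item (both are assumptions; the actual InMemory and Redis storage is not shown here):

```ruby
# Hypothetical in-memory layout: bucket_id => { item_id => item }.
buckets = Hash.new { |hash, bucket_id| hash[bucket_id] = {} }

buckets["emails"]["1"] = { to: "a@example.com" }
buckets["emails"]["2"] = { to: "b@example.com" }
# Re-adding item id "1" replaces the earlier item instead of duplicating it.
buckets["emails"]["1"] = { to: "c@example.com" }

# What a caller of get_bucket would receive under these assumptions.
contents = buckets["emails"].values
```

Under this layout the item id never reaches the caller; only the items themselves are yielded.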

#on_bucket_full {|String| ... } ⇒ Object

Sets a callback hook for when a bucket reaches the threshold size. IMPORTANT: the bucket is not emptied automatically; you must call empty_bucket if you want it emptied. Once the threshold is reached, the callback is called again every time an item is added, until the bucket is emptied.

Yields:

  • (String)

    The bucket id of the full bucket



# File 'lib/em-bucketer/base.rb', line 44

def on_bucket_full(&blk)
  @on_bucket_full_callbacks << blk
end
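
The repeated-firing behaviour described for on_bucket_full can be sketched as follows. SketchThreshold is a hypothetical class for illustration only, and the `>=` comparison is an assumption, since the implementation of check_bucket_full is not shown here.

```ruby
# Hypothetical synchronous model of the bucket-full hook.
class SketchThreshold
  def initialize(threshold)
    @threshold = threshold
    @items = Hash.new { |hash, bucket_id| hash[bucket_id] = {} }
    @on_bucket_full_callbacks = []
  end

  def on_bucket_full(&blk)
    @on_bucket_full_callbacks << blk
  end

  def add_item(bucket_id, item_id, item)
    @items[bucket_id][item_id] = item
    # Assumed check: fire whenever the bucket is at or over the threshold.
    return unless @items[bucket_id].size >= @threshold
    @on_bucket_full_callbacks.each { |cb| cb.call(bucket_id) }
  end

  def empty_bucket(bucket_id)
    @items.delete(bucket_id)
  end
end

fired = []
bucketer = SketchThreshold.new(2)
bucketer.on_bucket_full { |bucket_id| fired << bucket_id }

bucketer.add_item("q", "1", :a) # below threshold, no callback
bucketer.add_item("q", "2", :b) # threshold reached, callback fires
bucketer.add_item("q", "3", :c) # still full, callback fires again
bucketer.empty_bucket("q")
bucketer.add_item("q", "4", :d) # below threshold again, no callback
```

A handler that does not want repeated notifications would typically empty the bucket itself, for example by calling get_and_empty_bucket from inside the callback.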

#on_bucket_timeout {|String| ... } ⇒ Object

Sets a callback hook for when a bucket reaches the time limit. IMPORTANT: the bucket is not emptied automatically; you must call empty_bucket if you want it emptied.

The timer starts when the bucket gets its first item and is cleared only when the bucket is emptied. The callback is called only once, when the timer expires, and not again until you empty the bucket and add another item.

Yields:

  • (String)

    The bucket id of the bucket that timed out



# File 'lib/em-bucketer/base.rb', line 62

def on_bucket_timeout(&blk)
  @on_bucket_timeout_callbacks << blk
end
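
The one-shot timer lifecycle described above can be modelled with a simulated clock instead of real EventMachine timers. SketchTimers, tick, and the explicit `now` arguments are hypothetical and exist only for this illustration; the set of buckets with timers mirrors the @buckets_with_timers set created in setup.

```ruby
require 'set'

# Hypothetical model of the timeout hook driven by a simulated clock.
class SketchTimers
  def initialize(max_age)
    @max_age = max_age
    @deadlines = {} # bucket_id => time at which the timeout fires
    @buckets_with_timers = Set.new
    @on_bucket_timeout_callbacks = []
  end

  def on_bucket_timeout(&blk)
    @on_bucket_timeout_callbacks << blk
  end

  # Only the first item added to a bucket starts a timer.
  def add_item(bucket_id, now)
    return if @buckets_with_timers.include?(bucket_id)
    @buckets_with_timers << bucket_id
    @deadlines[bucket_id] = now + @max_age
  end

  # Emptying clears the timer so the next add starts a fresh one.
  def empty_bucket(bucket_id)
    @buckets_with_timers.delete(bucket_id)
    @deadlines.delete(bucket_id)
  end

  # Advance the simulated clock and fire any expired timers once.
  def tick(now)
    due = @deadlines.select { |_, at| at <= now }.keys
    due.each do |bucket_id|
      @deadlines.delete(bucket_id) # one-shot: the bucket stays in the timer set
      @on_bucket_timeout_callbacks.each { |cb| cb.call(bucket_id) }
    end
  end
end

timeouts = []
timers = SketchTimers.new(10)
timers.on_bucket_timeout { |bucket_id| timeouts << bucket_id }

timers.add_item("q", 0)   # first item: timer set for t = 10
timers.add_item("q", 5)   # later items do not restart the timer
timers.tick(9)            # not yet expired
timers.tick(10)           # fires once
timers.tick(30)           # does not fire again
timers.add_item("q", 31)  # bucket not emptied, so no new timer
timers.tick(100)          # nothing to fire
timers.empty_bucket("q")
timers.add_item("q", 100) # fresh timer set for t = 110
timers.tick(110)          # fires for the second time
```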

#setup(bucket_threshold_size, bucket_max_age) ⇒ Object



# File 'lib/em-bucketer/base.rb', line 3

def setup(bucket_threshold_size, bucket_max_age)
  @bucket_threshold_size = bucket_threshold_size
  @bucket_max_age = bucket_max_age
  @buckets = {}
  @on_bucket_full_callbacks = []
  @on_bucket_timeout_callbacks = []
  @buckets_with_timers = Set.new
end