Class: Conveyor::Channel

Inherits:
BaseChannel show all
Defined in:
lib/conveyor/channel.rb

Overview

Channel

A basic channel.

Constant Summary

Constants inherited from BaseChannel

BaseChannel::BLOCK_SIZE, BaseChannel::BUCKET_SIZE, BaseChannel::CACHE_SIZE, BaseChannel::FORMAT_VERSION, BaseChannel::NAME_PATTERN

Instance Method Summary collapse

Methods inherited from BaseChannel

#commit, #delete!, #get, #get_nearest_after_timestamp, #inspect, parse_headers, #rebuild_index, valid_channel_name?

Constructor Details

#initialize(directory) ⇒ Channel

If directory doesn’t already exist, it will be created during initialization.



10
11
12
13
14
15
16
17
18
19
# File 'lib/conveyor/channel.rb', line 10

# Prepares the iterator bookkeeping for this channel and then delegates to
# the superclass initializer with +directory+.
#
# directory - passed unchanged to the superclass initializer.
def initialize directory
  @group_iterators       = {}
  @group_iterators_files = {}
  @iterator_lock         = Mutex.new
  # One Mutex per key, created on first lookup and then reused. The default
  # block receives (hash, key); without the assignment every lookup would
  # hand back a brand-new Mutex, so no two callers would ever share a lock.
  @group_iterator_locks  = Hash.new { |locks, group| locks[group] = Mutex.new }

  super(directory)

  # NOTE(review): @iterator_file is not assigned in this method; presumably
  # the superclass initializer opens it -- confirm.
  @iterator_file.sync    = true
end

Instance Method Details

#get_next ⇒ Object

Returns the next item from the global (non-group) iterator.



27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/conveyor/channel.rb', line 27

# Returns the item at the global iterator position and advances the
# iterator by one, or returns nil when the iterator is already past
# @last_id.
#
# Each advance appends the new iterator value (base 36, newline terminated)
# to @iterator_file.
def get_next
  r = nil
  iterator_lock do
    if @iterator <= @last_id
      r = get(@iterator)
      @iterator += 1
      @iterator_file.write("#{@iterator.to_s(36)}\n")
    end
  end
  # Return the captured item explicitly, as the other get_next_* methods do,
  # rather than relying on iterator_lock passing the block's value through.
  r
end

#get_next_by_group(group) ⇒ Object

Returns the next item for group. If group hasn’t been seen before, the first item is returned.



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/conveyor/channel.rb', line 42

# Returns the next item for +group+ and advances that group's iterator, or
# returns nil when the group's iterator is already past @last_id.
#
# A group that has no iterator yet starts at position 1. Each advance writes
# the new position (base 36, newline terminated) through
# group_iterators_file.
def get_next_by_group group
  item = nil
  group_iterator_lock(group) do
    @group_iterators[group] = 1 unless @group_iterators.key?(group)
    position = @group_iterators[group]
    if position <= @last_id
      item = get(position)
      @group_iterators[group] = position + 1
      group_iterators_file(group) do |file|
        file.write("#{@group_iterators[group].to_s(36)}\n")
      end
    end
  end
  item
end

#get_next_n(n) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/conveyor/channel.rb', line 59

# Returns up to +n+ items starting at the global iterator position,
# advancing the iterator past each item returned. Stops early once the
# iterator moves past @last_id, so the result may hold fewer than +n+ items
# (an empty array when nothing is left).
#
# Every advance appends the new iterator value (base 36, newline
# terminated) to @iterator_file.
def get_next_n n
  items = []
  iterator_lock do
    until items.length >= n || @iterator > @last_id
      items.push(get(@iterator))
      @iterator += 1
      @iterator_file.write("#{@iterator.to_s(36)}\n")
    end
  end
  items
end

#get_next_n_by_group(n, group) ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/conveyor/channel.rb', line 72

# Returns up to +n+ items for +group+, advancing that group's iterator past
# each item returned. Stops early once the group's iterator moves past
# @last_id, so the result may hold fewer than +n+ items.
#
# A group that has no iterator yet starts at position 1. Each advance writes
# the new position (base 36, newline terminated) through
# group_iterators_file.
def get_next_n_by_group n, group
  r = []
  group_iterator_lock(group) do
    @group_iterators[group] = 1 unless @group_iterators.key?(group)
    # Inclusive bound: the item at @last_id itself must be reachable, which
    # matches get_next, get_next_by_group and get_next_n.
    while r.length < n && @group_iterators[group] <= @last_id
      r << get(@group_iterators[group])
      @group_iterators[group] += 1
      group_iterators_file(group) do |f|
        f.write("#{@group_iterators[group].to_s(36)}\n")
      end
    end
  end
  r
end

#post(data) ⇒ Object

Add data to the channel.



22
23
24
# File 'lib/conveyor/channel.rb', line 22

# Adds +data+ to the channel by handing it to commit, and returns whatever
# commit returns.
def post(data)
  commit(data)
end

#rewind(*opts) ⇒ Object



93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/conveyor/channel.rb', line 93

# Moves an iterator to a given position.
#
# opts - zero or more option hashes. They are merged into one; when the
#        same key appears in several hashes the earliest one wins.
#        :id    - position to move to (converted with to_i).
#        :time  - used only when :id is absent; the position is looked up
#                 with nearest_after(time).
#        :group - when present, the named group's iterator is moved;
#                 otherwise the global iterator is moved.
#
# The new position is written (base 36, newline terminated) to the matching
# iterator file. With neither :id nor :time -- including a call with no
# arguments at all -- nothing is changed and nil is returned.
def rewind *opts
  # Seed the fold with {} so an empty argument list yields an empty hash
  # instead of nil. `o.merge(merged)` keeps earlier hashes' values on clash.
  opts = opts.inject({}) { |merged, o| o.merge(merged) }
  if opts.key?(:id)
    if opts.key?(:group)
      group_iterator_lock(opts[:group]) do
        @group_iterators[opts[:group]] = opts[:id].to_i
        group_iterators_file(opts[:group]) do |f|
          f.write("#{@group_iterators[opts[:group]].to_s(36)}\n")
        end
      end
    else
      iterator_lock do
        @iterator = opts[:id].to_i
        @iterator_file.write("#{@iterator.to_s(36)}\n")
      end
    end
  elsif opts.key?(:time)
    # Resolve the timestamp to an id, then take the :id path above.
    target = { :id => nearest_after(opts[:time]) }
    target[:group] = opts[:group] if opts.key?(:group)
    rewind target
  end
end

#status ⇒ Object



87
88
89
90
91
# File 'lib/conveyor/channel.rb', line 87

# Returns the superclass status hash with an added :iterator_groups entry:
# a fresh hash mapping each group to its current iterator position.
def status
  base = super
  groups = {}
  @group_iterators.each { |group, position| groups[group] = position }
  base.merge(:iterator_groups => groups)
end