Class: Conveyor::Channel
- Inherits:
-
BaseChannel
- Object
- BaseChannel
- Conveyor::Channel
- Defined in:
- lib/conveyor/channel.rb
Overview
Channel
A basic channel.
Constant Summary
Constants inherited from BaseChannel
BaseChannel::BLOCK_SIZE, BaseChannel::BUCKET_SIZE, BaseChannel::CACHE_SIZE, BaseChannel::FORMAT_VERSION, BaseChannel::NAME_PATTERN
Instance Method Summary collapse
-
#get_next ⇒ Object
Returns the next item from the global (non-group) iterator.
-
#get_next_by_group(group) ⇒ Object
Returns the next item for group.
- #get_next_n(n) ⇒ Object
- #get_next_n_by_group(n, group) ⇒ Object
-
#initialize(directory) ⇒ Channel
constructor
If directory doesn’t already exist, it will be created during initialization.
-
#post(data) ⇒ Object
Add data to the channel.
- #rewind(*opts) ⇒ Object
- #status ⇒ Object
Methods inherited from BaseChannel
#commit, #delete!, #get, #get_nearest_after_timestamp, #inspect, parse_headers, #rebuild_index, valid_channel_name?
Constructor Details
#initialize(directory) ⇒ Channel
If directory doesn’t already exist, it will be created during initialization.
10 11 12 13 14 15 16 17 18 19 |
# File 'lib/conveyor/channel.rb', line 10
#
# Prepares the iterator bookkeeping for the channel and then delegates to
# BaseChannel#initialize. Per the class documentation, +directory+ is created
# during initialization if it does not already exist.
#
# @param directory [String] channel directory, handed unchanged to the
#   superclass constructor
def initialize(directory)
  @group_iterators       = {}
  @group_iterators_files = {}
  @iterator_lock         = Mutex.new
  # A Hash default block receives (hash, key) and must store the value itself.
  # Without the assignment every lookup would build a brand-new Mutex, so two
  # callers asking for the same group would never contend on the same lock.
  @group_iterator_locks  = Hash.new { |locks, group| locks[group] = Mutex.new }
  super(directory)
  # NOTE(review): @iterator_file is not assigned in this method; presumably
  # the superclass constructor opens it -- confirm.
  @iterator_file.sync = true
end
Instance Method Details
#get_next ⇒ Object
Returns the next item from the global (non-group) iterator.
27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/conveyor/channel.rb', line 27
#
# Returns the next item from the global (non-group) iterator.
#
# While holding the iterator lock: if the iterator has not passed @last_id,
# fetches the item at the current position with #get, advances the iterator
# by one, and appends the new position (base 36, newline-terminated) to
# @iterator_file.
#
# @return [Object, nil] whatever #get returned for the current position, or
#   nil when the iterator is already past @last_id
def get_next
  item = nil
  iterator_lock do
    if @iterator <= @last_id
      item = get(@iterator)
      @iterator += 1
      @iterator_file.write("#{@iterator.to_s(36)}\n")
    end
  end
  # Return the captured item explicitly (as the other getters do) so the
  # result does not depend on what the iterator_lock helper happens to return.
  item
end
#get_next_by_group(group) ⇒ Object
Returns the next item for group. If group hasn’t been seen before, the first item is returned.
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/conveyor/channel.rb', line 42
#
# Returns the next item for +group+. A group that has no iterator yet starts
# at position 1, so its first call yields the first item.
#
# The work happens inside group_iterator_lock(group): the item at the group's
# current position is fetched with #get, the position is advanced by one, and
# the new position (base 36, newline-terminated) is written through
# group_iterators_file(group).
#
# @param group [Object] key identifying the consumer group
# @return [Object, nil] the fetched item, or nil when the group's iterator is
#   already past @last_id
def get_next_by_group(group)
  item = nil
  group_iterator_lock(group) do
    # First sighting of this group: register it at position 1.
    cursor = @group_iterators.fetch(group) { @group_iterators[group] = 1 }
    next unless cursor <= @last_id

    item = get(cursor)
    @group_iterators[group] += 1
    group_iterators_file(group) { |file| file.write("#{@group_iterators[group].to_s(36)}\n") }
  end
  item
end
#get_next_n(n) ⇒ Object
59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/conveyor/channel.rb', line 59
#
# Collects up to +n+ items from the global (non-group) iterator.
#
# Inside the iterator lock, items are fetched one at a time with #get until
# either +n+ items have been gathered or the iterator has passed @last_id.
# After each fetch the iterator is advanced and its new position (base 36,
# newline-terminated) is written to @iterator_file.
#
# @param n [Integer] maximum number of items to return
# @return [Array] the fetched items in iteration order; may hold fewer than
#   +n+ entries, and is empty when nothing is left
def get_next_n(n)
  batch = []
  iterator_lock do
    while batch.length < n && @iterator <= @last_id
      batch << get(@iterator)
      @iterator += 1
      @iterator_file.write("#{@iterator.to_s(36)}\n")
    end
  end
  batch
end
#get_next_n_by_group(n, group) ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/conveyor/channel.rb', line 72
#
# Collects up to +n+ items for +group+. A group that has no iterator yet
# starts at position 1.
#
# Inside group_iterator_lock(group), items are fetched one at a time with
# #get until either +n+ items have been gathered or the group's iterator has
# passed @last_id. After each fetch the group's position is advanced and the
# new position (base 36, newline-terminated) is written through
# group_iterators_file(group).
#
# @param n [Integer] maximum number of items to return
# @param group [Object] key identifying the consumer group
# @return [Array] the fetched items in iteration order; may hold fewer than
#   +n+ entries, and is empty when nothing is left
def get_next_n_by_group(n, group)
  items = []
  group_iterator_lock(group) do
    @group_iterators[group] = 1 unless @group_iterators.key?(group)
    # Inclusive bound: the item stored at @last_id is itself readable, matching
    # #get_next, #get_next_by_group and #get_next_n. A strict `<` would make
    # the most recent item unreachable through this method.
    while items.length < n && @group_iterators[group] <= @last_id
      items << get(@group_iterators[group])
      @group_iterators[group] += 1
      group_iterators_file(group) do |file|
        file.write("#{@group_iterators[group].to_s(36)}\n")
      end
    end
  end
  items
end
#post(data) ⇒ Object
Add data to the channel.
22 23 24 |
# File 'lib/conveyor/channel.rb', line 22
#
# Add data to the channel by handing it to the inherited #commit.
#
# @param data [Object] payload to store; passed to #commit unchanged
# @return [Object] whatever #commit returns
def post(data)
  commit(data)
end
#rewind(*opts) ⇒ Object
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/conveyor/channel.rb', line 93
#
# Moves an iterator to a new position.
#
# Accepts one or more option hashes, which are merged; when the same key
# appears in several hashes the earliest one wins. Recognised keys:
#
# * :id    - position to move to (converted with #to_i)
# * :time  - used only when :id is absent; the position is looked up with
#            nearest_after(time) and the call is repeated with that :id
# * :group - when present, the named group's iterator is moved instead of the
#            global one
#
# The new position is also written (base 36, newline-terminated) to the
# matching iterator file. With neither :id nor :time -- including a call with
# no arguments at all -- nothing is changed and nil is returned.
#
# @param opts [Array<Hash>] option hashes as described above
def rewind(*opts)
  # Seed the fold with {} so an empty argument list merges to an empty hash
  # instead of nil (which would blow up on the key? checks below). Precedence
  # for non-empty input is unchanged.
  opts = opts.inject({}) { |merged, hash| hash.merge(merged) }

  if opts.key?(:id)
    if opts.key?(:group)
      group = opts[:group]
      group_iterator_lock(group) do
        @group_iterators[group] = opts[:id].to_i
        group_iterators_file(group) do |file|
          file.write("#{@group_iterators[group].to_s(36)}\n")
        end
      end
    else
      iterator_lock do
        @iterator = opts[:id].to_i
        @iterator_file.write("#{@iterator.to_s(36)}\n")
      end
    end
  elsif opts.key?(:time)
    # NOTE(review): nearest_after is not among the inherited methods listed
    # for BaseChannel (get_nearest_after_timestamp is) -- confirm it exists.
    target = { :id => nearest_after(opts[:time]) }
    target[:group] = opts[:group] if opts.key?(:group)
    rewind(target)
  end
end
#status ⇒ Object
87 88 89 90 91 |
# File 'lib/conveyor/channel.rb', line 87
#
# Extends the superclass status hash with the current group iterator
# positions.
#
# @return [Hash] the result of BaseChannel#status merged with an
#   :iterator_groups entry holding a fresh hash of group => position pairs
def status
  # Build a separate hash so callers do not receive @group_iterators itself.
  positions = @group_iterators.each_with_object({}) do |(group, position), snapshot|
    snapshot[group] = position
  end
  super.merge(:iterator_groups => positions)
end