Class: DirtyPipeline::Queue

Inherits:
Object
  • Object
show all
Defined in:
lib/dirty_pipeline/queue.rb

Instance Method Summary collapse

Constructor Details

#initialize(operation, subject_class, subject_id, transaction_id) ⇒ Queue

Returns a new instance of Queue.



3
4
5
6
# File 'lib/dirty_pipeline/queue.rb', line 3

# Builds the root key prefix for this queue and stores it in @root.
#
# The prefix combines the subject class, the subject id, the operation
# name and the transaction id, each interpolated into the string.
#
# @param operation [#to_s] operation name, embedded as "op_<operation>"
# @param subject_class [#to_s] subject class, embedded as-is
# @param subject_id [#to_s] subject identifier, embedded as-is
# @param transaction_id [#to_s] transaction id, embedded as "txid_<id>"
def initialize(operation, subject_class, subject_id, transaction_id)
  subject_segment = "#{subject_class}:#{subject_id}"
  operation_segment = "op_#{operation}:txid_#{transaction_id}"
  @root = "dirty-pipeline-queue:#{subject_segment}:#{operation_segment}"
end

Instance Method Details

#clear! ⇒ Object



8
9
10
11
12
13
# File 'lib/dirty_pipeline/queue.rb', line 8

# Removes this queue's stored state: first the active-event key, then the
# events-queue key, each with its own `del` call on the object yielded by
# DirtyPipeline.with_redis (presumably a Redis connection).
#
# @return [Object] whatever DirtyPipeline.with_redis returns for the block;
#   the block's last expression is the result of the second `del`
def clear!
  DirtyPipeline.with_redis do |redis|
    redis.del(active_event_key)
    redis.del(events_queue_key)
  end
end

#pop ⇒ Object



34
35
36
37
38
39
40
# File 'lib/dirty_pipeline/queue.rb', line 34

# Takes the next packed event off the head of the events queue (`lpop`)
# and records it as the active event.
#
# When the queue yields nothing (nil), the active-event key is deleted;
# otherwise the packed payload is stored under the active-event key.
# In both cases the popped value (possibly nil) is passed to `unpack`.
#
# @return [Object] the result of `unpack` on the popped payload, as
#   returned through DirtyPipeline.with_redis
def pop
  DirtyPipeline.with_redis do |redis|
    packed = redis.lpop(events_queue_key)
    if packed.nil?
      redis.del(active_event_key)
    else
      redis.set(active_event_key, packed)
    end
    unpack(packed)
  end
end

#processing_event ⇒ Object



42
43
44
# File 'lib/dirty_pipeline/queue.rb', line 42

# Reads the value stored under the active-event key and unpacks it.
#
# @return [Object] the result of `unpack` on the stored value, as returned
#   through DirtyPipeline.with_redis
def processing_event
  DirtyPipeline.with_redis do |redis|
    packed = redis.get(active_event_key)
    unpack(packed)
  end
end

#push(event) ⇒ Object Also known as: <<



23
24
25
26
# File 'lib/dirty_pipeline/queue.rb', line 23

# Appends an event to the tail of the events queue: the event is run
# through `pack` and the result is sent with `rpush`.
#
# @param event [Object] event to enqueue; passed to `pack`
# @return [self] the receiver, so calls can be chained
def push(event)
  DirtyPipeline.with_redis do |redis|
    redis.rpush(events_queue_key, pack(event))
  end
  self
end

#to_a ⇒ Object



15
16
17
18
19
20
21
# File 'lib/dirty_pipeline/queue.rb', line 15

# Lists every event currently in the events queue without removing any.
#
# Fetches the full range (0..-1) of the queue with `lrange` and replaces
# each packed entry, in place, with the result of `unpack`.
#
# @return [Array] the unpacked events in queue order, as returned through
#   DirtyPipeline.with_redis
def to_a
  DirtyPipeline.with_redis do |redis|
    packed_events = redis.lrange(events_queue_key, 0, -1)
    packed_events.map! { |packed| unpack(packed) }
  end
end

#unshift(event) ⇒ Object



29
30
31
32
# File 'lib/dirty_pipeline/queue.rb', line 29

# Inserts an event at the head of the events queue: the event is run
# through `pack` and the result is sent with `lpush`.
#
# @param event [Object] event to enqueue; passed to `pack`
# @return [self] the receiver, so calls can be chained
def unshift(event)
  DirtyPipeline.with_redis do |redis|
    redis.lpush(events_queue_key, pack(event))
  end
  self
end