Class: Euston::Daemons::Pipeline::MessageBuffer::Buffer

Inherits:
Object
  • Object
show all
Defined in:
lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(mongodb) ⇒ Buffer

Returns a new instance of Buffer.



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 6

# Prepares the 'message_buffer' collection on the given database handle.
#
# The collection is created when the handle does not already list it, and two
# non-unique ascending indexes are ensured on it: one on 'message_id' and a
# compound one on 'component_id' + 'dispatch_at'.
#
# @param mongodb [Object] database handle; must respond to +collection_names+,
#   +create_collection+ and +collection+
def initialize(mongodb)
  collection_name = 'message_buffer'
  existing = mongodb.collection_names
  mongodb.create_collection(collection_name) unless existing.include?(collection_name)

  @name = collection_name
  @collection = mongodb.collection(collection_name)

  # Each entry lists the fields of one index; the index name is derived from
  # the collection name and the field names joined with underscores.
  [%w[message_id], %w[component_id dispatch_at]].each do |fields|
    spec = fields.map { |field| [field, Mongo::ASCENDING] }
    @collection.ensure_index(spec,
                             :unique => false,
                             :name   => "#{collection_name}_#{fields.join('_')}_index")
  end
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



23
24
25
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 23

# Reader for the buffer's name.
#
# @return [Object] the current value of the @name instance variable
def name; @name; end

Instance Method Details

#delete_dispatched_messages(component_id) ⇒ Object



25
26
27
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 25

# Issues a remove against the collection for documents whose 'component_id'
# field equals the given value, passing :multi => true as the option.
#
# @param component_id [Object] value matched against the 'component_id' field
# @return [Object] whatever the collection's +remove+ call returns
def delete_dispatched_messages(component_id)
  owned_by_component = { 'component_id' => component_id }
  @collection.remove(owned_by_component, :multi => true)
end

#enqueue(exchange, message, dispatch_at = nil) ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 29

# Converts one message, or an array of messages, into documents and inserts
# them into the collection in a single call.
#
# An entry counts as "well formed" when it is a Hash carrying both the :hash
# and :dispatch_at keys; such entries are used as given. Any other entry is
# wrapped as { :hash => entry, :dispatch_at => dispatch_at }. Each entry is then
# turned into a document by +map_to_document+ (defined elsewhere in the class).
#
# @param exchange [Object] passed through to +map_to_document+ for every entry
# @param message [Object, Array] a single message or an array of entries
# @param dispatch_at [Object, nil] value used when wrapping entries that are
#   not already well formed
# @return [Object, nil] the result of the collection's +insert+, or nil when
#   there was nothing to insert
def enqueue(exchange, message, dispatch_at = nil)
  wrap = lambda { |payload| { :hash => payload, :dispatch_at => dispatch_at } }

  # A non-array argument is treated as one message and becomes a batch of one.
  batch = message.is_a?(Array) ? message : [wrap.call(message)]

  documents = batch.map do |entry|
    well_formed = entry.is_a?(Hash) && entry.has_key?(:hash) && entry.has_key?(:dispatch_at)
    map_to_document(exchange, well_formed ? entry : wrap.call(entry))
  end

  return if documents.empty?

  @collection.insert(documents)
end

#find_dispatchable_messages(component_id) ⇒ Object



42
43
44
45
46
47
48
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 42

# Queries the collection for documents whose 'component_id' equals the given
# value, requesting only the 'exchange', 'type' and 'json' fields and an
# ascending sort on 'dispatch_at'.
#
# @param component_id [Object] value matched against the 'component_id' field
# @return [Object] whatever the collection's +find+ call returns
def find_dispatchable_messages(component_id)
  @collection.find({ 'component_id' => component_id },
                   :fields => %w[exchange type json],
                   :sort   => ['dispatch_at', Mongo::ASCENDING])
end

#get_by_id(id) ⇒ Object



50
51
52
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 50

# Looks up a single document by its 'message_id' field.
#
# @param id [Object] value matched against the 'message_id' field
# @return [Object] whatever the collection's +find_one+ call returns
def get_by_id(id)
  @collection.find_one('message_id' => id)
end

#take_ownership_of_dispatchable_messages(component_id) ⇒ Object



54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/euston-daemons/pipeline/lib/message_buffer/buffer.rb', line 54

# Assigns +component_id+ to every message that is eligible for dispatch.
#
# Two groups of documents are matched by a single multi-document update:
# * unclaimed messages ('component_id' is the empty string) whose
#   'dispatch_at' is at or before the current time, and
# * messages claimed by some component ('component_id' is not the empty
#   string) whose 'dispatch_at' is at least +stuck_after+ seconds in the past.
#
# The clock is read once so both cut-offs are derived from the same instant.
#
# @param component_id [Object] value written to 'component_id' on every match
# @param stuck_after [Numeric] seconds past 'dispatch_at' after which a claimed
#   message is considered stuck and may be re-claimed; defaults to 60
# @return [Object] whatever the collection's +update+ call returns
def take_ownership_of_dispatchable_messages component_id, stuck_after = 60
  now = Time.now.to_f

  new_messages_eligible_for_dispatch  = { 'component_id'  => '',
                                          'dispatch_at'   => { '$lte' => now } }

  messages_stuck_in_other_components  = { 'component_id'  => { '$ne'  => '' },
                                          'dispatch_at'   => { '$lte' => now - stuck_after } }

  query = { '$or'   => [ new_messages_eligible_for_dispatch, messages_stuck_in_other_components ] }
  doc   = { '$set'  => { 'component_id' => component_id } }

  @collection.update query, doc, :multi => true
end