Class: Alephant::Alephant

Inherits:
Object
  • Object
show all
Defined in:
lib/alephant.rb

Constant Summary collapse

VALID_OPTS =
[
  :model_file,
  :s3_bucket_id,
  :s3_object_path,
  :s3_object_id,
  :table_name,
  :sqs_queue_id,
  :view_path,
  :sequential_proc,
  :set_last_seen_proc,
  :component_id
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}, logger = nil) ⇒ Alephant

Returns a new instance of Alephant.



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/alephant.rb', line 33

def initialize(opts = {}, logger = nil)
  set_logger(logger)
  set_opts(opts)

  @logger = ::Alephant.logger
  @sequencer = Sequencer.new(
    {
      :table_name => @table_name
    },
    @sqs_queue_id
  )

  @queue = Queue.new(@sqs_queue_id)
  @cache = Cache.new(@s3_bucket_id, @s3_object_path)
  @multi_renderer = MultiRenderer.new(@model_file, "#{@view_path}/#{@component_id}")
  @parser = Parser.new
end

Instance Attribute Details

#cacheObject (readonly)

Returns the value of attribute cache.



18
19
20
# File 'lib/alephant.rb', line 18

def cache
  @cache
end

#queueObject (readonly)

Returns the value of attribute queue.



18
19
20
# File 'lib/alephant.rb', line 18

def queue
  @queue
end

#rendererObject (readonly)

Returns the value of attribute renderer.



18
19
20
# File 'lib/alephant.rb', line 18

def renderer
  @renderer
end

#sequencerObject (readonly)

Returns the value of attribute sequencer.



18
19
20
# File 'lib/alephant.rb', line 18

def sequencer
  @sequencer
end

Instance Method Details

#receive(msg) ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/alephant.rb', line 61

def receive(msg)
  data = @parser.parse msg.body

  @logger.info("Alephant.receive: with id #{msg.id} and body digest: #{msg.md5}")

  if @sequencer.sequential?(data, &@sequential_proc)
    write data
    @sequencer.set_last_seen(data, &@set_last_seen_proc)
  else
    @logger.warn("Alephant.receive: out of sequence message received #{msg.id} (discarded)")
  end
end

#run!Object



74
75
76
77
78
# File 'lib/alephant.rb', line 74

def run!
  Thread.new do
    @queue.poll { |msg| receive(msg) }
  end
end

#set_logger(logger) ⇒ Object



51
52
53
# File 'lib/alephant.rb', line 51

def set_logger(logger)
  ::Alephant.logger = logger
end

#write(data) ⇒ Object



55
56
57
58
59
# File 'lib/alephant.rb', line 55

def write(data)
  @multi_renderer.render(data).each do |id, item|
    @cache.put(id, item)
  end
end