Class: RServiceBus2::MQFile

Inherits:
MQ
  • Object
show all
Defined in:
lib/rservicebus2/mq/file.rb

Overview

Beanstalk client implementation.

Instance Attribute Summary

Attributes inherited from MQ

#local_queue_name

Instance Method Summary collapse

Methods inherited from MQ

#connect, get

Constructor Details

#initialize(uri) ⇒ MQFile

Connect to the broker



11
12
13
14
15
16
# File 'lib/rservicebus2/mq/file.rb', line 11

def initialize(uri)
  super

  FileUtils.mkdir_p("#{@uri.path}/#{@local_queue_name}")
  @timeout = RServiceBus2.get_value('QUEUE_TIMEOUT', '5').to_i
end

Instance Method Details

#ackObject

rubocop:enable Metrics/MethodLength



43
44
45
# File 'lib/rservicebus2/mq/file.rb', line 43

def ack
  FileUtils.rm @file_path
end

#popObject

Get next msg from queue rubocop:disable Metrics/MethodLength

Raises:



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/rservicebus2/mq/file.rb', line 26

def pop
  time = @timeout
  while time.positive?
    files = Dir.glob("#{@uri.path}/#{@local_queue_name}/*.msg")
    if files.positive?
      @file_path = files[0]
      @body = IO.read(@file_path)
      return @body
    end
    time -= 1
    sleep(1)
  end

  raise NoMsgToProcess if files.length.zero?
end

#send(queue_name, msg) ⇒ Object



47
48
49
50
# File 'lib/rservicebus2/mq/file.rb', line 47

def send(queue_name, msg)
  FileUtils.mkdir_p("#{@uri.path}/#{queue_name}")
  IO.write("#{@uri.path}/#{queue_name}/#{rand}.msg", msg)
end

#subscribe(queue_name) ⇒ Object



18
19
20
21
22
# File 'lib/rservicebus2/mq/file.rb', line 18

def subscribe(queue_name)
  path = "#{@uri.path}/#{queue_name}"
  FileUtils.mkdir_p(path)
  @local_queue_name = queue_name
end