Class: RServiceBus2::MQFile
Overview
Beanstalk client implementation.
Instance Attribute Summary
Attributes inherited from MQ
Instance Method Summary collapse
-
#ack ⇒ Object
rubocop:enable Metrics/MethodLength.
-
#initialize(uri) ⇒ MQFile
constructor
Connect to the broker.
-
#pop ⇒ Object
Get next msg from queue rubocop:disable Metrics/MethodLength.
- #send(queue_name, msg) ⇒ Object
- #subscribe(queue_name) ⇒ Object
Methods inherited from MQ
Constructor Details
#initialize(uri) ⇒ MQFile
Connect to the broker
11 12 13 14 15 16 |
# File 'lib/rservicebus2/mq/file.rb', line 11 def initialize(uri) super FileUtils.mkdir_p("#{@uri.path}/#{@local_queue_name}") @timeout = RServiceBus2.get_value('QUEUE_TIMEOUT', '5').to_i end |
Instance Method Details
#ack ⇒ Object
rubocop:enable Metrics/MethodLength
43 44 45 |
# File 'lib/rservicebus2/mq/file.rb', line 43 def ack FileUtils.rm @file_path end |
#pop ⇒ Object
Get next msg from queue rubocop:disable Metrics/MethodLength
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/rservicebus2/mq/file.rb', line 26 def pop time = @timeout while time.positive? files = Dir.glob("#{@uri.path}/#{@local_queue_name}/*.msg") if files.positive? @file_path = files[0] @body = IO.read(@file_path) return @body end time -= 1 sleep(1) end raise NoMsgToProcess if files.length.zero? end |
#send(queue_name, msg) ⇒ Object
47 48 49 50 |
# File 'lib/rservicebus2/mq/file.rb', line 47 def send(queue_name, msg) FileUtils.mkdir_p("#{@uri.path}/#{queue_name}") IO.write("#{@uri.path}/#{queue_name}/#{rand}.msg", msg) end |
#subscribe(queue_name) ⇒ Object
18 19 20 21 22 |
# File 'lib/rservicebus2/mq/file.rb', line 18 def subscribe(queue_name) path = "#{@uri.path}/#{queue_name}" FileUtils.mkdir_p(path) @local_queue_name = queue_name end |