Class: Msgr::Client
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
- #connect ⇒ Object
-
#drain ⇒ Object
Purge all queues known to Msgr, if they exist.
-
#initialize(config = {}) ⇒ Client
constructor
A new instance of Client.
- #publish(payload, opts = {}) ⇒ Object
- #purge(release: false) ⇒ Object
- #release ⇒ Object
- #routes ⇒ Object
- #running? ⇒ Boolean
- #start ⇒ Object
- #stop(opts = {}) ⇒ Object
- #uri ⇒ Object
Methods included from Logging
Constructor Details
#initialize(config = {}) ⇒ Client
Returns a new instance of Client.
13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
# File 'lib/msgr/client.rb', line 13
#
# Builds a new client from the given settings.
#
# Starts from the defaults (host '127.0.0.1', vhost '/', max 2), then
# overlays whatever `parse` extracts from a :uri entry (if one is given),
# and finally the remaining entries of `config`, so explicitly passed keys
# win over values derived from the URI.
#
# @param config [Hash] client settings; keys may be strings or symbols
#   (they are normalized with `symbolize_keys`). The hash is not modified.
def initialize(config = {})
  @config = {
    host: '127.0.0.1',
    vhost: '/',
    max: 2
  }

  # Work on a symbol-keyed copy: the caller's hash must not lose its :uri
  # entry, and a string 'uri' key has to be recognised as well.
  options = config.symbolize_keys

  @config.merge! parse(options.delete(:uri)) if options[:uri]
  @config.merge! options

  @mutex  = ::Mutex.new
  @routes = Routes.new
  # Remember the creating process id (reported in the debug message below).
  @pid ||= ::Process.pid

  log(:debug) { "Created new client on process ##{@pid}..." }
end
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
11 12 13 |
# File 'lib/msgr/client.rb', line 11
#
# Read-only accessor for the client's configuration.
#
# @return [Object] the value stored in @config
def config
  @config
end
Instance Method Details
#connect ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/msgr/client.rb', line 68
#
# Opens the connection unless it is already running.
#
# Runs under the client mutex and after `check_process!`
# (presumably a guard against use from a different process -- confirm).
#
# @return [Object, nil] nil when the connection is already running,
#   otherwise whatever `connection.connect` returns
def connect
  mutex.synchronize do
    check_process!

    unless connection.running?
      log(:debug) { "Connect to #{uri}..." }
      connection.connect
    end
  end
end
#drain ⇒ Object
Purge all queues known to Msgr, if they exist.
107 108 109 110 111 |
# File 'lib/msgr/client.rb', line 107
#
# Purge all queues known to Msgr, if they exist.
#
# Walks every registered route and asks the connection to purge the queue
# named after that route.
def drain
  @routes.each { |known_route| connection.purge_queue(known_route.name) }
end
#publish(payload, opts = {}) ⇒ Object
113 114 115 116 117 118 |
# File 'lib/msgr/client.rb', line 113
#
# Publishes a payload while holding the client mutex.
#
# @param payload [Object] message payload, handed to `sync_publish` as is
# @param opts [Hash] publish options, handed to `sync_publish` as is
# @return [Object] whatever `sync_publish` returns
def publish(payload, opts = {})
  mutex.synchronize do
    check_process!
    sync_publish(payload, opts)
  end
end
#purge(release: false) ⇒ Object
94 95 96 97 98 99 100 101 102 |
# File 'lib/msgr/client.rb', line 94
#
# Purges the queues through the connection while holding the client mutex.
#
# @param release [Boolean] forwarded unchanged to `connection.purge`
# @return [Object] whatever `connection.purge` returns
def purge(release: false)
  mutex.synchronize do
    check_process!
    log(:debug) { "Purge all queues on #{uri}..." }
    connection.purge(release: release)
  end
end
#release ⇒ Object
126 127 128 129 130 131 132 133 |
# File 'lib/msgr/client.rb', line 126
#
# Releases the connection if it is currently running.
#
# @return [Object, nil] nil when the connection is not running, otherwise
#   whatever `connection.release` returns
def release
  mutex.synchronize do
    check_process!

    # Query the connection directly. The public #running? takes this same
    # mutex, and the client's ::Mutex is not reentrant, so calling it from
    # inside this block raises ThreadError (recursive locking).
    return unless connection.running?

    connection.release
  end
end
#routes ⇒ Object
120 121 122 123 124 |
# File 'lib/msgr/client.rb', line 120
#
# Returns the client's routes object, read while holding the client mutex.
#
# @return [Object] the value stored in @routes
def routes
  mutex.synchronize { @routes }
end
#running? ⇒ Boolean
48 49 50 51 52 53 |
# File 'lib/msgr/client.rb', line 48
#
# Reports whether the underlying connection is running.
#
# Takes the client mutex, so it must not be called from code that already
# holds it.
#
# @return [Boolean] the result of `connection.running?`
def running?
  mutex.synchronize do
    check_process!
    connection.running?
  end
end
#start ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/msgr/client.rb', line 55
#
# Loads the routes and binds them on the connection, unless the connection
# is already running.
#
# @return [Object, nil] nil when already running, otherwise whatever
#   `connection.bind` returns
def start
  mutex.synchronize do
    check_process!
    # Nothing to do when the connection is up already.
    next if connection.running?

    log(:debug) { "Start on #{uri}..." }

    # Register the configured routing file (when present) before reloading.
    @routes << config[:routing_file] if config[:routing_file].present?
    @routes.reload

    connection.bind(@routes)
  end
end
#stop(opts = {}) ⇒ Object
79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/msgr/client.rb', line 79
#
# Shuts the client down while holding the client mutex.
#
# In order: releases the connection, deletes it when requested, closes it,
# shuts the dispatcher down and finally calls `reset`.
#
# @param opts [Hash] options
# @option opts [Boolean] :delete when truthy, `connection.delete` is called
#   before the connection is closed
# @return [Object] whatever `reset` returns
def stop(opts = {})
  mutex.synchronize do
    check_process!
    log(:debug) { "Stop on #{uri}..." }

    connection.release
    if opts[:delete]
      connection.delete
    end
    connection.close

    dispatcher.shutdown
    reset
  end
end
#uri ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
# File 'lib/msgr/client.rb', line 30 def uri @uri = begin uri = ::URI.parse('amqp://localhost') uri.user = CGI.escape(config[:user]) if config.key?(:user) uri.password = '****' if config.key?(:pass) uri.host = config[:host] if config.key?(:host) uri.port = config[:port] if config.key?(:port) uri.scheme = config[:ssl] ? 'amqps' : 'amqp' if config.key?(:vhost) && config[:vhost] != '/' uri.path = "/#{CGI.escape(config[:vhost])}" end uri end end |