Class: Msgr::Client
Overview
rubocop:disable Metrics/ClassLength
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
Instance Method Summary collapse
-
#connect ⇒ Object
rubocop:enable all.
-
#initialize(config = {}) ⇒ Client
constructor
rubocop:disable MethodLength.
- #publish(payload, opts = {}) ⇒ Object
-
#purge(release: false) ⇒ Object
rubocop:enable all.
- #release ⇒ Object
- #routes ⇒ Object
-
#running? ⇒ Boolean
rubocop:enable all.
-
#start ⇒ Object
rubocop:disable AbcSize.
-
#stop(opts = {}) ⇒ Object
rubocop:disable AbcSize.
-
#uri ⇒ Object
rubocop:disable AbcSize rubocop:disable MethodLength rubocop:disable PerceivedComplexity rubocop:disable CyclomaticComplexity.
Methods included from Logging
Constructor Details
#initialize(config = {}) ⇒ Client
rubocop:disable MethodLength
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
# File 'lib/msgr/client.rb', line 14 def initialize(config = {}) @config = { host: '127.0.0.1', vhost: '/', max: 2 } @config.merge! parse(config.delete(:uri)) if config.key?(:uri) @config.merge! config.symbolize_keys @mutex = ::Mutex.new @routes = Routes.new @pid ||= ::Process.pid log(:debug) { "Created new client on process ##{@pid}..." } end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
11 12 13 |
# File 'lib/msgr/client.rb', line 11 def config @config end |
Instance Method Details
#connect ⇒ Object
rubocop:enable all
77 78 79 80 81 82 83 84 85 86 |
# File 'lib/msgr/client.rb', line 77 def connect mutex.synchronize do check_process! return if connection.running? log(:debug) { "Connect to #{uri}..." } connection.connect end end |
#publish(payload, opts = {}) ⇒ Object
115 116 117 118 119 120 |
# File 'lib/msgr/client.rb', line 115 def publish(payload, opts = {}) mutex.synchronize do check_process! sync_publish payload, opts end end |
#purge(release: false) ⇒ Object
rubocop:enable all
105 106 107 108 109 110 111 112 113 |
# File 'lib/msgr/client.rb', line 105 def purge(release: false) mutex.synchronize do check_process! log(:debug) { "Purge all queues on #{uri}..." } connection.purge(release: release) end end |
#release ⇒ Object
128 129 130 131 132 133 134 135 |
# File 'lib/msgr/client.rb', line 128 def release mutex.synchronize do check_process! return unless running? connection.release end end |
#routes ⇒ Object
122 123 124 125 126 |
# File 'lib/msgr/client.rb', line 122 def routes mutex.synchronize do @routes end end |
#running? ⇒ Boolean
rubocop:enable all
55 56 57 58 59 60 |
# File 'lib/msgr/client.rb', line 55 def running? mutex.synchronize do check_process! connection.running? end end |
#start ⇒ Object
rubocop:disable AbcSize
63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/msgr/client.rb', line 63 def start mutex.synchronize do check_process! return if connection.running? log(:debug) { "Start on #{uri}..." } @routes << config[:routing_file] if config[:routing_file].present? @routes.reload connection.bind(@routes) end end |
#stop(opts = {}) ⇒ Object
rubocop:disable AbcSize
89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/msgr/client.rb', line 89 def stop(opts = {}) mutex.synchronize do check_process! log(:debug) { "Stop on #{uri}..." } connection.release connection.delete if opts[:delete] connection.close dispatcher.shutdown reset end end |
#uri ⇒ Object
rubocop:disable AbcSize rubocop:disable MethodLength rubocop:disable PerceivedComplexity rubocop:disable CyclomaticComplexity
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/msgr/client.rb', line 36 def uri @uri = begin uri = ::URI.parse('amqp://localhost') uri.user = CGI.escape(config[:user]) if config.key?(:user) uri.password = '****' if config.key?(:pass) uri.host = config[:host] if config.key?(:host) uri.port = config[:port] if config.key?(:port) uri.scheme = config[:ssl] ? 'amqps' : 'amqp' if config.key?(:vhost) && config[:vhost] != '/' uri.path = "/#{CGI.escape(config[:vhost])}" end uri end end |