Class: Msgr::Client

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/client.rb

Overview

rubocop:disable Metrics/ClassLength

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(config = {}) ⇒ Client

rubocop:disable MethodLength



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/msgr/client.rb', line 14

def initialize(config = {})
  @config = {
    host: '127.0.0.1',
    vhost: '/',
    max: 2
  }

  @config.merge! parse(config.delete(:uri)) if config.key?(:uri)
  @config.merge! config.symbolize_keys

  @mutex  = ::Mutex.new
  @routes = Routes.new
  @pid ||= ::Process.pid

  log(:debug) { "Created new client on process ##{@pid}..." }
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



11
12
13
# File 'lib/msgr/client.rb', line 11

def config
  @config
end

Instance Method Details

#connectObject

rubocop:enable all



77
78
79
80
81
82
83
84
85
86
# File 'lib/msgr/client.rb', line 77

def connect
  mutex.synchronize do
    check_process!
    return if connection.running?

    log(:debug) { "Connect to #{uri}..." }

    connection.connect
  end
end

#publish(payload, opts = {}) ⇒ Object



115
116
117
118
119
120
# File 'lib/msgr/client.rb', line 115

def publish(payload, opts = {})
  mutex.synchronize do
    check_process!
    sync_publish payload, opts
  end
end

#purge(release: false) ⇒ Object

rubocop:enable all



105
106
107
108
109
110
111
112
113
# File 'lib/msgr/client.rb', line 105

def purge(release: false)
  mutex.synchronize do
    check_process!

    log(:debug) { "Purge all queues on #{uri}..." }

    connection.purge(release: release)
  end
end

#releaseObject



128
129
130
131
132
133
134
135
# File 'lib/msgr/client.rb', line 128

def release
  mutex.synchronize do
    check_process!
    return unless running?

    connection.release
  end
end

#routesObject



122
123
124
125
126
# File 'lib/msgr/client.rb', line 122

def routes
  mutex.synchronize do
    @routes
  end
end

#running?Boolean

rubocop:enable all

Returns:

  • (Boolean)


55
56
57
58
59
60
# File 'lib/msgr/client.rb', line 55

def running?
  mutex.synchronize do
    check_process!
    connection.running?
  end
end

#startObject

rubocop:disable AbcSize



63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/msgr/client.rb', line 63

def start
  mutex.synchronize do
    check_process!
    return if connection.running?

    log(:debug) { "Start on #{uri}..." }

    @routes << config[:routing_file] if config[:routing_file].present?
    @routes.reload
    connection.bind(@routes)
  end
end

#stop(opts = {}) ⇒ Object

rubocop:disable AbcSize



89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/msgr/client.rb', line 89

def stop(opts = {})
  mutex.synchronize do
    check_process!

    log(:debug) { "Stop on #{uri}..." }

    connection.release
    connection.delete if opts[:delete]
    connection.close
    dispatcher.shutdown

    reset
  end
end

#uriObject

rubocop:disable AbcSize rubocop:disable MethodLength rubocop:disable PerceivedComplexity rubocop:disable CyclomaticComplexity



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/msgr/client.rb', line 36

def uri
  @uri = begin
    uri = ::URI.parse('amqp://localhost')

    uri.user     = CGI.escape(config[:user]) if config.key?(:user)
    uri.password = '****'                    if config.key?(:pass)
    uri.host     = config[:host]             if config.key?(:host)
    uri.port     = config[:port]             if config.key?(:port)
    uri.scheme   = config[:ssl] ? 'amqps' : 'amqp'

    if config.key?(:vhost) && config[:vhost] != '/'
      uri.path = "/#{CGI.escape(config[:vhost])}"
    end

    uri
  end
end