Class: Msgr::Client

Inherits:
Object
  • Object
show all
Includes:
Logging
Defined in:
lib/msgr/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Logging

#log, #log_name

Constructor Details

#initialize(config = {}) ⇒ Client

Returns a new instance of Client.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/msgr/client.rb', line 7

def initialize(config = {})
  @uri          = ::URI.parse config[:uri] ? config.delete(:uri) : 'amqp://localhost/'
  config[:pass] ||= @uri.password

  @uri.user   = config[:user] ||= @uri.user || 'guest'
  @uri.scheme = (config[:ssl] ||= @uri.scheme.to_s.downcase == 'amqps') ? 'amqps' : 'amqp'
  @uri.host   = config[:host] ||= @uri.host || '127.0.0.1'
  @uri.port   = config[:port] ||= @uri.port
  @uri.path   = config[:vhost] ||= @uri.path.present? ? @uri.path : '/'
  config.reject! { |_, v| v.nil? }

  @config = config
  @config[:max] ||= 2

  @mutex  = ::Mutex.new
  @routes = Routes.new
  @pid    ||= ::Process.pid

  log(:info) { "Created new client on process ##{@pid}..." }
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



5
6
7
# File 'lib/msgr/client.rb', line 5

def config
  @config
end

#uriObject (readonly)

Returns the value of attribute uri.



5
6
7
# File 'lib/msgr/client.rb', line 5

def uri
  @uri
end

Instance Method Details

#connectObject



48
49
50
51
52
53
54
55
56
57
# File 'lib/msgr/client.rb', line 48

def connect
  mutex.synchronize do
    check_process!
    return if connection.running?

    log(:info) { "Connect to #{uri}..." }

    connection.connect
  end
end

#publish(payload, opts = {}) ⇒ Object



73
74
75
76
77
78
# File 'lib/msgr/client.rb', line 73

def publish(payload, opts = {})
  mutex.synchronize do
    check_process!
    connection.publish payload, opts
  end
end

#releaseObject



86
87
88
89
90
91
92
93
# File 'lib/msgr/client.rb', line 86

def release
  mutex.synchronize do
    check_process!
    return unless running?

    connection.release
  end
end

#routesObject



80
81
82
83
84
# File 'lib/msgr/client.rb', line 80

def routes
  mutex.synchronize do
    @routes
  end
end

#running?Boolean

Returns:

  • (Boolean)


28
29
30
31
32
33
# File 'lib/msgr/client.rb', line 28

def running?
  mutex.synchronize do
    check_process!
    connection.running?
  end
end

#startObject



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/msgr/client.rb', line 35

def start
  mutex.synchronize do
    check_process!
    return if connection.running?

    log(:info) { "Start on #{uri}..." }

    @routes << config[:routing_file] if config[:routing_file].present?
    @routes.reload
    connection.bind(@routes)
  end
end

#stopObject



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/msgr/client.rb', line 59

def stop
  mutex.synchronize do
    check_process!

    log(:info) { "Stop on #{uri}..." }

    connection.release
    connection.close
    dispatcher.shutdown

    reset
  end
end