Class: ShortBus::Service

Inherits:
Object
  • Object
show all
Includes:
DebugMessage
Defined in:
lib/short_bus/service.rb

Overview

ShortBus::Service tracks a registered service (subscriber)

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from DebugMessage

#debug_message

Constructor Details

#initialize(debug: false, driver: nil, max_run_queue_size: 1_000_000, message_spec: nil, name: nil, recursive: false, publisher_spec: nil, service: nil, suppress_exception: false, thread_count: 1) ⇒ Service

Returns a new instance of Service.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/short_bus/service.rb', line 13

# Creates a service (subscriber) record and starts its worker threads.
#
# @param debug [Boolean] stored in @debug (presumably consulted by the
#   DebugMessage mixin - confirm there)
# @param driver [Object, nil] stored in @driver
# @param max_run_queue_size [Integer] capacity of the SizedQueue that holds
#   messages accepted by #check
# @param message_spec [Object, nil] wrapped in Spec when given; #check then
#   requires it to match message.to_s
# @param name [Object, nil] explicit service name
# @param recursive [Boolean] when true, #check also accepts messages whose
#   publisher equals this service's own name
# @param publisher_spec [Object, nil] wrapped in Spec when given; #check then
#   requires it to match message.publisher
# @param service [Object, nil] stored in @service; its to_s is the default
#   name when no explicit name is given
# @param suppress_exception [Boolean] when true, worker threads do not print
#   the exceptions they rescue
# @param thread_count [Integer] number of worker threads #start maintains
# @return [Service] a new instance of Service
def initialize(
  debug: false,
  driver: nil,
  max_run_queue_size: 1_000_000,
  message_spec: nil,
  name: nil,
  recursive: false,
  publisher_spec: nil,
  service: nil,
  suppress_exception: false,
  thread_count: 1
)
  @debug = debug
  @driver = driver
  @message_spec = message_spec ? Spec.new(message_spec) : nil
  @recursive = recursive
  @publisher_spec = publisher_spec ? Spec.new(publisher_spec) : nil
  @service = service
  @suppress_exception = suppress_exception
  @thread_count = thread_count

  # nil.to_s is "" (truthy), so stringifying a missing service would always
  # win the || chain and leave the random fallback unreachable. Only derive a
  # name from the service when one was actually supplied.
  service_name = @service.to_s unless @service.nil?
  @name = name || service_name || OpenSSL::HMAC.new(rand.to_s, 'sha1').to_s
  @run_queue = SizedQueue.new(max_run_queue_size)
  @threads = []
  start
end

Instance Attribute Details

#name ⇒ Object (readonly)

Returns the value of attribute name.



11
12
13
# File 'lib/short_bus/service.rb', line 11

# Name of this service as resolved by the constructor.
#
# @return [Object] the value stored in @name
def name
  @name
end

#threads ⇒ Object (readonly)

Returns the value of attribute threads.



11
12
13
# File 'lib/short_bus/service.rb', line 11

# Worker threads tracked for this service. #start appends to this array and
# #stop empties it.
#
# @return [Array] the @threads array itself, not a copy
def threads
  @threads
end

Instance Method Details

#check(message, dry_run = false) ⇒ Object



40
41
42
43
44
45
46
47
48
49
# File 'lib/short_bus/service.rb', line 40

# Decides whether this service wants +message+ and, if so, enqueues it on
# the run queue.
#
# A message is accepted when all of the following hold:
# * no message spec is configured, or it matches message.to_s
# * no publisher spec is configured, or it matches message.publisher
# * the message was not published under this service's own name, unless the
#   service is recursive
#
# @param message [Object] must respond to #to_s and #publisher
# @param dry_run [Boolean] when true the message is never enqueued
# @return [Object, nil] the run queue when the message was enqueued; nil when
#   it was rejected or when dry_run is set
def check(message, dry_run = false)
  debug_message "[#{@name}]#check(#{message})#{' dry_run' if dry_run}#"

  return if @message_spec && !@message_spec.match(message.to_s)
  return if @publisher_spec && !@publisher_spec.match(message.publisher)

  wanted = message.publisher != @name || @recursive
  return unless wanted
  return if dry_run

  @run_queue << message
end

#service_thread ⇒ Object

TODO: consider some mechanism to pass Exceptions up to the main thread,

perhaps with a whitelist, optional logging, something clean.


53
54
55
56
57
58
59
60
61
62
63
# File 'lib/short_bus/service.rb', line 53

# Spawns one worker thread that keeps taking messages off the run queue and
# handing them to run_service until the thread is flagged with :stop.
#
# Any exception raised while processing is rescued so the worker survives:
# it is printed (unless @suppress_exception is set), a SystemExit leads to
# abort, and otherwise the loop is re-entered unless :stop has been flagged.
#
# TODO: consider some mechanism to pass Exceptions up to the main thread,
# perhaps with a whitelist, optional logging, something clean.
#
# @return [Thread] the newly started worker thread
def service_thread
  Thread.new do
    worker = Thread.current
    begin
      # The :stop flag is only examined between messages; shift blocks while
      # the queue is empty.
      run_service(@run_queue.shift) until worker.key?(:stop)
    rescue Exception => error
      puts "Service [#{@name}] => #{error.inspect}" unless @suppress_exception
      abort if SystemExit === error
      retry unless worker.key?(:stop)
    end
  end
end

#start ⇒ Object



65
66
67
# File 'lib/short_bus/service.rb', line 65

# Tops the worker pool up to @thread_count by spawning service threads.
# Does nothing when enough threads are already tracked.
#
# @return [nil]
def start
  until @threads.length >= @thread_count
    @threads.push(service_thread)
  end
end

#stop(when_to_kill = nil) ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/short_bus/service.rb', line 69

# Stops the worker threads, waits for them to finish and empties @threads.
#
# @param when_to_kill [Numeric, Symbol, nil] how forcefully to stop:
#   * nil (or any other value) - flag every thread with :stop and wait for
#     it to exit on its own. Workers only look at the flag between messages,
#     so this can block until each worker has received one more message.
#   * Numeric - attempt the graceful stop above, but kill the threads if it
#     has not completed within that many seconds.
#   * :now - kill every thread immediately.
# @return [Array] the (now empty) @threads array
def stop(when_to_kill = nil)
  if when_to_kill.is_a? Numeric
    # Handled once, up front: the nested calls flag/kill, join and remove
    # every thread themselves, so doing this per thread inside the loop
    # below would only mutate @threads while it is being iterated.
    begin
      Timeout.timeout(when_to_kill) { stop }
    rescue Timeout::Error
      stop :now
    end
    return @threads
  end

  @threads.each do |thread|
    if when_to_kill == :now
      thread.kill
    else
      thread[:stop] = true
    end
  end
  # Thread#join returns the thread itself (truthy), so every thread is
  # removed once it has been joined.
  @threads.delete_if(&:join)
end

#stop! ⇒ Object



86
87
88
# File 'lib/short_bus/service.rb', line 86

# Forceful variant of #stop: delegates to stop with :now.
#
# @return [Object] whatever #stop returns
def stop!
  stop(:now)
end

#to_s ⇒ Object



90
91
92
# File 'lib/short_bus/service.rb', line 90

# String form of the service: its name.
#
# The name is coerced with to_s because it may have been supplied as a
# Symbol or another non-String object; Ruby ignores a non-String result from
# to_s during string interpolation and falls back to the default object
# representation.
#
# @return [String]
def to_s
  @name.to_s
end