Class: ShortBus::Service
- Inherits: Object
  - Object
    - ShortBus::Service
- Includes: DebugMessage
- Defined in: lib/short_bus/service.rb
Overview
ShortBus::Service tracks a registered service (subscriber).
Instance Attribute Summary
- #name ⇒ Object (readonly)
  Returns the value of attribute name.
- #threads ⇒ Object (readonly)
  Returns the value of attribute threads.
Instance Method Summary
- #check(message, dry_run = false) ⇒ Object
- #initialize(debug: false, driver: nil, max_run_queue_size: 1_000_000, message_spec: nil, name: nil, recursive: false, publisher_spec: nil, service: nil, suppress_exception: false, thread_count: 1) ⇒ Service (constructor)
  A new instance of Service.
- #service_thread ⇒ Object
  TODO: consider some mechanism to pass Exceptions up to the main thread, perhaps with a whitelist, optional logging, something clean.
- #start ⇒ Object
- #stop(when_to_kill = nil) ⇒ Object
- #stop! ⇒ Object
- #to_s ⇒ Object
Methods included from DebugMessage
Constructor Details
#initialize(debug: false, driver: nil, max_run_queue_size: 1_000_000, message_spec: nil, name: nil, recursive: false, publisher_spec: nil, service: nil, suppress_exception: false, thread_count: 1) ⇒ Service
Returns a new instance of Service.
# File 'lib/short_bus/service.rb', line 13

def initialize(
  debug: false,
  driver: nil,
  max_run_queue_size: 1_000_000,
  message_spec: nil,
  name: nil,
  recursive: false,
  publisher_spec: nil,
  service: nil,
  suppress_exception: false,
  thread_count: 1
)
  @debug = debug
  @driver = driver
  @message_spec = message_spec ? Spec.new(message_spec) : nil
  @recursive = recursive
  @publisher_spec = publisher_spec ? Spec.new(publisher_spec) : nil
  @service = service
  @suppress_exception = suppress_exception
  @thread_count = thread_count

  @name = name || @service.to_s || OpenSSL::HMAC.new(rand.to_s, 'sha1').to_s
  @run_queue = SizedQueue.new(max_run_queue_size)
  @threads = []

  start
end
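A note on the name fallback in the constructor: in Ruby, `nil.to_s` returns an empty string, and an empty string is truthy, so the `||` chain stops at `@service.to_s` whenever `name` is nil. The sketch below reproduces only that expression in a standalone helper. `resolve_name` and the `:random_fallback` placeholder are hypothetical names used for illustration, not part of ShortBus.

```ruby
# Hypothetical helper mirroring the constructor's name fallback chain.
# :random_fallback stands in for the OpenSSL::HMAC expression.
def resolve_name(name, service)
  name || service.to_s || :random_fallback
end

explicit  = resolve_name('logger', nil)  # an explicit name wins
from_proc = resolve_name(nil, -> {})     # falls back to the service's to_s
unnamed   = resolve_name(nil, nil)       # nil.to_s is "", which is truthy
```

With neither a name nor a service, the result is an empty string rather than a generated identifier, so the last branch of the chain appears to be unreachable as written.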
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
# File 'lib/short_bus/service.rb', line 11

def name
  @name
end
#threads ⇒ Object (readonly)
Returns the value of attribute threads.
# File 'lib/short_bus/service.rb', line 11

def threads
  @threads
end
Instance Method Details
#check(message, dry_run = false) ⇒ Object
# File 'lib/short_bus/service.rb', line 40

def check(message, dry_run = false)
  debug_message "[#{@name}]#check(#{message})#{' dry_run' if dry_run}#"
  if(
    (!@message_spec || @message_spec.match(message.to_s)) &&
    (!@publisher_spec || @publisher_spec.match(message.publisher)) &&
    (message.publisher != @name || @recursive)
  )
    @run_queue << message unless dry_run
  end
end
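The filter in #check combines three conditions: the message text matches the message spec (if any), the publisher matches the publisher spec (if any), and the message did not come from this service unless recursion is enabled. A minimal standalone sketch of that logic follows. It assumes plain Regexp objects in place of ShortBus's Spec class, and `accepts?` and the message struct are hypothetical stand-ins.

```ruby
# Stand-in for a ShortBus message: text plus the publisher's name.
message_class = Struct.new(:text, :publisher) do
  def to_s
    text
  end
end

# Hypothetical helper reproducing the three conditions in #check.
# Plain Regexp objects stand in for ShortBus's Spec (an assumption).
def accepts?(message, name:, message_spec: nil, publisher_spec: nil, recursive: false)
  spec_ok      = !message_spec || message_spec.match(message.to_s)
  publisher_ok = !publisher_spec || publisher_spec.match(message.publisher)
  not_own      = message.publisher != name || recursive
  !!(spec_ok && publisher_ok && not_own)
end

boot_event = message_class.new('app::start', 'boot')
own_event  = message_class.new('app::log', 'logger')

matched   = accepts?(boot_event, name: 'logger', message_spec: /\Aapp::/)
filtered  = accepts?(boot_event, name: 'logger', message_spec: /\Adb::/)
wrong_pub = accepts?(boot_event, name: 'logger', publisher_spec: /\Aadmin\z/)
own       = accepts?(own_event, name: 'logger')
recursive = accepts?(own_event, name: 'logger', recursive: true)
```

Only `matched` and `recursive` come back true here. A service with no specs at all accepts every message except its own.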
#service_thread ⇒ Object
TODO: consider some mechanism to pass Exceptions up to the main thread,
perhaps with a whitelist, optional logging, something clean.
# File 'lib/short_bus/service.rb', line 53

def service_thread
  Thread.new do
    begin
      run_service @run_queue.shift until Thread.current.key?(:stop)
    rescue Exception => exc
      puts "Service [#{@name}] => #{exc.inspect}" unless @suppress_exception
      abort if exc.is_a? SystemExit
      retry unless Thread.current.key?(:stop)
    end
  end
end
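The rescue/retry pattern in #service_thread keeps a worker alive after a handler raises: the exception is reported, then the begin block restarts and the loop resumes with the next queued message. The sketch below shows the same shape in isolation. It is an illustration rather than the library's code: it rescues StandardError instead of Exception, uses a plain Queue, and collects output in a second queue (`results`, a hypothetical name) instead of printing.

```ruby
queue   = Queue.new
results = Queue.new

worker = Thread.new do
  begin
    until Thread.current.key?(:stop)
      item = queue.shift                        # blocks while the queue is empty
      raise ArgumentError, 'bad item' if item == :bad
      results << item
    end
  rescue StandardError => exc
    results << "error: #{exc.message}"          # report, then keep serving
    retry unless Thread.current.key?(:stop)
  end
end

queue << :bad
queue << :good

first  = results.pop   # the reported error
second = results.pop   # the item processed after the retry

worker.kill
worker.join
```

The second message is still processed after the first one raises, which is the point of the retry.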
#start ⇒ Object
# File 'lib/short_bus/service.rb', line 65

def start
  @threads << service_thread while @threads.length < @thread_count
end
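Because #start only adds threads while the list is shorter than the configured count, calling it repeatedly is harmless. A small standalone sketch of that top-up loop, using hypothetical lambdas (`spawn_worker`, `top_up`) and idle threads in place of real service threads:

```ruby
threads      = []
thread_count = 3

spawn_worker = -> { Thread.new { sleep } }   # idle stand-in for service_thread
top_up = -> { threads << spawn_worker.call while threads.length < thread_count }

top_up.call
top_up.call              # second call adds nothing: the list is already full
count = threads.length

threads.each(&:kill).each(&:join)
```

The loop compares against the length of the list, not the number of live threads, so a thread that has died but is still in the list is not replaced.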
#stop(when_to_kill = nil) ⇒ Object
# File 'lib/short_bus/service.rb', line 69

def stop(when_to_kill = nil)
  @threads.each do |thread|
    if when_to_kill.is_a? Numeric
      begin
        Timeout.timeout(when_to_kill) { stop }
      rescue Timeout::Error
        stop :now
      end
    elsif when_to_kill == :now
      thread.kill
    else
      thread[:stop] = true
    end
  end
  @threads.delete_if(&:join)
end
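#stop supports three modes: set a stop flag and wait (no argument), wait up to a number of seconds and then kill (Numeric), or kill immediately (:now). The sketch below illustrates the three modes on a single thread. It is a variation, not the library's implementation: `stop_thread` is a hypothetical helper, and the timed mode uses `Thread#join` with a limit instead of `Timeout.timeout` around a recursive call.

```ruby
# Hypothetical per-thread version of the three stop modes.
def stop_thread(thread, when_to_kill = nil)
  case when_to_kill
  when Numeric
    thread[:stop] = true                          # ask politely first
    thread.kill unless thread.join(when_to_kill)  # join returns nil on timeout
  when :now
    thread.kill
  else
    thread[:stop] = true
  end
  thread.join
end

polite   = Thread.new { sleep 0.01 until Thread.current.key?(:stop) }
stubborn = Thread.new { sleep }   # never checks the flag
urgent   = Thread.new { sleep }

stop_thread(polite)          # flag only; the worker exits on its own
stop_thread(stubborn, 0.05)  # flag, wait 50 ms, then kill
stop_thread(urgent, :now)    # kill immediately
```

In the service itself the flag is only checked between messages, so a worker waiting on an empty run queue will not notice it until the next message arrives. The timed and :now modes cover that case.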
#stop! ⇒ Object
# File 'lib/short_bus/service.rb', line 86

def stop!
  stop :now
end
#to_s ⇒ Object
# File 'lib/short_bus/service.rb', line 90

def to_s
  @name
end