Class: ShortBus::Driver

Inherits:
Object
  • Object
show all
Includes:
DebugMessage
Defined in:
lib/short_bus/driver.rb

Overview

ShortBus::Driver is the message dispatcher.

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from DebugMessage

#debug_message

Constructor Details

#initialize(*options) ⇒ Driver

Example:

Arguments:

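options: an optional Hash that overrides the defaults; recognised keys are :debug, :default_message_spec, :default_publisher_spec, :default_thread_count and :max_message_queue_size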


17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/short_bus/driver.rb', line 17

# Builds a driver: resolves its options, creates the bounded message queue
# and the (initially empty) service registry, then starts the message router.
#
# Only the first positional argument is looked at, and only when it is a
# Hash; anything else is ignored and the defaults are used unchanged.
#
# Recognised keys and their defaults:
#   :debug                  (false)     copied into @debug
#   :default_message_spec   (nil)       fallback :message_spec in #subscribe
#   :default_publisher_spec (nil)       fallback :publisher_spec in #subscribe
#   :default_thread_count   (1)         fallback :thread_count in #subscribe
#   :max_message_queue_size (1_000_000) capacity of the SizedQueue
#
# @param options [Array] optional; options[0] may be a Hash of overrides
def initialize(*options)
  defaults = {
    debug: false,
    default_message_spec: nil,
    default_publisher_spec: nil,
    default_thread_count: 1,
    max_message_queue_size: 1_000_000
  }
  overrides = options[0]

  @options = overrides.is_a?(Hash) ? defaults.merge(overrides) : defaults
  @debug = @options[:debug]

  @messages = SizedQueue.new(@options[:max_message_queue_size])
  @services = {}
  # Started last so that the queue and registry already exist when
  # launch_message_router runs; its return value is kept under :message_router.
  @threads = { message_router: launch_message_router }
end

Instance Attribute Details

#debug ⇒ Object

Returns the value of attribute debug.



11
12
13
# File 'lib/short_bus/driver.rb', line 11

# Returns the current value of the @debug instance variable, which the
# constructor initialises from the :debug option.
def debug
  @debug
end

#services ⇒ Object (readonly)

Returns the value of attribute services.



10
11
12
# File 'lib/short_bus/driver.rb', line 10

# Returns the @services hash itself (not a copy). #subscribe stores each
# Service in it under the key service.to_s, and #unsubscribe removes entries.
def services
  @services
end

Instance Method Details

#publish(arg, publisher = nil) ⇒ Object Also known as: <<



55
56
57
58
59
60
# File 'lib/short_bus/driver.rb', line 55

# Converts +arg+ into a message and places it on the driver's message queue.
#
# @param arg [Object] anything convert_to_message accepts
# @param publisher [Object, nil] when truthy, assigned to the message's
#   publisher attribute before the message is queued
# @return [Object, nil] the queued message, or nil when convert_to_message
#   returned nil/false (in which case nothing is queued)
def publish(arg, publisher = nil)
  message = convert_to_message(arg)
  return unless message

  message.publisher = publisher if publisher
  # @messages is the SizedQueue created in the constructor, so this call
  # can block while the queue is at capacity.
  @messages << message
  message
end

#subscribe(*args, &block) ⇒ ShortBus::Service

Subscribes a callback (lambda, block, or method) to receive messages.

Parameters:

  • (*args)

Returns:



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# File 'lib/short_bus/driver.rb', line 38

# Registers a callback with the driver by wrapping it in a Service.
#
# The first positional argument is either a Hash of Service arguments, which
# is merged over the driver's defaults, or the callback itself, which is
# stored under :service. A block, when given, always takes over :service.
#
# @param args [Array] args[0] is a Hash of overrides or a callable
# @param block [Proc] optional callback; replaces any :service value
# @return [Service] the newly created service, which is also stored in
#   @services under service.to_s
def subscribe(*args, &block)
  first_arg = args[0]
  overrides = first_arg.is_a?(Hash) ? first_arg : { service: first_arg }

  defaults = {
    debug: @debug,
    driver: self,
    message_spec: @options[:default_message_spec],
    name: nil,
    publisher_spec: @options[:default_publisher_spec],
    service: nil,
    thread_count: @options[:default_thread_count]
  }
  service_args = defaults.merge(overrides)

  # A block wins over whatever :service value came in through args.
  service_args[:service] = block if block
  debug_message("#subscribe service: #{service_args[:service]}")

  service = Service.new(service_args)
  @services[service.to_s] = service
end

#unsubscribe(service) ⇒ Object



64
65
66
67
68
69
70
71
# File 'lib/short_bus/driver.rb', line 64

# Stops a subscribed service and removes it from the registry.
#
# @param service [ShortBus::Service, Object] a service instance (looked up
#   by its to_s value) or the registry key itself
# @return [Object, nil] the removed registry entry, or nil when no entry
#   exists for the given key
def unsubscribe(service)
  # Service instances are resolved through their registry key.
  return unsubscribe(service.to_s) if service.is_a?(ShortBus::Service)
  return unless @services.key?(service)

  # Stop first, then drop the entry; delete returns the removed service.
  @services[service].stop
  @services.delete(service)
end