Class: Plumbing::Actor::Threaded

Inherits:
Object
  • Object
show all
Defined in:
lib/plumbing/actor/threaded.rb

Direct Known Subclasses

Rails

Defined Under Namespace

Classes: Message

Instance Method Summary collapse

Constructor Details

#initialize(target) ⇒ Threaded

Returns a new instance of Threaded.



11
12
13
14
15
# File 'lib/plumbing/actor/threaded.rb', line 11

def initialize target
  @target = target
  @queue = Concurrent::Array.new
  @mutex = Thread::Mutex.new
end

Instance Method Details

#in_context?Boolean

Returns:

  • (Boolean)


32
# File 'lib/plumbing/actor/threaded.rb', line 32

def in_context? = @mutex.owned?

#safelyObject



26
27
28
29
30
# File 'lib/plumbing/actor/threaded.rb', line 26

def safely(&)
  Plumbing.config.logger.debug { "-> #{@target.class}#perform_safely\n#{Thread.current.name}" }
  send_message(:perform_safely, &)
  nil
end

#send_message(message_name, *args, **params, &block) ⇒ Object

Send the message to the target and wrap the result



18
19
20
21
22
23
24
# File 'lib/plumbing/actor/threaded.rb', line 18

def send_message(message_name, *args, **params, &block)
  Plumbing.config.logger.debug { "-> #{@target.class}##{message_name}(#{args.inspect}, #{params.inspect})\n#{Thread.current.name}" }
  Message.new(@target, message_name, Plumbing::Actor.transporter.marshal(*args), Plumbing::Actor.transporter.marshal(params).first, block, Concurrent::MVar.new).tap do |message|
    @queue << message
    send_messages
  end
end

#stopObject



34
# File 'lib/plumbing/actor/threaded.rb', line 34

def stop = @queue.clear