Class: Xcopier::Actor

Inherits:
Object
  • Object
show all
Defined in:
lib/xcopier/actor.rb

Direct Known Subclasses

Reader, Runner, Transformer, Writer

Defined Under Namespace

Classes: UnknownMessageError

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(copier) ⇒ Actor

Returns a new instance of Actor.



27
28
29
30
# File 'lib/xcopier/actor.rb', line 27

# Stores the copier handed to this actor.
#
# @param copier [Object] kept in @copier and exposed through the #copier reader
def initialize(copier)
  @copier = copier
  # Explicit empty parentheses: pass no arguments up the ancestor chain.
  super()
end

Instance Attribute Details

#__queue ⇒ Object

Returns the value of attribute __queue.



10
11
12
# File 'lib/xcopier/actor.rb', line 10

# Reader for the actor's message queue (nil until one has been assigned).
#
# @return [Object, nil] current value of @__queue
def __queue = @__queue

#copier ⇒ Object (readonly)

Returns the value of attribute copier.



9
10
11
# File 'lib/xcopier/actor.rb', line 9

# Reader for the copier this actor was constructed with.
#
# @return [Object, nil] current value of @copier
def copier = @copier

#parent ⇒ Object

Returns the value of attribute parent.



10
11
12
# File 'lib/xcopier/actor.rb', line 10

# Reader for the parent attribute.
#
# @return [Object, nil] current value of @parent
def parent = @parent

#result ⇒ Object

Returns the value of attribute result.



10
11
12
# File 'lib/xcopier/actor.rb', line 10

# Reader for the result attribute; the message loop returns this value when
# the actor is told to terminate.
#
# @return [Object, nil] current value of @result
def result = @result

#thread ⇒ Object

Returns the value of attribute thread.



10
11
12
# File 'lib/xcopier/actor.rb', line 10

# Reader for the thread attribute (nil until a thread has been assigned).
#
# @return [Thread, nil] current value of @thread
def thread = @thread

Class Method Details

.spawn!(*args) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
# File 'lib/xcopier/actor.rb', line 15

# Builds an actor, gives it a fresh queue and starts the thread that runs its
# message loop (#__work__).
#
# Positional arguments, keyword arguments and a block are all forwarded to
# +new+, so subclasses whose initializers take keywords or a block can be
# spawned as well.
#
# @param args [Array] positional arguments for the initializer
# @param kwargs [Hash] keyword arguments for the initializer
# @param block [Proc, nil] block handed to the initializer
# @return [Object] the new actor, with +__queue+ and +thread+ assigned
def self.spawn!(*args, **kwargs, &block)
  # Resolve the thread name before anything is started: anonymous classes
  # have no name, and failing after Thread.new would orphan a running worker.
  thread_name = name&.demodulize&.underscore

  actor = new(*args, **kwargs, &block)
  actor.__queue = Thread::Queue.new
  actor.thread = Thread.new do
    # Record the owning actor in a thread-local on the worker thread.
    Thread.current[:xcopier_actor] = actor
    actor.__work__
  end
  actor.thread.name = thread_name if thread_name
  # Do not print exceptions that kill the worker; Thread#value re-raises them.
  actor.thread.report_on_exception = false
  actor
end

Instance Method Details

#__work__ ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
# File 'lib/xcopier/actor.rb', line 36

# Message loop: pops messages off the queue and dispatches them one by one.
#
# A +:__terminate+ message ends the loop and returns #result. Every other
# message goes to #on_message; anything raised while handling it is passed to
# #on_error and the loop carries on. The loop also ends, returning nil, when
# the queue yields nil or false.
#
# @return [Object, nil] #result on termination, otherwise nil
def __work__
  while (message = __queue.pop)
    begin
      if message == :__terminate
        return result
      else
        on_message(message)
      end
    rescue Exception => error # rubocop:disable Lint/RescueException
      on_error(error)
    end
  end
end

#on_error(error) ⇒ Object



62
# File 'lib/xcopier/actor.rb', line 62

# Hook called by the message loop with whatever was raised while handling a
# message. The base implementation ignores the error.
#
# @param error [Exception] the exception that was rescued
# @return [nil]
def on_error(error)
  nil
end

#on_message(message) ⇒ Object

Raises:

  • (NotImplementedError)


52
53
54
# File 'lib/xcopier/actor.rb', line 52

# Handles one message taken from the queue. The base implementation only
# raises, so subclasses have to override it.
#
# @param message [Object] the message to handle
# @raise [NotImplementedError] always, naming the class that lacks an override
def on_message(message)
  raise NotImplementedError, "#{self.class} must implement #on_message"
end

#tell(message) ⇒ Object



48
49
50
# File 'lib/xcopier/actor.rb', line 48

# Enqueues a message for the actor's message loop.
#
# @param message [Object] the message to deliver
# @return [Object] whatever the queue's push returns
def tell(message)
  __queue << message
end

#terminate! ⇒ Object



56
57
58
59
60
# File 'lib/xcopier/actor.rb', line 56

# Asks the actor to stop: logs the request, drops every message still waiting
# in the queue and enqueues the +:__terminate+ sentinel that ends #__work__.
#
# @return [Object] whatever the queue's push returns
def terminate!
  klass = self.class
  # Anonymous classes have no name; fall back to their inspect-style label so
  # logging cannot stop the sentinel from being queued.
  label = (klass.name || klass.to_s).demodulize
  debug "#{label}: terminating"
  __queue.clear
  __queue.push(:__terminate)
end

#wait ⇒ Object



32
33
34
# File 'lib/xcopier/actor.rb', line 32

# Blocks until the actor's thread has finished.
#
# @return [Object] the value the thread's block returned
# @raise [Exception] whatever exception terminated the thread
def wait
  thread.join.value
end