Class: Concurrent::Actor::Behaviour::Pausing

Inherits:
Abstract
  • Object
show all
Defined in:
lib/concurrent/actor/behaviour/pausing.rb

Overview

Allows to pause actors on errors. When paused all arriving messages are collected and processed after the actor is resumed or reset. Resume will simply continue with next message. Reset also reinitialized context. TODO example

Instance Attribute Summary

Attributes inherited from Abstract

#core, #subsequent

Instance Method Summary collapse

Methods inherited from Abstract

#broadcast, #pass, #reject_envelope

Methods included from InternalDelegations

#behaviour, #behaviour!, #children, #context, #dead_letter_routing, #log, #redirect, #terminate!

Methods included from PublicDelegations

#context_class, #executor, #name, #parent, #path, #reference

Methods included from TypeCheck

#Child!, #Child?, #Match!, #Match?, #Type!, #Type?

Constructor Details

#initialize(core, subsequent) ⇒ Pausing

Returns a new instance of Pausing.



11
12
13
14
15
# File 'lib/concurrent/actor/behaviour/pausing.rb', line 11

def initialize(core, subsequent)
  super core, subsequent
  @paused = false
  @buffer = []
end

Instance Method Details

#on_envelope(envelope) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/concurrent/actor/behaviour/pausing.rb', line 17

def on_envelope(envelope)
  case envelope.message
  when :pause!
    pause!
  when :resume!
    resume!
  when :reset!
    reset!
  when :restart!
    restart!
  else
    if @paused
      @buffer << envelope
      MESSAGE_PROCESSED
    else
      pass envelope
    end
  end
end

#on_event(event) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
# File 'lib/concurrent/actor/behaviour/pausing.rb', line 63

def on_event(event)
  case event
  when :terminated, :restarted
    @buffer.each { |envelope| reject_envelope envelope }
    @buffer.clear
  when :resumed, :reset
    @buffer.each { |envelope| core.schedule_execution { pass envelope } }
    @buffer.clear
  end
  super event
end

#pause!(error = nil) ⇒ Object



37
38
39
40
41
# File 'lib/concurrent/actor/behaviour/pausing.rb', line 37

def pause!(error = nil)
  @paused = true
  broadcast(error || :paused)
  true
end

#reset!(broadcast = true) ⇒ Object



49
50
51
52
53
54
55
# File 'lib/concurrent/actor/behaviour/pausing.rb', line 49

def reset!(broadcast = true)
  core.allocate_context
  core.build_context
  resume!(false)
  broadcast(:reset) if broadcast
  true
end

#restart!Object



57
58
59
60
61
# File 'lib/concurrent/actor/behaviour/pausing.rb', line 57

def restart!
  reset! false
  broadcast(:restarted)
  true
end

#resume!(broadcast = true) ⇒ Object



43
44
45
46
47
# File 'lib/concurrent/actor/behaviour/pausing.rb', line 43

def resume!(broadcast = true)
  @paused = false
  broadcast(:resumed) if broadcast
  true
end