Class: Funl::MessageSequencer
- Inherits: Object
  - Object
  - Funl::MessageSequencer
- Includes: Stream
- Defined in: lib/funl/message-sequencer.rb
Overview
Assigns a unique sequential id to each message and relays it to its destinations.
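The overview can be pictured with a small stand-alone sketch. `ToySequencer` and its methods are hypothetical names invented for illustration; they are not part of funl, and plain arrays stand in for network connections. The sketch only shows the idea of stamping each message with the next id and relaying it to every destination.

```ruby
# Toy illustration only: ToySequencer is hypothetical, not funl's API.
class ToySequencer
  attr_reader :tick

  def initialize(tick: 0)
    @tick = tick          # last id handed out
    @destinations = []    # anything that responds to <<
  end

  def subscribe(dest)
    @destinations << dest
  end

  # Stamp the payload with the next sequential id, then relay it
  # to every destination in the same order.
  def relay(payload)
    @tick += 1
    stamped = { id: @tick, payload: payload }
    @destinations.each { |dest| dest << stamped }
    stamped
  end
end

seq = ToySequencer.new
a = []
b = []
seq.subscribe(a)
seq.subscribe(b)
seq.relay("first")
seq.relay("second")
```

Because a single object hands out the ids, every destination sees the same messages with the same ids in the same order.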
Direct Known Subclasses
Instance Attribute Summary
- #blob_type ⇒ Object (readonly): Returns the value of attribute blob_type.
- #greeting ⇒ Object (readonly): Returns the value of attribute greeting.
- #log ⇒ Object (readonly): Returns the value of attribute log.
- #message_class ⇒ Object (readonly): Returns the value of attribute message_class.
- #server ⇒ Object (readonly): Returns the value of attribute server.
- #server_thread ⇒ Object (readonly): Returns the value of attribute server_thread.
- #stream_type ⇒ Object (readonly): Returns the value of attribute stream_type.
- #subscribers ⇒ Object (readonly): Returns the value of attribute subscribers.
- #tick ⇒ Object (readonly): Returns the value of attribute tick.
Class Method Summary
- .new(*a) ⇒ Object
Instance Method Summary
- #default_greeting ⇒ Object
- #initialize(server, *conns, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, blob_type: Blobber::MSGPACK_TYPE, tick: 0) ⇒ MessageSequencer (constructor): A new instance of MessageSequencer.
- #run ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #wait ⇒ Object
Methods included from Stream
#client_stream_for, #message_server_stream_for, #server_stream_for
Constructor Details
#initialize(server, *conns, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, blob_type: Blobber::MSGPACK_TYPE, tick: 0) ⇒ MessageSequencer
Returns a new instance of MessageSequencer.
# File 'lib/funl/message-sequencer.rb', line 31
def initialize server, *conns, log: Logger.new($stderr),
    stream_type: ObjectStream::MSGPACK_TYPE,
    message_class: Message,
    blob_type: Blobber::MSGPACK_TYPE,
    tick: 0

  @server = server
  @log = log
  @stream_type = stream_type
  @message_class = message_class
  @blob_type = blob_type
  @greeting = default_greeting
  @tick = tick

  init_selector

  conns.each do |conn|
    try_conn conn
  end

  @subscribers_to_all = [] # [conn, ...]
  @subscribers = Hash.new {|h, tag| h[tag] = []} # tag => [conn, ...]
  @tags = Hash.new {|h, conn| h[conn] = []} # conn => [tag, ...]
end
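The two hashes created at the end of the constructor rely on Ruby's `Hash.new` default block, which creates an empty list the first time a key is looked up. The following is a minimal sketch of that bookkeeping in isolation; the symbol `:conn_a` and the tag `"news"` are placeholders, and how funl itself fills these tables is not shown in this listing.

```ruby
# Same default-block idiom as in the constructor above.
subscribers = Hash.new { |h, tag| h[tag] = [] }   # tag  => [conn, ...]
tags        = Hash.new { |h, conn| h[conn] = [] } # conn => [tag, ...]

conn = :conn_a # placeholder standing in for a real connection object

# No explicit "create the list if missing" step is needed:
subscribers["news"] << conn
tags[conn] << "news"

# Note that merely reading a missing key also stores an empty list for it.
had_key_before = subscribers.key?("sports")
subscribers["sports"]
has_key_after = subscribers.key?("sports")
```

Keeping both directions (tag to connections, connection to tags) makes it cheap to look up either side, at the cost of updating two tables together.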
Instance Attribute Details
#blob_type ⇒ Object (readonly)
Returns the value of attribute blob_type.
# File 'lib/funl/message-sequencer.rb', line 18
def blob_type
  @blob_type
end
#greeting ⇒ Object (readonly)
Returns the value of attribute greeting.
# File 'lib/funl/message-sequencer.rb', line 19
def greeting
  @greeting
end
#log ⇒ Object (readonly)
Returns the value of attribute log.
# File 'lib/funl/message-sequencer.rb', line 15
def log
  @log
end
#message_class ⇒ Object (readonly)
Returns the value of attribute message_class.
# File 'lib/funl/message-sequencer.rb', line 17
def message_class
  @message_class
end
#server ⇒ Object (readonly)
Returns the value of attribute server.
# File 'lib/funl/message-sequencer.rb', line 12
def server
  @server
end
#server_thread ⇒ Object (readonly)
Returns the value of attribute server_thread.
# File 'lib/funl/message-sequencer.rb', line 13
def server_thread
  @server_thread
end
#stream_type ⇒ Object (readonly)
Returns the value of attribute stream_type.
# File 'lib/funl/message-sequencer.rb', line 16
def stream_type
  @stream_type
end
#subscribers ⇒ Object (readonly)
Returns the value of attribute subscribers.
# File 'lib/funl/message-sequencer.rb', line 20
def subscribers
  @subscribers
end
#tick ⇒ Object (readonly)
Returns the value of attribute tick.
# File 'lib/funl/message-sequencer.rb', line 14
def tick
  @tick
end
Class Method Details
.new(*a) ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 22
def self.new *a
  if self == MessageSequencer
    require 'funl/message-sequencer-select'
    MessageSequencerSelect.new(*a)
  else
    super
  end
end
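The overridden `.new` acts as a small factory: calling it on the base class returns an instance of a concrete implementation, while calling it on any other class falls through to the normal `Class#new`. The sketch below reproduces that dispatch with hypothetical classes `Base` and `Impl` (neither is part of funl). It assumes `Impl` inherits from `Base`, and it forwards keyword arguments explicitly with `**opts`, which is a choice made here for Ruby 3 and differs from the `*a` signature in the listing.

```ruby
# Hypothetical classes illustrating the dispatch; not funl's own code.
class Base
  attr_reader :name, :label

  def self.new(*args, **opts)
    if self == Base
      # Called on the base class: hand off to the concrete implementation.
      Impl.new(*args, **opts)
    else
      # Called on a subclass: ordinary allocation and initialize.
      super
    end
  end

  def initialize(name, label: "default")
    @name = name
    @label = label
  end
end

class Impl < Base
end

obj = Base.new("x", label: "y")
direct = Impl.new("z")
```

Callers keep writing `Base.new(...)` and never need to name the implementation class, which leaves room to swap in a different implementation later.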
Instance Method Details
#default_greeting ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 56
def default_greeting
  {
    "blob" => blob_type
  }.freeze # can't change after initial conns read it
end
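The `.freeze` call is what enforces the comment in the listing. A minimal sketch of the effect follows; the string `"msgpack"` is only a placeholder, since the actual value of `Blobber::MSGPACK_TYPE` is not shown on this page.

```ruby
blob_type = "msgpack" # placeholder value, assumed for illustration
greeting = { "blob" => blob_type }.freeze

# Any attempt to modify the frozen hash raises instead of silently
# changing what later connections would be told.
error_class = begin
  greeting["blob"] = "json"
  nil
rescue => ex
  ex.class
end
```

Freezing is shallow: it protects the hash itself, not the objects stored in it.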
#run ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 76
def run
  loop do
    select_streams
  end
rescue => ex
  log.error ex
  raise
end
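The shape of `#run` (loop forever, log any error, then re-raise it) can be tried out in isolation. In this sketch `run_logged` is a hypothetical helper and the block stands in for `select_streams`; the log goes to an in-memory `StringIO` so the result can be inspected.

```ruby
require "logger"
require "stringio"

# Hypothetical helper with the same structure as #run above.
def run_logged(log)
  loop do
    yield
  end
rescue => ex
  log.error ex # record the failure...
  raise        # ...then let the caller (or thread) see it too
end

sink = StringIO.new
log = Logger.new(sink)
count = 0

reraised = begin
  run_logged(log) do
    count += 1
    raise "boom" if count == 3
  end
rescue RuntimeError => ex
  ex.message
end
```

A bare `rescue => ex` catches `StandardError` and its subclasses only, so the error is logged once and still propagates to whoever called the method.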
#start ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 62
def start
  @server_thread = Thread.new do
    run
  end
end
#stop ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 68
def stop
  server_thread.kill if server_thread
end
#wait ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 72
def wait
  server_thread.join
end
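Taken together, `#start`, `#stop`, and `#wait` form a simple thread lifecycle: run the loop in a background thread, kill it on request, and join it to block until it has finished. The sketch below mirrors those three methods on a hypothetical `ToyService` whose `run` just sleeps in a loop; it does not use funl and is only meant to show the calling order.

```ruby
# Hypothetical stand-in with the same start/stop/wait structure.
class ToyService
  attr_reader :server_thread

  def run
    loop do
      sleep 0.01 # placeholder for the real per-iteration work
    end
  end

  def start
    @server_thread = Thread.new do
      run
    end
  end

  def stop
    server_thread.kill if server_thread
  end

  def wait
    server_thread.join
  end
end

svc = ToyService.new
svc.start
sleep 0.05 # let the background loop run briefly
svc.stop   # ask the thread to terminate
svc.wait   # block until it has actually finished

never_started = ToyService.new.stop # safe: returns nil when no thread exists
```

As in the listing, only `stop` guards against a missing thread, so `wait` is expected to be called after `start`.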