Class: Funl::MessageSequencer
- Inherits:
- Object
- Object
- Funl::MessageSequencer
- Includes:
- Stream
- Defined in:
- lib/funl/message-sequencer.rb
Overview
Assigns a unique sequential id to each message and relays it to its destinations.
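The overview can be illustrated with a minimal, self-contained sketch. `ToySequencer` and its `relay` method are hypothetical names invented for this illustration and are not part of Funl. The sketch assumes the counter is incremented before each message is stamped, and it relays every message to every destination, which is a simplification of whatever routing the real class performs.

```ruby
# Hypothetical stand-in for illustration only; not the Funl implementation.
class ToySequencer
  attr_reader :tick

  def initialize(destinations, tick: 0)
    @destinations = destinations
    @tick = tick
  end

  # Stamp the payload with the next sequential id, then hand the same
  # stamped message to every destination.
  def relay(payload)
    @tick += 1 # assumption: increment first, then stamp
    msg = { "tick" => @tick, "payload" => payload }
    @destinations.each { |dest| dest << msg }
    msg
  end
end

inbox_a = []
inbox_b = []
seq = ToySequencer.new([inbox_a, inbox_b])
seq.relay("first")
seq.relay("second")

ticks = inbox_a.map { |m| m["tick"] }
```

Because a single counter stamps every message, all destinations observe the same ids in the same order.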
Instance Attribute Summary
- #blob_type ⇒ Object (readonly)
  Returns the value of attribute blob_type.
- #greeting ⇒ Object (readonly)
  Returns the value of attribute greeting.
- #log ⇒ Object (readonly)
  Returns the value of attribute log.
- #message_class ⇒ Object (readonly)
  Returns the value of attribute message_class.
- #server ⇒ Object (readonly)
  Returns the value of attribute server.
- #server_thread ⇒ Object (readonly)
  Returns the value of attribute server_thread.
- #stream_type ⇒ Object (readonly)
  Returns the value of attribute stream_type.
- #streams ⇒ Object (readonly)
  Returns the value of attribute streams.
- #tick ⇒ Object (readonly)
  Returns the value of attribute tick.
Instance Method Summary
- #default_greeting ⇒ Object
- #initialize(server, *conns, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, blob_type: Blobber::MSGPACK_TYPE, tick: 0) ⇒ MessageSequencer (constructor)
  A new instance of MessageSequencer.
- #run ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #wait ⇒ Object
Methods included from Stream
#client_stream_for, #message_server_stream_for, #server_stream_for
Constructor Details
#initialize(server, *conns, log: Logger.new($stderr), stream_type: ObjectStream::MSGPACK_TYPE, message_class: Message, blob_type: Blobber::MSGPACK_TYPE, tick: 0) ⇒ MessageSequencer
Returns a new instance of MessageSequencer.
# File 'lib/funl/message-sequencer.rb', line 22

def initialize server, *conns, log: Logger.new($stderr),
    stream_type: ObjectStream::MSGPACK_TYPE,
    message_class: Message,
    blob_type: Blobber::MSGPACK_TYPE,
    tick: 0

  @server = server
  @log = log
  @stream_type = stream_type
  @message_class = message_class
  @blob_type = blob_type
  @greeting = default_greeting
  @tick = tick

  @streams = []
  conns.each do |conn|
    try_conn conn
  end
end
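The constructor signature combines one required positional argument, a splat of optional connections, and keyword arguments with defaults. The sketch below shows how Ruby binds arguments for a signature of that shape. `collect_args` is a hypothetical method written for this illustration, and the `"msgpack"` default is a placeholder rather than the actual value of `ObjectStream::MSGPACK_TYPE`.

```ruby
# Hypothetical method mirroring the shape of the constructor signature.
def collect_args(server, *conns, stream_type: "msgpack", tick: 0)
  { server: server, conns: conns, stream_type: stream_type, tick: tick }
end

# Only the server given: the splat is empty and the keywords take their defaults.
defaults = collect_args(:srv)

# Extra positionals become initial connections; keywords override defaults.
custom = collect_args(:srv, :conn_a, :conn_b, tick: 42)
```

In the real constructor, each element of `conns` is passed to `try_conn`, so connections supplied up front are treated like ones accepted later.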
Instance Attribute Details
#blob_type ⇒ Object (readonly)
Returns the value of attribute blob_type.
# File 'lib/funl/message-sequencer.rb', line 19

def blob_type
  @blob_type
end
#greeting ⇒ Object (readonly)
Returns the value of attribute greeting.
# File 'lib/funl/message-sequencer.rb', line 20

def greeting
  @greeting
end
#log ⇒ Object (readonly)
Returns the value of attribute log.
# File 'lib/funl/message-sequencer.rb', line 16

def log
  @log
end
#message_class ⇒ Object (readonly)
Returns the value of attribute message_class.
# File 'lib/funl/message-sequencer.rb', line 18

def message_class
  @message_class
end
#server ⇒ Object (readonly)
Returns the value of attribute server.
# File 'lib/funl/message-sequencer.rb', line 12

def server
  @server
end
#server_thread ⇒ Object (readonly)
Returns the value of attribute server_thread.
# File 'lib/funl/message-sequencer.rb', line 13

def server_thread
  @server_thread
end
#stream_type ⇒ Object (readonly)
Returns the value of attribute stream_type.
# File 'lib/funl/message-sequencer.rb', line 17

def stream_type
  @stream_type
end
#streams ⇒ Object (readonly)
Returns the value of attribute streams.
# File 'lib/funl/message-sequencer.rb', line 14

def streams
  @streams
end
#tick ⇒ Object (readonly)
Returns the value of attribute tick.
# File 'lib/funl/message-sequencer.rb', line 15

def tick
  @tick
end
Instance Method Details
#default_greeting ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 42

def default_greeting
  {
    "blob" => blob_type
  }.freeze # can't change after initial conns read it
end
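The greeting is frozen so that it cannot change once initial connections have read it. A small sketch of what `freeze` guarantees for such a hash follows. The `"msgpack"` value is a placeholder assumed for illustration, not necessarily the value of `Blobber::MSGPACK_TYPE`.

```ruby
blob_type = "msgpack" # placeholder value, assumed for illustration

greeting = { "blob" => blob_type }.freeze

begin
  greeting["blob"] = "json" # attempt to change the greeting after the fact
  mutated = true
rescue RuntimeError # FrozenError is a RuntimeError subclass on Ruby 2.5+
  mutated = false
end
```

Note that `freeze` is shallow: it protects the hash's keys and values from being reassigned, but it does not freeze the objects the hash refers to.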
#run ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 72

def run
  loop do
    readables, _ = select [server, *streams]

    readables.each do |readable|
      case readable
      when server
        begin
          conn, addr = readable.accept_nonblock
          log.debug "accepted #{conn.inspect} from #{addr.inspect}"
          try_conn conn
        rescue IO::WaitReadable
          next
        end

      else
        log.debug {"readable = #{readable}"}
        begin
          msgs = []
          readable.read do |msg|
            msgs << msg
          end
        rescue IOError, SystemCallError => ex
          log.debug {"closing #{readable}: #{ex}"}
          @streams.delete readable
          readable.close unless readable.closed?
        else
          log.debug {
            "read #{msgs.size} messages from #{readable.peer_name}"}
        end

        msgs.each do |msg|
          msg
        end
      end
    end
  end
rescue => ex
  log.error ex
  raise
end
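The loop in `run` multiplexes one listening socket and any number of accepted streams through `select`. The following sketch reproduces one pass of that pattern using only Ruby's standard `socket` library. `poll_once` is a hypothetical helper written for this illustration, and it reads raw bytes with `readpartial` where the real method reads whole messages from an object stream.

```ruby
require "socket"

# Hypothetical helper: one pass of a select-based accept/read loop.
# Returns the raw chunks read during this pass.
def poll_once(server, streams, timeout: 5)
  received = []
  readables, = IO.select([server, *streams], nil, nil, timeout)
  (readables || []).each do |readable|
    if readable == server
      begin
        # TCPServer#accept_nonblock returns the connected socket.
        streams << readable.accept_nonblock
      rescue IO::WaitReadable
        next # nothing to accept after all; try again on the next pass
      end
    else
      begin
        received << readable.readpartial(1024)
      rescue IOError, SystemCallError
        # Peer closed or the read failed: drop the stream.
        streams.delete(readable)
        readable.close unless readable.closed?
      end
    end
  end
  received
end

server = TCPServer.new("127.0.0.1", 0) # port 0 asks the OS for a free port
streams = []
client = TCPSocket.new("127.0.0.1", server.addr[1])

first = poll_once(server, streams)  # accepts the pending connection
client.write("hello")
second = poll_once(server, streams) # reads what the client sent
client.close
poll_once(server, streams)          # sees end-of-file and drops the stream
server.close
```

As in the source, a stream that raises on read is removed from the list and closed, so a misbehaving peer does not stall the loop.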
#start ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 58

def start
  @server_thread = Thread.new do
    run
  end
end
#stop ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 64

def stop
  server_thread.kill if server_thread
end
#wait ⇒ Object
# File 'lib/funl/message-sequencer.rb', line 68

def wait
  server_thread.join
end
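Taken together, `start`, `stop`, and `wait` form a small thread lifecycle: spawn a thread that runs the loop, kill it on request, and join it to block until it ends. The sketch below mirrors those three methods on a hypothetical `ToyService` class whose `run` merely counts iterations; it is an illustration of the pattern, not the Funl class.

```ruby
# Hypothetical stand-in that mirrors the start/stop/wait methods above.
class ToyService
  attr_reader :server_thread, :iterations

  def initialize
    @iterations = 0
  end

  # Placeholder for the real event loop.
  def run
    loop do
      @iterations += 1
      sleep 0.01
    end
  end

  def start
    @server_thread = Thread.new { run }
  end

  def stop
    server_thread.kill if server_thread
  end

  def wait
    server_thread.join
  end
end

svc = ToyService.new
svc.stop   # safe before start: there is no thread yet, so nothing happens
svc.start
sleep 0.1  # let the loop run briefly
svc.stop
svc.wait   # returns once the killed thread has finished
alive = svc.server_thread.alive?
```

Unlike `stop`, the `wait` method shown in the source has no nil guard, so calling it before `start` would raise `NoMethodError`.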