Module: MessageCommand
- Includes: DefaultLogging
- Included in: MessageProbe, MessagePusher, MessageSink
- Defined in: lib/pipeline_toolkit/message_command.rb
Defined Under Namespace
Modules: ProcessLine
Instance Attribute Summary
- #sys_pipe ⇒ Object (readonly): Returns the value of attribute sys_pipe.
Instance Method Summary
- #ack_msg(msg) ⇒ Object
- #init_loop ⇒ Object: Override in the including class.
- #pass_on_msg(msg) ⇒ Object
- #process_line(line) ⇒ Object
- #process_message(msg) ⇒ Object: Override in the including class.
- #process_system(msg) ⇒ Object
- #shutdown ⇒ Object
- #start ⇒ Object
Instance Attribute Details
#sys_pipe ⇒ Object (readonly)
Returns the value of attribute sys_pipe.
    # File 'lib/pipeline_toolkit/message_command.rb', line 7
    def sys_pipe
      @sys_pipe
    end
Instance Method Details
#ack_msg(msg) ⇒ Object
    # File 'lib/pipeline_toolkit/message_command.rb', line 53
    def ack_msg(msg)
      return unless @use_ack
      @sys_pipe.syswrite(msg.ack_id + "\n")
    end
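The acknowledgement path can be tried in isolation. The following is a minimal sketch, not the toolkit's own code: AckDemo and AckDemoMessage are hypothetical names, and an IO.pipe stands in for the system pipe that the real module opens from a system message. It illustrates that one newline-terminated ack_id is written per acknowledged message, and nothing at all when acknowledgements are disabled.

```ruby
# Hypothetical stand-in for a decoded message; only ack_id is needed here.
AckDemoMessage = Struct.new(:ack_id)

# Hypothetical class reproducing only the ack_msg logic from the listing.
class AckDemo
  def initialize(sys_pipe, use_ack)
    @sys_pipe = sys_pipe
    @use_ack = use_ack
  end

  def ack_msg(msg)
    return unless @use_ack               # acks disabled: write nothing
    @sys_pipe.syswrite(msg.ack_id + "\n") # one line per acknowledged message
  end
end

# An in-process pipe plays the role of the system pipe (assumption).
reader, writer = IO.pipe
AckDemo.new(writer, true).ack_msg(AckDemoMessage.new("42"))
AckDemo.new(writer, false).ack_msg(AckDemoMessage.new("43")) # ignored
writer.close
puts reader.read.inspect
```

Because ack_id is concatenated with a String, this sketch assumes ack_id is itself a String.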
#init_loop ⇒ Object
Override in the including class. Hook for any initialization that must run once the EventMachine (EM) loop has started.
    # File 'lib/pipeline_toolkit/message_command.rb', line 64
    def init_loop
      # Implemented in class that includes me
    end
#pass_on_msg(msg) ⇒ Object
    # File 'lib/pipeline_toolkit/message_command.rb', line 58
    def pass_on_msg(msg)
      $stdout.syswrite(MessageCoder.encode(msg) << "\n")
    end
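The listing writes one encoded message per line to standard output, which is what lets the next command in a shell pipeline read messages line by line. The wire format used by MessageCoder is not shown on this page, so the sketch below substitutes JSON purely as an assumption. pass_on_demo is a hypothetical helper, and it uses write rather than syswrite so that any IO-like object can be passed in.

```ruby
require "json"
require "stringio"

# Hypothetical helper: JSON stands in for MessageCoder (assumption).
# Writes one encoded message per line to the given IO.
def pass_on_demo(msg, out = $stdout)
  out.write(JSON.generate(msg) << "\n")
end

# Capture the output in memory instead of writing to the real stdout.
buffer = StringIO.new
pass_on_demo({ "id" => 1 }, buffer)
pass_on_demo({ "id" => 2 }, buffer)
puts buffer.string.lines.length
```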
#process_line(line) ⇒ Object
    # File 'lib/pipeline_toolkit/message_command.rb', line 35
    def process_line(line)
      msg = MessageCoder.decode(line)

      case msg[:msg_type]
      when :system
        result = process_system(msg)
      else
        result = process_message(msg)
      end

      case result
      when :ack
        self.ack_msg(msg)
      else
        self.pass_on_msg(result)
      end
    end
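The two-stage dispatch in process_line (route by message type, then either acknowledge or forward the result) can be sketched without EventMachine or MessageCoder. DispatchDemo is a hypothetical class. It assumes messages are already decoded into plain Hashes and records outcomes in arrays instead of writing to pipes. The :done key is invented for the illustration.

```ruby
# Hypothetical sketch of the process_line dispatch; not the toolkit's class.
class DispatchDemo
  attr_reader :acked, :passed

  def initialize
    @acked = []
    @passed = []
  end

  # Mirrors process_line, but takes an already-decoded Hash (assumption).
  def dispatch(msg)
    result = case msg[:msg_type]
             when :system then process_system(msg)
             else process_message(msg)
             end

    case result
    when :ack then @acked << msg      # handled here: acknowledge
    else @passed << result            # otherwise forward downstream
    end
  end

  def process_system(msg)
    msg # system messages are returned, so they are forwarded
  end

  def process_message(msg)
    msg[:done] ? :ack : msg # :done is a made-up flag for this sketch
  end
end

demo = DispatchDemo.new
demo.dispatch({ msg_type: :system })
demo.dispatch({ msg_type: :data, done: true })
demo.dispatch({ msg_type: :data, done: false })
puts "acked=#{demo.acked.size} passed=#{demo.passed.size}" # prints "acked=1 passed=2"
```

Note that system messages are forwarded rather than acknowledged, so every downstream command also receives the pipe and acknowledgement settings.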
#process_message(msg) ⇒ Object
Override in the including class. Processes a message. This method must return either a msg object – which may or may not have been modified – or the symbol :ack. Returning :ack means that the message has been dealt with and can be acknowledged back to the queue server. Every message must be acknowledged by at least one message_command.
    # File 'lib/pipeline_toolkit/message_command.rb', line 72
    def process_message(msg)
      # Implemented in class that includes me
      msg
    end
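The return contract described above suggests two kinds of including classes: ones that transform a message and hand it on, and ones that consume it and return :ack. The sketch below illustrates both under stated assumptions. MessageCommandStub is a hypothetical stand-in that reproduces only the default process_message hook, messages are plain Hashes, and the :body key is invented for the example.

```ruby
# Hypothetical stand-in for the real module; only the default hook is copied.
module MessageCommandStub
  def process_message(msg)
    msg # default: pass the message through unchanged
  end
end

# A transforming command: returns a (modified) message to be passed on.
class UppercaseWorker
  include MessageCommandStub

  def process_message(msg)
    msg.merge(body: msg[:body].upcase)
  end
end

# A terminal command: consumes the message and asks for it to be acked.
class NullSink
  include MessageCommandStub

  def process_message(_msg)
    :ack
  end
end

p UppercaseWorker.new.process_message({ body: "hello" })
p NullSink.new.process_message({ body: "hello" })
```

In a pipeline built this way, the last command would normally be the one returning :ack, since a message that is only ever passed on is never acknowledged.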
#process_system(msg) ⇒ Object
    # File 'lib/pipeline_toolkit/message_command.rb', line 77
    def process_system(msg)
      @sys_pipe = File.open(msg.sys_pipe, "w")
      @use_ack = msg.use_ack
      @max_unackd = msg.max_unackd
      msg
    end
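A system message carries the path of the system pipe plus the acknowledgement settings, and process_system stores them before returning the message unchanged. The sketch below is one way to exercise that logic on its own. SystemMessageDemo and SystemDemo are hypothetical names, a Tempfile stands in for the real pipe, and the value 10 for max_unackd is arbitrary. The shutdown method is copied from this page's listing without the logging call.

```ruby
require "tempfile"

# Hypothetical stand-in for a decoded system message.
SystemMessageDemo = Struct.new(:sys_pipe, :use_ack, :max_unackd)

# Hypothetical class reproducing process_system and shutdown only.
class SystemDemo
  attr_reader :sys_pipe, :use_ack, :max_unackd

  def process_system(msg)
    @sys_pipe = File.open(msg.sys_pipe, "w") # opened for writing acks
    @use_ack = msg.use_ack
    @max_unackd = msg.max_unackd
    msg # returned unchanged, so the caller forwards it downstream
  end

  def shutdown
    @sys_pipe && @sys_pipe.close
  end
end

pipe_file = Tempfile.new("sys_pipe") # a regular file, not a real pipe (assumption)
demo = SystemDemo.new
demo.process_system(SystemMessageDemo.new(pipe_file.path, true, 10))
puts demo.sys_pipe.closed? # prints "false"
demo.shutdown
puts demo.sys_pipe.closed? # prints "true"
```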
#shutdown ⇒ Object
    # File 'lib/pipeline_toolkit/message_command.rb', line 30
    def shutdown
      log.info("shutting down")
      @sys_pipe && @sys_pipe.close
    end
#start ⇒ Object
    # File 'lib/pipeline_toolkit/message_command.rb', line 9
    def start
      log.info("starting")

      Signal.trap('INT') { EM.stop }
      Signal.trap('TERM'){ EM.stop }

      @ack_buffer ||= ""

      begin
        EM.run do
          self.init_loop
          EM.attach($stdin, ProcessLine, self)
        end
      rescue StandardError => e
        log.info e
        raise e
      ensure
        self.shutdown
      end
    end