Module: PipelineToolkit::MessageCommand Abstract

Included in:
MessagePusher, MessageSink
Defined in:
lib/pipeline_toolkit/message_command.rb

Overview

This module is abstract.

Provides abstract base functionality for pipeline machines.

Required: to implement your own machine, override the following method:

  • #process_standard

Optionally, your machine can implement the following:

  • #initialize_machine

Examples:

An example of a pipeline machine


class DefaultMachine
  include PipelineToolkit::MessageCommand

  def process_standard(message)
    # Do some work, e.g. transform the message or apply some business rules.
    # Raise an exception if processing or a business rule fails.
    pass_on(message)
  rescue
    # Handle the storage of the message that could not be processed,
    # then acknowledge it to the AMQP server.
    acknowledge(message)
  end
end

DefaultMachine.new.start

Instance Attribute Details

#options ⇒ Object (readonly)

The options the machine should use



# File 'lib/pipeline_toolkit/message_command.rb', line 44

def options
  @options
end

Instance Method Details

#acknowledge(message) ⇒ Object

Notifies the AMQP server that we’ve handled the message.

Parameters:

  • message (Hash)

    The message to acknowledge



# File 'lib/pipeline_toolkit/message_command.rb', line 118

def acknowledge(message)
  return if message[:ack_id].nil?
  DefaultLogger.debug("MessageCommand#acknowledge(message)") 
  ack_message = {:msg_type => "ack", :ack_id => message[:ack_id]}
  write_to_pipe(ack_message, @ack_pipe)
end
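
A minimal sketch of the acknowledgement flow above, runnable without the toolkit. The AckSketch class and the JSON encoding are assumptions made for illustration; the real module encodes with MessageCoder, whose wire format is not shown on this page.

```ruby
require "json"
require "stringio"

# Hypothetical stand-in for a machine; not part of PipelineToolkit.
class AckSketch
  def initialize(ack_pipe)
    @ack_pipe = ack_pipe
  end

  # Mirrors the listing: skip messages without an :ack_id, otherwise
  # write a small "ack" message to the acknowledgement pipe.
  def acknowledge(message)
    return if message[:ack_id].nil?
    ack_message = { :msg_type => "ack", :ack_id => message[:ack_id] }
    # JSON is an assumption; the toolkit calls MessageCoder.encode here.
    @ack_pipe.write(JSON.generate(ack_message) << "\n")
  end
end

pipe = StringIO.new
machine = AckSketch.new(pipe)
machine.acknowledge({ :ack_id => 7, :body => "hello" })
machine.acknowledge({ :body => "no ack id, so nothing is written" })
ack_lines = pipe.string.lines
```

Only the first call produces a line on the pipe. The acknowledgement carries just the :ack_id, not the message body.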

#cleanup ⇒ Object

Cleans up any used resource, such as pipes and files.



# File 'lib/pipeline_toolkit/message_command.rb', line 95

def cleanup
  @ack_pipe.close if @ack_pipe
end

#description ⇒ Object

A string describing the machine. The description is used to identify the machine in the monitoring interface. The default implementation returns the name of the machine’s class. This can be overridden within individual machines if you wish to change it.



# File 'lib/pipeline_toolkit/message_command.rb', line 164

def description
  # NB: Regex removes modules from description
  self.class.name.scan(/(\w+)$/).first
end
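
To see what the regular expression in the listing does, the sketch below applies it to a namespaced class name. Demo::UppercaseMachine is a hypothetical class used only for illustration. String#scan with a capture group yields an array of captures per match, so the .first in the listing produces a one-element array rather than a bare string.

```ruby
# Hypothetical namespaced class; not part of PipelineToolkit.
module Demo
  class UppercaseMachine; end
end

name = Demo::UppercaseMachine.name     # "Demo::UppercaseMachine"
captures = name.scan(/(\w+)$/).first   # ["UppercaseMachine"]
short_name = captures.first            # "UppercaseMachine"
```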

#initialize(options = {}) ⇒ Object

Initializes a new instance. Because MessageCommand is a module, it can only be instantiated through a class that includes it.

Parameters:

  • options (Hash) (defaults to: {})

    An options hash.



# File 'lib/pipeline_toolkit/message_command.rb', line 52

def initialize(options = {})
  @options = options
end

#initialize_machine ⇒ Object

This method is abstract.

Override in the class that MessageCommand is included into. Provides a chance to run any initialization that needs to take place once the EventMachine loop has started.



# File 'lib/pipeline_toolkit/message_command.rb', line 140

def initialize_machine
  DefaultLogger.debug("MessageCommand#initialize_machine") 
  # Implemented in class that includes me
end

#pass_on(message) ⇒ Object

Passes the message to the next machine in the pipeline by writing it to STDOUT.

Parameters:

  • message (Hash)

    The message to pass on



# File 'lib/pipeline_toolkit/message_command.rb', line 130

def pass_on(message)
  DefaultLogger.debug("MessageCommand#pass_on(message)") 
  write_to_pipe(message)
end

#process(message) ⇒ Object

Callback for the Handlers::MessageHandler when it receives a message

Parameters:

  • message (Hash)

    The decoded message



# File 'lib/pipeline_toolkit/message_command.rb', line 104

def process(message)
  DefaultLogger.debug("MessageCommand#process(message)") 
  if message[:msg_type] == "system"
    process_system(message)
  else
    process_standard(message)
  end
end
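
The dispatch above can be sketched in isolation. DispatchSketch is a hypothetical class that only records which branch handled each message; it does no pipe I/O and is not part of the toolkit.

```ruby
# Hypothetical machine that records which branch handled each message.
class DispatchSketch
  attr_reader :handled

  def initialize
    @handled = []
  end

  # Same branching as the listing: "system" messages take one path,
  # everything else is treated as a standard message.
  def process(message)
    if message[:msg_type] == "system"
      process_system(message)
    else
      process_standard(message)
    end
  end

  def process_system(message)
    @handled << [:system, message]
  end

  def process_standard(message)
    @handled << [:standard, message]
  end
end

machine = DispatchSketch.new
machine.process({ :msg_type => "system", :sys_pipe => "/tmp/example_ack" })
machine.process({ :title => "no :msg_type, so handled as standard" })
branches = machine.handled.map(&:first)
```

The check uses the symbol key :msg_type, so a message that lacks that key falls through to the standard branch.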

#process_standard(message) ⇒ Object

This method is abstract.

In your machine implementation you need to override the #process_standard method.

Processes a message. This method must call #pass_on if the message was handled successfully. If handling fails for whatever reason, you need to decide how to deal with the failed message (write it to an error log, a queue, etc.) and then acknowledge it by calling #acknowledge. Acknowledging a message means that it has been dealt with. All messages must be acknowledged by at least one worker in a pipeline.



# File 'lib/pipeline_toolkit/message_command.rb', line 153

def process_standard(message)
  DefaultLogger.debug("MessageCommand#process_standard(message)") 
  # Implemented in class that includes me
  pass_on(message)
end
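
A sketch of an override that follows the contract described above: pass the message on when processing succeeds, otherwise record the failure and acknowledge. FakeMessageCommand, PriceMachine, and the price rule are all hypothetical; the fake module only records calls so the example runs without the toolkit or any pipes.

```ruby
# Hypothetical stand-in for MessageCommand that records calls instead of
# writing to pipes.
module FakeMessageCommand
  def passed
    @passed ||= []
  end

  def acknowledged
    @acknowledged ||= []
  end

  def pass_on(message)
    passed << message
  end

  def acknowledge(message)
    acknowledged << message
  end
end

class PriceMachine
  include FakeMessageCommand

  def failures
    @failures ||= []
  end

  def process_standard(message)
    # Business rule (assumed for illustration): a price must be present.
    raise ArgumentError, "missing price" if message[:price].nil?
    pass_on(message.merge(:price_cents => (message[:price] * 100).round))
  rescue StandardError => e
    # Record the failure, then acknowledge so the message is not left
    # outstanding.
    failures << [message, e.message]
    acknowledge(message)
  end
end

machine = PriceMachine.new
machine.process_standard({ :ack_id => 1, :price => 1.5 })  # passed on
machine.process_standard({ :ack_id => 2 })                 # acknowledged
```

A real machine would store failures somewhere durable, such as an error queue or log, rather than in memory.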

#process_system(message) ⇒ Object

Process the first system message to notify machines down the pipeline which named pipe to use for acknowledgements.

Parameters:

  • message (Hash)

    The system message



# File 'lib/pipeline_toolkit/message_command.rb', line 175

def process_system(message)
  DefaultLogger.debug("MessageCommand#process_system(message)") 
  options[:ack] = message[:ack] # inherit setting from upstream
  @ack_pipe = File.open(message[:sys_pipe], "w") # open ack pipe (needs to already exist)
  send_description
  pass_on(message)
end

#send_description ⇒ Object

Can be overridden in the class that MessageCommand is included into. Called when the acknowledgement named pipe has been established.



# File 'lib/pipeline_toolkit/message_command.rb', line 191

def send_description
  message = { :msg_type => "pipe_desc", :description => self.description }
  write_to_pipe(message, @ack_pipe)
end

#start ⇒ Object

Starts the machine up, i.e. creates a new EventMachine reactor loop and starts watching STDIN for new messages.



# File 'lib/pipeline_toolkit/message_command.rb', line 60

def start
  DefaultLogger.debug("MessageCommand#start") 

  Signal.trap('INT')  { self.stop }
  Signal.trap('TERM') { self.stop }
  Signal.trap('PIPE') { self.stop }

  begin
    EM.run do
      conn = EM.watch($stdin, Handlers::MessageHandler, self, options)
      initialize_machine

      # must call this to setup callback to notify_readable
      conn.notify_readable = true
    end
  rescue Exception => e
    DefaultLogger.error("#{e.class.name}: #{e.message}\n" << e.backtrace.join("\n"))
    raise e
  ensure
    cleanup
  end
end

#stop ⇒ Object

Stops the command. This stops the event loop on its next tick, giving it enough time to clear its buffers.



# File 'lib/pipeline_toolkit/message_command.rb', line 86

def stop
  DefaultLogger.info("Shutting down #{self.class.name}")
  DefaultLogger.info "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
  EM.next_tick { EM.stop }
end

#write_to_pipe(message, pipe = $stdout) ⇒ Object

Encodes the message with MessageCoder and writes it, followed by a newline, to the given pipe (STDOUT by default).



# File 'lib/pipeline_toolkit/message_command.rb', line 183

def write_to_pipe(message, pipe=$stdout)
  pipe.syswrite(MessageCoder.encode(message) << "\n")
end
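
The listing writes one encoded message per line. The sketch below shows that newline-delimited framing end to end over an in-process pipe. SketchCoder is a hypothetical stand-in for MessageCoder, and its use of JSON is an assumption; the actual encoding is not shown on this page.

```ruby
require "json"

# Hypothetical coder standing in for MessageCoder; JSON is an assumption.
module SketchCoder
  def self.encode(message)
    JSON.generate(message)
  end

  def self.decode(line)
    JSON.parse(line, :symbolize_names => true)
  end
end

# Same shape as the listing, with the stand-in coder swapped in.
def write_to_pipe(message, pipe = $stdout)
  pipe.syswrite(SketchCoder.encode(message) << "\n")
end

reader, writer = IO.pipe
write_to_pipe({ :msg_type => "ack", :ack_id => 1 }, writer)
write_to_pipe({ :title => "second message" }, writer)
writer.close

# Each line on the pipe decodes back to exactly one message.
messages = reader.each_line.map { |line| SketchCoder.decode(line) }
reader.close
```

Because the newline is the only delimiter, the encoded form of a message must not contain a raw newline. JSON output from JSON.generate satisfies this, since newlines inside strings are escaped.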