Class: OpenWFE::StompParticipant

Inherits:
Object
  • Object
show all
Includes:
LocalParticipant
Defined in:
lib/stomp_message/stomp_participant.rb

Overview

Participant to send/receive work items to stomp message servers applications send message and asynch wait for response

Timeoout may need to be changed

On the return side, you can override the method handle_call_result for better mappings between messages calls and the workitems.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(topic, host = 'localhost', port = 61613, timeout = 4, &block) ⇒ StompParticipant

Returns a new instance of StompParticipant.



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/stomp_message/stomp_participant.rb', line 23

def initialize(topic, host='localhost', port=61613, timeout=4, &block)

    
    self.options={}
    self.options[:host]=host
    self.options[:port]=port
    self.options[:topic]=topic
    self.timeout_val = timeout
    self.msg_sender=StompMessage::StompSendTopic.new(self.options)   
    self.msg_sender.setup_auto_close
end

Instance Attribute Details

#msg_senderObject

Returns the value of attribute msg_sender.



22
23
24
# File 'lib/stomp_message/stomp_participant.rb', line 22

def msg_sender
  @msg_sender
end

#optionsObject

Returns the value of attribute options.



22
23
24
# File 'lib/stomp_message/stomp_participant.rb', line 22

def options
  @options
end

#timeout_valObject

Returns the value of attribute timeout_val.



22
23
24
# File 'lib/stomp_message/stomp_participant.rb', line 22

def timeout_val
  @timeout_val
end

Instance Method Details

#consume(workitem) ⇒ Object

The method called by the engine when the flow reaches an instance of this Participant class.



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# File 'lib/stomp_message/stomp_participant.rb', line 39

def consume (workitem)
   
    m=prepare_call_params(workitem)
    puts "message is: #{m.to_xml}"
   # billing_sender.send_topic(m.body,  arg_hash[:msisdn]) 
   # m=StompMessage::Message.new('stomp_BILLING', msg)
     header={}
    # header[:msisdn]=workitem.attributes[:msisdn]
        begin
          Timeout::timeout(self.timeout_val) {
            self.msg_sender.send_topic_acknowledge(m,header)  { 
                           |msg|  workitem=handle_call_result(msg, workitem)
                             }
          }
          rescue Timeout::Error
           puts "STOMP participant:: consume(wi) exception"
           workitem.attributes["__result__"]=false
           workitem.attributes["stomp_TIMEOUT"]=true
        #   raise "timeout" 
          ensure
             reply_to_engine(workitem)
          end
           

   
end

#handle_call_result(result, workitem) ⇒ Object

This implementation simply stuffs the result into the workitem as an attribute named “__result__”.

Feel free to override this method.



85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/stomp_message/stomp_participant.rb', line 85

def handle_call_result (result, workitem)

    puts 'in handle action block' 
     puts 'MESSAGE RECEIVED ----'
      m= StompMessage::Message.load_xml(result)
      workitem.attributes["__result__"]=m.to_xml
      workitem.attributes["command"]=m.command.to_s
       workitem.attributes["body"]=m.body.to_s
       puts "wi #{workitem.attributes.to_s}"
      workitem
      
end

#prepare_call_params(workitem) ⇒ Object

The base implementation :prepares the message param there is a workitem field with the same name.

Feel free to override this method.



72
73
74
75
76
77
# File 'lib/stomp_message/stomp_participant.rb', line 72

def prepare_call_params (workitem)
    m=StompMessage::Message.new(workitem.attributes[:command].to_s,
                                 workitem.attributes[:body].to_s)
    m
   # puts "message is: #{m.to_xml}"
end