Class: Ruote::Beanstalk::BsParticipant

Inherits:
Object
  • Object
show all
Includes:
LocalParticipant
Defined in:
lib/ruote/beanstalk/participant.rb

Overview

This participant emits workitems towards a beanstalk queue.

engine.register_participant(
  :heavy_labour,
  :reply_by_default => true, :beanstalk => '127.0.0.1:11300')

workitem format

Workitems are encoded in the format

[ 'workitem', workitem.to_h ]

and then serialized as JSON strings.

cancel items

Like workitems, but the format is

[ 'cancelitem', fei.to_h, flavour.to_s ]

where fei is the FlowExpressionId of the expression getting cancelled (and whose workitems are to be retired) and flavour is either ‘cancel’ or ‘kill’.

extending this participant

Extend and overwrite encode_workitem and encode_cancelitem or simply re-open the class and change those methods.

:beanstalk

Indicates which beanstalk to talk to

engine.register_participant(
  'alice'
  Ruote::Beanstalk::BsParticipant,
  'beanstalk' => '127.0.0.1:11300')

:tube

Most of the time, you want the workitems (or the cancelitems) to be emitted over/in a specific tube

engine.register_participant(
  'alice'
  Ruote::Beanstalk::BsParticipant,
  'beanstalk' => '127.0.0.1:11300',
  'tube' => 'ruote-workitems')

:reply_by_default

If the participant is configured with ‘reply_by_default’ => true, the participant will dispatch the workitem over to Beanstalk and then immediately reply to its ruote engine (letting the flow resume).

engine.register_participant(
  'alice'
  Ruote::Beanstalk::BsParticipant,
  'beanstalk' => '127.0.0.1:11300',
  'reply_by_default' => true)

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ BsParticipant

Returns a new instance of BsParticipant.



105
106
107
108
# File 'lib/ruote/beanstalk/participant.rb', line 105

def initialize (opts)

  @opts = opts
end

Instance Method Details

#cancel(fei, flavour) ⇒ Object



117
118
119
120
# File 'lib/ruote/beanstalk/participant.rb', line 117

def cancel (fei, flavour)

  connection.put(encode_cancelitem(fei, flavour))
end

#consume(workitem) ⇒ Object



110
111
112
113
114
115
# File 'lib/ruote/beanstalk/participant.rb', line 110

def consume (workitem)

  connection.put(encode_workitem(workitem))

  reply(workitem) if @opts['reply_by_default']
end

#encode_cancelitem(fei, flavour) ⇒ Object



127
128
129
130
# File 'lib/ruote/beanstalk/participant.rb', line 127

def encode_cancelitem (fei, flavour)

  Rufus::Json.encode([ 'cancelitem', fei.to_h, flavour.to_s ])
end

#encode_workitem(workitem) ⇒ Object



122
123
124
125
# File 'lib/ruote/beanstalk/participant.rb', line 122

def encode_workitem (workitem)

  Rufus::Json.encode([ 'workitem', workitem.to_h ])
end