Class: Cod::Beanstalk::Channel

Inherits:
Channel (which inherits from Object)
Defined in:
lib/cod/beanstalk/channel.rb

Overview

Note:

Beanstalk channels cannot currently be used in Cod.select. This is due to limitations inherent in the beanstalkd protocol. We’ll probably try to get a patch into beanstalkd to change this.

Note:

If you embed a beanstalk channel into one of your messages, you will get a channel that connects to the same server and the same tube on the other end. This behaviour is useful for Service.

A channel based on a beanstalkd tube. A #put will insert messages into the tube, and a #get will fetch the next message that is pending on the tube.

Defined Under Namespace

Classes: Control

Constant Summary

JOB_PRIORITY = 0

All messages are inserted into the beanstalk queue with this priority.

Methods inherited from Channel

#interact

Constructor Details

#initialize(tube_name, server_url) ⇒ Channel

Returns a new instance of Channel.



# File 'lib/cod/beanstalk/channel.rb', line 24

def initialize(tube_name, server_url)
  super()
  @tube_name, @server_url = tube_name, server_url

  @body_serializer = Cod::SimpleSerializer.new
  @transport = connection(server_url, tube_name)
end

Instance Attribute Details

#server_url ⇒ Object (readonly)

Beanstalkd server this channel is connected to



# File 'lib/cod/beanstalk/channel.rb', line 22

def server_url
  @server_url
end

#tube_name ⇒ Object (readonly)

Which tube this channel is connected to



# File 'lib/cod/beanstalk/channel.rb', line 20

def tube_name
  @tube_name
end

Class Method Details

._load(str) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 173

def self._load(str) # :nodoc:
  tube_name, server_url = Marshal.load(str)
  Cod.beanstalk(tube_name, server_url)
end

Instance Method Details

#_dump(level) ⇒ Object

Serialization hook: dumps the tube name and server URL, from which ._load can rebuild the channel.



# File 'lib/cod/beanstalk/channel.rb', line 168

def _dump(level) # :nodoc:
  Marshal.dump(
    [@tube_name, @server_url])
end
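
The #_dump and ._load pair is what lets a channel travel inside a message: only the tube name and server URL are marshalled, and the receiving side builds a fresh object from them. The following is a minimal sketch of that round trip using a hypothetical stand-in class, TubeRef, which opens no connection. The tube name and server address are made up for illustration.

```ruby
# Hypothetical stand-in for a beanstalk channel; it holds no real connection.
class TubeRef
  attr_reader :tube_name, :server_url

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
  end

  # Called by Marshal.dump: serialize only the two connection parameters.
  def _dump(_level)
    Marshal.dump([@tube_name, @server_url])
  end

  # Called by Marshal.load: rebuild an equivalent object from the parameters.
  def self._load(str)
    tube_name, server_url = Marshal.load(str)
    new(tube_name, server_url)
  end
end

original = TubeRef.new('jobs', 'localhost:11300')
copy     = Marshal.load(Marshal.dump(original))
```

Here copy is a distinct object that carries the same tube name and server URL as original. In the real class, ._load goes through Cod.beanstalk instead of new, as shown above.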

#bs_bury(msg_id) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 192

def bs_bury(msg_id)
  # NOTE: Why I need to assign a priority when burying I fail to
  # understand. Like a priority for rapture?
  bs_command([:bury, msg_id, JOB_PRIORITY], :buried)
end

#bs_delete(msg_id) ⇒ Object

Beanstalk command: deletes the job with the given id.



# File 'lib/cod/beanstalk/channel.rb', line 180

def bs_delete(msg_id)
  bs_command([:delete, msg_id], :deleted)
end

#bs_release(msg_id) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 184

def bs_release(msg_id)
  bs_command([:release, msg_id, JOB_PRIORITY, 0], :released)
end

#bs_release_with_delay(msg_id, seconds) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 188

def bs_release_with_delay(msg_id, seconds)
  bs_command([:release, msg_id, JOB_PRIORITY, seconds], :released)
end
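
For orientation, the command arrays passed to bs_command in the bs_* methods mirror the text lines of the beanstalkd protocol: `delete <id>`, `release <id> <pri> <delay>` and `bury <id> <pri>`, each terminated by CRLF. In the protocol, the priority given to `bury` is the new priority assigned to the job, which is why that command takes one. The sketch below assumes a hypothetical encoder, command_line. How the transport in this library actually encodes commands is not shown on this page, and the job id is made up.

```ruby
# Hypothetical encoder: joins a command array into one protocol line.
def command_line(parts)
  parts.join(' ') + "\r\n"
end

JOB_PRIORITY = 0
msg_id = 42 # made-up job id

delete_line  = command_line([:delete, msg_id])
release_line = command_line([:release, msg_id, JOB_PRIORITY, 0])
delayed_line = command_line([:release, msg_id, JOB_PRIORITY, 30])
bury_line    = command_line([:bury, msg_id, JOB_PRIORITY])
```

The server answers these with DELETED, RELEASED and BURIED respectively, which correspond to the :deleted, :released and :buried symbols expected above.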

#client(answers_to) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 73

def client(answers_to)
  Service::Client.new(self, answers_to)
end

#close ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 59

def close
  @transport.close
end

#get ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 50

def get
  id, msg = bs_reserve
  
  # We delete the job immediately, since #get should be definitive.
  bs_delete(id)

  deserialize(msg)
end

#initialize_copy(other) ⇒ Object

Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.



# File 'lib/cod/beanstalk/channel.rb', line 35

def initialize_copy(other)
  super(other)
  initialize(other.tube_name, other.server_url)
end
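
The pattern used here, re-running initialize from initialize_copy, can be tried in isolation. The sketch below uses a hypothetical FakeChannel whose "transport" is a plain Object standing in for a real connection, so it only illustrates that each copy ends up with its own transport.

```ruby
# Hypothetical stand-in; a plain Object plays the role of the connection.
class FakeChannel
  attr_reader :name, :transport

  def initialize(name)
    @name = name
    # Stand-in for opening a connection: a fresh object per call.
    @transport = Object.new
  end

  # Ruby calls this on the copy created by #dup or #clone.
  def initialize_copy(other)
    super(other)
    initialize(other.name)
  end
end

a = FakeChannel.new('jobs')
b = a.dup
```

After the dup, b has the same name as a but a transport of its own, so closing one does not affect the other.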

#put(msg) ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 40

def put(msg)
  pri   = JOB_PRIORITY
  delay = 0
  ttr   = 120
  body = @body_serializer.en(msg)
  
  answer, *rest = @transport.interact([:put, pri, delay, ttr, body])
  fail "#put fails, #{answer.inspect}" unless answer == :inserted
end

#service ⇒ Object

Service/client helper: wraps this channel in a Service.



# File 'lib/cod/beanstalk/channel.rb', line 70

def service
  Service.new(self)
end

#to_read_fds ⇒ Object



# File 'lib/cod/beanstalk/channel.rb', line 64

def to_read_fds
  fail "Cod.select not supported with beanstalkd channels.\n"+
    "To support this, we will have to extend the beanstalkd protocol."
end

#try_get {|Object, Cod::Beanstalk::Channel::Control| ... } ⇒ Object

Like #get, reads the next message from the channel but reserves the right to put it back. This uses beanstalkd's flow control features to control message flow in the case of exceptions and the like.

If the block given to this method raises an exception, the message is released unless a control command has been given. This means that other workers on the same tube will get the chance to see the message.

If the block is exited without specifying a fate for the message, it is deleted from the tube.

Examples:

All the flow control that beanstalkd allows

channel.try_get { |msg, ctl|
  if msg == 1
    ctl.release # don't handle messages of type 1
  else
    ctl.bury    # for example
  end
}

Exceptions release the message

# Will release the message and allow other connected channels to 
# #get it.
channel.try_get { |msg, ctl|
  fail "No such message handler"
}

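Yields:

  • (Object, Cod::Beanstalk::Channel::Control) the deserialized message and a control object that decides its fate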

Returns:

  • the block's return value




# File 'lib/cod/beanstalk/channel.rb', line 112

def try_get 
  fail "No block given to #try_get" unless block_given?
  
  id, msg = bs_reserve
  control = Control.new(id, self)
  
  begin
    retval = yield(deserialize(msg), control)
  rescue Exception
    control.release unless control.command_given?
    raise
  ensure
    control.delete unless control.command_given?
  end
  
  return retval
end
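
The three outcomes described above (default delete, release on exception, explicit command) can be checked without a beanstalkd server. The sketch below keeps the same begin/rescue/ensure shape but swaps in a hypothetical FakeControl that only records which command was given. It assumes that, as in the real Control, issuing any command makes command_given? return true.

```ruby
# Hypothetical in-memory stand-in; no beanstalkd server is involved.
class FakeControl
  attr_reader :fate

  def command_given?
    !@fate.nil?
  end

  def release; @fate = :released; end
  def bury;    @fate = :buried;   end
  def delete;  @fate = :deleted;  end
end

# Same rescue/ensure shape as #try_get above, minus reserve and deserialize.
def try_get_sketch(msg, control)
  yield(msg, control)
rescue Exception
  control.release unless control.command_given?
  raise
ensure
  control.delete unless control.command_given?
end

ok = FakeControl.new
result = try_get_sketch(:job, ok) { |msg, _ctl| msg } # block exits normally
ok.fate      # => :deleted

failed = FakeControl.new
begin
  try_get_sketch(:job, failed) { |_msg, _ctl| fail 'boom' }
rescue RuntimeError
  # The exception is re-raised after the message has been released.
end
failed.fate  # => :released

buried = FakeControl.new
try_get_sketch(:job, buried) { |_msg, ctl| ctl.bury }
buried.fate  # => :buried
```

Because the ensure clause runs after the rescue clause, a message released on exception is not also deleted: by then a command has been given.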