Class: Cod::Beanstalk::Channel
- Defined in: lib/cod/beanstalk/channel.rb
Overview
NOTE: Beanstalk channels cannot currently be used in Cod.select. This is due to limitations inherent in the beanstalkd protocol. We’ll probably try to get a patch into beanstalkd to change this.
NOTE: If you embed a beanstalk channel into one of your messages, you will get a channel that connects to the same server and the same tube on the other end. This behaviour is useful for Cod::Service.
Defined Under Namespace
Classes: Control
Constant Summary
- JOB_PRIORITY = 0
Instance Attribute Summary
- #server_url ⇒ Object (readonly)
  Beanstalkd server this channel is connected to.
- #tube_name ⇒ Object (readonly)
  Which tube this channel is connected to.
Class Method Summary
- ._load(str) ⇒ Object
  :nodoc:
Instance Method Summary
- #_dump(level) ⇒ Object
  Serialization.
- #bs_bury(msg_id) ⇒ Object
- #bs_delete(msg_id) ⇒ Object
  Beanstalk commands.
- #bs_release(msg_id) ⇒ Object
  :nodoc:
- #bs_release_with_delay(msg_id, seconds) ⇒ Object
  :nodoc:
- #client(answers_to) ⇒ Object
- #close ⇒ Object
- #get ⇒ Object
- #initialize(tube_name, server_url) ⇒ Channel (constructor)
  A new instance of Channel.
- #initialize_copy(other) ⇒ Object
  Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.
- #put(msg) ⇒ Object
- #service ⇒ Object
  Service/client.
- #to_read_fds ⇒ Object
- #try_get ⇒ Object
  Queue interface.
Methods inherited from Channel
Constructor Details
#initialize(tube_name, server_url) ⇒ Channel
Returns a new instance of Channel.
    # File 'lib/cod/beanstalk/channel.rb', line 19
    def initialize(tube_name, server_url)
      super()
      @tube_name, @server_url = tube_name, server_url

      @body_serializer = Cod::SimpleSerializer.new
      @transport = connection(server_url, tube_name)
    end
Instance Attribute Details
#server_url ⇒ Object (readonly)
Beanstalkd server this channel is connected to
    # File 'lib/cod/beanstalk/channel.rb', line 17
    def server_url
      @server_url
    end
#tube_name ⇒ Object (readonly)
Which tube this channel is connected to
    # File 'lib/cod/beanstalk/channel.rb', line 15
    def tube_name
      @tube_name
    end
Class Method Details
Instance Method Details
#_dump(level) ⇒ Object
Serialization.
    # File 'lib/cod/beanstalk/channel.rb', line 127
    def _dump(level) # :nodoc:
      Marshal.dump([@tube_name, @server_url])
    end
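The `_dump`/`._load` pair is Ruby's standard hook for custom `Marshal` serialization, which is presumably what lets a channel embedded in a message arrive as a channel on the same server and tube. The following is a minimal sketch of that round trip, not Cod's implementation: `TubeHandle` is a hypothetical stand-in that opens no connection, and the body of the real `._load` is not shown on this page, so rebuilding via `new` is an assumption.

```ruby
# Hypothetical stand-in for a channel; it holds no real connection.
class TubeHandle
  attr_reader :tube_name, :server_url

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
  end

  # Marshal calls this and expects a String back. Only the coordinates
  # (tube and server) are stored, never the live connection.
  def _dump(_level)
    Marshal.dump([@tube_name, @server_url])
  end

  # Marshal calls this with the String produced by #_dump.
  # Assumption: the real ._load rebuilds the channel in a similar way.
  def self._load(str)
    new(*Marshal.load(str))
  end
end

original = TubeHandle.new('jobs', 'localhost:11300')
copy = Marshal.load(Marshal.dump(original))

puts copy.tube_name
puts copy.server_url
```

Because only the two strings travel, the receiving side has to open its own connection when the object is rebuilt.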
#bs_bury(msg_id) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 146
    def bs_bury(msg_id)
      # NOTE: Why I need to assign a priority when burying I fail to
      # understand. Like a priority for rapture?
      bs_command([:bury, msg_id, JOB_PRIORITY], :buried)
    end
#bs_delete(msg_id) ⇒ Object
Beanstalk commands.
    # File 'lib/cod/beanstalk/channel.rb', line 137
    def bs_delete(msg_id) # :nodoc:
      bs_command([:delete, msg_id], :deleted)
    end
#bs_release(msg_id) ⇒ Object
:nodoc:
    # File 'lib/cod/beanstalk/channel.rb', line 140
    def bs_release(msg_id) # :nodoc:
      bs_command([:release, msg_id, JOB_PRIORITY, 0], :released)
    end
#bs_release_with_delay(msg_id, seconds) ⇒ Object
:nodoc:
    # File 'lib/cod/beanstalk/channel.rb', line 143
    def bs_release_with_delay(msg_id, seconds) # :nodoc:
      bs_command([:release, msg_id, JOB_PRIORITY, seconds], :released)
    end
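For orientation, the command tuples passed to `bs_command` above line up with the text lines of the beanstalkd protocol: `delete <id>`, `release <id> <pri> <delay>` and `bury <id> <pri>`, each terminated by CRLF. The sketch below only illustrates that correspondence. `encode_command` is a hypothetical helper, and how Cod's transport actually encodes a tuple is not shown on this page, so treat the mapping as an assumption.

```ruby
# Hypothetical helper: joins a command tuple into one beanstalkd
# protocol line. This is not Cod's transport code.
def encode_command(command)
  name, *args = command
  ([name] + args).join(' ') + "\r\n"
end

JOB_PRIORITY = 0

puts encode_command([:delete, 42]).inspect
puts encode_command([:release, 42, JOB_PRIORITY, 30]).inspect
puts encode_command([:bury, 42, JOB_PRIORITY]).inspect
```

The second argument to `bs_command` (`:deleted`, `:released`, `:buried`) matches the server's success replies `DELETED`, `RELEASED` and `BURIED`.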
#client(answers_to) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 67
    def client(answers_to)
      Service::Client.new(self, answers_to)
    end
#close ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 54
    def close
      @transport.close
    end
#get ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 45
    def get
      id, msg = bs_reserve

      # We delete the job immediately, since #get should be definitive.
      bs_delete(id)

      deserialize(msg)
    end
#initialize_copy(other) ⇒ Object
Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.
    # File 'lib/cod/beanstalk/channel.rb', line 30
    def initialize_copy(other)
      super(other)
      initialize(other.tube_name, other.server_url)
    end
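Ruby calls `initialize_copy` on the new object during `#dup` and `#clone`, so re-running `initialize` there gives the copy its own connection instead of a shared reference. A minimal sketch of the pattern follows, with hypothetical names throughout: `SketchChannel` and `FakeConnection` stand in for the real channel and transport, and no beanstalkd server is contacted.

```ruby
# Hypothetical stand-in for the beanstalkd transport.
FakeConnection = Struct.new(:server_url, :tube_name)

class SketchChannel
  attr_reader :tube_name, :server_url, :transport

  def initialize(tube_name, server_url)
    @tube_name, @server_url = tube_name, server_url
    @transport = FakeConnection.new(server_url, tube_name)
  end

  # Runs on the copy during #dup/#clone. Without it, both objects would
  # share one @transport, because #dup copies instance variables shallowly.
  def initialize_copy(other)
    super(other)
    initialize(other.tube_name, other.server_url)
  end
end

a = SketchChannel.new('jobs', 'localhost:11300')
b = a.dup

# Identity check: the two channels hold different connection objects.
puts a.transport.equal?(b.transport)
```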
#put(msg) ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 35
    def put(msg)
      pri = JOB_PRIORITY
      delay = 0
      ttr = 120
      body = @body_serializer.en(msg)

      answer, *rest = @transport.interact([:put, pri, delay, ttr, body])
      fail "#put fails, #{answer.inspect}" unless answer == :inserted
    end
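In the beanstalkd protocol, a `put` request is a header line `put <pri> <delay> <ttr> <bytes>` followed by the job body, each terminated by CRLF, where `ttr` is the time-to-run in seconds and `<bytes>` is the body length in bytes. The sketch below builds such a request from the values used in `#put`. `encode_put` is a hypothetical helper; how Cod's transport encodes the `[:put, ...]` tuple is not shown on this page and is assumed here.

```ruby
# Hypothetical helper: builds a beanstalkd put request from its parts.
# The length field counts bytes, not characters, hence String#bytesize.
def encode_put(pri, delay, ttr, body)
  "put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n"
end

# Same values as #put: priority 0, no delay, 120 seconds time-to-run.
request = encode_put(0, 0, 120, 'hello')
puts request.inspect
```

On success the server answers `INSERTED <id>`, which corresponds to the `:inserted` symbol that `#put` checks for.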
#service ⇒ Object
Service/client.
    # File 'lib/cod/beanstalk/channel.rb', line 64
    def service
      Service.new(self)
    end
#to_read_fds ⇒ Object
    # File 'lib/cod/beanstalk/channel.rb', line 58
    def to_read_fds
      fail "Cod.select not supported with beanstalkd channels.\n"+
        "To support this, we will have to extend the beanstalkd protocol."
    end
#try_get ⇒ Object
Queue interface.
    # File 'lib/cod/beanstalk/channel.rb', line 72
    def try_get
      fail "No block given to #try_get" unless block_given?

      id, msg = bs_reserve
      control = Control.new(id, self)

      begin
        retval = yield(deserialize(msg), control)
      rescue Exception
        control.release unless control.command_given?
        raise
      ensure
        control.delete unless control.command_given?
      end

      return retval
    end
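The `rescue`/`ensure` pair in `#try_get` gives the block three outcomes: the job is deleted if the block returns normally, released if the block raises, and left to whatever command the block issued itself. The sketch below reproduces only that control flow so the outcomes can be observed without a server. `SketchControl` is a hypothetical stand-in that records the first command rather than sending it. The real `Control` class is not shown on this page, so its `bury` method and the assumption that `command_given?` becomes true after any command are guesses.

```ruby
# Hypothetical stand-in for Control: records the command instead of
# talking to beanstalkd.
class SketchControl
  attr_reader :command

  def initialize
    @command = nil
  end

  def command_given?
    !@command.nil?
  end

  def delete
    @command = :delete
  end

  def release
    @command = :release
  end

  def bury
    @command = :bury
  end
end

# Same rescue/ensure structure as #try_get, minus reserve and deserialize.
def sketch_try_get(msg, control)
  yield(msg, control)
rescue Exception
  control.release unless control.command_given?
  raise
ensure
  control.delete unless control.command_given?
end

# Block returns normally: ensure deletes the job.
ok = SketchControl.new
result = sketch_try_get('job', ok) { |m, _c| m.upcase }

# Block raises: rescue releases the job, so ensure skips the delete.
failed = SketchControl.new
begin
  sketch_try_get('job', failed) { |_m, _c| raise 'boom' }
rescue RuntimeError
  # The exception is re-raised to the caller after the release.
end

# Block issues its own command: neither rescue nor ensure overrides it.
buried = SketchControl.new
sketch_try_get('job', buried) { |_m, c| c.bury }

puts [ok.command, failed.command, buried.command].inspect
```

Note that the original rescues `Exception` rather than `StandardError`, so even interrupts and exits inside the block release the job before propagating.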