Class: Cod::Beanstalk::Channel

Inherits:
Channel
  • Object
show all
Defined in:
lib/cod/beanstalk/channel.rb

Overview

NOTE: Beanstalk channels cannot currently be used in Cod.select. This is due to limitations inherent in the beanstalkd protocol. We’ll probably try to get a patch into beanstalkd to change this.

NOTE: If you embed a beanstalk channel into one of your messages, you will get a channel that connects to the same server and the same tube on the other end. This behaviour is useful for Cod::Service.

Defined Under Namespace

Classes: Control

Constant Summary collapse

JOB_PRIORITY =
0

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Methods inherited from Channel

#interact

Constructor Details

#initialize(tube_name, server_url) ⇒ Channel

Returns a new instance of Channel.



19
20
21
22
23
24
25
# File 'lib/cod/beanstalk/channel.rb', line 19

def initialize(tube_name, server_url)
  super()
  @tube_name, @server_url = tube_name, server_url

  @body_serializer = Cod::SimpleSerializer.new
  @transport = connection(server_url, tube_name)
end

Instance Attribute Details

#server_urlObject (readonly)

Beanstalkd server this channel is connected to



17
18
19
# File 'lib/cod/beanstalk/channel.rb', line 17

def server_url
  @server_url
end

#tube_nameObject (readonly)

Which tube this channel is connected to



15
16
17
# File 'lib/cod/beanstalk/channel.rb', line 15

def tube_name
  @tube_name
end

Class Method Details

._load(str) ⇒ Object

:nodoc:



131
132
133
134
# File 'lib/cod/beanstalk/channel.rb', line 131

def self._load(str) # :nodoc:
  tube_name, server_url = Marshal.load(str)
  Cod.beanstalk(tube_name, server_url)
end

Instance Method Details

#_dump(level) ⇒ Object

———————————————————- serialization



127
128
129
130
# File 'lib/cod/beanstalk/channel.rb', line 127

def _dump(level) # :nodoc:
  Marshal.dump(
    [@tube_name, @server_url])
end

#bs_bury(msg_id) ⇒ Object



146
147
148
149
150
# File 'lib/cod/beanstalk/channel.rb', line 146

def bs_bury(msg_id)
  # NOTE: Why I need to assign a priority when burying I fail to
  # understand. Like a priority for rapture?
  bs_command([:bury, msg_id, JOB_PRIORITY], :buried)
end

#bs_delete(msg_id) ⇒ Object

—————————————————– beanstalk commands



137
138
139
# File 'lib/cod/beanstalk/channel.rb', line 137

def bs_delete(msg_id) # :nodoc:
  bs_command([:delete, msg_id], :deleted)
end

#bs_release(msg_id) ⇒ Object

:nodoc:



140
141
142
# File 'lib/cod/beanstalk/channel.rb', line 140

def bs_release(msg_id) # :nodoc:
  bs_command([:release, msg_id, JOB_PRIORITY, 0], :released)
end

#bs_release_with_delay(msg_id, seconds) ⇒ Object

:nodoc:



143
144
145
# File 'lib/cod/beanstalk/channel.rb', line 143

def bs_release_with_delay(msg_id, seconds) # :nodoc:
  bs_command([:release, msg_id, JOB_PRIORITY, seconds], :released)
end

#client(answers_to) ⇒ Object



67
68
69
# File 'lib/cod/beanstalk/channel.rb', line 67

def client(answers_to)
  Service::Client.new(self, answers_to)
end

#closeObject



54
55
56
# File 'lib/cod/beanstalk/channel.rb', line 54

def close
  @transport.close
end

#getObject



45
46
47
48
49
50
51
52
# File 'lib/cod/beanstalk/channel.rb', line 45

def get
  id, msg = bs_reserve
  
  # We delete the job immediately, since #get should be definitive.
  bs_delete(id)

  deserialize(msg)
end

#initialize_copy(other) ⇒ Object

Allow #dup on beanstalkd channels, resulting in a new connection to the beanstalkd server.



30
31
32
33
# File 'lib/cod/beanstalk/channel.rb', line 30

def initialize_copy(other)
  super(other)
  initialize(other.tube_name, other.server_url)
end

#put(msg) ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/cod/beanstalk/channel.rb', line 35

def put(msg)
  pri   = JOB_PRIORITY
  delay = 0
  ttr   = 120
  body = @body_serializer.en(msg)
  
  answer, *rest = @transport.interact([:put, pri, delay, ttr, body])
  fail "#put fails, #{answer.inspect}" unless answer == :inserted
end

#serviceObject

——————————————————— service/client



64
65
66
# File 'lib/cod/beanstalk/channel.rb', line 64

def service
  Service.new(self)
end

#to_read_fdsObject



58
59
60
61
# File 'lib/cod/beanstalk/channel.rb', line 58

def to_read_fds
  fail "Cod.select not supported with beanstalkd channels.\n"+
    "To support this, we will have to extend the beanstalkd protocol."
end

#try_getObject

——————————————————– queue interface



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/cod/beanstalk/channel.rb', line 72

def try_get 
  fail "No block given to #try_get" unless block_given?
  
  id, msg = bs_reserve
  control = Control.new(id, self)
  
  begin
    retval = yield(deserialize(msg), control)
  rescue Exception
    control.release unless control.command_given?
    raise
  ensure
    control.delete unless control.command_given?
  end
  
  return retval
end