Class: Disque

Inherits:
Object
  • Object
show all
Defined in:
lib/disque.rb

Constant Summary collapse

# Connection errors that the client recovers from: both `call` and
# `explore!` rescue exactly these classes. Frozen so the rescue list
# cannot be altered by accident at runtime.
ECONN = [
  Errno::ECONNREFUSED,
  Errno::EINVAL,
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(hosts, auth: nil, cycle: 1000) ⇒ Disque

Create a new Disque client by passing a list of nodes.

Disque.new(["127.0.0.1:7711", "127.0.0.1:7712", "127.0.0.1:7713"])

For each operation, a counter is updated to signal which node was the originator of the message. Based on that information, after a full cycle (1000 operations, but configurable on initialization) the stats are checked to see what is the most convenient node to connect to in order to avoid extra jumps.

TODO: Account for authentication. TODO: Account for timeout.



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/disque.rb', line 25

# Build a client from a list of node addresses and connect to the first
# node that answers (see explore!).
#
# hosts - list of node addresses, handed to explore!
# auth  - optional cluster password, embedded in connection URLs by url
# cycle - value @count must reach before pick_client! re-evaluates
#         which node to talk to
def initialize(hosts, auth: nil, cycle: 1000)
  # Caller-supplied settings: cluster password and cycle length
  @auth  = auth
  @cycle = cycle

  # Bookkeeping: operations counter, known nodes, per-node stats
  @count = 0
  @nodes = {}
  @stats = Hash.new(0)

  # Two connections: the main client and a scout used for discovery
  @client = Redic.new
  @scout  = Redic.new

  # Prefix of the preferred node; assigned during exploration
  @prefix = nil

  explore!(hosts)
end

Instance Attribute Details

#nodes ⇒ Object (readonly)

Returns the value of attribute nodes.



10
11
12
# File 'lib/disque.rb', line 10

# Reader for the table of known nodes. explore! fills it with
# eight-character node-id prefixes mapped to "host:port" strings.
def nodes
  return @nodes
end

#prefix ⇒ Object (readonly)

Returns the value of attribute prefix.



11
12
13
# File 'lib/disque.rb', line 11

# Reader for the node-id prefix of the node the main client was last
# pointed at (assigned by explore! and pick_client!).
def prefix
  return @prefix
end

#stats ⇒ Object (readonly)

Returns the value of attribute stats.



9
10
11
# File 'lib/disque.rb', line 9

# Reader for the per-node counters that fetch increments, keyed by the
# node prefix taken from each job id.
def stats
  return @stats
end

Instance Method Details

#call(*args) ⇒ Object

Run commands on the active connection. If the connection is lost, new connections are tried until all nodes become unavailable.



129
130
131
132
133
134
# File 'lib/disque.rb', line 129

# Send a command through the main client and return its reply.
#
# When one of the ECONN errors is raised, the known nodes are explored
# again and the command is re-sent. explore! raises when no node
# answers, which is what ends the retrying.
def call(*args)
  begin
    @client.call!(*args)
  rescue *ECONN
    # Snapshot of the known hosts; explore! clears @nodes itself.
    known_hosts = @nodes.values
    explore!(known_hosts)
    retry
  end
end

#explore!(hosts) ⇒ Object

Collect the list of nodes and keep a connection to the node that provided that information.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/disque.rb', line 64

# Discover the cluster: try each host in turn, ask the first one that
# answers for the node list ("HELLO"), and point the main client at it.
#
# hosts - list of node addresses, each passed through url
#
# Raises ArgumentError when no node list could be collected.
def explore!(hosts)
  # Start from a clean slate
  @nodes.clear

  hosts.each do |host|
    begin
      @scout.configure(url(host))

      reply = @scout.call!("HELLO")

      # Only the first eight characters of a node id are kept: they
      # are the part embedded in job ids, which the stats rely on.
      @prefix = reply[1][0, 8]

      # The node that answered becomes the main connection
      @client.configure(@scout.url)

      # Remaining entries of the reply describe the known nodes
      reply[2..-1].each do |id, hostname, port, _priority|
        @nodes[id[0, 8]] = sprintf("%s:%s", hostname, port)
      end

      @scout.quit

      # One responsive node is enough
      break
    rescue *ECONN => error
      # Report the failure and move on to the next host
      $stderr.puts(error.inspect)
    end
  end

  raise ArgumentError, "nodes unavailable" if @nodes.empty?
end

#fetch(from: [], count: 1, timeout: 0) ⇒ Object



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/disque.rb', line 160

# Fetch jobs with GETJOB and return the raw reply.
#
# from    - queue name(s) appended after "FROM"
# count   - value sent as "COUNT"
# timeout - value sent as "TIMEOUT"
#
# When a block is given, each job is yielded as (job, queue) and then
# acknowledged with ACKJOB. A falsy reply is returned untouched.
def fetch(from: [], count: 1, timeout: 0)
  pick_client!

  command = ["GETJOB", "TIMEOUT", timeout, "COUNT", count, "FROM", *from]
  jobs = call(*command)

  return jobs unless jobs

  # One successful fetch counts as one operation in the cycle
  @count += 1

  jobs.each do |queue, id, payload|
    # Characters 2..9 of the job id identify the originating node
    @stats[id[2, 8]] += 1

    next unless block_given?

    # Hand the job over, then remove it from the queue
    yield(payload, queue)
    call("ACKJOB", id)
  end

  jobs
end

#options_to_arguments(options) ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/disque.rb', line 191

# Flatten an options hash into a list of command arguments.
#
# options - hash of option name => value
#
# A value of `true` marks a bare flag, so only the key is emitted.
# A value of `false` or `nil` means the option is switched off and is
# left out entirely, instead of being sent as a key/value pair.
# Any other value is emitted as the key followed by the value.
#
# Returns a new Array; the options hash is not modified.
def options_to_arguments(options)
  options.each_with_object([]) do |(key, value), arguments|
    if value == true
      arguments.push(key)
    elsif value.nil? || value == false
      # Disabled flag or unset option: contributes nothing
      next
    else
      arguments.push(key, value)
    end
  end
end

#pick_client! ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/disque.rb', line 103

# Once the operations counter reaches the cycle length, reset it and
# switch the main client to the node with the highest counter in the
# stats, provided it is a known node other than the current one.
def pick_client!
  return unless @count == @cycle

  @count = 0

  # Entry with the highest counter; nil when no stats were recorded
  busiest, _hits = @stats.max_by { |_node, hits| hits }

  # Already connected to the busiest node
  return if busiest == @prefix

  host = @nodes[busiest]

  # The busiest prefix is not among the known nodes
  return unless host

  # Point the main client at the new node and remember its prefix
  @client.configure(url(host))
  @prefix = busiest

  # Stats start over for the new connection
  @stats.clear
end

#push(queue_name, job, ms_timeout, options = {}) ⇒ Object

Disque’s ADDJOB signature is as follows:

ADDJOB queue_name job <ms-timeout>
  [REPLICATE <count>]
  [DELAY <sec>]
  [RETRY <sec>]
  [TTL <sec>]
  [MAXLEN <count>]
  [ASYNC]

You can pass any optional arguments as a hash, for example:

disque.push("foo", "myjob", 1000, ttl: 1, async: true)

Note that `async` is a special case because it's just a flag. That's why `true` must be passed as its value.



153
154
155
156
157
158
# File 'lib/disque.rb', line 153

# Enqueue a job with ADDJOB and return the reply from call.
#
# queue_name - queue to add the job to
# job        - job body
# ms_timeout - command timeout, sent right after the job
# options    - optional ADDJOB arguments, converted by
#              options_to_arguments
def push(queue_name, job, ms_timeout, options = {})
  call("ADDJOB", queue_name, job, ms_timeout, *options_to_arguments(options))
end

#quit ⇒ Object



205
206
207
# File 'lib/disque.rb', line 205

# Ask the main client to quit and return whatever it answers.
def quit
  return @client.quit
end

#url(host) ⇒ Object



54
55
56
57
58
59
60
# File 'lib/disque.rb', line 54

# Build the connection URL for a node address.
#
# host - node address, inserted verbatim after the scheme
#
# When a cluster password is set, it is placed in the password slot of
# the URL (with an empty user name); otherwise the URL carries only the
# host.
def url(host)
  return sprintf("disque://%s", host) unless @auth

  sprintf("disque://:%s@%s", @auth, host)
end