Class: Disque

Inherits:
Object
  • Object
show all
Defined in:
lib/disque.rb

Constant Summary collapse

ECONN =
[
  Errno::ECONNREFUSED,
  Errno::EINVAL,
]

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(hosts, auth: nil, cycle: 1000) ⇒ Disque

Create a new Disque client by passing a list of nodes.

Disque.new(["127.0.0.1:7711", "127.0.0.1:7712", "127.0.0.1:7713"])

Alternatively, you can pass a single string with a comma-separated list of nodes:

Disque.new("127.0.0.1:7711,127.0.0.1:7712,127.0.0.1:7713")

For each operation, a counter is updated to signal which node was the originator of the message. Based on that information, after a full cycle (1000 operations, but configurable on initialization) the stats are checked to see what is the most convenient node to connect to in order to avoid extra jumps.

TODO Account for timeout



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/disque.rb', line 29

def initialize(hosts, auth: nil, cycle: 1000)

  # Split a string of hosts if necessary
  if String === hosts
    hosts = hosts.split(",")
  end

  # Cluster password
  @auth = auth

  # Cycle length
  @cycle = cycle

  # Operations counter
  @count = 0

  # Known nodes
  @nodes = Hash.new

  # Connection stats
  @stats = Hash.new(0)

  # Main client
  @client = Redic.new

  # Scout client
  @scout = Redic.new

  # Preferred client prefix
  @prefix = nil

  explore!(hosts)
end

Instance Attribute Details

#nodesObject (readonly)

Returns the value of attribute nodes.



10
11
12
# File 'lib/disque.rb', line 10

def nodes
  @nodes
end

#prefixObject (readonly)

Returns the value of attribute prefix.



11
12
13
# File 'lib/disque.rb', line 11

def prefix
  @prefix
end

#statsObject (readonly)

Returns the value of attribute stats.



9
10
11
# File 'lib/disque.rb', line 9

def stats
  @stats
end

Instance Method Details

#call(*args) ⇒ Object

Run commands on the active connection. If the connection is lost, new connections are tried until all nodes become unavailable.



138
139
140
141
142
143
# File 'lib/disque.rb', line 138

def call(*args)
  @client.call!(*args)
rescue *ECONN
  explore!(@nodes.values)
  retry
end

#explore!(hosts) ⇒ Object

Collect the list of nodes and keep a connection to the node that provided that information.



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/disque.rb', line 73

def explore!(hosts)

  # Reset nodes
  @nodes.clear

  hosts.each do |host|
    begin
      @scout.configure(url(host))

      result = @scout.call!("HELLO")

      # For keeping track of nodes and stats, we use only the
      # first eight characters of the node_id. That's because
      # those eight characters are part of the job_ids, and
      # our stats are based on that.
      @prefix = result[1][0,8]

      # Connect the main client to the first node that replied
      @client.configure(@scout.url)

      # Populate cache with the list of node and their hosts
      result[2..-1].each do |node_id, hostname, port, priority|
        @nodes[node_id[0,8]] = sprintf("%s:%s", hostname, port)
      end

      @scout.quit

      break

    rescue *ECONN
      $stderr.puts($!.inspect)
    end
  end

  if @nodes.empty?
    raise ArgumentError, "nodes unavailable"
  end
end

#fetch(from: [], count: 1, timeout: 0) ⇒ Object



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# File 'lib/disque.rb', line 169

def fetch(from: [], count: 1, timeout: 0)
  pick_client!

  jobs = call(
    "GETJOB",
      "TIMEOUT", timeout,
      "COUNT", count,
      "FROM", *from)

  if jobs then
    @count += 1

    jobs.each do |queue, msgid, job|

      # Update stats
      @stats[msgid[2,8]] += 1

      if block_given?

        # Process job
        yield(job, queue)

        # Remove job
        call("ACKJOB", msgid)
      end
    end
  end

  return jobs
end

#options_to_arguments(options) ⇒ Object



200
201
202
203
204
205
206
207
208
209
210
211
212
# File 'lib/disque.rb', line 200

def options_to_arguments(options)
  arguments = []

  options.each do |key, value|
    if value == true
      arguments.push(key)
    else
      arguments.push(key, value)
    end
  end

  return arguments
end

#pick_client!Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/disque.rb', line 112

def pick_client!
  if @count == @cycle
    @count = 0
    prefix, _ = @stats.max { |a, b| a[1] <=> b[1] }

    if prefix != @prefix
      host = @nodes[prefix]

      if host

        # Reconfigure main client
        @client.configure(url(host))

        # Save current node prefix
        @prefix = prefix

        # Reset stats for this new connection
        @stats.clear
      end
    end
  end
end

#push(queue_name, job, ms_timeout, options = {}) ⇒ Object

Disque’s ADDJOB signature is as follows:

ADDJOB queue_name job <ms-timeout>
  [REPLICATE <count>]
  [DELAY <sec>]
  [RETRY <sec>]
  [TTL <sec>]
  [MAXLEN <count>]
  [ASYNC]

You can pass any optional arguments as a hash, for example:

disque.push("foo", "myjob", 1000, ttl: 1, async: true)

Note that ‘async` is a special case because it’s just a flag. That’s why ‘true` must be passed as its value.



162
163
164
165
166
167
# File 'lib/disque.rb', line 162

def push(queue_name, job, ms_timeout, options = {})
  command = ["ADDJOB", queue_name, job, ms_timeout]
  command += options_to_arguments(options)

  call(*command)
end

#quitObject



214
215
216
# File 'lib/disque.rb', line 214

def quit
  @client.quit
end

#url(host) ⇒ Object



63
64
65
66
67
68
69
# File 'lib/disque.rb', line 63

def url(host)
  if @auth
    sprintf("disque://:%s@%s", @auth, host)
  else
    sprintf("disque://%s", host)
  end
end