Class: Faktory::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/faktory/client.rb

Constant Summary collapse

# Iterated SHA-256 hasher. Starting from the concatenation pwd + salt, the
# value is replaced by its raw SHA-256 digest `iter` times and the final
# bytes are returned hex-encoded. With iter == 0 no hashing takes place and
# the hex encoding of pwd + salt itself is returned.
HASHER =
proc do |iter, pwd, salt|
  digester = Digest::SHA256.new
  stretched = iter.times.inject(pwd + salt) { |bytes, _round| digester.digest(bytes) }
  Digest.hexencode(stretched)
end
# Worker id sent as "wid" by #beat. Starts out empty and is replaced with a
# random hex string when .worker! is called.
@@random_process_wid =
""

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url: 'tcp://localhost:7419', debug: false) ⇒ Client

Best practice is to rely on the localhost default for development and configure the environment variables for non-development environments.

FAKTORY_PROVIDER=MY_FAKTORY_URL MY_FAKTORY_URL=tcp://:somepass@my-server.example.com:7419

Note that, as shown above, the URL can contain the password for secure installations.



38
39
40
41
42
# File 'lib/faktory/client.rb', line 38

# Stores the debug flag, resolves the server location and then calls #open.
#
# The location returned by uri_from_env wins when it is truthy; only
# otherwise is +url+ parsed with URI().
#
# @param url [String] fallback server address
# @param debug [Boolean] kept in @debug
def initialize(url: 'tcp://localhost:7419', debug: false)
  @debug = debug
  env_location = uri_from_env
  @location = env_location || URI(url)
  open
end

Instance Attribute Details

#middleware ⇒ Object

Returns the value of attribute middleware.



29
30
31
# File 'lib/faktory/client.rb', line 29

# Reader for the @middleware instance variable.
def middleware
  @middleware
end

Class Method Details

.worker! ⇒ Object

Called when booting the worker process to signal that this process will consume jobs and send BEAT.



25
26
27
# File 'lib/faktory/client.rb', line 25

# Assigns this process a fresh random worker id (16 hex characters) in the
# @@random_process_wid class variable, which #beat sends as "wid".
# Each call generates a new id.
def self.worker!
  @@random_process_wid = SecureRandom.hex(8)
end

Instance Method Details

#ack(jid) ⇒ Object



76
77
78
79
80
81
# File 'lib/faktory/client.rb', line 76

# Sends an ACK command for the job identified by +jid+.
#
# The payload is produced by the JSON encoder instead of string
# interpolation, so a quote, backslash or control character in the id can
# no longer yield malformed JSON. The id is still converted with #to_s, so
# for ordinary ids the bytes sent are unchanged.
#
# @param jid [#to_s] job id, serialized as a JSON string
# @return [Object] the value of the transaction block (the result of ok!)
def ack(jid)
  transaction do
    command("ACK", JSON.generate({ "jid" => jid.to_s }))
    ok!
  end
end

#beat ⇒ Object

Sends a heartbeat to the server, in order to prove this worker process is still alive.

Returns a string signal for the process to act on; legal values are “quiet” or “terminate”. The quiet signal is informative: the server won’t allow this process to FETCH any more jobs anyway.



99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/faktory/client.rb', line 99

# Sends a BEAT command carrying this process's worker id and interprets the
# reply: a literal "OK" is handed back unchanged, any other reply is parsed
# as JSON and its "state" entry is returned.
def beat
  transaction do
    command("BEAT", %Q[{"wid":"#{@@random_process_wid}"}])
    reply = result
    reply == "OK" ? reply : JSON.parse(reply)["state"]
  end
end

#close ⇒ Object



44
45
46
47
48
49
# File 'lib/faktory/client.rb', line 44

# Sends END and closes the socket. Does nothing when no socket is held.
#
# The socket is closed and @sock cleared in an ensure clause, so a failure
# while sending END (for example on an already broken connection) no longer
# leaks the socket or leaves a dead one in @sock. The original exception
# still propagates to the caller.
#
# @return [nil]
def close
  return unless @sock
  begin
    command "END"
  ensure
    # Detach first so @sock is cleared even if closing itself raises.
    sock = @sock
    @sock = nil
    sock.close if sock
  end
  nil
end

#fail(jid, ex) ⇒ Object



83
84
85
86
87
88
89
90
91
# File 'lib/faktory/client.rb', line 83

# Sends a FAIL command for job +jid+ describing the exception +ex+.
#
# The JSON payload carries the exception message (cut to its first 1000
# characters), the exception class name, the job id and the backtrace.
#
# @param jid [Object] job id, placed in the payload as given
# @param ex [Exception] the error that caused the failure
def fail(jid, ex)
  transaction do
    details = {
      message: ex.message[0...1000],
      errtype: ex.class.name,
      jid: jid,
      backtrace: ex.backtrace
    }
    command("FAIL", JSON.dump(details))
    ok!
  end
end

#fetch(*queues) ⇒ Object



67
68
69
70
71
72
73
74
# File 'lib/faktory/client.rb', line 67

# Sends a FETCH command for the given queues and returns the reply parsed
# as JSON, or nil when the reply is nil/false.
#
# @param queues [Array] queue names, passed through to the FETCH command
def fetch(*queues)
  payload = nil
  transaction do
    command("FETCH", *queues)
    payload = result
  end
  return unless payload
  JSON.parse(payload)
end

#flush ⇒ Object

Warning: this clears all job data in Faktory



52
53
54
55
56
57
# File 'lib/faktory/client.rb', line 52

# Sends the FLUSH command inside a transaction and then checks the reply
# with ok!.
def flush
  transaction do
    command("FLUSH")
    ok!
  end
end

#info ⇒ Object



112
113
114
115
116
117
118
# File 'lib/faktory/client.rb', line 112

# Sends the INFO command and returns the reply parsed as JSON, or nil when
# the reply is nil/false.
def info
  transaction do
    command("INFO")
    reply = result
    reply ? JSON.parse(reply) : nil
  end
end

#push(job) ⇒ Object



59
60
61
62
63
64
65
# File 'lib/faktory/client.rb', line 59

# Serializes +job+ to JSON, sends it with the PUSH command, checks the reply
# with ok! and returns the job's "jid" entry.
#
# @param job [Hash] job payload; its "jid" key is read for the return value
# @return [Object] job["jid"]
def push(job)
  transaction do
    payload = JSON.generate(job)
    command("PUSH", payload)
    ok!
    job["jid"]
  end
end