Class: Belated::Client

Inherits:
Object
  • Object
show all
Includes:
Singleton
Defined in:
lib/belated/client.rb,
lib/belated/testing.rb

Overview

A client that can perform jobs inline

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#bankObject

Returns the value of attribute bank.



14
15
16
# File 'lib/belated/client.rb', line 14

def bank
  @bank
end

#banker_threadObject

Returns the value of attribute banker_thread.



14
15
16
# File 'lib/belated/client.rb', line 14

def banker_thread
  @banker_thread
end

#proc_tableObject

Returns the value of attribute proc_table.



14
15
16
# File 'lib/belated/client.rb', line 14

def proc_table
  @proc_table
end

#queueObject

Returns the value of attribute queue.



14
15
16
# File 'lib/belated/client.rb', line 14

def queue
  @queue
end

Instance Method Details

#delete_from_tableObject



61
62
63
64
65
66
67
68
69
# File 'lib/belated/client.rb', line 61

def delete_from_table
  return if proc_table.length < 25

  @mutex.synchronize do
    proc_table.select { |_k, v| v.completed }.each do |key, _value|
      proc_table.delete(key)
    end
  end
end

#old_performJobWrapper

The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.

Parameters:

  • job (Object)
    • The the job to be pushed.

  • at (Date)
    • The time at which the job should be executed.

  • max_retries (Integer)
    • Times the job should be retried if it fails.

Returns:



24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/belated/testing.rb', line 24

def perform(job, at: nil, max_retries: 5, active_job: false)
  start unless started?
  return unless proper_job?(job)

  job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job)
  bank.push(job_wrapper)
  @mutex.synchronize do
    proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass
  end
  self.banker_thread = start_banker_thread if banker_thread.nil?
  job_wrapper
end

#perform(job, at: nil, max_retries: 5, active_job: false) ⇒ JobWrapper

The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.

Parameters:

  • job (Object)
    • The the job to be pushed.

  • at (Date) (defaults to: nil)
    • The time at which the job should be executed.

  • max_retries (Integer) (defaults to: 5)
    • Times the job should be retried if it fails.

Returns:



77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/belated/client.rb', line 77

def perform(job, at: nil, max_retries: 5, active_job: false)
  start unless started?
  return unless proper_job?(job)

  job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job)
  bank.push(job_wrapper)
  @mutex.synchronize do
    proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass
  end
  self.banker_thread = start_banker_thread if banker_thread.nil?
  job_wrapper
end

#perform_belatedJobWrapper

The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.

Parameters:

  • job (Object)
    • The the job to be pushed.

  • at (Date)
    • The time at which the job should be executed.

  • max_retries (Integer)
    • Times the job should be retried if it fails.

Returns:



89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/belated/client.rb', line 89

def perform(job, at: nil, max_retries: 5, active_job: false)
  start unless started?
  return unless proper_job?(job)

  job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job)
  bank.push(job_wrapper)
  @mutex.synchronize do
    proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass
  end
  self.banker_thread = start_banker_thread if banker_thread.nil?
  job_wrapper
end

#perform_laterJobWrapper

The method that pushes the jobs to the queue. If there is no connection, it pushes the job to the bank.

Parameters:

  • job (Object)
    • The the job to be pushed.

  • at (Date)
    • The time at which the job should be executed.

  • max_retries (Integer)
    • Times the job should be retried if it fails.

Returns:



90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/belated/client.rb', line 90

def perform(job, at: nil, max_retries: 5, active_job: false)
  start unless started?
  return unless proper_job?(job)

  job_wrapper = wrap_job(job, at: at.to_f, max_retries: max_retries, active_job: active_job)
  bank.push(job_wrapper)
  @mutex.synchronize do
    proc_table[job_wrapper.object_id] = job_wrapper if job_wrapper.proc_klass
  end
  self.banker_thread = start_banker_thread if banker_thread.nil?
  job_wrapper
end

#startvoid Also known as: initialize

This method returns an undefined value.

Starts up the client. Connects to the queue through DRb.



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/belated/client.rb', line 19

def start
  return if started?

  server_uri = Belated::URI
  DRb.start_service
  self.proc_table = {}
  self.bank = Thread::Queue.new
  self.queue = DRbObject.new_with_uri(server_uri)
  @started = true
  @mutex = Mutex.new
end

#start_banker_threadvoid

This method returns an undefined value.

Thread in charge of handling the bank queue. You probably want to memoize the client in order to avoid having many threads in the sleep state.



46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/belated/client.rb', line 46

def start_banker_thread
  Thread.new do
    loop do
      delete_from_table
      sleep Belated.client_heartbeat and next if bank.empty?

      bank.length.times do
        queue.push(wrapper = bank.pop)
      rescue DRb::DRbConnError
        bank.push(wrapper)
      end
    end
  end
end

#started?Boolean

Returns:

  • (Boolean)


32
33
34
# File 'lib/belated/client.rb', line 32

def started?
  @started
end

#turn_offObject

Makes it possible to reset the client



37
38
39
40
# File 'lib/belated/client.rb', line 37

def turn_off
  @started = false
  banker_thread&.kill
end