Class: PGJob::Job

Inherits:
Object
  • Object
show all
Defined in:
lib/pgjob/job.rb

Instance Method Summary collapse

Constructor Details

#initialize(*args, &block) ⇒ Job

Initializes the job handle with an existing connection object (anything that responds to #exec, e.g. an AR connection) or with the arguments for PG.connect.



4
5
6
7
8
9
10
11
12
13
14
15
16
17
# File 'lib/pgjob/job.rb', line 4

# Builds a job handle around a database connection.
#
# When the first argument responds to +exec+ it is used as the connection and
# is treated as borrowed: +@can_finish+ is set to false. Otherwise all
# arguments are forwarded to +PG.connect+ and +@can_finish+ is set to true.
#
# If a block is given it is called with the new instance and +finish!+ is
# invoked afterwards, even when the block raises, so a connection opened here
# is not leaked on error.
#
# @param args [Array] an existing connection, or arguments for PG.connect
# @yieldparam job [Job] the freshly initialized instance
def initialize(*args, &block)
  if args.first.respond_to?(:exec)
    @conn = args.first
    @can_finish = false
  else
    @conn = PG.connect(*args)
    @can_finish = true
  end

  return unless block_given?

  begin
    block.call(self)
  ensure
    # Runs on both normal exit and exceptions; the block's error still propagates.
    finish!
  end
end

Instance Method Details

#add_log(id, msg) ⇒ Object

Adds log message



116
117
118
119
120
121
122
123
# File 'lib/pgjob/job.rb', line 116

# Appends a message to a job's log.
#
# Reads the current log via +log+, joins it with +msg+ using a newline
# (nil parts are dropped), and writes the combined text back to the row.
#
# @param id [Integer, String] job id; must be a base-10 integer
# @param msg [String, nil] text to append
# @return [Object] whatever the connection's +exec+ returns
# @raise [ArgumentError] if +id+ is not a base-10 integer
def add_log(id, msg)
  # Coerce before interpolating so arbitrary text can never reach the SQL;
  # base 10 is explicit so "010" means ten rather than octal eight.
  job_id = Integer(id.to_s, 10)
  full_log = [log(job_id), msg].compact.join("\n")
  sql = <<-SQL
    UPDATE jobs
    SET log = '#{@conn.escape(full_log)}'
    WHERE id = #{job_id};
  SQL
  @conn.exec(sql)
end

#create(name, params = {}) ⇒ Object

Creates new job



50
51
52
53
54
55
56
57
58
59
# File 'lib/pgjob/job.rb', line 50

# Inserts a new job row and returns its id.
#
# The row is stored with mode 'wait' and status 'Start'; +params+ is
# serialized with JSON.dump. Both name and serialized params go through
# the connection's +escape+ before being placed in the statement.
#
# @param name [String] job name
# @param params [Hash] job parameters, checked by assert_symbol_keys!
# @return [Integer] the id reported by RETURNING id
def create(name, params = {})
  assert_symbol_keys!(params)

  escaped_name = @conn.escape(name)
  escaped_params = @conn.escape(JSON.dump(params))
  sql = <<-SQL
    INSERT INTO jobs(name, params, mode, status)
    VALUES('#{escaped_name}', '#{escaped_params}', 'wait', 'Start')
    RETURNING id;
  SQL

  inserted = @conn.exec(sql).to_a.first
  inserted['id'].to_i
end

#failed?(id) ⇒ Boolean

Returns:

  • (Boolean)


137
138
139
# File 'lib/pgjob/job.rb', line 137

# Tells whether the job's mode, as reported by +mode+, equals :failed.
#
# @param id job id, passed through to +mode+
# @return [Boolean]
def failed?(id)
  current = mode(id)
  current == :failed
end

#finish! ⇒ Object

Closes connection if possible



43
44
45
46
47
# File 'lib/pgjob/job.rb', line 43

# Closes the connection, but only when @can_finish is truthy.
#
# @return [Object, nil] result of the connection's +finish+, or nil when
#   nothing was closed
def finish!
  return nil unless @can_finish

  @conn.finish
end

#finished?(id) ⇒ Boolean

Returns:

  • (Boolean)


129
130
131
# File 'lib/pgjob/job.rb', line 129

# Tells whether the job's mode, as reported by +mode+, is :success or :failed.
#
# @param id job id, passed through to +mode+
# @return [Boolean]
def finished?(id)
  terminal_modes = [:success, :failed]
  terminal_modes.include?(mode(id))
end

#log(id) ⇒ Object

Returns full log



106
107
108
109
110
111
112
113
# File 'lib/pgjob/job.rb', line 106

# Returns the full log stored for a job.
#
# @param id [Integer, String] job id; must be a base-10 integer
# @return [Object] the +log+ field of the first row returned by the query
# @raise [ArgumentError] if +id+ is not a base-10 integer
def log(id)
  # Coerce before interpolating so arbitrary text can never reach the SQL;
  # base 10 is explicit so "010" means ten rather than octal eight.
  job_id = Integer(id.to_s, 10)
  sql = <<-SQL
    SELECT log
    FROM jobs
    WHERE id = #{job_id};
  SQL
  @conn.exec(sql).to_a.first['log']
end

#migrate! ⇒ Object

Creates jobs table



20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/pgjob/job.rb', line 20

# Creates the jobs table unless +tables+ already lists 'jobs'.
#
# @return [Object, nil] result of the connection's +exec+, or nil when the
#   table was already present
def migrate!
  return nil if tables.include?('jobs')

  ddl = <<-SQL
      CREATE TABLE jobs (
        id SERIAL,
        name character varying(255),
        params text,
        mode character varying(20),
        status character varying(255),
        log text
      );
  SQL
  @conn.exec(ddl)
end

#name(id) ⇒ Object

Returns name



62
63
64
65
66
67
68
69
# File 'lib/pgjob/job.rb', line 62

# Returns the name stored for a job.
#
# @param id [Integer, String] job id; must be a base-10 integer
# @return [Object] the +name+ field of the first row returned by the query
# @raise [ArgumentError] if +id+ is not a base-10 integer
def name(id)
  # Coerce before interpolating so arbitrary text can never reach the SQL;
  # base 10 is explicit so "010" means ten rather than octal eight.
  job_id = Integer(id.to_s, 10)
  sql = <<-SQL
    SELECT name
    FROM jobs
    WHERE id = #{job_id};
  SQL
  @conn.exec(sql).to_a.first['name']
end

#params(id) ⇒ Object

Returns params



96
97
98
99
100
101
102
103
# File 'lib/pgjob/job.rb', line 96

# Returns the parameters stored for a job.
#
# The +params+ field of the first returned row is decoded with JSON.load and
# the result is passed through +symbolize_keys+.
#
# @param id [Integer, String] job id; must be a base-10 integer
# @return [Object] whatever +symbolize_keys+ returns for the decoded value
# @raise [ArgumentError] if +id+ is not a base-10 integer
def params(id)
  # Coerce before interpolating so arbitrary text can never reach the SQL;
  # base 10 is explicit so "010" means ten rather than octal eight.
  job_id = Integer(id.to_s, 10)
  sql = <<-SQL
    SELECT params
    FROM jobs
    WHERE id = #{job_id};
  SQL
  symbolize_keys JSON.load(@conn.exec(sql).to_a.first['params'])
end

#rollback! ⇒ Object

Destroys jobs table



36
37
38
39
40
# File 'lib/pgjob/job.rb', line 36

# Drops the jobs table if it exists.
#
# @return [Object] whatever the connection's +exec+ returns
def rollback!
  drop_statement = <<-SQL
    DROP TABLE IF EXISTS jobs;
  SQL
  @conn.exec(drop_statement)
end

#running?(id) ⇒ Boolean

Returns:

  • (Boolean)


125
126
127
# File 'lib/pgjob/job.rb', line 125

# Tells whether the job's mode, as reported by +mode+, is :wait or :running.
#
# @param id job id, passed through to +mode+
# @return [Boolean]
def running?(id)
  active_modes = [:wait, :running]
  active_modes.include?(mode(id))
end

#status(id, mode = nil, value = nil) ⇒ Object

Sets/Returns status



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/pgjob/job.rb', line 72

# Reads or updates a job's status.
#
# With only +id+, returns the +status+ field of the first matching row.
# With +mode+ and +value+, updates the row's mode and status and then records
# the change through +add_log+.
#
# @param id [Integer, String] job id; must be a base-10 integer
# @param mode [Symbol, String, nil] one of running, success, failed; nil to read
# @param value [String, nil] status text; required when +mode+ is given
# @return [Object] the stored status when reading, otherwise the result of +add_log+
# @raise [ArgumentError] if +id+ is not a base-10 integer
# @raise [RuntimeError] if +mode+ is not an accepted mode or +value+ is missing
def status(id, mode = nil, value = nil)
  # Coerce before interpolating so arbitrary text can never reach the SQL;
  # base 10 is explicit so "010" means ten rather than octal eight.
  job_id = Integer(id.to_s, 10)

  if mode
    # Compare as strings so caller-supplied text is never turned into a Symbol.
    raise "wrong mode #{mode}" unless %w[running success failed].include?(mode.to_s)
    raise "set value" unless value
    sql = <<-SQL
      UPDATE jobs
      SET
        mode = '#{mode}',
        status = '#{@conn.escape value}'
      WHERE id = #{job_id};
    SQL
    @conn.exec(sql)
    add_log(job_id, "[mode] #{mode} - #{value}")
  else
    sql = <<-SQL
      SELECT status
      FROM jobs
      WHERE id = #{job_id};
    SQL
    @conn.exec(sql).to_a.first['status']
  end
end

#success?(id) ⇒ Boolean

Returns:

  • (Boolean)


133
134
135
# File 'lib/pgjob/job.rb', line 133

# Tells whether the job's mode, as reported by +mode+, equals :success.
#
# @param id job id, passed through to +mode+
# @return [Boolean]
def success?(id)
  current = mode(id)
  current == :success
end