Module: Postqueue

Extended by:
DefaultQueue, SingleForwardable
Defined in:
lib/postqueue.rb,
lib/postqueue/cli.rb,
lib/postqueue/item.rb,
lib/postqueue/queue.rb,
lib/postqueue/logger.rb,
lib/postqueue/version.rb,
lib/postqueue/cli/stats.rb,
lib/postqueue/item/enqueue.rb,
lib/postqueue/queue/runner.rb,
lib/postqueue/queue/timing.rb,
lib/postqueue/default_queue.rb,
lib/postqueue/item/inserter.rb,
lib/postqueue/queue/logging.rb,
lib/postqueue/queue/callback.rb,
lib/postqueue/queue/processing.rb,
lib/postqueue/cli/options_parser.rb,
lib/postqueue/queue/select_and_lock.rb

Defined Under Namespace

Modules: CLI, DefaultQueue Classes: Item, MissingHandler, Queue, Timing

Constant Summary collapse

VERSION =
"0.7.1"

Class Method Summary collapse

Methods included from DefaultQueue

default_queue

Class Method Details

.default_loggerObject



10
11
12
# File 'lib/postqueue/logger.rb', line 10

def self.default_logger
  defined?(Rails) ? Rails.logger : stdout_logger
end

.loggerObject



6
7
8
# File 'lib/postqueue/logger.rb', line 6

def self.logger
  @logger || default_logger
end

.logger=(logger) ⇒ Object



2
3
4
# File 'lib/postqueue/logger.rb', line 2

def self.logger=(logger)
  @logger ||= logger
end

.migrate!(table_name = "postqueue") ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/postqueue/item.rb', line 36

def self.migrate!(table_name = "postqueue")
  connection = Item.connection

  if connection.tables.include?(table_name)
    upgrade_table!(table_name)
    return
  end

  connection.execute "  CREATE TABLE \#{table_name} (\n    id          BIGSERIAL PRIMARY KEY,\n    op          VARCHAR,\n    entity_id   INTEGER NOT NULL DEFAULT 0,\n    created_at  timestamp without time zone NOT NULL DEFAULT (now() at time zone 'utc'),\n    next_run_at timestamp without time zone NOT NULL DEFAULT (now() at time zone 'utc'),\n    failed_attempts INTEGER NOT NULL DEFAULT 0\n  );\n\n  -- This index should be usable to find duplicate duplicates in the table. While\n  -- we search for entries with matching op and entity_id, we assume that entity_id\n  -- has a much higher cardinality.\n  CREATE INDEX \#{table_name}_idx1 ON \#{table_name}(entity_id);\n\n  -- This index should help picking the next entries to run. Otherwise a full tablescan\n  -- would be necessary whenevr we check out items.\n  CREATE INDEX \#{table_name}_idx2 ON \#{table_name}(next_run_at);\n  SQL\nend\n"

.new(*args, &block) ⇒ Object



9
10
11
# File 'lib/postqueue.rb', line 9

def new(*args, &block)
  ::Postqueue::Queue.new(*args, &block)
end

.stdout_loggerObject



14
15
16
# File 'lib/postqueue/logger.rb', line 14

def self.stdout_logger
  @stdout_logger ||= Logger.new(STDOUT)
end

.unmigrate!(table_name = "postqueue") ⇒ Object



19
20
21
22
23
# File 'lib/postqueue/item.rb', line 19

def self.unmigrate!(table_name = "postqueue")
  Item.connection.execute "    DROP TABLE IF EXISTS \#{table_name};\n  SQL\nend\n"

.upgrade_table!(table_name) ⇒ Object



25
26
27
28
29
30
31
32
33
34
# File 'lib/postqueue/item.rb', line 25

def self.upgrade_table!(table_name)
  # upgrade id column to use BIGINT if necessary
  id_max = Item.column_types['id'].send(:range).end
  if id_max <= 2147483648
    STDERR.puts "Changing type of #{table_name}.id column to BIGINT"
    Item.connection.execute "ALTER TABLE #{table_name} ALTER COLUMN id TYPE BIGINT"
    Item.connection.execute "ALTER SEQUENCE #{table_name}_id_seq RESTART WITH 2147483649"
    Item.reset_column_information
  end
end