Module: Legion::Extensions::Tasker::Helpers::FetchDelayed
- Included in:
- Runners::FetchDelayed
- Defined in:
- lib/legion/extensions/tasker/helpers/fetch_delayed.rb
Instance Method Summary collapse
- #find_delayed ⇒ Object
- #find_subtasks(trigger_id:) ⇒ Object
- #find_trigger(runner_class:, function:) ⇒ Object
Instance Method Details
#find_delayed ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/legion/extensions/tasker/helpers/fetch_delayed.rb', line 21 def find_delayed(**) sql = ' SELECT tasks.id, tasks.relationship_id, tasks.function_id, tasks.created, relationships.delay as relationship_delay, relationships.chain_id, functions.name as function_name, runners.namespace as class, runners.id as runner_id, CONCAT( `exchange`, ".",`queue`,".",`functions`.`name`) AS runner_routing_key FROM legion.tasks INNER JOIN legion.functions ON (functions.id = tasks.function_id) INNER JOIN legion.runners ON (runners.id = functions.runner_id) INNER JOIN `legion`.`extensions` ON (`runners`.`extension_id` = `extensions`.`id`) LEFT JOIN legion.relationships ON (relationships.id = tasks.relationship_id) WHERE status = \'task.delayed\';' Legion::Data::Connection.sequel.fetch(sql).all end |
#find_subtasks(trigger_id:) ⇒ Object
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/legion/extensions/tasker/helpers/fetch_delayed.rb', line 38 def find_subtasks(trigger_id:, **) sql = " SELECT `relationships`.`id` as `relationship_id`, `debug`, `chain_id`, `allow_new_chains`, `delay`, `trigger_id`, `action_id`, `conditions`, `transformation`, `runners`.`namespace`, `runners`.`id` as `runner_id`, `runners`.`queue`, `runners`.`namespace` as runner_class, `functions`.`id` as `function_id`, `functions`.`name` as `function`, `extensions`.`exchange`, CONCAT( `exchange`, '.',`queue`,'.',`functions`.`name`) AS runner_routing_key FROM `legion`.`relationships` INNER JOIN `legion`.`functions` ON (`functions`.`id` = `relationships`.`action_id`) INNER JOIN `legion`.`runners` ON (`functions`.`runner_id` = `runners`.`id`) INNER JOIN `legion`.`extensions` ON (`runners`.`extension_id` = `extensions`.`id`) WHERE `relationships`.`trigger_id` = #{trigger_id} AND `relationships`.`active` = 1; " cache = Legion::Cache.get(sql) return cache unless cache.nil? results = Legion::Data::Connection.sequel.fetch(sql).all Legion::Cache.set(sql, results, 5) if results.is_a?(Array) && results.count.positive? results end |
#find_trigger(runner_class:, function:) ⇒ Object
6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/legion/extensions/tasker/helpers/fetch_delayed.rb', line 6 def find_trigger(runner_class:, function:, **) sql = "SELECT `functions`.`id` as `function_id`, `runner_id`, `runners`.`namespace` FROM `legion`.`functions` INNER JOIN `legion`.`runners` ON (`functions`.`runner_id` = `runners`.`id`) WHERE `functions`.`name` = '#{function}' AND `runners`.`namespace` = '#{runner_class}' LIMIT 1;" cache = Legion::Cache.get(sql) return cache unless cache.nil? results = Legion::Data::Connection.sequel.fetch(sql).first Legion::Cache.set(sql, results) if results.is_a?(Hash) && results.count.positive? results end |