Class: Karafka::Pro::RecurringTasks::Serializer
- Inherits:
-
Object
- Object
- Karafka::Pro::RecurringTasks::Serializer
- Defined in:
- lib/karafka/pro/recurring_tasks/serializer.rb
Overview
Converts schedule command and log details into data we can dispatch to Kafka.
Constant Summary collapse
- SCHEMA_VERSION =
Current recurring tasks related schema structure
'1.0'
Instance Method Summary collapse
-
#command(command_name, task_id) ⇒ String
Serialized and compressed command data.
-
#log(event) ⇒ String
Serialized and compressed event log data.
-
#schedule(schedule) ⇒ String
Serializes and compresses the schedule with all its tasks and their execution state.
Instance Method Details
#command(command_name, task_id) ⇒ String
Returns serialized and compressed command data.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 64 def command(command_name, task_id) data = { schema_version: SCHEMA_VERSION, schedule_version: Karafka::Pro::RecurringTasks.schedule.version, dispatched_at: Time.now.to_f, type: 'command', command: { name: command_name }, task: { id: task_id } } compress( serialize(data) ) end |
#log(event) ⇒ String
Returns serialized and compressed event log data.
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 85 def log(event) task = event[:task] data = { schema_version: SCHEMA_VERSION, schedule_version: Karafka::Pro::RecurringTasks.schedule.version, dispatched_at: Time.now.to_f, type: 'log', task: { id: task.id, time_taken: event.payload[:time] || -1, result: event.payload.key?(:error) ? 'failure' : 'success' } } compress( serialize(data) ) end |
#schedule(schedule) ⇒ String
Serializes and compresses the schedule with all its tasks and their execution state
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/karafka/pro/recurring_tasks/serializer.rb', line 35 def schedule(schedule) tasks = {} schedule.each do |task| tasks[task.id] = { id: task.id, cron: task.cron.original, previous_time: task.previous_time.to_i, next_time: task.next_time.to_i, enabled: task.enabled? } end data = { schema_version: SCHEMA_VERSION, schedule_version: schedule.version, dispatched_at: Time.now.to_f, type: 'schedule', tasks: tasks } compress( serialize(data) ) end |