Class: AzkabanScheduler::Session
- Inherits: Object
- Inheritance chain: Object → AzkabanScheduler::Session
- Defined in: lib/azkaban_scheduler/session.rb
Instance Attribute Summary collapse
- #id ⇒ Object: returns the value of attribute id.
Class Method Summary collapse
- .start(client, username, password) ⇒ Object
Instance Method Summary collapse
- #create_project(project) ⇒ Object
- #delete_project(project_name) ⇒ Object
- #fetch_flow_executions(project_name, flow_id, offset = 0, limit = 10) ⇒ Object
- #fetch_project_flows(project_name) ⇒ Object
- #get_project_id(project_name) ⇒ Object
- #initialize(client, id) ⇒ Session (constructor): a new instance of Session.
- #list_flow_ids(project_name) ⇒ Object
- #list_schedules ⇒ Object
- #post_schedule(project_id, project_name, flow, start_time, options = {}) ⇒ Object
- #remove_all_schedules(project_name) ⇒ Object
- #remove_schedule(schedule_id) ⇒ Object
- #upload_project(project) ⇒ Object
Constructor Details
#initialize(client, id) ⇒ Session
Returns a new instance of Session.
23 24 25 26 |
# File 'lib/azkaban_scheduler/session.rb', line 23

# Stores the client and session id that the request methods of this class use.
#
# @param client [Object] object the other methods call `post`, `get` and
#   `multipart_post` on
# @param id [Object] session id, kept in @id and sent as 'session.id'
def initialize(client, id)
  @client = client
  @id = id
end
Instance Attribute Details
#id ⇒ Object
Returns the value of attribute id.
6 7 8 |
# File 'lib/azkaban_scheduler/session.rb', line 6

# Reader for the session id.
#
# @return [Object] the value of @id
def id
  @id
end
Class Method Details
.start(client, username, password) ⇒ Object
8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
# File 'lib/azkaban_scheduler/session.rb', line 8

# Logs in with the given credentials and builds a session from the reply.
#
# POSTs action=login to '/', parses the body as JSON and passes the returned
# 'session.id' to `new` together with the client.
#
# @param client [Object] object responding to `post(path, params)`
# @param username [String] login name sent as 'username'
# @param password [String] password sent as 'password'
# @return [Session] result of `new(client, result['session.id'])`
# @raise [AuthenticationError] when the reply's 'error' is the
#   "Incorrect Login" message
# @raise [AzkabanError] for any other reply whose 'status' is not "success"
def self.start(client, username, password)
  params = {'action' => 'login', 'username' => username, 'password' => password}
  response = client.post('/', params)
  # Anything other than a 2xx response is handed to Net::HTTP's own error raising.
  response.error! unless response.kind_of?(Net::HTTPSuccess)

  result = JSON.parse(response.body)
  unless result["status"] == "success"
    message = result["error"]
    if message == "Incorrect Login. Username\/Password not found."
      raise AuthenticationError, message
    end
    raise AzkabanError, message
  end

  new(client, result['session.id'])
end
Instance Method Details
#create_project(project) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/azkaban_scheduler/session.rb', line 28

# Creates a project by POSTing action=create to '/manager'.
#
# @param project [#name, #description] supplies the 'name' and 'description'
#   request parameters
# @return [Hash] the parsed JSON reply when its 'status' is 'success'
# @raise [ProjectExistsError] when the reply says an active project with that
#   name already exists
# @raise [InvalidProjectNameError] when the reply rejects the project name
# @raise [ProjectDescriptionEmptyError] when the reply says the description is empty
# @raise [AzkabanError] for any other non-success reply
def create_project(project)
  response = @client.post('/manager', {
    'session.id' => @id,
    'action' => 'create',
    'name' => project.name,
    'description' => project.description,
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess)

  result = JSON.parse(response.body)
  return result if result['status'] == 'success'

  # The reply only carries free text, so the error class is chosen by exact
  # message match.
  message = result['message']
  case message
  when "Active project with name #{project.name} already exists in db."
    raise ProjectExistsError, message
  when "Project names must start with a letter, followed by any number of letters, digits, '-' or '_'."
    raise InvalidProjectNameError, message
  when "Description cannot be empty."
    raise ProjectDescriptionEmptyError, message
  end
  raise AzkabanError, message
end
#delete_project(project_name) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/azkaban_scheduler/session.rb', line 71

# Deletes a project through a GET on '/manager' with delete=true.
#
# The outcome is read from the 'azkaban.success.message' and
# 'azkaban.failure.message' entries of the reply rather than from a JSON body,
# and a redirect counts as an acceptable HTTP result.
#
# NOTE(review): in the listing this was recovered from, the expression that
# produced those entries was unreadable (only `= (response)` survived). They
# are assumed to be cookies set by the reply and are read from its Set-Cookie
# headers here; confirm against the original helper.
#
# @param project_name [String] name sent as the 'project' parameter
# @return [Boolean] true when a success message is present, false when the
#   failure message says the project doesn't exist
# @raise [AzkabanError] for any other failure message
def delete_project(project_name)
  response = @client.get('/manager', {
    'session.id' => @id,
    'project' => project_name,
    'delete' => 'true',
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess) || response.kind_of?(Net::HTTPRedirection)

  cookies = response_cookies(response)
  unless cookies['azkaban.success.message']
    message = cookies['azkaban.failure.message']
    # Deleting a project that is already gone is reported, not raised.
    return false if message == "Project #{project_name} doesn't exist."
    raise AzkabanError, message
  end
  true
end

# Collects the name/value pair of every Set-Cookie header on the response.
# Cookie attributes after the first ';' are ignored and one pair of
# surrounding double quotes is stripped from the value.
# NOTE(review): values are not URL-decoded; verify against a real server reply.
def response_cookies(response)
  Array(response.get_fields('set-cookie')).each_with_object({}) do |field, jar|
    name, value = field.split(';', 2).first.to_s.split('=', 2)
    next if name.nil? || name.strip.empty?
    jar[name.strip] = value.to_s.strip.gsub(/\A"|"\z/, '')
  end
end
private :response_cookies
#fetch_flow_executions(project_name, flow_id, offset = 0, limit = 10) ⇒ Object
109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/azkaban_scheduler/session.rb', line 109

# Requests a page of executions of one flow ('ajax' => 'fetchFlowExecutions'
# on '/manager').
#
# @param project_name [String] sent as 'project'
# @param flow_id [String] sent as 'flow'
# @param offset [Integer] sent as 'start'
# @param limit [Integer] sent as 'length'
# @return [Object] the reply body parsed as JSON
def fetch_flow_executions(project_name, flow_id, offset=0, limit=10)
  response = @client.get('/manager', {
    'session.id' => @id,
    'ajax' => 'fetchFlowExecutions',
    'project' => project_name,
    'flow' => flow_id,
    'start' => offset,
    'length' => limit,
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess)
  JSON.parse(response.body)
end
#fetch_project_flows(project_name) ⇒ Object
99 100 101 102 103 104 105 106 107 |
# File 'lib/azkaban_scheduler/session.rb', line 99

# Requests the flows of a project ('ajax' => 'fetchprojectflows' on '/manager').
#
# @param project_name [String] sent as 'project'
# @return [Object] the reply body parsed as JSON; other methods of this class
#   read its 'projectId' and 'flows' entries
def fetch_project_flows(project_name)
  response = @client.get('/manager', {
    'session.id' => @id,
    'ajax' => 'fetchprojectflows',
    'project' => project_name,
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess)
  JSON.parse(response.body)
end
#get_project_id(project_name) ⇒ Object
89 90 91 92 |
# File 'lib/azkaban_scheduler/session.rb', line 89

# Looks up a project's id via fetch_project_flows.
#
# @param project_name [String] project to look up
# @return [Object, nil] the 'projectId' entry of the reply, nil when absent
def get_project_id(project_name)
  result = fetch_project_flows(project_name)
  result['projectId']
end
#list_flow_ids(project_name) ⇒ Object
94 95 96 97 |
# File 'lib/azkaban_scheduler/session.rb', line 94

# Lists the ids of a project's flows via fetch_project_flows.
#
# @param project_name [String] project to look up
# @return [Array] the 'flowId' of every entry under 'flows' in the reply
def list_flow_ids(project_name)
  result = fetch_project_flows(project_name)
  result['flows'].map { |flow| flow['flowId'] }
end
#list_schedules ⇒ Object
122 123 124 125 126 127 |
# File 'lib/azkaban_scheduler/session.rb', line 122

# Lists the schedules returned by 'ajax' => 'loadFlow' on '/schedule'.
#
# NOTE(review): the listing this was recovered from had an unreadable third
# argument to `post` and no visible session credentials. The session id is
# sent as 'session.id' here, as the '/manager' calls in this class do; confirm
# that the client and server accept it for '/schedule'.
#
# @return [Array] the reply's 'items', or an empty array when there is none
def list_schedules
  response = @client.post('/schedule', {
    'session.id' => @id,
    'ajax' => 'loadFlow',
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess)
  result = JSON.parse(response.body)
  result['items'] || []
end
#post_schedule(project_id, project_name, flow, start_time, options = {}) ⇒ Object
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 |
# File 'lib/azkaban_scheduler/session.rb', line 152

# Schedules a flow by POSTing 'ajax' => 'scheduleFlow' to '/schedule'.
#
# The start time is sent in UTC as 'scheduleTime' ("%I,%M,%p,UTC") and
# 'scheduleDate' ("%m/%d/%Y").
#
# NOTE(review): the listing this was recovered from had an unreadable third
# argument to `post` and no visible session credentials. The session id is
# sent as 'session.id' here, as the '/manager' calls in this class do; confirm
# that the client and server accept it for '/schedule'.
#
# @param project_id [Object] sent as 'projectId'
# @param project_name [String] sent as both 'project' and 'projectName'
# @param flow [String] sent as 'flow'
# @param start_time [Time] first run time; it is not modified
# @param options [Hash] optional settings, read by symbol key:
#   :disabled (default '[]'), :period (default "1d"; 'is_recurring' is 'on'
#   only when this is given), :concurrent_option (default 'skip'),
#   :failure_action (default 'finishCurrent'), :failure_emails and
#   :success_emails (one address or a list, joined with ', '), and the flags
#   :failure_emails_override, :success_emails_override,
#   :notify_failure_first, :notify_failure_last (sent as 'true'/'false')
# @return [nil]
# @raise [AzkabanError] when the reply's 'status' is not 'success'
def post_schedule(project_id, project_name, flow, start_time, options={})
  # Time#utc converts its receiver in place; getutc leaves the caller's object alone.
  utc_start = start_time.getutc
  response = @client.post('/schedule', {
    'session.id' => @id,
    'ajax' => 'scheduleFlow',
    'project' => project_name,
    'projectName' => project_name,
    'projectId' => project_id,
    'flow' => flow,
    'disabled' => options[:disabled] || '[]',
    'period' => options[:period] || "1d",
    'scheduleTime' => utc_start.strftime("%I,%M,%p,UTC"),
    'scheduleDate' => utc_start.strftime("%m/%d/%Y"),
    'is_recurring' => options[:period] ? 'on' : 'off',
    'concurrentOption' => options[:concurrent_option] || 'skip',
    'failureEmailsOverride' => (!!options[:failure_emails_override]).to_s,
    'successEmailsOverride' => (!!options[:success_emails_override]).to_s,
    'failureAction' => options[:failure_action] || 'finishCurrent',
    'failureEmails' => Array(options[:failure_emails]).join(', '),
    'successEmails' => Array(options[:success_emails]).join(', '),
    'notifyFailureFirst' => (!!options[:notify_failure_first]).to_s,
    'notifyFailureLast' => (!!options[:notify_failure_last]).to_s,
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess)
  result = JSON.parse(response.body)
  unless result['status'] == 'success'
    raise AzkabanError, result['message']
  end
  nil
end
#remove_all_schedules(project_name) ⇒ Object
144 145 146 147 148 149 150 |
# File 'lib/azkaban_scheduler/session.rb', line 144

# Removes every schedule whose 'projectname' equals the given project name.
#
# @param project_name [String] project whose schedules are removed
# @return [Array] the full list returned by list_schedules
def remove_all_schedules(project_name)
  list_schedules.each do |schedule|
    next unless schedule['projectname'] == project_name
    remove_schedule(schedule['scheduleid'])
  end
end
#remove_schedule(schedule_id) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/azkaban_scheduler/session.rb', line 129

# Removes one schedule by POSTing 'action' => 'removeSched' to '/schedule'.
#
# NOTE(review): the listing this was recovered from had an unreadable third
# argument to `post` and no visible session credentials. The session id is
# sent as 'session.id' here, as the '/manager' calls in this class do; confirm
# that the client and server accept it for '/schedule'.
#
# @param schedule_id [Object] sent as 'scheduleId'
# @return [Boolean] true on success, false when the reply says the schedule
#   does not exist
# @raise [AzkabanError] for any other non-success reply
def remove_schedule(schedule_id)
  response = @client.post('/schedule', {
    'session.id' => @id,
    'action' => 'removeSched',
    'scheduleId' => schedule_id,
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess)
  result = JSON.parse(response.body)
  unless result['status'] == 'success'
    message = result['message']
    # Removing a schedule that is already gone is reported, not raised.
    return false if message == "Schedule with ID #{schedule_id} does not exist"
    raise AzkabanError, message
  end
  true
end
#upload_project(project) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/azkaban_scheduler/session.rb', line 51

# Uploads a project's archive with a multipart POST ('ajax' => 'upload') to
# '/manager' and records the returned id and version on the project.
#
# @param project [#name, #build, #id=, #version=] `build` supplies the data
#   wrapped in an UploadIO as 'application/zip' under the name 'file.zip'
# @return [Hash] the parsed JSON reply
# @raise [ProjectNotFoundError] when the reply's 'error' says the project
#   doesn't exist
# @raise [AzkabanError] for any other reply carrying an 'error'
def upload_project(project)
  response = @client.multipart_post('/manager', {
    'session.id' => @id,
    'ajax' => 'upload',
    'project' => project.name,
    'file' => UploadIO.new(project.build, 'application/zip', 'file.zip'),
  })
  response.error! unless response.kind_of?(Net::HTTPSuccess)
  result = JSON.parse(response.body)

  # A reply with an 'error' entry is a failure; no 'status' field is consulted here.
  message = result['error']
  if message
    if message == "Installation Failed. Project '#{project.name}' doesn't exist."
      raise ProjectNotFoundError, message
    end
    raise AzkabanError, message
  end

  project.id = result['projectId']
  project.version = result['version']
  result
end