Class: AzkabanScheduler::Session

Inherits:
Object
  • Object
show all
Defined in:
lib/azkaban_scheduler/session.rb

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, id) ⇒ Session

Returns a new instance of Session.


23
24
25
26
# File 'lib/azkaban_scheduler/session.rb', line 23

# Wraps an existing session: remembers the HTTP client used for every later
# request together with the session identifier.
#
# @param client [Object] stored in @client and used to issue requests
# @param id [Object] stored in @id and exposed through #id
def initialize(client, id)
  @client, @id = client, id
end

Instance Attribute Details

#id ⇒ Object

Returns the value of attribute id.


6
7
8
# File 'lib/azkaban_scheduler/session.rb', line 6

# Reader for the session identifier.
#
# @return [Object] the value currently held in @id (nil if it was never set)
def id
  @id
end

Class Method Details

.start(client, username, password) ⇒ Object


8
9
10
11
12
13
14
15
16
17
18
19
20
21
# File 'lib/azkaban_scheduler/session.rb', line 8

# Logs in by POSTing an 'action' => 'login' form to '/' and wraps the returned
# 'session.id' in a new instance.
#
# @param client [Object] HTTP client; must respond to #post(path, params)
# @param username [String] sent as the 'username' form field
# @param password [String] sent as the 'password' form field
# @return [Object] whatever `new(client, session_id)` builds
# @raise [AuthenticationError] when the server reports the incorrect-login message
# @raise [AzkabanError] for any other non-"success" status
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def self.start(client, username, password)
  credentials = {'action' => 'login', 'username' => username, 'password' => password}
  response = client.post('/', credentials)
  response.error! unless response.kind_of?(Net::HTTPSuccess)

  payload = JSON.parse(response.body)
  return new(client, payload['session.id']) if payload["status"] == "success"

  reason = payload["error"]
  bad_credentials = reason == "Incorrect Login. Username\/Password not found."
  raise AuthenticationError, reason if bad_credentials
  raise AzkabanError, reason
end

Instance Method Details

#create_project(project) ⇒ Object


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/azkaban_scheduler/session.rb', line 28

# Creates a project by POSTing an 'action' => 'create' form to '/manager'.
#
# @param project [#name, #description] supplies the 'name' and 'description' fields
# @return [Hash] the parsed JSON response when its 'status' is 'success'
# @raise [ProjectExistsError] when the server says an active project with that name exists
# @raise [InvalidProjectNameError] when the server rejects the name format
# @raise [ProjectDescriptionEmptyError] when the server says the description is empty
# @raise [AzkabanError] for any other non-success message
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def create_project(project)
  form = {
    'session.id' => @id,
    'action' => 'create',
    'name' => project.name,
    'description' => project.description,
  }
  response = @client.post('/manager', form)
  response.error! unless response.kind_of?(Net::HTTPSuccess)

  result = JSON.parse(response.body)
  return result if result['status'] == 'success'

  # Map the server's known failure messages onto dedicated error classes.
  error_message = result['message']
  error_class =
    case error_message
    when "Active project with name #{project.name} already exists in db."
      ProjectExistsError
    when "Project names must start with a letter, followed by any number of letters, digits, '-' or '_'."
      InvalidProjectNameError
    when "Description cannot be empty."
      ProjectDescriptionEmptyError
    else
      AzkabanError
    end
  raise error_class, error_message
end

#delete_project(project_name) ⇒ Object


71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/azkaban_scheduler/session.rb', line 71

# Deletes a project with a GET to '/manager' carrying 'delete' => 'true'.
# The outcome is read from the response cookies rather than from the body.
#
# @param project_name [String] sent as the 'project' query field
# @return [Boolean] true when an 'azkaban.success.message' cookie is present,
#   false when the failure cookie says the project doesn't exist
# @raise [AzkabanError] for any other failure message
# @note Both Net::HTTPSuccess and Net::HTTPRedirection responses are accepted;
#   anything else is escalated via `response.error!`.
def delete_project(project_name)
  query = {
    'session.id' => @id,
    'project' => project_name,
    'delete' => 'true',
  }
  response = @client.get('/manager', query)
  acceptable = response.kind_of?(Net::HTTPSuccess) || response.kind_of?(Net::HTTPRedirection)
  response.error! unless acceptable

  cookies = response_cookies(response)
  return true if cookies['azkaban.success.message']

  failure = cookies['azkaban.failure.message']
  return false if failure == "Project #{project_name} doesn't exist."

  raise AzkabanError, failure
end

#fetch_flow_executions(project_name, flow_id, offset = 0, limit = 10) ⇒ Object


109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/azkaban_scheduler/session.rb', line 109

# Requests a page of executions for one flow via GET '/manager' with
# 'ajax' => 'fetchFlowExecutions'.
#
# @param project_name [String] sent as 'project'
# @param flow_id [String] sent as 'flow'
# @param offset [Integer] sent as 'start' (defaults to 0)
# @param limit [Integer] sent as 'length' (defaults to 10)
# @return [Object] the parsed JSON body, returned unmodified
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def fetch_flow_executions(project_name, flow_id, offset=0, limit=10)
  query = {
    'session.id' => @id,
    'ajax' => 'fetchFlowExecutions',
    'project' => project_name,
    'flow' => flow_id,
    'start' => offset,
    'length' => limit,
  }
  reply = @client.get('/manager', query)
  reply.error! unless reply.kind_of?(Net::HTTPSuccess)
  JSON.parse(reply.body)
end

#fetch_project_flows(project_name) ⇒ Object


99
100
101
102
103
104
105
106
107
# File 'lib/azkaban_scheduler/session.rb', line 99

# Requests a project's flow listing via GET '/manager' with
# 'ajax' => 'fetchprojectflows'.
#
# @param project_name [String] sent as 'project'
# @return [Object] the parsed JSON body, returned unmodified
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def fetch_project_flows(project_name)
  query = {
    'session.id' => @id,
    'ajax' => 'fetchprojectflows',
    'project' => project_name,
  }
  reply = @client.get('/manager', query)
  reply.error! unless reply.kind_of?(Net::HTTPSuccess)
  JSON.parse(reply.body)
end

#get_project_id(project_name) ⇒ Object


89
90
91
92
# File 'lib/azkaban_scheduler/session.rb', line 89

# Looks up a project's id from its flow listing.
#
# @param project_name [String] forwarded to #fetch_project_flows
# @return [Object, nil] the 'projectId' entry of that response (nil when absent)
def get_project_id(project_name)
  fetch_project_flows(project_name)['projectId']
end

#list_flow_ids(project_name) ⇒ Object


94
95
96
97
# File 'lib/azkaban_scheduler/session.rb', line 94

# Lists the ids of every flow in a project.
#
# @param project_name [String] forwarded to #fetch_project_flows
# @return [Array] the 'flowId' of each entry under 'flows'; an empty array
#   when the response carries no 'flows' key
def list_flow_ids(project_name)
  result = fetch_project_flows(project_name)
  # A response without 'flows' (presumably an error payload, e.g. for an
  # unknown project — confirm against the server) used to crash with
  # NoMethodError on nil; treat it as "no flows", like #list_schedules does
  # for a missing 'items' key.
  flows = result['flows'] || []
  flows.map { |flow| flow['flowId'] }
end

#list_schedules ⇒ Object


122
123
124
125
126
127
# File 'lib/azkaban_scheduler/session.rb', line 122

# Fetches the schedule list by POSTing 'ajax' => 'loadFlow' to '/schedule',
# passing session_id_cookie as the third argument.
#
# @return [Array] the 'items' entry of the parsed response, or an empty array
#   when that entry is missing or null
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def list_schedules
  reply = @client.post('/schedule', { 'ajax' => 'loadFlow' }, session_id_cookie)
  reply.error! unless reply.kind_of?(Net::HTTPSuccess)
  schedules = JSON.parse(reply.body)['items']
  schedules || []
end

#post_schedule(project_id, project_name, flow, start_time, options = {}) ⇒ Object


152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/azkaban_scheduler/session.rb', line 152

# Schedules a flow by POSTing an 'ajax' => 'scheduleFlow' form to '/schedule',
# passing session_id_cookie as the third argument.
#
# @param project_id [Object] sent as 'projectId'
# @param project_name [String] sent as both 'project' and 'projectName'
# @param flow [String] sent as 'flow'
# @param start_time [Time] first run time; it is rendered in UTC without
#   modifying the object the caller passed in
# @param options [Hash] optional settings, read by symbol key:
#   :disabled (default '[]'), :period (default "1d"),
#   :concurrent_option (default 'skip'), :failure_action (default 'finishCurrent'),
#   :failure_emails / :success_emails (one address or an array, joined with ', '),
#   :failure_emails_override, :success_emails_override,
#   :notify_failure_first, :notify_failure_last (coerced to "true"/"false")
# @return [nil]
# @raise [AzkabanError] when the response 'status' is not 'success'
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def post_schedule(project_id, project_name, flow, start_time, options={})
  # Time#utc converts its receiver in place (and raises on a frozen Time);
  # getutc returns a separate UTC copy, so the caller's start_time is untouched.
  utc_start = start_time.getutc
  response = @client.post('/schedule', {
    'ajax' => 'scheduleFlow',
    'project' => project_name,
    'projectName' => project_name,
    'projectId' => project_id,
    'flow' => flow,
    'disabled' => options[:disabled] || '[]',
    # 'period' always carries a value, but the schedule is flagged as
    # recurring only when the caller supplied :period explicitly.
    'period' => options[:period] || "1d",
    'scheduleTime' => utc_start.strftime("%I,%M,%p,UTC"),
    'scheduleDate' => utc_start.strftime("%m/%d/%Y"),
    'is_recurring' => options[:period] ? 'on' : 'off',
    'concurrentOption' => options[:concurrent_option] || 'skip',
    'failureEmailsOverride' => (!!options[:failure_emails_override]).to_s,
    'successEmailsOverride' => (!!options[:success_emails_override]).to_s,
    'failureAction' => options[:failure_action] || 'finishCurrent',
    'failureEmails' => Array(options[:failure_emails]).join(', '),
    'successEmails' => Array(options[:success_emails]).join(', '),
    'notifyFailureFirst' => (!!options[:notify_failure_first]).to_s,
    'notifyFailureLast' => (!!options[:notify_failure_last]).to_s,
  }, session_id_cookie)
  response.error! unless response.kind_of?(Net::HTTPSuccess)
  result = JSON.parse(response.body)
  unless result['status'] == 'success'
    raise AzkabanError, result['message']
  end
  nil
end

#remove_all_schedules(project_name) ⇒ Object


144
145
146
147
148
149
150
# File 'lib/azkaban_scheduler/session.rb', line 144

# Removes every schedule whose 'projectname' equals the given project.
#
# @param project_name [String] project whose schedules are removed
# @return [Array] the full list returned by #list_schedules (not only the
#   removed entries)
def remove_all_schedules(project_name)
  list_schedules.each do |entry|
    remove_schedule(entry['scheduleid']) if entry['projectname'] == project_name
  end
end

#remove_schedule(schedule_id) ⇒ Object


129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/azkaban_scheduler/session.rb', line 129

# Removes one schedule by POSTing 'action' => 'removeSched' to '/schedule',
# passing session_id_cookie as the third argument.
#
# @param schedule_id [Object] sent as 'scheduleId'
# @return [Boolean] true when the response 'status' is 'success', false when
#   the server reports that the schedule id does not exist
# @raise [AzkabanError] for any other failure message
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def remove_schedule(schedule_id)
  form = {
    'action' => 'removeSched',
    'scheduleId' => schedule_id,
  }
  response = @client.post('/schedule', form, session_id_cookie)
  response.error! unless response.kind_of?(Net::HTTPSuccess)

  outcome = JSON.parse(response.body)
  return true if outcome['status'] == 'success'

  reason = outcome['message']
  raise AzkabanError, reason unless reason == "Schedule with ID #{schedule_id} does not exist"

  false
end

#upload_project(project) ⇒ Object


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/azkaban_scheduler/session.rb', line 51

# Uploads a project archive with a multipart POST to '/manager'
# ('ajax' => 'upload'), then records the returned id and version on the project.
#
# @param project [#name, #build, #id=, #version=] `build` supplies the archive
#   wrapped in an UploadIO with content type 'application/zip' and filename 'file.zip'
# @return [Hash] the parsed JSON response
# @raise [ProjectNotFoundError] when the server says the project doesn't exist
# @raise [AzkabanError] for any other value under the response's 'error' key
# @note A response that is not a Net::HTTPSuccess is escalated via `response.error!`.
def upload_project(project)
  form = {
    'session.id' => @id,
    'ajax' => 'upload',
    'project' => project.name,
    'file' => UploadIO.new(project.build, 'application/zip', 'file.zip'),
  }
  response = @client.multipart_post('/manager', form)
  response.error! unless response.kind_of?(Net::HTTPSuccess)

  result = JSON.parse(response.body)
  failure = result['error']
  if failure
    not_found = failure == "Installation Failed. Project '#{project.name}' doesn't exist."
    raise ProjectNotFoundError, failure if not_found
    raise AzkabanError, failure
  end

  # Success: copy the server-assigned identifiers back onto the project.
  project.id = result['projectId']
  project.version = result['version']
  result
end