Module: Webhookdb::API::Helpers
- Extended by: Grape::API::Helpers
- Defined in: lib/webhookdb/api/helpers.rb
Defined Under Namespace
Classes: Prompt
Class Method Summary
- .prompt_for_required_param!(request, key, prompt, secret: false, output: "") ⇒ Object
Instance Method Summary
- #_log_webhook_request(opaque_id, organization_id, sstatus, request_headers) ⇒ Object
- #execute_readonly_query(org, sql) ⇒ Array<Webhookdb::Organization::QueryResult,String,nil>
  Run the given SQL inside the org, and use special error handling if it fails.
- #handle_webhook_request(potential_opaque_id) ⇒ Object
  Our primary webhook endpoint is /v1/service_integrations/<opaque_id>, but in some cases we need a ‘static’ endpoint for apps to send to, like /v1/install/front/webhooks.
- #lookup_service_integration!(org, identifier) ⇒ Object
Class Method Details
.prompt_for_required_param!(request, key, prompt, secret: false, output: "") ⇒ Object
# File 'lib/webhookdb/api/helpers.rb', line 105
def self.prompt_for_required_param!(request, key, prompt, secret: false, output: "")
  step = Webhookdb::Replicator::StateMachineStep.new
  step.output = output
  step.post_to_url = request.path
  step.post_params = request.params.to_h
  step.post_params_value_key = key
  step.set_prompt(prompt, secret:)
  body = Webhookdb::Service.error_body(
    422,
    "Prompt for required params",
    code: "prompt_required_params",
    more: {state_machine_step: Webhookdb::API::StateMachineEntity.represent(step)},
  )
  throw :error, message: body, status: 422, headers: {"Whdb-Prompt" => key.to_s}
end
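The method above never returns normally: it ends with `throw :error`, which unwinds the stack until something catches `:error` (Grape, in the real API). The following is a minimal plain-Ruby sketch of that control flow, with no Grape involved. The names `prompt_sketch` and `call_endpoint` and the simplified payload are hypothetical stand-ins, not part of WebhookDB.

```ruby
# Hypothetical stand-in for prompt_for_required_param!; not the WebhookDB implementation.
# If the param is present, return it; otherwise throw a 422-style payload.
def prompt_sketch(params, key)
  return params[key] if params.key?(key)
  # throw unwinds the stack to the nearest enclosing catch(:error).
  throw :error, {message: "Prompt for required params", status: 422, headers: {"Whdb-Prompt" => key.to_s}}
end

# Hypothetical caller playing the role of the framework that catches :error.
def call_endpoint(params)
  catch(:error) do
    value = prompt_sketch(params, :api_key)
    return {status: 200, body: value}
  end
  # When :error is thrown, catch returns the thrown hash,
  # which becomes this method's return value.
end

missing = call_endpoint({})
puts missing[:status]                  # 422
puts missing[:headers]["Whdb-Prompt"]  # api_key
```

Because the prompt is signalled by a throw and not a return value, calling code can treat the helper as if it always succeeds; the missing-parameter case is handled entirely by whoever catches `:error`.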
Instance Method Details
#_log_webhook_request(opaque_id, organization_id, sstatus, request_headers) ⇒ Object
# File 'lib/webhookdb/api/helpers.rb', line 230
def _log_webhook_request(opaque_id, organization_id, sstatus, request_headers)
  return if request.headers[Webhookdb::LoggedWebhook::RETRY_HEADER]
  # Status can be set from:
  # - the 'status' method, which will be 201 if it hasn't been set,
  # or another value if it has been set.
  # - the webhook responder, which could respond with 401, etc
  # - if there was an exception- so no status is set yet- use 0
  # The main thing to watch out for is that we:
  # - Cannot assume an exception is a 500 (it can be rescued later)
  # - Must handle error! calls
  # Anyway, this is all pretty confusing, but it's all tested.
  rstatus = status == 201 ? (sstatus || 0) : status
  request.body.rewind
  Webhookdb::LoggedWebhook.resilient_insert(
    request_body: request.body.read,
    request_headers: request_headers.to_json,
    request_method: request.request_method,
    request_path: request.path_info,
    response_status: rstatus,
    organization_id:,
    service_integration_opaque_id: opaque_id,
  )
end
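The status resolution described in the comments above comes down to a single ternary. The sketch below isolates it as a pure function so the three cases are easy to see. The name `resolve_logged_status` and its parameters are hypothetical; the rule that 201 means "not explicitly set" is taken from the source comment.

```ruby
# Hypothetical helper mirroring the ternary in _log_webhook_request.
# grape_status:     what the endpoint's `status` method reports
#                   (201 when it has not been set, per the source comment).
# responder_status: status from the webhook responder, or nil if an
#                   exception was raised before one was produced.
def resolve_logged_status(grape_status, responder_status)
  grape_status == 201 ? (responder_status || 0) : grape_status
end

puts resolve_logged_status(201, nil)  # 0   (exception before any status was set)
puts resolve_logged_status(201, 202)  # 202 (responder status wins over the default)
puts resolve_logged_status(401, 202)  # 401 (an explicitly set status wins)
```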
#execute_readonly_query(org, sql) ⇒ Array<Webhookdb::Organization::QueryResult,String,nil>
Run the given SQL inside the org, and use special error handling if it fails.
# File 'lib/webhookdb/api/helpers.rb', line 259
def execute_readonly_query(org, sql)
  result = org.execute_readonly_query(sql)
  return result, nil
rescue Sequel::DatabaseError => e
  self.logger.error("db_query_database_error", error: e)
  # We want to handle InsufficientPrivileges and UndefinedTable explicitly
  # since we can hint the user at what to do.
  # Otherwise, we should just return the Postgres exception.
  msg = ""
  case e.wrapped_exception
    when PG::UndefinedTable
      missing_table = e.wrapped_exception.message.match(/relation (.+) does not/)&.captures&.first
      msg = "The table #{missing_table} does not exist. Run `webhookdb db tables` to see available tables." if missing_table
    when PG::InsufficientPrivilege
      msg = "You do not have permission to perform this query. Queries must be read-only."
    else
      msg = e.wrapped_exception.message
  end
  return [nil, msg]
end
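The undefined-table branch builds its hint by pulling the relation name out of the Postgres error text with a regular expression. Here is a small sketch of just that step, runnable without a database. The helper name `missing_table_hint` and the sample error string are assumptions for illustration; the regex and the hint wording come from the listing above.

```ruby
# Hypothetical helper isolating the table-name extraction.
# Returns "" when no relation name can be found, matching the
# `msg = ""` default in the method above.
def missing_table_hint(pg_message)
  msg = ""
  missing_table = pg_message.match(/relation (.+) does not/)&.captures&.first
  msg = "The table #{missing_table} does not exist. Run `webhookdb db tables` to see available tables." if missing_table
  msg
end

# Assumed shape of a Postgres undefined-table message.
puts missing_table_hint('ERROR:  relation "fake_v1" does not exist')
```

Note that the capture group keeps whatever sits between "relation" and "does not", so with a message shaped like the sample the quotes around the table name are included in the hint. Callers receive a two-element array and would typically destructure it, for example `result, message = execute_readonly_query(org, sql)`.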
#handle_webhook_request(potential_opaque_id) ⇒ Object
Our primary webhook endpoint is /v1/service_integrations/<opaque_id>, but in some cases we need a ‘static’ endpoint for apps to send to, like /v1/install/front/webhooks. Those endpoints share the webhook handling behavior with this method.
The block passed to this method returns the service integration. This is important because the lookup then runs inside this method's begin/ensure, so the webhook is still logged if something goes wrong while looking up the service integration.
The potential_opaque_id should be a way to identify who is responsible for the webhook request. For example, for a request to `/v1/service_integrations/svi_abc`, this would be `svi_abc` (even though it’s an invalid opaque id). In other cases, especially marketplace integrations, this could be some other value to identify the webhook that was sent.
If the block returns the Symbol :pass, no further handling is done; this is used, for example, when there is no valid service integration. Otherwise, the block must return a service integration.
# File 'lib/webhookdb/api/helpers.rb', line 160
def handle_webhook_request(potential_opaque_id, &)
  opaque_id = potential_opaque_id
  organization_id = nil
  s_status = nil
  request_headers = {}
  raise LocalJumpError unless block_given?
  begin
    sint = yield
    return if sint == :pass
    raise "error instead of return nil if there is no service integration" if sint.nil?
    opaque_id = sint.opaque_id
    organization_id = sint.organization_id
    request_headers = request.headers.dup
    if (content_type = env["CONTENT_TYPE"])
      request_headers["Content-Type"] = content_type
    end
    svc = Webhookdb::Replicator.create(sint).dispatch_request_to(request)
    svc.preprocess_headers_for_logging(request_headers)
    handling_sint = svc.service_integration
    whresp = svc.webhook_response(request)
    s_status, s_headers, s_body = whresp.to_rack
    (s_status = 200) if s_status >= 400 && Webhookdb.regression_mode?
    if s_status >= 400
      logger.warn "rejected_webhook", webhook_headers: request_headers, webhook_body: env["api.request.body"]
      header "Whdb-Rejected-Reason", whresp.reason
    else
      req_body = env.key?("api.request.body") ? env["api.request.body"] : env["rack.input"].read
      req_body = {} if req_body.blank?
      process_kwargs = {
        headers: request_headers,
        body: req_body,
        request_path: request.path_info,
        request_method: request.request_method,
      }
      event_json = Amigo::Event.create(
        "webhookdb.serviceintegration.webhook",
        [handling_sint.id, process_kwargs],
      ).as_json
      # Audit Log this synchronously.
      # It should be fast enough. We may as well log here so we can avoid
      # serializing the (large) webhook payload multiple times, as with normal pubsub.
      Webhookdb::Async::AuditLogger.new.perform(event_json)
      if svc.process_webhooks_synchronously? || Webhookdb::Replicator.always_process_synchronously
        whreq = Webhookdb::Replicator::WebhookRequest.new(
          method: process_kwargs[:request_method],
          path: process_kwargs[:request_path],
          headers: process_kwargs[:headers],
          body: process_kwargs[:body],
        )
        inserted = svc.upsert_webhook(whreq)
        s_body = svc.synchronous_processing_response_body(upserted: inserted, request: whreq)
      else
        queue = svc.upsert_has_deps? ? "netout" : "webhook"
        Webhookdb::Jobs::ProcessWebhook.set(queue:).perform_async(event_json)
      end
    end
    s_headers.each { |k, v| header k, v }
    if s_headers["Content-Type"] == "application/json"
      body Oj.load(s_body)
    else
      env["api.format"] = :binary
      body s_body
    end
    status s_status
  ensure
    _log_webhook_request(opaque_id, organization_id, s_status, request_headers)
  end
end
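The key structural idea in this method is that the service integration lookup happens inside the block, which runs inside `begin ... ensure`, so a log entry is written whether the block returns an integration, returns `:pass`, or raises. The sketch below reproduces only that skeleton in plain Ruby. Everything in it is hypothetical: `handle_sketch`, the `log` array standing in for `_log_webhook_request`, and the fixed 202 status standing in for the real dispatch logic.

```ruby
# Hypothetical skeleton of the begin/ensure logging pattern; the real method
# dispatches to a replicator where this sketch just sets a status.
def handle_sketch(potential_opaque_id, log)
  opaque_id = potential_opaque_id
  organization_id = nil
  status = nil
  raise LocalJumpError unless block_given?
  begin
    sint = yield
    return :passed if sint == :pass
    raise "block must return a service integration" if sint.nil?
    # Once the lookup succeeds, log against the real integration.
    opaque_id = sint.opaque_id
    organization_id = sint.organization_id
    status = 202
  ensure
    # Runs on normal completion, on early return, and on exceptions.
    log << {opaque_id: opaque_id, organization_id: organization_id, status: status}
  end
end

log = []
sint = Struct.new(:opaque_id, :organization_id).new("svi_real", 7)
handle_sketch("svi_abc", log) { sint }
handle_sketch("svi_abc", log) { :pass }
puts log.length  # 2
```

In the `:pass` case and in the failure case the entry is recorded under the caller-supplied `potential_opaque_id`, since that is the only identifier available before the lookup succeeds.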
#lookup_service_integration!(org, identifier) ⇒ Object
# File 'lib/webhookdb/api/helpers.rb', line 121
def lookup_service_integration!(org, identifier)
  sints = org.service_integrations_dataset.
    where(Sequel[service_name: identifier] | Sequel[table_name: identifier] | Sequel[opaque_id: identifier]).
    limit(2).all
  return sints.first if sints.size == 1
  merror!(403, "There is no service integration with that identifier.") if sints.empty?
  dupe_attr = nil
  alternative = nil
  if sints.first.service_name == identifier
    dupe_attr = "service name"
    alternative = "table name"
  else
    dupe_attr = "table name"
    alternative = "service name"
  end
  msg403 = "There are multiple integrations with that #{dupe_attr}. " \
           "Try again using an integration id, or a #{alternative}. " \
           "Use `webhookdb integrations list` to see all integrations."
  merror!(409, msg403)
end
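The lookup has three outcomes: exactly one match (return it), no match (403), or an ambiguous identifier (409 with a hint about which attribute to use instead). The following in-memory sketch walks through those three outcomes. It is not the real implementation: `lookup_sketch` is a hypothetical name, integrations are plain hashes where the real code queries a Sequel dataset, and errors are returned as arrays where the real code calls `merror!`.

```ruby
# Hypothetical in-memory version of the lookup.
def lookup_sketch(integrations, identifier)
  # Match on any of the three identifying attributes; two rows are enough
  # to detect ambiguity, mirroring limit(2) in the real query.
  matches = integrations.select do |i|
    [i[:service_name], i[:table_name], i[:opaque_id]].include?(identifier)
  end.first(2)
  return [:ok, matches.first] if matches.size == 1
  return [:error, 403, "There is no service integration with that identifier."] if matches.empty?
  # Work out which attribute collided so the message can suggest the other one.
  dupe_attr, alternative =
    if matches.first[:service_name] == identifier
      ["service name", "table name"]
    else
      ["table name", "service name"]
    end
  [:error, 409, "There are multiple integrations with that #{dupe_attr}. " \
                "Try again using an integration id, or a #{alternative}."]
end

integrations = [
  {service_name: "stripe_charge_v1", table_name: "charges_a", opaque_id: "svi_1"},
  {service_name: "stripe_charge_v1", table_name: "charges_b", opaque_id: "svi_2"},
]
puts lookup_sketch(integrations, "svi_2").first             # ok
puts lookup_sketch(integrations, "stripe_charge_v1")[1]     # 409
```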