Bases: Block
Block that enables calling webhooks.
Source code in src/prefect/blocks/webhook.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57 | class Webhook(Block):
"""
Block that enables calling webhooks.
"""
_block_type_name = "Webhook"
_logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/c7247cb359eb6cf276734d4b1fbf00fb8930e89e-250x250.png" # type: ignore
_documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/webhook/#prefect.blocks.webhook.Webhook"
method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = Field(
default="POST", description="The webhook request method. Defaults to `POST`."
)
url: SecretStr = Field(
default=...,
title="Webhook URL",
description="The webhook URL.",
examples=["https://hooks.slack.com/XXX"],
)
headers: SecretDict = Field(
default_factory=lambda: SecretDict(dict()),
title="Webhook Headers",
description="A dictionary of headers to send with the webhook request.",
)
def block_initialization(self):
self._client = AsyncClient(transport=_http_transport)
async def call(self, payload: Optional[dict] = None) -> Response:
"""
Call the webhook.
Args:
payload: an optional payload to send when calling the webhook.
"""
async with self._client:
return await self._client.request(
method=self.method,
url=self.url.get_secret_value(),
headers=self.headers.get_secret_value(),
json=payload,
)
|
call(payload=None)
async
Call the webhook.
Parameters:
Name |
Type |
Description |
Default |
payload |
Optional[dict]
|
an optional payload to send when calling the webhook.
|
None
|
Source code in src/prefect/blocks/webhook.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57 | async def call(self, payload: Optional[dict] = None) -> Response:
"""
Call the webhook.
Args:
payload: an optional payload to send when calling the webhook.
"""
async with self._client:
return await self._client.request(
method=self.method,
url=self.url.get_secret_value(),
headers=self.headers.get_secret_value(),
json=payload,
)
|