Skip to content

prefect.runner.storage

BlockStorageAdapter

A storage adapter for a storage block object to allow it to be used as a runner storage object.

Source code in src/prefect/runner/storage.py
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
class BlockStorageAdapter:
    """
    Wraps a storage block so it can be used wherever a runner storage object
    is expected.

    The wrapped object must be a `Block` instance exposing a `get_directory`
    method. Code is pulled into a directory named after the block, located
    beneath the configured base path.
    """

    def __init__(
        self,
        block: Union[ReadableDeploymentStorage, WritableDeploymentStorage],
        pull_interval: Optional[int] = 60,
    ):
        """
        Args:
            block: The storage block to adapt.
            pull_interval: Value reported by the `pull_interval` property.

        Raises:
            TypeError: If `block` is not a `Block` instance.
            ValueError: If `block` has no `get_directory` attribute.
        """
        self._block = block
        self._pull_interval = pull_interval
        self._storage_base_path = Path.cwd()

        if not isinstance(block, Block):
            received = type(block).__name__
            raise TypeError(
                f"Expected a block object. Received a {received!r} object."
            )
        if not hasattr(block, "get_directory"):
            raise ValueError("Provided block must have a `get_directory` method.")

        # Saved blocks get a stable "<type slug>-<document name>" name; unsaved
        # blocks fall back to a random UUID.
        document_name = block._block_document_name
        if document_name:
            self._name = f"{block.get_block_type_slug()}-{document_name}"
        else:
            self._name = str(uuid4())

    def set_base_path(self, path: Path):
        """Sets the directory under which `destination` is located."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """The pull interval supplied at construction time."""
        return self._pull_interval

    @property
    def destination(self) -> Path:
        """The local directory that block contents are pulled into."""
        return self._storage_base_path / self._name

    async def pull_code(self):
        """
        Creates the destination directory when it does not exist yet, then
        asks the block to download its contents into it.
        """
        target = self.destination
        if not target.exists():
            target.mkdir(parents=True, exist_ok=True)
        await self._block.get_directory(local_path=str(target))

    def to_pull_step(self) -> dict:
        """
        Returns a deployment pull step for the wrapped block.

        Raises:
            BlockNotSavedError: If the block does not provide its own pull
                step and has no block document name.
        """
        block = self._block

        # Give blocks the chance to implement their own pull step
        if hasattr(block, "get_pull_step"):
            return block.get_pull_step()

        if not block._block_document_name:
            raise BlockNotSavedError(
                "Block must be saved with `.save()` before it can be converted to a"
                " pull step."
            )

        return {
            "prefect.deployments.steps.pull_with_block": {
                "block_type_slug": block.get_block_type_slug(),
                "block_document_name": block._block_document_name,
            }
        }

    def __eq__(self, __value) -> bool:
        """Two adapters are equal when their wrapped blocks compare equal."""
        if not isinstance(__value, BlockStorageAdapter):
            return False
        return self._block == __value._block

GitRepository

Pulls the contents of a git repository to the local filesystem.

Parameters:

Name Type Description Default
url str

The URL of the git repository to pull from

required
credentials Union[GitCredentials, Block, Dict[str, Any], None]

Credentials to use when pulling from the repository, given as a dictionary or a credentials block. If a username is provided in a dictionary, an access token or password must also be provided.

None
name Optional[str]

The name of the repository. If not provided, the name will be inferred from the repository URL.

None
branch Optional[str]

The branch to pull from. If not provided, no branch is passed to git and the repository's default branch is used.

None
pull_interval Optional[int]

The interval in seconds at which to pull contents from remote storage to local storage. If None, remote storage will perform a one-time sync.

60

Examples:

Pull the contents of a private git repository to the local filesystem:

from prefect.runner.storage import GitRepository

storage = GitRepository(
    url="https://github.com/org/repo.git",
    credentials={"username": "oauth2", "access_token": "my-access-token"},
)

await storage.pull_code()
Source code in src/prefect/runner/storage.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
class GitRepository:
    """
    Pulls the contents of a git repository to the local filesystem.

    Parameters:
        url: The URL of the git repository to pull from
        credentials: Credentials to use when pulling from the repository,
            given as a dictionary or a credentials block. If a dictionary
            with a username is provided, an access token or password must
            also be provided.
        name: The name of the repository. If not provided, the name will be
            inferred from the repository URL (and the branch, when given).
        branch: The branch to pull from. If not provided, no branch is passed
            to git and the repository's default branch is used.
        include_submodules: Whether to pass `--recurse-submodules` to git when
            cloning and pulling.
        pull_interval: The interval in seconds at which to pull contents from
            remote storage to local storage. If None, remote storage will perform
            a one-time sync.

    Examples:
        Pull the contents of a private git repository to the local filesystem:

        ```python
        from prefect.runner.storage import GitRepository

        storage = GitRepository(
            url="https://github.com/org/repo.git",
            credentials={"username": "oauth2", "access_token": "my-access-token"},
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        credentials: Union[GitCredentials, Block, Dict[str, Any], None] = None,
        name: Optional[str] = None,
        branch: Optional[str] = None,
        include_submodules: bool = False,
        pull_interval: Optional[int] = 60,
    ):
        if credentials is None:
            credentials = {}

        if (
            isinstance(credentials, dict)
            and credentials.get("username")
            and not (credentials.get("access_token") or credentials.get("password"))
        ):
            raise ValueError(
                "If a username is provided, an access token or password must also be"
                " provided."
            )
        self._url = url
        self._branch = branch
        self._credentials = credentials
        self._include_submodules = include_submodules

        repo_name = urlparse(url).path.split("/")[-1]
        # Strip only a trailing ".git". A blanket replace would also mangle
        # names that merely contain ".git" (e.g. "my.github.io").
        if repo_name.endswith(".git"):
            repo_name = repo_name[: -len(".git")]
        default_name = f"{repo_name}-{branch}" if branch else repo_name
        self._name = name or default_name

        self._logger = get_logger(f"runner.storage.git-repository.{self._name}")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @property
    def destination(self) -> Path:
        """The local directory the repository is cloned into."""
        return self._storage_base_path / self._name

    def set_base_path(self, path: Path):
        """Sets the directory under which `destination` is located."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """The pull interval supplied at construction time."""
        return self._pull_interval

    @property
    def _repository_url_with_credentials(self) -> str:
        """
        The repository URL with formatted credentials embedded in its network
        location. The URL is returned unchanged when no credentials are set,
        when the scheme is not https, or when no credential string could be
        formatted.
        """
        if not self._credentials:
            return self._url

        url_components = urlparse(self._url)

        # Work on a copy so resolving secrets never mutates the stored
        # credentials.
        credentials = (
            self._credentials.model_dump()
            if isinstance(self._credentials, Block)
            else deepcopy(self._credentials)
        )

        # Replace secret wrappers with their plain values
        for k, v in credentials.items():
            if isinstance(v, Secret):
                credentials[k] = v.get()
            elif isinstance(v, SecretStr):
                credentials[k] = v.get_secret_value()

        formatted_credentials = _format_token_from_credentials(
            urlparse(self._url).netloc, credentials
        )
        if url_components.scheme == "https" and formatted_credentials is not None:
            updated_components = url_components._replace(
                netloc=f"{formatted_credentials}@{url_components.netloc}"
            )
            repository_url = urlunparse(updated_components)
        else:
            repository_url = self._url

        return repository_url

    async def pull_code(self):
        """
        Pulls the contents of the configured repository to the local filesystem.

        An existing clone at the destination is updated with `git pull`; if
        that fails, the destination is removed and cloned again. When there is
        no clone yet, the repository is cloned.

        Raises:
            ValueError: If the destination already holds a clone of a
                different repository.
        """
        self._logger.debug(
            "Pulling contents from repository '%s' to '%s'...",
            self._name,
            self.destination,
        )

        git_dir = self.destination / ".git"

        if git_dir.exists():
            # Check if the existing repository matches the configured repository
            result = await run_process(
                ["git", "config", "--get", "remote.origin.url"],
                cwd=str(self.destination),
            )
            existing_repo_url = None
            if result.stdout is not None:
                existing_repo_url = _strip_auth_from_url(result.stdout.decode().strip())

            if existing_repo_url != self._url:
                raise ValueError(
                    f"The existing repository at {str(self.destination)} "
                    f"does not match the configured repository {self._url}"
                )

            self._logger.debug("Pulling latest changes from origin/%s", self._branch)
            # Update the existing repository
            cmd = ["git", "pull", "origin"]
            if self._branch:
                cmd += [self._branch]
            if self._include_submodules:
                cmd += ["--recurse-submodules"]
            cmd += ["--depth", "1"]
            try:
                await run_process(cmd, cwd=self.destination)
                self._logger.debug("Successfully pulled latest changes")
            except subprocess.CalledProcessError as exc:
                # Report the numeric exit status, as the message promises
                self._logger.error(
                    f"Failed to pull latest changes with exit code {exc.returncode}"
                )
                # Start over from a fresh clone
                shutil.rmtree(self.destination)
                await self._clone_repo()

        else:
            await self._clone_repo()

    async def _clone_repo(self):
        """
        Clones the repository into the local destination.

        Raises:
            RuntimeError: If the `git clone` command fails.
        """
        self._logger.debug("Cloning repository %s", self._url)

        repository_url = self._repository_url_with_credentials

        cmd = [
            "git",
            "clone",
            repository_url,
        ]
        if self._branch:
            cmd += ["--branch", self._branch]
        if self._include_submodules:
            cmd += ["--recurse-submodules"]

        # Limit git history and set path to clone to
        cmd += ["--depth", "1", str(self.destination)]

        try:
            await run_process(cmd)
        except subprocess.CalledProcessError as exc:
            # Hide the command used to avoid leaking the access token
            exc_chain = None if self._credentials else exc
            raise RuntimeError(
                f"Failed to clone repository {self._url!r} with exit code"
                f" {exc.returncode}."
            ) from exc_chain

    def __eq__(self, __value) -> bool:
        """
        Two repositories are equal when their URL, branch and name all match.
        """
        if isinstance(__value, GitRepository):
            return (
                self._url == __value._url
                and self._branch == __value._branch
                and self._name == __value._name
            )
        return False

    def __repr__(self) -> str:
        return (
            f"GitRepository(name={self._name!r} repository={self._url!r},"
            f" branch={self._branch!r})"
        )

    def to_pull_step(self) -> Dict:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.

        Raises:
            ValueError: If credentials are a dictionary whose access token is
                set but is not a `Secret` block.
        """
        pull_step = {
            "prefect.deployments.steps.git_clone": {
                "repository": self._url,
                "branch": self._branch,
            }
        }
        if self._include_submodules:
            pull_step["prefect.deployments.steps.git_clone"][
                "include_submodules"
            ] = self._include_submodules
        if isinstance(self._credentials, Block):
            # Reference the block by placeholder rather than by value
            pull_step["prefect.deployments.steps.git_clone"][
                "credentials"
            ] = f"{{{{ {self._credentials.get_block_placeholder()} }}}}"
        elif isinstance(self._credentials, dict):
            if isinstance(self._credentials.get("access_token"), Secret):
                pull_step["prefect.deployments.steps.git_clone"]["credentials"] = {
                    **self._credentials,
                    "access_token": (
                        "{{"
                        f" {self._credentials['access_token'].get_block_placeholder()} }}}}"
                    ),
                }
            elif self._credentials.get("access_token") is not None:
                raise ValueError(
                    "Please save your access token as a Secret block before converting"
                    " this storage object to a pull step."
                )

        return pull_step

pull_code() async

Pulls the contents of the configured repository to the local filesystem.

Source code in src/prefect/runner/storage.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
async def pull_code(self):
    """
    Pulls the contents of the configured repository to the local filesystem.

    An existing clone at the destination is updated with `git pull`; if that
    fails, the destination is removed and cloned again. When there is no
    clone yet, the repository is cloned.

    Raises:
        ValueError: If the destination already holds a clone of a different
            repository.
    """
    self._logger.debug(
        "Pulling contents from repository '%s' to '%s'...",
        self._name,
        self.destination,
    )

    git_dir = self.destination / ".git"

    if git_dir.exists():
        # Check if the existing repository matches the configured repository
        result = await run_process(
            ["git", "config", "--get", "remote.origin.url"],
            cwd=str(self.destination),
        )
        existing_repo_url = None
        if result.stdout is not None:
            existing_repo_url = _strip_auth_from_url(result.stdout.decode().strip())

        if existing_repo_url != self._url:
            raise ValueError(
                f"The existing repository at {str(self.destination)} "
                f"does not match the configured repository {self._url}"
            )

        self._logger.debug("Pulling latest changes from origin/%s", self._branch)
        # Update the existing repository
        cmd = ["git", "pull", "origin"]
        if self._branch:
            cmd += [self._branch]
        if self._include_submodules:
            cmd += ["--recurse-submodules"]
        cmd += ["--depth", "1"]
        try:
            await run_process(cmd, cwd=self.destination)
            self._logger.debug("Successfully pulled latest changes")
        except subprocess.CalledProcessError as exc:
            # Report the numeric exit status, as the message promises
            self._logger.error(
                f"Failed to pull latest changes with exit code {exc.returncode}"
            )
            # Start over from a fresh clone
            shutil.rmtree(self.destination)
            await self._clone_repo()

    else:
        await self._clone_repo()

LocalStorage

Sets the working directory in the local filesystem.

Parameters: path: Local file path to set the working directory for the flow.

Examples: Sets the working directory for the local path to the flow:

from prefect.runner.storage import LocalStorage

storage = LocalStorage(
    path="/path/to/local/flow_directory",
)

Source code in src/prefect/runner/storage.py
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
class LocalStorage:
    """
    Sets the working directory in the local filesystem.

    Parameters:
        path: Local file path to set the working directory for the flow.
            The path is resolved to an absolute path on construction.
        pull_interval: Value reported by the `pull_interval` property.
            `pull_code` is a no-op for local storage.

    Examples:
        Sets the working directory for the local path to the flow:

        ```python
        from prefect.runner.storage import LocalStorage

        storage = LocalStorage(
            path="/path/to/local/flow_directory",
        )
        ```
    """

    def __init__(
        self,
        path: str,
        pull_interval: Optional[int] = None,
    ):
        resolved_path = Path(path).resolve()
        self._path = resolved_path
        self._logger = get_logger("runner.storage.local-storage")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @property
    def destination(self) -> Path:
        """The resolved local path given at construction time."""
        return self._path

    def set_base_path(self, path: Path):
        """Records the base path; `destination` does not depend on it."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """The pull interval supplied at construction time."""
        return self._pull_interval

    async def pull_code(self):
        """
        Does nothing: the code is assumed to already be present on the local
        filesystem, so there is no remote location to pull from.
        """
        pass

    def to_pull_step(self) -> dict:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.
        """
        working_directory = str(self.destination)
        return {
            "prefect.deployments.steps.set_working_directory": {
                "directory": working_directory
            }
        }

    def __eq__(self, __value) -> bool:
        """Two local storages are equal when their resolved paths match."""
        if not isinstance(__value, LocalStorage):
            return False
        return self._path == __value._path

    def __repr__(self) -> str:
        return f"LocalStorage(path={self._path!r})"

to_pull_step()

Returns a dictionary representation of the storage object that can be used as a deployment pull step.

Source code in src/prefect/runner/storage.py
607
608
609
610
611
612
613
614
615
616
617
def to_pull_step(self) -> dict:
    """
    Builds the deployment pull step for this storage object: a
    `set_working_directory` step pointing at the storage destination.
    """
    working_directory = str(self.destination)
    return {
        "prefect.deployments.steps.set_working_directory": {
            "directory": working_directory
        }
    }

RemoteStorage

Pulls the contents of a remote storage location to the local filesystem.

Parameters:

Name Type Description Default
url str

The URL of the remote storage location to pull from. Supports fsspec URLs. Some protocols may require an additional fsspec dependency to be installed. Refer to the fsspec docs for more details.

required
pull_interval Optional[int]

The interval in seconds at which to pull contents from remote storage to local storage. If None, remote storage will perform a one-time sync.

60
**settings Any

Any additional settings to pass the fsspec filesystem class.

{}

Examples:

Pull the contents of a remote storage location to the local filesystem:

from prefect.runner.storage import RemoteStorage

storage = RemoteStorage(url="s3://my-bucket/my-folder")

await storage.pull_code()

Pull the contents of a remote storage location to the local filesystem with additional settings:

from prefect.runner.storage import RemoteStorage
from prefect.blocks.system import Secret

storage = RemoteStorage(
    url="s3://my-bucket/my-folder",
    # Use Secret blocks to keep credentials out of your code
    key=Secret.load("my-aws-access-key"),
    secret=Secret.load("my-aws-secret-key"),
)

await storage.pull_code()
Source code in src/prefect/runner/storage.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
class RemoteStorage:
    """
    Pulls the contents of a remote storage location to the local filesystem.

    Parameters:
        url: The URL of the remote storage location to pull from. Supports
            `fsspec` URLs. Some protocols may require an additional `fsspec`
            dependency to be installed. Refer to the
            [`fsspec` docs](https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations)
            for more details.
        pull_interval: The interval in seconds at which to pull contents from
            remote storage to local storage. If None, remote storage will perform
            a one-time sync.
        **settings: Any additional settings to pass the `fsspec` filesystem class.
            Block values are resolved before being handed to `fsspec`.

    Examples:
        Pull the contents of a remote storage location to the local filesystem:

        ```python
        from prefect.runner.storage import RemoteStorage

        storage = RemoteStorage(url="s3://my-bucket/my-folder")

        await storage.pull_code()
        ```

        Pull the contents of a remote storage location to the local filesystem
        with additional settings:

        ```python
        from prefect.runner.storage import RemoteStorage
        from prefect.blocks.system import Secret

        storage = RemoteStorage(
            url="s3://my-bucket/my-folder",
            # Use Secret blocks to keep credentials out of your code
            key=Secret.load("my-aws-access-key"),
            secret=Secret.load("my-aws-secret-key"),
        )

        await storage.pull_code()
        ```
    """

    def __init__(
        self,
        url: str,
        pull_interval: Optional[int] = 60,
        **settings: Any,
    ):
        self._url = url
        self._settings = settings
        self._logger = get_logger("runner.storage.remote-storage")
        self._storage_base_path = Path.cwd()
        self._pull_interval = pull_interval

    @staticmethod
    def _get_required_package_for_scheme(scheme: str) -> Optional[str]:
        """
        Returns the top-level package that provides the filesystem for
        `scheme`, or None when it cannot be determined.
        """
        # Prefer whatever fsspec's registry already knows about the scheme
        registered = fsspec.registry.get(scheme)
        if registered:
            return registered.__module__.split(".")[0]

        # Otherwise fall back to a guess for some common schemes
        fallback_packages = {
            "s3": "s3fs",
            "gs": "gcsfs",
            "gcs": "gcsfs",
            "abfs": "adlfs",
            "az": "adlfs",
        }
        return fallback_packages.get(scheme)

    @property
    def _filesystem(self) -> fsspec.AbstractFileSystem:
        """
        An `fsspec` filesystem for the URL's scheme, built from the settings
        with every block replaced by its underlying value.
        """

        def unwrap_block(candidate: Any) -> Any:
            if not isinstance(candidate, Block):
                return candidate
            if hasattr(candidate, "get"):
                return candidate.get()
            if hasattr(candidate, "value"):
                return candidate.value
            return candidate.model_dump()

        protocol = urlsplit(self._url).scheme
        resolved_settings = visit_collection(
            self._settings, unwrap_block, return_data=True
        )
        return fsspec.filesystem(protocol, **resolved_settings)

    def set_base_path(self, path: Path):
        """Sets the directory under which `destination` is located."""
        self._storage_base_path = path

    @property
    def pull_interval(self) -> Optional[int]:
        """
        The interval at which contents from remote storage should be pulled to
        local storage. If None, remote storage will perform a one-time sync.
        """
        return self._pull_interval

    @property
    def destination(self) -> Path:
        """
        The local file path to pull contents from remote storage to.
        """
        return self._storage_base_path / self._remote_path

    @property
    def _remote_path(self) -> Path:
        """
        The path of the remote location: the URL's network location joined
        with its path component (leading slashes removed).
        """
        url_parts = urlsplit(self._url)
        relative_path = url_parts.path.lstrip("/")
        return Path(url_parts.netloc) / Path(relative_path)

    async def pull_code(self):
        """
        Pulls contents from remote storage to the local filesystem.

        Raises:
            RuntimeError: If the filesystem cannot be created or the
                download fails.
        """
        self._logger.debug(
            "Pulling contents from remote storage '%s' to '%s'...",
            self._url,
            self.destination,
        )

        if not self.destination.exists():
            self.destination.mkdir(parents=True, exist_ok=True)

        source_dir = str(self._remote_path) + "/"

        try:
            # The filesystem is resolved inside the `try` so that its
            # failures are reported as a RuntimeError as well.
            download = create_call(
                self._filesystem.get,
                source_dir,
                str(self.destination),
                recursive=True,
            )
            await from_async.wait_for_call_in_new_thread(download)
        except Exception as exc:
            raise RuntimeError(
                f"Failed to pull contents from remote storage {self._url!r} to"
                f" {self.destination!r}"
            ) from exc

    def to_pull_step(self) -> dict:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.
        """

        def to_placeholder(candidate: Any) -> Any:
            if not isinstance(candidate, Block):
                return candidate
            return f"{{{{ {candidate.get_block_placeholder()} }}}}"

        settings_with_placeholders = visit_collection(
            self._settings, to_placeholder, return_data=True
        )
        required_package = self._get_required_package_for_scheme(
            urlparse(self._url).scheme
        )

        step_config = {"url": self._url, **settings_with_placeholders}
        if required_package:
            step_config["requires"] = required_package
        return {"prefect.deployments.steps.pull_from_remote_storage": step_config}

    def __eq__(self, __value) -> bool:
        """
        Equality check for runner storage objects.
        """
        if not isinstance(__value, RemoteStorage):
            return False
        return self._url == __value._url and self._settings == __value._settings

    def __repr__(self) -> str:
        return f"RemoteStorage(url={self._url!r})"

destination: Path property

The local file path to pull contents from remote storage to.

pull_interval: Optional[int] property

The interval at which contents from remote storage should be pulled to local storage. If None, remote storage will perform a one-time sync.

__eq__(__value)

Equality check for runner storage objects.

Source code in src/prefect/runner/storage.py
488
489
490
491
492
493
494
def __eq__(self, __value) -> bool:
    """
    Equality check for runner storage objects: equal only to another
    `RemoteStorage` with the same URL and the same settings.
    """
    if not isinstance(__value, RemoteStorage):
        return False
    return self._url == __value._url and self._settings == __value._settings

pull_code() async

Pulls contents from remote storage to the local filesystem.

Source code in src/prefect/runner/storage.py
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
async def pull_code(self):
    """
    Pulls contents from remote storage to the local filesystem.

    Raises:
        RuntimeError: If the filesystem cannot be created or the download
            fails.
    """
    self._logger.debug(
        "Pulling contents from remote storage '%s' to '%s'...",
        self._url,
        self.destination,
    )

    if not self.destination.exists():
        self.destination.mkdir(parents=True, exist_ok=True)

    source_dir = str(self._remote_path) + "/"

    try:
        # The filesystem is resolved inside the `try` so that its failures
        # are reported as a RuntimeError as well.
        download = create_call(
            self._filesystem.get,
            source_dir,
            str(self.destination),
            recursive=True,
        )
        await from_async.wait_for_call_in_new_thread(download)
    except Exception as exc:
        raise RuntimeError(
            f"Failed to pull contents from remote storage {self._url!r} to"
            f" {self.destination!r}"
        ) from exc

to_pull_step()

Returns a dictionary representation of the storage object that can be used as a deployment pull step.

Source code in src/prefect/runner/storage.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
def to_pull_step(self) -> dict:
    """
    Returns a dictionary representation of the storage object that can be
    used as a deployment pull step.
    """

    def to_placeholder(candidate: Any) -> Any:
        if not isinstance(candidate, Block):
            return candidate
        return f"{{{{ {candidate.get_block_placeholder()} }}}}"

    settings_with_placeholders = visit_collection(
        self._settings, to_placeholder, return_data=True
    )
    required_package = self._get_required_package_for_scheme(
        urlparse(self._url).scheme
    )

    step_config = {"url": self._url, **settings_with_placeholders}
    if required_package:
        step_config["requires"] = required_package
    return {"prefect.deployments.steps.pull_from_remote_storage": step_config}

RunnerStorage

Bases: Protocol

A storage interface for a runner to use to retrieve remotely stored flow code.

Source code in src/prefect/runner/storage.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@runtime_checkable
class RunnerStorage(Protocol):
    """
    A storage interface for a runner to use to retrieve
    remotely stored flow code.

    This is a structural interface: it is declared as a `Protocol` and marked
    `@runtime_checkable`, so objects are matched by the members they provide
    rather than by inheritance. Every member below is a stub (`...`).
    """

    def set_base_path(self, path: Path):
        """
        Sets the base path to use when pulling contents from remote storage to
        local storage.

        Args:
            path: The base path to use.
        """
        ...

    @property
    def pull_interval(self) -> Optional[int]:
        """
        The interval at which contents from remote storage should be pulled to
        local storage. If None, remote storage will perform a one-time sync.
        """
        ...

    @property
    def destination(self) -> Path:
        """
        The local file path to pull contents from remote storage to.
        """
        ...

    async def pull_code(self):
        """
        Pulls contents from remote storage to the local filesystem.

        Declared as a coroutine function, so implementations are awaited.
        """
        ...

    def to_pull_step(self) -> dict:
        """
        Returns a dictionary representation of the storage object that can be
        used as a deployment pull step.
        """
        ...

    def __eq__(self, __value) -> bool:
        """
        Equality check for runner storage objects.

        Args:
            __value: The object to compare against.
        """
        ...

destination: Path property

The local file path to pull contents from remote storage to.

pull_interval: Optional[int] property

The interval at which contents from remote storage should be pulled to local storage. If None, remote storage will perform a one-time sync.

__eq__(__value)

Equality check for runner storage objects.

Source code in src/prefect/runner/storage.py
63
64
65
66
67
def __eq__(self, __value) -> bool:
    """
    Equality check for runner storage objects.

    Interface stub: the body is `...` and implementations supply the
    comparison.

    Args:
        __value: The object to compare against.
    """
    ...

pull_code() async

Pulls contents from remote storage to the local filesystem.

Source code in src/prefect/runner/storage.py
50
51
52
53
54
async def pull_code(self):
    """
    Pulls contents from remote storage to the local filesystem.

    Interface stub: the body is `...`. Declared as a coroutine function, so
    implementations are awaited.
    """
    ...

set_base_path(path)

Sets the base path to use when pulling contents from remote storage to local storage.

Source code in src/prefect/runner/storage.py
28
29
30
31
32
33
def set_base_path(self, path: Path):
    """
    Sets the base path to use when pulling contents from remote storage to
    local storage.

    Interface stub: the body is `...`.

    Args:
        path: The base path to use.
    """
    ...

to_pull_step()

Returns a dictionary representation of the storage object that can be used as a deployment pull step.

Source code in src/prefect/runner/storage.py
56
57
58
59
60
61
def to_pull_step(self) -> dict:
    """
    Returns a dictionary representation of the storage object that can be
    used as a deployment pull step.

    Interface stub: the body is `...`.
    """
    ...

create_storage_from_source(source, pull_interval=60)

Creates a storage object from a URL.

Parameters:

Name Type Description Default
source str

The URL to create a storage object from. Supports git and fsspec URLs.

required
pull_interval Optional[int]

The interval at which to pull contents from remote storage to local storage

60

Returns:

Name Type Description
RunnerStorage RunnerStorage

A runner storage compatible object

Source code in src/prefect/runner/storage.py
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
def create_storage_from_source(
    source: str, pull_interval: Optional[int] = 60
) -> RunnerStorage:
    """
    Creates a storage object from a URL.

    The storage type is chosen from the parsed source, in this order:
    a `git` scheme or a path ending in `.git` gives a `GitRepository`; a
    `file` or `local` scheme gives a `LocalStorage`; any scheme known to
    `fsspec` gives a `RemoteStorage`; anything else is treated as a local
    path.

    Args:
        source: The URL to create a storage object from. Supports git and
            `fsspec` URLs.
        pull_interval: The interval at which to pull contents from remote
            storage to local storage

    Returns:
        RunnerStorage: A runner storage compatible object
    """
    logger = get_logger("runner.storage")
    url_parts = urlparse(source)
    scheme = url_parts.scheme

    if scheme == "git" or url_parts.path.endswith(".git"):
        return GitRepository(url=source, pull_interval=pull_interval)

    if scheme in ("file", "local"):
        # Drop the "<scheme>://" prefix when present; otherwise keep the
        # source untouched.
        _, separator, remainder = source.partition("://")
        local_path = remainder if separator else source
        return LocalStorage(path=local_path, pull_interval=pull_interval)

    if scheme in fsspec.available_protocols():
        return RemoteStorage(url=source, pull_interval=pull_interval)

    logger.debug("No valid fsspec protocol found for URL, assuming local storage.")
    return LocalStorage(path=source, pull_interval=pull_interval)