Skip to content

prefect.utilities.dockerutils

BuildError

Bases: Exception

Raised when a Docker build fails

Source code in src/prefect/utilities/dockerutils.py
128
129
class BuildError(Exception):
    """Signals that building a Docker image did not succeed"""

ImageBuilder

An interface for preparing Docker build contexts and building images

Source code in src/prefect/utilities/dockerutils.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
class ImageBuilder:
    """An interface for preparing Docker build contexts and building images

    Dockerfile lines are accumulated in memory (`dockerfile_lines`) and are only
    written to `<context>/Dockerfile` for the duration of `build()`.  When no
    explicit `context` is given, use the builder as a context manager: a temporary
    build context is created on entry and removed on exit.
    """

    base_directory: Path
    context: Optional[Path]
    platform: Optional[str]
    dockerfile_lines: List[str]

    def __init__(
        self,
        base_image: str,
        base_directory: Optional[Path] = None,
        platform: Optional[str] = None,
        context: Optional[Path] = None,
    ):
        """Create an ImageBuilder

        Args:
            base_image: the base image to use
            base_directory: the starting point on your host for relative file locations,
                defaulting to `context` if given, otherwise the current directory
            platform: the platform passed through to the image build
            context: use this path as the build context (if not provided, will create a
                temporary directory for the context)

        Raises:
            ValueError: if `context` already contains a Dockerfile
        """
        self.base_directory = base_directory or context or Path().absolute()
        self.temporary_directory = None
        self.context = context
        self.platform = platform
        self.dockerfile_lines = []

        if self.context:
            dockerfile_path: Path = self.context / "Dockerfile"
            if dockerfile_path.exists():
                raise ValueError(f"There is already a Dockerfile at {context}")

        self.add_line(f"FROM {base_image}")

    def __enter__(self) -> Self:
        # A caller-supplied context is used as-is; only create a temporary one
        # when there is no context yet.
        if self.context and not self.temporary_directory:
            return self

        self.temporary_directory = TemporaryDirectory()
        self.context = Path(self.temporary_directory.__enter__())
        return self

    def __exit__(
        self, exc: Type[BaseException], value: BaseException, traceback: TracebackType
    ) -> None:
        # Nothing to clean up when the context was supplied by the caller
        if not self.temporary_directory:
            return

        self.temporary_directory.__exit__(exc, value, traceback)
        self.temporary_directory = None
        self.context = None

    def add_line(self, line: str) -> None:
        """Add a line to this image's Dockerfile"""
        self.add_lines([line])

    def add_lines(self, lines: Iterable[str]) -> None:
        """Add lines to this image's Dockerfile"""
        self.dockerfile_lines.extend(lines)

    def copy(self, source: Union[str, Path], destination: Union[str, PurePosixPath]):
        """Copy a file or directory to this image

        Args:
            source: the host path; absolute paths must be inside `base_directory`,
                relative paths are interpreted relative to `base_directory`
            destination: the path inside the image

        Raises:
            Exception: if no build context is available
            ValueError: if an absolute `source` is not inside `base_directory`
        """
        if not self.context:
            raise Exception("No context available")

        if not isinstance(destination, PurePosixPath):
            destination = PurePosixPath(destination)

        if not isinstance(source, Path):
            source = Path(source)

        if source.is_absolute():
            # Resolve both sides so a symlinked base directory still matches
            source = source.resolve().relative_to(self.base_directory.resolve())

        if self.temporary_directory:
            os.makedirs(self.context / source.parent, exist_ok=True)

            # Inspect the file under base_directory, not under the process CWD,
            # otherwise a relative directory would be handed to copy2.
            host_path = self.base_directory / source
            if host_path.is_dir():
                shutil.copytree(host_path, self.context / source)
            else:
                shutil.copy2(host_path, self.context / source)

        self.add_line(f"COPY {source} {destination}")

    def write_text(self, text: str, destination: Union[str, PurePosixPath]):
        """Write `text` to a hidden, content-addressed file in the build context and
        add a COPY instruction placing it at `destination` in the image

        Raises:
            Exception: if no build context is available
        """
        if not self.context:
            raise Exception("No context available")

        if not isinstance(destination, PurePosixPath):
            destination = PurePosixPath(destination)

        source_hash = hashlib.sha256(text.encode()).hexdigest()
        # Write with the same encoding that was hashed (str.encode() is UTF-8)
        (self.context / f".{source_hash}").write_text(text, encoding="utf-8")
        self.add_line(f"COPY .{source_hash} {destination}")

    def build(
        self, pull: bool = False, stream_progress_to: Optional[TextIO] = None
    ) -> str:
        """Build the Docker image from the current state of the ImageBuilder

        Args:
            pull: True to pull the base image during the build
            stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO)
                that will collect the build output as it is reported by Docker

        Returns:
            The image ID

        Raises:
            Exception: if no build context is available
        """
        if not self.context:
            raise Exception("No context available")

        dockerfile_path: Path = self.context / "Dockerfile"

        with dockerfile_path.open("w") as dockerfile:
            dockerfile.writelines(line + "\n" for line in self.dockerfile_lines)

        try:
            return build_image(
                self.context,
                platform=self.platform,
                pull=pull,
                stream_progress_to=stream_progress_to,
            )
        finally:
            # The Dockerfile only exists for the duration of the build
            os.unlink(dockerfile_path)

    def _format_lines(self, start: int = 0, stop: Optional[int] = None) -> str:
        """Render dockerfile_lines[start:stop] prefixed with 1-based line numbers"""
        start = max(start, 0)  # a negative start would wrap around the list
        return "\n".join(
            f"  {number:>3}: {text}"
            for number, text in enumerate(
                self.dockerfile_lines[start:stop], start=start + 1
            )
        )

    def assert_has_line(self, line: str) -> None:
        """Asserts that the given line is in the Dockerfile"""
        if line in self.dockerfile_lines:
            return

        # Raised explicitly so the check is not stripped under `python -O`
        raise AssertionError(
            f"Expected {line!r} not found in Dockerfile.  Dockerfile:\n"
            f"{self._format_lines()}"
        )

    def assert_line_absent(self, line: str) -> None:
        """Asserts that the given line is absent from the Dockerfile"""
        if line not in self.dockerfile_lines:
            return

        index = self.dockerfile_lines.index(line)

        raise AssertionError(
            f"Unexpected {line!r} found in Dockerfile at line {index + 1}.  "
            f"Surrounding lines:\n{self._format_lines(index - 2, index + 2)}"
        )

    def assert_line_before(self, first: str, second: str) -> None:
        """Asserts that the first line appears before the second line"""
        self.assert_has_line(first)
        self.assert_has_line(second)

        first_index = self.dockerfile_lines.index(first)
        second_index = self.dockerfile_lines.index(second)

        if first_index < second_index:
            return

        surrounding_lines = self._format_lines(second_index - 2, first_index + 2)
        raise AssertionError(
            f"Expected {first!r} to appear before {second!r} in the Dockerfile, but "
            f"{first!r} was at line {first_index + 1} and {second!r} was at line "
            f"{second_index + 1}.  Surrounding lines:\n{surrounding_lines}"
        )

    def assert_line_after(self, second: str, first: str) -> None:
        """Asserts that the second line appears after the first line"""
        self.assert_line_before(first, second)

    def assert_has_file(self, source: Path, container_path: PurePosixPath) -> None:
        """Asserts that the given file or directory will be copied into the container
        at the given path"""
        if source.is_absolute():
            # Mirror the normalization performed by `copy`
            source = source.resolve().relative_to(self.base_directory.resolve())

        self.assert_has_line(f"COPY {source} {container_path}")

__init__(base_image, base_directory=None, platform=None, context=None)

Create an ImageBuilder

Parameters:

Name Type Description Default
base_image str

the base image to use

required
base_directory Path

the starting point on your host for relative file locations, defaulting to the current directory

None
context Path

use this path as the build context (if not provided, will create a temporary directory for the context)

None

Returns:

Type Description

The image ID

Source code in src/prefect/utilities/dockerutils.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def __init__(
    self,
    base_image: str,
    base_directory: Path = None,
    platform: Optional[str] = None,
    context: Path = None,
):
    """Set up a builder whose Dockerfile starts with `FROM <base_image>`

    Args:
        base_image: the base image to use
        base_directory: the starting point on your host for relative file locations;
            falls back to `context`, then to the current directory
        platform: stored on the builder for use when the image is built
        context: use this path as the build context (if not provided, a temporary
            directory is created when the builder is entered as a context manager)

    Raises:
        ValueError: if `context` already contains a Dockerfile
    """
    if base_directory:
        host_root = base_directory
    elif context:
        host_root = context
    else:
        host_root = Path().absolute()

    self.base_directory = host_root
    self.temporary_directory = None
    self.context = context
    self.platform = platform
    self.dockerfile_lines = []

    # Refuse to take over a context that already has its own Dockerfile
    if self.context and (self.context / "Dockerfile").exists():
        raise ValueError(f"There is already a Dockerfile at {context}")

    self.add_line(f"FROM {base_image}")

add_line(line)

Add a line to this image's Dockerfile

Source code in src/prefect/utilities/dockerutils.py
261
262
263
def add_line(self, line: str) -> None:
    """Append a single instruction to this image's Dockerfile"""
    single_entry = [line]
    self.add_lines(single_entry)

add_lines(lines)

Add lines to this image's Dockerfile

Source code in src/prefect/utilities/dockerutils.py
265
266
267
def add_lines(self, lines: Iterable[str]) -> None:
    """Append several instructions, in order, to this image's Dockerfile"""
    accumulated = self.dockerfile_lines
    accumulated.extend(lines)

assert_has_file(source, container_path)

Asserts that the given file or directory will be copied into the container at the given path

Source code in src/prefect/utilities/dockerutils.py
391
392
393
394
395
396
397
def assert_has_file(self, source: Path, container_path: PurePosixPath) -> None:
    """Asserts that the given file or directory will be copied into the container
    at the given path

    Args:
        source: the host path; absolute paths are made relative to `base_directory`
        container_path: the expected destination inside the image

    Raises:
        ValueError: if an absolute `source` is not inside `base_directory`
    """
    if source.is_absolute():
        # Normalize `..` segments and symlinks before relativizing so the
        # expected COPY line uses a resolved relative path.
        source = source.resolve().relative_to(self.base_directory.resolve())

    self.assert_has_line(f"COPY {source} {container_path}")

assert_has_line(line)

Asserts that the given line is in the Dockerfile

Source code in src/prefect/utilities/dockerutils.py
332
333
334
335
336
337
338
339
340
def assert_has_line(self, line: str) -> None:
    """Asserts that the given line is in the Dockerfile

    Raises:
        AssertionError: if `line` is not one of the Dockerfile lines; the message
            lists the whole Dockerfile with line numbers
    """
    if line in self.dockerfile_lines:
        return

    all_lines = "\n".join(
        f"  {number:>3}: {text}"
        for number, text in enumerate(self.dockerfile_lines, start=1)
    )
    # Raised explicitly (rather than via `assert`) so the check is not
    # stripped when Python runs with -O.
    raise AssertionError(
        f"Expected {line!r} not found in Dockerfile.  Dockerfile:\n{all_lines}"
    )

assert_line_absent(line)

Asserts that the given line is absent from the Dockerfile

Source code in src/prefect/utilities/dockerutils.py
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
def assert_line_absent(self, line: str) -> None:
    """Asserts that the given line is absent from the Dockerfile

    Raises:
        AssertionError: if `line` is present; the message shows the offending line
            with its neighbours and their real Dockerfile line numbers
    """
    if line not in self.dockerfile_lines:
        return

    index = self.dockerfile_lines.index(line)

    # Clamp the window start: a negative slice start would wrap around to the
    # end of the list and usually yield an empty excerpt.
    window_start = max(index - 2, 0)
    surrounding_lines = "\n".join(
        f"  {number:>3}: {text}"
        for number, text in enumerate(
            self.dockerfile_lines[window_start : index + 2],
            start=window_start + 1,
        )
    )

    # Raised explicitly so the check is not stripped under `python -O`
    raise AssertionError(
        f"Unexpected {line!r} found in Dockerfile at line {index + 1}.  "
        f"Surrounding lines:\n{surrounding_lines}"
    )

assert_line_after(second, first)

Asserts that the second line appears after the first line

Source code in src/prefect/utilities/dockerutils.py
387
388
389
def assert_line_after(self, second: str, first: str) -> None:
    """Asserts that `second` comes later in the Dockerfile than `first`"""
    # Same check as assert_line_before, with the arguments swapped back
    earlier, later = first, second
    self.assert_line_before(earlier, later)

assert_line_before(first, second)

Asserts that the first line appears before the second line

Source code in src/prefect/utilities/dockerutils.py
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
def assert_line_before(self, first: str, second: str) -> None:
    """Asserts that the first line appears before the second line

    Raises:
        AssertionError: if either line is missing, or if `first` does not appear
            strictly before `second`; the message shows the lines between them
            with their real Dockerfile line numbers
    """
    self.assert_has_line(first)
    self.assert_has_line(second)

    first_index = self.dockerfile_lines.index(first)
    second_index = self.dockerfile_lines.index(second)

    if first_index < second_index:
        return

    # On failure `second` is at or before `first`, so the excerpt runs from
    # just before `second` to just after `first`.  Clamp the start: a negative
    # slice start would wrap around to the end of the list.
    window_start = max(second_index - 2, 0)
    surrounding_lines = "\n".join(
        f"  {number:>3}: {text}"
        for number, text in enumerate(
            self.dockerfile_lines[window_start : first_index + 2],
            start=window_start + 1,
        )
    )

    # Raised explicitly so the check is not stripped under `python -O`
    raise AssertionError(
        f"Expected {first!r} to appear before {second!r} in the Dockerfile, but "
        f"{first!r} was at line {first_index + 1} and {second!r} was at line "
        f"{second_index + 1}.  Surrounding lines:\n{surrounding_lines}"
    )

build(pull=False, stream_progress_to=None)

Build the Docker image from the current state of the ImageBuilder

Parameters:

Name Type Description Default
pull bool

True to pull the base image during the build

False
stream_progress_to Optional[TextIO]

an optional stream (like sys.stdout, or an io.TextIO) that will collect the build output as it is reported by Docker

None

Returns:

Type Description
str

The image ID

Source code in src/prefect/utilities/dockerutils.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def build(
    self, pull: bool = False, stream_progress_to: Optional[TextIO] = None
) -> str:
    """Build the Docker image from the current state of the ImageBuilder

    The accumulated Dockerfile lines are written to `<context>/Dockerfile`, the
    image is built from that context, and the Dockerfile is removed again whether
    or not the build succeeds.

    Args:
        pull: True to pull the base image during the build
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO)
            that will collect the build output as it is reported by Docker

    Returns:
        The image ID

    Raises:
        Exception: if no build context is available
    """
    # Fail with the same clear error as the other context-dependent methods
    # instead of a TypeError from `None / "Dockerfile"`.
    if not self.context:
        raise Exception("No context available")

    dockerfile_path: Path = self.context / "Dockerfile"

    with dockerfile_path.open("w") as dockerfile:
        dockerfile.writelines(line + "\n" for line in self.dockerfile_lines)

    try:
        return build_image(
            self.context,
            platform=self.platform,
            pull=pull,
            stream_progress_to=stream_progress_to,
        )
    finally:
        os.unlink(dockerfile_path)

copy(source, destination)

Copy a file to this image

Source code in src/prefect/utilities/dockerutils.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def copy(self, source: Union[str, Path], destination: Union[str, PurePosixPath]):
    """Copy a file or directory to this image

    Adds a `COPY` instruction to the Dockerfile.  When the build context is a
    temporary directory, the file or directory is also copied from
    `base_directory` into that context.

    Args:
        source: the host path; absolute paths must be inside `base_directory`,
            relative paths are interpreted relative to `base_directory`
        destination: the path inside the image

    Raises:
        Exception: if no build context is available
        ValueError: if an absolute `source` is not inside `base_directory`
    """
    if not self.context:
        raise Exception("No context available")

    if not isinstance(destination, PurePosixPath):
        destination = PurePosixPath(destination)

    if not isinstance(source, Path):
        source = Path(source)

    if source.is_absolute():
        # Resolve both sides so a symlinked base directory still matches
        source = source.resolve().relative_to(self.base_directory.resolve())

    if self.temporary_directory:
        os.makedirs(self.context / source.parent, exist_ok=True)

        # Inspect the file under base_directory, not under the process CWD;
        # otherwise a relative directory would be handed to copy2 and fail.
        host_path = self.base_directory / source
        if host_path.is_dir():
            shutil.copytree(host_path, self.context / source)
        else:
            shutil.copy2(host_path, self.context / source)

    self.add_line(f"COPY {source} {destination}")

PushError

Bases: Exception

Raised when a Docker image push fails

Source code in src/prefect/utilities/dockerutils.py
400
401
class PushError(Exception):
    """Signals that pushing a Docker image to a registry did not succeed"""

build_image(context, dockerfile='Dockerfile', tag=None, pull=False, platform=None, stream_progress_to=None, **kwargs)

Builds a Docker image, returning the image ID

Parameters:

Name Type Description Default
context Path

the root directory for the Docker build context

required
dockerfile str

the path to the Dockerfile, relative to the context

'Dockerfile'
tag Optional[str]

the tag to give this image

None
pull bool

True to pull the base image during the build

False
stream_progress_to Optional[TextIO]

an optional stream (like sys.stdout, or an io.TextIO) that will collect the build output as it is reported by Docker

None

Returns:

Type Description
str

The image ID

Source code in src/prefect/utilities/dockerutils.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
@silence_docker_warnings()
def build_image(
    context: Path,
    dockerfile: str = "Dockerfile",
    tag: Optional[str] = None,
    pull: bool = False,
    platform: Optional[str] = None,
    stream_progress_to: Optional[TextIO] = None,
    **kwargs: Any,
) -> str:
    """Build a Docker image from a context directory and return its image ID

    Args:
        context: the root directory for the Docker build context
        dockerfile: the path to the Dockerfile, relative to the context
        tag: the tag to give this image
        pull: True to pull the base image during the build
        platform: the platform passed through to the Docker build call
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the build output as it is reported by Docker
        **kwargs: additional options forwarded to the Docker build call; any
            "decode" or "labels" entries are dropped because this function sets them

    Returns:
        The image ID

    Raises:
        ValueError: if `context` is empty or does not exist
        BuildError: if Docker reports an error while building
    """
    if not context:
        raise ValueError("context required to build an image")

    if not Path(context).exists():
        raise ValueError(f"Context path {context} does not exist")

    # These two options are always supplied below, so caller values are ignored
    reserved_options = ["decode", "labels"]
    passthrough_options = {
        option: value
        for option, value in kwargs.items()
        if option not in reserved_options
    }

    image_id = None
    with docker_client() as client:
        events = client.api.build(
            path=str(context),
            tag=tag,
            dockerfile=dockerfile,
            pull=pull,
            decode=True,
            labels=IMAGE_LABELS,
            platform=platform,
            **passthrough_options,
        )

        try:
            for event in events:
                if "stream" in event:
                    # Progress output is only relayed when a stream was given
                    if stream_progress_to:
                        stream_progress_to.write(event["stream"])
                        stream_progress_to.flush()
                    continue

                if "aux" in event:
                    image_id = event["aux"]["ID"]
                    continue

                # Either key signals a failed build; "error" takes precedence
                for failure_key in ("error", "message"):
                    if failure_key in event:
                        raise BuildError(event[failure_key])
        except docker.errors.APIError as e:
            raise BuildError(e.explanation) from e

    assert image_id, "The Docker daemon did not return an image ID"
    return image_id

docker_client()

Get the environmentally-configured Docker client

Source code in src/prefect/utilities/dockerutils.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@contextmanager
def docker_client() -> Generator["DockerClient", None, None]:
    """Yield a Docker client configured from the environment, closing it afterwards

    Raises:
        RuntimeError: if a `docker.errors.DockerException` is raised while creating
            the client or inside the `with` body that uses it
    """
    client = None
    try:
        with silence_docker_warnings():
            client = docker.DockerClient.from_env()

            yield client
    except docker.errors.DockerException as exc:
        # NOTE(review): this handler also wraps Docker errors raised by the
        # caller's `with` body, not only connection failures.
        hint = (
            "This error is often thrown because Docker is not running. "
            "Please ensure Docker is running."
        )
        raise RuntimeError(hint) from exc
    finally:
        # The client may never have been created if from_env() failed
        if client is not None:
            client.close()

format_outlier_version_name(version)

Formats outlier docker version names to pass packaging.version.parse validation - Current cases are simple, but creates stub for more complicated formatting if eventually needed. - Example outlier versions that throw a parsing exception: - "20.10.0-ce" (variant of community edition label) - "20.10.0-ee" (variant of enterprise edition label)

Parameters:

Name Type Description Default
version str

raw docker version value

required

Returns:

Name Type Description
str

value that can pass packaging.version.parse validation

Source code in src/prefect/utilities/dockerutils.py
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
def format_outlier_version_name(version: str):
    """
    Strip Docker edition labels so the version passes `packaging.version.parse`

    - Only the simple cases are handled today; this is the place to add more
      involved normalization if it is ever needed.
    - Examples of versions that fail to parse as-is:
      - "20.10.0-ce" (variant of community edition label)
      - "20.10.0-ee" (variant of enterprise edition label)

    Args:
        version (str): raw docker version value

    Returns:
        str: value that can pass `packaging.version.parse` validation
    """
    normalized = version
    for edition_label in ("-ce", "-ee"):
        normalized = normalized.replace(edition_label, "")
    return normalized

generate_default_dockerfile(context=None)

Generates a default Dockerfile used for deploying flows. The Dockerfile is written to a file named Dockerfile inside the context directory, and its path is yielded. The file is removed after the context manager exits.

Parameters:

Name Type Description Default
- context

The context to use for the Dockerfile. Defaults to the current working directory.

required
Source code in src/prefect/utilities/dockerutils.py
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
@contextmanager
def generate_default_dockerfile(context: Optional[Path] = None):
    """
    Generates a default Dockerfile used for deploying flows. The Dockerfile is
    written to `Dockerfile` inside the context directory and its path is yielded.
    The file is removed after the context manager exits.

    Args:
        context: The context to use for the Dockerfile. Defaults to
            the current working directory.

    Yields:
        The path of the generated Dockerfile.

    Raises:
        RuntimeError: if a Dockerfile already exists in the context directory.
    """
    if not context:
        context = Path.cwd()

    base_image = get_prefect_image_name()
    dir_name = context.name

    lines = [f"FROM {base_image}"]

    # Install requirements in their own layer before copying the rest of the
    # context, so that layer can be cached independently of code changes.
    if (context / "requirements.txt").exists():
        lines.append(f"COPY requirements.txt /opt/prefect/{dir_name}/requirements.txt")
        lines.append(
            f"RUN python -m pip install -r /opt/prefect/{dir_name}/requirements.txt"
        )

    lines.append(f"COPY . /opt/prefect/{dir_name}/")
    lines.append(f"WORKDIR /opt/prefect/{dir_name}/")

    temp_dockerfile = context / "Dockerfile"
    try:
        # Exclusive creation ("x") makes the existence check and the creation a
        # single step, so a Dockerfile that appears in between is never
        # overwritten (and later deleted) by us.
        dockerfile = temp_dockerfile.open("x")
    except FileExistsError as exc:
        raise RuntimeError(
            "Failed to generate Dockerfile. Dockerfile already exists in the"
            " current directory."
        ) from exc

    with dockerfile:
        dockerfile.writelines(line + "\n" for line in lines)

    try:
        yield temp_dockerfile
    finally:
        temp_dockerfile.unlink()

get_prefect_image_name(prefect_version=None, python_version=None, flavor=None)

Get the Prefect image name matching the current Prefect and Python versions.

Parameters:

Name Type Description Default
prefect_version Optional[str]

An optional override for the Prefect version.

None
python_version Optional[str]

An optional override for the Python version; must be at the minor level e.g. '3.9'.

None
flavor Optional[str]

An optional alternative image flavor to build, like 'conda'

None
Source code in src/prefect/utilities/dockerutils.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def get_prefect_image_name(
    prefect_version: Optional[str] = None,
    python_version: Optional[str] = None,
    flavor: Optional[str] = None,
) -> str:
    """
    Build the name of the Prefect image for the given (or current) versions.

    A version without a "+" part maps to the `prefecthq/prefect` image tagged
    with that version; any other version maps to `prefecthq/prefect-dev` tagged
    with a short "sha-" revision id.

    Args:
        prefect_version: An optional override for the Prefect version.
        python_version: An optional override for the Python version; must be at the
            minor level e.g. '3.9'.
        flavor: An optional alternative image flavor to build, like 'conda'
    """
    requested_version = prefect_version or prefect.__version__
    release, local_separator, _local_part = requested_version.partition("+")
    is_prod_build = not local_separator

    if is_prod_build:
        version_label = release
    else:
        version_label = "sha-" + prefect.__version_info__["full-revisionid"][:7]

    if not python_version:
        python_version = python_version_minor()

    flavor_suffix = f"-{flavor}" if flavor else ""

    tag = slugify(
        f"{version_label}-python{python_version}{flavor_suffix}",
        lowercase=False,
        max_length=128,
        # Docker allows these characters for tag names
        regex_pattern=r"[^a-zA-Z0-9_.-]+",
    )

    repository = "prefect" if is_prod_build else "prefect-dev"
    return f"prefecthq/{repository}:{tag}"

parse_image_tag(name)

Parse Docker Image String

  • If a tag exists, this function parses and returns the image registry and tag, separately as a tuple.
  • Example 1: 'prefecthq/prefect:latest' -> ('prefecthq/prefect', 'latest')
  • Example 2: 'hostname.io:5050/folder/subfolder:latest' -> ('hostname.io:5050/folder/subfolder', 'latest')
  • Supports parsing Docker Image strings that follow Docker Image Specification v1.1.0
  • Image building tools typically enforce this standard

Parameters:

Name Type Description Default
name str

Name of Docker Image

required
Return
Source code in src/prefect/utilities/dockerutils.py
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
def parse_image_tag(name: str) -> Tuple[str, Optional[str]]:
    """
    Split a Docker image string into its name and its tag

    - If a tag exists, the image name (registry included) and the tag are returned
      separately as a tuple; otherwise the tag is None.
      - Example 1: 'prefecthq/prefect:latest' -> ('prefecthq/prefect', 'latest')
      - Example 2: 'hostname.io:5050/folder/subfolder:latest' -> ('hostname.io:5050/folder/subfolder', 'latest')
    - Supports parsing Docker Image strings that follow Docker Image Specification v1.1.0
      - Image building tools typically enforce this standard

    Args:
        name (str): Name of Docker Image

    Return:
        tuple: image registry, image tag
    """
    index_name, slash, remainder = name.partition("/")

    if slash:
        # The first component (hostname.io:5050 or prefecthq) may itself contain
        # a port colon, so only the rest of the path is searched for a tag.
        prefix, tagged_part = f"{index_name}/", remainder
    else:
        # Simplest names: DockerHub-based, index-free, potentially with a tag
        # - Example: simplename:latest
        prefix, tagged_part = "", name

    tag = None
    if ":" in tagged_part:
        tagged_part, tag = tagged_part.split(":")

    return f"{prefix}{tagged_part}", tag

push_image(image_id, registry_url, name, tag=None, stream_progress_to=None)

Pushes a local image to a Docker registry, returning the registry-qualified tag for that image

This assumes that the environment's Docker daemon is already authenticated to the given registry, and currently makes no attempt to authenticate.

Parameters:

Name Type Description Default
image_id str

a Docker image ID

required
registry_url str

the URL of a Docker registry

required
name str

the name of this image

required
tag str

the tag to give this image (defaults to a slugified UTC timestamp of the time of the push)

None
stream_progress_to Optional[TextIO]

an optional stream (like sys.stdout, or an io.TextIO) that will collect the build output as it is reported by Docker

None

Returns:

Type Description
str

A registry-qualified tag, like my-registry.example.com/my-image:abcdefg

Source code in src/prefect/utilities/dockerutils.py
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
@silence_docker_warnings()
def push_image(
    image_id: str,
    registry_url: str,
    name: str,
    tag: Optional[str] = None,
    stream_progress_to: Optional[TextIO] = None,
) -> str:
    """Push a local image to a Docker registry and return its registry-qualified tag

    This assumes that the environment's Docker daemon is already authenticated to the
    given registry, and currently makes no attempt to authenticate.

    Args:
        image_id (str): a Docker image ID
        registry_url (str): the URL of a Docker registry
        name (str): the name of this image
        tag (str): the tag to give this image (defaults to a slug of the current
            UTC timestamp)
        stream_progress_to: an optional stream (like sys.stdout, or an io.TextIO) that
            will collect the push output as it is reported by Docker

    Returns:
        A registry-qualified tag, like my-registry.example.com/my-image:abcdefg

    Raises:
        PushError: if Docker reports an error while pushing
    """
    if not tag:
        tag = slugify(pendulum.now("utc").isoformat())

    # Only the network location of the registry URL is used in the repository
    registry = urlsplit(registry_url).netloc
    repository = f"{registry}/{name}"
    qualified_tag = f"{repository}:{tag}"

    with docker_client() as client:
        image = client.images.get(image_id)
        image.tag(repository, tag=tag)
        events = client.api.push(repository, tag=tag, stream=True, decode=True)
        try:
            for event in events:
                if "status" in event:
                    # Progress output is only relayed when a stream was given
                    if stream_progress_to:
                        stream_progress_to.write(event["status"])
                        if "progress" in event:
                            stream_progress_to.write(" " + event["progress"])
                        stream_progress_to.write("\n")
                        stream_progress_to.flush()
                    continue

                if "error" in event:
                    raise PushError(event["error"])
        finally:
            # Drop the local registry-qualified tag whether or not the push worked
            client.api.remove_image(qualified_tag, noprune=True)

    return qualified_tag

split_repository_path(repository_path)

Splits a Docker repository path into its namespace and repository components.

Parameters:

Name Type Description Default
repository_path str

The Docker repository path to split.

required

Returns:

Type Description
Tuple[Optional[str], str]

Tuple[Optional[str], str]: A tuple containing the namespace and repository components. - namespace (Optional[str]): The Docker namespace, combining the registry and organization. None if not present. - repository (str): The repository name.

Source code in src/prefect/utilities/dockerutils.py
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
def split_repository_path(repository_path: str) -> Tuple[Optional[str], str]:
    """
    Split a Docker repository path into a namespace and a repository.

    The path is split on at most its first two slashes; everything before the
    final piece forms the namespace.

    Args:
        repository_path: The Docker repository path to split.

    Returns:
        Tuple[Optional[str], str]: A tuple containing the namespace and repository components.
            - namespace (Optional[str]): The Docker namespace, combining the registry and organization. None if not present.
            - repository (str): The repository name.
    """
    # registry/organization/repository, organization/repository or repository:
    # the last piece is the repository, whatever precedes it is the namespace.
    *namespace_parts, repository = repository_path.split("/", 2)

    namespace = "/".join(namespace_parts) if namespace_parts else None
    return namespace, repository

to_run_command(command)

Convert a process-style list of command arguments to a single Dockerfile RUN instruction.

Source code in src/prefect/utilities/dockerutils.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
def to_run_command(command: List[str]) -> str:
    """
    Turn a process-style argument list into one Dockerfile RUN instruction.

    The first element is emitted verbatim; every following argument is emitted as
    its `repr`. An empty command yields an empty string.
    """
    if not command:
        return ""

    # TODO: Consider performing text-wrapping to improve readability of the generated
    #       Dockerfile
    pieces = [f"RUN {command[0]}"]
    pieces.extend(repr(argument) for argument in command[1:])
    return " ".join(pieces)