Skip to content

prefect.server.models.agents

Functions for interacting with agent ORM objects. Intended for internal use by the Prefect REST API.

create_agent(session, agent) async

Inserts a Agent.

If a Agent with the same name exists, an error will be thrown.

Parameters:

Name Type Description Default
session AsyncSession

a database session

required
agent Agent

a Agent model

required

Returns:

Type Description
Agent

orm_models.Agent: the newly-created or updated Agent

Source code in src/prefect/server/models/agents.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
async def create_agent(
    session: AsyncSession,
    agent: schemas.core.Agent,
) -> orm_models.Agent:
    """
    Inserts a Agent.

    If a Agent with the same name exists, an error will be thrown.

    Args:
        session (AsyncSession): a database session
        agent (schemas.core.Agent): a Agent model

    Returns:
        orm_models.Agent: the newly-created or updated Agent

    """

    model = orm_models.Agent(**agent.model_dump())
    session.add(model)
    await session.flush()

    return model

delete_agent(session, agent_id) async

Delete a Agent by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
agent_id str

a Agent id

required

Returns:

Name Type Description
bool bool

whether or not the Agent was deleted

Source code in src/prefect/server/models/agents.py
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
async def delete_agent(
    session: AsyncSession,
    agent_id: UUID,
) -> bool:
    """
    Delete a Agent by id.

    Args:
        session (AsyncSession): A database session
        agent_id (str): a Agent id

    Returns:
        bool: whether or not the Agent was deleted
    """

    result = await session.execute(
        delete(orm_models.Agent).where(orm_models.Agent.id == agent_id)
    )
    return result.rowcount > 0

read_agent(session, agent_id) async

Reads a Agent by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
agent_id str

a Agent id

required

Returns:

Type Description
Union[Agent, None]

orm_models.Agent: the Agent

Source code in src/prefect/server/models/agents.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def read_agent(
    session: AsyncSession,
    agent_id: UUID,
) -> Union[orm_models.Agent, None]:
    """
    Reads a Agent by id.

    Args:
        session (AsyncSession): A database session
        agent_id (str): a Agent id

    Returns:
        orm_models.Agent: the Agent
    """

    return await session.get(orm_models.Agent, agent_id)

read_agents(session, offset=None, limit=None) async

Read Agents.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
offset int

Query offset

None
limit(int)

Query limit

required

Returns:

Type Description
Sequence[Agent]

List[orm_models.Agent]: Agents

Source code in src/prefect/server/models/agents.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
async def read_agents(
    session: AsyncSession,
    offset: Union[int, None] = None,
    limit: Union[int, None] = None,
) -> Sequence[orm_models.Agent]:
    """
    Read Agents.

    Args:
        session (AsyncSession): A database session
        offset (int): Query offset
        limit(int): Query limit

    Returns:
        List[orm_models.Agent]: Agents
    """

    query = select(orm_models.Agent).order_by(orm_models.Agent.name)

    if offset is not None:
        query = query.offset(offset)
    if limit is not None:
        query = query.limit(limit)

    result = await session.execute(query)
    return result.scalars().unique().all()

record_agent_poll(db, session, agent_id, work_queue_id) async

Record that an agent has polled a work queue.

If the agent_id already exists, work_queue and last_activity_time will be updated.

This is a convenience method for designed for speed when agents are polling work queues. For other operations, the create_agent and update_agent methods should be used.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
agent_id UUID

An agent id

required
work_queue_id UUID

A work queue id

required
Source code in src/prefect/server/models/agents.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
@db_injector
async def record_agent_poll(
    db: PrefectDBInterface,
    session: AsyncSession,
    agent_id: UUID,
    work_queue_id: UUID,
) -> None:
    """
    Record that an agent has polled a work queue.

    If the agent_id already exists, work_queue and last_activity_time
    will be updated.

    This is a convenience method for designed for speed when agents
    are polling work queues. For other operations, the
    `create_agent` and `update_agent` methods should be used.

    Args:
        session (AsyncSession): A database session
        agent_id: An agent id
        work_queue_id: A work queue id
    """
    agent_data = schemas.core.Agent(
        id=agent_id, work_queue_id=work_queue_id, last_activity_time=pendulum.now("UTC")
    )
    insert_stmt = (
        db.insert(orm_models.Agent)
        .values(
            **agent_data.model_dump(
                include={"id", "name", "work_queue_id", "last_activity_time"}
            )
        )
        .on_conflict_do_update(
            index_elements=[db.Agent.id],
            set_=agent_data.model_dump_for_orm(
                include={"work_queue_id", "last_activity_time"}
            ),
        )
    )
    await session.execute(insert_stmt)

update_agent(session, agent_id, agent) async

Update a Agent by id.

Parameters:

Name Type Description Default
session AsyncSession

A database session

required
agent Agent

the work queue data

required
agent_id str

a Agent id

required

Returns:

Name Type Description
bool bool

whether or not the Agent was deleted

Source code in src/prefect/server/models/agents.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
async def update_agent(
    session: AsyncSession,
    agent_id: UUID,
    agent: schemas.core.Agent,
) -> bool:
    """
    Update a Agent by id.

    Args:
        session (AsyncSession): A database session
        agent: the work queue data
        agent_id (str): a Agent id

    Returns:
        bool: whether or not the Agent was deleted
    """

    update_stmt = (
        sa.update(orm_models.Agent)
        .where(orm_models.Agent.id == agent_id)
        # exclude_unset=True allows us to only update values provided by
        # the user, ignoring any defaults on the model
        .values(**agent.model_dump_for_orm(exclude_unset=True))
    )
    result = await session.execute(update_stmt)
    return result.rowcount > 0