query_async
query_async
can execute a query and serialize the results to a model.
Parameters🔗
name | type | description | optional | default |
---|---|---|---|---|
sql | str |
the sql query str to execute | ||
param | ParamType |
params to substitute in the query | None |
|
model | Any |
the callable to serialize the model; callable must be able to accept column names as kwargs. | dict |
|
buffered | bool |
whether to buffer reading the results of the query | True |
Example - Serialize to a dataclass🔗
The raw sql query can be executed using the query_async
method and map the results to a list of dataclasses.
import asyncio
import datetime
from dataclasses import dataclass
from pydapper import connect_async
@dataclass
class Task:
id: int
description: str
due_date: datetime.date
owner_id: int
async def main():
async with connect_async() as commands:
data = await commands.query_async("select * from task limit 1", model=Task)
print(data)
# [Task(id=1, description='Set up a test database', due_date=datetime.date(2021, 12, 31), owner_id=1)]
asyncio.run(main())
Example - Serialize a one to one relationship🔗
You can get creative with what you pass in to the model kwarg of query
import asyncio
import datetime
from dataclasses import dataclass
from pydapper import connect_async
@dataclass
class Owner:
id: int
name: str
@dataclass
class Task:
id: int
description: str
due_date: datetime.date
owner: Owner
@classmethod
def from_query_row(cls, id, description, due_date, owner_id, owner_name):
return cls(id, description, due_date, Owner(owner_id, owner_name))
query = """
select t.id, t.description, t.due_date, o.id as owner_id, o.name as owner_name
from task t join owner o on t.owner_id = o.id
limit 1
"""
async def main():
async with connect_async() as commands:
data = await commands.query_async(query, model=Task.from_query_row)
print(data)
"""
[
Task(
id=1,
description="Set up a test database",
due_date=datetime.date(2021, 12, 31),
owner=Owner(id=1, name="Zach Schumacher"),
)
]
"""
asyncio.run(main())
Example - Buffering queries🔗
By default, query
fetches all results and stores them in a list (buffered). By setting buffered=False
, you can
instead have query
act as a generator function, fetching one record from the result set at a time. This may be useful
if querying a large amount of data that would not fit into memory, but note that this keeps both the connection and
cursor open while you're retrieving results.
import asyncio
from pydapper import connect_async
async def main():
async with connect_async() as commands:
data = await commands.query_async("select * from task", buffered=False)
print(type(data))
# <class 'async_generator'>
async for row in data:
print(row)
# {'id': 1, 'description': 'Set up a test database', 'due_date': datetime.date(2021, 12, 31), 'owner_id': 1}
# {'id': 2, 'description': 'Seed the test database', 'due_date': datetime.date(2021, 12, 31), 'owner_id': 1}
# {'id': 3, 'description': 'Run the test suite', 'due_date': datetime.date(2022, 1, 1), 'owner_id': 1}
asyncio.run(main())
Example - Serializing a one-to-many relationship🔗
Using model is nice for simple serialization, but more complex serializations might require more complex logic. In this case, it is recommended to return an unbuffered result and serialize it as you iterate. See the example below:
import asyncio
import typing
from dataclasses import dataclass
from pydapper import connect_async
@dataclass
class Owner:
id: int
name: str
tasks: typing.List["Task"]
@dataclass
class Task:
id: int
description: str
query = """
select task.id as task_id,
owner.id as owner_id,
owner.name as owner_name,
task.description as description
from owner
join task on owner.id = task.owner_id
"""
async def main():
async with connect_async() as commands:
owners = dict()
async for record in await commands.query_async(query, buffered=False):
if (owner_id := record["owner_id"]) not in owners:
owners[owner_id] = Owner(id=owner_id, name=record["owner_name"], tasks=list())
owners[owner_id].tasks.append(Task(id=record["task_id"], description=record["description"]))
print(list(owners.values()))
"""
[
Owner(
id=1,
name='Zach Schumacher',
tasks=[
Task(
id=1,
description='Set up a test database',
),
Task(
id=2,
description='Seed the test database',
),
Task(
id=3,
description='Run the test suite',
),
],
),
]
"""
asyncio.run(main())