Skip to content

query_async

query_async can execute a query and serialize the results to a model.

Parameters🔗

| name | type | description | optional | default |
|------|------|-------------|----------|---------|
| sql | str | the sql query str to execute | 👎 | |
| param | ParamType | params to substitute in the query | 👍 | None |
| model | Any | the callable to serialize the model; callable must be able to accept column names as kwargs. | 👍 | dict |
| buffered | bool | whether to buffer reading the results of the query | 👍 | True |

Example - Serialize to a dataclass🔗

A raw sql query can be executed using the query_async method, which maps the results to a list of dataclasses.

import asyncio
import datetime
from dataclasses import dataclass

from pydapper import connect_async


@dataclass
class Task:
    """One row of the ``task`` table.

    The field names match the columns returned by the query, so each row can
    be passed to the class as keyword arguments.
    """

    id: int
    description: str
    due_date: datetime.date
    owner_id: int


async def main():
    """Fetch a single task row and print it serialized as a ``Task``."""
    sql = "select * from task limit 1"

    async with connect_async() as commands:
        tasks = await commands.query_async(sql, model=Task)

    print(tasks)
    # [Task(id=1, description='Set up a test database', due_date=datetime.date(2021, 12, 31), owner_id=1)]


# Entry point: run the main() coroutine to completion.
asyncio.run(main())
(This script is complete, it should run "as is")

Example - Serialize a one to one relationship🔗

You can get creative with what you pass in to the model kwarg of query_async.

import asyncio
import datetime
from dataclasses import dataclass

from pydapper import connect_async


@dataclass
class Owner:
    """Owner of a task, built from the ``owner_id`` and ``owner_name`` columns of a query row."""

    id: int
    name: str


@dataclass
class Task:
    """A task together with its owner, nested as an ``Owner`` instance."""

    id: int
    description: str
    due_date: datetime.date
    owner: Owner

    @classmethod
    def from_query_row(cls, id, description, due_date, owner_id, owner_name):
        """Build a ``Task`` from one flat row of a joined task/owner query.

        The parameter names must match the column names (or aliases) of the
        query, because the row's columns are passed in as keyword arguments.
        The two owner columns are folded into a nested ``Owner``.
        """
        owner = Owner(id=owner_id, name=owner_name)
        return cls(id=id, description=description, due_date=due_date, owner=owner)


# Join each task to its owner. The owner columns are aliased to owner_id and
# owner_name so that every column name matches a parameter of
# Task.from_query_row.
query = """
select t.id, t.description, t.due_date, o.id as owner_id, o.name as owner_name
  from task t join owner o on t.owner_id = o.id
  limit 1
"""


async def main():
    """Run the join query and print the rows serialized by ``Task.from_query_row``."""
    row_to_task = Task.from_query_row

    async with connect_async() as commands:
        tasks = await commands.query_async(query, model=row_to_task)

    print(tasks)
    """
    [
        Task(
            id=1,
            description="Set up a test database",
            due_date=datetime.date(2021, 12, 31),
            owner=Owner(id=1, name="Zach Schumacher"),
        )
    ]
    """


# Entry point: run the main() coroutine to completion.
asyncio.run(main())
(This script is complete, it should run "as is")

Example - Buffering queries🔗

By default, query_async fetches all results and stores them in a list (buffered). By setting buffered=False, you can instead have query_async return an async generator that fetches one record from the result set at a time. This may be useful when querying a large amount of data that would not fit into memory, but note that this keeps both the connection and cursor open while you're retrieving results.

import asyncio

from pydapper import connect_async


async def main():
    """Stream the task table one record at a time instead of buffering it."""
    async with connect_async() as commands:
        rows = await commands.query_async("select * from task", buffered=False)

        print(type(rows))
        # <class 'async_generator'>

        # Iterate inside the ``async with`` block so the records are read
        # before the context manager exits.
        async for record in rows:
            print(record)

        # {'id': 1, 'description': 'Set up a test database', 'due_date': datetime.date(2021, 12, 31), 'owner_id': 1}
        # {'id': 2, 'description': 'Seed the test database', 'due_date': datetime.date(2021, 12, 31), 'owner_id': 1}
        # {'id': 3, 'description': 'Run the test suite', 'due_date': datetime.date(2022, 1, 1), 'owner_id': 1}


# Entry point: run the main() coroutine to completion.
asyncio.run(main())
(This script is complete, it should run "as is")

Example - Serializing a one-to-many relationship🔗

Using model is nice for simple serialization, but more complex serializations might require more complex logic. In this case, it is recommended to return an unbuffered result and serialize it as you iterate. See the example below:

import asyncio
import typing
from dataclasses import dataclass

from pydapper import connect_async


@dataclass
class Owner:
    """An owner together with the list of tasks collected for them."""

    id: int
    name: str
    # Quoted forward reference: Task is defined further down in this script.
    tasks: typing.List["Task"]


@dataclass
class Task:
    """A task, reduced to its id and description."""

    id: int
    description: str


# One row per joined (owner, task) pair. The two id columns are aliased to
# task_id and owner_id so they can be told apart in each record.
query = """
select task.id as task_id,
       owner.id as owner_id,
       owner.name as owner_name,
       task.description as description
  from owner
  join task on owner.id = task.owner_id 
"""


async def main():
    """Group the joined owner/task rows into ``Owner`` objects and print them.

    The query is read unbuffered; each record either creates a new ``Owner``
    (the first time its ``owner_id`` is seen) or is appended as a ``Task`` to
    the owner already collected under that id.
    """
    async with connect_async() as commands:
        owners_by_id = {}
        rows = await commands.query_async(query, buffered=False)
        async for row in rows:
            owner_id = row["owner_id"]
            owner = owners_by_id.get(owner_id)
            if owner is None:
                # First row for this owner: create it with an empty task list.
                owner = Owner(id=owner_id, name=row["owner_name"], tasks=[])
                owners_by_id[owner_id] = owner
            owner.tasks.append(Task(id=row["task_id"], description=row["description"]))

    print(list(owners_by_id.values()))
    """
    [
        Owner(
            id=1,
            name='Zach Schumacher',
            tasks=[
                Task(
                    id=1,
                    description='Set up a test database',
                ),
                Task(
                    id=2,
                    description='Seed the test database',
                ),
                Task(
                    id=3,
                    description='Run the test suite',
                ),
            ],
        ),
    ]
    """


# Entry point: run the main() coroutine to completion.
asyncio.run(main())
(This script is complete, it should run "as is")