How to Use SQLAlchemy to Make Database Requests Asynchronously
Making database requests is a typical IO-bound task: most of the time is spent waiting for a response from the database server. Therefore, if your application makes a lot of database requests, its performance can be improved dramatically by running them concurrently, which is supported by SQLAlchemy, a versatile Python SQL toolkit and Object Relational Mapper.
Besides, async programming is becoming more and more popular in Python, especially with FastAPI for web development, so we often need to make database requests inside coroutines, namely functions defined with the async def statement. In this setting we cannot use the classical synchronous version of SQLAlchemy directly, but need to create asynchronous versions of engines, connections, and sessions.
In this post, we will introduce how to use SQLAlchemy asynchronously in different scenarios, namely with plain SQL queries, Core, and ORM. Importantly, we will introduce how to use it in multiple async tasks concurrently, which can improve the efficiency of IO-bound applications dramatically if used properly.
Preparation
We will start a MySQL server locally with Docker, in which we will create the database and table used for the demonstration:
# Create a volume to persist the data.
$ docker volume create mysql8-data
# Create the container for MySQL.
$ docker run --name mysql8 -d -e MYSQL_ROOT_PASSWORD=root -p 13306:3306 -v mysql8-data:/var/lib/mysql mysql:8
# Connect to the local MySQL server in Docker.
$ docker exec -it mysql8 mysql -u root -proot
mysql> SELECT VERSION();
+-----------+
| VERSION() |
+-----------+
| 8.3.0     |
+-----------+
1 row in set (0.00 sec)
CREATE DATABASE sales;
CREATE TABLE `sales`.`customers` (
`id` SMALLINT NOT NULL AUTO_INCREMENT,
`name` VARCHAR(50) NOT NULL,
`job` VARCHAR(50) DEFAULT '',
PRIMARY KEY (`id`),
UNIQUE `UQ_name` (`name`)
);
INSERT INTO sales.customers
(name, job)
VALUES
('Lynn', 'Backend Developer')
;
Then let's create a virtual environment so we can try out the latest versions of Python and the libraries:
conda create -n sql python=3.12
conda activate sql
pip install -U "Sqlalchemy[asyncio]>=2.0,<2.1"
pip install -U "aiomysql>=0.2,<0.3"
pip install -U "cryptography>=42.0,<42.1"
- sqlalchemy[asyncio] – installs SQLAlchemy together with the greenlet dependency, the library SQLAlchemy uses to run its internals under asyncio.
- aiomysql – a driver for accessing a MySQL database from the asyncio framework, which uses PyMySQL behind the scenes.
- cryptography – required by the MySQL driver for the caching_sha2_password authentication used by MySQL 8.
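Before moving on to the examples, you can optionally run a quick import check to confirm the packages are in place — a minimal sketch; the exact versions printed depend on what pip resolved:
import aiomysql  # the async MySQL driver
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine  # the asyncio extension of SQLAlchemy

print(sqlalchemy.__version__)  # expect a 2.0.x release
print(aiomysql.__version__)  # expect a 0.2.x release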
Execute plain SQL queries asynchronously
To run SQL queries asynchronously with SQLAlchemy, we need to first create an async engine with create_async_engine()
. Then we need to use await
when a connection is created, a query is executed, and the engine is disposed of:
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine
async def main():
# Create an asynchronous engine.
async_engine = create_async_engine(
"mysql+aiomysql://root:root@localhost:13306/sales"
)
    # Insert new data with a transaction.
async with async_engine.begin() as conn:
insert_query = text(
"""
INSERT INTO sales.customers
(name, job)
VALUES
(:name, :job)
"""
)
await conn.execute(
insert_query, {"name": "Hans", "job": "Data Engineer"}
)
    # Check the data after it's inserted.
async with async_engine.connect() as conn:
select_query = text(
"""
SELECT * FROM sales.customers
WHERE name = :name
"""
)
result = await conn.execute(select_query, {"name": "Hans"})
print(result.fetchall())
    # Close and clean up pooled connections.
await async_engine.dispose()
asyncio.run(main())
Note that when executing plain SQL queries asynchronously as shown above, the bind parameters are passed in a dictionary rather than as keyword arguments.
When the code above is run, you will see the following result printed:
[(2, 'Hans', 'Data Engineer')]
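As a side note on the parameter dictionary, execute() also accepts a list of dictionaries, which inserts several rows in one call ("executemany" style). Here is a minimal sketch with made-up names; note that running it will shift the auto-increment ids shown in the later examples:
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

async def insert_many():
    engine = create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")
    async with engine.begin() as conn:
        # A list of dictionaries executes the statement once per row.
        await conn.execute(
            text("INSERT INTO sales.customers (name, job) VALUES (:name, :job)"),
            [
                {"name": "Ada", "job": "Analyst"},  # made-up rows for illustration
                {"name": "Grace", "job": "SRE"},
            ],
        )
    await engine.dispose()

asyncio.run(insert_many())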
Using plain SQL queries is a good choice when you want to start with SQLAlchemy quickly and don't know much about the Core and ORM features yet. However, as you can see above, it's not very Pythonic as it uses free-style plain SQL queries. When you have more experience with SQLAlchemy, you may want to use the Core or ORM features instead.
Use SQLAlchemy Core asynchronously
With SQLAlchemy 2.0, the Core feature, which normally means interacting with a Table object directly, is quite powerful. It is also tightly unified with the ORM feature: for example, the select() construct can be used for both Core and ORM.
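A small illustration of this unification: the snippet below only builds and prints the SQL of two equivalent statements, one against a plain Table and one against an ORM class defined just for this sketch, without touching the database:
from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

# Core: a plain Table object with its own MetaData.
core_table = Table(
    "customers",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

# ORM: a mapped class describing the same table.
class Base(DeclarativeBase):
    pass

class Customer(Base):
    __tablename__ = "customers"
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))

# The same select() construct accepts either one.
print(select(core_table).where(core_table.c.name == "Lynn"))
print(select(Customer).where(Customer.name == "Lynn"))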
For Core use, we also need to create an async engine and then use it to create a connection asynchronously. The basic workflow is the same as that with plain queries, with the difference being that the statements are constructed with Core operators like insert
and select
.
import asyncio
from sqlalchemy import Column, Integer, insert
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import create_async_engine
meta_data = MetaData()
table = Table(
"customers",
meta_data,
Column("id", Integer, primary_key=True),
Column("name", String(50), nullable=False),
Column("job", String(50), default=""),
)
async def main():
# Create an asynchronous engine.
    engine = create_async_engine(
"mysql+aiomysql://root:root@localhost:13306/sales"
)
    # Insert new data with a transaction.
async with engine.begin() as conn:
stmt = insert(table).values(name="Jack", job="Frontend Developer")
await conn.execute(stmt)
    # Check the data after it's inserted.
async with engine.connect() as conn:
result = await conn.execute(
select(table).where(table.c.name == "Jack")
)
print(result.fetchall())
    # Close and clean up pooled connections.
await engine.dispose()
asyncio.run(main())
When the code above is run, the following result will be shown:
[(3, 'Jack', 'Frontend Developer')]
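Other Core constructs such as update() and delete() follow exactly the same pattern. The sketch below runs an UPDATE on a plain connection and deliberately skips commit(), so the change is rolled back when the block exits and the demo data stays untouched:
import asyncio
from sqlalchemy import Column, Integer, MetaData, String, Table, update
from sqlalchemy.ext.asyncio import create_async_engine

meta_data = MetaData()
table = Table(
    "customers",
    meta_data,
    Column("id", Integer, primary_key=True),
    Column("name", String(50), nullable=False),
    Column("job", String(50), default=""),
)

async def dry_run_update():
    engine = create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")
    async with engine.connect() as conn:
        result = await conn.execute(
            update(table).where(table.c.name == "Jack").values(job="Team Lead")
        )
        print(result.rowcount)  # 1 if the row exists
        # No conn.commit() here: leaving the block rolls the change back.
    await engine.dispose()

asyncio.run(dry_run_update())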
Use SQLAlchemy ORM asynchronously
Using the ORM feature of SQLAlchemy is a bit more advanced, especially with version 2.0, where the syntax for declaring ORM classes has changed dramatically. In particular, Mapped[] is used to specify the type of a column, and mapped_column() configures its other attributes.
from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
class Base(DeclarativeBase):
pass
class Customer(Base):
__tablename__ = "customers"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), nullable=False, unique=True)
job: Mapped[str | None] = mapped_column(String(50), default="")
To work with the ORM asynchronously, we need to create an async session factory with async_sessionmaker(), which is then used to create an async session instance using async with:
# Create an asynchronous session.
async_session = async_sessionmaker(engine, expire_on_commit=False)
# Create an async session instance.
async with async_session() as session:
...
The complete code for working with ORM asynchronously is as follows:
import asyncio
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
class Base(DeclarativeBase):
pass
class Customer(Base):
__tablename__ = "customers"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), nullable=False, unique=True)
job: Mapped[str | None] = mapped_column(String(50), default="")
async def main():
# Create an asynchronous engine.
engine = create_async_engine(
"mysql+aiomysql://root:root@localhost:13306/sales"
)
# Create an asynchronous session.
async_session = async_sessionmaker(engine, expire_on_commit=False)
# Create an async session instance.
async with async_session() as session:
        # Insert new data with a transaction.
async with session.begin():
session.add(Customer(name="Stephen", job="Manager"))
    # Check the data after it's inserted.
async with async_session() as session:
result = await session.execute(
select(Customer).where(Customer.name == "Stephen")
)
customer = result.scalars().one()
print(f"name = {customer.name}, job = {customer.job}")
    # Close and clean up pooled connections.
await engine.dispose()
asyncio.run(main())
When the code above is run, the following result will be shown:
name = Stephen, job = Manager
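A related tip: if you would rather create the table from the ORM model than from the raw CREATE TABLE statement used in the preparation step, the synchronous Base.metadata.create_all() can be bridged into async code with run_sync() — a minimal sketch, assuming the Base and Customer classes above live in the same module:
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine

async def create_tables():
    engine = create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")
    async with engine.begin() as conn:
        # run_sync() calls the synchronous create_all() on the underlying connection;
        # tables that already exist are skipped by default.
        await conn.run_sync(Base.metadata.create_all)
    await engine.dispose()

asyncio.run(create_tables())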
Use SQLAlchemy Core in multiple async tasks
Using SQLAlchemy Core in multiple async tasks concurrently is simple because the connection object can be passed and used in multiple async tasks directly:
import asyncio
from pprint import pprint
from sqlalchemy import Column, Integer
from sqlalchemy import MetaData
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy.ext.asyncio import create_async_engine
meta_data = MetaData()
table = Table(
"customers",
meta_data,
Column("id", Integer, primary_key=True),
Column("name", String(50), nullable=False),
Column("job", String(50), default=""),
)
async def get_customer(name, conn):
result = await conn.execute(select(table).where(table.c.name == name))
return result.fetchone()
async def main():
# Create an asynchronous engine.
engine = create_async_engine(
"mysql+aiomysql://root:root@localhost:13306/sales"
)
names = ["Lynn", "Hans", "Jack", "Stephen"]
tasks = []
    # Query several customers concurrently, sharing one connection.
async with engine.connect() as conn:
for name in names:
tasks.append(get_customer(name, conn))
results = await asyncio.gather(*tasks)
pprint(results)
    # Close and clean up pooled connections.
await engine.dispose()
asyncio.run(main())
When the code above is run, you'll see the following results printed:
[(1, 'Lynn', 'Backend Developer'),
(2, 'Hans', 'Data Engineer'),
(3, 'Jack', 'Frontend Developer'),
(4, 'Stephen', 'Manager')]
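Keep in mind that a single MySQL connection processes one statement at a time, so the shared-connection version above does not really overlap the queries on the database side. If you want genuine parallelism, you can let each task check out its own pooled connection instead — a minimal sketch reusing the same table definition:
import asyncio
from pprint import pprint
from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.ext.asyncio import create_async_engine

meta_data = MetaData()
table = Table(
    "customers",
    meta_data,
    Column("id", Integer, primary_key=True),
    Column("name", String(50), nullable=False),
    Column("job", String(50), default=""),
)

engine = create_async_engine("mysql+aiomysql://root:root@localhost:13306/sales")

async def get_customer(name):
    # Each task checks out its own pooled connection, so queries can run in parallel.
    async with engine.connect() as conn:
        result = await conn.execute(select(table).where(table.c.name == name))
        return result.fetchone()

async def main():
    names = ["Lynn", "Hans", "Jack", "Stephen"]
    results = await asyncio.gather(*(get_customer(name) for name in names))
    pprint(results)
    await engine.dispose()

asyncio.run(main())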
Use SQLAlchemy ORM in multiple async tasks
On the other hand, using SQLAlchemy ORM in multiple async tasks is a bit more complex, because the same AsyncSession
instance cannot be used in concurrent tasks directly.
Let's try to use it directly and see what would happen:
import asyncio
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
class Base(DeclarativeBase):
pass
class Customer(Base):
__tablename__ = "customers"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), nullable=False, unique=True)
job: Mapped[str | None] = mapped_column(String(50), default="")
async def get_customer(name, session):
result = await session.execute(
select(Customer).where(Customer.name == name)
)
customer = result.scalars().one()
return {"name": customer.name, "job": customer.job}
async def main():
# Create an asynchronous engine.
engine = create_async_engine(
"mysql+aiomysql://root:root@localhost:13306/sales"
)
# Create an asynchronous session.
async_session = async_sessionmaker(engine, expire_on_commit=False)
names = ["Lynn", "Hans", "Jack", "Stephen"]
tasks = []
    # Query several customers concurrently, sharing one session.
async with async_session() as session:
for name in names:
tasks.append(get_customer(name, session))
results = await asyncio.gather(*tasks)
print(results)
    # Close and clean up pooled connections.
await engine.dispose()
asyncio.run(main())
When the code above is run, you will see this error:
sqlalchemy.exc.InvalidRequestError: This session is provisioning a new connection; concurrent operations are not permitted
This error means that a single instance of AsyncSession
cannot be shared among multiple concurrent tasks (such as when using a function like asyncio.gather()
). If you want to dive deeper into this topic, you can check this reference.
An easy and practical solution to this issue is to create an AsyncSession instance inside each task. We will refactor the code to create the engine and async_session_factory globally, and then call async_session_factory() in each task to create an independent session:
import asyncio
from pprint import pprint
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
# Create an asynchronous engine.
engine = create_async_engine(
"mysql+aiomysql://root:root@localhost:13306/sales"
)
# Create an asynchronous session.
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
class Customer(Base):
__tablename__ = "customers"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), nullable=False, unique=True)
job: Mapped[str | None] = mapped_column(String(50), default="")
async def get_customer(name):
# Create an async session instance.
async with async_session_factory() as session:
result = await session.execute(
select(Customer).where(Customer.name == name)
)
customer = result.scalars().one()
return {"name": customer.name, "job": customer.job}
async def main():
names = ["Lynn", "Hans", "Jack", "Stephen"]
tasks = []
    # Query several customers concurrently, one session per task.
for name in names:
tasks.append(get_customer(name))
results = await asyncio.gather(*tasks)
pprint(results)
    # Close and clean up pooled connections.
await engine.dispose()
asyncio.run(main())
When the code is run, you will see the results printed as follows:
[{'job': 'Backend Developer', 'name': 'Lynn'},
{'job': 'Data Engineer', 'name': 'Hans'},
{'job': 'Frontend Developer', 'name': 'Jack'},
{'job': 'Manager', 'name': 'Stephen'}]
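One practical caveat when scaling this up: every concurrent session checks out a connection from the engine's pool, so with many tasks you may want to size the pool explicitly and cap the concurrency, for example with an asyncio.Semaphore. A minimal sketch with made-up numbers:
import asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# The pool numbers here are made-up examples; tune them for your workload.
engine = create_async_engine(
    "mysql+aiomysql://root:root@localhost:13306/sales",
    pool_size=5,  # connections kept open in the pool
    max_overflow=5,  # extra connections allowed under burst load
)
async_session_factory = async_sessionmaker(engine, expire_on_commit=False)

# Allow at most 10 tasks to hold a session (and thus a connection) at once.
semaphore = asyncio.Semaphore(10)

async def count_customers():
    async with semaphore:
        async with async_session_factory() as session:
            result = await session.execute(text("SELECT COUNT(*) FROM sales.customers"))
            return result.scalar_one()

async def main():
    # Launch more tasks than the pool size; the semaphore keeps checkouts bounded.
    counts = await asyncio.gather(*(count_customers() for _ in range(50)))
    print(counts[:5])
    await engine.dispose()

asyncio.run(main())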
Like HTTP requests, database requests are also IO-bound tasks as they spend most of the time waiting for the response from a database server. Therefore, we can boost the efficiency of an application dramatically by making database requests concurrently rather than sequentially.
On the other hand, making database requests asynchronously is also becoming more important because async programming is getting more and more popular in Python, especially with FastAPI for web development, which makes this topic well worth learning.
In this post, we have introduced how to use SQLAlchemy asynchronously in different scenarios, namely with plain SQL queries, Core, and ORM. You can easily adapt the code to fit your specific use case. We especially introduced how to use SQLAlchemy in multiple async tasks concurrently, which can improve the efficiency of your application dramatically if it makes a lot of database requests.