How to Perform Bulk Insert/Update/Upsert Actions with SQLAlchemy ORM

Author: Murphy  |  View: 22338  |  Time: 2025-03-22 21:42:33

In practice, we often need to work with a large number of data records at the same time. When this happens, performance is an important issue. If not handled properly, it will be a bottleneck of your application and reduce efficiency and usability. In this post, we will introduce how to perform bulk insert, update, and upsert actions for large numbers of records with SQLAlchemy ORM. As you will see, with the latest version of SQLAlchemy 2.0, it is much easier to perform bulk actions than with previous versions. However, there are also some caveats that should be kept in mind when performing these bulk actions.

Preparation

Firstly, start a MySQL server locally with Docker:

# Create a volume to persist the data.
$ docker volume create mysql8-data

# Create the container for MySQL.
$ docker run --name mysql8 -d -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=data -p 13306:3306 -v mysql8-data:/var/lib/mysql mysql:8

# Connect to the local MySQL server in Docker.
$ docker exec -it mysql8 mysql -u root -proot

mysql> SELECT VERSION();
+-----------+
| VERSION() |
+-----------+
| 8.4.0     |
+-----------+
1 row in set (0.00 sec)

Secondly, create a virtual environment to install the libraries needed:

conda create --name sqlalchemy2 python=3.12
conda activate sqlalchemy2

pip install -U "sqlalchemy>=2.0,<2.1"
pip install -U "PyMySQL>=1.1,<1.2"
pip install -U "cryptography>=42.0,<42.1"

Thirdly, create a Session factory, which is required to work with ORM objects later:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session

db_url = "mysql+pymysql://root:root@localhost:13306/data"
engine = create_engine(db_url, echo=True)

session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)

Finally, we will create an ORM class which we will work with later:

from sqlalchemy import String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Category(Base):
    __tablename__ = "categories"

    cid: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True)
    active: Mapped[bool] = mapped_column(default=False)

Importantly, the name field is set to be unique, and the active field has a default value, which makes it easier to demonstrate bulk actions later.

If you have been following this series of posts, you need to drop the existing table first:

Base.metadata.drop_all(engine)

However, be extremely careful with this command if you connect to your own database (which is highly discouraged) rather than the one started with Docker as shown above, because it will drop every table registered on Base.metadata!

Then recreate the table with this command:

Base.metadata.create_all(engine)

If this is your first time creating the table, you can just run the create_all() command.


Regular bulk inserts

We will use the syntax introduced in this post to perform bulk inserts with SQLAlchemy ORM, where an Insert construct is passed to Session.execute() together with a list of dictionaries to be inserted. In this case, 100,000 rows are inserted at once. The number 100,000 is chosen so that the performance differences are clearly visible without making us wait too long for the slower actions.

import time
from sqlalchemy import insert

start_time = time.time()

with Session() as session:
    session.execute(
        insert(Category),
        [{"name": f"Category - {idx+1}"} for idx in range(100_000)],
    )
    session.commit()

duration = time.time() - start_time
print(f"Bulk insert takes {duration:.2f} seconds.")

This code runs very fast, taking only about 1 second to complete. Since we set echo=True for the engine, the SQL sent to the database is also printed in the console:

sqlalchemy.engine.Engine INSERT INTO categories (name, active) VALUES (%(name)s, %(active)s)
sqlalchemy.engine.Engine [generated in 0.07631s] [{'name': 'Category - 1', 'active': 0}, {'name': 'Category - 2', 'active': 0}, {'name': 'Category - 3', 'active': 0}, {'name': 'Category - 4', 'active': 0}, {'name': 'Category - 5', 'active': 0}, {'name': 'Category - 6', 'active': 0}, {'name': 'Category - 7', 'active': 0}, {'name': 'Category - 8', 'active': 0}  ... displaying 10 of 100000 total bound parameter sets ...  {'name': 'Category - 99999', 'active': 0}, {'name': 'Category - 100000', 'active': 0}]
sqlalchemy.engine.Engine COMMIT
Bulk insert takes 1.19 seconds.

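Interestingly, the active field is added automatically even though we didn't specify it in the dictionaries. SQLAlchemy fills it in with the default declared on the column (default=False), which is why the value 0 already appears in the bound parameters sent to the database.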


Bulk insert with None values

In practice, it's common for some fields to have None as the value. When this happens in a bulk insert, we should be careful because it can produce unexpected results or hurt performance significantly.

Let's update the code above and introduce None values for the active field. We will set it to True when idx is a multiple of 10 and to None otherwise. It's a silly example, but it demonstrates the point very clearly :):

import time
from sqlalchemy import insert

start_time = time.time()

with Session() as session:
    session.execute(
        insert(Category),
        [
            {"name": f"Category - {idx+1}", "active": idx % 10 == 0 or None}
            for idx in range(100_000)
        ],
    )
    session.commit()

duration = time.time() - start_time
print(f"Bulk insert takes {duration:.2f} seconds.")

Before the code is run, we should delete the existing records to avoid violating the unique key on the name field. If you want cid to auto-increment from 1 again, you can truncate the table or simply recreate it:

Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

When the updated INSERT code with None values is executed, you will see a lot of SQL queries emitted, and the time taken is much longer than the previous one:

.....(other logs omitted)
sqlalchemy.engine.Engine INSERT INTO categories (name, active) VALUES (%(name)s, %(active)s)
sqlalchemy.engine.Engine [cached since 346.6s ago] {'name': 'Category - 99991', 'active': 1}
sqlalchemy.engine.Engine INSERT INTO categories (name, active) VALUES (%(name)s, %(active)s)
sqlalchemy.engine.Engine [cached since 854.1s ago] [{'name': 'Category - 99992', 'active': 0}, {'name': 'Category - 99993', 'active': 0}, {'name': 'Category - 99994', 'active': 0}, {'name': 'Category - 99995', 'active': 0}, {'name': 'Category - 99996', 'active': 0}, {'name': 'Category - 99997', 'active': 0}, {'name': 'Category - 99998', 'active': 0}, {'name': 'Category - 99999', 'active': 0}, {'name': 'Category - 100000', 'active': 0}]
sqlalchemy.engine.Engine COMMIT
Bulk insert takes 5.57 seconds.

The log shows that the bulk INSERT is broken into many small pieces and each of them is executed separately. This happens because fields with None values are treated specially by SQLAlchemy.

The rows with None values will be separated from those without and grouped separately as well. They are divided into many small groups in order to keep the original sequence of the rows.

Technically, the fields with None values are dropped from the rows to be inserted. However, since the active field has a default value declared on the column, SQLAlchemy adds it back with that default value.
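To see why the statement is split so often, here is a minimal pure-Python sketch of the grouping idea. It only illustrates the behavior described above, assuming that consecutive rows sharing the same set of non-None keys end up in the same batch. The helper batch_by_keys is hypothetical and is not SQLAlchemy's actual implementation.

```python
from itertools import groupby


def batch_by_keys(rows):
    """Group consecutive rows that share the same set of non-None keys.

    Illustration only: this mimics the batching described in the text,
    it is not SQLAlchemy's internal code.
    """
    # Drop the None-valued keys first, as described in the text.
    stripped = [{k: v for k, v in row.items() if v is not None} for row in rows]
    # groupby only merges *adjacent* rows, so the original order is preserved.
    return [
        list(group)
        for _, group in groupby(stripped, key=lambda row: tuple(sorted(row)))
    ]


rows = [
    {"name": f"Category - {idx+1}", "active": idx % 10 == 0 or None}
    for idx in range(100)
]
batches = batch_by_keys(rows)
print(len(batches))  # 20
```

With this pattern every tenth row has a different key set from its neighbors, so 100 rows already produce 20 batches (alternating groups of 1 and 9 rows), and 100,000 rows would produce 20,000. This matches the alternating single-row and nine-row statements in the log above.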

The Insert construct has an execution option render_nulls=True, which causes all parameter dictionaries to be treated equivalently, assuming each dictionary has the same set of keys. However, this may not be desired because the column defaults will not be applied to the None values. It's only useful when None (NULL) is actually the value we want for the related fields. If we set this option for our code (be sure to delete existing records first), we will get an error because the active field doesn't allow NULL values.

with Session() as session:
    session.execute(
        insert(Category).execution_options(render_nulls=True),
        [
            {"name": f"Category - {idx+1}", "active": idx % 10 == 0 or None}
            for idx in range(100_000)
        ],
    )
    session.commit()

IntegrityError: (pymysql.err.IntegrityError) (1048, "Column 'active' cannot be null")
[SQL: INSERT INTO categories (name, active) VALUES (%(name)s, %(active)s)]
[parameters: [{'name': 'Category - 1', 'active': 1}, {'name': 'Category - 2', 'active': None}, {'name': 'Category - 3', 'active': None}, {'name': 'Category - 4', 'active': None}, {'name': 'Category - 5', 'active': None}, {'name': 'Category - 6', 'active': None}, {'name': 'Category - 7', 'active': None}, {'name': 'Category - 8', 'active': None}  ... displaying 10 of 100000 total bound parameter sets ...  {'name': 'Category - 99999', 'active': None}, {'name': 'Category - 100000', 'active': None}]]

As you see, the None values are inserted literally as they are, which will be converted to NULL values by the database.

To solve this problem, we can assign the default values explicitly to the fields that would otherwise be None, so that no field is ever None. This way, only a single query is emitted to the database:

with Session() as session:
    session.execute(
        insert(Category),
        [
            {"name": f"Category - {idx+1}", "active": idx % 10 == 0}
            for idx in range(100_000)
        ],
    )
    session.commit()

The performance of this query is the same as that of the first one.
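When the rows come from real data rather than a comprehension, the None values have to be replaced before the insert. The following is a minimal sketch of such a step. The helper fill_defaults and the sample rows are hypothetical, and the value False is an assumption that mirrors the default declared on Category.active.

```python
def fill_defaults(rows, defaults):
    """Return new row dicts in which None or missing values are replaced by defaults."""
    filled = []
    for row in rows:
        new_row = dict(row)  # copy, so the caller's rows are left untouched
        for key, default in defaults.items():
            if new_row.get(key) is None:
                new_row[key] = default
        filled.append(new_row)
    return filled


raw_rows = [
    {"name": "Category - 1", "active": True},
    {"name": "Category - 2", "active": None},
    {"name": "Category - 3"},
]
# Assumption: False mirrors the default declared on Category.active.
clean_rows = fill_defaults(raw_rows, {"active": False})
print(clean_rows)
```

Because every dictionary in clean_rows now has the same keys and no None values, the list can be passed to session.execute(insert(Category), clean_rows) as in the code above.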

Alternatively, we can separate the rows into two groups, one with None values and the other without, and insert each group separately. This solution is only applicable when the order of the auto-incremented IDs is not important. Otherwise, you should apply the solution above. You may think of assigning the auto-incremented IDs explicitly, but this doesn't work when your application runs concurrently, as others may insert records at the same time and affect the IDs generated.
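The two-group approach could be sketched as follows. The helpers split_by_none and insert_in_two_groups are hypothetical names. The session and statement are passed in by the caller and are assumed to be a SQLAlchemy Session and insert(Category) as used earlier.

```python
def split_by_none(rows, key):
    """Split rows into (rows where `key` is None or missing, all other rows)."""
    with_none = [row for row in rows if row.get(key) is None]
    without_none = [row for row in rows if row.get(key) is not None]
    return with_none, without_none


def insert_in_two_groups(session, stmt, rows, key):
    """Execute the statement once per group instead of once per small batch.

    Assumption: `session` behaves like a SQLAlchemy Session and `stmt` is
    something like insert(Category). Committing is left to the caller.
    """
    for group in split_by_none(rows, key):
        if group:  # skip an empty group
            session.execute(stmt, group)


rows = [
    {"name": f"Category - {idx+1}", "active": idx % 10 == 0 or None}
    for idx in range(100)
]
with_none, without_none = split_by_none(rows, "active")
print(len(with_none), len(without_none))  # 90 10
```

Inside a `with Session() as session:` block, this would be called as insert_in_two_groups(session, insert(Category), rows, "active"), followed by session.commit(). Note that the rows are no longer inserted in their original order, which is why the IDs differ from those of a single insert.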

This solution is slightly slower than the previous one as there are two queries to be executed now, rather than one:

sqlalchemy.engine.Engine BEGIN (implicit)
sqlalchemy.engine.Engine INSERT INTO categories (name, active) VALUES (%(name)s, %(active)s)
sqlalchemy.engine.Engine [cached since 2145s ago] [{'name': 'Category - 2', 'active': 0}, {'name': 'Category - 3', 'active': 0}, {'name': 'Category - 4', 'active': 0}, {'name': 'Category - 5', 'active': 0}, {'name': 'Category - 6', 'active': 0}, {'name': 'Category - 7', 'active': 0}, {'name': 'Category - 8', 'active': 0}, {'name': 'Category - 9', 'active': 0}  ... displaying 10 of 90000 total bound parameter sets ...  {'name': 'Category - 99999', 'active': 0}, {'name': 'Category - 100000', 'active': 0}]
sqlalchemy.engine.Engine INSERT INTO categories (name, active) VALUES (%(name)s, %(active)s)
sqlalchemy.engine.Engine [cached since 454.9s ago] [{'name': 'Category - 1', 'active': 1}, {'name': 'Category - 11', 'active': 1}, {'name': 'Category - 21', 'active': 1}, {'name': 'Category - 31', 'active': 1}, {'name': 'Category - 41', 'active': 1}, {'name': 'Category - 51', 'active': 1}, {'name': 'Category - 61', 'active': 1}, {'name': 'Category - 71', 'active': 1}  ... displaying 10 of 10000 total bound parameter sets ...  {'name': 'Category - 99981', 'active': 1}, {'name': 'Category - 99991', 'active': 1}]
sqlalchemy.engine.Engine COMMIT
Bulk insert takes 1.19 seconds.

Bulk update with primary keys

When it comes to bulk updates, there are two types of actions, one with primary keys provided, and the other one without.

When we simply want to update something, it's natural to provide the primary key, and thus it's also simple to do it with SQLAlchemy ORM. We can just pass an Update construct together with a list of dictionaries to Session.execute(). The dictionaries contain the primary keys and the values to be updated:

from sqlalchemy import update

start_time = time.time()

with Session() as session:
    session.execute(
        update(Category),
        [{"cid": idx + 1, "active": idx % 10 == 0} for idx in range(100_000)],
    )
    session.commit()

duration = time.time() - start_time
print(f"Bulk update takes {duration:.2f} seconds.")

The update action is actually much slower than the insert action, even when the primary key is provided, because the records need to be updated one by one:

sqlalchemy.engine.Engine BEGIN (implicit)
sqlalchemy.engine.Engine UPDATE categories SET active=%(active)s WHERE categories.cid = %(categories_cid)s
sqlalchemy.engine.Engine [generated in 0.26559s] [{'active': 1, 'categories_cid': 1}, {'active': 0, 'categories_cid': 2}, {'active': 0, 'categories_cid': 3}, {'active': 0, 'categories_cid': 4}, {'active': 0, 'categories_cid': 5}, {'active': 0, 'categories_cid': 6}, {'active': 0, 'categories_cid': 7}, {'active': 0, 'categories_cid': 8}  ... displaying 10 of 100000 total bound parameter sets ...  {'active': 0, 'categories_cid': 99999}, {'active': 0, 'categories_cid': 100000}]
sqlalchemy.engine.Engine COMMIT
Bulk update takes 10.01 seconds.

On the other hand, when the primary keys are not provided in the records to be updated, we cannot use the syntax above. In this case, we should invoke the statement against the Connection directly using the Session.connection() method:

from sqlalchemy import bindparam, update

start_time = time.time()

with Session() as session:
    session.connection().execute(
        update(Category).where(Category.name == bindparam("var_name")),
        [
            {"var_name": f"Category - {idx+1}", "active": idx % 10 == 0}
            for idx in range(100_000)
        ],
    )
    session.commit()

duration = time.time() - start_time
print(f"Bulk update takes {duration:.2f} seconds.")

The performance is actually similar to the case when primary keys are provided because the name field is indexed:

sqlalchemy.engine.Engine BEGIN (implicit)
sqlalchemy.engine.Engine UPDATE categories SET active=%(active)s WHERE categories.name = %(var_name)s
sqlalchemy.engine.Engine [generated in 0.09577s] [{'active': 1, 'var_name': 'Category - 1'}, {'active': 0, 'var_name': 'Category - 2'}, {'active': 0, 'var_name': 'Category - 3'}, {'active': 0, 'var_name': 'Category - 4'}, {'active': 0, 'var_name': 'Category - 5'}, {'active': 0, 'var_name': 'Category - 6'}, {'active': 0, 'var_name': 'Category - 7'}, {'active': 0, 'var_name': 'Category - 8'}  ... displaying 10 of 100000 total bound parameter sets ...  {'active': 0, 'var_name': 'Category - 99999'}, {'active': 0, 'var_name': 'Category - 100000'}]
sqlalchemy.engine.Engine COMMIT
Bulk update takes 10.35 seconds.

Bulk upserts

Finally, let's check how to perform bulk upserts. As SQLAlchemy does not yet have a backend-agnostic Upsert construct, we need to implement it using dialect-specific constructs. The general steps to perform upsert actions for MySQL are:

  • Import the insert construct from the MySQL dialect.
  • Create an INSERT statement in the same way as with the backend-agnostic SQLAlchemy syntax.
  • Add the update part by calling the on_duplicate_key_update method of the INSERT statement created above.

For more details regarding how to perform upsert in MySQL, please refer to this post.

The code to perform bulk upserts is as follows:

from sqlalchemy.dialects.mysql import insert

start_time = time.time()

insert_stmt = insert(Category).values(
    [
        {"name": f"Category - {idx+1}", "active": idx % 10 != 0}
        for idx in range(100_000)
    ],
)
upsert_stmt = insert_stmt.on_duplicate_key_update(
    active=insert_stmt.inserted.active
)

with Session() as session:
    session.execute(upsert_stmt)
    session.commit()

duration = time.time() - start_time

print(f"Bulk upsert takes {duration:.2f} seconds.")

When the code is run, you will see that all the data to be inserted/updated is embedded in a single SQL statement sent to the database, which results in a very long query being logged:

sqlalchemy.engine.Engine BEGIN (implicit)
sqlalchemy.engine.Engine INSERT INTO categories (name, active) VALUES (%(name_m0)s, %(active_m0)s), (%(name_m1)s, %(active_m1)s), (%(name_m2)s, %(active_m2)s), (%(name_m3)s, %(active_m3)s), (%(name_m4)s, %(active_m4)s), (%(name_m5)s, %(active_m5)s)......
......
......'name_m99998': 'Category - 99999', 'active_m99998': 1, 'name_m99999': 'Category - 100000', 'active_m99999': 1}
sqlalchemy.engine.Engine COMMIT
Bulk upsert takes 3.82 seconds.

If you try to log this query to a file or the cloud, you may run into issues because a very long string will be logged. However, the performance is not bad: it's slower than simple insert actions but faster than update actions.
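If the statement length becomes a problem, one option is to split the rows into smaller chunks and build one upsert statement per chunk. This was not part of the measurements above, so treat it as an assumption rather than a tested recommendation. The helper chunked and the chunk size of 10,000 below are illustrative choices.

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` rows."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(rows)
    # islice pulls the next `size` items; an empty list ends the loop.
    while chunk := list(islice(iterator, size)):
        yield chunk


rows = [
    {"name": f"Category - {idx+1}", "active": idx % 10 != 0}
    for idx in range(25_000)
]
sizes = [len(chunk) for chunk in chunked(rows, 10_000)]
print(sizes)  # [10000, 10000, 5000]
```

Each chunk could then be passed to insert(Category).values(chunk), combined with on_duplicate_key_update as shown above, and executed within the same session before a single commit. How this affects the total run time would need to be measured.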


This post covers different scenarios for bulk actions with SQLAlchemy ORM, including bulk inserts, updates, and upserts. Generally, inserts are much faster than updates and upserts. However, special attention should be paid when there are None values in the rows to be inserted. A good solution is to fill in the default values in advance and thus avoid having None values in the rows.

On the other hand, if you know there would be no conflicts for the rows to be inserted, use insert rather than upsert. And if the number of rows to be updated is small and can be identified, it would be more efficient to separate upsert into insert and update and execute them separately to maximize performance.
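Separating an upsert into an insert and an update could be sketched as follows, assuming the application already knows which names exist in the table. The helper partition_rows and the sample data are hypothetical.

```python
def partition_rows(rows, existing_names):
    """Split rows into (rows to insert, rows to update) by their "name" value."""
    to_insert = [row for row in rows if row["name"] not in existing_names]
    to_update = [row for row in rows if row["name"] in existing_names]
    return to_insert, to_update


rows = [
    {"name": "Category - 1", "active": True},
    {"name": "Category - 2", "active": False},
    {"name": "Category - 100001", "active": True},
]
# Assumption: the set of names already present in the table is known.
existing_names = {"Category - 1", "Category - 2"}
to_insert, to_update = partition_rows(rows, existing_names)
print(len(to_insert), len(to_update))  # 1 2
```

The to_insert rows can go through the regular bulk insert, while the to_update rows can go through the bulk update by name shown earlier, after renaming the "name" key to the bind parameter name used in that statement.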

With the knowledge introduced in this post, you will then be able to deal with all types of bulk actions in your project with SQLAlchemy ORM. You may even be able to identify some slow code in your repo and improve it accordingly.

