Transactions with PostgreSQL and MikroORM

June 6, 2022

One of the most important things to care about as a web developer is the integrity of the data. In this article, we learn what a transaction is and how it can help us ensure that our data is correct.

The idea behind transactions#

A transaction is a set of instructions that either happens entirely or doesn’t happen at all. To understand why we might need transactions, let’s use the most common example.

When transferring money from one bank account to another, two steps happen:

  • we withdraw a certain amount of money from the first account,
  • we add the same amount to the second account.

If the whole operation fails completely, that’s something relatively harmless. The worst scenario would be to perform just a part of the above steps. For example, if we withdraw the money from the first account but fail to add it to the second one, we break the integrity of our data. To prevent that, we can bundle multiple steps into a single unit of work, referred to as a transaction.
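The all-or-nothing idea can be sketched in plain TypeScript without any database. The snippet below is only an illustration: the Accounts type and the transfer helper are hypothetical names, and a real database provides this guarantee through its own mechanisms rather than by copying data.

```typescript
// Hypothetical in-memory model of bank accounts: account name -> balance.
type Accounts = Map<string, number>;

// Moves money between two accounts as a single unit of work.
// If any step fails, the balances are restored from a snapshot.
function transfer(accounts: Accounts, from: string, to: string, amount: number): void {
  const snapshot = new Map(accounts); // copy taken when the "transaction" begins
  try {
    const fromBalance = accounts.get(from);
    if (fromBalance === undefined || fromBalance < amount) {
      throw new Error(`Cannot withdraw ${amount} from ${from}`);
    }
    accounts.set(from, fromBalance - amount); // step 1: withdraw

    const toBalance = accounts.get(to);
    if (toBalance === undefined) {
      throw new Error(`Account ${to} does not exist`);
    }
    accounts.set(to, toBalance + amount); // step 2: deposit
  } catch (error) {
    // "rollback": undo any partial change before rethrowing
    accounts.clear();
    snapshot.forEach((balance, name) => accounts.set(name, balance));
    throw error;
  }
}

const accounts: Accounts = new Map([
  ["alice", 100],
  ["bob", 50],
]);

transfer(accounts, "alice", "bob", 30); // both steps succeed

try {
  transfer(accounts, "alice", "carol", 30); // the deposit fails, so the withdrawal is undone
} catch {
  // the error is expected here
}
```

After the failed second transfer, the balances are the same as they were right after the first one, because the partial withdrawal was reverted.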

ACID properties#

A valid transaction can be described using a few properties:

Atomicity#

All of the operations in a transaction are a single unit. Therefore, it either succeeds entirely or fully fails.

Consistency#

The transaction transitions the database from one valid state to another.

Isolation#

Multiple transactions could occur concurrently without the risk of having an invalid state of the database. In our case, another transaction should see the funds in one bank account or the other, but not in both.

Durability#

As soon as we commit the changes from the transaction, they should survive permanently.

Transactions in PostgreSQL#

Fortunately, PostgreSQL gives us the tools to ensure all ACID properties. To create a transaction, we need to group a set of statements with BEGIN and COMMIT.

Previously, we’ve defined a many-to-many relationship between categories and posts. First, let’s create a transaction that deletes a category and all of the posts within it.

BEGIN;
 
--Deleting posts that belong to a given category
DELETE FROM post_entity
  WHERE id IN (
    SELECT post_entity_id FROM post_entity_categories WHERE category_id = 1
  );
 
--Disconnecting posts from categories
DELETE FROM post_entity_categories
  WHERE category_id=1;
 
--Deleting the category
DELETE FROM category
  WHERE id=1;
 
COMMIT;

Because all of the statements run in a single transaction, if something goes wrong when deleting the category, PostgreSQL rolls back the whole transaction, and the posts remain intact.

We can also perform a rollback manually and abort the current transaction.

BEGIN;
 
DROP TABLE "post_entity_categories";
 
ROLLBACK;

Because of the ROLLBACK, the post_entity_categories table is not dropped by the above transaction.

Transactions with MikroORM#

MikroORM implements the unit of work pattern. Thanks to that, it batches queries out of the box.

We’ve learned that we need to flush all of the changes we’ve made to our entities if we want the changes to be reflected in the database.

Flush Modes#

A crucial thing to notice is that MikroORM supports a few flushing strategies.

database.module.ts#
import { Module } from "@nestjs/common"
import { ConfigModule, ConfigService } from "@nestjs/config"
import { MikroOrmModule } from "@mikro-orm/nestjs"
import { FlushMode } from "@mikro-orm/core"
@Module({
  imports: [
    MikroOrmModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        flushMode: FlushMode.ALWAYS, // ...
      }),
    }),
  ],
})
export class DatabaseModule {}

With FlushMode.ALWAYS, MikroORM flushes before every query. Therefore, using it would prevent us from implementing transactions by delaying the flush.

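With FlushMode.AUTO, which is the default, MikroORM flushes implicitly before a query when it detects pending changes that could affect the result of that query. This might be a little surprising.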

posts.service.ts#
async createPost(post: CreatePostDto, user: User) {
  const postData = {
    ...post,
    author: user,
  };
  const newPost = this.postRepository.create(postData);
  // creating a new post, but not flushing it yet
  this.postRepository.persist(newPost);
 
  // querying all of the current posts triggers an implicit flush
  const allCurrentPosts = await this.postRepository.findAll();
 
  const isNewPostPersisted = allCurrentPosts.some(currentPost => {
    return currentPost.id === newPost.id;
  })
  console.log(isNewPostPersisted); // true
 
  return newPost;
}

Since we’ve queried all of the posts before flushing the newly created entity, MikroORM automatically flushed our changes for us.

The above behavior can sometimes get in the way of implementing transactions. Because of that, in this article, we use the FlushMode.COMMIT option that aims to delay the flush until the current transaction is committed.
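The difference between the three modes is easier to see in a toy model. The sketch below is not MikroORM's implementation: ToyUnitOfWork and its methods are hypothetical, and the rule used for the "auto" mode (flush when the queried entity has pending changes) is a simplification of what the library does.

```typescript
// Toy model of the three flush modes; this is not MikroORM's implementation.
type FlushMode = "always" | "auto" | "commit";

class ToyUnitOfWork {
  private readonly flushMode: FlushMode;
  // entity names with changes that have not been written yet
  private pending: string[] = [];
  // log of the simulated database activity
  readonly log: string[] = [];

  constructor(flushMode: FlushMode) {
    this.flushMode = flushMode;
  }

  // Schedules a change without writing anything.
  persist(entityName: string): void {
    this.pending.push(entityName);
  }

  // Simulates a SELECT; depending on the mode, pending changes are flushed first.
  query(entityName: string): void {
    const touchesPendingChanges = this.pending.indexOf(entityName) !== -1;
    if (this.flushMode === "always" || (this.flushMode === "auto" && touchesPendingChanges)) {
      this.flush();
    }
    this.log.push(`select ${entityName}`);
  }

  // Writes all pending changes; does nothing when there are none.
  flush(): void {
    if (this.pending.length === 0) {
      return;
    }
    this.log.push(`flush ${this.pending.join(", ")}`);
    this.pending = [];
  }
}

function run(mode: FlushMode): string[] {
  const unitOfWork = new ToyUnitOfWork(mode);
  unitOfWork.persist("post");
  unitOfWork.query("category"); // unrelated to the pending change
  unitOfWork.query("post"); // overlaps with the pending change
  unitOfWork.flush(); // explicit flush at the end
  return unitOfWork.log;
}

const alwaysLog = run("always"); // flush happens before the first query
const autoLog = run("auto"); // flush happens right before the overlapping query
const commitLog = run("commit"); // flush happens only when requested explicitly
```

In this model, only the "commit" mode leaves the moment of flushing entirely up to us, which is the property we rely on in the rest of the article.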

database.module.ts#
import { Module } from "@nestjs/common"
import { ConfigModule, ConfigService } from "@nestjs/config"
import { MikroOrmModule } from "@mikro-orm/nestjs"
import { FlushMode } from "@mikro-orm/core"
@Module({
  imports: [
    MikroOrmModule.forRootAsync({
      imports: [ConfigModule],
      inject: [ConfigService],
      useFactory: (configService: ConfigService) => ({
        flushMode: FlushMode.COMMIT,
        debug: configService.get("SHOULD_DEBUG_SQL"), // ...
      }),
    }),
  ],
})
export class DatabaseModule {}

We also use debug to investigate what queries MikroORM is performing.

Delaying flushing to implement transactions#

Let’s start by making some adjustments to our PostsService:

posts.service.ts#
import { Injectable } from "@nestjs/common"
import { InjectRepository } from "@mikro-orm/nestjs"
import { EntityRepository } from "@mikro-orm/core"
import PostEntity from "./post.entity"
import PostNotFoundException from "./exceptions/postNotFound.exception"
@Injectable()
export class PostsService {
  constructor(
    @InjectRepository(PostEntity)
    private readonly postRepository: EntityRepository<PostEntity>,
  ) {}
  async getPostById(id: number) {
    const post = await this.postRepository.findOne({
      id,
    })
    if (!post) {
      throw new PostNotFoundException(id)
    }
    return post
  }
  async getPostsFromCategory(categoryId: number) {
    return this.postRepository.find({
      categories: {
        id: categoryId,
      },
    })
  }
  async deletePost(id: number, withFlush = true) {
    const post = await this.getPostById(id)
    this.postRepository.remove(post)
    if (withFlush) {
      return this.postRepository.flush()
    }
  } // ...
}

Now, our deletePost method accepts an additional withFlush argument, which lets us skip the flush when we need to.

Let’s also make changes to our CategoriesService to use the above functionality.

categories.service.ts#
import { Injectable } from "@nestjs/common"
import CategoryNotFoundException from "./exceptions/categoryNotFound.exception"
import { InjectRepository } from "@mikro-orm/nestjs"
import { EntityRepository } from "@mikro-orm/core"
import Category from "./category.entity"
import { PostsService } from "../posts/posts.service"
import { EntityManager } from "@mikro-orm/postgresql"
@Injectable()
export default class CategoriesService {
  constructor(
    @InjectRepository(Category)
    private readonly categoryRepository: EntityRepository<Category>,
    private readonly postsService: PostsService,
    private readonly entityManager: EntityManager,
  ) {}
  async getCategoryById(id: number) {
    const category = await this.categoryRepository.findOne({
      id,
    })
    if (!category) {
      throw new CategoryNotFoundException(id)
    }
    return category
  }
  async deleteCategory(id: number, withFlush = true) {
    const category = await this.getCategoryById(id)
    this.categoryRepository.remove(category)
    if (withFlush) {
      return this.categoryRepository.flush()
    }
  }
  async deleteCategoryWithPosts(categoryId: number) {
    const allPosts = await this.postsService.getPostsFromCategory(categoryId)
    for (const post of allPosts) {
      await this.postsService.deletePost(post.id, false)
    }
    await this.deleteCategory(categoryId, false)
    return this.entityManager.flush()
  } // ...
}

The crucial part above is the deleteCategoryWithPosts function. It calls the postsService.deletePost(post.id, false) method on every post from the category, marking it for deletion without flushing.

Then, we also mark the category for deletion with the this.deleteCategory method.

At the end of the deleteCategoryWithPosts function, we call entityManager.flush. This removes all the posts and the category that we marked for deletion. MikroORM performs the flush inside a database transaction, so if an error occurs at any point, it rolls back all of the changes.

Using categoryRepository.flush() would have the same effect as entityManager.flush and would delete both posts and categories. We can use entityManager.flush to put an emphasis on the fact that we make changes not only to the categories.

Thanks to the fact that we’ve used the debug mode in MikroORM, we can take a look at the logs:

[query] select "p0".* from "post_entity" as "p0" left join "post_entity_categories" as "p1" on "p0"."id" = "p1"."post_entity_id" where "p1"."category_id" = 6 [took 2 ms]
[query] select "c0".* from "category" as "c0" where "c0"."id" = 6 limit 1 [took 1 ms]
[query] begin
[query] delete from "post_entity" where "id" in (36, 37, 38, 39, 40) [took 1 ms]
[query] delete from "category" where "id" in (6) [took 0 ms]
[query] commit

Above, we can see that removing posts and the category was performed in a single transaction.
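To see why deferring the flush produces a log of this shape, here is a toy model of scheduled removals. It is a sketch under simplifying assumptions, not MikroORM's code: ToyEntityManager, its remove and flush methods, and the generated statements are all hypothetical.

```typescript
// Toy model of deferred removals; not MikroORM's actual implementation.
interface ScheduledRemoval {
  table: string;
  id: number;
}

class ToyEntityManager {
  // removals that were requested but not yet sent to the database
  private scheduled: ScheduledRemoval[] = [];
  // simulated query log
  readonly log: string[] = [];

  // Only marks the row for removal; no statement is issued here.
  remove(table: string, id: number): void {
    this.scheduled.push({ table, id });
  }

  // Issues every scheduled removal inside one simulated transaction.
  flush(): void {
    if (this.scheduled.length === 0) {
      return;
    }
    // group the ids by table, keeping the order in which tables first appear
    const idsByTable = new Map<string, number[]>();
    for (const removal of this.scheduled) {
      const ids = idsByTable.get(removal.table) ?? [];
      ids.push(removal.id);
      idsByTable.set(removal.table, ids);
    }
    this.log.push("begin");
    idsByTable.forEach((ids, table) => {
      this.log.push(`delete from ${table} where id in (${ids.join(", ")})`);
    });
    this.log.push("commit");
    this.scheduled = [];
  }
}

const entityManager = new ToyEntityManager();
entityManager.remove("post_entity", 36);
entityManager.remove("post_entity", 37);
entityManager.remove("category", 6);

const statementsBeforeFlush = entityManager.log.length; // nothing has been issued yet
entityManager.flush(); // begin, one delete per table, commit
```

Because nothing is sent until flush is called, all of the removals can be grouped and wrapped in one begin and commit pair.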

Creating transactions explicitly#

So far, we’ve been defining transactions implicitly. If we want to be more verbose, we can do that explicitly.

categories.service.ts#
async deleteCategoryWithPosts(categoryId: number) {
  const allPosts = await this.postsService.getPostsFromCategory(categoryId);
  return this.entityManager.transactional(async () => {
    for (const post of allPosts) {
      await this.postsService.deletePost(post.id, false);
    }
    await this.deleteCategory(categoryId, false);
  });
}

When we use entityManager.transactional, MikroORM runs our callback inside a database transaction and flushes the changes at the end.

If we want to be even more explicit, we can manually begin, commit, and roll back a transaction.

categories.service.ts#
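// assumes PostEntity is imported from "../posts/post.entity"
async deleteCategoryWithPosts(categoryId: number) {
  // fork to get a fresh entity manager with its own identity map
  const forkedEntityManager = this.entityManager.fork();
  await forkedEntityManager.begin();

  try {
    // load the entities through the fork so that it tracks their removal
    const allPosts = await forkedEntityManager.find(PostEntity, {
      categories: { id: categoryId },
    });
    const category = await forkedEntityManager.findOne(Category, { id: categoryId });
    if (!category) {
      throw new CategoryNotFoundException(categoryId);
    }
    for (const post of allPosts) {
      forkedEntityManager.remove(post);
    }
    forkedEntityManager.remove(category);
    // commit flushes the pending removals before committing the transaction
    await forkedEntityManager.commit();
  } catch (error) {
    await forkedEntityManager.rollback();
    throw error;
  }
}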

The above mirrors what the entityManager.transactional function does for us. Forking gives us a fresh entity manager with a new identity map. Keep in mind that only entities loaded or persisted through the forked entity manager are flushed when it commits.
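The control flow shared by both explicit variants can be sketched independently of MikroORM. In the snippet below, ToyConnection, runInTransaction, and createRecordingConnection are hypothetical names, and the connection only records statements instead of talking to PostgreSQL.

```typescript
// Hypothetical minimal connection; a real driver would send the statements to PostgreSQL.
interface ToyConnection {
  execute(statement: string): Promise<void>;
}

// Runs the callback between BEGIN and COMMIT, and issues ROLLBACK if it throws.
async function runInTransaction<T>(
  connection: ToyConnection,
  work: () => Promise<T>,
): Promise<T> {
  await connection.execute("begin");
  try {
    const result = await work();
    await connection.execute("commit");
    return result;
  } catch (error) {
    await connection.execute("rollback");
    throw error; // the caller still learns about the failure
  }
}

// In-memory connection that only records the statements it receives.
function createRecordingConnection(): { connection: ToyConnection; statements: string[] } {
  const statements: string[] = [];
  const connection: ToyConnection = {
    async execute(statement: string): Promise<void> {
      statements.push(statement);
    },
  };
  return { connection, statements };
}

async function demo(): Promise<{ success: string[]; failure: string[] }> {
  const ok = createRecordingConnection();
  await runInTransaction(ok.connection, async () => {
    await ok.connection.execute("delete from category where id = 1");
  });

  const failing = createRecordingConnection();
  try {
    await runInTransaction(failing.connection, async () => {
      throw new Error("something went wrong");
    });
  } catch {
    // expected: the error is rethrown after the rollback
  }
  return { success: ok.statements, failure: failing.statements };
}
```

The successful run records a begin, the delete, and a commit, while the failing run records a begin followed by a rollback. Rethrowing after the rollback matters, because otherwise the caller would assume that the changes were saved.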

Summary#

In this article, we’ve gone through the idea of transactions and implemented them both through SQL and MikroORM. We’ve deleted a category and its posts within a transaction. This prevents a situation in which the posts are deleted but the category is not. Thanks to that, we’ve dealt with the danger of losing the integrity of our database.
