Skip to main content

Command Palette

Search for a command to run...

Postgres is Your Friend. ORM is Not.

Updated
13 min read
Postgres is Your Friend. ORM is Not.

Postgres is amazing. It’s powerful, efficient, rock-solid, and genuinely one of the finest pieces of software engineering ever created. And it’s also much more than a traditional relational database. It’s a complete, rich ecosystem packed into a single engine.

Beyond the classic CRUD operations everyone expects, Postgres lets you do far more. You can store and query JSONB fields as if they were structured tables, and you can even build efficient indexes on top of them. It blurs the line between relational and semi-structured data in a way that’s both elegant and practical.

CREATE TABLE IF NOT EXISTS videos (
    id           UUID NOT NULL PRIMARY KEY,
    profile_id   UUID NOT NULL REFERENCES profiles (id) ON DELETE CASCADE,
    title        TEXT,
    state        VARCHAR(32) NOT NULL,
    metadata     JSONB NOT NULL,
    created_at   TIMESTAMPTZ DEFAULT now() NOT NULL,
    updated_at   TIMESTAMPTZ DEFAULT now() NOT NULL
);

CREATE INDEX idx_videos_tags_gin
ON videos USING GIN ((metadata->'tags') jsonb_path_ops);
SELECT * FROM videos WHERE metadata->'tags' @> '["personal"]';

You can easily implement a simple yet efficient full-text search using the pg_trgm extension. So you probably don’t need that Elasticsearch instance for your mid-sized project.

CREATE EXTENSION IF NOT EXISTS pg_trgm;

CREATE INDEX IF NOT EXISTS idx_videos_title_trgm
ON videos USING GIN (lower(title) gin_trgm_ops)
WHERE state = 'LIVE' AND title IS NOT NULL;
SELECT * FROM videos
WHERE
    title ILIKE '%' || $1 || '%'
    OR similarity(title, $1) > 0.12;

And you have pg_partman for automated partitioning!

CREATE EXTENSION pg_partman;

CREATE TABLE events (
    id          UUID NOT NULL PRIMARY KEY,
    created_at  TIMESTAMPTZ DEFAULT now() NOT NULL,
    payload     JSONB
);

SELECT partman.create_parent(
    p_parent_table := 'public.events',
    p_control      := 'created_at',
    p_type         := 'native',
    p_interval     := 'monthly'
);

UPDATE partman.part_config
   SET premake = 3,
       retention = '12 months'
 WHERE parent_table = 'public.events';

SELECT partman.run_maintenance();

There are many more features worth your attention: views and materialised views, window functions for efficient sliding aggregations, CTEs for clearer complex queries, etc.

One of the most underrated features in Postgres is the FOR UPDATE SKIP LOCKED mechanism, combined with Postgres’ native LISTEN/NOTIFY pub/sub. Using these two together, you can build robust concurrent processing that scales horizontally, coordinates work safely, and never loses a record.

Here’s how it looks on the Postgres side — it simply emits a notification on each insert to a dedicated channel. Your workers can subscribe to it and react instantly.

CREATE TABLE IF NOT EXISTS tasks (
    id            UUID NOT NULL PRIMARY KEY,
    task_name     VARCHAR(128) NOT NULL,
    payload       JSONB,
    state         VARCHAR(32) NOT NULL,
    created_at    TIMESTAMPTZ DEFAULT now() NOT NULL,
    updated_at    TIMESTAMPTZ DEFAULT now() NOT NULL
);

CREATE OR REPLACE FUNCTION notify_task()
RETURNS TRIGGER AS $$
DECLARE
    payload JSON;
BEGIN
    IF NEW.state = 'PENDING' THEN
        payload = json_build_object(
            'id', NEW.id,
            'task_name', NEW.task_name,
            'created_at', NEW.created_at
        );
        PERFORM pg_notify('tasks', payload::text);
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_task_notify
AFTER INSERT ON tasks
FOR EACH ROW EXECUTE FUNCTION notify_task();

It fires native pg_notify() on every INSERT for you, keeping your repository implementation clean and single-purposed.

class TaskRepositoryAdapter:

    async def add(self, task: Task, connection: Connection | None = None) -> None:
        query = """
            INSERT INTO tasks (
                id,
                task_name,
                payload,
                state,
                created_at,
                updated_at
            ) VALUES (\(1, \)2, \(3, \)4, \(5, \)6)
        """
        await self.db.execute(
            query=query,
            args=[
                task.id,
                task.task_name,
                dumps(task.payload) if task.payload else None,
                task.state,
                task.created_at,
                task.updated_at,
            ],
            connection=connection,
        )

Here’s a simplified consumer that listens for notifications and runs the corresponding task. In real life there’s more logic, of course (retry strategy, logging, health checks, etc.) but the essence is this:

  • long-lived connection stays open to listen on the notification channel

  • another short-lived connection is used to lock the task row, update its state, and then run the actual work right after

class TaskConsumerAdapter:

    async def listen(self) -> None:
        self._notification_conn = await self.db.pool.acquire()
        await self._notification_conn.add_listener("tasks", self._on_notification)

    async def _on_notification(self, connection: Connection, pid: int, channel: str, payload: str) -> None:
        task_data = loads(payload)
        task_id = UUID(task_data["id"])
        asyncio.create_task(self._take_task(task_id))

    async def _take_task(self, task_id: UUID) -> None:
        async with self.db.pool.acquire() as conn, conn.transaction():
            # locks selected row
            task = await self.task_repo.get_pending(task_id=task_id, connection=conn)
            # marks status while locked
            await self.task_repo.update(
                task.change_state(TaskState.PROCESSING),
                connection=conn
            )

        consumer = self.consumers[task.task_name]
        await self._process_task(consumer, task)

    async def _process_task(self, consumer: TaskConsumer, task: Task) -> None:
        next_state = TaskState.FAILED  # let's start pessimistically
        try:
            await asyncio.wait_for(consumer.process(task=task), timeout=1800)
            next_state = TaskState.DONE
        except asyncio.TimeoutError:
            next_state = TaskState.PENDING  # mark to retry
        finally:
            await self.task_repo.update(task.change_state(next_state))

The entire distribution logic is controlled by just one line:

class TaskRepositoryAdapter:

    async def get_pending(self, task_id: UUID, connection: Connection | None = None) -> Task | None:
        query = """
            SELECT * from tasks
            WHERE id = $1
            AND state = $2
            FOR UPDATE SKIP LOCKED
        """
        rec = await self.db.fetchone(query=query, args=[task_id, TaskState.PENDING], connection=connection)

        return TaskFactories.task_from_record(rec)

And literally means:

  • FOR UPDATE locks the selected rows

  • SKIP LOCKED makes others skip already-locked rows

This way it’s safe to add more consumers later, they’ll simply compete for the next available pending task, naturally distributing work among themselves.

We use this approach in Hypha to register and process video-transcoding tasks. It’s simple and reliable.

Postgres can handle around 10k messages per second, which is more than enough in most cases.

If you need higher throughput or stronger delivery guarantees, that’s when you bring in tools like Kafka, RabbitMQ, or Redis Streams. But for the vast majority of applications, a properly designed App + Postgres combo is more than enough. Using this alone already helps reduce the number of moving parts and the overall infrastructure complexity.

Know Postgres. Use Postgres. Postgres is your friend.

ORM is not your friend

TL;DR: avoid ORMs, you don't need an ORM.

Surprisingly many people don’t see the difference between a Query Builder and an ORM, so let’s clarify.

An ORM is an Object–Relational Mapper. It maps database tables to classes and rows to objects. It lets you use a language-specific DSL instead of SQL, “hiding” SQL “complexity“ behind its own abstractions.

When you fetch a row with asyncpg, you get back a Record — a Python object representing a Postgres row. Is that an ORM? No. An ORM goes much further.

  • it performs bidirectional mapping between relational data and domain objects

  • it tracks object identity

  • it synchronises state changes

  • it flushes updates automatically

  • it maintains internal caches and a unit of work

A lot of complex work happens here. But do we really need all of it? Why? How is it actually helpful?

In practice, all these “features for free” are more like “shooting yourself in the leg for free”. You get N+1 queries, accidental writes on flush, hidden internal caches of loaded entities, and a lot of invisible behaviour you never asked for.

Speaking of hidden SQL complexity, let’s compare. Here is a raw SQL to fetch a list of videos matching a condition:

SELECT id, title
FROM videos
WHERE state = 'PUBLISHED'
  AND metadata->'tags' ? 'python'
  AND created_at >= NOW() - INTERVAL '30 days'
ORDER BY created_at DESC
LIMIT 20 OFFSET 0;

The same query using SQLAlchemy:

tag_filter = func.jsonb_exists(Video.metadata["tags"], "python")

query = (
    session.query(Video.id, Video.title)
    .filter(Video.state == "PUBLISHED")
    .filter(tag_filter)
    .filter(Video.created_at >= func.now() - text("INTERVAL '30 days'"))
    .order_by(Video.created_at.desc())
    .limit(20)
    .offset(0)
)

results = query.all()

Roughly the same query in Django ORM:

now_30_days_ago = timezone.now() - timedelta(days=30)

qs = (
    Video.objects
    .filter(state="PUBLISHED")
    .filter(metadata__tags__contains=["python"])
    .filter(created_at__gte=now_30_days_ago)
    .order_by("-created_at")
    [:20]
)

“Roughly” because Django ORM doesn’t support the JSONB ? operator. And if you need real SQL intervals, Django pushes you towards raw expressions or Func() wrappers. Of course, you have to know the tool really well to pull off tricks like this:

videos = Video.objects.annotate(
    seven_days_ago=Func(
        Value('7 days'),
        function='NOW() - INTERVAL',
    )
).filter(created_at__gt=F('seven_days_ago'))

But that’s more about expressiveness and implementation limitations. The deeper issue lies elsewhere, so let’s take a step back.

What we actually need is simple — persist aggregates and restore them later, right?

That responsibility belongs inside the aggregate’s repository. And yes, that can (and should) be done directly in plain SQL.

SQL is already an excellent DSL for relational data

It might look like this:

@dataclass(kw_only=True)
class Video:
    id: UUID = field(default_factory=uuid7)
    owner_id: HyphaID

    title: str | None = None

    playlist: VideoPlaylist | None = None
    video_tracks: list[VideoPlaylist] = field(default_factory=list)
    audio_tracks: list[AudioPlaylist] = field(default_factory=list)

    state: VideoState

    created_at: DateTime = field(default_factory=now)
    updated_at: DateTime = field(default_factory=now)


class VideoRepositoryAdapter:

    async def add(self, video: Video, connection: Connection | None = None) -> None:
        # handy pattern to implement a 'unit of work'
        if connection:
            await self._add(video, connection=connection)
        else:
            async with self.db.pool.acquire() as conn, conn.transaction():
                await self._add(video, connection=conn)

    async def _add(self, video: Video, connection: Connection) -> None:
        query = """
            INSERT INTO videos (
                id,
                profile_id,
                title,
                state,
                created_at,
                updated_at
            )
            VALUES (
                $1,
                $2,
                $3,
                $4,
                $5,
                $6
            );
        """
        await self.db.execute(
            query=query,
            args=[
                video.id,
                video.profile_id,
                video.title,
                video.state,
                video.created_at,
                video.updated_at,
            ],
            connection=connection,
        )

        # some aggregate attributes are stored in a separate table,
        # but all inserts still happen within the same transaction

        traits_query = """
            INSERT INTO video_traits (
                id,
                video_id,
                trait_type,
                trait_data,
                created_at
            ) VALUES (\(1, \)2, \(3, \)4, $5)
        """

        if video.playlist:
            playlist_trait = video.playlist
            await self.db.execute(
                query=traits_query,
                args=[
                    playlist_trait.id,
                    video.id,
                    VideoTraitType.MASTER_PLAYLIST,
                    playlist_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )

        for video_track_trait in video.video_tracks:
            await self.db.execute(
                query=traits_query,
                args=[
                    video_track_trait.id,
                    video.id,
                    VideoTraitType.STAGING_VIDEO,
                    video_track_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )

        for audio_track_trait in video.audio_tracks:
            await self.db.execute(
                query=traits_query,
                args=[
                    audio_track_trait.id,
                    video.id,
                    VideoTraitType.STAGING_AUDIO,
                    audio_track_trait.serialize(),
                    video.created_at,
                ],
                connection=connection,
            )

    async def get_multi(
        self,
        *,
        owner_id: HyphaID,
        states: list[VideoState] | None = None,
        limit: int = 10,
        offset: int = 0,
        connection: Connection | None = None,
    ) -> list[Video]:
        select_query = """
            SELECT
                v.id,
                p.public_id as owner_id,
                v.title,
                v.state,
                v.created_at,
                v.updated_at
            FROM videos v
        """
        query_args = [owner_id, limit, offset]
        conditions = [f"p.public_id = $1"]

        if states:
            query_args.append(states)
            conditions.append(f"v.state = ANY(${len(query_args)})")

        # all parts together
        select_query = f"""
            {select_query}
            JOIN profiles AS p ON v.profile_id = p.id
            WHERE {" AND ".join(conditions)}
            ORDER BY v.updated_at DESC
            LIMIT \(2 OFFSET \)3;
        """

        recs = await self.db.fetchmany(
            query=select_query,
            args=query_args,
            connection=connection,
        )
        if not recs:
            return []

        # the most practical way to fetch all traits we need,
        # only one extra query for N videos
        traits_query = """
            SELECT
                video_id,
                jsonb_agg(
                    jsonb_build_object(
                        'id', id,
                        'trait_type', trait_type,
                        'trait_data', trait_data
                    )
                ) AS traits
            FROM video_traits
            WHERE video_id = ANY($1)
            GROUP BY video_id;
        """

        trait_recs = await self.db.fetchmany(
            query=traits_query,
            args=[[rec["id"] for rec in recs]],
            connection=connection,
        )

        return list(VideoFactories.videos_from_record(recs, trait_recs))


class VideoFactories:
    @classmethod
    def videos_from_records(cls, recs: list[Record], trait_recs: list[Record]) -> Iterator[Video]:
        for rec in recs:
            video_attrs = {
                "id": UUID(str(rec["id"])),
                "owner_id": HyphaID(rec["owner_id"]),
                "title": rec["title"],
                "state": VideoState(rec["state"]),
                "created_at": pendulum.instance(rec["created_at"]),
                "updated_at": pendulum.instance(rec["updated_at"]),
            }
            # enrich the aggregate with data from traits
            ...
            yield Video(**video_attrs)

Quite a few very important details here.

  1. The aggregate can be persisted in more than 1 DB table

It’s fairly common to use several tables to persist an aggregate for the sake of flexibility and query efficiency. You may notice the video_traits table, which stores “schemaless” objects (parts of the aggregate), in addition to the main videos table, which stores the more surface-level values.

  1. SQL tables don’t necessarily map 1-to-1 to the domain model

In Hypha, we rarely have a one-to-one match between aggregates and tables. Using an ORM would mean translating domain models to ORM models and back again just to reassemble the final shape. If you follow the snippets carefully, you’ll notice this applies to both the underlying tables and the naming conventions. The database schema uses a generic profile_id column name (closer to the actual table name) while the Video aggregate uses owner_id, a more precise term for the specific bounded context.

  1. Raw SQL is not a sin

Might be shocking to some of you, but that’s exactly what you need. Your repository is your aggregate-scoped query builder. There’s nothing wrong with having SQL strings there — it doesn’t break Separation of Concerns and it follows Locality of Behaviour perfectly. It’s safe, because of course we use the driver-native query escaping for arguments.

Yes, you must be careful here with all the commas and quotes. Yes, you must follow a certain self-discipline. But you write it once, test it well, and reuse it everywhere afterward. In return, you get:

  • full control over the resulting SQL, allowing you to use the full power of SQL as a relational data DSL

  • transparency and flexibility in how you build queries

  • scoped, highly-efficient, purpose-built queries

There are two clear concerns experienced developers might raise here, though: how to deal with really big, complex analytical queries, and what to do about composability and reusability?

Surprisingly, the “workarounds” are just as simple and straightforward as the overall solution itself.

When a query starts growing far beyond the physical screen and makes the assembly logic harder to follow, you can template it. Literally move the query out of the method and into a Jinja template. The template can be as rich as you need, with parent CTEs as a base and conditional parameterisation. It’s a practical, pragmatic way to manage extra-long, heavy analytical queries.

Here comes the second one — how to reuse parts of the query? ORMs shine here because of method chaining, letting you extract common parts as a basis for other queries, right?

Even though this is possible (and often encouraged), blindly following the DRY principle can easily lead you to violate another, more important one:

Prefer Locality of Behaviour over DRY

By splitting the query chain into parts, you end up spreading the query composition across the repository (in the best case) or across several modules (in the worst), undermining the main advantage of the Locality of Behaviour principle — keeping behaviour scoped and contained.

If you start noticing repetitive parts of your SQL across several methods and feel the natural urge to refactor them into something reusable — stop right there and don’t. The optimisation might look more concise on the screen, but it won’t be in your head, ultimately making it harder to follow and maintain.

It’s perfectly fine to have repetitions across queries. It may take a bit longer to review and update the affected repositories and services when underlying schemas or base conditions change, but that process is infrequent and very straightforward. Your tests should reveal any gaps and guide you.

In return: each query remains isolated and atomic, with nothing external able to break it or implicitly change its behaviour. You can always review it visually without extra hopping and rebuild the mental context immediately. And that’s what truly matters.

No Alchemy needed when you have real Chemistry.

R

I fully agree that using Postgres’ native capabilities consciously gives a huge advantage in terms of performance when executing code. However, I don’t agree with the statement that ORM is “bad”. You just need to know how to use ORM properly. It’s like saying: why use a jackhammer if it costs electricity, when I can do the same manually with a hammer and a chisel?

Some things that I think are missing in this perspective:

  • maintainability – in a project with dozens of tables, the need to maintain raw SQL (whether as templates or – even worse – strings in Python) is a recipe for a nightmare. Not to mention SQL validation, a syntax checker or at least a formatter. Even the smallest schema change (e.g. column name) requires updating dozens of places in the codebase.

  • migrations and validation – this is another area we have to watch very carefully when a table schema changes, and manually handle additional serialization, eg. we cannot simply pass Python True into a boolean column, instead we must convert it into SQL literal TRUE.

  • exception handling – ORM gives more semantic domain-specific errors instead of generic driver messages like: ERROR: duplicate key value violates unique constraint.

  • consistent domain model – without ORM, we still map records into objects, only that mapping happens… inside the developer’s head.

For me the right approach should rather look like: using ORM as the default tool for CRUD and the domain model, and raw SQL where it gives a real advantage (aggregations, optimization, reporting). But even in the second case, you can create advanced queries in a clean and efficient way, however, you must know and understand the specific ORM at an advanced level.

1
R

Thank you for your comment Rafał Kondziela, your points are absolutely valid, and it also means I didn’t explain my position clearly enough. Let me expand on it.

ORM is not "bad". It’s a widely accepted but deeply wrong idea about how we should deal with a persistence layer.

Maintainability

You mentioned raw SQL being a nightmare in large projects. I’m not sure that's accurate unless we compare the actual maintenance cost.

If anything, certain patterns become far more painful with an ORM because you inevitably end up fighting against its default state tracking and session mechanics. You’re not simplifying your system, you’re wrestling with an abstraction that doesn’t want to let go.

As for schema changes: renaming a column used in dozens of queries usually looks as trivial as:

SELECT u.email AS superid
FROM users AS u

A project-wide search-and-replace like u.email → u.superid does the job, and your tests stay green. If they don’t — great, that’s exactly why tests exist.

Adding a column? You’d revisit queries, models and DTOs anyway. Add missing parts, amend tests or write new ones.

Migrations and validation

This heavily depends on the driver. For example, asyncpg handles booleans and other primitive types correctly out of the box. More complex types (like JSONB) require custom serialisation and that’s perfectly fine:

def orjson_default(obj: Any) -> Any:
    if isinstance(obj, (pendulum.DateTime, pendulum.Date)):
        return obj.isoformat()
    if isinstance(obj, Path | Decimal | UUID):
        return str(obj)
    if isinstance(obj, set):
        return list(obj)
    raise TypeError(f"Type {type(obj)} is not serializable")

def dumps(obj: Any) -> str:
    return orjson.dumps(obj, default=orjson_default).decode()

Exception handling

You can (and should) do exactly the same at the repository level:

async def _add(self, profile: Profile, connection: Connection) -> None:
    query = """
        INSERT INTO profiles (
            id, public_id, email, nickname, state, created_at, updated_at
        ) VALUES ($1, $2, $3, $4, $5, $6, $7)
    """
    try:
        await self.db.execute(query, [
            profile.id,
            profile.public_id,
            profile.email,
            profile.nickname,
            profile.state,
            profile.created_at,
            profile.updated_at,
        ], connection)
    except UniqueViolationError as exc:
        raise ProfileUniqueViolationError from exc

Consistent domain model

When an ORM hides the persistence layer, you lose visibility into how data is actually stored, which makes debugging, analytics, and performance work harder.

With explicit factories, you control the mapping directly. They are the single source of truth for how database records become domain objects. In Hypha’s case, these factories are monadic, meaning they encapsulate failure and error-handling behaviour by design, so we can choose whether to propagate, wrap, or suppress errors explicitly. But it's completely out of scope now, and simplified example might look like this:

class ProfileFactories:
    @classmethod
    def profile_from_record(cls, rec: Record) -> Profile | None:
        try:
            return Profile(
                id=UUID(str(rec["id"])),
                public_id=HyphaID(rec["public_id"]),
                email=rec["email"],
                nickname=rec["nickname"],
                state=ProfileState(rec["state"]),
                created_at=pendulum.instance(rec["created_at"]),
                updated_at=pendulum.instance(rec["updated_at"]),
            )
        except Exception:
            return None

Hypha is a real-world demonstration that this approach actually works. With dozens of repositories and services relying on raw SQL, the codebase remains clean, straightforward, and thoroughly tested. The absence of an ORM didn’t make the system harder. It made it simpler, more predictable, and easier to reason about.

R

Thanks for taking the time to expand on your ideas, Roman Zaiev. Now I see your architectural approach much better.

I think the key difference between our views comes down to what we consider the main responsibility of the persistence layer.

You focus strongly on transparency and control, and for Hypha it clearly works well. I get that.

For me, ORM’s biggest value isn’t query abstraction, but maintaining long-term consistency between the domain model, schema and business rules, without re-implementing that logic manually in every repository or factory.

That’s why I still think ORM is often the more efficient choice, especially in systems that evolve quickly or involve many contributors. It prevents rebuilding a lot of what ORM already provides out of the box (migrations, typing, constraints, semantic exceptions, domain-level validation).

But if the raw SQL + factories approach keeps Hypha simpler and more predictable, that’s awesome. Different contexts, different trade-offs. 👌

Great discussion, appreciate the exchange of perspectives! 👍