
Real-time updates from Postgres using NOTIFY/LISTEN and SSE

Posted on: January 2, 2024 at 04:50 PM

Getting live updates from an API doesn’t have to be complicated! In this post, I will describe how to set up FastAPI and Postgres to accomplish exactly this. We will be using server-sent events and Postgres NOTIFY/LISTEN; no websockets or fancy message queues needed! (Of course those have their place, but avoiding complexity where it isn’t needed is never a bad thing.)

As always, the full example code can be found on my GitHub.


Postgres NOTIFY/LISTEN

Postgres is a powerful, open-source relational database with many features, one of which is the built-in NOTIFY/LISTEN mechanism. NOTIFY allows you to publish messages to a channel, and as the name implies, LISTEN allows you to listen to a channel.

This by itself is not quite a full-fledged message queue or pubsub system, as messages are only delivered to currently listening clients, and there are no retries or message persistence out of the box (although those can of course be emulated by using the underlying database).

However, combined with a trigger function that NOTIFYs when relevant data is updated, this is enough to get us the real-time updates we want!
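You can try the mechanism directly in two psql sessions before wiring anything up:

-- session 1: subscribe to the channel
LISTEN item_updates;

-- session 2: publish a message
NOTIFY item_updates, 'hello';

-- session 1 (after its next command) prints something like:
-- Asynchronous notification "item_updates" with payload "hello" received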

Database setup

Let’s say we have a table called item, where each row has just an id and a message, like this:

CREATE TABLE IF NOT EXISTS item (
    id SERIAL PRIMARY KEY,
    message TEXT NOT NULL
);

We can then set up a trigger function that sends a NOTIFY whenever it is called. Trigger functions in Postgres have access to special variables such as NEW, which contains the data of the new row when the function fires on an insert or update. This function sends a NOTIFY to the ‘item_updates’ channel containing a JSON representation of the new row:

CREATE OR REPLACE FUNCTION item_notify()
    RETURNS trigger AS
$$
BEGIN
    PERFORM pg_notify('item_updates', to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

Lastly, we set up the trigger to call the item_notify() function whenever a row in the item table is updated or inserted:

CREATE OR REPLACE TRIGGER item_update
    AFTER INSERT OR UPDATE ON item
    FOR EACH ROW
EXECUTE FUNCTION item_notify();

With this set up, we have a table of items, and a message will be sent to the ‘item_updates’ channel each time an item gets inserted or updated.
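As a quick sanity check: with a LISTEN item_updates; session open in psql, inserting a row should immediately produce a notification containing the row as JSON, something like this:

INSERT INTO item (message) VALUES ('test');
-- Asynchronous notification "item_updates" with payload
-- "{"id":1,"message":"test"}" received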

Application code setup

On the backend, we need to listen to the channel where messages are published. To do this, we need to connect to our database using psycopg and write a little wrapper class to use as a FastAPI dependency:

import asyncio

import psycopg

# Alias for psycopg's async connection type
ACT = psycopg.AsyncConnection


class PostgresListener:
    def __init__(self, conn: ACT):
        self.conn = conn
        self.listen_task: asyncio.Task | None = None
        self.listeners: list[asyncio.Queue] = []

    async def start_listen_task(self):
        async def listen_task(conn: ACT) -> None:
            # Fan out each notification to every registered queue
            async for notify in conn.notifies():
                for queue in self.listeners:
                    await queue.put(notify)

        self.listen_task = asyncio.create_task(listen_task(self.conn))

    def listen(self) -> asyncio.Queue:
        # Register a new queue that will receive all notifications
        queue = asyncio.Queue()
        self.listeners.append(queue)
        return queue

    def close(self, queue: asyncio.Queue):
        # Deregister a queue when a client hangs up
        self.listeners.remove(queue)

Since psycopg’s notifies() only supports one listener per connection, we need to fan out to different queues manually. We are using asyncio wherever possible to prevent blocking while waiting for new notifications.

The listen task is where the notifications from the database are received, and then distributed to the different listener queues for the clients to process them at their convenience.

The listen() and close() methods can be used by client code to start listening and to hang up when they are done.
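To make the flow concrete, here is a minimal sketch of how a consumer would use these methods (assuming a PostgresListener instance is passed in):

async def consume(listener: PostgresListener):
    # Sketch: receive a single notification through a dedicated queue
    queue = listener.listen()
    try:
        notify = await queue.get()
        print(notify.channel, notify.payload)
    finally:
        listener.close(queue)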

To actually use the listener class, we need to instantiate one in a helper / dependency injection function. We don’t want to have multiple database connections listening to the same update channel or recreate the listener for every API call, so we use a singleton instance of the listener class.

_notify_conn = None
_postgres_listener = None


async def get_postgres_listener() -> PostgresListener:
    global _notify_conn
    global _postgres_listener

    if _notify_conn is None:
        _notify_conn = await psycopg.AsyncConnection.connect(
            db.connection_string, autocommit=True
        )
        await _notify_conn.execute(f"LISTEN item_updates;")

    if _postgres_listener is None:
        _postgres_listener = PostgresListener(_notify_conn)
        await _postgres_listener.start_listen_task()

    return _postgres_listener
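Note that nothing closes this connection when the application stops. If you want explicit cleanup, one option is a shutdown hook along these lines (a sketch, assuming the FastAPI app object and the module-level globals above):

@app.on_event("shutdown")
async def close_postgres_listener():
    # Sketch: cancel the fan-out task and close the LISTEN connection
    if _postgres_listener is not None and _postgres_listener.listen_task:
        _postgres_listener.listen_task.cancel()
    if _notify_conn is not None:
        await _notify_conn.close()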

Server-sent events

Conceptually, server-sent events can be thought of as a unidirectional websocket, allowing only messages from the server to the client. Under the hood, an SSE response is essentially just an HTTP request that stays open for a long time, with data streamed through it in a specific format, events delimited by two newlines (\n\n). Besides being less complex, this is also the main advantage of SSE over websockets: since it is just an HTTP request, it can be used pretty much everywhere plain HTTP works.
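For illustration, a raw SSE response might look something like this on the wire (headers abbreviated):

HTTP/1.1 200 OK
content-type: text/event-stream

data: first event

data: second event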

FastAPI setup

Our FastAPI code for this is simple: there are just two endpoints.

The first one is for inserting new items into the database; it just dumps the request body into the message field. In a real FastAPI project I’d recommend using SQLAlchemy, but for the sake of simplicity we’ll use raw SQL here:

@app.post("/")
async def add_item(request: Request,
                   db: Annotated[CT, Depends(get_db)]):
    message = await request.body()
    message = message.decode()

    db.execute('''
    INSERT INTO item (message)
    VALUES (%s)
    ''', [message])
    db.commit()

The second endpoint will be used to listen to updates, and is slightly more complicated:


@app.get("/updates")
async def get_updates(req: Request,
                      postgres_listener: Annotated[
                          PostgresListener,
                          Depends(get_postgres_listener)
                      ]):
    async def sse_wrapper():
        queue = postgres_listener.listen()

        try:
            while notify := await queue.get():
                msg = notify.payload

                if await req.is_disconnected():
                    break

                yield {
                    'data': msg
                }

        except asyncio.CancelledError as e:
            logger.info(
                f"Disconnected from client (via refresh/close) {req.client}"
            )
            raise e

        finally:
            postgres_listener.close(queue)

    return EventSourceResponse(sse_wrapper())

This endpoint uses sse-starlette to send events to the client. First, it creates a wrapper generator that grabs a queue from the postgres listener we built before and asynchronously waits for messages to show up. When messages arrive, they are yielded to the EventSourceResponse, which handles the actual communication for us. If anything goes wrong or the client disconnects, the finally block takes care of deregistering the queue.
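For reference, the response class used above comes from the sse-starlette package:

from sse_starlette.sse import EventSourceResponse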

And with that, we are ready to go! We should now be able to insert items into the database, and get notified when it happens. Let’s start the server, open two terminals and give it a try.

To listen:

curl -N http://localhost:8000/updates

To insert an item:

curl -X POST http://localhost:8000/ -d "Hoi"

As soon as you insert an item, you should see it pop up in the other window!

data: {"id":1,"message":"Hoi"}

I hope this was useful! As always, the full example code can be found on my GitHub if you want to try running it yourself.