Jay Taylor's notes
back to listing indexPython FastAPI Postgres SqlAlchemy Row Level Security Multitenancy
[web search]Multi-Tenancy in Python, FastAPI and SqlAlchemy using Postgres Row Level Security
Multi-Tenancy is a software architecture in which a shared instance of a software serves multiple customers or tenants. A tenant is a group of users that belong together. For example, Salesforce is a multi-tenant Saas Provider, and every customer who opens an account on Salesforce is a tenant. Most Software as a Service(SaaS) companies choose to architect their applications as multi-tenant to save on costs. After all, it becomes prohibitively expensive to provision a new database/server for every customer.
The challenge with multi-tenancy applications is data isolation. Since all tenants share the same database, we need to ensure that users belonging to one tenant cannot access data belonging to another tenant and vice-versa. There are several approaches to achieving multi-tenancy in cloud-based applications, each with pros and cons. Today we will explore achieving multi-tenancy using Postgres Row Level Security for Python apps.
What is Row Level Security
Row Level Security is an [RBAC](https://en.wikipedia.org/wiki/Role-based_access_control) system in Postgresql. It allows us to define a set of [policies](https://www.postgresql.org/docs/current/ddl-rowsecurity.html) which restrict which rows are accessible by users. It allows individual tenants to have full SQL access to the database while hiding each tenant’s information from other tenants.
Enabling Row Level Security on Python Apps
The first thing we need to enable Row level security is at least two database users. One user creates the tables, run migrations, etc., and another accesses data on behalf of tenants. This is because, in Postgres, the table owner isn’t restricted by security policies by default. Furthermore, superusers and users created with the permission aren't subject to the table's policies. BYPASSRLS
Database changes
So let's start by creating an app called that has a database called toy_app
toy_db
CREATE DATABASE toy_db;
CREATE ROLE admin with ENCRYPTED PASSWORD 'password';
GRANT ALL ON DATABASE toy_db TO admin;
We also need another user that will be used to access the database on behalf of the tenants. Let's call this user . We will grant this user only the permissions it needs to deal with tenant data. This can be created via an alembic migration. tenant_user
def upgrade() -> None:
op.execute(
"""
DO $$
BEGIN
IF NOT EXISTS(SELECT * FROM pg_roles WHERE rolname = 'tenant_user') THEN
CREATE ROLE tenant_user;
GRANT tenant_user TO admin;
GRANT USAGE ON SCHEMA public TO tenant_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO tenant_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA pyblic GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO tenant_user;
GRANT USAGE ON ALL SEQUENCES IN SCHEMA public TO tenant_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE ON SEQUENCES TO tenant_user;
END IF;
END
$$
"""
)
Now that the Postgres roles are available, we need to create a table to store the list of tenants accessing our app. Let us create this in another migration.
def upgrade() -> None:
op.create_table(
"tenants",
sa.Column("id", sa.INTEGER, primary_key=True),
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False)
)
op.execute(
"ALTER TABLE tenants ENABLE ROW LEVEL SECURITY;",
)
op.execute(
"CREATE POLICY tenant_isolation on tenants
USING (id = current_setting('app.current_tenant')::int);",
)
You'll notice that we have two statements after the operation. create_table
ALTER TABLE tenants ENABLE ROW LEVEL SECURITY;
What this does is that it tells Postgres to enable Row Level Security on this table.
CREATE POLICY tenant_isolation on tenants USING (id = current_setting('app.current_tenant'))::int;
Over here, we are creating a Policy telling Postgres to restrict access to this table by id. We will see how to set later. The app.current_tenant
is just a type cast because the ::int
is stored as a string. current_tenant
Now that we have a tenants table, we can create a table that stores some information about our tenants. Let's create a table called toys.
def upgrade() -> None:
op.create_table(
"toys",
sa.Column("id", sa.INTEGER, primary_key=True),
sa.Column("tenant_id", sa.Integer, sa.ForeignKey("tenants.id")),
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False)
)
op.execute(
"ALTER TABLE toys ENABLE ROW LEVEL SECURITY;",
)
op.execute(
"CREATE POLICY tenant_isolation on toys
USING (tenant_id = current_setting('app.current_tenant')::int);",
)
As you can see, we have added a column on tenant_id
. This lets us know which toys belong to which tenant. We have also enabled row-level security on the table and created a policy restricting access by tenant id. toys
Note: You will have to enable row-level security and create a policy on every table to which you want to restrict access.
Now that we have our data model set up, we can see how to implement row-level security for querying data.
Middleware
We restrict access to data using a tenant_id which means every request to our service will need to have the tenant id included as part of its context. Usually, this is included via a subdomain, as part of an authorization header, or even as a request param. For our service, let's assume we get the tenant id via a header called . Every request to our service needs to include this header to set the tenant context. X-Tenant-Id
To set the tenant context in our app, we will use middleware. Since we are using FastAPI, we can create ASGI middleware.
from contextvars import ContextVar
from starlette.datastructures import Headers
from starlette.responses import JSONResponse
from starlette.responses import Response
from starlette.types import ASGIApp
from starlette.types import Receive
from starlette.types import Scope
from starlette.types import Send
global_tenant_id: ContextVar[str] = ContextVar("global_tenant_id", default=None)
class PostgresRLSMiddleware:
def __init__(
self,
app: ASGIApp,
) -> None:
self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] not in (
"http",
"websocket",
):
await self.app(scope, receive, send)
return
headers = Headers(scope=scope)
request_tenant_id = headers.get("X-Tenant-Id")
response: Response
if request_tenant_id is None:
response = JSONResponse(
{
"error": "Tenant must be provided",
},
status_code=401,
headers=dict(headers),
)
await response(scope, receive, send)
else:
global_tenant_id.set(request_tenant_id)
await self.app(scope, receive, send)
def get_global_tenant_id():
return global_tenant_id.get()
What this does it it saves the tenant id as a [contextvar](https://docs.python.org/3/library/contextvars.html). In case you haven't used them, contextvars allow you to to save global vars inside a specific runtime context. Using this library, we can create a new running context for every incoming request, and save a value on a global var, which will only be available within that context. This means that we can save the tenant id specific to particular request.
Establishing a database connection
Now that we have the tenant id saved, we need to figure out how to pass it to the data layer so that our queries are restricted by tenant id. Fortunately, we can accomplish this easily by creating a connection using SQLAlchemy.
When creating a connection via SQLAlchemy, we can set tenant context as follows.
from middleware import get_global_tenant_id
async def get_session() -> AsyncSession:
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async with async_session() as session:
try:
tenant_id = int(get_global_tenant_id())
query = text(f"SET app.current_tenant={tenant_id};")
await session.execute(text("SET SESSION ROLE tenant_user;"))
await session.execute(query)
yield session
except:
await session.rollback()
raise
finally:
await session.execute(text("RESET ROLE;"))
await session.commit()
pass
Pay attention to this line. SET app.current_tenant='{tenant_id}'
We are setting the tenant context for this particular request here. This ensures that row level security will only return values associated with that particular tenant.
This line is used because we connected to the database as a superuser role in Postgres. But you can also connect to the DB using this role instead. SET SESSION ROLE tenant_user;
You can now use this method to get a connection and use it to query all data
@app.get("/toys", response_model=list[Toy])
async def get_songs(session: AsyncSession = Depends(get_session)):
result = await session.execute(select(Toy))
toys = result.scalars().all()
return [Toy(name=toy.name, id=toy.id) for toy in toys]
That's it. You now have row level security enabled on your app.
Source code for this project can be found here: https://github.com/theundeadmonk/postgres-rls-fastapi-demo