Initialize a FastAPI apps with Keycloak for authentication.
This guide will use Keycloak on Docker: Keycloak on Docker
Required
- FastAPI: 0.73
- Keycloak: 17
Docs:
Video:
Folow this guide to deploy a kleycloak server Get started with Keycloak
For the next example the server will be available at url: http://192.168.56.36:8080/
create a realm
create a client
confidential
ON
ON
create a user and password
Suppose with use the setup as follow:
keycloak url: http://192.168.56.36:8080/
realm: sassy
client_id: sassy-client
client_secret_key: 8JOKwgz3f62bUxm96ovz5jU7lUf02qei
user: sassy-user
password: password123
This guide used the following libraries
# install FastAPI
pip install fastapi==0.73.0
# install uvicorn with "Cython-based" dependencies and other "optional extras".
pip install "uvicorn[standard]"==0.17.5
# keycloack adapater
pip install python-keycloak==0.27.0
# OAuth2 uses "form data" for sending the username and password.
pip install python-multipart==0.0.5
#
email-validator==1.1.3
main.py
fileimport asyncio
import json
from enum import Enum
from typing import Optional, Union, Dict, Set, List, Any
from fastapi import FastAPI, HTTPException, Depends, status, Query
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from keycloak import KeycloakOpenID
from keycloak.exceptions import KeycloakAuthenticationError, KeycloakGetError
from pydantic import BaseModel
KC_SERVER = "http://192.168.56.36:8080/"
KC_REALMS = "sassy"
KC_CLIENT_ID = "sassy-client"
KC_CLIENT_SECRET_KEY = "8JOKwgz3f62bUxm96ovz5jU7lUf02qei"
class KCAuth:
"""Keycloak class."""
def __init__(self):
self._client: Optional[KeycloakOpenID] = None
self._refresh_token: Optional[str] = None
@property
def client(self):
return self._client
def set_client(
self,
kc_server: Optional[str] = None,
kc_realms: Optional[str] = None,
kc_client_id: Optional[str] = None,
kc_client_secret_key: Optional[str] = None,
):
self._client = KeycloakOpenID(
server_url=kc_server or KC_SERVER,
client_id=kc_client_id or KC_CLIENT_ID,
realm_name=kc_realms or KC_REALMS,
client_secret_key=kc_client_secret_key or KC_CLIENT_SECRET_KEY,
verify=True)
def token(self, username: str, password: str) -> Union[Dict, Set[str]]:
token = self.client.token(username, password)
self._refresh_token = token['refresh_token']
return token
def userinfo(self, token: str) -> Union[Dict, Set[str]]:
return self.client.userinfo(token)
def introspect(self, token: str) -> Union[Dict, Set[str]]:
return self.client.introspect(token)
def refresh_token(self) -> Union[Dict, Set[str]]:
token = self.client.refresh_token(self._refresh_token)
self._refresh_token = token['refresh_token']
return token
def logout(self) -> Union[Dict, Set[str]]:
return self.client.logout(self._refresh_token)
def decode_token(self, token: str) -> Dict[str , Set[str]]:
keycloak_public_key = "-----BEGIN PUBLIC KEY-----\n" + \
self.client.public_key() + \
"\n-----END PUBLIC KEY-----"
options = {
"verify_signature": True,
"verify_aud": True,
"verify_exp": True,
}
return self.client.decode_token(
token=token,
key=keycloak_public_key,
options=options,
)
kc_auth = KCAuth()
tags_metadata = [
{
"name": "authentication",
"description": "Operations with users. The **login** logic "
"is also here.",
},
{
"name": "items",
"description": "users details",
"externalDocs": {
"description": "Items external docs",
"url": "https://fastapi.tiangolo.com/",
},
},
]
description = """
This is the description.
## Title Here
We can add some content here **content**.
something else here
## Other title here
* **login** (_not implemented_).
* **token** (_not implemented_).
"""
app = FastAPI(
title="Title API",
description=description,
version="0.0.1",
terms_of_service="http://example.com/terms/",
contact={
"name": "Deadpoolio the Amazing",
"url": "http://usrl-to-website/contact/",
"email": "dp@x-force.example.com",
},
license_info={
"name": "Apache 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.html",
},
openapi_tags=tags_metadata,
)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="login")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
active: Optional[bool] = None
roles: List[str]
resource_access: Dict[str, Any]
scope: List[str]
class Token(BaseModel):
access_token: str
token_type: str
class TokenInfo(BaseModel):
token: str
token_info: Union[Dict, Set[str]]
class Item(BaseModel):
name: str
price: float
is_offer: Optional[bool] = None
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
lenet = "lenet"
@app.get("/")
async def read_root(arg1: str, arg2: Optional[str] = None):
return {"data": [
{
"arg1": arg1
},
{
"arg2": arg2
}
]}
@app.post("/login", response_model=Token, tags=["authentication"])
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
try:
kc_auth.set_client(
kc_client_id=form_data.client_id,
kc_client_secret_key=form_data.client_secret
)
token = kc_auth.token(form_data.username, form_data.password)
return Token(
access_token=token['access_token'],
token_type=token['token_type']
)
except (KeycloakGetError, KeycloakAuthenticationError) as exc:
err_msg = json.loads(exc.error_message.decode())
res_code = exc.response_code
raise HTTPException(
status_code=res_code,
detail=f"{err_msg['error_description']}")
@app.post("/logout", response_model=str, tags=["authentication"])
async def logout(token: str = Depends(oauth2_scheme)):
try:
kc_auth.logout()
token_detail = kc_auth.introspect(token)
if not token_detail['active']:
return "Disconnected"
raise HTTPException(
status_code=status.HTTP_304_NOT_MODIFIED,
detail="token is still active")
except Exception as exc:
raise HTTPException(status_code=404, detail=f"{exc}")
@app.post("/refresh_token", response_model=TokenInfo, tags=["authentication"])
async def refresh_token(token: str = Depends(oauth2_scheme)):
try:
kc_auth.refresh_token()
return TokenInfo(
token=token,
token_info=kc_auth.introspect(token),
)
except (KeycloakAuthenticationError, KeycloakGetError) as exc:
err_msg = json.loads(exc.error_message.decode())
res_code = exc.response_code
raise HTTPException(
status_code=res_code,
detail=f"{err_msg['error_description']}")
@app.get("/users/me", response_model=User, tags=["authentication"])
async def read_users_me(token: str = Depends(oauth2_scheme)):
if not token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
userinfo = kc_auth.userinfo(token)
token_info = kc_auth.introspect(token)
user = User(
username=userinfo.get('preferred_username', ""),
email=userinfo.get('email', ""),
full_name=userinfo.get('name', ""),
active=token_info.get('active', ""),
roles=token_info['realm_access'].get('roles', ""),
resource_access=token_info.get('resource_access', ""),
scope=token_info.get('scope', "").split(),
)
return user
except KeycloakAuthenticationError as exc:
err_msg = json.loads(exc.error_message.decode())
res_code = exc.response_code
raise HTTPException(
status_code=res_code,
detail=f"{err_msg['error_description']}")
@app.get("/auth/", response_model=TokenInfo, tags=["authentication"])
async def read_auth(token: str = Depends(oauth2_scheme)):
try:
return TokenInfo(
token=token,
token_info=kc_auth.introspect(token),
)
except Exception as exc:
raise HTTPException(status_code=404, detail=f"{exc}")
# -----------------------------------------------------------
# The following endpoints are for example and can be deletes
# -----------------------------------------------------------
@app.get("/items/{item_id}", tags=["items"])
async def read_item(item_id: int, q: Optional[str] = None):
return {"item_id": item_id, "q": q}
@app.put("/items/{item_id}", response_model=Item, tags=["items"])
async def update_item(item_id: int, item: Item) -> Item:
try:
if item_id == 123:
raise IndexError(f"Unable to deal with {123}")
item.name = "Mr " + item.name + " " + str(item_id)
return item
except Exception as exc:
raise HTTPException(status_code=404, detail=str(exc))
@app.get("/models/{model_name}", tags=["items"])
async def get_model(model_name: ModelName):
return {"model_name": model_name, "message": "Have some residuals"}
@app.get("/files/{file_path:path}")
async def read_file(file_path: str):
return {"file_path": file_path}
@app.get("/list_items/")
async def read_list_items(
q: list = Query(
[],
deprecated=True,
title="THis is the query title",
description="This is the description",
)
):
"""Read list items (docstring)."""
query_items = {"q": q}
return query_items
main
: main.py
app
: FastAPI instance in main.py
--reload
flag in development mode will use watchgod
default port is 8000
uvicorn main:app --reload
Provide a specific port
uvicorn main:app --reload --port 8081