`@app.get("/login")` async def show_login_form(req...
# ory-selfhosting
r
def _extract_csrf_token(flow_data):
    """Return the value of the ``csrf_token`` UI node of a flow, or None if absent."""
    for node in flow_data["ui"]["nodes"]:
        attributes = node["attributes"]
        if attributes.get("name") == "csrf_token":
            return attributes["value"]
    return None


def _kratos_error_message(kratos_response, default):
    """Return ``error.message`` from a Kratos response body, or *default*.

    Falls back to *default* when the body is not JSON, is not an object,
    or carries no ``error`` object, so callers never fail while building
    an error message.
    """
    try:
        body = kratos_response.json()
    except ValueError:
        return default
    error = body.get("error") if isinstance(body, dict) else None
    if isinstance(error, dict):
        return error.get("message", default)
    return default


@app.get("/login")
async def show_login_form(request: Request):
    """
    Handles GET requests to display the login form.

    If the request carries an ``ory_kratos_session`` cookie that Kratos
    reports as active, redirects to ``/home``. Otherwise starts a browser
    login flow at Kratos and renders ``login.html`` with the flow id and
    CSRF token.

    Raises:
        HTTPException: with the upstream status code when Kratos does not
            return 200 for the flow request; 400 when the flow has no CSRF
            token or id; 500 for any other failure.
    """
    cookies = request.cookies
    try:
        async with httpx.AsyncClient() as client:
            # Check if the session cookie is present and valid. The async
            # client is used here so the event loop is not blocked.
            if "ory_kratos_session" in cookies:
                session_response = await client.get(
                    f"{KRATOS_EXTERNAL_API_URL}/sessions/whoami",
                    cookies=cookies,
                )
                if (
                    session_response.status_code == 200
                    and session_response.json().get("active")
                ):
                    return RedirectResponse(url="/home")
                # No valid session: fall through and start a fresh login
                # flow below. (The original built a RedirectResponse here
                # but never returned it, so it had no effect.)

            print("i am in start login flow")
            response = await client.get(
                f"{KRATOS_EXTERNAL_API_URL}/self-service/login/browser",
                cookies=cookies,
                headers={"Accept": "application/json"},
            )

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail="Failed to start login flow",
            )

        flow_data = response.json()
        flow_id = flow_data.get("id")
        csrf_token = _extract_csrf_token(flow_data)

        if not csrf_token or not flow_id:
            raise HTTPException(status_code=400, detail="CSRF token or Flow ID is missing.")

        return templates.TemplateResponse(
            "login.html",
            {
                "request": request,
                "flow_id": flow_id,
                "csrf_token": csrf_token,
            },
        )
    except HTTPException:
        # Deliberate HTTP errors raised above must keep their status code
        # instead of being rewritten to a generic 500 below.
        raise
    except Exception as e:
        print(f"Error fetching login flow: {e}")
        raise HTTPException(status_code=500, detail="Internal server error") from e


@app.post("/login")
async def handle_login(
    request: Request,
    email: str = Form(...),
    password: str = Form(...),
    flow: str = Form(...),
    csrf_token: str = Form(...),
):
    """
    Handles POST requests for user login.

    Submits the credentials to the Kratos login flow identified by ``flow``
    using the password method, forwarding the caller's cookies and the CSRF
    token. On success, copies the ``ory_kratos_session`` cookie from the
    Kratos response onto a 303 redirect to ``/home``.

    Raises:
        HTTPException: 400 for a Kratos 400 response or a local
            value/attribute/serialization error; 410 when Kratos reports
            410 for the flow; 500 when Kratos answers 200 without a session
            cookie or with any other status.
    """
    try:
        print("i am in update login flow")

        # Prepare the login data
        login_data = UpdateLoginFlowBody(
            actual_instance=UpdateLoginFlowWithPasswordMethod(
                csrf_token=csrf_token,
                password=password,
                method="password",
                identifier=email,
                transient_payload={},
            )
        )

        # Convert the login data to a dictionary
        login_data_dict = login_data.dict()

        # Convert cookies dictionary to a Cookie header string. It is not
        # logged: it contains the caller's session and CSRF cookies.
        cookies_str = "; ".join(f"{key}={value}" for key, value in request.cookies.items())

        # Send the login request to Kratos
        async with httpx.AsyncClient() as client:
            kratos_response = await client.post(
                f"{KRATOS_EXTERNAL_API_URL}/self-service/login?flow={flow}",
                json=login_data_dict["actual_instance"],
                headers={
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                    "Cookie": cookies_str,
                    "X-CSRF-Token": csrf_token,
                },
            )

        status = kratos_response.status_code

        if status == 200:
            # Login successful
            print("Login successful")

            # Set the session cookie in the response
            session_cookie = kratos_response.cookies.get("ory_kratos_session")
            if not session_cookie:
                raise HTTPException(
                    status_code=500,
                    detail="Session cookie not found in Kratos response",
                )

            redirect = RedirectResponse(url="/home", status_code=303)
            redirect.set_cookie(
                key="ory_kratos_session",
                value=session_cookie,
                httponly=True,
                secure=True,  # Set to False if not using HTTPS
                samesite="lax",
            )
            return redirect

        if status == 400:
            # Handle form validation errors
            error_detail = _kratos_error_message(kratos_response, "Login failed")
            print(f"Login failed: {error_detail}")
            raise HTTPException(status_code=400, detail=error_detail)

        if status == 410:
            # Handle expired flow
            error_detail = _kratos_error_message(kratos_response, "The login flow has expired")
            print(f"Login flow expired: {error_detail}")
            raise HTTPException(status_code=410, detail=error_detail)

        # Handle unexpected response types. The body is not parsed here: it
        # may not be JSON, and a parse failure must not mask the real error.
        print(f"Unexpected response: {status}")
        raise HTTPException(status_code=500, detail="Unexpected response from Kratos")

    except HTTPException:
        # Keep the status codes chosen above; without this clause the
        # generic handler below would rewrite them all to 400.
        raise
    except ValueError as ve:
        print(f"ValueError: {ve}")
        raise HTTPException(status_code=400, detail=f"Value Error: {ve}") from ve
    except AttributeError as ae:
        print(f"AttributeError: {ae}")
        raise HTTPException(status_code=400, detail=f"Attribute Error: {ae}") from ae
    except Exception as e:
        print(f"Error: {e}")
        raise HTTPException(status_code=400, detail=f"Data serialization error: {e}") from e