"""Command to rotate encryption key for encrypted parameters.

Two-step process for secure key rotation:

Step 1: Generate new key and backup old one
    python manage.py dap_rotate_key
    - Backs up current key to the backup file (dap_backup_key.json by default)
    - Generates and displays new key
    - Shows next command to run

Step 2: Apply rotation with old key
    python manage.py dap_rotate_key --old-key <key>
    - Decrypts with old key (from parameter)
    - Re-encrypts with new key (from settings)

Arguments:
    --old-key: Old encryption key for decryption. When provided, performs step 2.
    --backup-file: Path to backup file. When omitted, the
        ``encryption_key_backup_file`` setting is used if it is set; otherwise
        the file is dap_backup_key.json, resolved relative to the current
        working directory (typically the project root when the command is
        run next to manage.py).
"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
from django.core.exceptions import ImproperlyConfigured
from django.core.management.base import BaseCommand, CommandParser
from django_app_parameter.models import Parameter
from django_app_parameter.utils import decrypt_value, encrypt_value, get_setting
if TYPE_CHECKING:
    # Evaluated by static type checkers only: nothing below exists at runtime,
    # so typing_extensions is not a runtime requirement of this module.
    from typing_extensions import TypedDict

    class KeyBackupEntry(TypedDict):
        """Shape of one archived key inside the backup file's ``keys`` list."""

        # Moment the key was archived, stored as a string.
        timestamp: str
        # The archived encryption key itself.
        key: str
        # Number of encrypted parameters counted when the key was archived.
        parameters_count: int

    class BackupData(TypedDict):
        """Shape of the whole backup file: a history of archived keys."""

        keys: list[KeyBackupEntry]
# ``cryptography`` is an optional dependency. When it cannot be imported the
# two names are bound to None so the module still imports, and
# HAS_CRYPTOGRAPHY records which case applies.
try:
    from cryptography.fernet import Fernet, InvalidToken
except ImportError:
    Fernet = None  # type: ignore[assignment, misc]
    InvalidToken = None  # type: ignore[assignment, misc]
    HAS_CRYPTOGRAPHY = False
else:
    HAS_CRYPTOGRAPHY = True

logger = logging.getLogger(__name__)
class Command(BaseCommand):
    """Rotate the encryption key used for encrypted parameters.

    The command works in two steps:

    * Without ``--old-key`` it archives the key currently found in settings
      into the backup file and prints a freshly generated key (step 1).
    * With ``--old-key`` it re-encrypts every parameter that has
      ``enable_cypher=True`` from the old key to the key now in settings
      (step 2).
    """

    help = "Rotate encryption key for encrypted parameters (two-step process)"

    def add_arguments(self, parser: CommandParser) -> None:
        """Register the ``--old-key`` and ``--backup-file`` options."""
        parser.add_argument(
            "--old-key",
            type=str,
            help="Old encryption key for decryption (triggers step 2: apply rotation)",
        )
        parser.add_argument(
            "--backup-file",
            type=str,
            help="Path to backup file (default: dap_backup_key.json at project root)",
        )

    def handle(self, *args: Any, **options: Any) -> None:
        """Dispatch to step 1 or step 2 depending on ``--old-key``.

        Raises:
            ImproperlyConfigured: If the ``cryptography`` package could not
                be imported.
        """
        if not HAS_CRYPTOGRAPHY:
            raise ImproperlyConfigured(
                "Encryption requires the 'cryptography' package. "
                "Install it with: pip install django-app-parameter[cryptography]"
            )

        backup_file_path = self._get_backup_file_path(options.get("backup_file"))

        if options.get("old_key"):
            # Step 2: apply rotation
            self._apply_rotation(options["old_key"], backup_file_path)
        else:
            # Step 1: generate new key and backup
            self._generate_and_backup(backup_file_path)

    def _get_backup_file_path(self, custom_path: str | None) -> Path:
        """Resolve where the key backup file lives.

        Precedence: the explicit ``custom_path`` argument, then the
        ``encryption_key_backup_file`` setting, then the relative path
        ``dap_backup_key.json`` (resolved against the current working
        directory when it is used).
        """
        if custom_path:
            return Path(custom_path)

        backup_path = get_setting("encryption_key_backup_file")
        if backup_path:
            return Path(backup_path)

        return Path("dap_backup_key.json")

    def _load_backup_data(self, backup_file: Path) -> dict[str, Any] | None:
        """Read the existing key history, or start an empty one.

        Returns:
            The parsed backup structure (always containing a ``keys`` list),
            or ``None`` when an existing file cannot be read or does not have
            the expected structure. In that case an error has been written to
            stdout and the file has been left untouched.
        """
        if not backup_file.exists():
            return {"keys": []}

        try:
            with backup_file.open("r", encoding="utf-8") as f:
                backup_data = json.load(f)
        except (OSError, ValueError) as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueError.
            self.stdout.write(
                self.style.ERROR(
                    f"Cannot read existing backup file {backup_file}: {e}. "
                    "The file was left untouched; fix or move it and retry."
                )
            )
            return None

        keys = (
            backup_data.setdefault("keys", [])
            if isinstance(backup_data, dict)
            else None
        )
        if not isinstance(keys, list):
            self.stdout.write(
                self.style.ERROR(
                    f"Backup file {backup_file} has an unexpected structure "
                    "(expected a JSON object with a 'keys' list). "
                    "The file was left untouched; fix or move it and retry."
                )
            )
            return None
        return backup_data

    def _generate_and_backup(self, backup_file: Path) -> None:
        """Step 1: Generate new key and backup the old one.

        Appends the key currently configured in settings to ``backup_file``
        and prints a newly generated key together with the follow-up
        instructions. Nothing is changed in the database or in settings.
        """
        self.stdout.write(
            self.style.HTTP_INFO("=== Step 1: Generate new key and backup ===\n")
        )

        # Get current key from settings
        try:
            current_key_str = get_setting("encryption_key")
        except Exception as e:
            self.stdout.write(
                self.style.ERROR(f"Failed to get current encryption key: {e}")
            )
            return
        if not current_key_str:
            self.stdout.write(
                self.style.ERROR(
                    "No encryption key configured in settings. Nothing to rotate."
                )
            )
            return

        # Check if there are encrypted parameters
        encrypted_count = Parameter.objects.filter(enable_cypher=True).count()
        self.stdout.write(f"Found {encrypted_count} encrypted parameters\n")

        # Load the history before doing anything else: an unusable backup
        # file must abort the step instead of being overwritten, because it
        # may hold the only copy of previously used keys.
        backup_data = self._load_backup_data(backup_file)
        if backup_data is None:
            return

        # Generate new key
        new_key = Fernet.generate_key()  # type: ignore[misc]
        new_key_str = new_key.decode("utf-8")

        # Add current key to backup history
        backup_data["keys"].append(
            {
                "timestamp": datetime.now().isoformat(),
                "key": current_key_str,
                "parameters_count": encrypted_count,
            }
        )

        # Serialise before opening the file for writing, so a serialisation
        # error cannot leave a truncated backup behind.
        payload = json.dumps(backup_data, indent=4)

        backup_file.parent.mkdir(parents=True, exist_ok=True)
        # The file contains secret key material: keep it owner-only where the
        # platform supports it.
        backup_file.touch(mode=0o600, exist_ok=True)
        try:
            backup_file.chmod(0o600)
        except OSError as e:
            logger.warning(
                "Could not restrict permissions on %s: %s", backup_file, e
            )
        backup_file.write_text(payload, encoding="utf-8")

        self.stdout.write(self.style.SUCCESS(f"✓ Backed up old key to: {backup_file}"))
        self.stdout.write(self.style.SUCCESS("✓ Generated new encryption key\n"))

        # Display new key
        self.stdout.write(self.style.HTTP_INFO("NEW ENCRYPTION KEY:"))
        self.stdout.write(self.style.WARNING(f"{new_key_str}\n"))

        # Display instructions
        self.stdout.write(self.style.HTTP_INFO("NEXT STEPS:"))
        self.stdout.write(
            "1. Update your settings with the new key:\n"
            f" DJANGO_APP_PARAMETER = {{'encryption_key': '{new_key_str}'}}\n"
        )
        self.stdout.write("2. Restart your application to use the new key\n")
        self.stdout.write(
            "3. Once settings are updated, run:\n"
            f" python manage.py dap_rotate_key --old-key {current_key_str}\n"
        )

    def _apply_rotation(self, old_key_str: str, backup_file: Path) -> None:
        """Step 2: Re-encrypt parameters with new key from settings.

        Each parameter with ``enable_cypher=True`` is decrypted with
        ``old_key_str`` and re-encrypted with the key found in settings.
        Parameters are processed and saved one by one; a failure on one
        parameter is recorded and reported but does not stop the others.

        Args:
            old_key_str: The key the stored values are currently encrypted
                with.
            backup_file: Backup file location, only shown in the summary.
        """
        self.stdout.write(self.style.HTTP_INFO("=== Step 2: Apply rotation ===\n"))

        # Validate old key
        try:
            old_key = old_key_str.encode("utf-8")
            Fernet(old_key)  # type: ignore[misc]  # Validate Fernet key format
        except Exception as e:
            self.stdout.write(self.style.ERROR(f"Invalid old key provided: {e}"))
            return

        # Get new key from settings
        try:
            new_key_str = get_setting("encryption_key")
            if not new_key_str:
                self.stdout.write(
                    self.style.ERROR(
                        "No encryption key configured in settings. "
                        "Please update settings first."
                    )
                )
                return
            new_key = new_key_str.encode("utf-8")
            Fernet(new_key)  # type: ignore[misc]  # Validate
        except Exception as e:
            self.stdout.write(
                self.style.ERROR(f"Invalid encryption key in settings: {e}")
            )
            return

        # Identical keys mean settings were not updated after step 1.
        if old_key_str == new_key_str:
            self.stdout.write(
                self.style.ERROR(
                    "Old key and new key are identical. "
                    "Please update settings with the new key first."
                )
            )
            return

        # Get encrypted parameters
        encrypted_params = Parameter.objects.filter(enable_cypher=True)
        count = encrypted_params.count()

        if count == 0:
            self.stdout.write(
                self.style.WARNING("No encrypted parameters found. Nothing to do.")
            )
            return

        self.stdout.write(f"Processing {count} encrypted parameters...\n")

        # Re-encrypt each parameter using helpers with explicit keys
        success_count = 0
        failed_params: list[str] = []

        for param in encrypted_params:
            try:
                # Decrypt with old key (passing key explicitly)
                decrypted_value = decrypt_value(param.value, encryption_key=old_key_str)
                # Re-encrypt with new key (passing key explicitly)
                param.value = encrypt_value(decrypted_value, encryption_key=new_key_str)
                param.save()
                success_count += 1
                logger.debug("Re-encrypted parameter: %s", param.slug)
            except InvalidToken:  # type: ignore[misc]
                failed_params.append(f"{param.slug} (failed to decrypt with old key)")
                logger.error("Failed to decrypt parameter %s with old key", param.slug)
            except Exception as e:
                failed_params.append(f"{param.slug} ({e})")
                logger.error("Failed to re-encrypt parameter %s: %s", param.slug, e)

        # Display results
        if success_count > 0:
            self.stdout.write(
                self.style.SUCCESS(
                    f"\n✓ Successfully re-encrypted {success_count}/{count} parameters"
                )
            )

        if failed_params:
            self.stdout.write(
                self.style.ERROR(
                    f"\n✗ Failed to re-encrypt {len(failed_params)} parameters:"
                )
            )
            for failed in failed_params:
                self.stdout.write(f" - {failed}")
            self.stdout.write(
                self.style.WARNING(f"\nCheck backup file for recovery: {backup_file}")
            )
        else:
            self.stdout.write(
                self.style.SUCCESS("\n✓ Rotation completed successfully!")
            )
            self.stdout.write(f"Backup file available at: {backup_file}")