Skip to content

Squawk Development Guide

Table of Contents

  1. Development Environment Setup
  2. Project Structure
  3. Development Workflow
  4. Testing Framework
  5. Code Quality Standards
  6. Database Development
  7. API Development
  8. Frontend Development
  9. Debugging and Troubleshooting
  10. Performance Optimization
  11. Security Considerations
  12. Release Process

Development Environment Setup

Prerequisites

# System requirements
- Python 3.8 or higher
- Git 2.20+
- SQLite3 (for development)
- Docker 20.10+ with Docker Compose v2 (recommended)
- Ubuntu 22.04 LTS (recommended/tested for Docker builds; other platforms may work)
- Node.js 16+ (for frontend tooling)

# For building from source (optional)
- build-essential
- libxml2-dev, libxslt1-dev (for SAML)
- libldap-dev, libsasl2-dev (for LDAP)
- pkg-config
# Clone the repository
git clone https://github.com/penguintechinc/squawk.git
cd Squawk

# Build and start all services
docker-compose up -d --build

# View logs
docker-compose logs -f dns-server

# Run tests in Docker
docker-compose exec dns-server python3 -m pytest tests/

Building Docker Images

# Build DNS Server (Ubuntu 22.04 based)
cd dns-server
docker build -t squawk-dns-server:dev .

# Build DNS Client (Ubuntu 22.04 based)
cd ../dns-client
docker build -t squawk-dns-client:dev .

# Build with specific features
docker build --build-arg ENABLE_ENTERPRISE=true -t squawk-dns-server:enterprise .

Manual Development Setup

# Install system dependencies (Ubuntu/Debian)
sudo apt-get update
sudo apt-get install -y \
  python3-dev python3-pip python3-venv \
  build-essential pkg-config \
  libxml2-dev libxslt1-dev \
  libldap-dev libsasl2-dev

# Create virtual environments
cd dns-server
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip wheel setuptools

# Install with fallback for enterprise features
pip install -r requirements.txt || \
  (echo "Enterprise features failed, using base requirements" && \
   pip install -r requirements-base.txt)

pip install -r requirements-dev.txt

cd ../dns-client  
python3 -m venv venv
source venv/bin/activate
pip install --upgrade pip wheel setuptools
pip install -r requirements.txt
pip install -r requirements-dev.txt

IDE Configuration

VS Code Setup

// .vscode/settings.json
{
    "python.defaultInterpreterPath": "./dns-server/venv/bin/python",
    "python.linting.enabled": true,
    "python.linting.flake8Enabled": true,
    "python.linting.mypyEnabled": true,
    "python.formatting.provider": "black",
    "python.formatting.blackArgs": ["--line-length", "88"],
    "python.testing.pytestEnabled": true,
    "python.testing.pytestArgs": ["tests/"],
    "files.exclude": {
        "**/__pycache__": true,
        "**/*.pyc": true,
        "**/venv": true,
        "**/.pytest_cache": true
    }
}

PyCharm Setup

# PyCharm configuration
- Interpreter: Project venv Python
- Code style: Black (88 char line length)
- Test runner: pytest
- Type checker: mypy
- Linter: flake8

Environment Variables

# .env.development
export SQUAWK_ENV=development
export SQUAWK_DEBUG=true
export SQUAWK_PORT=8080
export SQUAWK_CONSOLE_PORT=8000
export DB_URL=sqlite:///dev.db
export LOG_LEVEL=DEBUG
export USE_NEW_AUTH=true

# Load environment
source .env.development

Project Structure

Directory Layout

Squawk/
├── dns-server/                 # DNS server component
│   ├── bins/                  # Executable scripts
│   │   └── server.py         # Main DNS server
│   ├── libs/                 # Shared libraries
│   │   ├── __init__.py
│   │   ├── dns_utils.py      # DNS utility functions
│   │   ├── auth.py           # Authentication helpers
│   │   └── validators.py     # Input validation
│   ├── tests/                # Server tests
│   │   ├── unit/             # Unit tests
│   │   ├── integration/      # Integration tests
│   │   ├── security/         # Security tests
│   │   └── fixtures/         # Test data
│   ├── web/                  # Py4web applications
│   │   └── apps/
│   │       └── dns_console/  # Web console app
│   ├── requirements.txt      # Production dependencies
│   ├── requirements-dev.txt  # Development dependencies
│   └── venv/                # Virtual environment
├── dns-client/               # DNS client component
│   ├── bins/                # Client executables
│   │   ├── client.py        # Main client
│   │   └── k8s-client.py    # Kubernetes client
│   ├── libs/                # Client libraries
│   │   ├── __init__.py
│   │   ├── client.py        # DoH client library
│   │   ├── forwarder.py     # DNS forwarder
│   │   └── config.py        # Configuration management
│   ├── tests/               # Client tests
│   └── venv/               # Virtual environment
├── docs/                    # Documentation
│   ├── USAGE.md            # Usage guide
│   ├── CONTRIBUTING.md     # Contribution guide
│   ├── ARCHITECTURE.md     # Architecture docs
│   ├── DEVELOPMENT.md      # This file
│   └── API.md              # API documentation
├── scripts/                # Build and deployment scripts
│   ├── setup-dev.sh        # Development setup
│   ├── run-tests.sh        # Test runner
│   ├── build.sh           # Build script
│   └── deploy.sh          # Deployment script
├── docker/                 # Docker configurations
│   ├── Dockerfile.server   # Server container
│   ├── Dockerfile.client   # Client container
│   └── docker-compose.dev.yml
├── .github/               # GitHub workflows
│   └── workflows/
│       ├── ci.yml         # Continuous integration
│       └── release.yml    # Release automation
├── Makefile              # Development commands
├── pyproject.toml        # Python project config
├── claude.md            # AI assistant context
└── README.md            # Project overview

Module Organization

DNS Server Modules

# dns-server/libs/dns_utils.py
def validate_domain(domain: str) -> bool:
    """Validate domain name format"""

def format_dns_response(answer: List[str], status: int) -> Dict:
    """Format DNS response as JSON"""

def parse_dns_query(query_string: str) -> Tuple[str, str]:
    """Parse DNS query parameters"""
# dns-server/libs/auth.py
class TokenManager:
    """Handle token operations"""

    def validate_token(self, token: str) -> Optional[Dict]:
        """Validate authentication token"""

    def check_permissions(self, token_id: int, domain: str) -> bool:
        """Check domain permissions"""
# dns-server/libs/validators.py
def validate_token_format(token: str) -> bool:
    """Validate token format"""

def validate_domain_name(domain: str) -> bool:
    """Validate domain name"""

def sanitize_input(user_input: str) -> str:
    """Sanitize user input"""

DNS Client Modules

# dns-client/libs/client.py
class DNSOverHTTPSClient:
    """DoH client implementation"""

    def query(self, domain: str, record_type: str = "A") -> Dict:
        """Perform DNS query"""

    def set_auth_token(self, token: str) -> None:
        """Set authentication token"""
# dns-client/libs/forwarder.py
class DNSForwarder:
    """Local DNS forwarding service"""

    def start_udp_server(self) -> None:
        """Start UDP DNS server"""

    def start_tcp_server(self) -> None:
        """Start TCP DNS server"""

Development Workflow

Feature Development Process

# 1. Create feature branch
git checkout -b feature/token-expiry
git push -u origin feature/token-expiry

# 2. Implement feature
# Edit code, write tests, update docs

# 3. Run local tests
make test
make lint
make type-check

# 4. Commit changes
git add .
git commit -m "feat: add token expiry functionality

- Add expiry field to tokens table
- Implement expiry validation in auth
- Add UI for setting token expiry
- Update API documentation

Closes #123"

# 5. Push and create PR
git push
gh pr create --title "Add token expiry functionality" --body "Implements token expiration feature as requested in #123"

Code Review Process

# Before submitting PR:
1. [ ] All tests pass
2. [ ] Code coverage > 80%
3. [ ] No linting errors
4. [ ] Type checking passes
5. [ ] Documentation updated
6. [ ] Security review completed
7. [ ] Performance impact assessed

# Review checklist:
1. [ ] Code follows style guidelines
2. [ ] Logic is correct and efficient
3. [ ] Error handling is appropriate
4. [ ] Tests are comprehensive
5. [ ] Documentation is clear
6. [ ] Security implications reviewed
7. [ ] Breaking changes documented

Commit Message Convention

# Format: <type>(<scope>): <subject>

# Types:
feat:     # New feature
fix:      # Bug fix
docs:     # Documentation only changes
style:    # Formatting, missing semicolons, etc
refactor: # Code change that neither fixes bug nor adds feature
perf:     # Performance improvement
test:     # Adding missing tests
chore:    # Changes to build process or auxiliary tools

# Examples:
feat(auth): add token expiry functionality
fix(server): resolve DNS timeout issue
docs(api): update authentication endpoints
test(client): add integration tests for forwarder

Testing Framework

Test Structure

# tests/conftest.py
import pytest
from dns_server.libs.auth import TokenManager
from dns_client.libs.client import DNSOverHTTPSClient

@pytest.fixture
def token_manager():
    """Token manager fixture"""
    return TokenManager(db_url="sqlite:///:memory:")

@pytest.fixture
def dns_client():
    """DNS client fixture"""
    return DNSOverHTTPSClient("http://localhost:8080", "test-token")

@pytest.fixture
def sample_token():
    """Sample token for testing"""
    return {
        'id': 1,
        'token': 'test-token-12345',
        'name': 'Test Token',
        'active': True,
        'domains': ['example.com', '*.test.com']
    }

Unit Testing

# tests/unit/test_auth.py
import pytest
from unittest.mock import Mock, patch
from dns_server.libs.auth import TokenManager

class TestTokenManager:
    def test_validate_token_success(self, token_manager, sample_token):
        """Test successful token validation"""
        with patch.object(token_manager, 'get_token') as mock_get:
            mock_get.return_value = sample_token

            result = token_manager.validate_token('test-token-12345')

            assert result is not None
            assert result['name'] == 'Test Token'
            mock_get.assert_called_once_with('test-token-12345')

    def test_validate_token_not_found(self, token_manager):
        """Test token not found"""
        with patch.object(token_manager, 'get_token') as mock_get:
            mock_get.return_value = None

            result = token_manager.validate_token('invalid-token')

            assert result is None

    @pytest.mark.parametrize("domain,expected", [
        ("example.com", True),
        ("sub.example.com", True), 
        ("test.com", True),
        ("other.com", False),
    ])
    def test_check_permissions(self, token_manager, sample_token, domain, expected):
        """Test permission checking"""
        result = token_manager.check_permissions(sample_token['id'], domain)
        assert result == expected

Integration Testing

# tests/integration/test_end_to_end.py
import pytest
import requests
from dns_server.bins.server import main as server_main
from dns_client.libs.client import DNSOverHTTPSClient

class TestEndToEnd:
    @pytest.fixture(scope="class")
    def running_server(self):
        """Start DNS server for testing"""
        # Start server in background thread
        pass

    def test_dns_query_with_valid_token(self, running_server):
        """Test complete DNS query flow"""
        client = DNSOverHTTPSClient("http://localhost:8080", "valid-test-token")

        response = client.query("example.com", "A")

        assert response['Status'] == 0
        assert 'Answer' in response
        assert len(response['Answer']) > 0

    def test_dns_query_with_invalid_token(self, running_server):
        """Test DNS query with invalid token"""
        client = DNSOverHTTPSClient("http://localhost:8080", "invalid-token")

        with pytest.raises(requests.exceptions.HTTPError) as exc_info:
            client.query("example.com", "A")

        assert exc_info.value.response.status_code == 403

Security Testing

# tests/security/test_injection.py
import pytest
from dns_server.libs.auth import TokenManager

class TestSecurityInjection:
    def test_sql_injection_token(self, token_manager):
        """Test SQL injection in token validation"""
        malicious_tokens = [
            "'; DROP TABLE tokens; --",
            "' OR '1'='1",
            "admin'/**/AND/**/1=1--",
        ]

        for token in malicious_tokens:
            result = token_manager.validate_token(token)
            assert result is None  # Should not succeed

    def test_domain_validation_bypass(self):
        """Test domain validation bypass attempts"""
        from dns_server.libs.validators import validate_domain_name

        bypass_attempts = [
            "../admin.example.com",
            "example.com/../admin",
            "example.com\x00admin.com",
            "example.com;admin.com",
        ]

        for attempt in bypass_attempts:
            assert not validate_domain_name(attempt)

    def test_xss_prevention(self):
        """Test XSS prevention in web console"""
        malicious_inputs = [
            "<script>alert('xss')</script>",
            "javascript:alert('xss')",
            "<img src=x onerror=alert('xss')>",
        ]

        for input_str in malicious_inputs:
            # Test that input is properly escaped
            pass

Performance Testing

# tests/performance/test_load.py
import pytest
import time
import concurrent.futures
from dns_client.libs.client import DNSOverHTTPSClient

class TestPerformance:
    def test_concurrent_queries(self):
        """Test performance under concurrent load"""
        client = DNSOverHTTPSClient("http://localhost:8080", "test-token")

        def make_query():
            return client.query("example.com", "A")

        start_time = time.time()

        with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
            futures = [executor.submit(make_query) for _ in range(100)]
            results = [f.result() for f in futures]

        end_time = time.time()

        # Assertions
        assert len(results) == 100
        assert all(r['Status'] == 0 for r in results)
        assert end_time - start_time < 10  # Should complete within 10 seconds

    def test_memory_usage(self):
        """Test memory usage during operation"""
        import psutil
        import os

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss

        client = DNSOverHTTPSClient("http://localhost:8080", "test-token")

        # Make many queries
        for _ in range(1000):
            client.query("example.com", "A")

        final_memory = process.memory_info().rss
        memory_increase = final_memory - initial_memory

        # Memory increase should be reasonable (< 50MB)
        assert memory_increase < 50 * 1024 * 1024

Running Tests

# Run all tests
make test

# Run specific test categories
pytest tests/unit/
pytest tests/integration/ 
pytest tests/security/
pytest tests/performance/

# Run with coverage
pytest --cov=dns_server --cov=dns_client --cov-report=html

# Run tests in parallel
pytest -n auto

# Run tests with verbose output
pytest -v

# Run specific test file
pytest tests/unit/test_auth.py

# Run tests matching pattern
pytest -k "test_token"

# Run tests with debugging
pytest --pdb

Code Quality Standards

Linting Configuration

# setup.cfg
[flake8]
max-line-length = 88
extend-ignore = E203, W503
exclude = 
    .git,
    __pycache__,
    venv,
    .venv,
    build,
    dist

[mypy]
python_version = 3.8
warn_return_any = True
warn_unused_configs = True
disallow_untyped_defs = True
disallow_incomplete_defs = True
check_untyped_defs = True
disallow_untyped_decorators = True
no_implicit_optional = True
warn_redundant_casts = True
warn_unused_ignores = True
# pyproject.toml
[tool.black]
line-length = 88
target-version = ['py38']
include = '\.pyi?$'
extend-exclude = '''
/(
  # directories
  \.eggs
  | \.git
  | \.hg
  | \.mypy_cache
  | \.tox
  | \.venv
  | build
  | dist
)/
'''

[tool.isort]
profile = "black"
line_length = 88
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
ensure_newline_before_comments = true

Pre-commit Hooks

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0
    hooks:
      - id: trailing-whitespace
      - id: end-of-file-fixer
      - id: check-yaml
      - id: check-added-large-files
      - id: check-merge-conflict

  - repo: https://github.com/psf/black
    rev: 23.1.0
    hooks:
      - id: black

  - repo: https://github.com/pycqa/isort
    rev: 5.12.0
    hooks:
      - id: isort

  - repo: https://github.com/pycqa/flake8
    rev: 6.0.0
    hooks:
      - id: flake8

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.0.1
    hooks:
      - id: mypy
        additional_dependencies: [types-all]

Code Documentation Standards

def authenticate_token(token: str, domain: str, db_connection) -> bool:
    """Authenticate token for domain access.

    Validates the provided authentication token and checks if it has
    permission to access the specified domain. Supports both exact
    domain matching and wildcard permissions.

    Args:
        token: The authentication token to validate. Must be a valid
            base64url encoded string of at least 32 characters.
        domain: The domain name being accessed. Must be a valid FQDN
            or subdomain following RFC 1035 standards.
        db_connection: Database connection object for token lookup.
            Should be an active SQLite, PostgreSQL, or MySQL connection.

    Returns:
        True if the token is valid and has permission for the domain,
        False otherwise.

    Raises:
        ValueError: If token format is invalid or domain is malformed.
        DatabaseError: If database connection fails or query errors occur.
        AuthenticationError: If token validation process fails.

    Example:
        >>> db = sqlite3.connect(':memory:')
        >>> authenticate_token('abc123def456', 'example.com', db)
        True
        >>> authenticate_token('invalid', 'example.com', db)
        False

    Note:
        This function performs timing-attack resistant token comparison
        and logs all authentication attempts for security auditing.
    """
    # Implementation here
    pass

Database Development

Migration System

# migrations/001_initial_schema.py
def up(db):
    """Create initial database schema"""
    db.executesql('''
        CREATE TABLE tokens (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            token VARCHAR(255) UNIQUE NOT NULL,
            name VARCHAR(100) NOT NULL,
            description TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            last_used DATETIME,
            active BOOLEAN DEFAULT TRUE
        )
    ''')

    db.executesql('''
        CREATE TABLE domains (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            name VARCHAR(255) UNIQUE NOT NULL,
            description TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    db.executesql('''
        CREATE TABLE token_domains (
            token_id INTEGER REFERENCES tokens(id) ON DELETE CASCADE,
            domain_id INTEGER REFERENCES domains(id) ON DELETE CASCADE,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (token_id, domain_id)
        )
    ''')

def down(db):
    """Drop tables"""
    db.executesql('DROP TABLE IF EXISTS token_domains')
    db.executesql('DROP TABLE IF EXISTS domains')
    db.executesql('DROP TABLE IF EXISTS tokens')
# migrations/002_add_query_logs.py
def up(db):
    """Add query logging table"""
    db.executesql('''
        CREATE TABLE query_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            token_id INTEGER REFERENCES tokens(id) ON DELETE SET NULL,
            domain_queried VARCHAR(255) NOT NULL,
            query_type VARCHAR(10),
            status VARCHAR(20),
            client_ip VARCHAR(45),
            timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
        )
    ''')

    # Add indexes for performance
    db.executesql('CREATE INDEX idx_query_logs_timestamp ON query_logs(timestamp DESC)')
    db.executesql('CREATE INDEX idx_query_logs_token ON query_logs(token_id)')

def down(db):
    """Remove query logging table"""
    db.executesql('DROP TABLE IF EXISTS query_logs')

Database Testing

# tests/unit/test_database.py
import pytest
from pydal import DAL, Field

@pytest.fixture
def test_db():
    """Test database fixture"""
    db = DAL('sqlite://test.db')

    # Define tables
    db.define_table('tokens',
        Field('token', 'string'),
        Field('name', 'string'),
        Field('active', 'boolean', default=True)
    )

    yield db

    # Cleanup
    db.close()

def test_token_creation(test_db):
    """Test token creation"""
    token_id = test_db.tokens.insert(
        token='test-token-123',
        name='Test Token'
    )

    assert token_id is not None

    token = test_db.tokens[token_id]
    assert token.token == 'test-token-123'
    assert token.name == 'Test Token'
    assert token.active is True

def test_token_uniqueness(test_db):
    """Test token uniqueness constraint"""
    test_db.tokens.insert(token='duplicate-token', name='Token 1')

    with pytest.raises(Exception):  # Should raise constraint violation
        test_db.tokens.insert(token='duplicate-token', name='Token 2')

API Development

API Design Principles

# RESTful API design
GET    /api/v1/tokens              # List all tokens
POST   /api/v1/tokens              # Create new token
GET    /api/v1/tokens/{id}         # Get specific token
PUT    /api/v1/tokens/{id}         # Update token
DELETE /api/v1/tokens/{id}         # Delete token
PATCH  /api/v1/tokens/{id}/status  # Toggle token status

# Domain management
GET    /api/v1/domains             # List all domains
POST   /api/v1/domains             # Add new domain
DELETE /api/v1/domains/{id}        # Remove domain

# Permission management
GET    /api/v1/permissions         # List all permissions
POST   /api/v1/permissions         # Grant permission
DELETE /api/v1/permissions/{token_id}/{domain_id}  # Revoke permission

API Response Standards

# Success responses
{
    "success": true,
    "data": {
        "id": 123,
        "token": "abc123",
        "name": "My Token"
    },
    "message": "Token created successfully"
}

# Error responses  
{
    "success": false,
    "error": {
        "code": "VALIDATION_ERROR",
        "message": "Token name is required",
        "details": {
            "field": "name",
            "constraint": "not_empty"
        }
    }
}

# List responses
{
    "success": true,
    "data": [
        {"id": 1, "name": "Token 1"},
        {"id": 2, "name": "Token 2"}
    ],
    "pagination": {
        "page": 1,
        "per_page": 20,
        "total": 45,
        "pages": 3
    }
}

API Testing

# tests/integration/test_api.py
import pytest
import requests

class TestTokenAPI:
    def test_create_token(self, api_base_url):
        """Test token creation API"""
        payload = {
            "name": "Test API Token",
            "description": "Created via API test"
        }

        response = requests.post(f"{api_base_url}/tokens", json=payload)

        assert response.status_code == 201
        data = response.json()
        assert data["success"] is True
        assert "token" in data["data"]
        assert data["data"]["name"] == "Test API Token"

    def test_list_tokens(self, api_base_url, sample_tokens):
        """Test token listing API"""
        response = requests.get(f"{api_base_url}/tokens")

        assert response.status_code == 200
        data = response.json()
        assert data["success"] is True
        assert len(data["data"]) >= len(sample_tokens)

    def test_invalid_token_creation(self, api_base_url):
        """Test invalid token creation"""
        payload = {}  # Missing required fields

        response = requests.post(f"{api_base_url}/tokens", json=payload)

        assert response.status_code == 400
        data = response.json()
        assert data["success"] is False
        assert "error" in data

Frontend Development

Py4web Templates

<!-- templates/layout.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>[[=title or "DNS Console"]] - Squawk</title>

    <!-- CSS Framework -->
    <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/tailwind.min.css" rel="stylesheet">
    <!-- Custom CSS -->
    <link href="[[=URL('static', 'css/app.css')]]" rel="stylesheet">
</head>
<body class="bg-gray-50">
    <nav class="bg-blue-600 text-white p-4">
        <div class="container mx-auto flex justify-between items-center">
            <h1 class="text-xl font-bold">Squawk DNS Console</h1>
            <div class="space-x-4">
                <a href="[[=URL('index')]]" class="hover:text-blue-200">Dashboard</a>
                <a href="[[=URL('tokens')]]" class="hover:text-blue-200">Tokens</a>
                <a href="[[=URL('domains')]]" class="hover:text-blue-200">Domains</a>
                <a href="[[=URL('permissions')]]" class="hover:text-blue-200">Permissions</a>
                <a href="[[=URL('logs')]]" class="hover:text-blue-200">Logs</a>
            </div>
        </div>
    </nav>

    <main class="container mx-auto mt-8 px-4">
        [[if flash:]]
        <div class="bg-blue-100 border-l-4 border-blue-500 text-blue-700 p-4 mb-6" role="alert">
            [[=flash]]
        </div>
        [[pass]]

        [[include]]
    </main>

    <!-- JavaScript -->
    <script src="[[=URL('static', 'js/app.js')]]"></script>
</body>
</html>

JavaScript Frontend

// static/js/app.js
class SquawkConsole {
    constructor() {
        this.baseUrl = '/dns_console/api';
        this.init();
    }

    init() {
        this.setupEventListeners();
        this.loadDashboardData();
    }

    setupEventListeners() {
        // Permission matrix checkboxes
        document.addEventListener('change', (e) => {
            if (e.target.classList.contains('permission-checkbox')) {
                this.togglePermission(e.target);
            }
        });

        // Token form submission
        const tokenForm = document.getElementById('token-form');
        if (tokenForm) {
            tokenForm.addEventListener('submit', (e) => {
                this.handleTokenSubmit(e);
            });
        }
    }

    async togglePermission(checkbox) {
        const tokenId = checkbox.dataset.tokenId;
        const domainId = checkbox.dataset.domainId;

        try {
            const response = await fetch(`${this.baseUrl}/permissions/toggle`, {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json',
                },
                body: JSON.stringify({
                    token_id: tokenId,
                    domain_id: domainId
                })
            });

            const result = await response.json();

            if (result.success) {
                checkbox.checked = result.new_state;
                this.showNotification('Permission updated successfully', 'success');
            } else {
                checkbox.checked = !checkbox.checked; // Revert
                this.showNotification(`Error: ${result.error}`, 'error');
            }
        } catch (error) {
            checkbox.checked = !checkbox.checked; // Revert
            this.showNotification('Network error occurred', 'error');
        }
    }

    showNotification(message, type = 'info') {
        const notification = document.createElement('div');
        notification.className = `fixed top-4 right-4 p-4 rounded-md shadow-md z-50 ${
            type === 'success' ? 'bg-green-500' : 
            type === 'error' ? 'bg-red-500' : 'bg-blue-500'
        } text-white`;
        notification.textContent = message;

        document.body.appendChild(notification);

        setTimeout(() => {
            notification.remove();
        }, 3000);
    }

    async loadDashboardData() {
        try {
            const response = await fetch(`${this.baseUrl}/stats`);
            const data = await response.json();

            if (data.success) {
                this.updateDashboardStats(data.data);
            }
        } catch (error) {
            console.error('Failed to load dashboard data:', error);
        }
    }

    updateDashboardStats(stats) {
        const elements = {
            tokenCount: document.getElementById('token-count'),
            domainCount: document.getElementById('domain-count'),
            queryCount: document.getElementById('query-count')
        };

        if (elements.tokenCount) {
            elements.tokenCount.textContent = stats.tokens;
        }
        if (elements.domainCount) {
            elements.domainCount.textContent = stats.domains;
        }
        if (elements.queryCount) {
            elements.queryCount.textContent = stats.queries;
        }
    }
}

// Initialize when DOM is loaded
document.addEventListener('DOMContentLoaded', () => {
    new SquawkConsole();
});

Debugging and Troubleshooting

Debug Configuration

# debug.py - Debug utilities
import logging
import sys
from typing import Any, Dict

# Configure debug logging
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler('debug.log')
    ]
)

def debug_request(handler_instance):
    """Debug HTTP request details"""
    logger = logging.getLogger('request_debug')
    logger.debug(f"Path: {handler_instance.path}")
    logger.debug(f"Method: {handler_instance.command}")
    logger.debug(f"Headers: {dict(handler_instance.headers)}")
    logger.debug(f"Client: {handler_instance.client_address}")

def debug_database_query(query: str, params: tuple = None):
    """Debug database queries"""
    logger = logging.getLogger('db_debug')
    logger.debug(f"Query: {query}")
    if params:
        logger.debug(f"Parameters: {params}")

def debug_token_validation(token: str, domain: str, result: bool):
    """Debug token validation process"""
    logger = logging.getLogger('auth_debug')
    logger.debug(f"Token: {token[:8]}... (truncated)")
    logger.debug(f"Domain: {domain}")
    logger.debug(f"Result: {result}")

Common Issues and Solutions

# troubleshooting.py
class TroubleshootingGuide:
    """Common issues and their solutions"""

    @staticmethod
    def diagnose_auth_failure(token: str, domain: str) -> Dict[str, Any]:
        """Diagnose authentication failures"""
        issues = []

        # Check token format
        if len(token) < 32:
            issues.append("Token too short - should be at least 32 characters")

        # Check domain format
        if not re.match(r'^[a-zA-Z0-9.-]+$', domain):
            issues.append("Invalid domain format")

        # Check database connection
        try:
            # Test database connection
            pass
        except Exception as e:
            issues.append(f"Database connection failed: {e}")

        return {
            'token_format_valid': len(token) >= 32,
            'domain_format_valid': bool(re.match(r'^[a-zA-Z0-9.-]+$', domain)),
            'issues': issues,
            'suggestions': [
                "Verify token is active in web console",
                "Check domain permissions are assigned",
                "Review server logs for detailed errors"
            ]
        }

    @staticmethod
    def check_server_health() -> Dict[str, Any]:
        """Check server health status"""
        return {
            'database_connection': True,
            'upstream_dns_reachable': True,
            'memory_usage_mb': 128,
            'active_connections': 15
        }

Performance Debugging

# performance.py
import time
import functools
from typing import Callable

def timing_decorator(func: Callable) -> Callable:
    """Decorator to measure function execution time"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()

        logger = logging.getLogger('performance')
        logger.debug(f"{func.__name__} took {end_time - start_time:.4f} seconds")

        return result
    return wrapper

@timing_decorator
def validate_token_with_timing(token: str, domain: str) -> bool:
    """Token validation with performance timing"""
    # Implementation here
    pass

class PerformanceMonitor:
    """Monitor performance metrics"""

    def __init__(self):
        self.metrics = {}

    def record_request(self, duration: float, status_code: int):
        """Record request metrics"""
        if status_code not in self.metrics:
            self.metrics[status_code] = []

        self.metrics[status_code].append(duration)

    def get_stats(self) -> Dict[str, float]:
        """Get performance statistics"""
        stats = {}

        for status_code, durations in self.metrics.items():
            stats[f'avg_time_{status_code}'] = sum(durations) / len(durations)
            stats[f'max_time_{status_code}'] = max(durations)
            stats[f'min_time_{status_code}'] = min(durations)
            stats[f'requests_{status_code}'] = len(durations)

        return stats

Performance Optimization

Database Optimization

-- Performance indexes
CREATE INDEX CONCURRENTLY idx_tokens_active_token ON tokens(active, token) WHERE active = true;
CREATE INDEX CONCURRENTLY idx_query_logs_timestamp_desc ON query_logs(timestamp DESC);
CREATE INDEX CONCURRENTLY idx_token_domains_covering ON token_domains(token_id, domain_id);

-- Analyze table statistics
ANALYZE tokens;
ANALYZE domains; 
ANALYZE token_domains;
ANALYZE query_logs;

-- Vacuum for SQLite
VACUUM;

-- For PostgreSQL
VACUUM ANALYZE;

Caching Strategy

# caching.py
import time
from typing import Dict, Any, Optional
from functools import lru_cache

class TokenPermissionCache:
    """Cache for token permissions"""

    def __init__(self, ttl: int = 300):  # 5 minute TTL
        self.cache = {}
        self.ttl = ttl

    def get(self, token: str, domain: str) -> Optional[bool]:
        """Get cached permission result"""
        key = f"{token}:{domain}"

        if key in self.cache:
            result, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return result
            else:
                del self.cache[key]

        return None

    def set(self, token: str, domain: str, result: bool) -> None:
        """Cache permission result"""
        key = f"{token}:{domain}"
        self.cache[key] = (result, time.time())

    def invalidate(self, token: str = None) -> None:
        """Invalidate cache entries"""
        if token:
            # Invalidate specific token
            keys_to_delete = [k for k in self.cache.keys() if k.startswith(f"{token}:")]
            for key in keys_to_delete:
                del self.cache[key]
        else:
            # Clear entire cache
            self.cache.clear()

# Use LRU cache for frequently accessed functions
@lru_cache(maxsize=1000)
def validate_domain_format(domain: str) -> bool:
    """Cached domain format validation"""
    import re
    pattern = re.compile(r'^[a-zA-Z0-9.-]+$')
    return bool(pattern.match(domain))

Security Considerations

Input Sanitization

# security.py
import html
import re
from typing import str

def sanitize_domain_input(domain: str) -> str:
    """Sanitize domain name input"""
    # Remove dangerous characters
    domain = re.sub(r'[^a-zA-Z0-9.-]', '', domain)

    # Limit length
    domain = domain[:253]  # Max domain length per RFC

    # Remove leading/trailing dots
    domain = domain.strip('.')

    return domain.lower()

def sanitize_html_input(user_input: str) -> str:
    """Sanitize HTML input to prevent XSS"""
    # Escape HTML entities
    sanitized = html.escape(user_input)

    # Remove potentially dangerous protocols
    dangerous_patterns = [
        r'javascript:',
        r'data:',
        r'vbscript:',
    ]

    for pattern in dangerous_patterns:
        sanitized = re.sub(pattern, '', sanitized, flags=re.IGNORECASE)

    return sanitized

def validate_token_format(token: str) -> bool:
    """Validate token format"""
    # Check length
    if len(token) < 32 or len(token) > 128:
        return False

    # Check character set (base64url)
    if not re.match(r'^[A-Za-z0-9_-]+$', token):
        return False

    return True

Rate Limiting

# rate_limiting.py
import time
from collections import defaultdict, deque
from typing import Dict, Deque

class RateLimiter:
    """Token bucket rate limiter"""

    def __init__(self, requests_per_minute: int = 60):
        self.requests_per_minute = requests_per_minute
        self.requests: Dict[str, Deque[float]] = defaultdict(deque)

    def is_allowed(self, identifier: str) -> bool:
        """Check if request is allowed"""
        now = time.time()
        minute_ago = now - 60

        # Clean old requests
        request_times = self.requests[identifier]
        while request_times and request_times[0] < minute_ago:
            request_times.popleft()

        # Check if under limit
        if len(request_times) < self.requests_per_minute:
            request_times.append(now)
            return True

        return False

# Usage in DNS handler
rate_limiter = RateLimiter(requests_per_minute=100)

def check_rate_limit(self, client_ip: str) -> bool:
    """Check rate limit for client"""
    return rate_limiter.is_allowed(client_ip)

Release Process

Version Management

# Semantic versioning
MAJOR.MINOR.PATCH

# Example versions:
1.0.0  # Initial release
1.0.1  # Patch release (bug fixes)
1.1.0  # Minor release (new features, backward compatible)
2.0.0  # Major release (breaking changes)

Release Checklist

## Pre-release Checklist

- [ ] All tests pass (unit, integration, security)
- [ ] Code coverage > 80%
- [ ] Documentation updated
- [ ] CHANGELOG.md updated
- [ ] Version number bumped
- [ ] Security scan completed
- [ ] Performance regression tests passed
- [ ] Breaking changes documented
- [ ] Migration scripts tested
- [ ] Deployment scripts updated

## Release Process

1. [ ] Create release branch: `git checkout -b release/v1.2.0`
2. [ ] Update version in all relevant files
3. [ ] Run full test suite: `make test-all`
4. [ ] Update CHANGELOG.md with release notes
5. [ ] Commit changes: `git commit -m "chore: prepare v1.2.0 release"`
6. [ ] Create pull request for review
7. [ ] Merge to main after approval
8. [ ] Create git tag: `git tag v1.2.0`
9. [ ] Push tag: `git push origin v1.2.0`
10. [ ] GitHub Actions will build and publish artifacts
11. [ ] Update documentation site
12. [ ] Announce release

Automated Release Pipeline

# .github/workflows/release.yml
name: Release
on:
  push:
    tags:
      - 'v*'

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Run tests
        run: make test-all

  build:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Build Docker images
        run: |
          docker build -t squawk:${{ github.ref_name }} .
          docker tag squawk:${{ github.ref_name }} squawk:latest

      - name: Push to registry
        run: |
          docker push squawk:${{ github.ref_name }}
          docker push squawk:latest

  release:
    needs: build
    runs-on: ubuntu-latest
    steps:
      - name: Create GitHub Release
        uses: actions/create-release@v1
        with:
          tag_name: ${{ github.ref }}
          release_name: Release ${{ github.ref }}
          draft: false
          prerelease: false

This comprehensive development guide provides all the necessary information for developers to contribute effectively to the Squawk project, from initial setup through the release process.