Bidirectional Markdown syncing for Ghost

A couple of days ago, I pulled the trigger and moved this blog from Next.js to Ghost. You can read more about the process behind this decision in my other piece. In this post, I want to focus on how the new stack works, and why I think it will stand the test of time. Here goes.

My primary focus for the architecture of this blog has always been the content backbone. Post content has been stored as Markdown files in a Git repository for the past 8 years, and I don't expect this to change in the next 8 years either. Images and media are also stored in the repository using Git LFS, and hosted on my static subdomain with proper Cache-Control headers.

Previously, I spent a lot of effort on designing a polished landing page, including animations "inspired" by the best (are you really a web developer until you've attempted to copy Stripe?). For this iteration, my focus was on the content. As long as I found a way to get the Markdown files properly displayed, I'd be happy.

My good friend and frequent collaborator Tim switched his blog to Ghost, an open content and newsletter platform you can self-host. I trust his taste in software and share his views on ownership, so I didn't think long before making this decision.

Self-hosting Ghost is no black magic, and their documentation contains everything you need. Where it gets a little more interesting is how I sync content: most people probably just opt to author their posts in Ghost's fantastic editor (as did I, for the post you're reading at this very moment), yet I wanted the guarantee of having all my content on disk, neatly versioned and all.

Syncing from Markdown to Ghost

To get my 230+ existing posts onto Ghost in the first place, I had to write a script to convert the Markdown source to HTML and upload everything via the built-in Admin API. This was easier than I thought, and with the help of Claude Code, I got to the following solution:

#!/usr/bin/env python3
"""
Ghost CMS Sync Script
Syncs Markdown blog posts from content/posts/ to Ghost CMS.
"""

import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import jwt
import requests
import frontmatter
import markdown2
from dotenv import load_dotenv


# Debug mode - set to True to see HTML output
DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'


# Configuration
GHOST_API_URL = "https://YOUR_URL_HERE"
CONTENT_DIR = Path(__file__).parent.parent / "content" / "posts"
ENV_FILE = Path(__file__).parent.parent / ".env"

# External post types that should not be synced to Ghost
EXTERNAL_POST_TYPES = ['EXTERNAL']


class GhostAPIClient:
    """Client for interacting with Ghost Admin API."""

    def __init__(self, api_url: str, admin_api_key: str):
        self.api_url = api_url.rstrip('/')
        self.admin_api_key = admin_api_key

        # Parse the admin API key
        try:
            self.key_id, self.key_secret = admin_api_key.split(':')
        except ValueError:
            raise ValueError("Invalid GHOST_ADMIN_API_KEY format. Expected 'ID:SECRET'")

    def _generate_jwt_token(self) -> str:
        """Generate a JWT token for Ghost Admin API authentication."""
        iat = int(datetime.now(timezone.utc).timestamp())

        header = {
            'alg': 'HS256',
            'typ': 'JWT',
            'kid': self.key_id
        }

        payload = {
            'iat': iat,
            'exp': iat + 300,  # Token expires in 5 minutes
            'aud': '/admin/'
        }

        token = jwt.encode(payload, bytes.fromhex(self.key_secret), algorithm='HS256', headers=header)
        return token

    def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> requests.Response:
        """Make an authenticated request to Ghost API."""
        token = self._generate_jwt_token()
        url = f"{self.api_url}{endpoint}"

        headers = {
            'Authorization': f'Ghost {token}',
            'Content-Type': 'application/json'
        }

        response = requests.request(method, url, json=data, headers=headers)
        return response

    def get_post_by_slug(self, slug: str) -> Optional[Dict]:
        """Get a post by its slug."""
        try:
            response = self._make_request('GET', f'/ghost/api/admin/posts/slug/{slug}/')
            if response.status_code == 200:
                return response.json()['posts'][0]
            elif response.status_code == 404:
                return None
            else:
                print(f"  Warning: Unexpected status {response.status_code} when fetching slug {slug}")
                return None
        except Exception as e:
            print(f"  Error fetching post by slug {slug}: {e}")
            return None

    def get_posts_by_title(self, title: str) -> List[Dict]:
        """Get all posts with a specific title."""
        try:
            # Use filter parameter to search by title
            response = self._make_request('GET', f'/ghost/api/admin/posts/?filter=title:\'{title}\'&limit=all')
            if response.status_code == 200:
                return response.json()['posts']
            else:
                print(f"  Warning: Unexpected status {response.status_code} when searching for title")
                return []
        except Exception as e:
            print(f"  Error searching posts by title: {e}")
            return []

    def delete_post(self, post_id: str) -> Tuple[bool, Optional[str]]:
        """Delete a post from Ghost."""
        try:
            response = self._make_request('DELETE', f'/ghost/api/admin/posts/{post_id}/')
            if response.status_code == 204:
                return True, None
            else:
                error_msg = response.json().get('errors', [{}])[0].get('message', 'Unknown error')
                return False, f"Status {response.status_code}: {error_msg}"
        except Exception as e:
            return False, str(e)

    def create_post(self, post_data: Dict) -> Tuple[bool, Optional[str]]:
        """Create a new post in Ghost."""
        try:
            response = self._make_request('POST', '/ghost/api/admin/posts/?source=html', {'posts': [post_data]})
            if response.status_code in [200, 201]:
                return True, None
            else:
                error_msg = response.json().get('errors', [{}])[0].get('message', 'Unknown error')
                return False, f"Status {response.status_code}: {error_msg}"
        except Exception as e:
            return False, str(e)

    def update_post(self, post_id: str, post_data: Dict) -> Tuple[bool, Optional[str]]:
        """Update an existing post in Ghost."""
        try:
            response = self._make_request('PUT', f'/ghost/api/admin/posts/{post_id}/?source=html', {'posts': [post_data]})
            if response.status_code == 200:
                return True, None
            else:
                error_msg = response.json().get('errors', [{}])[0].get('message', 'Unknown error')
                return False, f"Status {response.status_code}: {error_msg}"
        except Exception as e:
            return False, str(e)


def parse_markdown_file(file_path: Path) -> Optional[Tuple[Dict, str]]:
    """Parse a markdown file and return frontmatter and content."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            post = frontmatter.load(f)

        # Validate required fields
        required_fields = ['title', 'path', 'date', 'published']
        missing_fields = [field for field in required_fields if field not in post.metadata]

        if missing_fields:
            print(f"  ⚠ Skipping {file_path.name}: Missing fields {missing_fields}")
            return None

        return post.metadata, post.content
    except Exception as e:
        print(f"  ⚠ Error parsing {file_path.name}: {e}")
        return None


def convert_markdown_to_html(markdown_content: str) -> str:
    """Convert Markdown content to HTML, preserving code block language classes."""
    import re
    import html as html_lib

    # Extract fenced code blocks with language info
    code_blocks = []

    def extract_fenced_block(match):
        language = match.group(1) or ''
        code = match.group(2)
        # Use HTML comment as placeholder (won't be processed by markdown2)
        placeholder = f'<!--CODE_BLOCK_{len(code_blocks)}-->'
        code_blocks.append((language, code))
        return placeholder

    # Extract fenced code blocks (with or without language)
    fenced_pattern = r'```(\w+)?\n(.*?)```'
    temp_md = re.sub(fenced_pattern, extract_fenced_block, markdown_content, flags=re.DOTALL)

    # Convert markdown to HTML (without fenced-code-blocks extra since we handle it manually)
    # Note: 'code-friendly' removed to enable underscore-based emphasis (_italic_)
    extras = ['tables', 'break-on-newline']
    html = markdown2.markdown(temp_md, extras=extras)

    # Replace placeholders with properly formatted code blocks
    for i, (language, code) in enumerate(code_blocks):
        placeholder = f'<!--CODE_BLOCK_{i}-->'
        code_escaped = html_lib.escape(code.rstrip())

        if language:
            replacement = f'<pre><code class="language-{language}">{code_escaped}</code></pre>'
        else:
            replacement = f'<pre><code>{code_escaped}</code></pre>'

        html = html.replace(placeholder, replacement)

    return html


def extract_slug_from_path(path: str) -> str:
    """Extract slug from the path field."""
    # Handle full URLs (external posts)
    if path.startswith('http://') or path.startswith('https://'):
        # Extract the last segment from the URL path
        # e.g., 'https://www.inngest.com/blog/sharding-at-inngest' -> 'sharding-at-inngest'
        from urllib.parse import urlparse
        parsed = urlparse(path)
        slug = parsed.path.rstrip('/').split('/')[-1]
        return slug if slug else path

    # Handle local paths like '/blog/2024-12-30-post-title'
    # The path is like '/blog/2024-12-30-looking-back-on-2024'
    # We want to extract just '2024-12-30-looking-back-on-2024'
    # Ghost will add the /blog/ prefix via its routing configuration
    if path.startswith('/blog/'):
        return path[6:]  # Remove '/blog/' prefix

    # Fallback for other formats
    return path.lstrip('/')


def map_to_ghost_post(frontmatter: Dict, html_content: str) -> Dict:
    """Map frontmatter and content to Ghost post format."""
    slug = extract_slug_from_path(frontmatter['path'])

    # Map published boolean to status
    status = 'published' if frontmatter.get('published', True) else 'draft'

    # Get topics as tags (or empty list if not present)
    tags = frontmatter.get('topics', [])

    # Ensure tags is a list
    if not isinstance(tags, list):
        tags = [tags]

    # Generate keywords meta tag for code injection
    keywords = frontmatter.get('keywords', [])
    keywords_tag = None
    if keywords and isinstance(keywords, list) and len(keywords) > 0:
        # Filter out None values and ensure all keywords are strings
        keywords = [str(k) for k in keywords if k is not None]
        if keywords:  # Only create meta tag if there are valid keywords
            # Escape special characters for HTML attributes
            import html
            keywords_str = ', '.join(keywords)
            keywords_escaped = html.escape(keywords_str, quote=True)
            keywords_tag = f'<meta name="keywords" content="{keywords_escaped}" />'

    # Generate og:image meta tag for code injection
    ogimage_tag = None
    if 'ogImageUrl' in frontmatter and frontmatter['ogImageUrl']:
        import html
        ogimage_url = html.escape(str(frontmatter['ogImageUrl']), quote=True)
        ogimage_tag = f'<meta property="og:image" content="{ogimage_url}" />'

    ghost_post = {
        'title': frontmatter['title'],
        'slug': slug,
        'html': html_content,
        'status': status,
        'published_at': frontmatter['date'],
        'tags': tags
    }

    # Add feature image if present
    if 'headerImageUrl' in frontmatter:
        ghost_post['feature_image'] = frontmatter['headerImageUrl']
        ghost_post['feature_image_alt'] = frontmatter['title']

    # Add code injection if keywords or og:image exist
    codeinjection_tags = []
    if keywords_tag:
        codeinjection_tags.append(keywords_tag)
    if ogimage_tag:
        codeinjection_tags.append(ogimage_tag)

    if codeinjection_tags:
        ghost_post['codeinjection_head'] = '\n'.join(codeinjection_tags)

    return ghost_post


def sync_post_to_ghost(client: GhostAPIClient, file_path: Path, dry_run: bool = False, cleanup_duplicates: bool = False) -> str:
    """Sync a single post to Ghost. Returns status: 'created', 'updated', 'skipped', or 'error'."""
    # Parse the markdown file
    result = parse_markdown_file(file_path)
    if result is None:
        return 'error'

    frontmatter_data, markdown_content = result

    # Skip external post types
    post_type = frontmatter_data.get('type', '')
    if post_type in EXTERNAL_POST_TYPES:
        print(f"  ⊘ Skipping external post type: {post_type}")
        return 'skipped'

    # Convert markdown to HTML
    html_content = convert_markdown_to_html(markdown_content)

    # Map to Ghost post format
    ghost_post = map_to_ghost_post(frontmatter_data, html_content)
    slug = ghost_post['slug']
    title = ghost_post['title']

    # Debug output
    if DEBUG:
        print(f"\n  Slug: {slug}")
        print(f"  Title: {ghost_post['title']}")
        print(f"  Status: {ghost_post['status']}")
        print(f"  Tags: {ghost_post['tags']}")
        print(f"  HTML preview (first 500 chars):\n{html_content[:500]}\n")
        if '<img' in html_content:
            print(f"  ✓ Images found in HTML")
            # Extract and show image tags
            import re
            img_tags = re.findall(r'<img[^>]+>', html_content)
            for img in img_tags[:3]:  # Show first 3 images
                print(f"    {img}")
        else:
            print(f"  ✗ No images found in HTML")

    if dry_run:
        print(f"  [DRY RUN] Would sync: {slug}")
        # Check for duplicates in dry-run mode too
        posts_with_title = client.get_posts_by_title(title)
        if len(posts_with_title) > 1:
            print(f"  ⚠ Found {len(posts_with_title)} posts with title '{title}':")
            for p in posts_with_title:
                print(f"    - {p['slug']} (ID: {p['id']}, updated: {p['updated_at'][:10]})")
        return 'skipped'

    # Check if post exists by slug
    existing_post = client.get_post_by_slug(slug)

    # Also check for duplicates by title
    posts_with_title = client.get_posts_by_title(title)

    # Handle duplicates
    if cleanup_duplicates and len(posts_with_title) > 1:
        print(f"  ⚠ Found {len(posts_with_title)} duplicate posts with title '{title}'")
        # Sort by updated_at to keep the most recent
        posts_sorted = sorted(posts_with_title, key=lambda p: p['updated_at'], reverse=True)
        post_to_keep = posts_sorted[0]
        posts_to_delete = posts_sorted[1:]

        print(f"    Keeping: {post_to_keep['slug']} (ID: {post_to_keep['id']}, updated: {post_to_keep['updated_at'][:10]})")

        for post in posts_to_delete:
            print(f"    Deleting: {post['slug']} (ID: {post['id']}, updated: {post['updated_at'][:10]})")
            success, error = client.delete_post(post['id'])
            if success:
                print(f"    ✓ Deleted duplicate")
            else:
                print(f"    ✗ Error deleting duplicate: {error}")

        # Use the kept post as the existing post
        existing_post = post_to_keep

    if existing_post:
        # Update existing post
        ghost_post['updated_at'] = existing_post['updated_at']
        success, error = client.update_post(existing_post['id'], ghost_post)
        if success:
            return 'updated'
        else:
            print(f"  ✗ Error updating {slug}: {error}")
            return 'error'
    else:
        # Create new post
        success, error = client.create_post(ghost_post)
        if success:
            return 'created'
        else:
            print(f"  ✗ Error creating {slug}: {error}")
            return 'error'


def main():
    """Main function to sync all posts."""
    import argparse

    parser = argparse.ArgumentParser(description='Sync Markdown posts to Ghost CMS')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without syncing')
    parser.add_argument('--limit', type=int, help='Limit number of posts to process')
    parser.add_argument('--post', type=str, help='Sync only a specific post (filename)')
    parser.add_argument('--cleanup-duplicates', action='store_true', help='Detect and remove duplicate posts with the same title')
    args = parser.parse_args()

    # Load environment variables
    load_dotenv(ENV_FILE)

    api_key = os.getenv('GHOST_ADMIN_API_KEY')
    if not api_key:
        print("Error: GHOST_ADMIN_API_KEY not found in .env file")
        sys.exit(1)

    # Initialize Ghost API client
    try:
        client = GhostAPIClient(GHOST_API_URL, api_key)
    except ValueError as e:
        print(f"Error: {e}")
        sys.exit(1)

    # Find all markdown files
    if args.post:
        # Sync specific post
        post_path = CONTENT_DIR / args.post
        if not post_path.exists():
            print(f"Error: Post not found: {post_path}")
            sys.exit(1)
        markdown_files = [post_path]
    else:
        # Sort in reverse order (newest first)
        markdown_files = sorted(CONTENT_DIR.glob('*.md'), reverse=True)
        if args.limit:
            markdown_files = markdown_files[:args.limit]

    total_posts = len(markdown_files)

    cleanup_msg = " (with duplicate cleanup)" if args.cleanup_duplicates else ""
    if args.dry_run:
        print(f"[DRY RUN MODE] Processing {total_posts} posts from {CONTENT_DIR}{cleanup_msg}...\n")
    else:
        print(f"Processing {total_posts} posts from {CONTENT_DIR}{cleanup_msg}...\n")

    # Track statistics
    stats = {
        'created': 0,
        'updated': 0,
        'error': 0,
        'skipped': 0
    }

    # Process each file
    for i, file_path in enumerate(markdown_files, 1):
        print(f"[{i}/{total_posts}] {file_path.name}")

        status = sync_post_to_ghost(client, file_path, dry_run=args.dry_run, cleanup_duplicates=args.cleanup_duplicates)
        stats[status] += 1

        if status == 'created':
            print(f"  ✓ Created")
        elif status == 'updated':
            print(f"  ✓ Updated")
        elif status == 'skipped' and not args.dry_run:
            print(f"  - Skipped")

        # Small delay to avoid rate limiting (skip in dry-run)
        if not args.dry_run:
            time.sleep(0.1)

    # Print summary
    print("\n" + "=" * 50)
    print("Sync Summary:")
    print(f"  Created: {stats['created']}")
    print(f"  Updated: {stats['updated']}")
    print(f"  Skipped: {stats['skipped']}")
    print(f"  Failed: {stats['error']}")
    print(f"  Total: {total_posts}")
    print("=" * 50)


if __name__ == '__main__':
    main()

As you can see, this script

  • fetches all Markdown files in a specific subdirectory
  • converts the source to HTML
  • uses the Admin API to create each post if it doesn't exist, or update it if it does
  • cleans up duplicate posts
  • provides keywords, tags, header images, social images, etc. in the expected format
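One detail from the script worth highlighting: markdown2's fenced-code-blocks extra doesn't keep the language class that Ghost's syntax highlighting relies on, which is why the script stashes fenced blocks behind HTML-comment placeholders before conversion and restores them afterwards. Here's that round trip isolated into a stdlib-only sketch (the actual markdown2 call is left out, since the placeholders pass through it untouched):

```python
import html
import re


def protect_fenced_blocks(markdown_text: str):
    """Swap fenced code blocks for HTML-comment placeholders
    so the Markdown converter can't mangle them."""
    blocks = []

    def stash(match):
        blocks.append((match.group(1) or '', match.group(2)))
        return f'<!--CODE_BLOCK_{len(blocks) - 1}-->'

    stripped = re.sub(r'```(\w+)?\n(.*?)```', stash, markdown_text, flags=re.DOTALL)
    return stripped, blocks


def restore_fenced_blocks(converted_html: str, blocks) -> str:
    """Replace each placeholder with a <pre><code> block,
    escaping the code and re-attaching the language class."""
    for i, (language, code) in enumerate(blocks):
        cls = f' class="language-{language}"' if language else ''
        replacement = f'<pre><code{cls}>{html.escape(code.rstrip())}</code></pre>'
        converted_html = converted_html.replace(f'<!--CODE_BLOCK_{i}-->', replacement)
    return converted_html


source = "Before\n\n```python\nprint('<hi>')\n```\n\nAfter"
stripped, blocks = protect_fenced_blocks(source)
# In the real script, markdown2.markdown(stripped) runs here.
result = restore_fenced_blocks(stripped, blocks)
```

The comment placeholders survive the Markdown pass verbatim, and escaping happens only at restore time, so HTML inside code samples can't leak into the page.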

And while this is cool, do you know what's even cooler? Automating it, of course! So I set up a GitHub Actions workflow to run the script every time content changed:

name: Sync Posts to Ghost CMS

on:
  push:
    branches:
      - main
    paths:
      - 'content/posts/**'
      - 'scripts/sync-to-ghost.py'
      - 'scripts/requirements.txt'
  workflow_dispatch:
    inputs:
      dry_run:
        description: 'Run in dry-run mode'
        required: false
        default: 'false'
        type: choice
        options:
          - 'true'
          - 'false'

jobs:
  sync-to-ghost:
    runs-on: ubuntu-latest
    permissions:
      contents: read

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: pip install -r scripts/requirements.txt

      - name: Sync posts to Ghost
        env:
          GHOST_ADMIN_API_KEY: ${{ secrets.GHOST_ADMIN_API_KEY }}
          PYTHONUNBUFFERED: 1
        run: |
          if [ "${{ github.event.inputs.dry_run }}" = "true" ]; then
            python scripts/sync-to-ghost.py --dry-run
          else
            python scripts/sync-to-ghost.py
          fi

      - name: Report sync status
        if: always()
        run: echo "Sync completed. Check logs for details."
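Speaking of scripts/requirements.txt: the sync script's imports map to the following PyPI packages. Note that the import names differ from the published package names in a few cases (`jwt` ships as PyJWT, `frontmatter` as python-frontmatter, `dotenv` as python-dotenv); pin versions as you see fit.

```text
PyJWT
requests
python-frontmatter
markdown2
python-dotenv
```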

This is great for getting all content uploaded to Ghost, but what about using Ghost to author new posts while still relying on Markdown as the single source of truth? Enter the reverse sync process!

Syncing from Ghost to Markdown

Fetching and uploading all Markdown content to Ghost is nice, but it would be even better if I didn't have to enter a code editor to author my blog posts. Don't get me wrong, I love NeoVim now, but writing a blog is not the same as writing code to me, so I would strongly prefer a lighter experience.

After uploading all my initial posts, I quickly thought: why can't I run the same process in reverse? If I can turn Markdown into HTML, the opposite shouldn't be rocket science either. So I set out to create a reverse syncing script.

#!/usr/bin/env python3
"""
Ghost CMS Reverse Sync Script
Syncs posts from Ghost CMS to local Markdown files.
"""

import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlparse

import jwt
import requests
import frontmatter
from markdownify import MarkdownConverter
from bs4 import BeautifulSoup
from dotenv import load_dotenv


# Configuration
GHOST_API_URL = "https://YOUR_DOMAIN_HERE"
CONTENT_DIR = Path(__file__).parent.parent / "content" / "posts"
STATIC_DIR = Path(__file__).parent.parent / "content" / "static" / "posts"
ENV_FILE = Path(__file__).parent.parent / ".env"


class GhostAPIClient:
    """Client for interacting with Ghost Admin API."""

    def __init__(self, api_url: str, admin_api_key: str):
        self.api_url = api_url.rstrip('/')
        self.admin_api_key = admin_api_key

        # Parse the admin API key
        try:
            self.key_id, self.key_secret = admin_api_key.split(':')
        except ValueError:
            raise ValueError("Invalid GHOST_ADMIN_API_KEY format. Expected 'ID:SECRET'")

    def _generate_jwt_token(self) -> str:
        """Generate a JWT token for Ghost Admin API authentication."""
        iat = int(datetime.now(timezone.utc).timestamp())

        header = {
            'alg': 'HS256',
            'typ': 'JWT',
            'kid': self.key_id
        }

        payload = {
            'iat': iat,
            'exp': iat + 300,  # Token expires in 5 minutes
            'aud': '/admin/'
        }

        token = jwt.encode(payload, bytes.fromhex(self.key_secret), algorithm='HS256', headers=header)
        return token

    def get_all_posts(self) -> List[Dict]:
        """Fetch all posts from Ghost with pagination."""
        all_posts = []
        page = 1

        while True:
            token = self._generate_jwt_token()
            url = f"{self.api_url}/ghost/api/admin/posts/"
            params = {
                'limit': 50,
                'page': page,
                'formats': 'html',
                'include': 'tags,codeinjection_head'
            }
            headers = {
                'Authorization': f'Ghost {token}',
                'Content-Type': 'application/json'
            }

            try:
                response = requests.get(url, params=params, headers=headers)
                response.raise_for_status()
            except Exception as e:
                raise Exception(f"Failed to fetch posts (page {page}): {e}")

            data = response.json()
            posts = data.get('posts', [])
            all_posts.extend(posts)

            # Check if there are more pages
            meta = data.get('meta', {}).get('pagination', {})
            if page >= meta.get('pages', 1):
                break

            page += 1
            time.sleep(0.1)  # Small delay between requests

        return all_posts


class CustomMarkdownConverter(MarkdownConverter):
    """Custom converter that preserves code block languages."""

    def convert_pre(self, el, text, **kwargs):
        """Override pre tag conversion to preserve language classes."""
        if not text:
            return ''

        # Check if this pre contains a code element
        code = el.find('code')
        if code is not None:
            # Extract language from class attribute
            classes = code.get('class', [])
            if isinstance(classes, str):
                classes = classes.split()

            language = ''
            for cls in classes:
                if cls.startswith('language-'):
                    language = cls.replace('language-', '')
                    break

            # Get the code content
            code_text = code.get_text()

            # Return fenced code block with language
            # Ensure code_text ends with newline for proper fence formatting
            if not code_text.endswith('\n'):
                code_text += '\n'

            if language:
                return f'\n```{language}\n{code_text}```\n'
            else:
                return f'\n```\n{code_text}```\n'

        return super().convert_pre(el, text, **kwargs)


def convert_html_to_markdown(html: str) -> str:
    """Convert Ghost HTML to Markdown, preserving code block languages."""
    return CustomMarkdownConverter(
        heading_style="ATX",
        bullets="-",
        escape_asterisks=False,
        escape_underscores=False,
        strip=['script', 'style']
    ).convert(html)


def download_images(html: str, slug: str, dry_run: bool = False) -> None:
    """Download images from HTML to static directory."""
    soup = BeautifulSoup(html, 'html.parser')

    images_to_download = []
    for img in soup.find_all('img'):
        src = img.get('src')
        if not src or not src.startswith('https://static.brunoscheufler.com/posts/'):
            continue

        # Extract the full relative path from the URL
        # e.g., "https://static.brunoscheufler.com/posts/slug/gcloud/1.png"
        # -> extract "slug/gcloud/1.png"
        parsed = urlparse(src)
        url_path = parsed.path  # e.g., "/posts/provisioning-k8s-cluster/gcloud/1.png"

        # Remove '/posts/' prefix to get relative path
        if url_path.startswith('/posts/'):
            relative_path = url_path[7:]  # Remove '/posts/' -> "slug/gcloud/1.png"
        else:
            continue

        # Build target path preserving subdirectories
        target_path = STATIC_DIR / relative_path

        # Check if file exists
        if target_path.exists():
            print(f"    Image already exists: {relative_path}")
            continue

        images_to_download.append((src, target_path, relative_path))

    if not images_to_download:
        return

    if dry_run:
        # In dry-run mode, just log what would be downloaded
        for src, target_path, relative_path in images_to_download:
            print(f"    [DRY RUN] Would download: {relative_path}")
        return

    # Create parent directories and download
    for src, target_path, relative_path in images_to_download:
        try:
            target_path.parent.mkdir(parents=True, exist_ok=True)  # Create all parent dirs
            response = requests.get(src, timeout=10)
            response.raise_for_status()
            target_path.write_bytes(response.content)
            print(f"    Downloaded: {relative_path}")
        except Exception as e:
            print(f"    Warning: Failed to download {src}: {e}")


def extract_keywords_from_code_injection(codeinjection_head: Optional[str]) -> List[str]:
    """Extract keywords from Ghost code injection HTML."""
    if not codeinjection_head:
        return []

    soup = BeautifulSoup(codeinjection_head, 'html.parser')
    keywords_tag = soup.find('meta', {'name': 'keywords'})

    if keywords_tag:
        content = keywords_tag.get('content', '')
        if content:
            return [k.strip() for k in content.split(',')]

    return []


def extract_ogimage_from_code_injection(codeinjection_head: Optional[str]) -> Optional[str]:
    """Extract og:image URL from Ghost code injection HTML."""
    if not codeinjection_head:
        return None

    soup = BeautifulSoup(codeinjection_head, 'html.parser')
    ogimage_tag = soup.find('meta', {'property': 'og:image'})

    if ogimage_tag:
        content = ogimage_tag.get('content', '')
        if content:
            return content.strip()

    return None


def map_ghost_post_to_frontmatter(post: Dict) -> Dict:
    """Map Ghost post fields to frontmatter."""
    # Extract date from published_at or created_at
    published_at = post.get('published_at') or post.get('created_at')

    # Map status
    is_published = post.get('status') == 'published'

    # Extract tag names
    tags = post.get('tags', [])
    topics = [tag['name'] for tag in tags if isinstance(tag, dict)]

    # Build path with /blog/ prefix
    slug = post['slug']
    path = f"/blog/{slug}"

    # Extract keywords from code injection
    keywords = extract_keywords_from_code_injection(post.get('codeinjection_head'))

    # Extract og:image from code injection
    ogimage_url = extract_ogimage_from_code_injection(post.get('codeinjection_head'))

    # Build frontmatter
    frontmatter_data = {
        'published': is_published,
        'type': 'BLOG',  # Default to BLOG, can be manually changed
        'path': path,
        'date': published_at,
        'title': post['title'],
        'keywords': keywords,  # Extracted from code injection, empty if not present
        'topics': topics
    }

    # Add headerImageUrl if feature_image exists
    if post.get('feature_image'):
        frontmatter_data['headerImageUrl'] = post['feature_image']

    # Add ogImageUrl if extracted from code injection
    if ogimage_url:
        frontmatter_data['ogImageUrl'] = ogimage_url

    return frontmatter_data


def generate_filename(post: Dict) -> str:
    """Generate filename from Ghost post."""
    published_at = post.get('published_at') or post.get('created_at')
    date_str = published_at[:10]  # YYYY-MM-DD
    slug = post['slug']

    # If slug already has date prefix, use as-is
    if re.match(r'^\d{4}-\d{2}-\d{2}-', slug):
        return f"{slug}.md"

    return f"{date_str}-{slug}.md"


def sync_post_from_ghost(post: Dict, dry_run: bool = False) -> str:
    """Sync a single post from Ghost. Returns status: 'created', 'updated', or 'skipped'."""
    filename = generate_filename(post)
    file_path = CONTENT_DIR / filename

    # Convert HTML to Markdown
    markdown_content = convert_html_to_markdown(post['html'])

    # Download images (or check which would be downloaded in dry-run)
    download_images(post['html'], post['slug'], dry_run=dry_run)

    # Map to frontmatter
    frontmatter_data = map_ghost_post_to_frontmatter(post)

    # Create frontmatter post
    new_post = frontmatter.Post(markdown_content, **frontmatter_data)

    if dry_run:
        print(f"  [DRY RUN] Would write: {filename}")
        return 'skipped'

    # Check if file exists
    status = 'updated' if file_path.exists() else 'created'

    # Write file
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(frontmatter.dumps(new_post))

    return status


def main():
    """Main function to sync all posts from Ghost."""
    import argparse

    parser = argparse.ArgumentParser(description='Sync posts from Ghost CMS to Markdown')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without syncing')
    args = parser.parse_args()

    # Load environment variables
    load_dotenv(ENV_FILE)

    api_key = os.getenv('GHOST_ADMIN_API_KEY')
    if not api_key:
        print("Error: GHOST_ADMIN_API_KEY not found in .env file")
        sys.exit(1)

    # Initialize Ghost API client
    try:
        client = GhostAPIClient(GHOST_API_URL, api_key)
    except ValueError as e:
        print(f"Error: {e}")
        sys.exit(1)

    # Fetch all posts from Ghost
    print("Fetching posts from Ghost...")
    try:
        posts = client.get_all_posts()
    except Exception as e:
        print(f"Error fetching posts: {e}")
        sys.exit(1)

    total_posts = len(posts)

    if args.dry_run:
        print(f"\n[DRY RUN MODE] Processing {total_posts} posts...\n")
    else:
        print(f"\nProcessing {total_posts} posts...\n")

    # Track statistics
    stats = {'created': 0, 'updated': 0, 'skipped': 0}

    # Process each post
    for i, post in enumerate(posts, 1):
        print(f"[{i}/{total_posts}] {post['title']}")

        status = sync_post_from_ghost(post, dry_run=args.dry_run)
        stats[status] += 1

        if status == 'created':
            print("  ✓ Created")
        elif status == 'updated':
            print("  ✓ Updated")

    # Print summary
    print("\n" + "=" * 50)
    print("Reverse Sync Summary:")
    print(f"  Created: {stats['created']}")
    print(f"  Updated: {stats['updated']}")
    print(f"  Skipped: {stats['skipped']}")
    print(f"  Total: {total_posts}")
    print("=" * 50)


if __name__ == '__main__':
    main()

As you can see, this script does the following:

  • Load all posts via the Admin API
  • Convert the HTML back to Markdown, respecting certain rules for code blocks and the like
  • Download images and media from posts into the expected static content directory
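One part worth highlighting is authentication: Ghost's Admin API doesn't accept the `id:secret` key directly, but expects a short-lived JWT derived from it (the `kid` header carries the id, the secret is hex-decoded and used as the HS256 signing key, and the audience is `/admin/`). A stdlib-only sketch of building such a token (the function name is mine, not from the actual script):

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs require."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def make_ghost_admin_token(api_key: str) -> str:
    """Build a short-lived HS256 JWT from a Ghost Admin API key ("id:secret")."""
    key_id, secret = api_key.split(":")
    header = {"alg": "HS256", "typ": "JWT", "kid": key_id}
    now = int(time.time())
    payload = {"iat": now, "exp": now + 5 * 60, "aud": "/admin/"}
    signing_input = (
        f"{_b64url(json.dumps(header).encode())}"
        f".{_b64url(json.dumps(payload).encode())}"
    )
    # The secret is hex-encoded in the API key and must be decoded before signing
    signature = hmac.new(bytes.fromhex(secret), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"
```

The resulting token then goes into an `Authorization: Ghost <token>` header on every Admin API request.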

I ran this a couple of times to verify that the output would stay as close to the existing source format as possible (having 230+ posts to sample from definitely helps with testing), and then ran a full reverse sync once.
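For reference, a synced file ends up looking roughly like this, matching the frontmatter mapping above (all values here are invented for illustration):

```markdown
---
published: true
type: BLOG
path: /blog/moving-to-ghost
date: '2024-05-01T09:00:00.000Z'
title: Moving to Ghost
keywords: []
topics:
  - meta
headerImageUrl: https://static.example.com/posts/moving-to-ghost/header.jpg
---

Post body, converted back from Ghost's HTML...
```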

And again, what's better than doing this manually? Running it on a schedule!

name: Reverse Sync from Ghost CMS

on:
  schedule:
    # Run daily at 2 AM UTC
    - cron: '0 2 * * *'
  workflow_dispatch:
    inputs:
      dry_run:
        description: 'Run in dry-run mode'
        required: false
        default: 'false'
        type: choice
        options:
          - 'true'
          - 'false'

jobs:
  reverse-sync:
    runs-on: ubuntu-latest
    permissions:
      contents: write
      pull-requests: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: pip install -r scripts/requirements.txt

      - name: Run reverse sync
        env:
          GHOST_ADMIN_API_KEY: ${{ secrets.GHOST_ADMIN_API_KEY }}
          PYTHONUNBUFFERED: 1
        run: |
          if [ "${{ github.event.inputs.dry_run }}" = "true" ]; then
            python scripts/sync-from-ghost.py --dry-run
          else
            python scripts/sync-from-ghost.py
          fi

      - name: Check for changes
        id: check_changes
        run: |
          if [ -n "$(git status --porcelain)" ]; then
            echo "has_changes=true" >> $GITHUB_OUTPUT
          else
            echo "has_changes=false" >> $GITHUB_OUTPUT
          fi

      - name: Count changes
        if: steps.check_changes.outputs.has_changes == 'true'
        id: count_changes
        run: |
          # Count new files (grep exits non-zero when nothing matches, which
          # would fail the step under bash's default pipefail, hence || true)
          NEW_FILES=$(git status --porcelain | grep -c "^??" || true)
          # Count modified files
          MODIFIED_FILES=$(git status --porcelain | grep -c "^ M" || true)

          echo "new_files=$NEW_FILES" >> $GITHUB_OUTPUT
          echo "modified_files=$MODIFIED_FILES" >> $GITHUB_OUTPUT

      - name: Create Pull Request
        if: steps.check_changes.outputs.has_changes == 'true'
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          # Configure git
          git config --global user.name "github-actions[bot]"
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"

          # Create branch with timestamp
          TIMESTAMP=$(date +%Y%m%d-%H%M%S)
          BRANCH_NAME="ghost-sync-$TIMESTAMP"

          # Create and switch to new branch
          git checkout -b "$BRANCH_NAME"

          # Add all changes
          git add content/posts/ content/static/posts/

          # Commit changes
          git commit -m "Sync posts from Ghost CMS

          - New posts: ${{ steps.count_changes.outputs.new_files }}
          - Updated posts: ${{ steps.count_changes.outputs.modified_files }}

          Automated sync performed at $(date -u +"%Y-%m-%d %H:%M:%S UTC")"

          # Push branch
          git push origin "$BRANCH_NAME"

          # Create PR with detailed body
          gh pr create \
            --title "Sync posts from Ghost CMS ($TIMESTAMP)" \
            --body "## Summary
          This PR contains changes synced from Ghost CMS.

          - **New posts**: ${{ steps.count_changes.outputs.new_files }}
          - **Updated posts**: ${{ steps.count_changes.outputs.modified_files }}
          - **Sync time**: $(date -u +"%Y-%m-%d %H:%M:%S UTC")

          ## Changes

          The following content was synced from Ghost:
          - Markdown post files in \`content/posts/\`
          - Static images in \`content/static/posts/\`

          ## Review Checklist

          - [ ] Review new post content and frontmatter
          - [ ] Check image downloads completed successfully
          - [ ] Verify frontmatter mapping (especially \`type\` and \`keywords\` fields)
          - [ ] Ensure no sensitive content was synced
          - [ ] Test builds locally if significant changes

          ## Notes

          This is an automated sync. The \`type\` field defaults to \`'BLOG'\` and may need manual adjustment for guides/tutorials. The \`keywords\` field is empty and may benefit from manual population." \
            --head "$BRANCH_NAME" \
            --base main

      - name: No changes detected
        if: steps.check_changes.outputs.has_changes == 'false'
        run: |
          echo "No changes detected. Ghost CMS is in sync with repository."

I hope this post serves as inspiration if you've ever thought about setting up something similar. I can totally recommend using Claude Code for the tedious work here, so you can focus on your content!