Bidirectional Markdown syncing for Ghost
A couple of days ago, I pulled the trigger and moved this blog from Next.js to Ghost. You can read more about the reasoning behind this decision in my other piece. In this post, I want to focus on how the new stack works, and why I think it will stand the test of time. Here goes.
The architecture of this blog has always centered on the content backbone. Post content has been stored as Markdown files in a Git repository for the past 8 years, and I don't expect this to change in the next 8 years either. Images and media are also stored in the repository using Git LFS, and hosted on my static subdomain with proper Cache-Control headers.
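For context, every post is a Markdown file with YAML frontmatter. A representative example might look like this (the values are hypothetical, but the field names match what the sync scripts further below expect):

```markdown
---
# illustrative values; field names are the ones the sync scripts read
published: true
type: "BLOG"
path: "/blog/2024-12-30-looking-back-on-2024"
date: "2024-12-30"
title: "Looking back on 2024"
keywords: ["year in review", "blogging"]
topics: ["personal"]
---

The post body is regular Markdown, including fenced code blocks and images.
```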
Previously, I spent a lot of effort on designing a polished landing page, including animations "inspired" by the best (are you really a web developer until you've attempted to copy Stripe?). For this iteration, my focus was on the content. As long as I found a way to get the Markdown files properly displayed, I'd be happy.
My good friend and frequent collaborator Tim switched his blog to Ghost, an open content and newsletter platform you can self-host. I trust his taste in software and share his ideas about ownership, so I didn't think long before making this decision.
Self-hosting Ghost is no dark art, and the documentation contains everything you need. Where it gets a little more interesting is how I sync content: most people probably just author their posts in Ghost's fantastic editor (as I did, for the post you're reading at this very moment), yet I wanted the guarantee of having all my content on disk, neatly versioned and all.
Syncing from Markdown to Ghost
To get my 230+ existing posts onto Ghost in the first place, I had to write a script to convert the Markdown source to HTML and upload everything via the built-in Admin API. This was easier than I thought, and with the help of Claude Code, I got to the following solution:
```python
#!/usr/bin/env python3
"""
Ghost CMS Sync Script

Syncs Markdown blog posts from content/posts/ to Ghost CMS.
"""

import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import jwt
import requests
import frontmatter
import markdown2
from dotenv import load_dotenv

# Debug mode - set DEBUG=true in the environment to see HTML output
DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'

# Configuration
GHOST_API_URL = "https://YOUR_URL_HERE"
CONTENT_DIR = Path(__file__).parent.parent / "content" / "posts"
ENV_FILE = Path(__file__).parent.parent / ".env"

# External post types that should not be synced to Ghost
EXTERNAL_POST_TYPES = ['EXTERNAL']


class GhostAPIClient:
    """Client for interacting with Ghost Admin API."""

    def __init__(self, api_url: str, admin_api_key: str):
        self.api_url = api_url.rstrip('/')
        self.admin_api_key = admin_api_key

        # Parse the admin API key
        try:
            self.key_id, self.key_secret = admin_api_key.split(':')
        except ValueError:
            raise ValueError("Invalid GHOST_ADMIN_API_KEY format. Expected 'ID:SECRET'")

    def _generate_jwt_token(self) -> str:
        """Generate a JWT token for Ghost Admin API authentication."""
        iat = int(datetime.now(timezone.utc).timestamp())
        header = {
            'alg': 'HS256',
            'typ': 'JWT',
            'kid': self.key_id
        }
        payload = {
            'iat': iat,
            'exp': iat + 300,  # Token expires in 5 minutes
            'aud': '/admin/'
        }
        token = jwt.encode(payload, bytes.fromhex(self.key_secret), algorithm='HS256', headers=header)
        return token

    def _make_request(self, method: str, endpoint: str, data: Optional[Dict] = None) -> requests.Response:
        """Make an authenticated request to Ghost API."""
        token = self._generate_jwt_token()
        url = f"{self.api_url}{endpoint}"
        headers = {
            'Authorization': f'Ghost {token}',
            'Content-Type': 'application/json'
        }
        response = requests.request(method, url, json=data, headers=headers)
        return response

    def get_post_by_slug(self, slug: str) -> Optional[Dict]:
        """Get a post by its slug."""
        try:
            response = self._make_request('GET', f'/ghost/api/admin/posts/slug/{slug}/')
            if response.status_code == 200:
                return response.json()['posts'][0]
            elif response.status_code == 404:
                return None
            else:
                print(f" Warning: Unexpected status {response.status_code} when fetching slug {slug}")
                return None
        except Exception as e:
            print(f" Error fetching post by slug {slug}: {e}")
            return None

    def get_posts_by_title(self, title: str) -> List[Dict]:
        """Get all posts with a specific title."""
        try:
            # Use filter parameter to search by title
            response = self._make_request('GET', f'/ghost/api/admin/posts/?filter=title:\'{title}\'&limit=all')
            if response.status_code == 200:
                return response.json()['posts']
            else:
                print(f" Warning: Unexpected status {response.status_code} when searching for title")
                return []
        except Exception as e:
            print(f" Error searching posts by title: {e}")
            return []

    def delete_post(self, post_id: str) -> Tuple[bool, Optional[str]]:
        """Delete a post from Ghost."""
        try:
            response = self._make_request('DELETE', f'/ghost/api/admin/posts/{post_id}/')
            if response.status_code == 204:
                return True, None
            else:
                error_msg = response.json().get('errors', [{}])[0].get('message', 'Unknown error')
                return False, f"Status {response.status_code}: {error_msg}"
        except Exception as e:
            return False, str(e)

    def create_post(self, post_data: Dict) -> Tuple[bool, Optional[str]]:
        """Create a new post in Ghost."""
        try:
            response = self._make_request('POST', '/ghost/api/admin/posts/?source=html', {'posts': [post_data]})
            if response.status_code in [200, 201]:
                return True, None
            else:
                error_msg = response.json().get('errors', [{}])[0].get('message', 'Unknown error')
                return False, f"Status {response.status_code}: {error_msg}"
        except Exception as e:
            return False, str(e)

    def update_post(self, post_id: str, post_data: Dict) -> Tuple[bool, Optional[str]]:
        """Update an existing post in Ghost."""
        try:
            response = self._make_request('PUT', f'/ghost/api/admin/posts/{post_id}/?source=html', {'posts': [post_data]})
            if response.status_code == 200:
                return True, None
            else:
                error_msg = response.json().get('errors', [{}])[0].get('message', 'Unknown error')
                return False, f"Status {response.status_code}: {error_msg}"
        except Exception as e:
            return False, str(e)


def parse_markdown_file(file_path: Path) -> Optional[Tuple[Dict, str]]:
    """Parse a markdown file and return frontmatter and content."""
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            post = frontmatter.load(f)

        # Validate required fields
        required_fields = ['title', 'path', 'date', 'published']
        missing_fields = [field for field in required_fields if field not in post.metadata]
        if missing_fields:
            print(f" ⚠ Skipping {file_path.name}: Missing fields {missing_fields}")
            return None

        return post.metadata, post.content
    except Exception as e:
        print(f" ⚠ Error parsing {file_path.name}: {e}")
        return None


def convert_markdown_to_html(markdown_content: str) -> str:
    """Convert Markdown content to HTML, preserving code block language classes."""
    import re
    import html as html_lib

    # Extract fenced code blocks with language info
    code_blocks = []

    def extract_fenced_block(match):
        language = match.group(1) or ''
        code = match.group(2)
        # Use HTML comment as placeholder (won't be processed by markdown2)
        placeholder = f'<!--CODE_BLOCK_{len(code_blocks)}-->'
        code_blocks.append((language, code))
        return placeholder

    # Extract fenced code blocks (with or without language)
    fenced_pattern = r'```(\w+)?\n(.*?)```'
    temp_md = re.sub(fenced_pattern, extract_fenced_block, markdown_content, flags=re.DOTALL)

    # Convert markdown to HTML (without the fenced-code-blocks extra since we handle it manually)
    # Note: 'code-friendly' removed to enable underscore-based emphasis (_italic_)
    extras = ['tables', 'break-on-newline']
    html = markdown2.markdown(temp_md, extras=extras)

    # Replace placeholders with properly formatted code blocks
    for i, (language, code) in enumerate(code_blocks):
        placeholder = f'<!--CODE_BLOCK_{i}-->'
        code_escaped = html_lib.escape(code.rstrip())
        if language:
            replacement = f'<pre><code class="language-{language}">{code_escaped}</code></pre>'
        else:
            replacement = f'<pre><code>{code_escaped}</code></pre>'
        html = html.replace(placeholder, replacement)

    return html


def extract_slug_from_path(path: str) -> str:
    """Extract slug from the path field."""
    # Handle full URLs (external posts): extract the last segment from the URL path,
    # e.g. 'https://www.inngest.com/blog/sharding-at-inngest' -> 'sharding-at-inngest'
    if path.startswith('http://') or path.startswith('https://'):
        from urllib.parse import urlparse
        parsed = urlparse(path)
        slug = parsed.path.rstrip('/').split('/')[-1]
        return slug if slug else path

    # Handle local paths like '/blog/2024-12-30-looking-back-on-2024': we want
    # just '2024-12-30-looking-back-on-2024', since Ghost will add the /blog/
    # prefix via its routing configuration
    if path.startswith('/blog/'):
        return path[6:]  # Remove '/blog/' prefix

    # Fallback for other formats
    return path.lstrip('/')


def map_to_ghost_post(frontmatter: Dict, html_content: str) -> Dict:
    """Map frontmatter and content to Ghost post format."""
    slug = extract_slug_from_path(frontmatter['path'])

    # Map published boolean to status
    status = 'published' if frontmatter.get('published', True) else 'draft'

    # Get topics as tags (or empty list if not present), ensuring tags is a list
    tags = frontmatter.get('topics', [])
    if not isinstance(tags, list):
        tags = [tags]

    # Generate keywords meta tag for code injection
    keywords = frontmatter.get('keywords', [])
    keywords_tag = None
    if keywords and isinstance(keywords, list) and len(keywords) > 0:
        # Filter out None values and ensure all keywords are strings
        keywords = [str(k) for k in keywords if k is not None]
        if keywords:  # Only create meta tag if there are valid keywords
            # Escape special characters for HTML attributes
            import html
            keywords_str = ', '.join(keywords)
            keywords_escaped = html.escape(keywords_str, quote=True)
            keywords_tag = f'<meta name="keywords" content="{keywords_escaped}" />'

    # Generate og:image meta tag for code injection
    ogimage_tag = None
    if 'ogImageUrl' in frontmatter and frontmatter['ogImageUrl']:
        import html
        ogimage_url = html.escape(str(frontmatter['ogImageUrl']), quote=True)
        ogimage_tag = f'<meta property="og:image" content="{ogimage_url}" />'

    ghost_post = {
        'title': frontmatter['title'],
        'slug': slug,
        'html': html_content,
        'status': status,
        'published_at': frontmatter['date'],
        'tags': tags
    }

    # Add feature image if present
    if 'headerImageUrl' in frontmatter:
        ghost_post['feature_image'] = frontmatter['headerImageUrl']
        ghost_post['feature_image_alt'] = frontmatter['title']

    # Add code injection if keywords or og:image exist
    codeinjection_tags = []
    if keywords_tag:
        codeinjection_tags.append(keywords_tag)
    if ogimage_tag:
        codeinjection_tags.append(ogimage_tag)
    if codeinjection_tags:
        ghost_post['codeinjection_head'] = '\n'.join(codeinjection_tags)

    return ghost_post


def sync_post_to_ghost(client: GhostAPIClient, file_path: Path, dry_run: bool = False, cleanup_duplicates: bool = False) -> str:
    """Sync a single post to Ghost. Returns status: 'created', 'updated', 'skipped', or 'error'."""
    # Parse the markdown file
    result = parse_markdown_file(file_path)
    if result is None:
        return 'error'
    frontmatter_data, markdown_content = result

    # Skip external post types
    post_type = frontmatter_data.get('type', '')
    if post_type in EXTERNAL_POST_TYPES:
        print(f" ⊘ Skipping external post type: {post_type}")
        return 'skipped'

    # Convert markdown to HTML
    html_content = convert_markdown_to_html(markdown_content)

    # Map to Ghost post format
    ghost_post = map_to_ghost_post(frontmatter_data, html_content)
    slug = ghost_post['slug']
    title = ghost_post['title']

    # Debug output
    if DEBUG:
        print(f"\n Slug: {slug}")
        print(f" Title: {ghost_post['title']}")
        print(f" Status: {ghost_post['status']}")
        print(f" Tags: {ghost_post['tags']}")
        print(f" HTML preview (first 500 chars):\n{html_content[:500]}\n")
        if '<img' in html_content:
            print(f" ✓ Images found in HTML")
            # Extract and show image tags
            import re
            img_tags = re.findall(r'<img[^>]+>', html_content)
            for img in img_tags[:3]:  # Show first 3 images
                print(f" {img}")
        else:
            print(f" ✗ No images found in HTML")

    if dry_run:
        print(f" [DRY RUN] Would sync: {slug}")
        # Check for duplicates in dry-run mode too
        posts_with_title = client.get_posts_by_title(title)
        if len(posts_with_title) > 1:
            print(f" ⚠ Found {len(posts_with_title)} posts with title '{title}':")
            for p in posts_with_title:
                print(f" - {p['slug']} (ID: {p['id']}, updated: {p['updated_at'][:10]})")
        return 'skipped'

    # Check if post exists by slug
    existing_post = client.get_post_by_slug(slug)

    # Also check for duplicates by title
    posts_with_title = client.get_posts_by_title(title)

    # Handle duplicates
    if cleanup_duplicates and len(posts_with_title) > 1:
        print(f" ⚠ Found {len(posts_with_title)} duplicate posts with title '{title}'")
        # Sort by updated_at to keep the most recent
        posts_sorted = sorted(posts_with_title, key=lambda p: p['updated_at'], reverse=True)
        post_to_keep = posts_sorted[0]
        posts_to_delete = posts_sorted[1:]
        print(f" Keeping: {post_to_keep['slug']} (ID: {post_to_keep['id']}, updated: {post_to_keep['updated_at'][:10]})")
        for post in posts_to_delete:
            print(f" Deleting: {post['slug']} (ID: {post['id']}, updated: {post['updated_at'][:10]})")
            success, error = client.delete_post(post['id'])
            if success:
                print(f" ✓ Deleted duplicate")
            else:
                print(f" ✗ Error deleting duplicate: {error}")
        # Use the kept post as the existing post
        existing_post = post_to_keep

    if existing_post:
        # Update existing post
        ghost_post['updated_at'] = existing_post['updated_at']
        success, error = client.update_post(existing_post['id'], ghost_post)
        if success:
            return 'updated'
        else:
            print(f" ✗ Error updating {slug}: {error}")
            return 'error'
    else:
        # Create new post
        success, error = client.create_post(ghost_post)
        if success:
            return 'created'
        else:
            print(f" ✗ Error creating {slug}: {error}")
            return 'error'


def main():
    """Main function to sync all posts."""
    import argparse
    parser = argparse.ArgumentParser(description='Sync Markdown posts to Ghost CMS')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without syncing')
    parser.add_argument('--limit', type=int, help='Limit number of posts to process')
    parser.add_argument('--post', type=str, help='Sync only a specific post (filename)')
    parser.add_argument('--cleanup-duplicates', action='store_true', help='Detect and remove duplicate posts with the same title')
    args = parser.parse_args()

    # Load environment variables
    load_dotenv(ENV_FILE)
    api_key = os.getenv('GHOST_ADMIN_API_KEY')
    if not api_key:
        print("Error: GHOST_ADMIN_API_KEY not found in .env file")
        sys.exit(1)

    # Initialize Ghost API client
    try:
        client = GhostAPIClient(GHOST_API_URL, api_key)
    except ValueError as e:
        print(f"Error: {e}")
        sys.exit(1)

    # Find all markdown files
    if args.post:
        # Sync specific post
        post_path = CONTENT_DIR / args.post
        if not post_path.exists():
            print(f"Error: Post not found: {post_path}")
            sys.exit(1)
        markdown_files = [post_path]
    else:
        # Sort in reverse order (newest first)
        markdown_files = sorted(CONTENT_DIR.glob('*.md'), reverse=True)
        if args.limit:
            markdown_files = markdown_files[:args.limit]

    total_posts = len(markdown_files)
    cleanup_msg = " (with duplicate cleanup)" if args.cleanup_duplicates else ""
    if args.dry_run:
        print(f"[DRY RUN MODE] Processing {total_posts} posts from {CONTENT_DIR}{cleanup_msg}...\n")
    else:
        print(f"Processing {total_posts} posts from {CONTENT_DIR}{cleanup_msg}...\n")

    # Track statistics
    stats = {
        'created': 0,
        'updated': 0,
        'error': 0,
        'skipped': 0
    }

    # Process each file
    for i, file_path in enumerate(markdown_files, 1):
        print(f"[{i}/{total_posts}] {file_path.name}")
        status = sync_post_to_ghost(client, file_path, dry_run=args.dry_run, cleanup_duplicates=args.cleanup_duplicates)
        stats[status] += 1
        if status == 'created':
            print(f" ✓ Created")
        elif status == 'updated':
            print(f" ✓ Updated")
        elif status == 'skipped' and not args.dry_run:
            print(f" - Skipped")

        # Small delay to avoid rate limiting (skip in dry-run)
        if not args.dry_run:
            time.sleep(0.1)

    # Print summary
    print("\n" + "=" * 50)
    print("Sync Summary:")
    print(f" Created: {stats['created']}")
    print(f" Updated: {stats['updated']}")
    print(f" Skipped: {stats['skipped']}")
    print(f" Failed: {stats['error']}")
    print(f" Total: {total_posts}")
    print("=" * 50)


if __name__ == '__main__':
    main()
```

As you can see, this script
- fetches all Markdown files in a specific subdirectory
- converts the source to HTML
- uses the Admin API to create each post if it doesn't exist, or update it if it does
- cleans up duplicate posts
- provides keywords, tags, header images, social images, etc. in the expected format
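For day-to-day use, invocations look roughly like this (hypothetical examples; the flags are the ones defined in the script's argument parser):

```bash
# preview what would be synced without writing to Ghost
python scripts/sync-to-ghost.py --dry-run --limit 5

# sync everything and remove duplicate posts sharing a title
python scripts/sync-to-ghost.py --cleanup-duplicates
```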
And while this is cool, do you know what's even cooler? Automating it, of course! So I set up a GitHub Actions workflow that runs the script every time content changes:
```yaml
name: Sync Posts to Ghost CMS

on:
  push:
    branches:
      - main
    paths:
      - 'content/posts/**'
      - 'scripts/sync-to-ghost.py'
      - 'scripts/requirements.txt'
  workflow_dispatch:
    inputs:
      dry_run:
        description: 'Run in dry-run mode'
        required: false
        default: 'false'
        type: choice
        options:
          - 'true'
          - 'false'

jobs:
  sync-to-ghost:
    runs-on: ubuntu-latest
    permissions:
      contents: read
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: pip install -r scripts/requirements.txt

      - name: Sync posts to Ghost
        env:
          GHOST_ADMIN_API_KEY: ${{ secrets.GHOST_ADMIN_API_KEY }}
          PYTHONUNBUFFERED: 1
        run: |
          if [ "${{ github.event.inputs.dry_run }}" = "true" ]; then
            python scripts/sync-to-ghost.py --dry-run
          else
            python scripts/sync-to-ghost.py
          fi

      - name: Upload sync summary
        if: always()
        run: |
          echo "Sync completed. Check logs for details."
```

This is great for getting all content uploaded to Ghost, but what about using Ghost to author new posts while still relying on Markdown as the single source of truth? Enter the reverse sync process!
Syncing from Ghost to Markdown
Fetching and uploading all Markdown content to Ghost is nice, but it would be even better if I didn't have to open a code editor to author my blog posts. Don't get me wrong, I love NeoVim now, but to me, writing a blog is not the same as writing code, so I'd strongly prefer a lighter experience.
After uploading all my initial posts, I quickly thought: Why can't I run the same process in reverse? If I can turn Markdown into HTML, the opposite shouldn't be rocket science either. So I set out to create a reverse syncing script.
```python
#!/usr/bin/env python3
"""
Ghost CMS Reverse Sync Script

Syncs posts from Ghost CMS to local Markdown files.
"""

import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import urlparse

import jwt
import requests
import frontmatter
from markdownify import MarkdownConverter
from bs4 import BeautifulSoup
from dotenv import load_dotenv

# Configuration
GHOST_API_URL = "https://YOUR_DOMAIN_HERE"
CONTENT_DIR = Path(__file__).parent.parent / "content" / "posts"
STATIC_DIR = Path(__file__).parent.parent / "content" / "static" / "posts"
ENV_FILE = Path(__file__).parent.parent / ".env"


class GhostAPIClient:
    """Client for interacting with Ghost Admin API."""

    def __init__(self, api_url: str, admin_api_key: str):
        self.api_url = api_url.rstrip('/')
        self.admin_api_key = admin_api_key

        # Parse the admin API key
        try:
            self.key_id, self.key_secret = admin_api_key.split(':')
        except ValueError:
            raise ValueError("Invalid GHOST_ADMIN_API_KEY format. Expected 'ID:SECRET'")

    def _generate_jwt_token(self) -> str:
        """Generate a JWT token for Ghost Admin API authentication."""
        iat = int(datetime.now(timezone.utc).timestamp())
        header = {
            'alg': 'HS256',
            'typ': 'JWT',
            'kid': self.key_id
        }
        payload = {
            'iat': iat,
            'exp': iat + 300,  # Token expires in 5 minutes
            'aud': '/admin/'
        }
        token = jwt.encode(payload, bytes.fromhex(self.key_secret), algorithm='HS256', headers=header)
        return token

    def get_all_posts(self) -> List[Dict]:
        """Fetch all posts from Ghost with pagination."""
        all_posts = []
        page = 1
        while True:
            token = self._generate_jwt_token()
            url = f"{self.api_url}/ghost/api/admin/posts/"
            params = {
                'limit': 50,
                'page': page,
                'formats': 'html',
                'include': 'tags,codeinjection_head'
            }
            headers = {
                'Authorization': f'Ghost {token}',
                'Content-Type': 'application/json'
            }
            try:
                response = requests.get(url, params=params, headers=headers)
                response.raise_for_status()
            except Exception as e:
                raise Exception(f"Failed to fetch posts (page {page}): {e}")

            data = response.json()
            posts = data.get('posts', [])
            all_posts.extend(posts)

            # Check if there are more pages
            meta = data.get('meta', {}).get('pagination', {})
            if page >= meta.get('pages', 1):
                break
            page += 1
            time.sleep(0.1)  # Small delay between requests

        return all_posts


class CustomMarkdownConverter(MarkdownConverter):
    """Custom converter that preserves code block languages."""

    def convert_pre(self, el, text, **kwargs):
        """Override pre tag conversion to preserve language classes."""
        if not text:
            return ''

        # Check if this pre contains a code element
        code = el.find('code')
        if code is not None:
            # Extract language from class attribute
            classes = code.get('class', [])
            if isinstance(classes, str):
                classes = classes.split()
            language = ''
            for cls in classes:
                if cls.startswith('language-'):
                    language = cls.replace('language-', '')
                    break

            # Get the code content
            code_text = code.get_text()

            # Return fenced code block with language; ensure code_text ends
            # with a newline for proper fence formatting
            if not code_text.endswith('\n'):
                code_text += '\n'
            if language:
                return f'\n```{language}\n{code_text}```\n'
            else:
                return f'\n```\n{code_text}```\n'

        return super().convert_pre(el, text, **kwargs)


def convert_html_to_markdown(html: str) -> str:
    """Convert Ghost HTML to Markdown, preserving code block languages."""
    return CustomMarkdownConverter(
        heading_style="ATX",
        bullets="-",
        escape_asterisks=False,
        escape_underscores=False,
        strip=['script', 'style']
    ).convert(html)


def download_images(html: str, slug: str, dry_run: bool = False) -> None:
    """Download images from HTML to static directory."""
    soup = BeautifulSoup(html, 'html.parser')
    images_to_download = []

    for img in soup.find_all('img'):
        src = img.get('src')
        if not src or not src.startswith('https://static.brunoscheufler.com/posts/'):
            continue

        # Extract the full relative path from the URL,
        # e.g. "https://static.brunoscheufler.com/posts/slug/gcloud/1.png" -> "slug/gcloud/1.png"
        parsed = urlparse(src)
        url_path = parsed.path  # e.g., "/posts/provisioning-k8s-cluster/gcloud/1.png"

        # Remove '/posts/' prefix to get relative path
        if url_path.startswith('/posts/'):
            relative_path = url_path[7:]  # Remove '/posts/' -> "slug/gcloud/1.png"
        else:
            continue

        # Build target path preserving subdirectories
        target_path = STATIC_DIR / relative_path

        # Check if file exists
        if target_path.exists():
            print(f" Image already exists: {relative_path}")
            continue

        images_to_download.append((src, target_path, relative_path))

    if not images_to_download:
        return

    if dry_run:
        # In dry-run mode, just log what would be downloaded
        for src, target_path, relative_path in images_to_download:
            print(f" [DRY RUN] Would download: {relative_path}")
        return

    # Create parent directories and download
    for src, target_path, relative_path in images_to_download:
        try:
            target_path.parent.mkdir(parents=True, exist_ok=True)  # Create all parent dirs
            response = requests.get(src, timeout=10)
            response.raise_for_status()
            target_path.write_bytes(response.content)
            print(f" Downloaded: {relative_path}")
        except Exception as e:
            print(f" Warning: Failed to download {src}: {e}")


def extract_keywords_from_code_injection(codeinjection_head: Optional[str]) -> List[str]:
    """Extract keywords from Ghost code injection HTML."""
    if not codeinjection_head:
        return []

    soup = BeautifulSoup(codeinjection_head, 'html.parser')
    keywords_tag = soup.find('meta', {'name': 'keywords'})
    if keywords_tag:
        content = keywords_tag.get('content', '')
        if content:
            return [k.strip() for k in content.split(',')]
    return []


def extract_ogimage_from_code_injection(codeinjection_head: Optional[str]) -> Optional[str]:
    """Extract og:image URL from Ghost code injection HTML."""
    if not codeinjection_head:
        return None

    soup = BeautifulSoup(codeinjection_head, 'html.parser')
    ogimage_tag = soup.find('meta', {'property': 'og:image'})
    if ogimage_tag:
        content = ogimage_tag.get('content', '')
        if content:
            return content.strip()
    return None


def map_ghost_post_to_frontmatter(post: Dict) -> Dict:
    """Map Ghost post fields to frontmatter."""
    # Extract date from published_at or created_at
    published_at = post.get('published_at') or post.get('created_at')

    # Map status
    is_published = post.get('status') == 'published'

    # Extract tag names
    tags = post.get('tags', [])
    topics = [tag['name'] for tag in tags if isinstance(tag, dict)]

    # Build path with /blog/ prefix
    slug = post['slug']
    path = f"/blog/{slug}"

    # Extract keywords from code injection
    keywords = extract_keywords_from_code_injection(post.get('codeinjection_head'))

    # Extract og:image from code injection
    ogimage_url = extract_ogimage_from_code_injection(post.get('codeinjection_head'))

    # Build frontmatter
    frontmatter_data = {
        'published': is_published,
        'type': 'BLOG',  # Default to BLOG, can be manually changed
        'path': path,
        'date': published_at,
        'title': post['title'],
        'keywords': keywords,  # Extracted from code injection, empty if not present
        'topics': topics
    }

    # Add headerImageUrl if feature_image exists
    if post.get('feature_image'):
        frontmatter_data['headerImageUrl'] = post['feature_image']

    # Add ogImageUrl if extracted from code injection
    if ogimage_url:
        frontmatter_data['ogImageUrl'] = ogimage_url

    return frontmatter_data


def generate_filename(post: Dict) -> str:
    """Generate filename from Ghost post."""
    published_at = post.get('published_at') or post.get('created_at')
    date_str = published_at[:10]  # YYYY-MM-DD
    slug = post['slug']

    # If slug already has date prefix, use as-is
    if re.match(r'^\d{4}-\d{2}-\d{2}-', slug):
        return f"{slug}.md"
    return f"{date_str}-{slug}.md"


def sync_post_from_ghost(post: Dict, dry_run: bool = False) -> str:
    """Sync a single post from Ghost. Returns status: 'created', 'updated', or 'skipped'."""
    filename = generate_filename(post)
    file_path = CONTENT_DIR / filename

    # Convert HTML to Markdown
    markdown_content = convert_html_to_markdown(post['html'])

    # Download images (or check which would be downloaded in dry-run)
    download_images(post['html'], post['slug'], dry_run=dry_run)

    # Map to frontmatter
    frontmatter_data = map_ghost_post_to_frontmatter(post)

    # Create frontmatter post
    new_post = frontmatter.Post(markdown_content, **frontmatter_data)

    if dry_run:
        print(f" [DRY RUN] Would write: {filename}")
        return 'skipped'

    # Check if file exists
    status = 'updated' if file_path.exists() else 'created'

    # Write file
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(frontmatter.dumps(new_post))

    return status


def main():
    """Main function to sync all posts from Ghost."""
    import argparse
    parser = argparse.ArgumentParser(description='Sync posts from Ghost CMS to Markdown')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without syncing')
    args = parser.parse_args()

    # Load environment variables
    load_dotenv(ENV_FILE)
    api_key = os.getenv('GHOST_ADMIN_API_KEY')
    if not api_key:
        print("Error: GHOST_ADMIN_API_KEY not found in .env file")
        sys.exit(1)

    # Initialize Ghost API client
    try:
        client = GhostAPIClient(GHOST_API_URL, api_key)
    except ValueError as e:
        print(f"Error: {e}")
        sys.exit(1)

    # Fetch all posts from Ghost
    print("Fetching posts from Ghost...")
    try:
        posts = client.get_all_posts()
    except Exception as e:
        print(f"Error fetching posts: {e}")
        sys.exit(1)

    total_posts = len(posts)
    if args.dry_run:
        print(f"\n[DRY RUN MODE] Processing {total_posts} posts...\n")
    else:
        print(f"\nProcessing {total_posts} posts...\n")

    # Track statistics
    stats = {'created': 0, 'updated': 0, 'skipped': 0}

    # Process each post
    for i, post in enumerate(posts, 1):
        print(f"[{i}/{total_posts}] {post['title']}")
        status = sync_post_from_ghost(post, dry_run=args.dry_run)
        stats[status] += 1
        if status == 'created':
            print(f" ✓ Created")
        elif status == 'updated':
            print(f" ✓ Updated")

    # Print summary
    print("\n" + "=" * 50)
    print("Reverse Sync Summary:")
    print(f" Created: {stats['created']}")
    print(f" Updated: {stats['updated']}")
    print(f" Skipped: {stats['skipped']}")
    print(f" Total: {total_posts}")
    print("=" * 50)


if __name__ == '__main__':
    main()
```

You can see this script does the following
- loads all posts from the API
- converts HTML to Markdown while respecting certain rules for code blocks, etc.
- downloads images and media from posts into the expected static content directory
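Invoking it mirrors the forward direction (again hypothetical examples; --dry-run is the only flag this script defines):

```bash
# preview which Markdown files and images would be written
python scripts/sync-from-ghost.py --dry-run

# write files and download images for real
python scripts/sync-from-ghost.py
```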
I ran this a couple of times to verify that the output would be as close to the existing source as possible (having 230+ examples to choose from definitely helps with testing), and then ran a full reverse sync once.
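If you want to automate that verification, a round-trip check is straightforward. Here's a minimal sketch, assuming it lives next to the two sync scripts (this helper is hypothetical and not part of the repo; since the script filenames contain hyphens, it loads them by path instead of a regular import):

```python
# round_trip_check.py - hypothetical helper: convert each Markdown source
# to HTML and back, then diff the result against the original.
import difflib
import importlib.util
from pathlib import Path

import frontmatter

def load_module(name: str, path: Path):
    # The sync scripts have hyphens in their filenames, so load them by path
    spec = importlib.util.spec_from_file_location(name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

SCRIPTS = Path(__file__).parent
to_ghost = load_module("sync_to_ghost", SCRIPTS / "sync-to-ghost.py")
from_ghost = load_module("sync_from_ghost", SCRIPTS / "sync-from-ghost.py")

CONTENT_DIR = SCRIPTS.parent / "content" / "posts"

for md_file in sorted(CONTENT_DIR.glob("*.md")):
    post = frontmatter.load(md_file)
    # Markdown -> HTML -> Markdown, using the converters from both scripts
    html = to_ghost.convert_markdown_to_html(post.content)
    round_tripped = from_ghost.convert_html_to_markdown(html)
    diff = list(difflib.unified_diff(
        post.content.splitlines(),
        round_tripped.splitlines(),
        fromfile=md_file.name,
        tofile=f"{md_file.name} (round trip)",
        lineterm="",
    ))
    if diff:
        print(f"{md_file.name}: {len(diff)} differing lines")
```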
And again, what's better than doing this manually? Running it on a schedule!
```yaml
name: Reverse Sync from Ghost CMS

on:
  schedule:
    # Run daily at 2 AM UTC
    - cron: '0 2 * * *'
  workflow_dispatch:
    inputs:
      dry_run:
        description: 'Run in dry-run mode'
        required: false
        default: 'false'
        type: choice
        options:
          - 'true'
          - 'false'

jobs:
  reverse-sync:
    runs-on: ubuntu-latest
    permissions:
      contents: write
      pull-requests: write
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
          cache: 'pip'

      - name: Install dependencies
        run: pip install -r scripts/requirements.txt

      - name: Run reverse sync
        env:
          GHOST_ADMIN_API_KEY: ${{ secrets.GHOST_ADMIN_API_KEY }}
          PYTHONUNBUFFERED: 1
        run: |
          if [ "${{ github.event.inputs.dry_run }}" = "true" ]; then
            python scripts/sync-from-ghost.py --dry-run
          else
            python scripts/sync-from-ghost.py
          fi

      - name: Check for changes
        id: check_changes
        run: |
          if [ -n "$(git status --porcelain)" ]; then
            echo "has_changes=true" >> $GITHUB_OUTPUT
          else
            echo "has_changes=false" >> $GITHUB_OUTPUT
          fi

      - name: Count changes
        if: steps.check_changes.outputs.has_changes == 'true'
        id: count_changes
        run: |
          # Count new files
          NEW_FILES=$(git status --porcelain | grep "^??" | wc -l | xargs)
          # Count modified files
          MODIFIED_FILES=$(git status --porcelain | grep "^ M" | wc -l | xargs)
          echo "new_files=$NEW_FILES" >> $GITHUB_OUTPUT
          echo "modified_files=$MODIFIED_FILES" >> $GITHUB_OUTPUT

      - name: Create Pull Request
        if: steps.check_changes.outputs.has_changes == 'true'
        env:
          GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          # Configure git
          git config --global user.name "github-actions[bot]"
          git config --global user.email "41898282+github-actions[bot]@users.noreply.github.com"

          # Create branch with timestamp
          TIMESTAMP=$(date +%Y%m%d-%H%M%S)
          BRANCH_NAME="ghost-sync-$TIMESTAMP"

          # Create and switch to new branch
          git checkout -b "$BRANCH_NAME"

          # Add all changes
          git add content/posts/ content/static/posts/

          # Commit changes
          git commit -m "Sync posts from Ghost CMS

          - New posts: ${{ steps.count_changes.outputs.new_files }}
          - Updated posts: ${{ steps.count_changes.outputs.modified_files }}

          Automated sync performed at $(date -u +"%Y-%m-%d %H:%M:%S UTC")"

          # Push branch
          git push origin "$BRANCH_NAME"

          # Create PR with detailed body
          gh pr create \
            --title "Sync posts from Ghost CMS ($TIMESTAMP)" \
            --body "## Summary

          This PR contains changes synced from Ghost CMS.

          - **New posts**: ${{ steps.count_changes.outputs.new_files }}
          - **Updated posts**: ${{ steps.count_changes.outputs.modified_files }}
          - **Sync time**: $(date -u +"%Y-%m-%d %H:%M:%S UTC")

          ## Changes

          The following content was synced from Ghost:

          - Markdown post files in \`content/posts/\`
          - Static images in \`content/static/posts/\`

          ## Review Checklist

          - [ ] Review new post content and frontmatter
          - [ ] Check image downloads completed successfully
          - [ ] Verify frontmatter mapping (especially \`type\` and \`keywords\` fields)
          - [ ] Ensure no sensitive content was synced
          - [ ] Test builds locally if significant changes

          ## Notes

          This is an automated sync. The \`type\` field defaults to \`'BLOG'\` and may need manual adjustment for guides/tutorials. The \`keywords\` field is empty and may benefit from manual population." \
            --head "$BRANCH_NAME" \
            --base main

      - name: No changes detected
        if: steps.check_changes.outputs.has_changes == 'false'
        run: |
          echo "No changes detected. Ghost CMS is in sync with repository."
```

I hope this post serves as inspiration if you've ever thought about setting up something similar. I can totally recommend using Claude Code for the tedious work here, so you can focus on your content!