Add country-based IP blocking for Caddy via Ansible
- Introduce generate_country_blocks.py to fetch IP ranges by country - Update group_vars/servers.yml with country blocking settings - Add country_block snippet to Caddyfile and apply to all sites - Create Ansible tasks for automated IP range generation and integration - Add documentation for configuring and managing country blocking
This commit is contained in:
162
config/ansible/files/generate_country_blocks.py
Normal file
162
config/ansible/files/generate_country_blocks.py
Normal file
@@ -0,0 +1,162 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Generate country IP ranges for Caddy blocking configuration.
|
||||
Downloads the latest country IP ranges and formats them for use in Caddyfile.
|
||||
"""
|
||||
|
||||
import json
|
||||
import requests
|
||||
import sys
|
||||
import argparse
|
||||
from typing import List, Dict
|
||||
import ipaddress
|
||||
|
||||
|
||||
def download_country_ranges(countries: List[str]) -> Dict[str, List[str]]:
|
||||
"""
|
||||
Download IP ranges for specified countries from ip2location.com free database.
|
||||
|
||||
Args:
|
||||
countries: List of ISO 3166-1 alpha-2 country codes (e.g., ['CN', 'RU'])
|
||||
|
||||
Returns:
|
||||
Dictionary mapping country codes to lists of IP ranges
|
||||
"""
|
||||
country_ranges = {}
|
||||
|
||||
for country in countries:
|
||||
print(f"Downloading IP ranges for {country}...", file=sys.stderr)
|
||||
|
||||
# Use ipinfo.io's free API for country IP ranges
|
||||
try:
|
||||
url = f"https://ipinfo.io/countries/{country.lower()}"
|
||||
headers = {'User-Agent': 'Mozilla/5.0 (compatible; country-blocker/1.0)'}
|
||||
response = requests.get(url, headers=headers, timeout=30)
|
||||
|
||||
if response.status_code == 200:
|
||||
# The response contains IP ranges in CIDR format, one per line
|
||||
ranges = []
|
||||
for line in response.text.strip().split('\n'):
|
||||
line = line.strip()
|
||||
if line and not line.startswith('#'):
|
||||
try:
|
||||
# Validate the IP range
|
||||
ipaddress.ip_network(line, strict=False)
|
||||
ranges.append(line)
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
country_ranges[country] = ranges
|
||||
print(f"Found {len(ranges)} IP ranges for {country}", file=sys.stderr)
|
||||
else:
|
||||
print(f"Failed to download ranges for {country}: HTTP {response.status_code}", file=sys.stderr)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error downloading ranges for {country}: {e}", file=sys.stderr)
|
||||
|
||||
return country_ranges
|
||||
|
||||
|
||||
def get_alternative_ranges(countries: List[str]) -> Dict[str, List[str]]:
|
||||
"""
|
||||
Alternative method using a different data source.
|
||||
Falls back to manually curated ranges for common blocking scenarios.
|
||||
"""
|
||||
# Common IP ranges for frequently blocked countries
|
||||
# These are examples - you should update with current ranges
|
||||
manual_ranges = {
|
||||
'CN': [
|
||||
'1.0.1.0/24', '1.0.2.0/23', '1.0.8.0/21', '1.0.32.0/19',
|
||||
'14.0.12.0/22', '14.0.36.0/22', '14.1.0.0/18', '14.16.0.0/12',
|
||||
'27.0.0.0/11', '27.50.40.0/21', '27.54.192.0/18', '27.98.208.0/20',
|
||||
# Add more ranges as needed
|
||||
],
|
||||
'RU': [
|
||||
'2.56.96.0/19', '2.58.56.0/21', '5.8.0.0/19', '5.34.96.0/19',
|
||||
'5.42.192.0/19', '5.44.64.0/18', '5.53.96.0/19', '5.61.24.0/21',
|
||||
# Add more ranges as needed
|
||||
],
|
||||
'KP': [
|
||||
'175.45.176.0/22', '210.52.109.0/24', '77.94.35.0/24'
|
||||
],
|
||||
'IR': [
|
||||
'2.176.0.0/12', '5.22.0.0/16', '5.23.0.0/16', '5.56.128.0/17',
|
||||
# Add more ranges as needed
|
||||
]
|
||||
}
|
||||
|
||||
result = {}
|
||||
for country in countries:
|
||||
if country in manual_ranges:
|
||||
result[country] = manual_ranges[country]
|
||||
print(f"Using manual ranges for {country}: {len(manual_ranges[country])} ranges", file=sys.stderr)
|
||||
else:
|
||||
print(f"No manual ranges available for {country}", file=sys.stderr)
|
||||
result[country] = []
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def format_for_caddy(country_ranges: Dict[str, List[str]]) -> List[str]:
|
||||
"""
|
||||
Format IP ranges for use in Caddyfile remote_ip matcher.
|
||||
|
||||
Args:
|
||||
country_ranges: Dictionary of country codes to IP ranges
|
||||
|
||||
Returns:
|
||||
List of IP ranges formatted for Caddy
|
||||
"""
|
||||
all_ranges = []
|
||||
for country, ranges in country_ranges.items():
|
||||
all_ranges.extend(ranges)
|
||||
|
||||
return all_ranges
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Generate country IP ranges for blocking')
|
||||
parser.add_argument('countries', nargs='+',
|
||||
help='Country codes to block (e.g., CN RU KP IR)')
|
||||
parser.add_argument('--format', choices=['caddy', 'json', 'list'],
|
||||
default='caddy', help='Output format')
|
||||
parser.add_argument('--fallback', action='store_true',
|
||||
help='Use manual fallback ranges instead of downloading')
|
||||
parser.add_argument('--output', '-o', help='Output file (default: stdout)')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Normalize country codes to uppercase
|
||||
countries = [c.upper() for c in args.countries]
|
||||
|
||||
# Download or use fallback ranges
|
||||
if args.fallback:
|
||||
country_ranges = get_alternative_ranges(countries)
|
||||
else:
|
||||
country_ranges = download_country_ranges(countries)
|
||||
# If download failed, try fallback
|
||||
if not any(country_ranges.values()):
|
||||
print("Download failed, using fallback ranges...", file=sys.stderr)
|
||||
country_ranges = get_alternative_ranges(countries)
|
||||
|
||||
# Format output
|
||||
if args.format == 'caddy':
|
||||
ranges = format_for_caddy(country_ranges)
|
||||
output = ' '.join(ranges)
|
||||
elif args.format == 'json':
|
||||
output = json.dumps(country_ranges, indent=2)
|
||||
else: # list format
|
||||
ranges = format_for_caddy(country_ranges)
|
||||
output = '\n'.join(ranges)
|
||||
|
||||
# Write output
|
||||
if args.output:
|
||||
with open(args.output, 'w') as f:
|
||||
f.write(output)
|
||||
print(f"Output written to {args.output}", file=sys.stderr)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Reference in New Issue
Block a user