Expand country blocking to more high-risk countries
- Add IN, VN, BR, TR, ID, TH, BD, PK, RO to blocked list - Update alternative IP ranges for new countries in script - Enhance documentation with rationale, risk assessment, and best practices - Add test script for verifying country blocking functionality - Improve Ansible tasks for dependency installation
This commit is contained in:
251
config/ansible/files/test_country_blocking.py
Normal file
251
config/ansible/files/test_country_blocking.py
Normal file
@@ -0,0 +1,251 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify country blocking functionality in Caddy.
|
||||
This script tests access to your services from different country IP ranges.
|
||||
"""
|
||||
|
||||
import requests
|
||||
import sys
|
||||
import json
|
||||
import argparse
|
||||
from typing import List, Dict, Tuple
|
||||
import time
|
||||
import random
|
||||
|
||||
|
||||
# Sample IP addresses from blocked countries for testing
|
||||
TEST_IPS = {
|
||||
'CN': ['1.2.4.8', '14.17.22.35', '27.115.124.56', '36.248.15.72'],
|
||||
'RU': ['2.56.96.15', '5.8.15.32', '31.31.192.45', '77.88.8.8'],
|
||||
'IN': ['1.22.15.45', '14.96.32.78', '27.34.56.123', '117.192.45.89'],
|
||||
'KP': ['175.45.176.10', '210.52.109.15'],
|
||||
'IR': ['2.176.15.45', '5.22.78.123', '37.156.32.89'],
|
||||
'VN': ['14.160.32.45', '27.64.78.123', '113.160.45.89'],
|
||||
'BR': ['177.15.45.89', '179.32.78.123', '186.192.45.89'],
|
||||
'TR': ['31.145.15.45', '78.160.32.89', '94.54.78.123'],
|
||||
'ID': ['36.64.15.45', '114.4.32.89', '182.253.78.123'],
|
||||
'TH': ['27.130.15.45', '110.164.32.89', '202.28.78.123'],
|
||||
'BD': ['27.147.15.45', '114.130.32.89', '202.40.78.123'],
|
||||
'PK': ['39.32.15.45', '115.42.32.89', '202.47.123.45'],
|
||||
'RO': ['31.13.224.45', '89.136.32.89', '212.146.78.123'],
|
||||
'BY': ['31.130.176.45', '85.113.32.89', '212.98.160.123']
|
||||
}
|
||||
|
||||
# Sample IPs from allowed countries for comparison
|
||||
ALLOWED_IPS = {
|
||||
'US': ['8.8.8.8', '1.1.1.1', '208.67.222.222'],
|
||||
'DE': ['85.25.12.34', '193.99.144.85'],
|
||||
'NL': ['145.100.108.155', '194.109.6.66'],
|
||||
'GB': ['212.58.244.20', '151.101.65.140']
|
||||
}
|
||||
|
||||
|
||||
def test_ip_blocking(domain: str, test_ip: str, timeout: int = 10) -> Tuple[int, str, float]:
|
||||
"""
|
||||
Test if an IP is blocked by making a request with X-Forwarded-For header.
|
||||
|
||||
Args:
|
||||
domain: Domain to test against
|
||||
test_ip: IP address to simulate request from
|
||||
timeout: Request timeout in seconds
|
||||
|
||||
Returns:
|
||||
Tuple of (status_code, response_text, response_time)
|
||||
"""
|
||||
headers = {
|
||||
'X-Forwarded-For': test_ip,
|
||||
'X-Real-IP': test_ip,
|
||||
'User-Agent': 'Country-Blocking-Test/1.0'
|
||||
}
|
||||
|
||||
try:
|
||||
start_time = time.time()
|
||||
response = requests.get(f'https://{domain}',
|
||||
headers=headers,
|
||||
timeout=timeout,
|
||||
allow_redirects=False)
|
||||
response_time = time.time() - start_time
|
||||
|
||||
return response.status_code, response.text[:200], response_time
|
||||
|
||||
except requests.exceptions.Timeout:
|
||||
return -1, 'TIMEOUT', timeout
|
||||
except requests.exceptions.RequestException as e:
|
||||
return -2, f'ERROR: {str(e)}', 0.0
|
||||
|
||||
|
||||
def run_blocking_tests(domains: List[str], countries: List[str] = None) -> Dict:
|
||||
"""
|
||||
Run comprehensive blocking tests across multiple domains and countries.
|
||||
|
||||
Args:
|
||||
domains: List of domains to test
|
||||
countries: List of country codes to test (defaults to all blocked countries)
|
||||
|
||||
Returns:
|
||||
Dictionary with test results
|
||||
"""
|
||||
if countries is None:
|
||||
countries = list(TEST_IPS.keys())
|
||||
|
||||
results = {
|
||||
'blocked_tests': {},
|
||||
'allowed_tests': {},
|
||||
'summary': {
|
||||
'total_tests': 0,
|
||||
'blocked_correctly': 0,
|
||||
'allowed_correctly': 0,
|
||||
'errors': 0
|
||||
}
|
||||
}
|
||||
|
||||
print("🚀 Starting country blocking tests...\n")
|
||||
|
||||
# Test blocked countries
|
||||
for country in countries:
|
||||
if country not in TEST_IPS:
|
||||
print(f"⚠️ No test IPs available for {country}")
|
||||
continue
|
||||
|
||||
results['blocked_tests'][country] = {}
|
||||
test_ips = random.sample(TEST_IPS[country], min(2, len(TEST_IPS[country])))
|
||||
|
||||
for domain in domains:
|
||||
print(f"🔒 Testing {country} -> {domain}")
|
||||
results['blocked_tests'][country][domain] = []
|
||||
|
||||
for ip in test_ips:
|
||||
status, response, resp_time = test_ip_blocking(domain, ip)
|
||||
result = {
|
||||
'ip': ip,
|
||||
'status_code': status,
|
||||
'response': response,
|
||||
'response_time': resp_time,
|
||||
'blocked_correctly': status == 403
|
||||
}
|
||||
|
||||
results['blocked_tests'][country][domain].append(result)
|
||||
results['summary']['total_tests'] += 1
|
||||
|
||||
if result['blocked_correctly']:
|
||||
results['summary']['blocked_correctly'] += 1
|
||||
print(f" ✅ {ip} -> {status} (blocked)")
|
||||
elif status > 0:
|
||||
print(f" ❌ {ip} -> {status} (should be blocked!)")
|
||||
else:
|
||||
results['summary']['errors'] += 1
|
||||
print(f" ⚠️ {ip} -> ERROR: {response}")
|
||||
|
||||
time.sleep(0.5) # Rate limiting
|
||||
|
||||
# Test allowed countries for comparison
|
||||
print(f"\n🌍 Testing allowed countries for comparison...\n")
|
||||
|
||||
for country, ips in ALLOWED_IPS.items():
|
||||
results['allowed_tests'][country] = {}
|
||||
test_ip = random.choice(ips)
|
||||
|
||||
for domain in domains:
|
||||
print(f"🌐 Testing {country} -> {domain}")
|
||||
status, response, resp_time = test_ip_blocking(domain, test_ip)
|
||||
|
||||
result = {
|
||||
'ip': test_ip,
|
||||
'status_code': status,
|
||||
'response': response,
|
||||
'response_time': resp_time,
|
||||
'allowed_correctly': status not in [403, -1, -2]
|
||||
}
|
||||
|
||||
results['allowed_tests'][country][domain] = result
|
||||
results['summary']['total_tests'] += 1
|
||||
|
||||
if result['allowed_correctly']:
|
||||
results['summary']['allowed_correctly'] += 1
|
||||
print(f" ✅ {test_ip} -> {status} (allowed)")
|
||||
elif status == 403:
|
||||
print(f" ❌ {test_ip} -> {status} (should be allowed!)")
|
||||
else:
|
||||
results['summary']['errors'] += 1
|
||||
print(f" ⚠️ {test_ip} -> ERROR: {response}")
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
return results
|
||||
|
||||
|
||||
def print_summary(results: Dict):
|
||||
"""Print a summary of test results."""
|
||||
summary = results['summary']
|
||||
total = summary['total_tests']
|
||||
|
||||
print(f"\n📊 Test Summary")
|
||||
print(f"{'='*50}")
|
||||
print(f"Total tests run: {total}")
|
||||
print(f"Blocked correctly: {summary['blocked_correctly']}")
|
||||
print(f"Allowed correctly: {summary['allowed_correctly']}")
|
||||
print(f"Errors: {summary['errors']}")
|
||||
|
||||
if total > 0:
|
||||
success_rate = ((summary['blocked_correctly'] + summary['allowed_correctly']) / total) * 100
|
||||
print(f"Success rate: {success_rate:.1f}%")
|
||||
|
||||
if success_rate >= 95:
|
||||
print("🎉 Country blocking is working excellently!")
|
||||
elif success_rate >= 85:
|
||||
print("✅ Country blocking is working well")
|
||||
elif success_rate >= 70:
|
||||
print("⚠️ Country blocking needs attention")
|
||||
else:
|
||||
print("❌ Country blocking has serious issues")
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Test country blocking functionality')
|
||||
parser.add_argument('domains', nargs='+', help='Domains to test (e.g., photos.mvl.sh git.mvl.sh)')
|
||||
parser.add_argument('--countries', nargs='*',
|
||||
help='Specific countries to test (default: all blocked countries)')
|
||||
parser.add_argument('--output', '-o', help='Save results to JSON file')
|
||||
parser.add_argument('--verbose', '-v', action='store_true',
|
||||
help='Show detailed output')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
print("🛡️ Country Blocking Test Suite")
|
||||
print(f"Testing domains: {', '.join(args.domains)}")
|
||||
|
||||
if args.countries:
|
||||
countries = [c.upper() for c in args.countries]
|
||||
print(f"Testing countries: {', '.join(countries)}")
|
||||
else:
|
||||
countries = None
|
||||
print("Testing all blocked countries")
|
||||
|
||||
print()
|
||||
|
||||
# Run tests
|
||||
results = run_blocking_tests(args.domains, countries)
|
||||
|
||||
# Print summary
|
||||
print_summary(results)
|
||||
|
||||
# Save detailed results if requested
|
||||
if args.output:
|
||||
with open(args.output, 'w') as f:
|
||||
json.dump(results, f, indent=2)
|
||||
print(f"\n💾 Detailed results saved to {args.output}")
|
||||
|
||||
# Exit with appropriate code
|
||||
if results['summary']['errors'] > 0:
|
||||
sys.exit(2)
|
||||
elif results['summary']['total_tests'] == 0:
|
||||
sys.exit(1)
|
||||
else:
|
||||
success_rate = ((results['summary']['blocked_correctly'] +
|
||||
results['summary']['allowed_correctly']) /
|
||||
results['summary']['total_tests']) * 100
|
||||
sys.exit(0 if success_rate >= 85 else 1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Reference in New Issue
Block a user