feat: add Python linting support with pylint and black
Some checks failed
Ansible Lint Check / check-ansible (push) Failing after 16s
Nix Format Check / check-format (push) Successful in 56s
Python Lint Check / check-python (push) Failing after 16s

This commit is contained in:
Menno van Leeuwen 2025-03-12 14:17:43 +01:00
parent 9b3039bc36
commit 140863d674
Signed by: vleeuwenmenno
SSH Key Fingerprint: SHA256:OJFmjANpakwD3F2Rsws4GLtbdz1TJ5tkQF0RZmF0TRE
10 changed files with 593 additions and 283 deletions

42
.github/workflows/python.yml vendored Normal file
View File

@ -0,0 +1,42 @@
name: Python Lint Check
on:
pull_request:
push:
branches: [ master ]
jobs:
check-python:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install Python linting tools
run: |
python -m pip install --upgrade pip
python -m pip install pylint black
- name: Run pylint
run: |
python_files=$(find . -name "*.py" -type f)
if [ -z "$python_files" ]; then
echo "No Python files found to lint"
exit 0
fi
pylint $python_files
- name: Check Black formatting
run: |
python_files=$(find . -name "*.py" -type f)
if [ -z "$python_files" ]; then
echo "No Python files found to lint"
exit 0
fi
black --check $python_files

View File

@ -9,22 +9,27 @@ import subprocess
sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin"))
from helpers.functions import printfe, run_command
def check_command_exists(command):
"""Check if a command is available in the system"""
try:
subprocess.run(["which", command],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
subprocess.run(
["which", command],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return True
except subprocess.CalledProcessError:
return False
def list_screen_sessions():
"""List all screen sessions"""
success, output = run_command(["screen", "-ls"])
return output
def wipe_dead_sessions():
"""Check and clean up dead screen sessions"""
screen_list = list_screen_sessions()
@ -32,23 +37,26 @@ def wipe_dead_sessions():
print("Found dead sessions, cleaning up...")
run_command(["screen", "-wipe"])
def is_app_running(app_name):
"""Check if an app is already running in a screen session"""
screen_list = list_screen_sessions()
return app_name in screen_list
def start_app(app_name, command):
"""Start an application in a screen session"""
printfe("green", f"Starting {app_name} with command: {command}...")
run_command(["screen", "-dmS", app_name] + command.split())
time.sleep(1) # Give it a moment to start
def main():
# Define dictionary with app_name => command mapping
apps = {
"vesktop": "vesktop",
"ktailctl": "flatpak run org.fkoehler.KTailctl",
"ulauncher": "ulauncher --no-window-shadow --hide-window"
"ulauncher": "ulauncher --no-window-shadow --hide-window",
}
# Clean up dead sessions if any
@ -74,5 +82,6 @@ def main():
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -9,28 +9,33 @@ from datetime import datetime
sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin"))
from helpers.functions import printfe, logo, _rainbow_color, COLORS
def get_last_ssh_login():
"""Get information about the last SSH login"""
try:
result = subprocess.run(['lastlog', '-u', os.environ.get("USER", "")],
capture_output=True, text=True)
result = subprocess.run(
["lastlog", "-u", os.environ.get("USER", "")],
capture_output=True,
text=True,
)
# If lastlog didn't work try lastlog2
if result.returncode != 0:
result = subprocess.run(['lastlog2', os.environ.get("USER", "")],
capture_output=True, text=True)
result = subprocess.run(
["lastlog2", os.environ.get("USER", "")], capture_output=True, text=True
)
if result.returncode == 0:
lines = result.stdout.strip().split('\n')
lines = result.stdout.strip().split("\n")
if len(lines) >= 2: # Header line + data line
# Parse the last login line - example format:
# menno ssh 100.99.23.98 Mon Mar 10 19:09:43 +0100 2025
parts = lines[1].split()
if len(parts) >= 7 and 'ssh' in parts[1]: # Check if it's an SSH login
if len(parts) >= 7 and "ssh" in parts[1]: # Check if it's an SSH login
# Extract IP address from the third column
ip = parts[2]
# Time is the rest of the line starting from position 3
time_str = ' '.join(parts[3:])
time_str = " ".join(parts[3:])
return f"{COLORS['cyan']}Last SSH login{COLORS['reset']}{COLORS['yellow']} {time_str}{COLORS['cyan']} from{COLORS['yellow']} {ip}"
return None
except Exception as e:
@ -38,6 +43,7 @@ def get_last_ssh_login():
# print(f"Error getting SSH login: {str(e)}")
return None
def check_dotfiles_status():
"""Check if the dotfiles repository is dirty"""
dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles"))
@ -47,51 +53,60 @@ def check_dotfiles_status():
# Check for git status details
status = {
'is_dirty': False,
'untracked': 0,
'modified': 0,
'staged': 0,
'commit_hash': '',
'unpushed': 0
"is_dirty": False,
"untracked": 0,
"modified": 0,
"staged": 0,
"commit_hash": "",
"unpushed": 0,
}
# Get status of files
result = subprocess.run(['git', 'status', '--porcelain'],
cwd=dotfiles_path,
capture_output=True, text=True)
result = subprocess.run(
["git", "status", "--porcelain"],
cwd=dotfiles_path,
capture_output=True,
text=True,
)
if result.stdout.strip():
status['is_dirty'] = True
status["is_dirty"] = True
for line in result.stdout.splitlines():
if line.startswith('??'):
status['untracked'] += 1
if line.startswith(' M') or line.startswith('MM'):
status['modified'] += 1
if line.startswith('M ') or line.startswith('A '):
status['staged'] += 1
if line.startswith("??"):
status["untracked"] += 1
if line.startswith(" M") or line.startswith("MM"):
status["modified"] += 1
if line.startswith("M ") or line.startswith("A "):
status["staged"] += 1
# Get current commit hash
result = subprocess.run(['git', 'rev-parse', '--short', 'HEAD'],
cwd=dotfiles_path,
capture_output=True, text=True)
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=dotfiles_path,
capture_output=True,
text=True,
)
if result.returncode == 0:
status['commit_hash'] = result.stdout.strip()
status["commit_hash"] = result.stdout.strip()
# Count unpushed commits
# Fix: Remove capture_output and set stdout explicitly
result = subprocess.run(['git', 'log', '--oneline', '@{u}..'],
cwd=dotfiles_path,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True)
result = subprocess.run(
["git", "log", "--oneline", "@{u}.."],
cwd=dotfiles_path,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
if result.returncode == 0:
status['unpushed'] = len(result.stdout.splitlines())
status["unpushed"] = len(result.stdout.splitlines())
return status
except Exception as e:
print(f"Error checking dotfiles status: {str(e)}")
return None
def get_condensed_status():
"""Generate a condensed status line for trash and git status"""
status_parts = []
@ -110,17 +125,27 @@ def get_condensed_status():
# Check dotfiles status
dotfiles_status = check_dotfiles_status()
if dotfiles_status is not None:
if dotfiles_status['is_dirty']:
if dotfiles_status["is_dirty"]:
status_parts.append(f"{COLORS['yellow']}dotfiles is dirty{COLORS['reset']}")
status_parts.append(f"{COLORS['red']}[{dotfiles_status['untracked']}] untracked{COLORS['reset']}")
status_parts.append(f"{COLORS['yellow']}[{dotfiles_status['modified']}] modified{COLORS['reset']}")
status_parts.append(f"{COLORS['green']}[{dotfiles_status['staged']}] staged{COLORS['reset']}")
status_parts.append(
f"{COLORS['red']}[{dotfiles_status['untracked']}] untracked{COLORS['reset']}"
)
status_parts.append(
f"{COLORS['yellow']}[{dotfiles_status['modified']}] modified{COLORS['reset']}"
)
status_parts.append(
f"{COLORS['green']}[{dotfiles_status['staged']}] staged{COLORS['reset']}"
)
if dotfiles_status['commit_hash']:
status_parts.append(f"{COLORS['white']}[{COLORS['blue']}{dotfiles_status['commit_hash']}{COLORS['white']}]{COLORS['reset']}")
if dotfiles_status["commit_hash"]:
status_parts.append(
f"{COLORS['white']}[{COLORS['blue']}{dotfiles_status['commit_hash']}{COLORS['white']}]{COLORS['reset']}"
)
if dotfiles_status['unpushed'] > 0:
status_parts.append(f"{COLORS['yellow']}[!] You have {dotfiles_status['unpushed']} commit(s) to push{COLORS['reset']}")
if dotfiles_status["unpushed"] > 0:
status_parts.append(
f"{COLORS['yellow']}[!] You have {dotfiles_status['unpushed']} commit(s) to push{COLORS['reset']}"
)
else:
status_parts.append("Unable to check dotfiles status")
@ -128,6 +153,7 @@ def get_condensed_status():
return " - ".join(status_parts)
return None
def welcome():
"""Display welcome message with hostname and username"""
print()
@ -154,10 +180,12 @@ def welcome():
if condensed_status:
print(f"{COLORS['yellow']}{condensed_status}{COLORS['reset']}")
def main():
logo(continue_after=True)
welcome()
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -7,6 +7,7 @@ import sys
sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin"))
from helpers.functions import printfe, println, logo
def main():
# Print logo
logo(continue_after=True)
@ -24,5 +25,6 @@ def main():
println(" ", "cyan")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -8,10 +8,11 @@ from pathlib import Path
# Import helper functions
sys.path.append(os.path.join(os.path.dirname(os.path.dirname(__file__))))
from helpers.functions import printfe, ensure_dependencies, command_exists
from helpers.functions import printfe, command_exists
DOTFILES_ROOT = os.path.expanduser("~/.dotfiles")
def lint_ansible(fix=False):
"""Run ansible-lint on Ansible files"""
ansible_dir = os.path.join(DOTFILES_ROOT, "config/ansible")
@ -30,7 +31,10 @@ def lint_ansible(fix=False):
return 0
if not command_exists("ansible-lint"):
printfe("red", "ansible-lint is not installed. Please install it with pip or your package manager.")
printfe(
"red",
"ansible-lint is not installed. Please install it with pip or your package manager.",
)
return 1
printfe("blue", f"Running ansible-lint{' with auto-fix' if fix else ''}...")
@ -44,6 +48,7 @@ def lint_ansible(fix=False):
result = subprocess.run(command)
return result.returncode
def lint_nix():
"""Run nixfmt on Nix files"""
nix_files = list(Path(DOTFILES_ROOT).glob("**/*.nix"))
@ -53,7 +58,10 @@ def lint_nix():
return 0
if not command_exists("nixfmt"):
printfe("red", "nixfmt is not installed. Please install it with nix-env or your package manager.")
printfe(
"red",
"nixfmt is not installed. Please install it with nix-env or your package manager.",
)
return 1
printfe("blue", "Running nixfmt...")
@ -66,16 +74,69 @@ def lint_nix():
return exit_code
def lint_python(fix=False):
"""Run pylint and black on Python files"""
python_files = list(Path(DOTFILES_ROOT).glob("**/*.py"))
if not python_files:
printfe("yellow", "No Python files found to lint")
return 0
exit_code = 0
# Check for pylint
if command_exists("pylint"):
printfe("blue", "Running pylint...")
files_to_lint = [str(f) for f in python_files]
result = subprocess.run(["pylint"] + files_to_lint)
if result.returncode != 0:
exit_code = 1
else:
printfe("yellow", "pylint is not installed. Skipping Python linting.")
# Check for black
if command_exists("black"):
printfe(
"blue", f"Running black{'--check' if not fix else ''} on Python files..."
)
black_args = ["black"]
if not fix:
black_args.append("--check")
black_args.extend([str(f) for f in python_files])
result = subprocess.run(black_args)
if result.returncode != 0:
exit_code = 1
else:
printfe("yellow", "black is not installed. Skipping Python formatting.")
if not command_exists("pylint") and not command_exists("black"):
printfe(
"red",
"Neither pylint nor black is installed. Install them with pip: pip install pylint black",
)
return 1
return exit_code
def main():
parser = argparse.ArgumentParser(description="Run linters on dotfiles")
parser.add_argument("--ansible", action="store_true", help="Run only ansible-lint")
parser.add_argument("--nix", action="store_true", help="Run only nixfmt")
parser.add_argument("--fix", action="store_true", help="Auto-fix issues where possible (for ansible-lint)")
parser.add_argument(
"--python", action="store_true", help="Run only Python linters (pylint, black)"
)
parser.add_argument(
"--fix", action="store_true", help="Auto-fix issues where possible"
)
args = parser.parse_args()
# If no specific linter is specified, run both
run_ansible = args.ansible or not (args.ansible or args.nix)
run_nix = args.nix or not (args.ansible or args.nix)
# If no specific linter is specified, run all
run_ansible = args.ansible or not (args.ansible or args.nix or args.python)
run_nix = args.nix or not (args.ansible or args.nix or args.python)
run_python = args.python or not (args.ansible or args.nix or args.python)
exit_code = 0
@ -89,7 +150,13 @@ def main():
if nix_result != 0:
exit_code = nix_result
if run_python:
python_result = lint_python(fix=args.fix)
if python_result != 0:
exit_code = python_result
return exit_code
if __name__ == "__main__":
sys.exit(main())

View File

@ -10,12 +10,15 @@ import glob
sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin"))
from helpers.functions import printfe, run_command
def get_password():
"""Get password from 1Password"""
op_cmd = "op"
# Try to get the password
success, output = run_command([op_cmd, "read", "op://j7nmhqlsjmp2r6umly5t75hzb4/Dotfiles Secrets/password"])
success, output = run_command(
[op_cmd, "read", "op://j7nmhqlsjmp2r6umly5t75hzb4/Dotfiles Secrets/password"]
)
if not success:
printfe("red", "Failed to fetch password from 1Password.")
@ -38,9 +41,11 @@ def get_password():
# We already got the password
return output
def prompt_for_password():
"""Ask for password manually"""
import getpass
printfe("cyan", "Enter the password manually: ")
password = getpass.getpass("")
@ -51,6 +56,7 @@ def prompt_for_password():
printfe("green", "Password entered successfully.")
return password
def calculate_checksum(file_path):
"""Calculate SHA256 checksum of a file"""
sha256_hash = hashlib.sha256()
@ -59,6 +65,7 @@ def calculate_checksum(file_path):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def encrypt_folder(folder_path, password):
"""Recursively encrypt files in a folder"""
for item in glob.glob(os.path.join(folder_path, "*")):
@ -77,7 +84,7 @@ def encrypt_folder(folder_path, password):
# Check if file changed since last encryption
if os.path.exists(checksum_file):
with open(checksum_file, 'r') as f:
with open(checksum_file, "r") as f:
previous_checksum = f.read().strip()
if current_checksum == previous_checksum:
@ -91,10 +98,19 @@ def encrypt_folder(folder_path, password):
# Encrypt the file
printfe("cyan", f"Encrypting {item}...")
cmd = [
"gpg", "--quiet", "--batch", "--yes", "--symmetric",
"--cipher-algo", "AES256", "--armor",
"--passphrase", password,
"--output", gpg_file, item
"gpg",
"--quiet",
"--batch",
"--yes",
"--symmetric",
"--cipher-algo",
"AES256",
"--armor",
"--passphrase",
password,
"--output",
gpg_file,
item,
]
success, _ = run_command(cmd)
@ -103,11 +119,12 @@ def encrypt_folder(folder_path, password):
run_command(["git", "add", "-f", gpg_file])
# Update checksum file
with open(checksum_file, 'w') as f:
with open(checksum_file, "w") as f:
f.write(current_checksum)
else:
printfe("red", f"Failed to encrypt {item}")
def decrypt_folder(folder_path, password):
"""Recursively decrypt files in a folder"""
for item in glob.glob(os.path.join(folder_path, "*")):
@ -117,9 +134,16 @@ def decrypt_folder(folder_path, password):
printfe("cyan", f"Decrypting {item}...")
cmd = [
"gpg", "--quiet", "--batch", "--yes", "--decrypt",
"--passphrase", password,
"--output", output_file, item
"gpg",
"--quiet",
"--batch",
"--yes",
"--decrypt",
"--passphrase",
password,
"--output",
output_file,
item,
]
success, _ = run_command(cmd)
@ -131,6 +155,7 @@ def decrypt_folder(folder_path, password):
printfe("cyan", f"Decrypting folder {item}...")
decrypt_folder(item, password)
def main():
if len(sys.argv) != 2 or sys.argv[1] not in ["encrypt", "decrypt"]:
printfe("red", "Usage: secrets.py [encrypt|decrypt]")
@ -155,5 +180,6 @@ def main():
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -12,6 +12,7 @@ from helpers.functions import printfe, println, logo
# Base directory for Docker services $HOME/services
SERVICES_DIR = os.path.join(os.path.expanduser("~"), "services")
def get_service_path(service_name):
"""Return the path to a service's docker-compose file"""
service_dir = os.path.join(SERVICES_DIR, service_name)
@ -23,6 +24,7 @@ def get_service_path(service_name):
return compose_file
def run_docker_compose(args, service_name=None, compose_file=None):
"""Run docker compose command with provided args"""
if service_name and not compose_file:
@ -41,27 +43,34 @@ def run_docker_compose(args, service_name=None, compose_file=None):
result = subprocess.run(cmd)
return result.returncode
def cmd_start(args):
"""Start a Docker service"""
return run_docker_compose(["up", "-d"], service_name=args.service)
def cmd_stop(args):
"""Stop a Docker service"""
return run_docker_compose(["down"], service_name=args.service)
def cmd_restart(args):
"""Restart a Docker service"""
return run_docker_compose(["restart"], service_name=args.service)
def get_all_running_services():
"""Return a list of all running services"""
if not os.path.exists(SERVICES_DIR):
return []
running_services = []
services = [d for d in os.listdir(SERVICES_DIR)
if os.path.isdir(os.path.join(SERVICES_DIR, d)) and
os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml"))]
services = [
d
for d in os.listdir(SERVICES_DIR)
if os.path.isdir(os.path.join(SERVICES_DIR, d))
and os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml"))
]
for service in services:
if check_service_running(service) > 0:
@ -69,6 +78,7 @@ def get_all_running_services():
return running_services
def cmd_update(args):
"""Update a Docker service by pulling new images and recreating containers if needed"""
if args.all:
@ -93,7 +103,10 @@ def cmd_update(args):
failed_services.append(service)
if failed_services:
printfe("red", f"\nFailed to update the following services: {', '.join(failed_services)}")
printfe(
"red",
f"\nFailed to update the following services: {', '.join(failed_services)}",
)
return 1
else:
printfe("green", "\nAll running services updated successfully")
@ -108,6 +121,7 @@ def cmd_update(args):
# Then bring the service up with the latest images
return run_docker_compose(["up", "-d"], service_name=args.service)
def cmd_ps(args):
"""Show Docker service status"""
if args.service:
@ -115,6 +129,7 @@ def cmd_ps(args):
else:
return run_docker_compose(["ps"])
def cmd_logs(args):
"""Show Docker service logs"""
cmd = ["logs"]
@ -127,6 +142,7 @@ def cmd_logs(args):
return run_docker_compose(cmd, service_name=args.service)
def check_service_running(service_name):
"""Check if service has running containers and return the count"""
compose_file = get_service_path(service_name)
@ -136,28 +152,32 @@ def check_service_running(service_name):
result = subprocess.run(
["docker", "compose", "-f", compose_file, "ps", "--quiet"],
capture_output=True,
text=True
text=True,
)
# Count non-empty lines to get container count
containers = [line for line in result.stdout.strip().split('\n') if line]
containers = [line for line in result.stdout.strip().split("\n") if line]
return len(containers)
def cmd_list(args):
"""List available Docker services"""
if not os.path.exists(SERVICES_DIR):
printfe("red", f"Error: Services directory not found at {SERVICES_DIR}")
return 1
services = [d for d in os.listdir(SERVICES_DIR)
if os.path.isdir(os.path.join(SERVICES_DIR, d)) and
os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml"))]
services = [
d
for d in os.listdir(SERVICES_DIR)
if os.path.isdir(os.path.join(SERVICES_DIR, d))
and os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml"))
]
if not services:
printfe("yellow", "No Docker services found")
return 0
println("Available Docker services:", 'blue')
println("Available Docker services:", "blue")
for service in sorted(services):
container_count = check_service_running(service)
is_running = container_count > 0
@ -173,6 +193,7 @@ def cmd_list(args):
return 0
def main():
parser = argparse.ArgumentParser(description="Manage Docker services")
subparsers = parser.add_subparsers(dest="command", help="Command to run")
@ -190,9 +211,14 @@ def main():
restart_parser.add_argument("service", help="Service to restart")
# Update command
update_parser = subparsers.add_parser("update", help="Update a Docker service (pull new images and recreate if needed)")
update_parser = subparsers.add_parser(
"update",
help="Update a Docker service (pull new images and recreate if needed)",
)
update_parser_group = update_parser.add_mutually_exclusive_group(required=True)
update_parser_group.add_argument("--all", action="store_true", help="Update all running services")
update_parser_group.add_argument(
"--all", action="store_true", help="Update all running services"
)
update_parser_group.add_argument("service", nargs="?", help="Service to update")
# PS command
@ -202,8 +228,12 @@ def main():
# Logs command
logs_parser = subparsers.add_parser("logs", help="Show Docker service logs")
logs_parser.add_argument("service", help="Service to show logs for")
logs_parser.add_argument("-f", "--follow", action="store_true", help="Follow log output")
logs_parser.add_argument("--tail", help="Number of lines to show from the end of logs")
logs_parser.add_argument(
"-f", "--follow", action="store_true", help="Follow log output"
)
logs_parser.add_argument(
"--tail", help="Number of lines to show from the end of logs"
)
# List command and its alias
subparsers.add_parser("list", help="List available Docker services")
@ -225,10 +255,11 @@ def main():
"ps": cmd_ps,
"logs": cmd_logs,
"list": cmd_list,
"ls": cmd_list # Alias 'ls' to the same function as 'list'
"ls": cmd_list, # Alias 'ls' to the same function as 'list'
}
return commands[args.command](args)
if __name__ == "__main__":
sys.exit(main())

View File

@ -9,17 +9,25 @@ import argparse
sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin"))
from helpers.functions import printfe, run_command
def help_message():
"""Print help message and exit"""
printfe("green", "Usage: upgrade.py [options]")
printfe("green", "Options:")
printfe("green", " --ha, -H Upgrade Home Manager packages.")
printfe("green", " --ansible, -A Upgrade Ansible packages.")
printfe("green", " --ansible-verbose Upgrade Ansible packages with verbose output. (-vvv)")
printfe("green", " --full-speed, -F Upgrade packages and use all available cores for compilation. (Default: 8 cores)")
printfe(
"green",
" --ansible-verbose Upgrade Ansible packages with verbose output. (-vvv)",
)
printfe(
"green",
" --full-speed, -F Upgrade packages and use all available cores for compilation. (Default: 8 cores)",
)
printfe("green", " --help, -h Display this help message.")
return 0
def check_git_repository():
"""Check for changes in the dotfiles git repository and prompt user to pull if needed"""
dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles"))
@ -38,7 +46,9 @@ def check_git_repository():
return False
# Get the current branch name
status, current_branch = run_command(["git", "rev-parse", "--abbrev-ref", "HEAD"], shell=False)
status, current_branch = run_command(
["git", "rev-parse", "--abbrev-ref", "HEAD"], shell=False
)
if not status:
printfe("red", "Failed to determine current branch.")
os.chdir(current_dir)
@ -54,14 +64,21 @@ def check_git_repository():
return False
# Check if remote branch exists
status, output = run_command(["git", "ls-remote", "--heads", "origin", current_branch], shell=False)
status, output = run_command(
["git", "ls-remote", "--heads", "origin", current_branch], shell=False
)
if not status or not output.strip():
printfe("yellow", f"Remote branch 'origin/{current_branch}' not found. Using local branch only.")
printfe(
"yellow",
f"Remote branch 'origin/{current_branch}' not found. Using local branch only.",
)
os.chdir(current_dir)
return True
# Check if we're behind the remote
status, output = run_command(["git", "rev-list", f"HEAD..origin/{current_branch}", "--count"], shell=False)
status, output = run_command(
["git", "rev-list", f"HEAD..origin/{current_branch}", "--count"], shell=False
)
if not status:
printfe("red", f"Failed to check for repository updates: {output}")
os.chdir(current_dir)
@ -69,24 +86,36 @@ def check_git_repository():
behind_count = output.strip()
if behind_count == "0":
printfe("green", f"Dotfiles repository is up to date on branch '{current_branch}'.")
printfe(
"green", f"Dotfiles repository is up to date on branch '{current_branch}'."
)
os.chdir(current_dir)
return True
# Show what changes are available
status, output = run_command(["git", "log", f"HEAD..origin/{current_branch}", "--oneline"], shell=False)
status, output = run_command(
["git", "log", f"HEAD..origin/{current_branch}", "--oneline"], shell=False
)
if status:
printfe("yellow", f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'. Changes:")
printfe(
"yellow",
f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'. Changes:",
)
for line in output.strip().splitlines():
printfe("yellow", f"{line}")
else:
printfe("yellow", f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'.")
printfe(
"yellow",
f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'.",
)
# Ask user if they want to pull changes
response = input("Do you want to pull these changes? (yes/no): ").strip().lower()
if response in ["yes", "y"]:
status, output = run_command(["git", "pull", "origin", current_branch], shell=False)
status, output = run_command(
["git", "pull", "origin", current_branch], shell=False
)
if not status:
printfe("red", f"Failed to pull changes: {output}")
os.chdir(current_dir)
@ -98,6 +127,7 @@ def check_git_repository():
os.chdir(current_dir)
return True
def ensure_ansible_collections():
"""Ensure required Ansible collections are installed"""
# List of required collections that can be expanded in the future
@ -155,10 +185,18 @@ def ensure_ansible_collections():
if missing_collections:
for collection in missing_collections:
printfe("yellow", f"Installing {collection} collection...")
status, install_output = run_command(["ansible-galaxy", "collection", "install", collection], shell=False)
status, install_output = run_command(
["ansible-galaxy", "collection", "install", collection], shell=False
)
if not status:
printfe("yellow", f"Warning: Failed to install {collection} collection: {install_output}")
printfe("yellow", f"Continuing anyway, but playbook might fail if it requires {collection}")
printfe(
"yellow",
f"Warning: Failed to install {collection} collection: {install_output}",
)
printfe(
"yellow",
f"Continuing anyway, but playbook might fail if it requires {collection}",
)
else:
printfe("green", f"Successfully installed {collection} collection")
else:
@ -166,14 +204,27 @@ def ensure_ansible_collections():
return True
def main():
# Parse arguments
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--ha", "-H", action="store_true", help="Upgrade Home Manager packages")
parser.add_argument("--ansible", "-A", action="store_true", help="Upgrade Ansible packages")
parser.add_argument("--ansible-verbose", action="store_true", help="Upgrade Ansible packages with verbose output")
parser.add_argument("--full-speed", "-F", action="store_true", help="Use all available cores")
parser.add_argument("--help", "-h", action="store_true", help="Display help message")
parser.add_argument(
"--ha", "-H", action="store_true", help="Upgrade Home Manager packages"
)
parser.add_argument(
"--ansible", "-A", action="store_true", help="Upgrade Ansible packages"
)
parser.add_argument(
"--ansible-verbose",
action="store_true",
help="Upgrade Ansible packages with verbose output",
)
parser.add_argument(
"--full-speed", "-F", action="store_true", help="Use all available cores"
)
parser.add_argument(
"--help", "-h", action="store_true", help="Display help message"
)
args = parser.parse_args()
@ -197,6 +248,7 @@ def main():
# Set cores and jobs based on full-speed flag
if args.full_speed:
import multiprocessing
cores = jobs = multiprocessing.cpu_count()
else:
cores = 8
@ -206,14 +258,25 @@ def main():
# Home Manager update
if args.ha:
dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles"))
dotfiles_path = os.environ.get(
"DOTFILES_PATH", os.path.expanduser("~/.dotfiles")
)
hostname = os.uname().nodename
printfe("cyan", "Updating Home Manager flake...")
os.chdir(f"{dotfiles_path}/config/home-manager")
status, output = run_command(["nix", "--extra-experimental-features", "nix-command",
"--extra-experimental-features", "flakes", "flake", "update"],
shell=False)
status, output = run_command(
[
"nix",
"--extra-experimental-features",
"nix-command",
"--extra-experimental-features",
"flakes",
"flake",
"update",
],
shell=False,
)
if not status:
printfe("red", f"Failed to update Home Manager flake: {output}")
return 1
@ -230,9 +293,23 @@ def main():
env = os.environ.copy()
env["NIXPKGS_ALLOW_UNFREE"] = "1"
cmd = ["home-manager", "--extra-experimental-features", "nix-command",
"--extra-experimental-features", "flakes", "switch", "-b", "backup",
f"--flake", f".#{hostname}", "--impure", "--cores", str(cores), "-j", str(jobs)]
cmd = [
"home-manager",
"--extra-experimental-features",
"nix-command",
"--extra-experimental-features",
"flakes",
"switch",
"-b",
"backup",
f"--flake",
f".#{hostname}",
"--impure",
"--cores",
str(cores),
"-j",
str(jobs),
]
result = subprocess.run(cmd, env=env)
if result.returncode != 0:
@ -244,25 +321,33 @@ def main():
# Ansible update
if args.ansible:
dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles"))
dotfiles_path = os.environ.get(
"DOTFILES_PATH", os.path.expanduser("~/.dotfiles")
)
hostname = os.uname().nodename
username = os.environ.get("USER", os.environ.get("USERNAME", "user"))
# Ensure required collections are installed
if not ensure_ansible_collections():
printfe("red", "Failed to ensure required Ansible collections are installed")
printfe(
"red", "Failed to ensure required Ansible collections are installed"
)
return 1
printfe("cyan", "Running Ansible playbook...")
ansible_cmd = [
"/usr/bin/env",
"ansible-playbook",
"-i", f"{dotfiles_path}/config/ansible/inventory.ini",
"-i",
f"{dotfiles_path}/config/ansible/inventory.ini",
f"{dotfiles_path}/config/ansible/playbook.yml",
"--extra-vars", f"hostname={hostname}",
"--extra-vars", f"ansible_user={username}",
"--limit", hostname,
"--ask-become-pass"
"--extra-vars",
f"hostname={hostname}",
"--extra-vars",
f"ansible_user={username}",
"--limit",
hostname,
"--ask-become-pass",
]
if args.ansible_verbose:
@ -275,5 +360,6 @@ def main():
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@ -1,12 +1,12 @@
#!/usr/bin/env python3
import os
import sys
import subprocess
import math
import random
import shutil
import datetime
try:
import pyfiglet
except ImportError:
@ -23,9 +23,10 @@ COLORS = {
"cyan": "\033[0;36m",
"white": "\033[0;37m",
"grey": "\033[0;90m", # Added grey color for timestamp
"reset": "\033[0m"
"reset": "\033[0m",
}
def printfe(color, message, show_time=True):
"""
Print a formatted message with the specified color
@ -35,20 +36,21 @@ def printfe(color, message, show_time=True):
if show_time:
# Add timestamp
timestamp = datetime.datetime.now().strftime('%H:%M:%S')
print(f"{COLORS['grey']}{timestamp}{COLORS['reset']}", end='')
timestamp = datetime.datetime.now().strftime("%H:%M:%S")
print(f"{COLORS['grey']}{timestamp}{COLORS['reset']}", end="")
# Add message type based on color
if color.lower() in ["green", "cyan", "blue", "purple"]:
print(f"{COLORS['green']} INF {COLORS['reset']}", end='')
print(f"{COLORS['green']} INF {COLORS['reset']}", end="")
elif color.lower() == "yellow":
print(f"{COLORS['yellow']} WRN {COLORS['reset']}", end='')
print(f"{COLORS['yellow']} WRN {COLORS['reset']}", end="")
elif color.lower() == "red":
print(f"{COLORS['red']} ERR {COLORS['reset']}", end='')
print(f"{COLORS['red']} ERR {COLORS['reset']}", end="")
# Print the actual message with color
print(f"{color_code}{message}{COLORS['reset']}")
def println(message, color=None):
"""Print a line with optional color"""
if color:
@ -56,6 +58,7 @@ def println(message, color=None):
else:
printfe("reset", message)
def _rainbow_color(text, freq=0.1, offset=0):
"""Apply rainbow colors to text similar to lolcat"""
colored_text = ""
@ -73,13 +76,14 @@ def _rainbow_color(text, freq=0.1, offset=0):
return colored_text
def logo(continue_after=False):
"""Display the dotfiles logo"""
try:
# Try to read logo file first for backward compatibility
if pyfiglet:
# Generate ASCII art with pyfiglet and rainbow colors
ascii_art = pyfiglet.figlet_format("Menno's Dotfiles", font='slant')
ascii_art = pyfiglet.figlet_format("Menno's Dotfiles", font="slant")
print("\n") # Add some space before the logo
# Use a random offset to vary the rainbow start position
@ -101,35 +105,46 @@ def logo(continue_after=False):
except Exception as e:
printfe("red", f"Error displaying logo: {e}")
def run_command(command, shell=False):
"""Run a shell command and return the result"""
try:
if not shell and not shutil.which(command[0]):
return False, f"Command '{command[0]}' not found"
result = subprocess.run(command, shell=shell, check=True, text=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
result = subprocess.run(
command,
shell=shell,
check=True,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
return True, result.stdout.strip()
except subprocess.CalledProcessError as e:
return False, e.stderr.strip()
except FileNotFoundError:
return False, f"Command '{command[0]}' not found"
def command_exists(command):
"""Check if a command exists in the PATH"""
import shutil
return shutil.which(command) is not None
def ensure_dependencies():
"""Check and install required dependencies for the dotfiles system"""
required_packages = [
'pyfiglet', # For ASCII art generation
"pyfiglet", # For ASCII art generation
]
# Check if pip is available
success, _ = run_command(['pip', '--version'])
success, _ = run_command(["pip", "--version"])
if not success:
printfe("red", "Pip is required to install missing dependencies, try again after running `dotf update`")
printfe(
"red",
"Pip is required to install missing dependencies, try again after running `dotf update`",
)
return False
missing_packages = []
@ -142,11 +157,13 @@ def ensure_dependencies():
if missing_packages:
printfe("yellow", f"Missing dependencies: {', '.join(missing_packages)}")
install = input("Would you like to install them now? (y/n): ").lower()
if install == 'y' or install == 'yes':
if install == "y" or install == "yes":
printfe("cyan", "Installing missing dependencies...")
for package in missing_packages:
printfe("blue", f"Installing {package}...")
success, output = run_command(['pip', 'install', '--user', package, '--break-system-packages'])
success, output = run_command(
["pip", "install", "--user", package, "--break-system-packages"]
)
if success:
printfe("green", f"Successfully installed {package}")
else:

View File

@ -39,6 +39,8 @@
- python3
- python3-pip
- python3-venv
- pylint
- black
state: present
become: true