From 140863d674622431f2b97067a67bf382875a5b10 Mon Sep 17 00:00:00 2001 From: Menno van Leeuwen Date: Wed, 12 Mar 2025 14:17:43 +0100 Subject: [PATCH] feat: add Python linting support with pylint and black --- .github/workflows/python.yml | 42 +++++ bin/actions/auto-start.py | 31 ++-- bin/actions/hello.py | 144 +++++++++------ bin/actions/help.py | 6 +- bin/actions/lint.py | 113 +++++++++--- bin/actions/secrets.py | 90 ++++++---- bin/actions/service.py | 137 ++++++++------ bin/actions/update.py | 238 +++++++++++++++++-------- bin/helpers/functions.py | 73 +++++--- config/ansible/tasks/global/global.yml | 2 + 10 files changed, 593 insertions(+), 283 deletions(-) create mode 100644 .github/workflows/python.yml diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml new file mode 100644 index 0000000..8398648 --- /dev/null +++ b/.github/workflows/python.yml @@ -0,0 +1,42 @@ +name: Python Lint Check + +on: + pull_request: + push: + branches: [ master ] + +jobs: + check-python: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + + - name: Install Python linting tools + run: | + python -m pip install --upgrade pip + python -m pip install pylint black + + - name: Run pylint + run: | + python_files=$(find . -name "*.py" -type f) + if [ -z "$python_files" ]; then + echo "No Python files found to lint" + exit 0 + fi + + pylint $python_files + + - name: Check Black formatting + run: | + python_files=$(find . -name "*.py" -type f) + if [ -z "$python_files" ]; then + echo "No Python files found to lint" + exit 0 + fi + + black --check $python_files diff --git a/bin/actions/auto-start.py b/bin/actions/auto-start.py index 0248d01..f8d0589 100755 --- a/bin/actions/auto-start.py +++ b/bin/actions/auto-start.py @@ -9,22 +9,27 @@ import subprocess sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin")) from helpers.functions import printfe, run_command + def check_command_exists(command): """Check if a command is available in the system""" try: - subprocess.run(["which", command], - check=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + subprocess.run( + ["which", command], + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) return True except subprocess.CalledProcessError: return False + def list_screen_sessions(): """List all screen sessions""" success, output = run_command(["screen", "-ls"]) return output + def wipe_dead_sessions(): """Check and clean up dead screen sessions""" screen_list = list_screen_sessions() @@ -32,47 +37,51 @@ def wipe_dead_sessions(): print("Found dead sessions, cleaning up...") run_command(["screen", "-wipe"]) + def is_app_running(app_name): """Check if an app is already running in a screen session""" screen_list = list_screen_sessions() return app_name in screen_list + def start_app(app_name, command): """Start an application in a screen session""" printfe("green", f"Starting {app_name} with command: {command}...") run_command(["screen", "-dmS", app_name] + command.split()) time.sleep(1) # Give it a moment to start + def main(): # Define dictionary with app_name => command mapping apps = { "vesktop": "vesktop", "ktailctl": "flatpak run org.fkoehler.KTailctl", - "ulauncher": "ulauncher --no-window-shadow --hide-window" + "ulauncher": "ulauncher --no-window-shadow --hide-window", } - + # Clean up dead sessions if any wipe_dead_sessions() - + print("Starting auto-start applications...") for app_name, command in apps.items(): # Get the binary name (first part of the command) command_binary = command.split()[0] - + # Check if the command exists if check_command_exists(command_binary): # Check if the app is already running if is_app_running(app_name): printfe("yellow", f"{app_name} is already running. Skipping...") continue - + # Start the application start_app(app_name, command) - + # Display screen sessions print(list_screen_sessions()) - + return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/actions/hello.py b/bin/actions/hello.py index 187e1af..f948672 100755 --- a/bin/actions/hello.py +++ b/bin/actions/hello.py @@ -9,28 +9,33 @@ from datetime import datetime sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin")) from helpers.functions import printfe, logo, _rainbow_color, COLORS + def get_last_ssh_login(): """Get information about the last SSH login""" try: - result = subprocess.run(['lastlog', '-u', os.environ.get("USER", "")], - capture_output=True, text=True) - + result = subprocess.run( + ["lastlog", "-u", os.environ.get("USER", "")], + capture_output=True, + text=True, + ) + # If lastlog didn't work try lastlog2 if result.returncode != 0: - result = subprocess.run(['lastlog2', os.environ.get("USER", "")], - capture_output=True, text=True) + result = subprocess.run( + ["lastlog2", os.environ.get("USER", "")], capture_output=True, text=True + ) if result.returncode == 0: - lines = result.stdout.strip().split('\n') + lines = result.stdout.strip().split("\n") if len(lines) >= 2: # Header line + data line # Parse the last login line - example format: # menno ssh 100.99.23.98 Mon Mar 10 19:09:43 +0100 2025 parts = lines[1].split() - if len(parts) >= 7 and 'ssh' in parts[1]: # Check if it's an SSH login + if len(parts) >= 7 and "ssh" in parts[1]: # Check if it's an SSH login # Extract IP address from the third column ip = parts[2] # Time is the rest of the line starting from position 3 - time_str = ' '.join(parts[3:]) + time_str = " ".join(parts[3:]) return f"{COLORS['cyan']}Last SSH login{COLORS['reset']}{COLORS['yellow']} {time_str}{COLORS['cyan']} from{COLORS['yellow']} {ip}" return None except Exception as e: @@ -38,64 +43,74 @@ def get_last_ssh_login(): # print(f"Error getting SSH login: {str(e)}") return None + def check_dotfiles_status(): """Check if the dotfiles repository is dirty""" dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles")) try: if not os.path.isdir(os.path.join(dotfiles_path, ".git")): return None - + # Check for git status details status = { - 'is_dirty': False, - 'untracked': 0, - 'modified': 0, - 'staged': 0, - 'commit_hash': '', - 'unpushed': 0 + "is_dirty": False, + "untracked": 0, + "modified": 0, + "staged": 0, + "commit_hash": "", + "unpushed": 0, } - + # Get status of files - result = subprocess.run(['git', 'status', '--porcelain'], - cwd=dotfiles_path, - capture_output=True, text=True) - + result = subprocess.run( + ["git", "status", "--porcelain"], + cwd=dotfiles_path, + capture_output=True, + text=True, + ) + if result.stdout.strip(): - status['is_dirty'] = True + status["is_dirty"] = True for line in result.stdout.splitlines(): - if line.startswith('??'): - status['untracked'] += 1 - if line.startswith(' M') or line.startswith('MM'): - status['modified'] += 1 - if line.startswith('M ') or line.startswith('A '): - status['staged'] += 1 - + if line.startswith("??"): + status["untracked"] += 1 + if line.startswith(" M") or line.startswith("MM"): + status["modified"] += 1 + if line.startswith("M ") or line.startswith("A "): + status["staged"] += 1 + # Get current commit hash - result = subprocess.run(['git', 'rev-parse', '--short', 'HEAD'], - cwd=dotfiles_path, - capture_output=True, text=True) + result = subprocess.run( + ["git", "rev-parse", "--short", "HEAD"], + cwd=dotfiles_path, + capture_output=True, + text=True, + ) if result.returncode == 0: - status['commit_hash'] = result.stdout.strip() - + status["commit_hash"] = result.stdout.strip() + # Count unpushed commits # Fix: Remove capture_output and set stdout explicitly - result = subprocess.run(['git', 'log', '--oneline', '@{u}..'], - cwd=dotfiles_path, - stdout=subprocess.PIPE, - stderr=subprocess.DEVNULL, - text=True) + result = subprocess.run( + ["git", "log", "--oneline", "@{u}.."], + cwd=dotfiles_path, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + text=True, + ) if result.returncode == 0: - status['unpushed'] = len(result.stdout.splitlines()) - + status["unpushed"] = len(result.stdout.splitlines()) + return status except Exception as e: print(f"Error checking dotfiles status: {str(e)}") return None + def get_condensed_status(): """Generate a condensed status line for trash and git status""" status_parts = [] - + # Check trash status trash_path = os.path.expanduser("~/.local/share/Trash/files") try: @@ -106,58 +121,71 @@ def get_condensed_status(): status_parts.append(f"[!] {count} file(s) in trash") except Exception: pass - + # Check dotfiles status dotfiles_status = check_dotfiles_status() if dotfiles_status is not None: - if dotfiles_status['is_dirty']: + if dotfiles_status["is_dirty"]: status_parts.append(f"{COLORS['yellow']}dotfiles is dirty{COLORS['reset']}") - status_parts.append(f"{COLORS['red']}[{dotfiles_status['untracked']}] untracked{COLORS['reset']}") - status_parts.append(f"{COLORS['yellow']}[{dotfiles_status['modified']}] modified{COLORS['reset']}") - status_parts.append(f"{COLORS['green']}[{dotfiles_status['staged']}] staged{COLORS['reset']}") - - if dotfiles_status['commit_hash']: - status_parts.append(f"{COLORS['white']}[{COLORS['blue']}{dotfiles_status['commit_hash']}{COLORS['white']}]{COLORS['reset']}") - - if dotfiles_status['unpushed'] > 0: - status_parts.append(f"{COLORS['yellow']}[!] You have {dotfiles_status['unpushed']} commit(s) to push{COLORS['reset']}") + status_parts.append( + f"{COLORS['red']}[{dotfiles_status['untracked']}] untracked{COLORS['reset']}" + ) + status_parts.append( + f"{COLORS['yellow']}[{dotfiles_status['modified']}] modified{COLORS['reset']}" + ) + status_parts.append( + f"{COLORS['green']}[{dotfiles_status['staged']}] staged{COLORS['reset']}" + ) + + if dotfiles_status["commit_hash"]: + status_parts.append( + f"{COLORS['white']}[{COLORS['blue']}{dotfiles_status['commit_hash']}{COLORS['white']}]{COLORS['reset']}" + ) + + if dotfiles_status["unpushed"] > 0: + status_parts.append( + f"{COLORS['yellow']}[!] You have {dotfiles_status['unpushed']} commit(s) to push{COLORS['reset']}" + ) else: status_parts.append("Unable to check dotfiles status") - + if status_parts: return " - ".join(status_parts) return None + def welcome(): """Display welcome message with hostname and username""" print() - + # Get hostname and username hostname = os.uname().nodename username = os.environ.get("USER", os.environ.get("USERNAME", "user")) - + # Get SSH login info first ssh_login = get_last_ssh_login() - + print(f"{COLORS['cyan']}You're logged in on [", end="") print(_rainbow_color(hostname), end="") print(f"{COLORS['cyan']}] as [", end="") print(_rainbow_color(username), end="") print(f"{COLORS['cyan']}]{COLORS['reset']}") - + # Display last SSH login info if available if ssh_login: print(f"{ssh_login}{COLORS['reset']}") - + # Display condensed status line condensed_status = get_condensed_status() if condensed_status: print(f"{COLORS['yellow']}{condensed_status}{COLORS['reset']}") + def main(): logo(continue_after=True) welcome() return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/actions/help.py b/bin/actions/help.py index f0c4401..43151a4 100755 --- a/bin/actions/help.py +++ b/bin/actions/help.py @@ -7,10 +7,11 @@ import sys sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin")) from helpers.functions import printfe, println, logo + def main(): # Print logo logo(continue_after=True) - + # Print help dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles")) try: @@ -20,9 +21,10 @@ def main(): except Exception as e: printfe("red", f"Error reading help file: {e}") return 1 - + println(" ", "cyan") return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/actions/lint.py b/bin/actions/lint.py index 0c3deb2..7b8173c 100755 --- a/bin/actions/lint.py +++ b/bin/actions/lint.py @@ -8,54 +8,62 @@ from pathlib import Path # Import helper functions sys.path.append(os.path.join(os.path.dirname(os.path.dirname(__file__)))) -from helpers.functions import printfe, ensure_dependencies, command_exists +from helpers.functions import printfe, command_exists DOTFILES_ROOT = os.path.expanduser("~/.dotfiles") + def lint_ansible(fix=False): """Run ansible-lint on Ansible files""" ansible_dir = os.path.join(DOTFILES_ROOT, "config/ansible") - + if not os.path.isdir(ansible_dir): printfe("yellow", "No ansible directory found at config/ansible") return 0 - + # Find all YAML files in the ansible directory yaml_files = [] for ext in [".yml", ".yaml"]: yaml_files.extend(list(Path(ansible_dir).glob(f"**/*{ext}"))) - + if not yaml_files: printfe("yellow", "No Ansible files found in config/ansible to lint") return 0 - + if not command_exists("ansible-lint"): - printfe("red", "ansible-lint is not installed. Please install it with pip or your package manager.") + printfe( + "red", + "ansible-lint is not installed. Please install it with pip or your package manager.", + ) return 1 - + printfe("blue", f"Running ansible-lint{' with auto-fix' if fix else ''}...") files_to_lint = [str(f) for f in yaml_files] - + command = ["ansible-lint"] if fix: command.append("--fix") command.extend(files_to_lint) - + result = subprocess.run(command) return result.returncode + def lint_nix(): """Run nixfmt on Nix files""" nix_files = list(Path(DOTFILES_ROOT).glob("**/*.nix")) - + if not nix_files: printfe("yellow", "No Nix files found to lint") return 0 - + if not command_exists("nixfmt"): - printfe("red", "nixfmt is not installed. Please install it with nix-env or your package manager.") + printfe( + "red", + "nixfmt is not installed. Please install it with nix-env or your package manager.", + ) return 1 - + printfe("blue", "Running nixfmt...") exit_code = 0 for nix_file in nix_files: @@ -63,33 +71,92 @@ def lint_nix(): result = subprocess.run(["nixfmt", str(nix_file)]) if result.returncode != 0: exit_code = 1 - + return exit_code + +def lint_python(fix=False): + """Run pylint and black on Python files""" + python_files = list(Path(DOTFILES_ROOT).glob("**/*.py")) + + if not python_files: + printfe("yellow", "No Python files found to lint") + return 0 + + exit_code = 0 + + # Check for pylint + if command_exists("pylint"): + printfe("blue", "Running pylint...") + files_to_lint = [str(f) for f in python_files] + result = subprocess.run(["pylint"] + files_to_lint) + if result.returncode != 0: + exit_code = 1 + else: + printfe("yellow", "pylint is not installed. Skipping Python linting.") + + # Check for black + if command_exists("black"): + printfe( + "blue", f"Running black{'--check' if not fix else ''} on Python files..." + ) + black_args = ["black"] + if not fix: + black_args.append("--check") + black_args.extend([str(f) for f in python_files]) + + result = subprocess.run(black_args) + if result.returncode != 0: + exit_code = 1 + else: + printfe("yellow", "black is not installed. Skipping Python formatting.") + + if not command_exists("pylint") and not command_exists("black"): + printfe( + "red", + "Neither pylint nor black is installed. Install them with pip: pip install pylint black", + ) + return 1 + + return exit_code + + def main(): parser = argparse.ArgumentParser(description="Run linters on dotfiles") parser.add_argument("--ansible", action="store_true", help="Run only ansible-lint") parser.add_argument("--nix", action="store_true", help="Run only nixfmt") - parser.add_argument("--fix", action="store_true", help="Auto-fix issues where possible (for ansible-lint)") + parser.add_argument( + "--python", action="store_true", help="Run only Python linters (pylint, black)" + ) + parser.add_argument( + "--fix", action="store_true", help="Auto-fix issues where possible" + ) args = parser.parse_args() - - # If no specific linter is specified, run both - run_ansible = args.ansible or not (args.ansible or args.nix) - run_nix = args.nix or not (args.ansible or args.nix) - + + # If no specific linter is specified, run all + run_ansible = args.ansible or not (args.ansible or args.nix or args.python) + run_nix = args.nix or not (args.ansible or args.nix or args.python) + run_python = args.python or not (args.ansible or args.nix or args.python) + exit_code = 0 - + if run_ansible: ansible_result = lint_ansible(fix=args.fix) if ansible_result != 0: exit_code = ansible_result - + if run_nix: nix_result = lint_nix() if nix_result != 0: exit_code = nix_result - + + if run_python: + python_result = lint_python(fix=args.fix) + if python_result != 0: + exit_code = python_result + return exit_code + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/actions/secrets.py b/bin/actions/secrets.py index 7dbc8e6..4279a9c 100755 --- a/bin/actions/secrets.py +++ b/bin/actions/secrets.py @@ -10,23 +10,26 @@ import glob sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin")) from helpers.functions import printfe, run_command + def get_password(): """Get password from 1Password""" op_cmd = "op" - + # Try to get the password - success, output = run_command([op_cmd, "read", "op://j7nmhqlsjmp2r6umly5t75hzb4/Dotfiles Secrets/password"]) - + success, output = run_command( + [op_cmd, "read", "op://j7nmhqlsjmp2r6umly5t75hzb4/Dotfiles Secrets/password"] + ) + if not success: printfe("red", "Failed to fetch password from 1Password.") return None - + # Check if we need to use a token if "use 'op item get" in output: # Extract the token token = output.split("use 'op item get ")[1].split(" --")[0] printfe("cyan", f"Got fetch token: {token}") - + # Use the token to get the actual password success, password = run_command( [op_cmd, "item", "get", token, "--reveal", "--fields", "password"] @@ -37,20 +40,23 @@ def get_password(): else: # We already got the password return output - + + def prompt_for_password(): """Ask for password manually""" import getpass + printfe("cyan", "Enter the password manually: ") password = getpass.getpass("") - + if not password: printfe("red", "Password cannot be empty.") sys.exit(1) - + printfe("green", "Password entered successfully.") return password + def calculate_checksum(file_path): """Calculate SHA256 checksum of a file""" sha256_hash = hashlib.sha256() @@ -59,55 +65,66 @@ def calculate_checksum(file_path): sha256_hash.update(byte_block) return sha256_hash.hexdigest() + def encrypt_folder(folder_path, password): """Recursively encrypt files in a folder""" for item in glob.glob(os.path.join(folder_path, "*")): # Skip .gpg and .sha256 files if item.endswith(".gpg") or item.endswith(".sha256"): continue - + # Handle directories recursively if os.path.isdir(item): encrypt_folder(item, password) continue - + # Calculate current checksum current_checksum = calculate_checksum(item) checksum_file = f"{item}.sha256" - + # Check if file changed since last encryption if os.path.exists(checksum_file): - with open(checksum_file, 'r') as f: + with open(checksum_file, "r") as f: previous_checksum = f.read().strip() - + if current_checksum == previous_checksum: continue - + # Remove existing .gpg file if it exists gpg_file = f"{item}.gpg" if os.path.exists(gpg_file): os.remove(gpg_file) - + # Encrypt the file printfe("cyan", f"Encrypting {item}...") cmd = [ - "gpg", "--quiet", "--batch", "--yes", "--symmetric", - "--cipher-algo", "AES256", "--armor", - "--passphrase", password, - "--output", gpg_file, item + "gpg", + "--quiet", + "--batch", + "--yes", + "--symmetric", + "--cipher-algo", + "AES256", + "--armor", + "--passphrase", + password, + "--output", + gpg_file, + item, ] - + success, _ = run_command(cmd) if success: printfe("cyan", f"Staging {item} for commit...") run_command(["git", "add", "-f", gpg_file]) - + # Update checksum file - with open(checksum_file, 'w') as f: + with open(checksum_file, "w") as f: f.write(current_checksum) else: printfe("red", f"Failed to encrypt {item}") + def decrypt_folder(folder_path, password): """Recursively decrypt files in a folder""" for item in glob.glob(os.path.join(folder_path, "*")): @@ -115,36 +132,44 @@ def decrypt_folder(folder_path, password): if item.endswith(".gpg"): output_file = item[:-4] # Remove .gpg extension printfe("cyan", f"Decrypting {item}...") - + cmd = [ - "gpg", "--quiet", "--batch", "--yes", "--decrypt", - "--passphrase", password, - "--output", output_file, item + "gpg", + "--quiet", + "--batch", + "--yes", + "--decrypt", + "--passphrase", + password, + "--output", + output_file, + item, ] - + success, _ = run_command(cmd) if not success: printfe("red", f"Failed to decrypt {item}") - + # Process directories recursively elif os.path.isdir(item): printfe("cyan", f"Decrypting folder {item}...") decrypt_folder(item, password) + def main(): if len(sys.argv) != 2 or sys.argv[1] not in ["encrypt", "decrypt"]: printfe("red", "Usage: secrets.py [encrypt|decrypt]") return 1 - + # Get the dotfiles path dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles")) secrets_path = os.path.join(dotfiles_path, "secrets") - + # Get the password password = get_password() if not password: password = prompt_for_password() - + # Perform the requested action if sys.argv[1] == "encrypt": printfe("cyan", "Encrypting secrets...") @@ -152,8 +177,9 @@ def main(): else: # decrypt printfe("cyan", "Decrypting secrets...") decrypt_folder(secrets_path, password) - + return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/actions/service.py b/bin/actions/service.py index 93a51c7..908ae79 100755 --- a/bin/actions/service.py +++ b/bin/actions/service.py @@ -12,63 +12,73 @@ from helpers.functions import printfe, println, logo # Base directory for Docker services $HOME/services SERVICES_DIR = os.path.join(os.path.expanduser("~"), "services") + def get_service_path(service_name): """Return the path to a service's docker-compose file""" service_dir = os.path.join(SERVICES_DIR, service_name) compose_file = os.path.join(service_dir, "docker-compose.yml") - + if not os.path.exists(compose_file): printfe("red", f"Error: Service '{service_name}' not found at {compose_file}") return None - + return compose_file + def run_docker_compose(args, service_name=None, compose_file=None): """Run docker compose command with provided args""" if service_name and not compose_file: compose_file = get_service_path(service_name) if not compose_file: return 1 - + cmd = ["docker", "compose"] - + if compose_file: cmd.extend(["-f", compose_file]) - + cmd.extend(args) - + printfe("blue", f"Running: {' '.join(cmd)}") result = subprocess.run(cmd) return result.returncode + def cmd_start(args): """Start a Docker service""" return run_docker_compose(["up", "-d"], service_name=args.service) + def cmd_stop(args): """Stop a Docker service""" return run_docker_compose(["down"], service_name=args.service) + def cmd_restart(args): """Restart a Docker service""" return run_docker_compose(["restart"], service_name=args.service) + def get_all_running_services(): """Return a list of all running services""" if not os.path.exists(SERVICES_DIR): return [] - + running_services = [] - services = [d for d in os.listdir(SERVICES_DIR) - if os.path.isdir(os.path.join(SERVICES_DIR, d)) and - os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml"))] - + services = [ + d + for d in os.listdir(SERVICES_DIR) + if os.path.isdir(os.path.join(SERVICES_DIR, d)) + and os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml")) + ] + for service in services: if check_service_running(service) > 0: running_services.append(service) - + return running_services + def cmd_update(args): """Update a Docker service by pulling new images and recreating containers if needed""" if args.all: @@ -76,24 +86,27 @@ def cmd_update(args): if not running_services: printfe("yellow", "No running services found to update") return 0 - + printfe("blue", f"Updating all running services: {', '.join(running_services)}") - + failed_services = [] for service in running_services: printfe("blue", f"\n=== Updating {service} ===") - + # Pull the latest images pull_result = run_docker_compose(["pull"], service_name=service) - + # Bring the service up with the latest images up_result = run_docker_compose(["up", "-d"], service_name=service) - + if pull_result != 0 or up_result != 0: failed_services.append(service) - + if failed_services: - printfe("red", f"\nFailed to update the following services: {', '.join(failed_services)}") + printfe( + "red", + f"\nFailed to update the following services: {', '.join(failed_services)}", + ) return 1 else: printfe("green", "\nAll running services updated successfully") @@ -104,10 +117,11 @@ def cmd_update(args): pull_result = run_docker_compose(["pull"], service_name=args.service) if pull_result != 0: return pull_result - + # Then bring the service up with the latest images return run_docker_compose(["up", "-d"], service_name=args.service) + def cmd_ps(args): """Show Docker service status""" if args.service: @@ -115,107 +129,123 @@ def cmd_ps(args): else: return run_docker_compose(["ps"]) + def cmd_logs(args): """Show Docker service logs""" cmd = ["logs"] - + if args.follow: cmd.append("-f") - + if args.tail: cmd.extend(["--tail", args.tail]) - + return run_docker_compose(cmd, service_name=args.service) + def check_service_running(service_name): """Check if service has running containers and return the count""" compose_file = get_service_path(service_name) if not compose_file: return 0 - + result = subprocess.run( ["docker", "compose", "-f", compose_file, "ps", "--quiet"], capture_output=True, - text=True + text=True, ) - + # Count non-empty lines to get container count - containers = [line for line in result.stdout.strip().split('\n') if line] + containers = [line for line in result.stdout.strip().split("\n") if line] return len(containers) + def cmd_list(args): """List available Docker services""" if not os.path.exists(SERVICES_DIR): printfe("red", f"Error: Services directory not found at {SERVICES_DIR}") return 1 - - services = [d for d in os.listdir(SERVICES_DIR) - if os.path.isdir(os.path.join(SERVICES_DIR, d)) and - os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml"))] - + + services = [ + d + for d in os.listdir(SERVICES_DIR) + if os.path.isdir(os.path.join(SERVICES_DIR, d)) + and os.path.exists(os.path.join(SERVICES_DIR, d, "docker-compose.yml")) + ] + if not services: printfe("yellow", "No Docker services found") return 0 - - println("Available Docker services:", 'blue') + + println("Available Docker services:", "blue") for service in sorted(services): container_count = check_service_running(service) is_running = container_count > 0 - + if is_running: status = f"[RUNNING - {container_count} container{'s' if container_count > 1 else ''}]" color = "green" else: status = "[STOPPED]" color = "red" - + printfe(color, f" - {service:<20} {status}") - + return 0 + def main(): parser = argparse.ArgumentParser(description="Manage Docker services") subparsers = parser.add_subparsers(dest="command", help="Command to run") - + # Start command start_parser = subparsers.add_parser("start", help="Start a Docker service") start_parser.add_argument("service", help="Service to start") - + # Stop command stop_parser = subparsers.add_parser("stop", help="Stop a Docker service") stop_parser.add_argument("service", help="Service to stop") - + # Restart command restart_parser = subparsers.add_parser("restart", help="Restart a Docker service") restart_parser.add_argument("service", help="Service to restart") - + # Update command - update_parser = subparsers.add_parser("update", help="Update a Docker service (pull new images and recreate if needed)") + update_parser = subparsers.add_parser( + "update", + help="Update a Docker service (pull new images and recreate if needed)", + ) update_parser_group = update_parser.add_mutually_exclusive_group(required=True) - update_parser_group.add_argument("--all", action="store_true", help="Update all running services") + update_parser_group.add_argument( + "--all", action="store_true", help="Update all running services" + ) update_parser_group.add_argument("service", nargs="?", help="Service to update") - + # PS command ps_parser = subparsers.add_parser("ps", help="Show Docker service status") ps_parser.add_argument("service", nargs="?", help="Service to check") - + # Logs command logs_parser = subparsers.add_parser("logs", help="Show Docker service logs") logs_parser.add_argument("service", help="Service to show logs for") - logs_parser.add_argument("-f", "--follow", action="store_true", help="Follow log output") - logs_parser.add_argument("--tail", help="Number of lines to show from the end of logs") - + logs_parser.add_argument( + "-f", "--follow", action="store_true", help="Follow log output" + ) + logs_parser.add_argument( + "--tail", help="Number of lines to show from the end of logs" + ) + # List command and its alias subparsers.add_parser("list", help="List available Docker services") subparsers.add_parser("ls", help="List available Docker services (alias for list)") - + # Parse arguments args = parser.parse_args() - + if not args.command: parser.print_help() return 1 - + # Execute the appropriate command commands = { "start": cmd_start, @@ -225,10 +255,11 @@ def main(): "ps": cmd_ps, "logs": cmd_logs, "list": cmd_list, - "ls": cmd_list # Alias 'ls' to the same function as 'list' + "ls": cmd_list, # Alias 'ls' to the same function as 'list' } - + return commands[args.command](args) + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/actions/update.py b/bin/actions/update.py index b5028f1..6442ce7 100755 --- a/bin/actions/update.py +++ b/bin/actions/update.py @@ -9,84 +9,113 @@ import argparse sys.path.append(os.path.join(os.path.expanduser("~/.dotfiles"), "bin")) from helpers.functions import printfe, run_command + def help_message(): """Print help message and exit""" printfe("green", "Usage: upgrade.py [options]") printfe("green", "Options:") printfe("green", " --ha, -H Upgrade Home Manager packages.") printfe("green", " --ansible, -A Upgrade Ansible packages.") - printfe("green", " --ansible-verbose Upgrade Ansible packages with verbose output. (-vvv)") - printfe("green", " --full-speed, -F Upgrade packages and use all available cores for compilation. (Default: 8 cores)") + printfe( + "green", + " --ansible-verbose Upgrade Ansible packages with verbose output. (-vvv)", + ) + printfe( + "green", + " --full-speed, -F Upgrade packages and use all available cores for compilation. (Default: 8 cores)", + ) printfe("green", " --help, -h Display this help message.") return 0 + def check_git_repository(): """Check for changes in the dotfiles git repository and prompt user to pull if needed""" dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles")) - + printfe("cyan", "Checking for updates in dotfiles repository...") - + # Change to dotfiles directory current_dir = os.getcwd() os.chdir(dotfiles_path) - + # Check if this is a git repository status, _ = run_command(["git", "rev-parse", "--is-inside-work-tree"], shell=False) if not status: printfe("red", "The dotfiles directory is not a git repository.") os.chdir(current_dir) return False - + # Get the current branch name - status, current_branch = run_command(["git", "rev-parse", "--abbrev-ref", "HEAD"], shell=False) + status, current_branch = run_command( + ["git", "rev-parse", "--abbrev-ref", "HEAD"], shell=False + ) if not status: printfe("red", "Failed to determine current branch.") os.chdir(current_dir) return False - + current_branch = current_branch.strip() - + # Fetch the latest changes status, output = run_command(["git", "fetch"], shell=False) if not status: printfe("red", f"Failed to fetch changes from git repository: {output}") os.chdir(current_dir) return False - + # Check if remote branch exists - status, output = run_command(["git", "ls-remote", "--heads", "origin", current_branch], shell=False) + status, output = run_command( + ["git", "ls-remote", "--heads", "origin", current_branch], shell=False + ) if not status or not output.strip(): - printfe("yellow", f"Remote branch 'origin/{current_branch}' not found. Using local branch only.") + printfe( + "yellow", + f"Remote branch 'origin/{current_branch}' not found. Using local branch only.", + ) os.chdir(current_dir) return True - + # Check if we're behind the remote - status, output = run_command(["git", "rev-list", f"HEAD..origin/{current_branch}", "--count"], shell=False) + status, output = run_command( + ["git", "rev-list", f"HEAD..origin/{current_branch}", "--count"], shell=False + ) if not status: printfe("red", f"Failed to check for repository updates: {output}") os.chdir(current_dir) return False - + behind_count = output.strip() if behind_count == "0": - printfe("green", f"Dotfiles repository is up to date on branch '{current_branch}'.") + printfe( + "green", f"Dotfiles repository is up to date on branch '{current_branch}'." + ) os.chdir(current_dir) return True - + # Show what changes are available - status, output = run_command(["git", "log", f"HEAD..origin/{current_branch}", "--oneline"], shell=False) + status, output = run_command( + ["git", "log", f"HEAD..origin/{current_branch}", "--oneline"], shell=False + ) if status: - printfe("yellow", f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'. Changes:") + printfe( + "yellow", + f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'. Changes:", + ) for line in output.strip().splitlines(): printfe("yellow", f" • {line}") else: - printfe("yellow", f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'.") - + printfe( + "yellow", + f"Your dotfiles repository is {behind_count} commit(s) behind on branch '{current_branch}'.", + ) + # Ask user if they want to pull changes response = input("Do you want to pull these changes? (yes/no): ").strip().lower() - + if response in ["yes", "y"]: - status, output = run_command(["git", "pull", "origin", current_branch], shell=False) + status, output = run_command( + ["git", "pull", "origin", current_branch], shell=False + ) if not status: printfe("red", f"Failed to pull changes: {output}") os.chdir(current_dir) @@ -94,19 +123,20 @@ def check_git_repository(): printfe("green", "Successfully updated dotfiles repository.") else: printfe("yellow", "Skipping repository update.") - + os.chdir(current_dir) return True + def ensure_ansible_collections(): """Ensure required Ansible collections are installed""" # List of required collections that can be expanded in the future required_collections = [ "community.general", ] - + printfe("cyan", "Checking for required Ansible collections...") - + # Get list of installed collections using ansible-galaxy status, output = run_command(["ansible-galaxy", "collection", "list"], shell=False) if not status: @@ -116,27 +146,27 @@ def ensure_ansible_collections(): else: # Parse output to get installed collections installed_collections = [] - + # Split output into lines and process lines = output.splitlines() collection_section = False - + for line in lines: line = line.strip() - + # Skip empty lines if not line: continue - + # Check if we've reached the collection listing section if line.startswith("Collection"): collection_section = True continue - + # Skip the separator line after the header if collection_section and line.startswith("--"): continue - + # Process collection entries if collection_section and " " in line: # Format is typically: "community.general 10.4.0" @@ -144,80 +174,113 @@ def ensure_ansible_collections(): if len(parts) >= 1: collection_name = parts[0] installed_collections.append(collection_name) - + # Check which required collections are missing missing_collections = [] for collection in required_collections: if collection not in installed_collections: missing_collections.append(collection) - + # Install missing collections if missing_collections: for collection in missing_collections: printfe("yellow", f"Installing {collection} collection...") - status, install_output = run_command(["ansible-galaxy", "collection", "install", collection], shell=False) + status, install_output = run_command( + ["ansible-galaxy", "collection", "install", collection], shell=False + ) if not status: - printfe("yellow", f"Warning: Failed to install {collection} collection: {install_output}") - printfe("yellow", f"Continuing anyway, but playbook might fail if it requires {collection}") + printfe( + "yellow", + f"Warning: Failed to install {collection} collection: {install_output}", + ) + printfe( + "yellow", + f"Continuing anyway, but playbook might fail if it requires {collection}", + ) else: printfe("green", f"Successfully installed {collection} collection") else: printfe("green", "All required collections are already installed.") - + return True + def main(): # Parse arguments parser = argparse.ArgumentParser(add_help=False) - parser.add_argument("--ha", "-H", action="store_true", help="Upgrade Home Manager packages") - parser.add_argument("--ansible", "-A", action="store_true", help="Upgrade Ansible packages") - parser.add_argument("--ansible-verbose", action="store_true", help="Upgrade Ansible packages with verbose output") - parser.add_argument("--full-speed", "-F", action="store_true", help="Use all available cores") - parser.add_argument("--help", "-h", action="store_true", help="Display help message") - + parser.add_argument( + "--ha", "-H", action="store_true", help="Upgrade Home Manager packages" + ) + parser.add_argument( + "--ansible", "-A", action="store_true", help="Upgrade Ansible packages" + ) + parser.add_argument( + "--ansible-verbose", + action="store_true", + help="Upgrade Ansible packages with verbose output", + ) + parser.add_argument( + "--full-speed", "-F", action="store_true", help="Use all available cores" + ) + parser.add_argument( + "--help", "-h", action="store_true", help="Display help message" + ) + args = parser.parse_args() - + if args.help: return help_message() - + # If no specific option provided, run all if not args.ha and not args.ansible and not args.ansible_verbose: args.ha = True args.ansible = True - + # If ansible_verbose is set, also set ansible if args.ansible_verbose: args.ansible = True - + # Always check git repository first if not check_git_repository(): printfe("red", "Failed to check or update dotfiles repository.") return 1 - + # Set cores and jobs based on full-speed flag if args.full_speed: import multiprocessing + cores = jobs = multiprocessing.cpu_count() else: cores = 8 jobs = 1 - + printfe("cyan", f"Limiting to {cores} cores with {jobs} jobs.") - + # Home Manager update if args.ha: - dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles")) + dotfiles_path = os.environ.get( + "DOTFILES_PATH", os.path.expanduser("~/.dotfiles") + ) hostname = os.uname().nodename - + printfe("cyan", "Updating Home Manager flake...") os.chdir(f"{dotfiles_path}/config/home-manager") - status, output = run_command(["nix", "--extra-experimental-features", "nix-command", - "--extra-experimental-features", "flakes", "flake", "update"], - shell=False) + status, output = run_command( + [ + "nix", + "--extra-experimental-features", + "nix-command", + "--extra-experimental-features", + "flakes", + "flake", + "update", + ], + shell=False, + ) if not status: printfe("red", f"Failed to update Home Manager flake: {output}") return 1 - + # Check if home-manager is installed status, _ = run_command(["which", "home-manager"], shell=False) if status: @@ -225,15 +288,29 @@ def main(): backup_file = os.path.expanduser("~/.config/mimeapps.list.backup") if os.path.exists(backup_file): os.remove(backup_file) - + printfe("cyan", "Upgrading Home Manager packages...") env = os.environ.copy() env["NIXPKGS_ALLOW_UNFREE"] = "1" - - cmd = ["home-manager", "--extra-experimental-features", "nix-command", - "--extra-experimental-features", "flakes", "switch", "-b", "backup", - f"--flake", f".#{hostname}", "--impure", "--cores", str(cores), "-j", str(jobs)] - + + cmd = [ + "home-manager", + "--extra-experimental-features", + "nix-command", + "--extra-experimental-features", + "flakes", + "switch", + "-b", + "backup", + f"--flake", + f".#{hostname}", + "--impure", + "--cores", + str(cores), + "-j", + str(jobs), + ] + result = subprocess.run(cmd, env=env) if result.returncode != 0: printfe("red", "Failed to upgrade Home Manager packages.") @@ -241,39 +318,48 @@ def main(): else: printfe("red", "Home Manager is not installed.") return 1 - + # Ansible update if args.ansible: - dotfiles_path = os.environ.get("DOTFILES_PATH", os.path.expanduser("~/.dotfiles")) + dotfiles_path = os.environ.get( + "DOTFILES_PATH", os.path.expanduser("~/.dotfiles") + ) hostname = os.uname().nodename username = os.environ.get("USER", os.environ.get("USERNAME", "user")) - + # Ensure required collections are installed if not ensure_ansible_collections(): - printfe("red", "Failed to ensure required Ansible collections are installed") + printfe( + "red", "Failed to ensure required Ansible collections are installed" + ) return 1 - + printfe("cyan", "Running Ansible playbook...") ansible_cmd = [ "/usr/bin/env", "ansible-playbook", - "-i", f"{dotfiles_path}/config/ansible/inventory.ini", + "-i", + f"{dotfiles_path}/config/ansible/inventory.ini", f"{dotfiles_path}/config/ansible/playbook.yml", - "--extra-vars", f"hostname={hostname}", - "--extra-vars", f"ansible_user={username}", - "--limit", hostname, - "--ask-become-pass" + "--extra-vars", + f"hostname={hostname}", + "--extra-vars", + f"ansible_user={username}", + "--limit", + hostname, + "--ask-become-pass", ] - + if args.ansible_verbose: ansible_cmd.append("-vvv") - + result = subprocess.run(ansible_cmd) if result.returncode != 0: printfe("red", "Failed to upgrade Ansible packages.") return 1 - + return 0 + if __name__ == "__main__": sys.exit(main()) diff --git a/bin/helpers/functions.py b/bin/helpers/functions.py index 4f03872..20870c8 100644 --- a/bin/helpers/functions.py +++ b/bin/helpers/functions.py @@ -1,12 +1,12 @@ #!/usr/bin/env python3 -import os import sys import subprocess import math import random import shutil import datetime + try: import pyfiglet except ImportError: @@ -23,32 +23,34 @@ COLORS = { "cyan": "\033[0;36m", "white": "\033[0;37m", "grey": "\033[0;90m", # Added grey color for timestamp - "reset": "\033[0m" + "reset": "\033[0m", } + def printfe(color, message, show_time=True): """ Print a formatted message with the specified color With timestamp and message type prefix similar to setup.sh """ color_code = COLORS.get(color.lower(), COLORS["reset"]) - + if show_time: # Add timestamp - timestamp = datetime.datetime.now().strftime('%H:%M:%S') - print(f"{COLORS['grey']}{timestamp}{COLORS['reset']}", end='') - + timestamp = datetime.datetime.now().strftime("%H:%M:%S") + print(f"{COLORS['grey']}{timestamp}{COLORS['reset']}", end="") + # Add message type based on color if color.lower() in ["green", "cyan", "blue", "purple"]: - print(f"{COLORS['green']} INF {COLORS['reset']}", end='') + print(f"{COLORS['green']} INF {COLORS['reset']}", end="") elif color.lower() == "yellow": - print(f"{COLORS['yellow']} WRN {COLORS['reset']}", end='') + print(f"{COLORS['yellow']} WRN {COLORS['reset']}", end="") elif color.lower() == "red": - print(f"{COLORS['red']} ERR {COLORS['reset']}", end='') - + print(f"{COLORS['red']} ERR {COLORS['reset']}", end="") + # Print the actual message with color print(f"{color_code}{message}{COLORS['reset']}") + def println(message, color=None): """Print a line with optional color""" if color: @@ -56,6 +58,7 @@ def println(message, color=None): else: printfe("reset", message) + def _rainbow_color(text, freq=0.1, offset=0): """Apply rainbow colors to text similar to lolcat""" colored_text = "" @@ -65,27 +68,28 @@ def _rainbow_color(text, freq=0.1, offset=0): r = int(127 * math.sin(freq * i + offset + 0) + 128) g = int(127 * math.sin(freq * i + offset + 2 * math.pi / 3) + 128) b = int(127 * math.sin(freq * i + offset + 4 * math.pi / 3) + 128) - + # Apply the RGB color to the character colored_text += f"\033[38;2;{r};{g};{b}m{char}\033[0m" else: colored_text += char - + return colored_text + def logo(continue_after=False): """Display the dotfiles logo""" try: # Try to read logo file first for backward compatibility if pyfiglet: # Generate ASCII art with pyfiglet and rainbow colors - ascii_art = pyfiglet.figlet_format("Menno's Dotfiles", font='slant') + ascii_art = pyfiglet.figlet_format("Menno's Dotfiles", font="slant") print("\n") # Add some space before the logo - + # Use a random offset to vary the rainbow start position random_offset = random.random() * 2 * math.pi line_offset = 0 - + for line in ascii_art.splitlines(): # Add a little variation to each line print(_rainbow_color(line, offset=random_offset + line_offset)) @@ -95,63 +99,76 @@ def logo(continue_after=False): printfe("yellow", "\n *** Menno's Dotfiles ***\n") printfe("cyan", " Note: Install pyfiglet for better logo display") printfe("cyan", " (pip install pyfiglet)\n") - + if not continue_after: sys.exit(0) except Exception as e: printfe("red", f"Error displaying logo: {e}") + def run_command(command, shell=False): """Run a shell command and return the result""" try: if not shell and not shutil.which(command[0]): return False, f"Command '{command[0]}' not found" - - result = subprocess.run(command, shell=shell, check=True, text=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + result = subprocess.run( + command, + shell=shell, + check=True, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) return True, result.stdout.strip() except subprocess.CalledProcessError as e: return False, e.stderr.strip() except FileNotFoundError: return False, f"Command '{command[0]}' not found" + def command_exists(command): """Check if a command exists in the PATH""" - import shutil return shutil.which(command) is not None + def ensure_dependencies(): """Check and install required dependencies for the dotfiles system""" required_packages = [ - 'pyfiglet', # For ASCII art generation + "pyfiglet", # For ASCII art generation ] # Check if pip is available - success, _ = run_command(['pip', '--version']) + success, _ = run_command(["pip", "--version"]) if not success: - printfe("red", "Pip is required to install missing dependencies, try again after running `dotf update`") + printfe( + "red", + "Pip is required to install missing dependencies, try again after running `dotf update`", + ) return False - + missing_packages = [] for package in required_packages: try: __import__(package) except ImportError: missing_packages.append(package) - + if missing_packages: printfe("yellow", f"Missing dependencies: {', '.join(missing_packages)}") install = input("Would you like to install them now? (y/n): ").lower() - if install == 'y' or install == 'yes': + if install == "y" or install == "yes": printfe("cyan", "Installing missing dependencies...") for package in missing_packages: printfe("blue", f"Installing {package}...") - success, output = run_command(['pip', 'install', '--user', package, '--break-system-packages']) + success, output = run_command( + ["pip", "install", "--user", package, "--break-system-packages"] + ) if success: printfe("green", f"Successfully installed {package}") else: printfe("red", f"Failed to install {package}: {output}") - + printfe("green", "All dependencies have been processed") return True else: diff --git a/config/ansible/tasks/global/global.yml b/config/ansible/tasks/global/global.yml index cde5dad..0e33d9b 100644 --- a/config/ansible/tasks/global/global.yml +++ b/config/ansible/tasks/global/global.yml @@ -39,6 +39,8 @@ - python3 - python3-pip - python3-venv + - pylint + - black state: present become: true