#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git Pusher - License Validator (RSA)

Validates licenses by verifying their RSA signature.

This file is distributed with the client application. The public key may be
visible: only the vendor, who holds the private key, can sign licenses.
"""

import os
import sys
import json
import base64
import socket
from datetime import datetime

# ============================================
# RSA PUBLIC KEY
# ============================================
# This key is generated by the vendor with license_generator_rsa.py
# Command: python license_generator_rsa.py export-key
#
# REPLACE THIS KEY WITH YOUR OWN PUBLIC KEY!
PUBLIC_KEY = '''-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAnj2hOg61Q9k9iz4U5F7I
RdaJrpLTG+orz0/Kpbz2HSxbAVXkvL5GvYVfxROjy0UgxOZFycZAaGN2am+5CDHA
D1dTL9KCPhEaPqw4XTFnf6Ur5VG0+SftugTdTcyxRe614Z+i61/2ahk/vKG9D4kB
j4qV4se4lLk993lEaQrOXkXCbZ8royB5MPeOchPxZd7SDzoovEcyUmf2Fa2eYk6U
WmbrymCnJRsfxEVZofQQyp1ILS8KuSxaquXvMWm3cXV2Krs/3E5ax0vBPMrZRL+o
Vn7/dVnzbOlbifeosTYaad1DLd7NEgst3OFUv+dSH5hcCCc36IHMSvxcJ9l7s2kv
KbEFPeh582JNmRoMMNPRbd+/ZVDeJ/oB344+TtB6VeQ2GQyOyoggiLryZujg3WDE
shwLkFwiYGa/zEct0qs2/HBS1FOAqrLiPdQYJTx+RhrXhTni/p3H42L+2xJcbnki
fAgn8ND5k2yQw2gk4AlLqq2y01m0jsHdjaOfhKzzPLyzt1En/KAnCWJSzB+jur4z
fd70R4pLTYfawr2NTvTAhOtiOIjWj380oGmxeKCNJT1P4Dq8Yl+OxKLBy4cnUlgV
sbpKJug7Goth4g7bmCVMhC4bf7JB/iTrrS8DhMaWaZX/FeFloM3yYyo/gzmAY19z
sUdQuBpxCwIf/J7Q4dYsDkMCAwEAAQ==
-----END PUBLIC KEY-----'''

# ============================================
# CONFIGURATION
# ============================================
# Application directory (below SPLUNK_HOME, which defaults to /opt/splunk)
APP_HOME = os.path.join(
    os.environ.get('SPLUNK_HOME', '/opt/splunk'),
    'etc', 'apps', 'pusher_app_prem'
)
LICENSE_FILE = os.path.join(APP_HOME, 'local', 'license.lic')
USAGE_FILE = os.path.join(APP_HOME, 'local', '.usage_stats')


# ============================================
# UTILITY FUNCTIONS
# ============================================

def get_splunk_hostname():
    """Return the lower-cased host name used to match a license.

    The ``serverName`` entry of ``etc/system/local/server.conf`` under
    SPLUNK_HOME is preferred. When that file is missing or unreadable, or
    holds no non-empty ``serverName`` entry, the system host name is used.

    Returns:
        str: The host name, lower-cased.
    """
    server_conf = os.path.join(
        os.environ.get('SPLUNK_HOME', '/opt/splunk'),
        'etc', 'system', 'local', 'server.conf'
    )
    try:
        with open(server_conf, 'r') as f:
            for line in f:
                # partition() keeps everything after the first '=' and never
                # raises on a line that has no '=' at all.
                key, sep, value = line.partition('=')
                if sep and key.strip() == 'serverName':
                    value = value.strip().lower()
                    if value:
                        return value
    except (OSError, ValueError):
        # Missing/unreadable file or undecodable bytes: use the fallback.
        pass

    # Fall back to the system host name
    return socket.gethostname().lower()


def load_public_key():
    """Load the RSA public key embedded in ``PUBLIC_KEY``.

    Returns:
        The key object built by ``cryptography``, or None when the
        ``cryptography`` package cannot be imported or the PEM cannot be
        loaded (in the latter case the error is printed).
    """
    try:
        from cryptography.hazmat.primitives import serialization
        from cryptography.hazmat.backends import default_backend

        public_key = serialization.load_pem_public_key(
            PUBLIC_KEY.encode('utf-8'),
            backend=default_backend()
        )
        return public_key
    except ImportError:
        # Fallback when cryptography is not installed
        return None
    except Exception as e:
        print(f"Erreur chargement clé publique: {e}")
        return None


def verify_rsa_signature(data, signature, public_key):
    """Check an RSA-PSS / SHA-256 signature.

    Args:
        data: Signed bytes.
        signature: Signature bytes.
        public_key: Key object as returned by ``load_public_key``.

    Returns:
        bool: True when ``public_key.verify`` accepts the signature, False
        when it raises anything (the import of ``cryptography`` included).
    """
    try:
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.asymmetric import padding

        public_key.verify(
            signature,
            data,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return True
    except Exception:
        return False


# ============================================
# .LIC FILE PARSING
# ============================================

def parse_license_file(filepath=None):
    """Read and parse a ``.lic`` license file.

    Args:
        filepath: Path of the file; defaults to ``LICENSE_FILE``.

    Returns:
        dict | None: The result of ``parse_license_content``, or None when
        the file does not exist or cannot be read (the error is printed).
    """
    if filepath is None:
        filepath = LICENSE_FILE

    if not os.path.exists(filepath):
        return None

    try:
        with open(filepath, 'r') as f:
            content = f.read()
        return parse_license_content(content)
    except Exception as e:
        print(f"Erreur lecture licence: {e}")
        return None


def parse_license_content(content):
    """Parse the text of a license.

    The payload is the first line that is neither blank nor a ``#`` comment.
    It is base64-encoded JSON holding the ``license`` and ``signature``
    entries.

    Args:
        content: Full text of the license file.

    Returns:
        dict | None: ``license_b64``, ``signature_b64`` and ``raw_payload``,
        or None when there is no payload line or it cannot be decoded (the
        error is printed).
    """
    try:
        # Extract the payload (comment lines are ignored)
        candidates = (line.strip() for line in content.strip().split('\n'))
        payload_b64 = next(
            (line for line in candidates if line and not line.startswith('#')),
            None
        )
        if not payload_b64:
            return None

        # Decode the payload
        payload = json.loads(base64.b64decode(payload_b64).decode('utf-8'))

        return {
            "license_b64": payload.get("license"),
            "signature_b64": payload.get("signature"),
            "raw_payload": payload_b64
        }
    except Exception as e:
        print(f"Erreur parsing licence: {e}")
        return None


# ============================================
# LICENSE VALIDATION
# ============================================

def validate_license(filepath=None):
    """
    Validate a license.

    Args:
        filepath: Path of the ``.lic`` file; defaults to ``LICENSE_FILE``.

    Returns:
        dict with:
        - valid: bool
        - error: str (when invalid)
        - error_code: str (when invalid), one of NO_LICENSE, INVALID_FORMAT,
          INVALID_SIGNATURE, HOSTNAME_MISMATCH, LICENSE_EXPIRED,
          CORRUPTED_DATA, VALIDATION_ERROR
        - license_id, type, type_name, customer, hostname, issued, expires,
          days_remaining, limits, features (when valid)
        - signature_verified: bool (when valid); False when the check was
          skipped because no public key could be loaded
    """
    # Parse the file
    parsed = parse_license_file(filepath)
    if not parsed:
        return {
            "valid": False,
            "error": "Fichier de licence non trouvé ou invalide",
            "error_code": "NO_LICENSE"
        }

    license_b64 = parsed.get("license_b64")
    signature_b64 = parsed.get("signature_b64")
    if not license_b64 or not signature_b64:
        return {
            "valid": False,
            "error": "Format de licence invalide",
            "error_code": "INVALID_FORMAT"
        }

    try:
        # Decode the license and its signature
        license_json = base64.b64decode(license_b64).decode('utf-8')
        signature = base64.b64decode(signature_b64)

        # Load the public key
        public_key = load_public_key()
        signature_verified = public_key is not None

        if public_key is None:
            # Degraded mode: the license is accepted WITHOUT a signature
            # check. NOTE(review): this fails open - anyone able to make
            # load_public_key() return None can use an unsigned license.
            # Callers that need a guarantee should test the
            # "signature_verified" entry of the result.
            print("⚠️ Module cryptography non installé - vérification basique uniquement")
        elif not verify_rsa_signature(license_json.encode('utf-8'), signature, public_key):
            return {
                "valid": False,
                "error": "Signature de licence invalide",
                "error_code": "INVALID_SIGNATURE"
            }

        # Parse the license data
        license_data = json.loads(license_json)

        # Check the host name; an empty or null host name disables the check
        expected_hostname = (license_data.get("hostname") or "").lower()
        current_hostname = get_splunk_hostname()
        if expected_hostname and expected_hostname != current_hostname:
            return {
                "valid": False,
                "error": f"Licence non valide pour ce serveur. Attendu: {expected_hostname}, Actuel: {current_hostname}",
                "error_code": "HOSTNAME_MISMATCH"
            }

        # Check the expiry date
        expires_str = license_data.get("expires", "")
        if expires_str:
            try:
                expires_date = datetime.strptime(expires_str, "%Y-%m-%d")
            except ValueError:
                # NOTE(review): a date that does not match YYYY-MM-DD is
                # accepted with 0 days remaining - confirm this is intended.
                days_remaining = 0
            else:
                now = datetime.now()
                # expires_date is midnight at the start of that day, so the
                # license is rejected from that moment on.
                if now > expires_date:
                    return {
                        "valid": False,
                        "error": f"Licence expirée le {expires_str}",
                        "error_code": "LICENSE_EXPIRED"
                    }
                days_remaining = (expires_date - now).days
        else:
            days_remaining = 999  # No expiry

        # The license is valid
        return {
            "valid": True,
            "license_id": license_data.get("license_id"),
            "type": license_data.get("type"),
            "type_name": license_data.get("type_name"),
            "customer": license_data.get("customer", {}),
            "hostname": expected_hostname,
            "issued": license_data.get("issued"),
            "expires": expires_str,
            "days_remaining": days_remaining,
            "limits": license_data.get("limits", {}),
            "features": license_data.get("features", []),
            "signature_verified": signature_verified
        }

    except json.JSONDecodeError:
        return {
            "valid": False,
            "error": "Données de licence corrompues",
            "error_code": "CORRUPTED_DATA"
        }
    except Exception as e:
        return {
            "valid": False,
            "error": f"Erreur de validation: {str(e)}",
            "error_code": "VALIDATION_ERROR"
        }


# ============================================
# LICENSE STORAGE
# ============================================

def save_license_file(content):
    """Validate an uploaded license and store it as ``LICENSE_FILE``.

    The content is first written to ``LICENSE_FILE + '.tmp'`` and validated
    from there; only a valid license replaces the current file. The
    temporary file is removed on every failure path.

    Args:
        content: Full text of the uploaded license.

    Returns:
        dict: ``{"success": True, "license": <validation result>}`` on
        success, otherwise ``{"success": False, "error": ...}`` (plus
        ``error_code`` when the validation itself rejected the license).
    """
    temp_file = LICENSE_FILE + '.tmp'
    try:
        # Create the local directory if needed
        local_dir = os.path.join(APP_HOME, 'local')
        os.makedirs(local_dir, exist_ok=True)

        # Check the format before touching the disk
        parsed = parse_license_content(content)
        if not parsed:
            return {
                "success": False,
                "error": "Format de licence invalide"
            }

        # Write to a temporary file so that validate_license can read it
        with open(temp_file, 'w') as f:
            # Restrict the permissions before any content is written.
            os.chmod(temp_file, 0o600)
            f.write(content)

        validation = validate_license(temp_file)
        if not validation.get("valid"):
            return {
                "success": False,
                "error": validation.get("error"),
                "error_code": validation.get("error_code")
            }

        # os.replace overwrites an existing target on every platform
        os.replace(temp_file, LICENSE_FILE)

        return {
            "success": True,
            "license": validation
        }

    except Exception as e:
        return {
            "success": False,
            "error": str(e)
        }
    finally:
        # Nothing to do after a successful replace; otherwise drop the
        # temporary file so no rejected license is left behind.
        if os.path.exists(temp_file):
            try:
                os.remove(temp_file)
            except OSError:
                pass


# ============================================
# USAGE LIMITS
# ============================================

def get_usage_stats():
    """Load the usage statistics stored in ``USAGE_FILE``.

    Returns:
        dict: The stored statistics, or a fresh zeroed record when the file
        is missing, unreadable, or does not contain a JSON object.
    """
    try:
        with open(USAGE_FILE, 'r') as f:
            stats = json.load(f)
        if isinstance(stats, dict):
            return stats
    except (OSError, ValueError):
        # Missing/unreadable file or invalid JSON: start from scratch.
        pass

    return {
        "total_pushes": 0,
        "pushes_today": 0,
        "last_push_date": None,
        "apps_pushed": []
    }


def get_pushes_today(stats=None, today=None):
    """Return the number of pushes recorded for the current day.

    The stored ``pushes_today`` counter only counts for the day held in
    ``last_push_date``; for any other day the result is 0.

    Args:
        stats: Usage statistics; loaded with ``get_usage_stats`` when None.
        today: Day to report on as ``YYYY-MM-DD``; defaults to the current
            local date.

    Returns:
        int: The number of pushes for ``today``.
    """
    if stats is None:
        stats = get_usage_stats()
    if today is None:
        today = datetime.now().strftime("%Y-%m-%d")

    if stats.get("last_push_date") != today:
        return 0
    return stats.get("pushes_today", 0)


def save_usage_stats(stats):
    """Write the usage statistics to ``USAGE_FILE`` (errors are printed).

    Args:
        stats: JSON-serialisable statistics dict.
    """
    try:
        local_dir = os.path.join(APP_HOME, 'local')
        os.makedirs(local_dir, exist_ok=True)

        with open(USAGE_FILE, 'w') as f:
            json.dump(stats, f, indent=2)

        os.chmod(USAGE_FILE, 0o600)
    except Exception as e:
        print(f"Erreur sauvegarde stats: {e}")


def increment_usage():
    """Record one more push and persist the statistics.

    Returns:
        dict: The updated statistics.
    """
    stats = get_usage_stats()
    today = datetime.now().strftime("%Y-%m-%d")

    # The daily counter restarts from 0 on a new day
    stats["pushes_today"] = get_pushes_today(stats, today) + 1
    stats["last_push_date"] = today
    stats["total_pushes"] = stats.get("total_pushes", 0) + 1

    save_usage_stats(stats)
    return stats


def check_limits():
    """Check whether the license allows another push right now.

    Returns:
        dict: ``allowed`` (bool). When refused: ``error`` and ``error_code``
        (those of ``validate_license``, or DAILY_LIMIT_REACHED). When
        allowed: ``license_type`` and ``remaining_today`` (-1 when the
        license sets no positive ``max_pushes_per_day``).
    """
    license_info = validate_license()

    if not license_info.get("valid"):
        return {
            "allowed": False,
            "error": license_info.get("error"),
            "error_code": license_info.get("error_code")
        }

    limits = license_info.get("limits") or {}
    max_pushes = limits.get("max_pushes_per_day", -1)
    if max_pushes is None:
        max_pushes = -1

    # Zero or a negative value means no daily limit
    if max_pushes <= 0:
        return {
            "allowed": True,
            "license_type": license_info.get("type_name"),
            "remaining_today": -1
        }

    # One read of the statistics, with the day rollover applied, feeds both
    # the limit check and the remaining count so the two always agree.
    pushes_today = get_pushes_today()
    if pushes_today >= max_pushes:
        return {
            "allowed": False,
            "error": f"Limite quotidienne atteinte ({max_pushes} pushes/jour)",
            "error_code": "DAILY_LIMIT_REACHED"
        }

    return {
        "allowed": True,
        "license_type": license_info.get("type_name"),
        "remaining_today": max_pushes - pushes_today
    }


def has_feature(feature_name):
    """Tell whether the current license grants a feature.

    Args:
        feature_name: Name looked up in the license's ``features`` list.

    Returns:
        bool: False when the license is invalid or lacks the feature.
    """
    license_info = validate_license()

    if not license_info.get("valid"):
        return False

    features = license_info.get("features") or []
    return feature_name in features


# ============================================
# CLI FOR TESTING
# ============================================

def main(argv=None):
    """Run the test CLI.

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]``. Without arguments the validation result is
            printed as JSON.
    """
    if argv is None:
        argv = sys.argv[1:]

    if not argv:
        result = validate_license()
        print(json.dumps(result, indent=2))
        return

    command = argv[0]
    if command == "status":
        result = validate_license()
        if result.get("valid"):
            print("✅ Licence valide")
            print(f" ID: {result.get('license_id')}")
            print(f" Type: {result.get('type_name')}")
            print(f" Hostname: {result.get('hostname')}")
            print(f" Expire: {result.get('expires')} ({result.get('days_remaining')} jours)")
            print(f" Features: {', '.join(result.get('features', []))}")
        else:
            print(f"❌ Licence invalide: {result.get('error')}")

    elif command == "hostname":
        print(f"Hostname: {get_splunk_hostname()}")

    elif command == "usage":
        stats = get_usage_stats()
        print(f"Total pushes: {stats.get('total_pushes', 0)}")
        # Apply the day rollover so yesterday's counter is not shown as today's
        print(f"Pushes aujourd'hui: {get_pushes_today(stats)}")

    elif command == "check":
        result = check_limits()
        if result.get("allowed"):
            print(f"✅ Push autorisé ({result.get('license_type')})")
        else:
            print(f"❌ Push refusé: {result.get('error')}")

    else:
        print("Usage: python license_validator.py [status|hostname|usage|check]")


if __name__ == "__main__":
    main()