#!/usr/bin/env python # -*- coding: utf-8 -*- """ Git Pusher - License Validator (Server-side) Ce fichier doit être déployé sur le serveur Splunk dans l'application Emplacement: /opt/splunk/etc/apps/pusher_app_prem/bin/license_validator.py """ import hashlib import hmac import base64 import json import os import socket from datetime import datetime from http.server import HTTPServer, BaseHTTPRequestHandler import logging # Configuration du logging log_dir = '/opt/splunk/var/log/splunk' os.makedirs(log_dir, exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(os.path.join(log_dir, 'license_validator.log')), logging.StreamHandler() ] ) logger = logging.getLogger('license_validator') # ============================================ # CONFIGURATION - DOIT CORRESPONDRE AU GÉNÉRATEUR ! # ============================================ # IMPORTANT: Cette clé DOIT être identique à celle du générateur SECRET_KEY = "git_pusher_super_secret_key_2024_change_me_in_production" # Chemin vers le fichier de licence LICENSE_FILE_PATH = "/opt/splunk/etc/apps/pusher_app_prem/local/license.lic" # Fichier de cache pour les stats d'utilisation USAGE_STATS_PATH = "/opt/splunk/etc/apps/pusher_app_prem/local/usage_stats.json" def get_splunk_hostname(): """Récupérer le hostname du serveur Splunk""" # Essayer d'abord via l'API Splunk try: import urllib.request import ssl ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE splunk_username = os.environ.get('SPLUNK_USERNAME', 'admin') splunk_password = os.environ.get('SPLUNK_PASSWORD', '2312Jocpam!?') credentials = base64.b64encode(f"{splunk_username}:{splunk_password}".encode()).decode() req = urllib.request.Request("https://127.0.0.1:8089/services/server/info?output_mode=json") req.add_header('Authorization', f'Basic {credentials}') with urllib.request.urlopen(req, timeout=5, context=ssl_context) as response: data = json.loads(response.read().decode('utf-8')) hostname = data.get('entry', [{}])[0].get('content', {}).get('host', '') if hostname: return hostname.lower().strip() except Exception as e: logger.warning(f"Could not get hostname via Splunk API: {e}") # Fallback sur le hostname système return socket.gethostname().lower().strip() def create_signature(data_str): """Créer une signature HMAC-SHA256""" signature = hmac.new( SECRET_KEY.encode('utf-8'), data_str.encode('utf-8'), hashlib.sha256 ).hexdigest() return signature def verify_signature(data_str, signature): """Vérifier une signature""" expected = create_signature(data_str) return hmac.compare_digest(expected, signature) def read_license_file(filepath=None): """Lire et parser un fichier de licence""" if filepath is None: filepath = LICENSE_FILE_PATH try: if not os.path.exists(filepath): return None with open(filepath, 'r', encoding='utf-8') as f: content = f.read() # Extraire la partie base64 (ignorer les commentaires) lines = content.strip().split('\n') base64_lines = [l for l in lines if not l.startswith('#') and l.strip()] base64_content = ''.join(base64_lines) # Décoder decoded = base64.b64decode(base64_content).decode('utf-8') license_file = json.loads(decoded) return license_file except Exception as e: logger.error(f"Error reading license file: {e}") return None def parse_license_content(content): """Parser le contenu d'un fichier de licence (pour l'upload)""" try: # Extraire la partie base64 (ignorer les commentaires) lines = content.strip().split('\n') base64_lines = [l for l in lines if not l.startswith('#') and l.strip()] base64_content = ''.join(base64_lines) # Décoder decoded = base64.b64decode(base64_content).decode('utf-8') license_file = json.loads(decoded) return license_file except Exception as e: logger.error(f"Error parsing license content: {e}") return None def validate_license(license_file=None, current_hostname=None): """ Valider une licence Args: license_file: Contenu du fichier de licence (dict) - si None, lit le fichier par défaut current_hostname: Hostname actuel - si None, détecte automatiquement Returns: dict avec {valid: bool, error: str, license_info: dict} """ try: # Lire la licence si non fournie if license_file is None: license_file = read_license_file() if license_file is None: return { "valid": False, "error": "Aucun fichier de licence trouvé", "error_code": "NO_LICENSE" } # Récupérer le hostname si non fourni if current_hostname is None: current_hostname = get_splunk_hostname() license_data = license_file.get("license", {}) signature = license_file.get("signature", "") # Recréer le JSON pour vérifier la signature license_json = json.dumps(license_data, separators=(',', ':'), sort_keys=True) # Vérifier la signature if not verify_signature(license_json, signature): return { "valid": False, "error": "Signature invalide - fichier corrompu ou modifié", "error_code": "INVALID_SIGNATURE" } # Vérifier le hostname expected_hostname = license_data.get("binding", {}).get("hostname", "").lower() current_hostname_clean = current_hostname.lower().strip() if current_hostname_clean != expected_hostname: return { "valid": False, "error": f"Cette licence est pour '{expected_hostname}', pas '{current_hostname_clean}'", "error_code": "HOSTNAME_MISMATCH", "expected_hostname": expected_hostname, "current_hostname": current_hostname_clean } # Vérifier l'expiration expires_timestamp = license_data.get("dates", {}).get("expires_timestamp", 0) if datetime.now().timestamp() > expires_timestamp: expires_date = license_data.get("dates", {}).get("expires", "unknown") return { "valid": False, "error": f"Licence expirée le {expires_date}", "error_code": "EXPIRED" } # Calculer les jours restants days_remaining = (expires_timestamp - datetime.now().timestamp()) / (24 * 3600) return { "valid": True, "license_id": license_data.get("license_id"), "type": license_data.get("type"), "type_name": license_data.get("type_name"), "customer": license_data.get("customer", {}), "expires": license_data.get("dates", {}).get("expires"), "days_remaining": int(days_remaining), "limits": license_data.get("limits", {}), "features": license_data.get("features", []), "hostname": expected_hostname } except Exception as e: logger.error(f"Validation error: {e}") return { "valid": False, "error": f"Erreur de validation: {str(e)}", "error_code": "VALIDATION_ERROR" } def save_license_file(content): """ Sauvegarder un nouveau fichier de licence Args: content: Contenu du fichier .lic Returns: dict avec {success: bool, error: str} """ try: # Créer le dossier local si nécessaire local_dir = os.path.dirname(LICENSE_FILE_PATH) os.makedirs(local_dir, exist_ok=True) # Parser d'abord pour valider license_file = parse_license_content(content) if license_file is None: return { "success": False, "error": "Format de fichier de licence invalide" } # Valider la licence validation = validate_license(license_file) if not validation.get("valid"): return { "success": False, "error": validation.get("error", "Licence invalide") } # Sauvegarder with open(LICENSE_FILE_PATH, 'w', encoding='utf-8') as f: f.write(content) logger.info(f"License saved: {validation.get('license_id')}") return { "success": True, "license_info": validation } except Exception as e: logger.error(f"Error saving license: {e}") return { "success": False, "error": str(e) } def get_usage_stats(): """Récupérer les statistiques d'utilisation""" try: if os.path.exists(USAGE_STATS_PATH): with open(USAGE_STATS_PATH, 'r') as f: return json.load(f) except: pass return { "pushes_today": 0, "pushes_total": 0, "last_push_date": None, "apps_pushed": [] } def increment_usage(): """Incrémenter le compteur d'utilisation""" stats = get_usage_stats() today = datetime.now().strftime("%Y-%m-%d") # Reset si nouveau jour if stats.get("last_push_date") != today: stats["pushes_today"] = 0 stats["last_push_date"] = today stats["pushes_today"] += 1 stats["pushes_total"] += 1 try: os.makedirs(os.path.dirname(USAGE_STATS_PATH), exist_ok=True) with open(USAGE_STATS_PATH, 'w') as f: json.dump(stats, f) except Exception as e: logger.error(f"Error saving usage stats: {e}") return stats def check_limits(): """ Vérifier si les limites de la licence sont respectées Returns: dict avec {allowed: bool, error: str} """ validation = validate_license() if not validation.get("valid"): return { "allowed": False, "error": validation.get("error") } limits = validation.get("limits", {}) stats = get_usage_stats() # Vérifier pushes par jour max_pushes = limits.get("max_pushes_per_day", -1) if max_pushes > 0 and stats.get("pushes_today", 0) >= max_pushes: return { "allowed": False, "error": f"Limite quotidienne atteinte ({max_pushes} pushes/jour)" } return { "allowed": True, "license_info": validation, "usage": stats } # ============================================ # API REST POUR L'INTERFACE WEB # ============================================ class LicenseAPIHandler(BaseHTTPRequestHandler): """Handler pour les requêtes de l'API licence""" def send_cors_headers(self): """Envoyer les headers CORS complets""" origin = self.headers.get('Origin', '*') self.send_header('Access-Control-Allow-Origin', origin) self.send_header('Access-Control-Allow-Methods', 'POST, GET, OPTIONS, PUT, DELETE') self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization, X-Requested-With, Accept, Origin') self.send_header('Access-Control-Allow-Credentials', 'true') self.send_header('Access-Control-Max-Age', '86400') def do_OPTIONS(self): logger.info(f"OPTIONS request from {self.headers.get('Origin', 'unknown')}") self.send_response(200) self.send_cors_headers() self.end_headers() return def do_GET(self): """GET /license - Récupérer les infos de licence""" self.send_response(200) self.send_header('Content-type', 'application/json') self.send_cors_headers() self.end_headers() try: if '/license/status' in self.path or self.path == '/license': validation = validate_license() usage = get_usage_stats() hostname = get_splunk_hostname() response = { "status": "valid" if validation.get("valid") else "invalid", "hostname": hostname, "license": validation if validation.get("valid") else None, "error": validation.get("error") if not validation.get("valid") else None, "error_code": validation.get("error_code") if not validation.get("valid") else None, "usage": usage } elif '/license/hostname' in self.path: response = { "hostname": get_splunk_hostname() } else: response = {"error": "Unknown endpoint"} self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"GET error: {e}") self.wfile.write(json.dumps({"error": str(e)}).encode()) def do_POST(self): """POST /license/upload - Uploader une nouvelle licence""" self.send_response(200) self.send_header('Content-type', 'application/json') self.send_cors_headers() self.end_headers() try: content_length = int(self.headers.get('Content-Length', 0)) body = self.rfile.read(content_length).decode('utf-8') if '/license/upload' in self.path: # Le body contient le contenu du fichier .lic data = json.loads(body) license_content = data.get('license_content', '') if not license_content: response = {"success": False, "error": "Contenu de licence vide"} else: response = save_license_file(license_content) elif '/license/delete' in self.path: # Supprimer la licence if os.path.exists(LICENSE_FILE_PATH): os.remove(LICENSE_FILE_PATH) response = {"success": True, "message": "Licence supprimée"} else: response = {"success": False, "error": "Aucune licence à supprimer"} else: response = {"error": "Unknown endpoint"} self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"POST error: {e}") self.wfile.write(json.dumps({"error": str(e)}).encode()) def log_message(self, format, *args): logger.debug(format % args) def start_license_server(port=9998): """Démarrer le serveur API licence (optionnel, peut être intégré au serveur principal)""" server = HTTPServer(('127.0.0.1', port), LicenseAPIHandler) logger.info(f"License API server listening on 127.0.0.1:{port}") server.serve_forever() # ============================================ # CLI POUR TESTS # ============================================ if __name__ == '__main__': import sys if len(sys.argv) > 1: if sys.argv[1] == "status": result = validate_license() print(json.dumps(result, indent=2, ensure_ascii=False)) elif sys.argv[1] == "hostname": print(f"Hostname: {get_splunk_hostname()}") elif sys.argv[1] == "server": start_license_server() else: print("Usage:") print(" python license_validator.py status # Vérifier la licence") print(" python license_validator.py hostname # Afficher le hostname") print(" python license_validator.py server # Démarrer l'API") else: result = validate_license() print(json.dumps(result, indent=2, ensure_ascii=False))