You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Splunk_Deploiement/apps/pusher_app_prem/bin/license_validator.py

469 lines
15 KiB

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Git Pusher - License Validator (RSA)
Validation des licences avec vérification de signature RSA
Ce fichier est distribué avec l'application client.
La clé publique peut être visible - seul le vendeur avec la clé privée peut signer.
"""
import os
import sys
import json
import base64
import socket
from datetime import datetime
# ============================================
# CLÉ PUBLIQUE RSA
# ============================================
# Cette clé est générée par le vendeur avec license_generator_rsa.py
# Commande: python license_generator_rsa.py export-key
#
# REMPLACEZ CETTE CLÉ PAR VOTRE CLÉ PUBLIQUE !
PUBLIC_KEY = '''-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAnj2hOg61Q9k9iz4U5F7I
RdaJrpLTG+orz0/Kpbz2HSxbAVXkvL5GvYVfxROjy0UgxOZFycZAaGN2am+5CDHA
D1dTL9KCPhEaPqw4XTFnf6Ur5VG0+SftugTdTcyxRe614Z+i61/2ahk/vKG9D4kB
j4qV4se4lLk993lEaQrOXkXCbZ8royB5MPeOchPxZd7SDzoovEcyUmf2Fa2eYk6U
WmbrymCnJRsfxEVZofQQyp1ILS8KuSxaquXvMWm3cXV2Krs/3E5ax0vBPMrZRL+o
Vn7/dVnzbOlbifeosTYaad1DLd7NEgst3OFUv+dSH5hcCCc36IHMSvxcJ9l7s2kv
KbEFPeh582JNmRoMMNPRbd+/ZVDeJ/oB344+TtB6VeQ2GQyOyoggiLryZujg3WDE
shwLkFwiYGa/zEct0qs2/HBS1FOAqrLiPdQYJTx+RhrXhTni/p3H42L+2xJcbnki
fAgn8ND5k2yQw2gk4AlLqq2y01m0jsHdjaOfhKzzPLyzt1En/KAnCWJSzB+jur4z
fd70R4pLTYfawr2NTvTAhOtiOIjWj380oGmxeKCNJT1P4Dq8Yl+OxKLBy4cnUlgV
sbpKJug7Goth4g7bmCVMhC4bf7JB/iTrrS8DhMaWaZX/FeFloM3yYyo/gzmAY19z
sUdQuBpxCwIf/J7Q4dYsDkMCAwEAAQ==
-----END PUBLIC KEY-----'''
# ============================================
# CONFIGURATION
# ============================================
# Chemin de l'application
APP_HOME = os.environ.get('SPLUNK_HOME', '/opt/splunk') + '/etc/apps/pusher_app_prem'
LICENSE_FILE = os.path.join(APP_HOME, 'local', 'license.lic')
USAGE_FILE = os.path.join(APP_HOME, 'local', '.usage_stats')
# ============================================
# FONCTIONS UTILITAIRES
# ============================================
def get_splunk_hostname():
"""Récupérer le hostname du serveur Splunk"""
try:
# Essayer via servername de Splunk
server_conf = os.path.join(
os.environ.get('SPLUNK_HOME', '/opt/splunk'),
'etc', 'system', 'local', 'server.conf'
)
if os.path.exists(server_conf):
with open(server_conf, 'r') as f:
for line in f:
if line.strip().startswith('serverName'):
return line.split('=')[1].strip().lower()
except:
pass
# Fallback sur le hostname système
return socket.gethostname().lower()
def load_public_key():
"""Charger la clé publique RSA"""
try:
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
public_key = serialization.load_pem_public_key(
PUBLIC_KEY.encode('utf-8'),
backend=default_backend()
)
return public_key
except ImportError:
# Fallback si cryptography n'est pas installé
return None
except Exception as e:
print(f"Erreur chargement clé publique: {e}")
return None
def verify_rsa_signature(data, signature, public_key):
"""Vérifier la signature RSA"""
try:
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
public_key.verify(
signature,
data,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except Exception:
return False
# ============================================
# PARSING DU FICHIER .LIC
# ============================================
def parse_license_file(filepath=None):
"""Parser un fichier de licence .lic"""
if filepath is None:
filepath = LICENSE_FILE
if not os.path.exists(filepath):
return None
try:
with open(filepath, 'r') as f:
content = f.read()
return parse_license_content(content)
except Exception as e:
print(f"Erreur lecture licence: {e}")
return None
def parse_license_content(content):
"""Parser le contenu d'une licence"""
try:
# Extraire le payload (ignorer les lignes de commentaires)
lines = content.strip().split('\n')
payload_b64 = None
for line in lines:
line = line.strip()
if line and not line.startswith('#'):
payload_b64 = line
break
if not payload_b64:
return None
# Décoder le payload
payload = json.loads(base64.b64decode(payload_b64).decode('utf-8'))
return {
"license_b64": payload.get("license"),
"signature_b64": payload.get("signature"),
"raw_payload": payload_b64
}
except Exception as e:
print(f"Erreur parsing licence: {e}")
return None
# ============================================
# VALIDATION DE LA LICENCE
# ============================================
def validate_license(filepath=None):
"""
Valider une licence
Returns:
dict avec:
- valid: bool
- error: str (si invalide)
- error_code: str (si invalide)
- license_id, type, type_name, expires, days_remaining, limits, features (si valide)
"""
# Parser le fichier
parsed = parse_license_file(filepath)
if not parsed:
return {
"valid": False,
"error": "Fichier de licence non trouvé ou invalide",
"error_code": "NO_LICENSE"
}
license_b64 = parsed.get("license_b64")
signature_b64 = parsed.get("signature_b64")
if not license_b64 or not signature_b64:
return {
"valid": False,
"error": "Format de licence invalide",
"error_code": "INVALID_FORMAT"
}
try:
# Décoder la licence et la signature
license_json = base64.b64decode(license_b64).decode('utf-8')
signature = base64.b64decode(signature_b64)
# Charger la clé publique
public_key = load_public_key()
if public_key is None:
# Mode dégradé sans cryptography - vérification basique
print("⚠️ Module cryptography non installé - vérification basique uniquement")
else:
# Vérifier la signature RSA
if not verify_rsa_signature(license_json.encode('utf-8'), signature, public_key):
return {
"valid": False,
"error": "Signature de licence invalide",
"error_code": "INVALID_SIGNATURE"
}
# Parser les données de licence
license_data = json.loads(license_json)
# Vérifier le hostname
expected_hostname = license_data.get("hostname", "").lower()
current_hostname = get_splunk_hostname()
if expected_hostname and expected_hostname != current_hostname:
return {
"valid": False,
"error": f"Licence non valide pour ce serveur. Attendu: {expected_hostname}, Actuel: {current_hostname}",
"error_code": "HOSTNAME_MISMATCH"
}
# Vérifier la date d'expiration
expires_str = license_data.get("expires", "")
if expires_str:
try:
expires_date = datetime.strptime(expires_str, "%Y-%m-%d")
now = datetime.now()
if now > expires_date:
return {
"valid": False,
"error": f"Licence expirée le {expires_str}",
"error_code": "LICENSE_EXPIRED"
}
days_remaining = (expires_date - now).days
except ValueError:
days_remaining = 0
else:
days_remaining = 999 # Pas d'expiration
# Licence valide !
return {
"valid": True,
"license_id": license_data.get("license_id"),
"type": license_data.get("type"),
"type_name": license_data.get("type_name"),
"customer": license_data.get("customer", {}),
"hostname": expected_hostname,
"issued": license_data.get("issued"),
"expires": expires_str,
"days_remaining": days_remaining,
"limits": license_data.get("limits", {}),
"features": license_data.get("features", [])
}
except json.JSONDecodeError:
return {
"valid": False,
"error": "Données de licence corrompues",
"error_code": "CORRUPTED_DATA"
}
except Exception as e:
return {
"valid": False,
"error": f"Erreur de validation: {str(e)}",
"error_code": "VALIDATION_ERROR"
}
# ============================================
# SAUVEGARDE DE LICENCE
# ============================================
def save_license_file(content):
"""Sauvegarder un fichier de licence uploadé"""
try:
# Créer le dossier local si nécessaire
local_dir = os.path.join(APP_HOME, 'local')
os.makedirs(local_dir, exist_ok=True)
# Valider le contenu avant de sauvegarder
parsed = parse_license_content(content)
if not parsed:
return {
"success": False,
"error": "Format de licence invalide"
}
# Valider la licence
# Sauvegarder temporairement pour valider
temp_file = LICENSE_FILE + '.tmp'
with open(temp_file, 'w') as f:
f.write(content)
validation = validate_license(temp_file)
if not validation.get("valid"):
os.remove(temp_file)
return {
"success": False,
"error": validation.get("error"),
"error_code": validation.get("error_code")
}
# Renommer le fichier temporaire
os.rename(temp_file, LICENSE_FILE)
os.chmod(LICENSE_FILE, 0o600)
return {
"success": True,
"license": validation
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
# ============================================
# GESTION DES LIMITES D'UTILISATION
# ============================================
def get_usage_stats():
"""Récupérer les statistiques d'utilisation"""
try:
if os.path.exists(USAGE_FILE):
with open(USAGE_FILE, 'r') as f:
return json.load(f)
except:
pass
return {
"total_pushes": 0,
"pushes_today": 0,
"last_push_date": None,
"apps_pushed": []
}
def save_usage_stats(stats):
"""Sauvegarder les statistiques d'utilisation"""
try:
local_dir = os.path.join(APP_HOME, 'local')
os.makedirs(local_dir, exist_ok=True)
with open(USAGE_FILE, 'w') as f:
json.dump(stats, f, indent=2)
os.chmod(USAGE_FILE, 0o600)
except Exception as e:
print(f"Erreur sauvegarde stats: {e}")
def increment_usage():
"""Incrémenter le compteur d'utilisation"""
stats = get_usage_stats()
today = datetime.now().strftime("%Y-%m-%d")
# Reset compteur quotidien si nouveau jour
if stats.get("last_push_date") != today:
stats["pushes_today"] = 0
stats["last_push_date"] = today
stats["total_pushes"] = stats.get("total_pushes", 0) + 1
stats["pushes_today"] = stats.get("pushes_today", 0) + 1
save_usage_stats(stats)
return stats
def check_limits():
"""Vérifier si les limites de la licence sont respectées"""
license_info = validate_license()
if not license_info.get("valid"):
return {
"allowed": False,
"error": license_info.get("error"),
"error_code": license_info.get("error_code")
}
limits = license_info.get("limits", {})
max_pushes = limits.get("max_pushes_per_day", -1)
if max_pushes > 0:
stats = get_usage_stats()
today = datetime.now().strftime("%Y-%m-%d")
# Reset si nouveau jour
if stats.get("last_push_date") != today:
stats["pushes_today"] = 0
if stats.get("pushes_today", 0) >= max_pushes:
return {
"allowed": False,
"error": f"Limite quotidienne atteinte ({max_pushes} pushes/jour)",
"error_code": "DAILY_LIMIT_REACHED"
}
return {
"allowed": True,
"license_type": license_info.get("type_name"),
"remaining_today": max_pushes - get_usage_stats().get("pushes_today", 0) if max_pushes > 0 else -1
}
def has_feature(feature_name):
"""Vérifier si une fonctionnalité est disponible dans la licence"""
license_info = validate_license()
if not license_info.get("valid"):
return False
features = license_info.get("features", [])
return feature_name in features
# ============================================
# CLI POUR TESTS
# ============================================
if __name__ == "__main__":
if len(sys.argv) > 1:
if sys.argv[1] == "status":
result = validate_license()
if result.get("valid"):
print("✅ Licence valide")
print(f" ID: {result.get('license_id')}")
print(f" Type: {result.get('type_name')}")
print(f" Hostname: {result.get('hostname')}")
print(f" Expire: {result.get('expires')} ({result.get('days_remaining')} jours)")
print(f" Features: {', '.join(result.get('features', []))}")
else:
print(f"❌ Licence invalide: {result.get('error')}")
elif sys.argv[1] == "hostname":
print(f"Hostname: {get_splunk_hostname()}")
elif sys.argv[1] == "usage":
stats = get_usage_stats()
print(f"Total pushes: {stats.get('total_pushes', 0)}")
print(f"Pushes aujourd'hui: {stats.get('pushes_today', 0)}")
elif sys.argv[1] == "check":
result = check_limits()
if result.get("allowed"):
print(f"✅ Push autorisé ({result.get('license_type')})")
else:
print(f"❌ Push refusé: {result.get('error')}")
else:
print("Usage: python license_validator.py [status|hostname|usage|check]")
else:
result = validate_license()
print(json.dumps(result, indent=2))