diff --git a/feldman_vss.py b/feldman_vss.py index a95456c..1463301 100644 --- a/feldman_vss.py +++ b/feldman_vss.py @@ -1,42 +1,75 @@ """ Post-Quantum Secure Feldman's Verifiable Secret Sharing (VSS) Implementation -Version 0.7.0-Alpha +Version 0.7.4-Alpha This module provides a secure, production-ready implementation of Feldman's VSS scheme -with post-quantical security by design. It enhances Shamir's Secret Sharing with +with post-quantum security by design. It enhances Shamir's Secret Sharing with mathematical verification capabilities while remaining resistant to quantum attacks through hash-based commitments. -Key features: -1. Secure group operations for commitment generation and verification -2. Efficient batch verification of multiple shares -3. Support for serialization and deserialization of verification data -4. Exclusively post-quantum secure using hash-based commitments -5. Comprehensive validation and error handling -6. Protection against both classical and quantum security pitfalls -7. Fault injection countermeasures with redundant computation - -Security considerations: -- Uses hash-based commitments for post-quantum security (no discrete logarithm option) -- Always operates with at least 4096-bit prime fields for quantum resistance -- Uses BLAKE3 for cryptographic hashing operations (faster and more secure than SHA-256) -- Designed for seamless integration with Shamir's Secret Sharing implementation -- Implements redundant computation and checksum verification for fault injection resistance - -Note: This implementation is fully compatible with the ShamirSecretSharing class in +Key Features: + +1. Post-Quantum Security: Exclusively uses hash-based commitments (BLAKE3 or SHA3-256) + for proven resistance to quantum computer attacks. No reliance on discrete logarithm + problems. +2. Secure Group Operations: Employs the CyclicGroup class, which uses gmpy2 for + efficient and secure modular arithmetic. Includes optimized exponentiation + (with precomputation and a thread-safe LRU cache) and multi-exponentiation. +3. Efficient Batch Verification: batch_verify_shares provides optimized verification + of multiple shares against the same commitments, significantly improving performance + for large numbers of shares. +4. Serialization and Deserialization: serialize_commitments and + deserialize_commitments methods provide secure serialization and deserialization of + commitment data, including checksums for integrity verification and handling of + extra entropy for low-entropy secrets. +5. Comprehensive Validation and Error Handling: Extensive input validation and error + handling throughout the code to prevent misuse and ensure robustness. +6. Fault Injection Countermeasures: Uses redundant computation (`secure_redundant_execution`) + and constant-time comparisons (constant_time_compare) to mitigate fault injection attacks. +7. Zero-Knowledge Proofs: Supports the creation and verification of zero-knowledge + proofs of polynomial knowledge, allowing a prover to demonstrate knowledge of the + secret polynomial without revealing the coefficients. +8. Share Refreshing: Implements an enhanced version of Chen & Lindell's Protocol 5 + for secure share refreshing, with improved Byzantine fault tolerance, adaptive + quorum-based Byzantine detection, and optimized verification. +9. 
Integration with Pedersen VSS: Includes helper functions (integrate_with_pedersen, + create_dual_commitment_proof, verify_dual_commitments) for combining Feldman VSS + with Pedersen VSS, providing both binding and hiding properties. +10. Configurable Parameters: The VSSConfig class allows customization of security + parameters, including the prime bit length, safe prime usage, hash algorithm + (BLAKE3 or SHA3-256), and LRU cache size. +11. Deterministic Hashing: Guarantees deterministic commitment generation across different + platforms and execution environments by using fixed-length byte representations for + integers in hash calculations. +12. Thread-Safe LRU Cache: Employs a SafeLRUCache for efficient and thread-safe caching + of exponentiation results, with bounded memory usage. + +Security Considerations: + +- Always uses at least 4096-bit prime fields for post-quantum security (configurable). +- Strongly recommends using safe primes (where (p-1)/2 is also prime) for enhanced security. +- Defaults to BLAKE3 for cryptographic hashing (faster and more secure than SHA3-256), + but falls back to SHA3-256 if BLAKE3 is not available. +- Designed for seamless integration with Shamir's Secret Sharing implementation. +- Implements countermeasures against timing attacks, fault injection attacks, and + Byzantine behavior. +- Uses cryptographically secure random number generation (secrets module). + +Note: This implementation is fully compatible with the `ShamirSecretSharing` class in the main module and is optimized to work in synergy with Pedersen VSS. """ import threading import secrets import hashlib -import msgpack # Modern and efficient serialization library +import msgpack from base64 import urlsafe_b64encode, urlsafe_b64decode import warnings import time import random from dataclasses import dataclass from typing import Callable, Any +from collections import OrderedDict # Import BLAKE3 for cryptographic hashing (faster and more secure than SHA3-256) try: @@ -50,18 +83,6 @@ ImportWarning, ) -# Import xxhash for high-performance checksums -try: - import xxhash - HAS_XXHASH = True -except ImportError: - HAS_XXHASH = False - warnings.warn( - "xxhash library not found. Using cryptographic fallback for checksums. " - "Install xxhash with: pip install xxhash for better performance.", - ImportWarning, - ) - # Import gmpy2 - now a strict requirement try: import gmpy2 @@ -188,6 +209,7 @@ class VSSConfig: safe_prime: bool = True # Always use safe primes for better security secure_serialization: bool = True # Use secure serialization format use_blake3: bool = True # Whether to use BLAKE3 (falls back to SHA3-256 if unavailable) + cache_size: int = 128 # Default cache size for exponentiation results def __post_init__(self): # Security check - enforce minimum prime size for post-quantum security @@ -206,6 +228,51 @@ def __post_init__(self): RuntimeWarning, ) +class SafeLRUCache: + """ + Description: + Thread-safe LRU cache implementation for efficient caching with memory constraints. + + Arguments: + capacity (int): Maximum number of items to store in the cache. 
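+
+    Example:
+        Minimal usage sketch (illustrative only; keys and values are arbitrary):
+
+            cache = SafeLRUCache(capacity=2)
+            cache.put("a", 1)
+            cache.put("b", 2)
+            cache.get("a")     # returns 1 and marks "a" most recently used
+            cache.put("c", 3)  # evicts "b", the least recently used entry
+            cache.get("b")     # returns None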
+ """ + def __init__(self, capacity): + self.capacity = capacity + self.cache = OrderedDict() + self.lock = threading.RLock() # Use RLock for compatibility with existing code + + def get(self, key): + """Get an item from the cache, moving it to most recently used position.""" + with self.lock: + if key in self.cache: + # Move to the end (most recently used) + value = self.cache.pop(key) + self.cache[key] = value + return value + return None + + def put(self, key, value): + """Add an item to the cache, evicting least recently used item if necessary.""" + with self.lock: + if key in self.cache: + # Remove existing item first + self.cache.pop(key) + elif len(self.cache) >= self.capacity: + # Remove the first item (least recently used) + self.cache.popitem(last=False) + # Add new item + self.cache[key] = value + + def clear(self): + """Clear the cache.""" + with self.lock: + self.cache.clear() + + def __len__(self): + """Return number of items in the cache.""" + with self.lock: + return len(self.cache) + class CyclicGroup: """ Description: @@ -226,7 +293,7 @@ class CyclicGroup: """ def __init__( - self, prime=None, generator=None, prime_bits=3072, use_safe_prime=True + self, prime=None, generator=None, prime_bits=4096, use_safe_prime=True, cache_size=128 ): # For post-quantum security, we recommend at least 3072-bit primes if prime_bits < 3072: @@ -270,9 +337,8 @@ def __init__( else: self.generator = self._find_generator() - # Add cache for exponentiation with thread safety - self.cached_powers = {} - self._cache_lock = threading.RLock() + # Cache initialization with SafeLRUCache + self.cached_powers = SafeLRUCache(capacity=cache_size) # Pre-compute fixed-base exponentiations for common operations self._precompute_window_size = 8 @@ -425,7 +491,7 @@ def _find_generator(self): q = (self.prime - 1) // 2 # Try quadratic residues: for g in Z_p*, g^2 generates the q-order subgroup - for _ in range(100): # Try multiple times with different values + for _ in range(10000): # Try multiple times with different values h = secrets.randbelow(self.prime - 3) + 2 # Random value in [2, p-2] g = gmpy2.powmod(h, 2, self.prime) # Square to get quadratic residue @@ -505,7 +571,6 @@ def exp(self, base, exponent): if base == self.generator and self._precomputed_powers: return self._exp_with_precomputation(exponent) - # Standard exponentiation with thread-safe caching # Normalize inputs base = gmpy2.mpz(base % self.prime) exponent = gmpy2.mpz(exponent % (self.prime - 1)) # By Fermat's Little Theorem @@ -513,18 +578,16 @@ def exp(self, base, exponent): # Check cache for common operations cache_key = (base, exponent) - # Thread-safe cache access - with self._cache_lock: - if cache_key in self.cached_powers: - return self.cached_powers[cache_key] - - # Use efficient binary exponentiation for large numbers - result = gmpy2.powmod(base, exponent, self.prime) + # Thread-safe cache access using SafeLRUCache methods + result = self.cached_powers.get(cache_key) + if result is not None: + return result - # Cache the result if the cache isn't too large - if len(self.cached_powers) < 1000: - self.cached_powers[cache_key] = result + # Use efficient binary exponentiation for large numbers + result = gmpy2.powmod(base, exponent, self.prime) + # Cache the result using SafeLRUCache's put method (no need to check size) + self.cached_powers.put(cache_key, result) return result def _exp_with_precomputation(self, exponent): @@ -636,15 +699,15 @@ def clear_cache(self): Outputs: None """ - with self._cache_lock: - 
self.cached_powers.clear() + # Use SafeLRUCache's clear method + self.cached_powers.clear() def hash_to_group(self, data): """ Description: Hash arbitrary data to an element in the group with uniform distribution. Uses rejection sampling to ensure uniformity across the group range [1, prime-1]. - This eliminates the modulo bias present in naive approaches. + Handles primes of any size by expanding hash output as needed. Arguments: data (bytes): The data to hash. Inputs: data: data. Outputs: int: An element in the range [1, prime-1] with uniform distribution. """ + # Calculate how many bytes we need based on prime size + required_bytes = (self.prime.bit_length() + 7) // 8 + counter = 0 + max_attempts = 100 # Reasonable limit to prevent infinite loops original_data = data - - while True: - # Use BLAKE3 if available (faster and more secure), otherwise fall back to SHA3-256 - if HAS_BLAKE3: - h = blake3.blake3(data).digest(32) # 32 bytes output (256 bits) - else: - h = hashlib.sha3_256(data).digest() - - # Convert to integer and check if it's in the valid range - value = int.from_bytes(h, "big") + + while counter < max_attempts: + # Generate enough hash blocks to cover the required bytes + hash_blocks = bytearray() + block_counter = 0 + + while len(hash_blocks) < required_bytes: + block_data = original_data + counter.to_bytes(8, "big") + block_counter.to_bytes(8, "big") + if HAS_BLAKE3: + h = blake3.blake3(block_data).digest(min(32, required_bytes - len(hash_blocks))) + else: + h = hashlib.sha3_256(block_data).digest() + hash_blocks.extend(h) + block_counter += 1 + + # Truncate to required number of bytes and convert to integer + value = int.from_bytes(hash_blocks[:required_bytes], "big") + + # Check if in valid range if 1 <= value < self.prime: return gmpy2.mpz(value) - - # Prepare for next iteration with counter to avoid cycles + counter += 1 - # Use a 4-byte counter for efficiency while maintaining uniqueness - data = original_data + counter.to_bytes(4, "big") + + # Fallback if we exceed max attempts (very unlikely with proper sizing) + # Use modular reduction as a last resort, accepting a slight bias + value = int.from_bytes(hash_blocks, "big") + result = gmpy2.mpz((value % (self.prime - 1)) + 1) + + # Log this rare case + warnings.warn( + f"hash_to_group exceeded {max_attempts} attempts for rejection sampling. " + f"Falling back to modular reduction, which introduces a slight bias.", + SecurityWarning ) + + return result def _enhanced_encode_for_hash(self, *args, context="FeldmanVSS"): """ Description: Securely encode multiple values for hashing with length prefixes. This prevents collision attacks by properly separating the inputs using length-prefixing, ensuring the security of hash-based commitments. + Uses fixed-length byte representation for integers to guarantee deterministic + hashing across different platforms and execution environments. Arguments: *args: Values to encode for hashing. 
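+
+        Example:
+            Illustrative layout of the resulting byte string (assuming a
+            4096-bit prime, so integers serialize to 512 bytes):
+
+                [VSS_VERSION bytes]
+                [4-byte big-endian len(context)][context bytes]
+                [4-byte big-endian len(arg_1)][arg_1 bytes]
+                ...
+                [4-byte big-endian len(arg_n)][arg_n bytes]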
@@ -700,19 +789,27 @@ def _enhanced_encode_for_hash(self, *args, context="FeldmanVSS"): # Add protocol version identifier encoded += VSS_VERSION.encode('utf-8') - # Add context string with length prefixing for enhanced domain separation + # Add context string with length prefixing for domain separation context_bytes = context.encode('utf-8') encoded += len(context_bytes).to_bytes(4, 'big') encoded += context_bytes + # Calculate byte length for integer serialization once + prime_bit_length = self.prime.bit_length() + byte_length = (prime_bit_length + 7) // 8 + # Add each value with proper length prefixing for arg in args: - # Convert to bytes if not already + # Convert to bytes with type-specific handling if isinstance(arg, bytes): arg_bytes = arg elif isinstance(arg, str): arg_bytes = arg.encode('utf-8') + elif isinstance(arg, int) or isinstance(arg, gmpy2.mpz): + # Handle integers with fixed-length representation + arg_bytes = int(arg).to_bytes(byte_length, 'big') else: + # For other types, use string representation arg_bytes = str(arg).encode('utf-8') # Add 4-byte length followed by the data itself @@ -889,14 +986,13 @@ def compute_checksum(data: bytes) -> int: Outputs: int: The computed checksum. """ - if HAS_XXHASH: - # Use 128-bit hash for better collision resistance - return xxhash.xxh3_128(data).intdigest() - else: - # Fallback to BLAKE3 or SHA3-256 - slower but cryptographically secure - if HAS_BLAKE3: - return int.from_bytes(blake3.blake3(data).digest()[:16], "big") - return int.from_bytes(hashlib.sha3_256(data).digest()[:16], "big") + # Input validation + if not isinstance(data, bytes): + raise TypeError("data must be bytes") + + if HAS_BLAKE3: + return int.from_bytes(blake3.blake3(data).digest()[:16], "big") + return int.from_bytes(hashlib.sha3_256(data).digest()[:16], "big") def secure_redundant_execution(func: Callable, *args, **kwargs) -> Any: """ @@ -922,6 +1018,10 @@ def secure_redundant_execution(func: Callable, *args, **kwargs) -> Any: Raises: SecurityError: If any computation results don't match. """ + # Input validation + if not callable(func): + raise TypeError("func must be callable") + # Execute function 3 times independently results = [func(*args, **kwargs) for _ in range(3)] @@ -972,7 +1072,8 @@ def __init__(self, field, config=None, group=None): # Use the enhanced CyclicGroup with appropriate security parameters self.group = CyclicGroup( prime_bits=self.config.prime_bits, - use_safe_prime=self.config.safe_prime + use_safe_prime=self.config.safe_prime, + cache_size=self.config.cache_size ) else: self.group = group @@ -988,6 +1089,10 @@ def _compute_hash_commitment_single(self, value, randomizer, index, context=None """ Description: Single-instance hash commitment computation (internal use). + + Uses deterministic byte encoding for integers to ensure consistent commitment + values regardless of platform or execution environment, which is critical + for cryptographic security. Arguments: value (int): The value to commit to. @@ -1006,21 +1111,29 @@ def _compute_hash_commitment_single(self, value, randomizer, index, context=None Outputs: int: The computed hash commitment. 
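+
+        Example:
+            Determinism sketch (assumes an initialized FeldmanVSS instance
+            `vss`; the argument values are illustrative):
+
+                c1 = vss._compute_hash_commitment_single(42, 12345, 0)
+                c2 = vss._compute_hash_commitment_single(42, 12345, 0)
+                assert c1 == c2  # same bytes hashed, same commitment, any platform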
""" + # Convert inputs to mpz to ensure consistent handling + value = gmpy2.mpz(value) + randomizer = gmpy2.mpz(randomizer) + + # Calculate byte length based on prime size + prime_bit_length = self.group.prime.bit_length() + byte_length = (prime_bit_length + 7) // 8 + + # Prepare elements with proper byte encoding elements = [ - VSS_VERSION, # Protocol version - "COMMIT", # Fixed domain separator - context or "polynomial", # Context with default - str(value), # Value to commit to - str(randomizer), # Randomizer value - # str(index) # Position index - REMOVED to fix verification mismatch + VSS_VERSION, # Protocol version + "COMMIT", # Fixed domain separator + context or "polynomial", # Context with default + value.to_bytes(byte_length, 'big'), # Value to commit to + randomizer.to_bytes(byte_length, 'big'), # Randomizer value ] # Add extra entropy if provided for low-entropy secrets if extra_entropy: if isinstance(extra_entropy, bytes): - elements.append(extra_entropy.hex()) + elements.append(extra_entropy) else: - elements.append(str(extra_entropy)) + elements.append(str(extra_entropy).encode('utf-8')) # Use the consistent encoding method from the group class encoded = self.group._enhanced_encode_for_hash(*elements) @@ -1115,7 +1228,7 @@ def _compute_expected_commitment(self, commitments, x): return expected - def _verify_hash_based_commitment(self, value, combined_randomizer, x, expected_commitment, context=None): + def _verify_hash_based_commitment(self, value, combined_randomizer, x, expected_commitment, context=None, extra_entropy=None): """ Description: Verify a hash-based commitment for a value at point x. @@ -1139,7 +1252,7 @@ def _verify_hash_based_commitment(self, value, combined_randomizer, x, expected_ """ # Compute the hash commitment computed_commitment = self._compute_hash_commitment( - value, combined_randomizer, x, context + value, combined_randomizer, x, context, extra_entropy ) # Compare with expected commitment using constant-time comparison @@ -1159,6 +1272,12 @@ def create_commitments(self, coefficients): Outputs: list: List of (hash, randomizer) tuples representing hash-based commitments. """ + # Input validation + if not isinstance(coefficients, list): + raise TypeError("coefficients must be a list") + if not coefficients: + raise ValueError("coefficients list cannot be empty") + # Use the enhanced commitment creation method for better security return self.create_enhanced_commitments(coefficients) @@ -1179,6 +1298,12 @@ def create_enhanced_commitments(self, coefficients, context=None): Outputs: list: List of (hash, randomizer) tuples. 
""" + # Input validation + if not isinstance(coefficients, list): + raise TypeError("coefficients must be a list") + if context is not None and not isinstance(context, str): + raise TypeError("context must be a string if provided") + if not coefficients: raise ParameterError("Coefficients list cannot be empty") @@ -1199,14 +1324,14 @@ def create_enhanced_commitments(self, coefficients, context=None): # Add extra entropy for the secret if needed extra_entropy = None if i == 0 and might_have_low_entropy: - extra_entropy = secrets.token_bytes(32) # Increased from 16 bytes + extra_entropy = secrets.token_bytes(32) # Use the dedicated hash commitment function commitment = self._compute_hash_commitment( coeff, r_i, i, context or "polynomial", extra_entropy) # Store commitment and randomizer - commitments.append((commitment, r_i)) + commitments.append((commitment, r_i, extra_entropy)) return commitments @@ -1229,7 +1354,7 @@ def _verify_share_hash_based_single(self, x, y, commitments): bool: True if the share is valid, False otherwise. """ # Extract randomizers from commitments - randomizers = [r_i for _, r_i in commitments] + randomizers = [r_i for _, r_i, _ in commitments] # Compute combined randomizer r_combined = self._compute_combined_randomizer(randomizers, x) @@ -1237,8 +1362,17 @@ def _verify_share_hash_based_single(self, x, y, commitments): # Compute expected commitment expected_commitment = self._compute_expected_commitment(commitments, x) + # Extract extra_entropy if needed for this point + # The extra entropy should only be used for evaluating the constant term (i=0) + # which happens with x^0 = 1 in polynomial evaluation + extra_entropy = None + # Extract extra_entropy if present (should be in the first coefficient only) + extra_entropy = None + if len(commitments) > 0 and len(commitments[0]) > 2: + extra_entropy = commitments[0][2] # Get extra_entropy from first coefficient + # Verify using helper method - return self._verify_hash_based_commitment(y, r_combined, x, expected_commitment) + return self._verify_hash_based_commitment(y, r_combined, x, expected_commitment, extra_entropy=extra_entropy) def verify_share(self, share_x, share_y, commitments): """ @@ -1261,6 +1395,18 @@ def verify_share(self, share_x, share_y, commitments): Outputs: bool: True if the share is valid, False otherwise. """ + # Input validation + if not isinstance(share_x, (int, gmpy2.mpz)): + raise TypeError("share_x must be an integer") + if not isinstance(share_y, (int, gmpy2.mpz)): + raise TypeError("share_y must be an integer") + if not isinstance(commitments, list) or not commitments: + raise TypeError("commitments must be a non-empty list") + + # Validate commitment format + if not all(isinstance(c, tuple) and len(c) >= 2 for c in commitments): + raise TypeError("commitments must be a list of (commitment, randomizer) tuples") + # Convert to integers and use redundant verification x, y = gmpy2.mpz(share_x), gmpy2.mpz(share_y) return secure_redundant_execution(self._verify_share_hash_based_single, x, y, commitments) @@ -1284,6 +1430,19 @@ def batch_verify_shares(self, shares, commitments): Outputs: tuple: (all_valid: bool, results: Dict mapping share indices to verification results). 
""" + # Input validation + if not isinstance(shares, list): + raise TypeError("shares must be a list of (x, y) tuples") + if not shares: + raise ValueError("shares list cannot be empty") + if not all(isinstance(s, tuple) and len(s) == 2 for s in shares): + raise TypeError("Each share must be a tuple of (x, y)") + + if not isinstance(commitments, list) or not commitments: + raise TypeError("commitments must be a non-empty list") + if not all(isinstance(c, tuple) and len(c) >= 2 for c in commitments): + raise TypeError("commitments must be a list of (commitment, randomizer) tuples") + results = {} all_valid = True @@ -1297,7 +1456,12 @@ def batch_verify_shares(self, shares, commitments): return all_valid, results # Extract randomizers for more efficient processing - randomizers = [r_i for _, r_i in commitments] + randomizers = [r_i for _, r_i, _ in commitments] + + # Extract extra_entropy if present (only for first coefficient) + extra_entropy = None + if len(commitments) > 0 and len(commitments[0]) > 2: + extra_entropy = commitments[0][2] # For larger batches, use optimized verification approach with caching # Precompute powers of x for each share to avoid redundant calculations @@ -1334,7 +1498,7 @@ def batch_verify_shares(self, shares, commitments): for i, (x, y, r_combined, expected_commitment) in enumerate(batch): idx = batch_start + i is_valid = self._verify_hash_based_commitment( - y, r_combined, x, expected_commitment + y, r_combined, x, expected_commitment, extra_entropy=extra_entropy ) results[idx] = is_valid @@ -1357,8 +1521,19 @@ def serialize_commitments(self, commitments): Outputs: str: String with base64-encoded serialized data with embedded checksum. """ + # Input validation + if not isinstance(commitments, list): + raise TypeError("commitments must be a list") + if not commitments: + raise ValueError("commitments list cannot be empty") + if not all(isinstance(c, tuple) and len(c) >= 2 for c in commitments): + raise TypeError("Each commitment must be a tuple with at least (commitment, randomizer)") + # Extract commitment values - commitment_values = [(int(c), int(r)) for c, r in commitments] + commitment_values = [ + (int(c), int(r), e.hex() if e else None) + for c, r, e in commitments + ] # Create the data structure result = { @@ -1367,7 +1542,7 @@ def serialize_commitments(self, commitments): "generator": int(self.generator), "prime": int(self.group.prime), "commitments": commitment_values, - "hash_based": True # Always hash-based in PQ-secure version + "hash_based": True } try: @@ -1400,6 +1575,12 @@ def deserialize_commitments(self, data): Outputs: tuple: (commitments, generator, prime, timestamp, is_hash_based). 
""" + # Input validation + if not isinstance(data, str): + raise TypeError("Data must be a string") + if not data: + raise ValueError("Data cannot be empty") + try: # Decode from URL-safe base64 decoded = urlsafe_b64decode(data.encode("utf-8")) @@ -1456,6 +1637,8 @@ def deserialize_commitments(self, data): if prime not in SAFE_PRIMES.values() and self.config.safe_prime: if not CyclicGroup._is_probable_prime(prime): raise SecurityError("Deserialized prime value failed primality test") + if self.config.safe_prime and not CyclicGroup._is_safe_prime(prime): # Add this check + raise SecurityError("Deserialized prime is not a safe prime") # Validate generator is in the correct range if generator <= 1 or generator >= prime - 1: @@ -1466,7 +1649,14 @@ def deserialize_commitments(self, data): raise SecurityError("Only hash-based commitments are supported in this post-quantum secure version") # Reconstruct hash-based commitments - commitments = [(gmpy2.mpz(c), gmpy2.mpz(r)) for c, r in commitments] + commitments = [ + ( + gmpy2.mpz(c), + gmpy2.mpz(r), + bytes.fromhex(e) if e else None + ) + for c, r, e in commitments + ] return commitments, gmpy2.mpz(generator), gmpy2.mpz(prime), timestamp, is_hash_based @@ -1493,6 +1683,14 @@ def verify_share_from_serialized(self, share_x, share_y, serialized_commitments) Outputs: bool: True if the share is valid, False otherwise. """ + # Input validation + if not isinstance(share_x, (int, gmpy2.mpz)): + raise TypeError("share_x must be an integer") + if not isinstance(share_y, (int, gmpy2.mpz)): + raise TypeError("share_y must be an integer") + if not isinstance(serialized_commitments, str) or not serialized_commitments: + raise TypeError("serialized_commitments must be a non-empty string") + try: # Deserialize the commitments commitments, generator, prime, timestamp, is_hash_based = self.deserialize_commitments( @@ -1575,6 +1773,27 @@ def refresh_shares(self, shares, threshold, total_shares, original_commitments=N Outputs: tuple: (new_shares, new_commitments, verification_data). 
""" + # Input validation + if not isinstance(shares, dict): + raise TypeError("shares must be a dictionary mapping participant IDs to (x, y) tuples") + if not all(isinstance(v, tuple) and len(v) == 2 for v in shares.values()): + raise TypeError("Each share must be a tuple of (x, y)") + + if not isinstance(threshold, int) or threshold < 2: + raise ValueError("threshold must be an integer >= 2") + + if not isinstance(total_shares, int) or total_shares < threshold: + raise ValueError("total_shares must be an integer >= threshold") + + if original_commitments is not None and not isinstance(original_commitments, list): + raise TypeError("original_commitments must be a list if provided") + + if participant_ids is not None: + if not isinstance(participant_ids, list): + raise TypeError("participant_ids must be a list if provided") + if len(participant_ids) != total_shares: + raise ValueError("Number of participant_ids must match total_shares") + if len(shares) < threshold: raise ParameterError(f"Need at least {threshold} shares to refresh") @@ -1689,8 +1908,50 @@ def _refresh_shares_additive(self, shares, threshold, total_shares, participant_ zero_commitments, zero_sharings, participant_ids ) - # Byzantine detection for each party + # Identify Byzantine parties with adaptive quorum-based detection + byzantine_parties = {} + # Calculate consistency statistics per party + consistency_counts = {} + for (party_id, _), is_consistent in echo_consistency.items(): + if party_id not in consistency_counts: + consistency_counts[party_id] = {"consistent": 0, "inconsistent": 0, "total": 0} + + consistency_counts[party_id]["total"] += 1 + if is_consistent: + consistency_counts[party_id]["consistent"] += 1 + else: + consistency_counts[party_id]["inconsistent"] += 1 + + # Adaptive quorum calculation based on threat model and participant count + # More participants = higher required consistency ratio + base_quorum_ratio = 0.5 # Start at 50% + consistency_ratio_requirement = min(0.8, base_quorum_ratio + 0.1 * (len(shares) / threshold - 1)) + + # Identify parties that failed to reach consistency quorum + for party_id, counts in consistency_counts.items(): + if counts["total"] > 0: + consistency_ratio = counts["consistent"] / counts["total"] + if consistency_ratio < consistency_ratio_requirement: + evidence = { + "type": "insufficient_consistency_quorum", + "consistency_ratio": consistency_ratio, + "required_ratio": consistency_ratio_requirement, + "consistent_count": counts["consistent"], + "inconsistent_count": counts["inconsistent"], + "total_checked": counts["total"] + } + byzantine_parties[party_id] = evidence + warnings.warn( + f"Party {party_id} failed to reach consistency quorum " + f"({consistency_ratio:.2f} < {consistency_ratio_requirement:.2f})", + SecurityWarning + ) + + # Standard Byzantine detection for each party for party_id in shares.keys(): + if party_id in byzantine_parties: + continue # Already identified as Byzantine + is_byzantine, evidence = self._detect_byzantine_behavior( party_id, zero_commitments[party_id], @@ -1699,12 +1960,10 @@ def _refresh_shares_additive(self, shares, threshold, total_shares, participant_ ) if is_byzantine: - # Log detection warnings.warn( - f"Detected Byzantine behavior from party {party_id}", + f"Detected Byzantine behavior from party {party_id}: {evidence.get('type', 'unknown')}", SecurityWarning ) - # Store evidence for later analysis byzantine_parties[party_id] = evidence # More efficient batch verification with adaptive batch sizing @@ -1718,8 +1977,12 @@ def 
_refresh_shares_additive(self, shares, threshold, total_shares, participant_ # Process verification with improved parallelism verification_results = self._process_verification_batches(verification_batches) - # Process verification results with enhanced security checks + # Process verification results with Byzantine exclusion for (party_id, p_id), is_valid in verification_results: + # Skip shares from Byzantine parties + if party_id in byzantine_parties: + continue + if is_valid and echo_consistency.get((party_id, p_id), True): # Store verified share with additional consistency check share_value = self._get_share_value_from_results( @@ -1806,6 +2069,8 @@ def _refresh_shares_additive(self, shares, threshold, total_shares, participant_ "participants_with_full_verification": sum(1 for p_id in participant_ids if len(verified_zero_shares[p_id]) == len(shares)), "potential_collusion_detected": bool(potential_collusion), + "byzantine_parties_excluded": len(byzantine_parties), + "byzantine_party_ids": list(byzantine_parties.keys()) if byzantine_parties else [], "security_parameters": { "min_verified_shares": min_verified_shares, "security_factor": security_factor @@ -2303,7 +2568,7 @@ def verify_batch(batch_items): else: # Fallback to individual verification return [(results[idx], self.verify_share(x, y, commitments)) - for idx, (party_id, p_id, x, y, commitments) in enumerate(batch_items)] + for idx, (party_id, p_id, x, y, commitments) in enumerate(batch_items)] # Try parallel verification with improved error handling try: @@ -2525,6 +2790,18 @@ def create_polynomial_proof(self, coefficients, commitments): Outputs: dict: Proof data structure containing the necessary components for verification. """ + # Add validation + if not isinstance(coefficients, list): + raise TypeError("coefficients must be a list") + if not coefficients: + raise ValueError("coefficients list cannot be empty") + if not isinstance(commitments, list): + raise TypeError("commitments must be a list") + if not commitments: + raise ValueError("commitments list cannot be empty") + if not all(isinstance(c, tuple) and len(c) >= 2 for c in commitments): + raise TypeError("Each commitment must be a tuple with at least (commitment, randomizer)") + # Convert coefficients to integers for consistent arithmetic coeffs_int = [gmpy2.mpz(coeff) % self.field.prime for coeff in coefficients] @@ -2590,6 +2867,14 @@ def verify_polynomial_proof(self, proof, commitments): Outputs: bool: True if verification succeeds, False otherwise. 
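+
+        Example:
+            Prove-and-verify sketch (assumes `vss` and polynomial `coefficients`):
+
+                commitments, proof = vss.create_commitments_with_proof(coefficients)
+                assert vss.verify_polynomial_proof(proof, commitments)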
""" + # Add validation + if not isinstance(proof, dict): + raise TypeError("proof must be a dictionary") + if not isinstance(commitments, list): + raise TypeError("commitments must be a list") + if not commitments: + raise ValueError("commitments list cannot be empty") + # Extract proof components with parameter validation try: blinding_commitments = proof["blinding_commitments"] @@ -2600,6 +2885,20 @@ def verify_polynomial_proof(self, proof, commitments): except (KeyError, TypeError): warnings.warn("Incomplete or malformed proof structure", SecurityWarning) return False + + # Enhanced validation for proof structure + if not isinstance(blinding_commitments, list): + warnings.warn("blinding_commitments must be a list", SecurityWarning) + return False + if not all(isinstance(c, tuple) and len(c) >= 2 for c in blinding_commitments): + warnings.warn("Each blinding commitment must be a tuple with at least (commitment, randomizer)", SecurityWarning) + return False + if not isinstance(challenge, (int, gmpy2.mpz)): + warnings.warn("challenge must be an integer", SecurityWarning) + return False + if not isinstance(responses, list) or not all(isinstance(r, (int, gmpy2.mpz)) for r in responses): + warnings.warn("responses must be a list of integers", SecurityWarning) + return False # Validate that all component lists have the correct size if (len(responses) != len(commitments) or @@ -2730,6 +3029,18 @@ def detect_byzantine_party(self, party_id, commitments, shares, consistency_resu Outputs: tuple: (is_byzantine, evidence_details). """ + # Add validation + if not isinstance(party_id, (int, str)): + raise TypeError("party_id must be an integer or string") + if not isinstance(commitments, list): + raise TypeError("commitments must be a list") + if not commitments: + raise ValueError("commitments list cannot be empty") + if not isinstance(shares, dict): + raise TypeError("shares must be a dictionary") + if consistency_results is not None and not isinstance(consistency_results, dict): + raise TypeError("consistency_results must be a dictionary if provided") + return self._detect_byzantine_behavior(party_id, commitments, shares, consistency_results) def _evaluate_polynomial(self, coefficients, x): @@ -2868,6 +3179,12 @@ def _find_secure_pivot(self, matrix, col, n): """ Description: Find a non-zero pivot using side-channel resistant selection. + + This method implements a randomized pivot selection strategy that prevents + timing-based side-channel attacks during Gaussian elimination. Instead of + selecting the first suitable pivot (which would create timing variations), + it assigns random values to all potential pivots and selects one with minimal + random value, ensuring constant-time behavior regardless of matrix content. Arguments: matrix (list): The matrix being processed. @@ -2875,23 +3192,39 @@ def _find_secure_pivot(self, matrix, col, n): n (int): Matrix dimension. Inputs: - matrix: matrix - col: col - n: n + matrix: Matrix of coefficients. + col: Current column being processed. + n: Matrix dimension. + Outputs: int: Index of selected pivot row or None if no valid pivot exists. 
- """ - # Gather all valid pivot candidates - candidates = [] - for i in range(col, n): - if matrix[i][col] != 0: - candidates.append(i) - - # Use cryptographically secure random selection if candidates exist - if candidates: - return secrets.choice(candidates) - - return None + + Security properties: + - Constant-time with respect to the values in the matrix + - Uses cryptographically secure randomness via secrets.token_bytes() + - Resistant to timing side-channel attacks + - Prevents information leakage about matrix structure + """ + # Generate a single random block for all rows at once (more efficient) + range_size = n - col + all_random_bytes = secrets.token_bytes(32 * range_size) + + # Find the valid pivot with the smallest random value + min_value = float('inf') + pivot_row = None + + for k in range(range_size): + row = col + k + # Extract random value for this row + offset = k * 32 + row_random = int.from_bytes(all_random_bytes[offset:offset+32], byteorder='big') + + # Update minimum if valid pivot and has smaller random value + if matrix[row][col] != 0 and row_random < min_value: + min_value = row_random + pivot_row = row + + return pivot_row def create_commitments_with_proof(self, coefficients, context=None): """ @@ -2913,6 +3246,13 @@ def create_commitments_with_proof(self, coefficients, context=None): Outputs: tuple: (commitments, proof) where both are suitable for verification. """ + # Input validation + if not isinstance(coefficients, list) or not coefficients: + raise TypeError("coefficients must be a non-empty list") + + if context is not None and not isinstance(context, str): + raise TypeError("context must be a string if provided") + # Create commitments first commitments = self.create_commitments(coefficients, context) @@ -2938,6 +3278,23 @@ def verify_commitments_with_proof(self, commitments, proof): Outputs: bool: True if the proof is valid, False otherwise. """ + # Input validation + if not isinstance(commitments, list): + raise TypeError("commitments must be a list") + if not commitments: + raise ValueError("commitments list cannot be empty") + if not isinstance(proof, dict): + raise TypeError("proof must be a dictionary") + if not all(isinstance(c, tuple) and len(c) >= 2 for c in commitments): + raise TypeError("Each commitment must be a tuple with at least (commitment, randomizer)") + + # Validate proof has all required keys before proceeding + required_keys = ["blinding_commitments", "challenge", "responses", + "commitment_randomizers", "blinding_randomizers"] + if not all(key in proof for key in required_keys): + warnings.warn("Proof missing required keys", SecurityWarning) + return False + return self.verify_polynomial_proof(proof, commitments) def serialize_commitments_with_proof(self, commitments, proof): @@ -2956,6 +3313,27 @@ def serialize_commitments_with_proof(self, commitments, proof): Outputs: str: String with base64-encoded serialized data. 
""" + # Input validation + if not isinstance(commitments, list) or not commitments: + raise TypeError("commitments must be a non-empty list") + if not all(isinstance(c, tuple) and len(c) >= 2 for c in commitments): + raise TypeError("Each commitment must be a tuple of at least (commitment, randomizer)") + + # Add validation for proof parameter + if not isinstance(proof, dict): + raise TypeError("proof must be a dictionary") + + required_proof_keys = ["blinding_commitments", "challenge", "responses", + "commitment_randomizers", "blinding_randomizers", "timestamp"] + for key in required_proof_keys: + if key not in proof: + raise ValueError(f"proof is missing required key: {key}") + + if not isinstance(proof["blinding_commitments"], list) or not proof["blinding_commitments"]: + raise TypeError("proof['blinding_commitments'] must be a non-empty list") + if not all(isinstance(c, tuple) and len(c) >= 2 for c in proof["blinding_commitments"]): + raise TypeError("Each blinding commitment must be a tuple with at least (commitment, randomizer)") + # First serialize the commitments as before commitment_values = [(int(c), int(r)) for c, r in commitments] @@ -3001,6 +3379,12 @@ def deserialize_commitments_with_proof(self, data): Outputs: tuple: (commitments, proof, generator, prime, timestamp). """ + # Add validation + if not isinstance(data, str): + raise TypeError("data must be a string") + if not data: + raise ValueError("data cannot be empty") + try: # Decode and unpack the data decoded = urlsafe_b64decode(data.encode("utf-8")) @@ -3054,6 +3438,14 @@ def verify_share_with_proof(self, share_x, share_y, serialized_data): Outputs: tuple: (share_valid, proof_valid) indicating validation results. """ + # Input validation + if not isinstance(share_x, (int, gmpy2.mpz)): + raise TypeError("share_x must be an integer") + if not isinstance(share_y, (int, gmpy2.mpz)): + raise TypeError("share_y must be an integer") + if not isinstance(serialized_data, str) or not serialized_data: + raise TypeError("serialized_data must be a non-empty string") + try: # Deserialize the commitments and proof commitments, proof, generator, prime, timestamp = self.deserialize_commitments_with_proof( @@ -3092,6 +3484,16 @@ def get_feldman_vss(field, **kwargs): Outputs: FeldmanVSS: FeldmanVSS instance configured for post-quantum security. """ + # Add validation for field parameter + if field is None: + raise TypeError("field cannot be None") + + if not hasattr(field, 'prime'): + raise TypeError("field must have 'prime' attribute") + + if not isinstance(field.prime, (int, gmpy2.mpz)): + raise TypeError("field.prime must be an integer type") + config = kwargs.get("config", None) if config is None: @@ -3166,6 +3568,16 @@ def integrate_with_pedersen(feldman_vss, pedersen_vss, shares, coefficients): Outputs: dict: Dictionary with both Feldman and Pedersen verification data. 
""" + # Input validation + if not isinstance(feldman_vss, FeldmanVSS): + raise TypeError("feldman_vss must be a FeldmanVSS instance") + if not hasattr(pedersen_vss, 'create_commitments'): + raise TypeError("pedersen_vss must have a create_commitments method") + if not isinstance(shares, dict): + raise TypeError("shares must be a dictionary") + if not isinstance(coefficients, list) or not coefficients: + raise TypeError("coefficients must be a non-empty list") + # Generate Feldman commitments feldman_commitments = feldman_vss.create_commitments(coefficients) @@ -3217,6 +3629,29 @@ def create_dual_commitment_proof(feldman_vss, pedersen_vss, coefficients, Outputs: dict: Proof data structure. """ + # Input validation for all parameters + if not isinstance(feldman_vss, FeldmanVSS): + raise TypeError("feldman_vss must be a FeldmanVSS instance") + + if not hasattr(pedersen_vss, 'commit_to_blinding_factors'): + raise TypeError("pedersen_vss must have a 'commit_to_blinding_factors' method") + + if not hasattr(pedersen_vss, 'g') or not hasattr(pedersen_vss, 'h'): + raise TypeError("pedersen_vss must have 'g' and 'h' attributes") + + if not isinstance(coefficients, list) or not coefficients: + raise TypeError("coefficients must be a non-empty list") + + if not isinstance(feldman_commitments, list) or not feldman_commitments: + raise TypeError("feldman_commitments must be a non-empty list") + + if not isinstance(pedersen_commitments, list) or not pedersen_commitments: + raise TypeError("pedersen_commitments must be a non-empty list") + + if len(coefficients) != len(feldman_commitments) or len(coefficients) != len(pedersen_commitments): + raise ValueError("coefficients, feldman_commitments, and pedersen_commitments must have the same length") + + # Generate random blinding factors blindings = [feldman_vss.group.secure_random_element() for _ in range(len(coefficients))] @@ -3318,6 +3753,24 @@ def verify_dual_commitments(feldman_vss, pedersen_vss, feldman_commitments, Outputs: bool: True if verification succeeds, False otherwise. """ + # Input validation + if not isinstance(feldman_vss, FeldmanVSS): + raise TypeError("feldman_vss must be a FeldmanVSS instance") + if not hasattr(pedersen_vss, 'verify_response_equation'): + raise TypeError("pedersen_vss must have a verify_response_equation method") + if not isinstance(feldman_commitments, list) or not feldman_commitments: + raise TypeError("feldman_commitments must be a non-empty list") + if not isinstance(pedersen_commitments, list) or not pedersen_commitments: + raise TypeError("pedersen_commitments must be a non-empty list") + if not isinstance(proof, dict): + raise TypeError("proof must be a dictionary") + + # Required proof components + required_keys = ["feldman_blinding_commitments", "pedersen_blinding_commitments", + "challenge", "responses"] + if not all(key in proof for key in required_keys): + raise ValueError("Proof is missing required components") + # Extract proof components feldman_blinding_commitments = proof["feldman_blinding_commitments"] pedersen_blinding_commitments = proof["pedersen_blinding_commitments"]