pylcg/unit_test.py

172 lines
5.9 KiB
Python
Raw Normal View History

2024-11-26 03:28:06 +00:00
#!/usr/bin/env python3
# Python implementation of a Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://git.acid.vegas/pylcg)
# pylcg.py
import unittest
import asyncio
import ipaddress
import sys
import time
from pylcg import IPRange, get_shard_ips, LCG
# ANSI color codes
class Colors:
BLUE = '\033[94m'
GREEN = '\033[92m'
YELLOW = '\033[93m'
CYAN = '\033[96m'
RED = '\033[91m'
ENDC = '\033[0m'
def progress_bar(iteration: int, total: int, prefix: str = '', length: int = 50) -> None:
'''Simple progress bar using standard Python'''
percent = f"{100 * (iteration / float(total)):.1f}"
filled_length = int(length * iteration // total)
bar = '' * filled_length + '-' * (length - filled_length)
sys.stdout.write(f'\r{Colors.CYAN}{prefix} |{bar}| {percent}%{Colors.ENDC} ')
if iteration == total:
sys.stdout.write('\n')
sys.stdout.flush()
def print_header(message: str) -> None:
'''Print formatted header'''
print(f'\n{Colors.BLUE}{"="*80}')
print(f'TEST: {message}')
print(f'{"="*80}{Colors.ENDC}\n')
def print_success(message: str) -> None:
'''Print success message'''
print(f'{Colors.GREEN}{message}{Colors.ENDC}')
def print_progress(message: str) -> None:
'''Print progress message'''
print(f"{Colors.YELLOW}{message}{Colors.ENDC}")
class TestIPSharder(unittest.TestCase):
@classmethod
def setUpClass(cls):
'''Set up test parameters'''
print_header('Setting up test environment')
cls.test_cidr = '192.0.0.0/16' # 65,536 IPs
cls.test_seed = 12345
cls.total_shards = 4
cls.chunk_size = 1000
# Calculate expected IPs
network = ipaddress.ip_network(cls.test_cidr)
cls.all_ips = {str(ip) for ip in network}
print_success(f"Initialized test environment with {len(cls.all_ips):,} IPs")
def setUp(self):
'''Create event loop for each test'''
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
'''Clean up event loop'''
self.loop.close()
async def collect_shard_ips(self, shard_num: int):
'''Helper to collect IPs from a shard'''
return {ip async for ip in get_shard_ips(self.test_cidr, shard_num, self.total_shards, self.test_seed, self.chunk_size)}
def test_ip_range_initialization(self):
'''Test IPRange class initialization and calculations'''
print_header('Testing IPRange initialization')
ip_range = IPRange(self.test_cidr)
self.assertEqual(ip_range.total, 65536)
print_success('IP range size correctly calculated')
first_ip = ip_range.get_ip_at_index(0)
last_ip = ip_range.get_ip_at_index(ip_range.total - 1)
print_success(f'IP range spans from {first_ip} to {last_ip}')
def test_shard_completeness(self):
'''Test that all IPs are covered exactly once across all shards'''
print_header('Testing shard completeness')
async def check_completeness():
seen_ips = set()
shard_sizes = []
for shard_num in range(self.total_shards):
progress_bar(shard_num, self.total_shards-1, prefix='Processing shards')
shard_ips = await self.collect_shard_ips(shard_num)
shard_sizes.append(len(shard_ips))
# Check for duplicates and overlap
self.assertEqual(len(shard_ips), len(set(shard_ips)),
f'Duplicates found in shard {shard_num}')
overlap = seen_ips & shard_ips
self.assertEqual(len(overlap), 0,
f'Overlap found with previous shards: {overlap}')
seen_ips.update(shard_ips)
# Verify all IPs are covered
self.assertEqual(seen_ips, self.all_ips,
'Not all IPs were covered by the shards')
print_success(f'All {len(self.all_ips):,} IPs were distributed across shards')
# Print distribution information
for i, size in enumerate(shard_sizes):
print(f"{Colors.CYAN}Shard {i}: {size:,} IPs{Colors.ENDC}")
self.loop.run_until_complete(check_completeness())
def test_lcg_sequence(self):
'''Test LCG sequence generation and performance'''
print_header('Testing LCG sequence generation')
lcg = LCG(seed=self.test_seed)
# Test small sequence
small_n = 100
start_time = time.perf_counter()
small_result = lcg.get_nth(small_n)
small_time = time.perf_counter() - start_time
print_success(f'Small sequence (n={small_n:,}) generated in {small_time:.6f}s')
# Test large sequence
large_n = 1_000_000
start_time = time.perf_counter()
large_result = lcg.get_nth(large_n)
large_time = time.perf_counter() - start_time
print_success(f'Large sequence (n={large_n:,}) generated in {large_time:.6f}s')
# Verify deterministic behavior
lcg2 = LCG(seed=self.test_seed)
print_progress('Verifying sequence determinism...')
for i in range(large_n):
if i % (large_n // 100) == 0: # Update progress every 1%
progress_bar(i, large_n, prefix='Verifying sequence')
lcg2.next()
progress_bar(large_n, large_n, prefix='Verifying sequence')
self.assertEqual(large_result, lcg2.current, 'LCG sequence is not deterministic')
print_success('LCG produces consistent results')
if __name__ == '__main__':
print(f"\n{Colors.CYAN}{'='*80}")
print(f"Starting IP Sharder Tests - Testing with {65536:,} IPs (/16 network)")
print(f"{'='*80}{Colors.ENDC}\n")
unittest.main(verbosity=2)