Greatly improved LCG math and code

Dionysus 2024-11-26 00:09:16 -05:00
parent 080d46ea3d
commit 9eadeb54b3
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
3 changed files with 167 additions and 267 deletions

121
README.md

@ -1,13 +1,12 @@
# PyLCG
> Linear Congruential Generator for IP Sharding
PyLCG is a Python implementation of a memory-efficient IP address sharding system using Linear Congruential Generators *(LCG)* for deterministic random number generation. This tool enables distributed scanning and network reconnaissance by efficiently dividing IP ranges across multiple machines.
PyLCG is a Python implementation of a memory-efficient IP address sharding system using Linear Congruential Generators *(LCG)* for deterministic random number generation. This tool aids distributed scanning & network reconnaissance by efficiently dividing IP ranges across multiple machines while visiting the addresses in a pseudo-random order.
___
## Table of Contents
- [Project Origins & Purpose](#project-origins-and-purpose)
- [Overview](#overview)
- [How It Works](#how-it-works)
- [Understanding IP Addresses](#understanding-ip-addresses)
@ -20,20 +19,6 @@ ___
___
## Project Origins & Purpose
PyLCG was inspired by the elegant IP distribution system used in [masscan](https://github.com/robertdavidgraham/masscan), the popular mass IP port scanner. While masscan implements this logic as part of its larger codebase, I wanted to isolate and implement this specific component as a standalone Python library that developers can easily integrate into their own projects.
The goal was to create a clean, well-documented implementation that:
- Can be used as a drop-in solution for any project needing IP distribution capabilities
- Provides the same reliable mathematical foundation as masscan's approach
- Is easy to understand and modify for specific needs
- Works well with modern Python async patterns
By extracting this functionality into its own library, developers can add sophisticated IP distribution capabilities to their network tools without having to reinvent the wheel or extract code from larger projects.
___
## Overview
When performing network reconnaissance or scanning large IP ranges, it's often necessary to split the work across multiple machines. However, this presents several challenges:
@ -43,7 +28,7 @@ When performing network reconnaissance or scanning large IP ranges, it's often n
3. You need a way to resume scans if a machine fails
4. You can't load millions of IPs into memory at once
PyLCG solves these challenges through clever mathematics and efficient algorithms.
PyLCG solves these challenges through clever mathematics & efficient algorithms.
___
@ -53,9 +38,10 @@ ___
First, let's understand how IP addresses work in our system:
- An IP address like `192.168.1.1` is really just a 32-bit number
- An IP address like `192.168.1.1` is really just a 32-bit number equal to `3232235777` or `0xC0A80101` in hexadecimal
- A CIDR range like `192.168.0.0/16` represents a continuous range of these numbers
- For example, `192.168.0.0/16` includes all IPs from `192.168.0.0` to `192.168.255.255` *(65,536 addresses)*
- For example, `192.168.0.0/16` includes all IPs from `192.168.0.0` to `192.168.255.255` *(65,536 addresses)*
- The range starts at `0xC0A80000` in hexadecimal & spans `3232235520` to `3232301055` in decimal
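The conversions above can be reproduced with Python's standard `ipaddress` module. This is a minimal sketch for checking the numbers; the helper name `cidr_bounds` is hypothetical and not part of PyLCG.

```python
import ipaddress

def cidr_bounds(cidr: str) -> tuple[int, int, int]:
    '''Return (first, last, total) for a CIDR range as plain integers (hypothetical helper).'''
    network = ipaddress.ip_network(cidr)
    first = int(network.network_address)
    last = int(network.broadcast_address)
    return first, last, last - first + 1

# A single address is just a 32-bit integer
single = int(ipaddress.ip_address('192.168.1.1'))

# A CIDR range is a continuous run of those integers
first, last, total = cidr_bounds('192.168.0.0/16')
```

Because a range is only a start value and a length, any address in it can be rebuilt from an index with `ipaddress.ip_address(first + index)`, which is why the full list never needs to be held in memory.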
### The Magic of Linear Congruential Generators
@ -63,63 +49,47 @@ At the heart of PyLCG is something called a Linear Congruential Generator *(LCG)
Here's how it works:
1. Start with a number *(called the seed)*
2. Multiply it by a carefully chosen constant *(1597 in our case)*
3. Add another carefully chosen constant *(51749)*
4. Take the remainder when divided by 2^32
5. That's your next number! Repeat the process to get more numbers
1. Start with a number *(called the seed, which can be random)*
2. Multiply it by `1664525` & add `1013904223`
3. Take the remainder when divided by `2^32` *(the modulo operation)*
4. Repeat the process to continue the sequence
In mathematical notation:
```
Next_Number = (1597 * Current_Number + 51749) mod 2^32
###### Mathematical notation:
```math
Next_Number = (1664525 * Current_Number + 1013904223) mod 2^32
```
Why these specific numbers?
- `1597` and `51749` were chosen because they create a sequence that:
- Visits every possible number before repeating *(maximum period)*
- Spreads numbers evenly across the range
- Can be calculated quickly on computers
- `2^32` *(4,294,967,296)* is used because it:
- Matches the size of a 32-bit integer
- Is large enough to handle any IP range
- Makes calculations efficient on modern CPUs
###### Why these specific numbers?
The numbers `1664525` and `1013904223` are the multiplier and increment values used in a Linear Congruential Generator *(LCG)* for random number generation. This specific combination was popularized by "Numerical Recipes in C". With a modulus of `2^32` it satisfies the Hull-Dobell conditions *(the increment is odd & the multiplier minus one is divisible by 4)*, so the sequence visits every 32-bit value exactly once before repeating.
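The recurrence can be sketched in a few lines. The function name `lcg_next` and the seed `12345` are illustrative choices, not PyLCG's API. The check at the end relies on a property of power-of-two moduli: the low 8 bits of the state form their own full-period LCG, so 256 consecutive outputs taken modulo 256 should hit every value once.

```python
M = 2**32
A = 1664525
C = 1013904223

def lcg_next(state: int) -> int:
    '''One LCG step: (A * state + C) mod 2^32.'''
    return (A * state + C) % M

# Full-period conditions for a power-of-two modulus:
# the increment is odd and (multiplier - 1) is divisible by 4
full_period = (C % 2 == 1) and ((A - 1) % 4 == 0)

# Illustration on a small range: reduce each output modulo 256
# and collect the distinct values seen over 256 steps
state = 12345
seen = set()
for _ in range(256):
    state = lcg_next(state)
    seen.add(state % 256)
```

The same property is what lets a power-of-two sized range, such as a single CIDR block, be walked once without tracking which indices were already produced. One caveat: the low bits of this kind of generator are the least random part of the output.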
### Sharding: Dividing the Work
Let's say you want to scan a /16 network *(65,536 IPs)* using 4 machines. Here's how PyLCG handles it:
PyLCG uses an interleaved sharding approach to ensure truly distributed scanning. Here's how it works:
1. **Division**: First, it divides the total IPs evenly:
- 65,536 ÷ 4 = 16,384 IPs per shard
- Machine 1: IPs 0-16,383
- Machine 2: IPs 16,384-32,767
- Machine 3: IPs 32,768-49,151
- Machine 4: IPs 49,152-65,535
1. **Interleaved Distribution**: Instead of dividing the IP range into sequential blocks, PyLCG distributes IPs across shards using an offset pattern:
- For 4 shards scanning a network:
- Shard 0 handles IPs at indices: 0, 4, 8, 12, ...
- Shard 1 handles IPs at indices: 1, 5, 9, 13, ...
- Shard 2 handles IPs at indices: 2, 6, 10, 14, ...
- Shard 3 handles IPs at indices: 3, 7, 11, 15, ...
2. **Randomization**: Within each shard, IPs are randomized using the LCG:
- Each IP index *(0 to 65,535)* is fed through the LCG
- The resulting numbers determine the scan order
- Because we use the same seed, this order is consistent across runs
2. **Randomization**: Within each shard, the LCG randomizes the order of IPs:
- Each index is fed through the LCG to generate a random value
- IPs are scanned in order of these random values
- The same seed ensures consistent ordering across runs
Example of how IPs might be ordered in Shard 1:
```
Original order: 0, 1, 2, 3, 4, 5...
LCG values: 51749, 134238, 297019, 12983...
Final order: 3, 5, 1, 4, 2, 0... (sorted by LCG values)
```
This approach ensures:
- Even distribution across the entire IP space
- No sequential scanning patterns that could trigger alerts
- Perfect distribution of work across shards
- Deterministic results that can be reproduced
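A rough sketch of interleaved sharding on a tiny range follows. It assumes the range size is a power of two *(true for any single CIDR block)* and, for simplicity, uses the same seed on every shard. The names `lcg_indices` and `shard_indices` are hypothetical and only illustrate the idea.

```python
def lcg_indices(total: int, seed: int):
    '''Yield a pseudo-random permutation of range(total); assumes total is a power of two.'''
    state = seed
    for _ in range(total):
        state = (1664525 * state + 1013904223) % 2**32
        yield state % total

def shard_indices(total: int, shard_index: int, total_shards: int, seed: int) -> list[int]:
    '''Keep only the indices owned by this 0-based shard (index % total_shards == shard_index).'''
    return [i for i in lcg_indices(total, seed) if i % total_shards == shard_index]

# 16 indices split across 4 shards
total, total_shards, seed = 16, 4, 12345
shards = [shard_indices(total, s, total_shards, seed) for s in range(total_shards)]

# Every index should appear in exactly one shard
combined = sorted(i for shard in shards for i in shard)
```

Each shard filters the same permutation by `index % total_shards`, so the shards cannot overlap, and together they cover the whole range without any coordination between machines.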
### Memory-Efficient Processing
To handle large IP ranges without consuming too much memory, PyLCG uses several techniques:
1. **Chunked Processing**
Instead of loading all IPs at once, it processes them in chunks:
```python
# Example with chunk_size = 1000
Chunk 1: Process IPs 0-999
Chunk 2: Process IPs 1000-1999
...and so on
```
Instead of loading all IPs at once, it processes them in chunks.
2. **Lazy Generation**
- IPs are generated only when needed using Python's async generators
@ -133,28 +103,15 @@ To handle large IP ranges without consuming too much memory, PyLCG uses several
___
## Real-World Applications
## Roadmap
### Network Security Testing
Imagine you're testing the security of a large corporate network:
- You have 5 scanning machines
- You need to scan 1 million IPs
- You want to avoid triggering IDS/IPS systems
PyLCG helps by:
1. Dividing the IPs evenly across your 5 machines
2. Randomizing the scan order to avoid detection
3. Allowing you to pause/resume scans from any point
4. Using minimal memory on each machine
### Cloud-Based Scanning
In cloud environments, PyLCG is particularly useful:
- Easily scale up/down the number of scanning instances
- Each instance knows exactly which IPs to scan
- Consistent results across multiple runs
- Efficient resource usage keeps costs down
- [ ] Add support for IPv6
- [ ] Add support for custom LCG parameters like adding port numbers
- [ ] Add support for custom chunk sizes & auto-tuning based on available system resources
- [ ] Add support for resuming from a specific point in the sequence
- [ ] Add support for saving the state of the LCG to a file so you can resume later
- [ ] Add support for sharding line-based input files locally, from an S3 bucket, or from a URL by reading them in chunks.
- [ ] Update the unit tests to include benchmarks & better coverage for future efficiency improvements & validation.
___

116
pylcg.py

@ -3,9 +3,8 @@
# pylcg.py
import argparse
import asyncio
import ipaddress
from math import ceil
import random
class LCG:
@ -13,33 +12,11 @@ class LCG:
def __init__(self, seed: int, m: int = 2**32):
self.m = m
self.a = 1597
self.c = 51749
self.seed = seed
self.a = 1664525
self.c = 1013904223
self.current = seed
def get_nth(self, n: int) -> int:
'''
Get the nth number in the sequence without generating previous numbers.
:param n: The index of the number to get
'''
# For large n, use the standard next() method to avoid modular arithmetic issues
if n > 1000:
self.current = self.seed
for _ in range(n):
self.next()
return self.current
# For smaller n, use direct calculation
result = self.seed
for _ in range(n):
result = (self.a * result + self.c) % self.m
return result
def next(self) -> int:
'''Generate next random number'''
@ -48,15 +25,14 @@ class LCG:
return self.current
class IPRange:
'''Memory-efficient IP range iterator'''
def __init__(self, cidr: str):
network = ipaddress.ip_network(cidr)
self.start = int(network.network_address)
self.end = int(network.broadcast_address)
self.total = self.end - self.start + 1
self.total = int(network.broadcast_address) - self.start + 1
def get_ip_at_index(self, index: int) -> str:
'''
@ -71,64 +47,68 @@ class IPRange:
return str(ipaddress.ip_address(self.start + index))
async def get_shard_ips(cidr: str, shard_num: int, total_shards: int, seed: int, chunk_size: int = 1000):
def ip_stream(cidr: str, shard_num: int = 1, total_shards: int = 1, seed: int = 0):
'''
Asynchronously generate IPs for the specified shard.
Stream random IPs from the CIDR range. Optionally supports sharding.
Each IP in the range will be yielded exactly once in a pseudo-random order.
:param cidr: The CIDR range to shard
:param shard_num: The number of the shard to generate
:param total_shards: The total number of shards
:param seed: The seed for the random number generator
:param chunk_size: The size of the chunks to process
:param cidr: Target IP range in CIDR format
:param shard_num: Shard number (1-based), defaults to 1
:param total_shards: Total number of shards, defaults to 1 (no sharding)
:param seed: Random seed for LCG (default: random)
'''
# Convert to 0-based indexing internally
shard_index = shard_num - 1
# Initialize the IP range and LCG
ip_range = IPRange(cidr)
lcg = LCG(seed)
total_ips = ip_range.total
# Initialize IP range and LCG
ip_range = IPRange(cidr)
# Calculate which indices belong to this shard
shard_size = ceil(total_ips / total_shards)
start_idx = shard_num * shard_size
end_idx = min(start_idx + shard_size, total_ips)
# Use random seed if none provided
if not seed:
seed = random.randint(0, 2**32-1)
# Process in chunks to maintain memory efficiency
for chunk_start in range(start_idx, end_idx, chunk_size):
chunk_end = min(chunk_start + chunk_size, end_idx)
chunk_indices = list(range(chunk_start, chunk_end))
# Initialize LCG
lcg = LCG(seed + shard_index)
# Generate random values for this chunk
chunk_random_values = [(i, lcg.get_nth(i)) for i in chunk_indices]
chunk_random_values.sort(key=lambda x: x[1])
# Calculate how many IPs this shard should generate
shard_size = ip_range.total // total_shards
# Yield IPs in randomized order
for idx, _ in chunk_random_values:
yield ip_range.get_ip_at_index(idx)
# Distribute remainder
if shard_index < (ip_range.total % total_shards):
shard_size += 1
# Allow other tasks to run (do we need this?)
await asyncio.sleep(0)
# Remaining IPs to yield
remaining = shard_size
while remaining > 0:
index = lcg.next() % ip_range.total
if total_shards == 1 or index % total_shards == shard_index:
yield ip_range.get_ip_at_index(index)
remaining -= 1
async def main():
parser = argparse.ArgumentParser(description='Async IP address sharding tool')
def main():
parser = argparse.ArgumentParser(description='Ultra-fast random IP address generator with optional sharding')
parser.add_argument('cidr', help='Target IP range in CIDR format')
parser.add_argument('shard_num', type=int, help='Shard number (0-based)')
parser.add_argument('total_shards', type=int, help='Total number of shards')
parser.add_argument('--seed', type=int, default=12345, help='Random seed for LCG')
parser.add_argument('--chunk-size', type=int, default=1000, help='Processing chunk size')
parser.add_argument('--shard-num', type=int, default=1, help='Shard number (1-based)')
parser.add_argument('--total-shards', type=int, default=1, help='Total number of shards (default: 1, no sharding)')
parser.add_argument('--seed', type=int, default=0, help='Random seed for LCG')
args = parser.parse_args()
if args.shard_num >= args.total_shards:
raise ValueError('Shard number must be less than total shards')
if args.total_shards < 1:
raise ValueError('Total shards must be at least 1')
if args.shard_num < 0 or args.total_shards < 1:
raise ValueError('Invalid shard configuration')
if args.shard_num > args.total_shards:
raise ValueError('Shard number must be less than or equal to total shards')
async for ip in get_shard_ips(args.cidr, args.shard_num, args.total_shards, args.seed, args.chunk_size):
if args.shard_num < 1:
raise ValueError('Shard number must be at least 1')
for ip in ip_stream(args.cidr, args.shard_num, args.total_shards, args.seed):
print(ip)
if __name__ == '__main__':
asyncio.run(main())
main()


@ -1,15 +1,9 @@
#!/usr/bin/env python3
# Python implementation of a Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://git.acid.vegas/pylcg)
# pylcg.py
import unittest
import asyncio
import ipaddress
import sys
import time
from pylcg import IPRange, get_shard_ips, LCG
from pylcg import IPRange, ip_stream, LCG
# ANSI color codes
class Colors:
BLUE = '\033[94m'
GREEN = '\033[92m'
@ -18,155 +12,124 @@ class Colors:
RED = '\033[91m'
ENDC = '\033[0m'
def progress_bar(iteration: int, total: int, prefix: str = '', length: int = 50) -> None:
'''Simple progress bar using standard Python'''
percent = f"{100 * (iteration / float(total)):.1f}"
filled_length = int(length * iteration // total)
bar = '█' * filled_length + '-' * (length - filled_length)
sys.stdout.write(f'\r{Colors.CYAN}{prefix} |{bar}| {percent}%{Colors.ENDC} ')
if iteration == total:
sys.stdout.write('\n')
sys.stdout.flush()
def print_header(message: str) -> None:
'''Print formatted header'''
print(f'\n{Colors.BLUE}{"="*80}')
print(f'\n\n{Colors.BLUE}{"="*80}')
print(f'TEST: {message}')
print(f'{"="*80}{Colors.ENDC}\n')
def print_success(message: str) -> None:
'''Print success message'''
print(f'{Colors.GREEN}{message}{Colors.ENDC}')
def print_info(message: str) -> None:
print(f"{Colors.CYAN} {message}{Colors.ENDC}")
def print_progress(message: str) -> None:
'''Print progress message'''
print(f"{Colors.YELLOW}{message}{Colors.ENDC}")
def print_warning(message: str) -> None:
print(f"{Colors.YELLOW}! {message}{Colors.ENDC}")
class TestIPSharder(unittest.TestCase):
@classmethod
def setUpClass(cls):
'''Set up test parameters'''
print_header('Setting up test environment')
cls.test_cidr = '192.0.0.0/16' # 65,536 IPs
cls.test_seed = 12345
cls.total_shards = 4
cls.chunk_size = 1000
# Calculate expected IPs
network = ipaddress.ip_network(cls.test_cidr)
cls.all_ips = {str(ip) for ip in network}
print_success(f"Initialized test environment with {len(cls.all_ips):,} IPs")
def setUp(self):
'''Create event loop for each test'''
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def tearDown(self):
'''Clean up event loop'''
self.loop.close()
async def collect_shard_ips(self, shard_num: int):
'''Helper to collect IPs from a shard'''
return {ip async for ip in get_shard_ips(self.test_cidr, shard_num, self.total_shards, self.test_seed, self.chunk_size)}
def test_ip_range_initialization(self):
'''Test IPRange class initialization and calculations'''
print_header('Testing IPRange initialization')
ip_range = IPRange(self.test_cidr)
start_time = time.perf_counter()
ip_range = IPRange(self.test_cidr)
self.assertEqual(ip_range.total, 65536)
print_success('IP range size correctly calculated')
first_ip = ip_range.get_ip_at_index(0)
last_ip = ip_range.get_ip_at_index(ip_range.total - 1)
print_success(f'IP range spans from {first_ip} to {last_ip}')
def test_shard_completeness(self):
'''Test that all IPs are covered exactly once across all shards'''
print_header('Testing shard completeness')
async def check_completeness():
seen_ips = set()
shard_sizes = []
for shard_num in range(self.total_shards):
progress_bar(shard_num, self.total_shards-1, prefix='Processing shards')
shard_ips = await self.collect_shard_ips(shard_num)
shard_sizes.append(len(shard_ips))
# Check for duplicates and overlap
self.assertEqual(len(shard_ips), len(set(shard_ips)),
f'Duplicates found in shard {shard_num}')
overlap = seen_ips & shard_ips
self.assertEqual(len(overlap), 0,
f'Overlap found with previous shards: {overlap}')
seen_ips.update(shard_ips)
# Verify all IPs are covered
self.assertEqual(seen_ips, self.all_ips,
'Not all IPs were covered by the shards')
print_success(f'All {len(self.all_ips):,} IPs were distributed across shards')
# Print distribution information
for i, size in enumerate(shard_sizes):
print(f"{Colors.CYAN}Shard {i}: {size:,} IPs{Colors.ENDC}")
self.loop.run_until_complete(check_completeness())
elapsed = time.perf_counter() - start_time
print_success(f'IP range initialization completed in {elapsed:.6f}s')
print_info(f'IP range spans from {first_ip} to {last_ip}')
print_info(f'Total IPs in range: {ip_range.total:,}')
def test_lcg_sequence(self):
'''Test LCG sequence generation and performance'''
print_header('Testing LCG sequence generation')
# Test sequence generation speed
lcg = LCG(seed=self.test_seed)
iterations = 1_000_000
# Test small sequence
small_n = 100
start_time = time.perf_counter()
small_result = lcg.get_nth(small_n)
small_time = time.perf_counter() - start_time
print_success(f'Small sequence (n={small_n:,}) generated in {small_time:.6f}s')
start_time = time.perf_counter()
for _ in range(iterations):
lcg.next()
elapsed = time.perf_counter() - start_time
# Test large sequence
large_n = 1_000_000
start_time = time.perf_counter()
large_result = lcg.get_nth(large_n)
large_time = time.perf_counter() - start_time
print_success(f'Large sequence (n={large_n:,}) generated in {large_time:.6f}s')
print_success(f'Generated {iterations:,} random numbers in {elapsed:.6f}s')
print_info(f'Average time per number: {(elapsed/iterations)*1000000:.2f} microseconds')
# Verify deterministic behavior
# Test deterministic behavior
lcg1 = LCG(seed=self.test_seed)
lcg2 = LCG(seed=self.test_seed)
print_progress('Verifying sequence determinism...')
for i in range(large_n):
if i % (large_n // 100) == 0: # Update progress every 1%
progress_bar(i, large_n, prefix='Verifying sequence')
lcg2.next()
progress_bar(large_n, large_n, prefix='Verifying sequence')
self.assertEqual(large_result, lcg2.current, 'LCG sequence is not deterministic')
print_success('LCG produces consistent results')
start_time = time.perf_counter()
for _ in range(1000):
self.assertEqual(lcg1.next(), lcg2.next())
elapsed = time.perf_counter() - start_time
print_success(f'Verified LCG determinism in {elapsed:.6f}s')
def test_shard_distribution(self):
print_header('Testing shard distribution and randomness')
# Test distribution across shards
sample_size = 65_536 # Full size for /16
shard_counts = {i: 0 for i in range(1, self.total_shards + 1)} # 1-based sharding
unique_ips = set()
duplicate_count = 0
start_time = time.perf_counter()
# Collect IPs from each shard
for shard in range(1, self.total_shards + 1): # 1-based sharding
ip_gen = ip_stream(self.test_cidr, shard, self.total_shards, self.test_seed)
shard_unique = set()
# Get all IPs from this shard
for ip in ip_gen:
if ip in unique_ips:
duplicate_count += 1
else:
unique_ips.add(ip)
shard_unique.add(ip)
shard_counts[shard] = len(shard_unique)
elapsed = time.perf_counter() - start_time
# Print distribution statistics
print_success(f'Generated {len(unique_ips):,} IPs in {elapsed:.6f}s')
print_info(f'Average time per IP: {(elapsed/len(unique_ips))*1000000:.2f} microseconds')
print_info(f'Unique IPs generated: {len(unique_ips):,}')
if duplicate_count > 0:
print_warning(f'Duplicates found: {duplicate_count:,} ({(duplicate_count/len(unique_ips))*100:.2f}%)')
expected_per_shard = sample_size // self.total_shards
for shard, count in shard_counts.items():
deviation = abs(count - expected_per_shard) / expected_per_shard * 100
print_info(f'Shard {shard}: {count:,} unique IPs ({deviation:.2f}% deviation from expected)')
# Test randomness by checking sequential patterns
ips_list = sorted([int(ipaddress.ip_address(ip)) for ip in list(unique_ips)[:1000]])
sequential_count = sum(1 for i in range(len(ips_list)-1) if ips_list[i] + 1 == ips_list[i+1])
sequential_percentage = (sequential_count / (len(ips_list)-1)) * 100
print_info(f'Sequential IP pairs in first 1000: {sequential_percentage:.2f}% (lower is more random)')
if __name__ == '__main__':
print(f"\n{Colors.CYAN}{'='*80}")
print(f"Starting IP Sharder Tests - Testing with {65536:,} IPs (/16 network)")
print(f"Starting IP Sharder Tests - Testing with 65,536 IPs (/16 network)")
print(f"{'='*80}{Colors.ENDC}\n")
unittest.main(verbosity=2)