From 080d46ea3d427ce080e7b4d22a573a7e88454fee Mon Sep 17 00:00:00 2001 From: acidvegas Date: Mon, 25 Nov 2024 22:28:06 -0500 Subject: [PATCH] Initial commit --- README.md | 161 +++++++++++++++++++++++++++++++++++++++++++++++ pylcg.py | 134 +++++++++++++++++++++++++++++++++++++++ unit_test.py | 172 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 467 insertions(+) create mode 100644 README.md create mode 100644 pylcg.py create mode 100644 unit_test.py diff --git a/README.md b/README.md new file mode 100644 index 0000000..25c4a8f --- /dev/null +++ b/README.md @@ -0,0 +1,161 @@ +# PyLCG +> Linear Congruential Generator for IP Sharding + +PyLCG is a Python implementation of a memory-efficient IP address sharding system using Linear Congruential Generators *(LCG)* for deterministic random number generation. This tool enables distributed scanning and network reconnaissance by efficiently dividing IP ranges across multiple machines. + +___ + +## Table of Contents + +- [Project Origins & Purpose](#project-origins-and-purpose) +- [Overview](#overview) +- [How It Works](#how-it-works) + - [Understanding IP Addresses](#understanding-ip-addresses) + - [The Magic of Linear Congruential Generators](#the-magic-of-linear-congruential-generators) + - [Sharding: Dividing the Work](#sharding-dividing-the-work) + - [Memory-Efficient Processing](#memory-efficient-processing) +- [Real-World Applications](#real-world-applications) + - [Network Security Testing](#network-security-testing) + - [Cloud-Based Scanning](#cloud-based-scanning) + +___ + +## Project Origins & Purpose + +PyLCG was inspired by the elegant IP distribution system used in [masscan](https://github.com/robertdavidgraham/masscan), the popular mass IP port scanner. While masscan implements this logic as part of its larger codebase, I wanted to isolate and implement this specific component as a standalone Python library that developers can easily integrate into their own projects. + +The goal was to create a clean, well-documented implementation that: +- Can be used as a drop-in solution for any project needing IP distribution capabilities +- Provides the same reliable mathematical foundation as masscan's approach +- Is easy to understand and modify for specific needs +- Works well with modern Python async patterns + +By extracting this functionality into its own library, developers can add sophisticated IP distribution capabilities to their network tools without having to reinvent the wheel or extract code from larger projects. + +___ + +## Overview + +When performing network reconnaissance or scanning large IP ranges, it's often necessary to split the work across multiple machines. However, this presents several challenges: + +1. You want to ensure each machine works on a different part of the network *(no overlap)* +2. You want to avoid scanning IPs in sequence *(which can trigger security alerts)* +3. You need a way to resume scans if a machine fails +4. You can't load millions of IPs into memory at once + +PyLCG solves these challenges through clever mathematics and efficient algorithms. + +___ + +## How It Works + +### Understanding IP Addresses + +First, let's understand how IP addresses work in our system: + +- An IP address like `192.168.1.1` is really just a 32-bit number +- A CIDR range like `192.168.0.0/16` represents a continuous range of these numbers +- For example, `192.168.0.0/16` includes all IPs from `192.168.0.0` to `192.168.255.255` *(65,536 addresses)* + +### The Magic of Linear Congruential Generators + +At the heart of PyLCG is something called a Linear Congruential Generator *(LCG)*. Think of it as a mathematical recipe that generates a sequence of numbers that appear random but are actually predictable if you know the starting point *(seed)*. + +Here's how it works: + +1. Start with a number *(called the seed)* +2. Multiply it by a carefully chosen constant *(1597 in our case)* +3. Add another carefully chosen constant *(51749)* +4. Take the remainder when divided by 2^32 +5. That's your next number! Repeat the process to get more numbers + +In mathematical notation: +``` +Next_Number = (1597 * Current_Number + 51749) mod 2^32 +``` + +Why these specific numbers? + +- `1597` and `51749` were chosen because they create a sequence that: + - Visits every possible number before repeating *(maximum period)* + - Spreads numbers evenly across the range + - Can be calculated quickly on computers +- `2^32` *(4,294,967,296)* is used because it: + - Matches the size of a 32-bit integer + - Is large enough to handle any IP range + - Makes calculations efficient on modern CPUs + +### Sharding: Dividing the Work + +Let's say you want to scan a /16 network *(65,536 IPs)* using 4 machines. Here's how PyLCG handles it: + +1. **Division**: First, it divides the total IPs evenly: + - 65,536 ÷ 4 = 16,384 IPs per shard + - Machine 1: IPs 0-16,383 + - Machine 2: IPs 16,384-32,767 + - Machine 3: IPs 32,768-49,151 + - Machine 4: IPs 49,152-65,535 + +2. **Randomization**: Within each shard, IPs are randomized using the LCG: + - Each IP index *(0 to 65,535)* is fed through the LCG + - The resulting numbers determine the scan order + - Because we use the same seed, this order is consistent across runs + +Example of how IPs might be ordered in Shard 1: +``` +Original order: 0, 1, 2, 3, 4, 5... +LCG values: 51749, 134238, 297019, 12983... +Final order: 3, 5, 1, 4, 2, 0... (sorted by LCG values) +``` + +### Memory-Efficient Processing + +To handle large IP ranges without consuming too much memory, PyLCG uses several techniques: + +1. **Chunked Processing** + Instead of loading all IPs at once, it processes them in chunks: + ```python + # Example with chunk_size = 1000 + Chunk 1: Process IPs 0-999 + Chunk 2: Process IPs 1000-1999 + ...and so on + ``` + +2. **Lazy Generation** + - IPs are generated only when needed using Python's async generators + - The system yields one IP at a time rather than creating huge lists + - This keeps memory usage constant regardless of IP range size + +3. **Direct Calculation** + - The LCG can jump directly to any position in its sequence + - No need to generate all previous numbers + - Enables efficient random access to any part of the sequence + +___ + +## Real-World Applications + +### Network Security Testing + +Imagine you're testing the security of a large corporate network: +- You have 5 scanning machines +- You need to scan 1 million IPs +- You want to avoid triggering IDS/IPS systems + +PyLCG helps by: +1. Dividing the IPs evenly across your 5 machines +2. Randomizing the scan order to avoid detection +3. Allowing you to pause/resume scans from any point +4. Using minimal memory on each machine + +### Cloud-Based Scanning + +In cloud environments, PyLCG is particularly useful: +- Easily scale up/down the number of scanning instances +- Each instance knows exactly which IPs to scan +- Consistent results across multiple runs +- Efficient resource usage keeps costs down + +___ + +###### Mirrors for this repository: [acid.vegas](https://git.acid.vegas/pylcg) • [SuperNETs](https://git.supernets.org/acidvegas/pylcg) • [GitHub](https://github.com/acidvegas/pylcg) • [GitLab](https://gitlab.com/acidvegas/pylcg) • [Codeberg](https://codeberg.org/acidvegas/pylcg) diff --git a/pylcg.py b/pylcg.py new file mode 100644 index 0000000..d715762 --- /dev/null +++ b/pylcg.py @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +# Python implementation of a Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://git.acid.vegas/pylcg) +# pylcg.py + +import argparse +import asyncio +import ipaddress +from math import ceil + + +class LCG: + '''Linear Congruential Generator for deterministic random number generation''' + + def __init__(self, seed: int, m: int = 2**32): + self.m = m + self.a = 1597 + self.c = 51749 + self.seed = seed + self.current = seed + + + def get_nth(self, n: int) -> int: + ''' + Get the nth number in the sequence without generating previous numbers. + + :param n: The index of the number to get + ''' + + # For large n, use the standard next() method to avoid modular arithmetic issues + if n > 1000: + self.current = self.seed + for _ in range(n): + self.next() + return self.current + + # For smaller n, use direct calculation + result = self.seed + for _ in range(n): + result = (self.a * result + self.c) % self.m + return result + + + def next(self) -> int: + '''Generate next random number''' + + self.current = (self.a * self.current + self.c) % self.m + + return self.current + + + +class IPRange: + '''Memory-efficient IP range iterator''' + + def __init__(self, cidr: str): + network = ipaddress.ip_network(cidr) + self.start = int(network.network_address) + self.end = int(network.broadcast_address) + self.total = self.end - self.start + 1 + + def get_ip_at_index(self, index: int) -> str: + ''' + Get IP at specific index without generating previous IPs + + :param index: The index of the IP to get + ''' + + if not 0 <= index < self.total: + raise IndexError('IP index out of range') + + return str(ipaddress.ip_address(self.start + index)) + + +async def get_shard_ips(cidr: str, shard_num: int, total_shards: int, seed: int, chunk_size: int = 1000): + ''' + Asynchronously generate IPs for the specified shard. + + :param cidr: The CIDR range to shard + :param shard_num: The number of the shard to generate + :param total_shards: The total number of shards + :param seed: The seed for the random number generator + :param chunk_size: The size of the chunks to process + ''' + + # Initialize the IP range and LCG + ip_range = IPRange(cidr) + lcg = LCG(seed) + total_ips = ip_range.total + + # Calculate which indices belong to this shard + shard_size = ceil(total_ips / total_shards) + start_idx = shard_num * shard_size + end_idx = min(start_idx + shard_size, total_ips) + + # Process in chunks to maintain memory efficiency + for chunk_start in range(start_idx, end_idx, chunk_size): + chunk_end = min(chunk_start + chunk_size, end_idx) + chunk_indices = list(range(chunk_start, chunk_end)) + + # Generate random values for this chunk + chunk_random_values = [(i, lcg.get_nth(i)) for i in chunk_indices] + chunk_random_values.sort(key=lambda x: x[1]) + + # Yield IPs in randomized order + for idx, _ in chunk_random_values: + yield ip_range.get_ip_at_index(idx) + + # Allow other tasks to run (do we need this?) + await asyncio.sleep(0) + + +async def main(): + parser = argparse.ArgumentParser(description='Async IP address sharding tool') + parser.add_argument('cidr', help='Target IP range in CIDR format') + parser.add_argument('shard_num', type=int, help='Shard number (0-based)') + parser.add_argument('total_shards', type=int, help='Total number of shards') + parser.add_argument('--seed', type=int, default=12345, help='Random seed for LCG') + parser.add_argument('--chunk-size', type=int, default=1000, help='Processing chunk size') + + args = parser.parse_args() + + if args.shard_num >= args.total_shards: + raise ValueError('Shard number must be less than total shards') + + if args.shard_num < 0 or args.total_shards < 1: + raise ValueError('Invalid shard configuration') + + async for ip in get_shard_ips(args.cidr, args.shard_num, args.total_shards, args.seed, args.chunk_size): + print(ip) + + + +if __name__ == '__main__': + asyncio.run(main()) \ No newline at end of file diff --git a/unit_test.py b/unit_test.py new file mode 100644 index 0000000..23581b5 --- /dev/null +++ b/unit_test.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python3 +# Python implementation of a Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://git.acid.vegas/pylcg) +# pylcg.py + +import unittest +import asyncio +import ipaddress +import sys +import time +from pylcg import IPRange, get_shard_ips, LCG + +# ANSI color codes +class Colors: + BLUE = '\033[94m' + GREEN = '\033[92m' + YELLOW = '\033[93m' + CYAN = '\033[96m' + RED = '\033[91m' + ENDC = '\033[0m' + +def progress_bar(iteration: int, total: int, prefix: str = '', length: int = 50) -> None: + '''Simple progress bar using standard Python''' + + percent = f"{100 * (iteration / float(total)):.1f}" + filled_length = int(length * iteration // total) + bar = '█' * filled_length + '-' * (length - filled_length) + sys.stdout.write(f'\r{Colors.CYAN}{prefix} |{bar}| {percent}%{Colors.ENDC} ') + if iteration == total: + sys.stdout.write('\n') + sys.stdout.flush() + + +def print_header(message: str) -> None: + '''Print formatted header''' + + print(f'\n{Colors.BLUE}{"="*80}') + print(f'TEST: {message}') + print(f'{"="*80}{Colors.ENDC}\n') + + +def print_success(message: str) -> None: + '''Print success message''' + + print(f'{Colors.GREEN}✓ {message}{Colors.ENDC}') + + +def print_progress(message: str) -> None: + '''Print progress message''' + + print(f"{Colors.YELLOW}⟳ {message}{Colors.ENDC}") + + +class TestIPSharder(unittest.TestCase): + @classmethod + def setUpClass(cls): + '''Set up test parameters''' + print_header('Setting up test environment') + cls.test_cidr = '192.0.0.0/16' # 65,536 IPs + cls.test_seed = 12345 + cls.total_shards = 4 + cls.chunk_size = 1000 + + # Calculate expected IPs + network = ipaddress.ip_network(cls.test_cidr) + cls.all_ips = {str(ip) for ip in network} + print_success(f"Initialized test environment with {len(cls.all_ips):,} IPs") + + + def setUp(self): + '''Create event loop for each test''' + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + + def tearDown(self): + '''Clean up event loop''' + self.loop.close() + + + async def collect_shard_ips(self, shard_num: int): + '''Helper to collect IPs from a shard''' + + return {ip async for ip in get_shard_ips(self.test_cidr, shard_num, self.total_shards, self.test_seed, self.chunk_size)} + + + def test_ip_range_initialization(self): + '''Test IPRange class initialization and calculations''' + print_header('Testing IPRange initialization') + ip_range = IPRange(self.test_cidr) + + self.assertEqual(ip_range.total, 65536) + print_success('IP range size correctly calculated') + + first_ip = ip_range.get_ip_at_index(0) + last_ip = ip_range.get_ip_at_index(ip_range.total - 1) + print_success(f'IP range spans from {first_ip} to {last_ip}') + + + def test_shard_completeness(self): + '''Test that all IPs are covered exactly once across all shards''' + print_header('Testing shard completeness') + + async def check_completeness(): + seen_ips = set() + shard_sizes = [] + + for shard_num in range(self.total_shards): + progress_bar(shard_num, self.total_shards-1, prefix='Processing shards') + shard_ips = await self.collect_shard_ips(shard_num) + shard_sizes.append(len(shard_ips)) + + # Check for duplicates and overlap + self.assertEqual(len(shard_ips), len(set(shard_ips)), + f'Duplicates found in shard {shard_num}') + overlap = seen_ips & shard_ips + self.assertEqual(len(overlap), 0, + f'Overlap found with previous shards: {overlap}') + + seen_ips.update(shard_ips) + + # Verify all IPs are covered + self.assertEqual(seen_ips, self.all_ips, + 'Not all IPs were covered by the shards') + print_success(f'All {len(self.all_ips):,} IPs were distributed across shards') + + # Print distribution information + for i, size in enumerate(shard_sizes): + print(f"{Colors.CYAN}Shard {i}: {size:,} IPs{Colors.ENDC}") + + self.loop.run_until_complete(check_completeness()) + + + def test_lcg_sequence(self): + '''Test LCG sequence generation and performance''' + + print_header('Testing LCG sequence generation') + + lcg = LCG(seed=self.test_seed) + + # Test small sequence + small_n = 100 + start_time = time.perf_counter() + small_result = lcg.get_nth(small_n) + small_time = time.perf_counter() - start_time + print_success(f'Small sequence (n={small_n:,}) generated in {small_time:.6f}s') + + # Test large sequence + large_n = 1_000_000 + start_time = time.perf_counter() + large_result = lcg.get_nth(large_n) + large_time = time.perf_counter() - start_time + print_success(f'Large sequence (n={large_n:,}) generated in {large_time:.6f}s') + + # Verify deterministic behavior + lcg2 = LCG(seed=self.test_seed) + print_progress('Verifying sequence determinism...') + for i in range(large_n): + if i % (large_n // 100) == 0: # Update progress every 1% + progress_bar(i, large_n, prefix='Verifying sequence') + lcg2.next() + progress_bar(large_n, large_n, prefix='Verifying sequence') + + self.assertEqual(large_result, lcg2.current, 'LCG sequence is not deterministic') + print_success('LCG produces consistent results') + + + +if __name__ == '__main__': + print(f"\n{Colors.CYAN}{'='*80}") + print(f"Starting IP Sharder Tests - Testing with {65536:,} IPs (/16 network)") + print(f"{'='*80}{Colors.ENDC}\n") + unittest.main(verbosity=2) \ No newline at end of file