114 lines
3.5 KiB
Python
114 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
|
# Python implementation of a Linear Congruential Generator for IP Sharding - Developed by acidvegas in Python (https://git.acid.vegas/pylcg)
|
|
# pylcg.py
|
|
|
|
import argparse
|
|
import ipaddress
|
|
import random
|
|
|
|
|
|
class LCG:
    '''
    Deterministic pseudo-random integer source built on the recurrence
    current = (a * current + c) % m

    :param seed: Initial state of the generator
    :param m: Modulus of the recurrence, defaults to 2**32
    '''

    def __init__(self, seed: int, m: int = 2**32):
        # The multiplier and increment are fixed; only the modulus and the starting state vary
        self.m, self.a, self.c = m, 1664525, 1013904223
        self.current = seed

    def next(self) -> int:
        '''Advance the generator by one step and return the new state'''
        advanced = (self.current * self.a + self.c) % self.m
        self.current = advanced
        return advanced
|
|
|
|
|
|
class IPRange:
    '''
    Memory-efficient IP range iterator

    Only the first address (as an integer) and the number of addresses in the
    network are stored; individual addresses are computed on demand from
    their offset, so no list of addresses is ever built.
    '''

    def __init__(self, cidr: str):
        '''
        :param cidr: Network in CIDR notation, parsed with ipaddress.ip_network
        :raises ValueError: If ipaddress.ip_network rejects the string
        '''
        network = ipaddress.ip_network(cidr)
        # Remember the concrete address class (IPv4Address or IPv6Address) so
        # that offsets are always rendered in the network's own address family
        self._address_cls = type(network.network_address)
        self.start = int(network.network_address)
        self.total = network.num_addresses

    def get_ip_at_index(self, index: int) -> str:
        '''
        Get IP at specific index without generating previous IPs

        :param index: The index of the IP to get (0-based offset from the network address)
        :raises IndexError: If index is negative or not smaller than the network size
        '''
        if not 0 <= index < self.total:
            raise IndexError('IP index out of range')

        # ipaddress.ip_address(int) guesses the family from the magnitude of the
        # integer, which turns IPv6 addresses below 2**32 into IPv4 strings;
        # constructing through the stored class keeps the family correct
        return str(self._address_cls(self.start + index))
|
|
|
|
|
|
def ip_stream(cidr: str, shard_num: int = 1, total_shards: int = 1, seed: int = 0):
    '''
    Stream random IPs from the CIDR range. Optionally supports sharding.
    Each IP in the range will be yielded exactly once in a pseudo-random order.

    This is a generator, so argument validation happens when iteration starts.

    :param cidr: Target IP range in CIDR format
    :param shard_num: Shard number (1-based), defaults to 1
    :param total_shards: Total number of shards, defaults to 1 (no sharding)
    :param seed: Random seed for LCG; 0 (the default) selects a random seed
    :raises ValueError: If total_shards is below 1 or shard_num is outside 1..total_shards
    '''
    # A shard number outside 1..total_shards can never match an index, which
    # would make the loop below spin forever, so reject it up front
    if total_shards < 1:
        raise ValueError('Total shards must be at least 1')
    if not 1 <= shard_num <= total_shards:
        raise ValueError('Shard number must be between 1 and total shards')

    # Convert to 0-based indexing internally
    shard_index = shard_num - 1

    ip_range = IPRange(cidr)

    # Use random seed if none provided (a seed of 0 counts as "none")
    if not seed:
        seed = random.randint(0, 2**32-1)

    # The modulus must be at least as large as the range, otherwise indexes
    # beyond it are unreachable and addresses repeat. A CIDR block always
    # holds a power-of-two number of addresses, and the LCG constants give a
    # full period for any power-of-two modulus, so every index is visited
    # exactly once per period. Ranges up to 2**32 keep the original modulus.
    modulus = max(2**32, ip_range.total)
    lcg = LCG(seed + shard_index, modulus)

    # Each shard owns the indexes congruent to shard_index modulo total_shards;
    # the first (total % total_shards) shards therefore own one extra index
    shard_size, leftover = divmod(ip_range.total, total_shards)
    if shard_index < leftover:
        shard_size += 1

    # Remaining IPs to yield
    remaining = shard_size

    while remaining > 0:
        index = lcg.next() % ip_range.total
        if total_shards == 1 or index % total_shards == shard_index:
            yield ip_range.get_ip_at_index(index)
            remaining -= 1
|
|
|
|
|
|
def main():
    '''
    Command-line entry point: parse the arguments, validate the shard
    settings and print every IP of the requested shard, one per line.

    :raises ValueError: If the shard arguments are inconsistent
    '''
    cli = argparse.ArgumentParser(description='Ultra-fast random IP address generator with optional sharding')
    cli.add_argument('cidr', help='Target IP range in CIDR format')
    cli.add_argument('--shard-num', type=int, default=1, help='Shard number (1-based)')
    cli.add_argument('--total-shards', type=int, default=1, help='Total number of shards (default: 1, no sharding)')
    cli.add_argument('--seed', type=int, default=0, help='Random seed for LCG')
    opts = cli.parse_args()

    # Checked in order; the first failing rule decides the error message
    rules = (
        (opts.total_shards < 1, 'Total shards must be at least 1'),
        (opts.shard_num > opts.total_shards, 'Shard number must be less than or equal to total shards'),
        (opts.shard_num < 1, 'Shard number must be at least 1'),
    )
    for violated, message in rules:
        if violated:
            raise ValueError(message)

    stream = ip_stream(args.cidr, args.shard_num, args.total_shards, args.seed) if False else ip_stream(opts.cidr, opts.shard_num, opts.total_shards, opts.seed)
    for address in stream:
        print(address)
|
|
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module
if __name__ == '__main__':
    main()