# Author: perp
import argparse
import aiohttp
import asyncio

from rich.tree import Tree
from rich.text import Text
from rich import print as rprint


class Yoink:
    """
    Yoink is a program to verify Shodan keys.

    Every key in the input file is checked against the Shodan ``api-info``
    endpoint. Keys that still have credits (and match the optional plan
    filter) are collected in a rich tree and, optionally, appended to an
    output file.
    """

    # Endpoint queried for each key; the response JSON is read for
    # "plan", "query_credits" and "scan_credits"
    API_URL = "https://api.shodan.io/api-info"

    # Seconds to wait before retrying a key after an HTTP 429 response
    RETRY_DELAY = 2

    def __init__(self, file, filt, output):
        """
        Constructor.

        Args:
            file: str, Path to the file with one Shodan key per line
            filt: str, Plan to keep (e.g. oss, dev, edu); None keeps every plan
            output: str, Output file to append valid keys to; None to skip
        """
        self.file = file
        self.filter = filt
        self.output = output

        self.tree = Tree("[bold yellow]Shodan Keys", guide_style="bold yellow")
        self.tasks = []
        self.valid = []
        self.checked = 0

    async def add_tree(self, key, query_credits, scan_credits, plan):
        """
        Record a valid key and add it to the result tree.

        Args:
            key: str, Current valid key
            query_credits: int, Query credits left
            scan_credits: int, Scan credits left
            plan: str, Current plan
        """
        self.valid.append(key)

        branch = self.tree.add(f"[green]Key: [bold blue]{key}")
        branch.add(f"[green]Plan: [blue]{plan}")
        branch.add(f"[green]Query Credits: [blue]{query_credits}")
        branch.add(f"[blue]Scan Credits: [green]{scan_credits}")

    def _plan_allowed(self, plan):
        """
        Tell whether a plan passes the configured filter.

        Args:
            plan: str, Plan reported by the API for a key

        Returns:
            bool, True when no filter is set or the plan equals the filter
        """
        # An unset (or empty) filter means "keep every plan"
        return not self.filter or plan == self.filter

    async def verify(self, session, key):
        """
        Verify the key by a GET request.

        A 429 response is retried after RETRY_DELAY seconds. Any other
        response ends the check for this key, so a rejected key can never
        keep the retry loop spinning. Each key is counted once in
        ``self.checked``.

        Args:
            session: object, AioHTTP ClientSession
            key: str, Shodan key
        """
        # Loop only while we are being ratelimited
        while True:
            # Pass the key as a query parameter so it is URL-encoded
            async with session.get(self.API_URL, params={"key": key}) as response:
                status = response.status
                # Only a 200 response carries the account information
                info = await response.json() if status == 200 else None

            if status != 429:
                break

            # Back off outside the response context so the connection
            # is released while we wait
            await asyncio.sleep(self.RETRY_DELAY)

        self.checked += 1

        # Anything other than 200 means the key could not be verified
        if info is None:
            return

        query_credits = info["query_credits"]
        scan_credits = info["scan_credits"]
        plan = info["plan"]

        # We don't want empty credits
        if scan_credits == 0 and query_credits == 0:
            return

        if self._plan_allowed(plan):
            await self.add_tree(key, query_credits, scan_credits, plan)

    async def run(self):
        """
        Read the keys file, verify every key concurrently over one shared
        AioHTTP ClientSession, print the results and optionally append the
        valid keys to the output file.
        """
        # Read the keys, skipping blank lines so no empty key is requested
        with open(self.file, "r") as handle:
            keys = [line.strip() for line in handle if line.strip()]

        # Create a reusable session
        async with aiohttp.ClientSession() as session:
            # Create each task & remember it
            pending = [
                asyncio.ensure_future(self.verify(session, key)) for key in keys
            ]
            self.tasks.extend(pending)

            # Wait for every verification to finish
            await asyncio.gather(*pending)

        rprint(self.tree)
        rprint(f"Checked [bold blue]{self.checked}[reset], Valid/Filtered [bold green]{len(self.valid)}[reset]")

        if self.output is not None:
            # Append to the output file, one key per line
            with open(self.output, "a") as handle:
                handle.writelines(f"{key}\n" for key in self.valid)


def main():
    """
    Parse the command line arguments and run Yoink.
    """
    # Add arguments & parse
    parser = argparse.ArgumentParser(description="Yoink is a program to verify Shodan keys")
    parser.add_argument("-f", "--file", help="Path to Shodan keys", required=True)
    parser.add_argument("-t", "--filter", help="Filter type of Shodan plan (oss, dev, edu)", required=False)
    parser.add_argument("-o", "--output", help="Path to output", required=False)
    args = parser.parse_args()

    # asyncio.run creates the event loop and closes it when finished
    asyncio.run(Yoink(args.file, args.filter, args.output).run())


if __name__ == "__main__":
    main()