Updated most of the proxy tools and added a few scripts I had laying around. Tor work has improved

This commit is contained in:
Dionysus 2023-07-25 20:07:32 -04:00
parent e0814f7a50
commit 42b0bb85fb
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
10 changed files with 278 additions and 342 deletions

View File

@ -9,4 +9,4 @@
___ ___
###### Mirrors ###### Mirrors
[acid.vegas](https://git.acid.vegas/proxytools) • [GitHub](https://github.com/acidvegas/proxytools) • [GitLab](https://gitlab.com/acidvegas/proxytools) • [SourceHut](https://git.sr.ht/~acidvegas/proxytools) • [SuperNETs](https://git.supernets.org/acidvegas/proxytools) [acid.vegas](https://git.acid.vegas/proxytools) • [GitHub](https://github.com/acidvegas/proxytools) • [GitLab](https://gitlab.com/acidvegas/proxytools) • [SuperNETs](https://git.supernets.org/acidvegas/proxytools)

96
checkdnsbl.sh Normal file
View File

@ -0,0 +1,96 @@
#!/bin/bash
#################################################################################
## checkdnsbl.sh by rojo (rojo @ headcandy.org) and
## outsider (outsider @ scarynet.org) and
## remco (remco @ webconquest.com)
##
## LICENSE AGREEMENT
## By using this script, you are implying acceptance of the idea that this script
## is a stimulating piece of prose. As such, PLEASE DO NOT PLAGIARIZE IT. As
## long as you give me credit for my work, feel free to redistribute / make a
## profit / rewrite / whatever you wish to the script. Just don't mess it up
## and pretend that the bug was my fault. My code is bug-free, dammit!
##
## syntax: /usr/local/sbin/checkdnsbl.sh ip_addr
## where ip_addr is a valid four-octet IPv4 address
## * exits 0 if a match is found; exits 1 for no match
## * intended to be called from /etc/hosts.deny via aclexec
##
## example hosts.deny:
#
# sshd : 10.0.0.0/24, 127.0.0.1 : allow
# ALL : 192.168.0.0/32 : deny
# ALL EXCEPT httpd : ALL : aclexec /usr/local/sbin/checkdnsbl %a
#
## This will deny connections from DNSBL-flagged hosts, and assume the rest are
## safe. MAKE SURE THAT THIS SCRIPT IS RUN AFTER ALL EXPLICITLY DEFINED
## ADDRESSES! After tcpwrappers spawns this script, the connection is either
## passed or failed, with no further rule matching.
##
## As of the writing of this script, aclexec in hosts.allow allows every client
## to connect, regardless of returned exit code. This script will NOT work if
## called from hosts.allow. It should only be called from hosts.deny.
##
## To test whether this script works, try binding to a banned address. Both
## dronebl.org and spamhaus.org, for example, include 127.0.0.2 in their
## databases for testing. So, if this script monitors ssh connections, and such
## a service exists in your array of DNSBL hosts, try the following command:
# ssh -o BindAddress=127.0.0.2 localhost
## If all works as intended, you should see "ssh_exchange_identification:
## Connection closed by remote host." And so will other blacklisted clients.
#################################################################################
# DNSBL[x] -- array of DNSBL hosts to query
DNSBL[0]="dnsbl.dronebl.org"
DNSBL[1]="rbl.efnetrbl.org"
DNSBL[2]="dnsbl.swiftbl.net"
DNSBL[3]="combined.abuse.ch"
DNSBL[4]="bogons.cymru.com"
# Number of minutes to cache queries
QUERY_EXPIRE=5
# Location for cache
CACHE_FOLDER="/tmp/checkdnsbl"
# UMASK value for created files and directory
UMASK="077"
################################# stop editing ##################################
IPADDR=`echo $1 | sed -r -e 's/^::ffff://'`
IP_BACKWARD=`host $IPADDR|grep -E -o -e '[0-9a-f\.]+\.(in-addr|ip6)\.arpa'|sed -r -e 's/\.i.+$//'`
umask $UMASK
if [ ! -d "$CACHE_FOLDER" ]; then mkdir $CACHE_FOLDER;
elif [ -f "$CACHE_FOLDER/$IPADDR-0" ]; then {
echo CACHED: $IPADDR found in `cat $CACHE_FOLDER/$IPADDR-0`
exit 0
};
elif [ -f "$CACHE_FOLDER/$IPADDR-1" ]; then {
echo CACHED: $IPADDR not found in any DNSBLs.
exit 1
}; fi
for (( x=0; x<${#DNSBL[@]}; x++ )); do {
DNSBLQUERY=$IP_BACKWARD.${DNSBL[$x]}
echo -n "checking $DNSBLQUERY... "
DNSBLOUT=`host $DNSBLQUERY | grep -E -o -e '[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$'`
if [ "$DNSBLOUT" != "" ]; then
echo "MATCH: $DNSBLOUT"
echo "${DNSBL[$x]} : $DNSBLOUT" >>$CACHE_FOLDER/$IPADDR-0
sleep $(( $QUERY_EXPIRE * 60 )) && {
rm -f $CACHE_FOLDER/$IPADDR-0
} &
exit 0
else
echo "no match."
fi
}; done
touch $CACHE_FOLDER/$IPADDR-1
sleep $(( $QUERY_EXPIRE * 60 )) && {
rm -f $CACHE_FOLDER/$IPADDR-1
} &
exit 1

View File

@ -1,48 +1,16 @@
#!/usr/bin/env python #!/usr/bin/env python
# FloodBL - Developed by acidvegas in Python (https://git.acid.vegas/proxytools) # FloodBL - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
'''
Notes for future improvement:
To query an IPv6 address, you must expand it, then reverse it into "nibble" format.
e.g. if the IP was 2001:db8::1, you expand it to 2001:0db8:0000:0000:0000:0000:0000:0001 and reverse it.
In nibble format it is 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2 and add on the dns blacklist you require.
e.g. 1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2.tor.dan.me.uk
If the IP has a match, the DNS server will respond with an "A" record of 127.0.0.100.
It will also respond with a "TXT" record with extra information as per below:
N:<nodename>/P:<port1[,port2]>/F:<flags>
port1 is the OR (onion router) port, port2 (if specified) is the DR (directory) port.
Flags are defined as follows:
E Exit
X Hidden Exit
A Authority
B BadExit
C NoEdConsensus
D V2Dir
F Fast
G Guard
H HSDir
N Named
R Running
S Stable
U Unnamed
V Valid
'''
import argparse import argparse
import concurrent.futures import concurrent.futures
import ipaddress
import os import os
import re import re
import socket
try: try:
import dns.resolver import dns.resolver
except ImportError: except ImportError:
raise SystemExit('error: missing required \'dnspython\' library (https://pypi.org/project/dnspython/)') raise SystemExit('missing required \'dnspython\' library (https://pypi.org/project/dnspython/)')
# Globals # Globals
good = list() good = list()
@ -67,36 +35,22 @@ blackholes = {
'15' : 'Compromised router / gateway', '15' : 'Compromised router / gateway',
'16' : 'Autorooting worms', '16' : 'Autorooting worms',
'17' : 'Automatically determined botnet IPs (experimental)', '17' : 'Automatically determined botnet IPs (experimental)',
'18' : 'DNS/MX type' '18' : 'DNS/MX type',
'19' : 'Abused VPN Service',
'255': 'Uncategorzied threat class'
}, },
# 'rbl.efnetrbl.org': { # NOTE: Most IRC networks use DroneBL, un-comment this section to check the EFnetRBL 'rbl.efnetrbl.org': {
# '1' : "Open Proxy", '1' : "Open Proxy",
# '2' : "spamtrap666", '2' : "spamtrap666",
# '3' : "spamtrap50", '3' : "spamtrap50",
# '4' : "TOR", '4' : "TOR",
# '5' : "Drones / Flooding" '5' : "Drones / Flooding"
# }, }
# 'torexit.dan.me.uk': { # TODO: The require a TXT lookup, although IRC daemons do numeric replies...will look into this
# 'E' : 'Exit',
# 'X' : 'Hidden Exit',
# 'A' : 'Authority',
# 'B' : 'BadExit',
# 'C' : 'NoEdConsensus',
# 'D' : 'V2Dir',
# 'F' : 'Fast',
# 'G' : 'Guard',
# 'H' : 'HSDir',
# 'N' : 'Named',
# 'R' : 'Running',
# 'S' : 'Stable',
# 'U' : 'Unnamed',
# 'V' : 'Valid'
# }
} }
def check(proxy): def check(proxy):
proxy_ip = proxy.split(':')[0] proxy_ip = proxy.split(':')[0]
formatted_ip = '.'.join(proxy_ip.split('.')[::-1]) formatted_ip = ipaddress.ip_address(proxy_ip).reverse_pointer
for blackhole in blackholes: for blackhole in blackholes:
try: try:
results = dns.resolver.resolve(f'{formatted_ip}.{blackhole}', 'A') results = dns.resolver.resolve(f'{formatted_ip}.{blackhole}', 'A')
@ -112,7 +66,7 @@ def check(proxy):
unknown.append(proxy) unknown.append(proxy)
else: else:
print(f'{proxy_ip.ljust(15)} \033[1;30m|\033[0m {blackhole.ljust(17)} \033[1;30m|\033[0m Error (No results)') print(f'{proxy_ip.ljust(15)} \033[1;30m|\033[0m {blackhole.ljust(17)} \033[1;30m|\033[0m Error (No results)')
unkown.append(proxy) unknown.append(proxy)
except Exception as ex: except Exception as ex:
print(f'{proxy_ip.ljust(15)} \033[1;30m|\033[0m {blackhole.ljust(17)} \033[1;30m|\033[0m \033[1;32mGOOD\033[0m') print(f'{proxy_ip.ljust(15)} \033[1;30m|\033[0m {blackhole.ljust(17)} \033[1;30m|\033[0m \033[1;32mGOOD\033[0m')
if proxy not in bad: if proxy not in bad:
@ -134,7 +88,7 @@ args = parser.parse_args()
if not os.path.isfile(args.input): if not os.path.isfile(args.input):
raise SystemExit('no such input file') raise SystemExit('no such input file')
initial = len(open(args.input).readlines()) initial = len(open(args.input).readlines())
proxies = set([proxy.split(':')[0] for proxy in re.findall('[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+', open(args.input).read(), re.MULTILINE)]) proxies = set([proxy.split(':')[0] for proxy in re.findall('[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+', open(args.input).read(), re.MULTILINE)]) # TODO: handle IPv6 better
if not proxies: if not proxies:
raise SystemExit('no proxies found from input file') raise SystemExit('no proxies found from input file')
with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor: with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:

20
shellscrape Executable file
View File

@ -0,0 +1,20 @@
#!/bin/env bash
# shellscrape - developed by acidvegas (https://git.acid.vegas/proxytools)
URLS=(
"https://raw.githubusercontent.com/ALIILAPRO/Proxy/main/socks4.txt"
"https://raw.githubusercontent.com/ALIILAPRO/Proxy/main/socks5.txt"
"https://raw.githubusercontent.com/B4RC0DE-TM/proxy-list/main/SOCKS4.txt"
"https://raw.githubusercontent.com/B4RC0DE-TM/proxy-list/main/SOCKS5.txt"
"https://raw.githubusercontent.com/hookzof/socks5_list/master/proxy.txt"
"https://raw.githubusercontent.com/HyperBeats/proxy-list/main/socks4.txt"
"https://raw.githubusercontent.com/HyperBeats/proxy-list/main/socks5.txt"
)
[ -f proxies.txt ] >proxies.txt
for URL in "${URLS[@]}"; do
echo "Downloading from $URL"
curl -s $URL >> proxies.txt &
done
sort -u -o proxies.txt proxies.txt
echo "done"

View File

@ -1,23 +1,15 @@
#!/usr/bin/env python #!/usr/bin/env python
# SockSpot Proxy Scraper - Developed by acidvegas in Python (https://git.acid.vegas/proxytools) # SockHub Proxy Scraper - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
'''
Scrap IP:PORT proxies from a URL list
'''
import concurrent.futures
import os import os
import re import re
import time
import urllib.request import urllib.request
# Can be any URL containing a list of IP:PORT proxies (does not have to be socks5) # Can be any URL containing a list of IP:PORT proxies (does not have to be socks5)
# The current list contains proxy sources that are updated frequently with new proxies # The current list contains proxy sources that are updated frequently with new proxies
# Almost all of the Github repos pull from the same place & contain duplicates (which are removed) # Almost all of the Github repos pull from the same place & contain duplicates (which are removed)
urls = set(( urls = set((
'https://api.openproxylist.xyz/socks4.txt' 'https://api.openproxylist.xyz/socks4.txt',
'https://api.openproxylist.xyz/socks5.txt', 'https://api.openproxylist.xyz/socks5.txt',
'https://api.proxyscrape.com/?request=displayproxies&proxytype=socks4', 'https://api.proxyscrape.com/?request=displayproxies&proxytype=socks4',
'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks4', 'https://api.proxyscrape.com/v2/?request=displayproxies&protocol=socks4',
@ -76,10 +68,11 @@ urls = set((
'https://spys.one/en/socks-proxy-list/' 'https://spys.one/en/socks-proxy-list/'
)) ))
def get_source(url): def get_source(url: str) -> str:
''' Get the source of a URL using a Googlebot user-agent. '''
req = urllib.request.Request(url) req = urllib.request.Request(url)
req.add_header('User-Agent', 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)') req.add_header('User-Agent', 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)')
source = urllib.request.urlopen(req, timeout=10) source = urllib.request.urlopen(req, timeout=15)
return source.read().decode() return source.read().decode()
# Main # Main

View File

@ -1,131 +0,0 @@
#!/usr/bin/env python
# SockSpot - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
'''
This script will scan popular blogspots that posts freesh proxies daily
Edit: It seems Blogspots for proxies in 2023 is no longer a reliable source.
This code is old & will possibly be updated again in the future.
'''
import datetime
import json
import base64
import os
import re
import threading
import time
import urllib.request
# Blogspot URLs
blogspot_list = (
'live-socks.net',
'newfreshproxies-24.blogspot.sg',
'proxyserverlist-24.blogspot.sg',
'socks24.org',
'sock5us.blogspot.com',
'sockproxy.blogspot.com',
'socksproxylist24.blogspot.com',
'newsocks.info',
'socksecurelist.ca',
'canada-socks247.com',
'sock5us.blogspot.com',
'socks24.org',
'sslproxies24.blogspot.com',
'vip-socks24.blogspot.com'
)
# Settings
max_results = 100 # Maximum number of results per-page.
post_depth = 1 # How many days back from the current date to pull posts from. (1 = Today Only)
timeout = 30 # Timeout for HTTP requests.
# Globals
proxy_file = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'proxies.txt')
proxy_list = list()
threads = dict()
def debug(msg):
print(f'{get_time()} | [~] - {msg}')
def error(msg, reason):
print(f'{get_time()} | [!] - {msg} ({reason})')
def get_time():
return time.strftime('%I:%M:%S')
def get_date():
date = datetime.datetime.today()
return '{0}-{1:02d}-{2:02d}'.format(date.year, date.month, date.day)
def get_date_range():
date_range = datetime.datetime.today() - datetime.timedelta(days=post_depth)
return '{0}-{1:02d}-{2:02d}'.format(date_range.year, date_range.month, date_range.day)
def get_source(url):
req = urllib.request.Request(url)
req.add_header('User-Agent', 'Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)')
source = urllib.request.urlopen(req, timeout=timeout)
charset = source.headers.get_content_charset()
if charset:
return source.read().decode(charset)
else:
return source.read().decode()
def parse_blogspot(url):
global proxy_list
try:
source = json.loads(get_source(f'http://{url}/feeds/posts/default?max-results={max_results}&alt=json&updated-min={get_date_range()}T00:00:00&updated-max={get_date()}T23:59:59&orderby=updated'))
found = []
if source['feed'].get('entry'):
for item in source['feed']['entry']:
data = item['content']['$t']
proxies = re.findall('[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+:[0-9]+', data, re.MULTILINE)
if proxies:
found += proxies
proxy_list += proxies
debug('Found {0} proxies on {1}'.format(format(len(found), ',d'), url))
else:
error('No posts found on page!', url)
except Exception as ex:
proxy_value = ex
def scan_blogspots():
for url in blogspot_list:
threads[url] = threading.Thread(target=parse_blogspot, args=(url,))
for thread in threads:
threads[thread].start()
time.sleep(10)
for thread in threads:
threads[thread].join()
debug('Found {0} total proxies!'.format(format(len(proxy_list), ',d')))
with open (proxy_file, 'w') as proxy__file:
for proxy in proxy_list:
proxy__file.write(proxy + '\n')
# Main
print('#'*56)
print('#{0}#'.format(''.center(54)))
print('#{0}#'.format('SockSpot Proxy Scraper'.center(54)))
print('#{0}#'.format('Developed by acidvegas in Python'.center(54)))
print('#{0}#'.format('https://git.acid.vegas/proxytools'.center(54)))
print('#{0}#'.format(''.center(54)))
print('#'*56)
debug(f'Scanning {len(blogspot_list)} URLs from list...')
threading.Thread(target=scan_blogspots).start()
for url in blogspot_list:
threads[url] = threading.Thread(target=parse_blogspot, args=(url,))
for thread in threads:
threads[thread].start()
time.sleep(10)
for thread in threads:
threads[thread].join()
if proxy_value == 0:
error('no socks found')
debug('Found {0} total proxies!'.format(format(len(proxy_list), ',d')))
with open (proxy_file, 'w') as proxy__file:
for proxy in proxy_list:
proxy__file.write(proxy + '\n')

View File

@ -1,76 +0,0 @@
import StringIO
import time
import pycurl
import stem.control
# https://metrics.torproject.org/rs.html#details/379FB450010D17078B3766C2273303C358C3A442
EXIT_FINGERPRINT = '379FB450010D17078B3766C2273303C358C3A442'
SOCKS_PORT = 9050
CONNECTION_TIMEOUT = 30 # timeout before we give up on a circuit
def query(url):
"""
Uses pycurl to fetch a site using the proxy on the SOCKS_PORT.
"""
output = StringIO.StringIO()
query = pycurl.Curl()
query.setopt(pycurl.URL, url)
query.setopt(pycurl.PROXY, 'localhost')
query.setopt(pycurl.PROXYPORT, SOCKS_PORT)
query.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME)
query.setopt(pycurl.CONNECTTIMEOUT, CONNECTION_TIMEOUT)
query.setopt(pycurl.WRITEFUNCTION, output.write)
try:
query.perform()
return output.getvalue()
except pycurl.error as exc:
raise ValueError("Unable to reach %s (%s)" % (url, exc))
def scan(controller, path):
"""
Fetch check.torproject.org through the given path of relays, providing back
the time it took.
"""
circuit_id = controller.new_circuit(path, await_build = True)
def attach_stream(stream):
if stream.status == 'NEW':
controller.attach_stream(stream.id, circuit_id)
controller.add_event_listener(attach_stream, stem.control.EventType.STREAM)
try:
controller.set_conf('__LeaveStreamsUnattached', '1') # leave stream management to us
start_time = time.time()
check_page = query('https://check.torproject.org/')
if 'Congratulations. This browser is configured to use Tor.' not in check_page:
raise ValueError("Request didn't have the right content")
return time.time() - start_time
finally:
controller.remove_event_listener(attach_stream)
controller.reset_conf('__LeaveStreamsUnattached')
with stem.control.Controller.from_port() as controller:
controller.authenticate()
relay_fingerprints = [desc.fingerprint for desc in controller.get_network_statuses()]
for fingerprint in relay_fingerprints:
try:
time_taken = scan(controller, [fingerprint, EXIT_FINGERPRINT])
print('%s => %0.2f seconds' % (fingerprint, time_taken))
except Exception as exc:
print('%s => %s' % (fingerprint, exc))

76
torglass.py Normal file
View File

@ -0,0 +1,76 @@
#!/usr/bin/env python
# Tor Glass - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
import json
try:
import stem.descriptor.remote
except ImportError:
raise SystemExit('missing required library \'stem\' (https://pypi.org/project/stem/)')
def get_descriptors() -> dict:
''' Generate a json database of all Tor relays & exit nodes '''
tor_map = {'relay':list(),'exit':list()}
for relay in stem.descriptor.remote.get_server_descriptors():
data = {
'nickname' : relay.nickname,
'fingerprint' : relay.fingerprint,
'published' : str(relay.published) if relay.published else None,
'address' : relay.address,
'or_port' : relay.or_port,
'socks_port' : relay.socks_port,
'dir_port' : relay.dir_port,
'platform' : str(relay.platform) if relay.platform else None,
'tor_version' : str(relay.tor_version),
'operating_system' : relay.operating_system,
'uptime' : relay.uptime,
'contact' : str(relay.contact) if relay.contact else None,
'exit_policy' : str(relay.exit_policy) if relay.exit_policy else None,
'exit_policy_v6' : str(relay.exit_policy_v6) if relay.exit_policy_v6 else None,
'bridge_distribution' : relay.bridge_distribution,
'family' : list(relay.family) if relay.family else None,
'average_bandwidth' : relay.average_bandwidth,
'burst_bandwidth' : relay.burst_bandwidth,
'observed_bandwidth' : relay.observed_bandwidth,
'link_protocols' : relay.link_protocols,
'circuit_protocols' : relay.circuit_protocols,
'is_hidden_service_dir' : relay.is_hidden_service_dir,
'hibernating' : relay.hibernating,
'allow_single_hop_exits' : relay.allow_single_hop_exits,
'allow_tunneled_dir_requests' : relay.allow_tunneled_dir_requests,
'extra_info_cache' : relay.extra_info_cache,
'extra_info_digest' : relay.extra_info_digest,
'extra_info_sha256_digest' : relay.extra_info_sha256_digest,
'eventdns' : relay.eventdns,
'ntor_onion_key' : relay.ntor_onion_key,
'or_addresses' : relay.or_addresses,
'protocols' : relay.protocols
}
if relay.exit_policy.is_exiting_allowed():
tor_map['exit'].append(data)
else:
tor_map['relay'].append(data)
return tor_map
if __name__ == '__main__':
print('loading Tor descriptors... (this could take a while)')
tor_data = get_descriptors()
with open('tor.json', 'w') as fd:
json.dump(tor_data['relay'], fd)
with open('tor.exit.json', 'w') as fd:
json.dump(tor_data['exit'], fd)
print('Relays: {0:,}'.foramt(len(tor_data['relay'])))
print('Exits : {0:,}'.format(len(tor_data['exit'])))
try:
import ipinfo
except ImportError:
print('missing optional library \'ipinfo\' (https://pypi.org/project/ipinfo/) for map visualization')
else:
try:
handler = ipinfo.getHandler('changeme') # put your ipinfo.io API key here
print('Relay Map: ' + handler.getMap([ip['address'] for ip in tor_data['relay']]))
print('Exit Map: ' + handler.getMap([ip['address'] for ip in tor_data['exit']]))
except ipinfo.errors.AuthorizationError:
print('error: invalid ipinfo.io API key (https://ipinfo.io/signup)')
except Exception as ex:
print(f'error generating ipinfo map ({ex})')

View File

@ -1,61 +0,0 @@
#!/usr/bin/env python
# Tor Scan - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
'''
PUll a list of information about all Tor relays & exit relays into a json database
'''
import json
try:
import stem.descriptor.remote
except ImportError:
raise SystemExit('missing required library \'stem\' (https://pypi.org/project/stem/)')
tor_map = {'relay':list(),'exit':list()}
for relay in stem.descriptor.remote.get_server_descriptors().run():
_map = {
'nickname' : relay.nickname,
'fingerprint' : relay.fingerprint,
'published' : str(relay.published) if relay.published else None,
'address' : relay.address,
'or_port' : relay.or_port,
'socks_port' : relay.socks_port,
'dir_port' : relay.dir_port,
'platform' : str(relay.platform) if relay.platform else None,
'tor_version' : str(relay.tor_version),
'operating_system' : relay.operating_system,
'uptime' : relay.uptime,
'contact' : str(relay.contact) if relay.contact else None,
'exit_policy' : str(relay.exit_policy) if relay.exit_policy else None,
'exit_policy_v6' : str(relay.exit_policy_v6) if relay.exit_policy_v6 else None,
'bridge_distribution' : relay.bridge_distribution,
'family' : list(relay.family) if relay.family else None,
'average_bandwidth' : relay.average_bandwidth,
'burst_bandwidth' : relay.burst_bandwidth,
'observed_bandwidth' : relay.observed_bandwidth,
'link_protocols' : relay.link_protocols,
'circuit_protocols' : relay.circuit_protocols,
'is_hidden_service_dir' : relay.is_hidden_service_dir,
'hibernating' : relay.hibernating,
'allow_single_hop_exits' : relay.allow_single_hop_exits,
'allow_tunneled_dir_requests' : relay.allow_tunneled_dir_requests,
'extra_info_cache' : relay.extra_info_cache,
'extra_info_digest' : relay.extra_info_digest,
'extra_info_sha256_digest' : relay.extra_info_sha256_digest,
'eventdns' : relay.eventdns,
'ntor_onion_key' : relay.ntor_onion_key,
'or_addresses' : relay.or_addresses,
'protocols' : relay.protocols
}
if relay.exit_policy.is_exiting_allowed():
tor_map['exit'].append(_map)
else:
tor_map['relay'].append(_map)
with open('tor.out', 'w') as fd:
json.dump(tor_map['relay'], fd)
with open('tor.exit.out', 'w') as fd:
json.dump(tor_map['exit'], fd)

65
tortest.py Normal file
View File

@ -0,0 +1,65 @@
#!/usr/bin/env python
# Tor Test - Developed by acidvegas in Python (https://git.acid.vegas/proxytools)
import io
import time
try:
import pycurl
except ImportError:
raise Exception('missing required library \'pycurl\' (https://pypi.org/project/pycurl/)')
try:
import stem.control
except ImportError:
raise Exception('missing required library \'stem\' (https://pypi.org/project/stem/)')
# Globals
EXIT_FINGERPRINT = '379FB450010D17078B3766C2273303C358C3A442' # https://metrics.torproject.org/rs.html#details/379FB450010D17078B3766C2273303C358C3A442
SOCKS_PORT = 9050
CONNECTION_TIMEOUT = 30 # timeout before we give up on a circuit
def query(url):
''' Uses pycurl to fetch a site using the proxy on the SOCKS_PORT. '''
output = io.StringIO.StringIO()
query = pycurl.Curl()
query.setopt(pycurl.URL, url)
query.setopt(pycurl.PROXY, 'localhost')
query.setopt(pycurl.PROXYPORT, SOCKS_PORT)
query.setopt(pycurl.PROXYTYPE, pycurl.PROXYTYPE_SOCKS5_HOSTNAME)
query.setopt(pycurl.CONNECTTIMEOUT, CONNECTION_TIMEOUT)
query.setopt(pycurl.WRITEFUNCTION, output.write)
try:
query.perform()
return output.getvalue()
except pycurl.error as exc:
raise ValueError("Unable to reach %s (%s)" % (url, exc))
def scan(controller, path):
''' Test the connection to a website through the given path of relays using the given controller '''
circuit_id = controller.new_circuit(path, await_build = True)
def attach_stream(stream):
if stream.status == 'NEW':
controller.attach_stream(stream.id, circuit_id)
controller.add_event_listener(attach_stream, stem.control.EventType.STREAM)
try:
controller.set_conf('__LeaveStreamsUnattached', '1') # leave stream management to us
start_time = time.time()
check_page = query('https://check.torproject.org/')
if 'Congratulations. This browser is configured to use Tor.' not in check_page:
raise ValueError("Request didn't have the right content")
return time.time() - start_time
finally:
controller.remove_event_listener(attach_stream)
controller.reset_conf('__LeaveStreamsUnattached')
# Main
with stem.control.Controller.from_port(port=9056) as controller:
controller.authenticate('loldongs')
relay_fingerprints = [desc.fingerprint for desc in controller.get_network_statuses()]
for fingerprint in relay_fingerprints:
try:
time_taken = scan(controller, [fingerprint, EXIT_FINGERPRINT])
print('%s => %0.2f seconds' % (fingerprint, time_taken))
except Exception as exc:
print('%s => %s' % (fingerprint, exc))