zmap-mark-ii/src/send.c

/*
* ZMap Copyright 2013 Regents of the University of Michigan
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy
* of the License at http://www.apache.org/licenses/LICENSE-2.0
*/
#include "send.h"
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <time.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <signal.h>
#include "../lib/includes.h"
#include "../lib/util.h"
#include "../lib/logger.h"
#include "../lib/random.h"
#include "../lib/blocklist.h"
#include "../lib/lockfd.h"
#include "../lib/pbm.h"
#include "aesrand.h"
#include "get_gateway.h"
#include "iterator.h"
#include "probe_modules/packet.h"
#include "probe_modules/probe_modules.h"
#include "shard.h"
#include "state.h"
#include "validate.h"
// OS specific functions called by send_run
static inline int send_packet(sock_t sock, void *buf, int len, uint32_t idx);
static inline int send_batch(sock_t sock, batch_t *batch, int retries);
static inline int send_run_init(sock_t sock);
// Include the right implementations
#if defined(PFRING)
#include "send-pfring.h"
#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__NetBSD__) || \
defined(__DragonFly__)
#include "send-bsd.h"
#else /* LINUX */
#include "send-linux.h"
#endif /* __APPLE__ || __FreeBSD__ || __NetBSD__ || __DragonFly__ */
// The iterator over the cyclic group
// Lock for send run
static pthread_mutex_t send_mutex = PTHREAD_MUTEX_INITIALIZER;
// Source ports for outgoing packets
static uint16_t num_src_ports;
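// SIGUSR1/SIGUSR2 handlers (installed in send_init below) let an operator nudge
// the target send rate up or down by 5% while a scan is running.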
void sig_handler_increase_speed(UNUSED int signal)
{
int old_rate = zconf.rate;
zconf.rate += (zconf.rate * 0.05);
log_info("send", "send rate increased from %i to %i pps.", old_rate,
zconf.rate);
}
void sig_handler_decrease_speed(UNUSED int signal)
{
int old_rate = zconf.rate;
zconf.rate -= (zconf.rate * 0.05);
log_info("send", "send rate decreased from %i to %i pps.", old_rate,
zconf.rate);
}
// global sender initialize (not thread specific)
iterator_t *send_init(void)
{
// generate a new primitive root and starting position
iterator_t *it;
uint32_t num_subshards =
(uint32_t)zconf.senders * (uint32_t)zconf.total_shards;
if (num_subshards > blocklist_count_allowed()) {
log_fatal("send", "senders * shards > allowed probes");
}
if (zsend.max_targets && (num_subshards > zsend.max_targets)) {
log_fatal("send", "senders * shards > max targets");
}
uint64_t num_addrs = blocklist_count_allowed();
it = iterator_init(zconf.senders, zconf.shard_num, zconf.total_shards,
num_addrs, zconf.ports->port_count);
// determine the source address offset from which we'll send packets
struct in_addr temp;
temp.s_addr = zconf.source_ip_addresses[0];
log_debug("send", "srcip_first: %s", inet_ntoa(temp));
temp.s_addr = zconf.source_ip_addresses[zconf.number_source_ips - 1];
log_debug("send", "srcip_last: %s", inet_ntoa(temp));
// process the source port range that ZMap is allowed to use
num_src_ports = zconf.source_port_last - zconf.source_port_first + 1;
log_debug("send", "will send from %u address%s on %hu source ports",
zconf.number_source_ips,
((zconf.number_source_ips == 1) ? "" : "es"), num_src_ports);
// global initialization for send module
assert(zconf.probe_module);
if (zconf.probe_module->global_initialize) {
if (zconf.probe_module->global_initialize(&zconf)) {
log_fatal(
"send",
"global initialization for probe module failed.");
}
}
// only allow bandwidth or rate
if (zconf.bandwidth > 0 && zconf.rate > 0) {
log_fatal(
"send",
"must specify rate or bandwidth, or neither, not both.");
}
// Convert specified bandwidth to packet rate. This is an estimate using the
// max packet size a probe module will generate.
if (zconf.bandwidth > 0) {
size_t pkt_len = zconf.probe_module->max_packet_length;
pkt_len *= 8;
// 7 byte MAC preamble, 1 byte Start frame, 4 byte CRC, 12 byte
// inter-frame gap
pkt_len += 8 * 24;
// adjust calculated length if less than the minimum size of an
// ethernet frame
if (pkt_len < 84 * 8) {
pkt_len = 84 * 8;
}
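// e.g. a probe that fits in the 84-byte minimum on-wire frame costs 672 bits,
// so a bandwidth of 10 Mbit/s works out to roughly 14,880 pkt/s.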
// rate is a uint32_t, so don't overflow it
if (zconf.bandwidth / pkt_len > 0xFFFFFFFFu) {
zconf.rate = 0;
} else {
zconf.rate = zconf.bandwidth / pkt_len;
if (zconf.rate == 0) {
log_warn(
"send",
"bandwidth %lu bit/s is slower than 1 pkt/s, "
"setting rate to 1 pkt/s",
zconf.bandwidth);
zconf.rate = 1;
}
}
log_debug(
"send",
"using bandwidth %lu bits/s for %zu byte probe, rate set to %d pkt/s",
zconf.bandwidth, pkt_len / 8, zconf.rate);
}
// convert default placeholder to default value
if (zconf.rate == -1) {
// default 10K pps
zconf.rate = 10000;
}
if (zconf.rate < 0) {
log_fatal("send", "rate impossibly slow");
}
// log rate, if explicitly specified
if (zconf.rate > 0 && zconf.bandwidth <= 0) {
log_debug("send", "rate set to %d pkt/s", zconf.rate);
}
// Get the source hardware address, and give it to the probe
// module
if (!zconf.hw_mac_set) {
if (get_iface_hw_addr(zconf.iface, zconf.hw_mac)) {
log_fatal(
"send",
"ZMap could not retrieve the hardware (MAC) address for "
"the interface \"%s\". You likely do not privileges to open a raw packet socket. "
"Are you running as root or with the CAP_NET_RAW capability? If you are, you "
"may need to manually set the source MAC address with the \"--source-mac\" flag.",
zconf.iface);
return NULL;
}
log_debug(
"send",
"no source MAC provided. "
"automatically detected %02x:%02x:%02x:%02x:%02x:%02x as hw "
"interface for %s",
zconf.hw_mac[0], zconf.hw_mac[1], zconf.hw_mac[2],
zconf.hw_mac[3], zconf.hw_mac[4], zconf.hw_mac[5],
zconf.iface);
}
log_debug("send", "source MAC address %02x:%02x:%02x:%02x:%02x:%02x",
zconf.hw_mac[0], zconf.hw_mac[1], zconf.hw_mac[2],
zconf.hw_mac[3], zconf.hw_mac[4], zconf.hw_mac[5]);
if (zconf.dryrun) {
log_info("send", "dryrun mode -- won't actually send packets");
}
// initialize random validation key
validate_init();
// setup signal handlers for changing scan speed
signal(SIGUSR1, sig_handler_increase_speed);
signal(SIGUSR2, sig_handler_decrease_speed);
zsend.start = now();
return it;
}
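// Choose the source address for a given destination and probe number. With
// multiple source IPs configured, the choice is a deterministic function of
// the destination and the probe index, so repeated probes to the same target
// rotate predictably across the configured addresses.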
static inline ipaddr_n_t get_src_ip(ipaddr_n_t dst, int local_offset)
{
if (zconf.number_source_ips == 1) {
return zconf.source_ip_addresses[0];
}
return zconf.source_ip_addresses[(ntohl(dst) + local_offset) %
zconf.number_source_ips];
}
// one sender thread
int send_run(sock_t st, shard_t *s)
{
log_debug("send", "send thread started");
pthread_mutex_lock(&send_mutex);
// Allocate a buffer to hold the outgoing packet
char buf[MAX_PACKET_SIZE];
// allocate batch
batch_t* batch = create_packet_batch(zconf.batch);
// OS specific per-thread init
if (send_run_init(st)) {
pthread_mutex_unlock(&send_mutex);
return EXIT_FAILURE;
}
// buffer for the human-readable MAC: 2 hex chars per byte, separating colons, and a trailing NUL
char mac_buf[(ETHER_ADDR_LEN * 2) + (ETHER_ADDR_LEN - 1) + 1];
char *p = mac_buf;
for (int i = 0; i < ETHER_ADDR_LEN; i++) {
if (i == ETHER_ADDR_LEN - 1) {
snprintf(p, 3, "%.2x", zconf.hw_mac[i]);
p += 2;
} else {
snprintf(p, 4, "%.2x:", zconf.hw_mac[i]);
p += 3;
}
}
log_debug("send", "source MAC address %s", mac_buf);
void *probe_data;
if (zconf.probe_module->thread_initialize) {
zconf.probe_module->thread_initialize(
buf, zconf.hw_mac, zconf.gw_mac, &probe_data);
}
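// At this point the probe module has written its static packet template into
// buf; make_packet below patches the per-target fields before each send.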
pthread_mutex_unlock(&send_mutex);
// adaptive timing to hit target rate
uint64_t count = 0;
uint64_t last_count = count;
double last_time = steady_now();
uint32_t delay = 0;
int interval = 0;
volatile int vi;
struct timespec ts, rem;
double send_rate =
(double)zconf.rate /
((double)zconf.senders * zconf.packet_streams);
const double slow_rate = 50; // packets per second per thread
// below which the slow (nanosleep) method is used
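// Two rate-limiting regimes: below slow_rate the thread nanosleeps between
// packets and scales the sleep toward the target rate; above it, it spins in
// a calibrated busy-wait loop whose iteration count (delay) is periodically
// re-tuned against the measured send rate.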
long nsec_per_sec = 1000 * 1000 * 1000;
long long sleep_time = nsec_per_sec;
if (zconf.rate > 0) {
delay = 10000;
if (send_rate < slow_rate) {
// set the initial time difference
sleep_time = nsec_per_sec / send_rate;
last_time = steady_now() - (1.0 / send_rate);
} else {
// estimate initial rate
for (vi = delay; vi--;)
;
delay *= 1 / (steady_now() - last_time) /
((double)zconf.rate /
(double)zconf.senders);
interval = ((double)zconf.rate /
(double)zconf.senders) /
20;
last_time = steady_now();
}
}
// Get the initial IP to scan.
target_t current = shard_get_cur_target(s);
uint32_t current_ip = current.ip;
uint16_t current_port = current.port;
// If provided a list of IPs to scan, then the first generated address
// might not be on that list. Iterate until the current IP is one the
// list, then start the true scanning process.
if (zconf.list_of_ips_filename) {
while (!pbm_check(zsend.list_of_ips_pbm, current_ip)) {
current = shard_get_next_target(s);
current_ip = current.ip;
current_port = current.port;
if (current.status == ZMAP_SHARD_DONE) {
log_debug(
"send",
"never made it to send loop in send thread %i",
s->thread_id);
goto cleanup;
}
}
}
int attempts = zconf.retries + 1;
uint32_t idx = 0;
while (1) {
// Adaptive timing delay
if (count && delay > 0) {
if (send_rate < slow_rate) {
double t = steady_now();
double last_rate = (1.0 / (t - last_time));
sleep_time *= ((last_rate / send_rate) + 1) / 2;
ts.tv_sec = sleep_time / nsec_per_sec;
ts.tv_nsec = sleep_time % nsec_per_sec;
log_debug("sleep",
"sleep for %d sec, %ld nanoseconds",
ts.tv_sec, ts.tv_nsec);
while (nanosleep(&ts, &rem) == -1) {
}
last_time = t;
} else {
for (vi = delay; vi--;)
;
if (!interval || (count % interval == 0)) {
double t = steady_now();
assert(count > last_count);
assert(t > last_time);
double multiplier =
(double)(count - last_count) /
(t - last_time) /
(zconf.rate / zconf.senders);
uint32_t old_delay = delay;
delay *= multiplier;
if (delay == old_delay) {
if (multiplier > 1.0) {
delay *= 2;
} else if (multiplier < 1.0) {
delay *= 0.5;
}
}
last_count = count;
last_time = t;
}
}
}
// Check if the program has otherwise completed and break out of the send loop.
if (zrecv.complete) {
goto cleanup;
}
if (zconf.max_runtime &&
zconf.max_runtime <= now() - zsend.start) {
goto cleanup;
}
// Check if we've finished this shard or thread before sending each
// packet, regardless of batch size.
if (s->state.max_targets &&
s->state.targets_scanned >= s->state.max_targets) {
log_debug(
"send",
"send thread %hhu finished (max targets of %u reached)",
s->thread_id, s->state.max_targets);
goto cleanup;
}
if (s->state.max_packets &&
s->state.packets_sent >= s->state.max_packets) {
log_debug(
"send",
"send thread %hhu finished (max packets of %u reached)",
s->thread_id, s->state.max_packets);
goto cleanup;
}
if (current.status == ZMAP_SHARD_DONE) {
log_debug(
"send",
"send thread %hhu finished, shard depleted",
s->thread_id);
goto cleanup;
}
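// Send every probe for the current target: zconf.packet_streams is the
// number of probes per (ip, port) pair, and each probe gets its own
// validation data and source IP/port selection.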
for (int i = 0; i < zconf.packet_streams; i++) {
count++;
uint32_t src_ip = get_src_ip(current_ip, i);
uint32_t validation[VALIDATE_BYTES /
sizeof(uint32_t)];
validate_gen(src_ip, current_ip,
htons(current_port),
(uint8_t *)validation);
uint8_t ttl = zconf.probe_ttl;
size_t length = 0;
zconf.probe_module->make_packet(
buf, &length, src_ip, current_ip,
htons(current_port), ttl, validation, i,
probe_data);
if (length > MAX_PACKET_SIZE) {
log_fatal(
"send",
"send thread %hhu set length (%zu) larger than MAX (%zu)",
s->thread_id, length,
MAX_PACKET_SIZE);
}
if (zconf.dryrun) {
lock_file(stdout);
zconf.probe_module->print_packet(stdout,
buf);
unlock_file(stdout);
} else {
void *contents =
buf +
zconf.send_ip_pkts *
sizeof(struct ether_header);
length -= (zconf.send_ip_pkts *
sizeof(struct ether_header));
// add packet to batch and update metadata
// this is an additional memcpy (packet created in buf, buf -> batch)
// but when I modified the TCP SYN module to write packet to batch directly, there wasn't any noticeable speedup.
// Using this approach for readability/minimal changes
memcpy(((uint8_t *)batch->packets) + (batch->len * MAX_PACKET_SIZE), contents, length);
batch->lens[batch->len] = length;
batch->ips[batch->len] = current_ip;
batch->len++;
if (batch->len == batch->capacity) {
// batch is full, sending
int rc = send_batch(st, batch, attempts);
// whether the batch succeeds or fails, this was the only attempt; any retries are handled within send_batch
if (rc < 0) {
log_error("send_batch", "could not send any batch packets: %s", strerror(errno));
// rc is the last error code if all packets couldn't be sent
s->state.packets_failed += batch->len;
} else {
// rc is number of packets sent successfully, if > 0
s->state.packets_failed += batch->len - rc;
}
// reset batch length for next batch
batch->len = 0;
idx++;
idx &= 0xFF;
}
}
s->state.packets_sent++;
}
// Track the number of targets (ip,port)s we actually scanned.
s->state.targets_scanned++;
// Get the next IP to scan
current = shard_get_next_target(s);
current_ip = current.ip;
current_port = current.port;
if (zconf.list_of_ips_filename &&
current.status != ZMAP_SHARD_DONE) {
// If we have a list of IPs bitmap, ensure the next IP
// to scan is on the list.
while (!pbm_check(zsend.list_of_ips_pbm,
current_ip)) {
current = shard_get_next_target(s);
current_ip = current.ip;
if (current.status == ZMAP_SHARD_DONE) {
log_debug(
"send",
"send thread %hhu shard finished in get_next_ip_loop depleted",
s->thread_id);
goto cleanup;
}
}
}
}
cleanup:
if (!zconf.dryrun && send_batch(st, batch, attempts) < 0) {
log_error("send_batch cleanup", "could not send remaining batch packets: %s", strerror(errno));
}
free_packet_batch(batch);
s->cb(s->thread_id, s->arg);
if (zconf.dryrun) {
lock_file(stdout);
fflush(stdout);
unlock_file(stdout);
}
log_debug("send", "thread %hu cleanly finished", s->thread_id);
return EXIT_SUCCESS;
}
batch_t* create_packet_batch(uint8_t capacity) {
// calculate how many bytes are needed for each component of a batch
int size_of_packet_array = MAX_PACKET_SIZE * capacity;
int size_of_ips_array = sizeof(uint32_t) * capacity;
int size_of_lens_array = sizeof(int) * capacity;
// allocating batch and associated data structures in single calloc for cache locality
void* batch_and_batch_arrs = calloc(sizeof(batch_t) + size_of_packet_array + size_of_ips_array + size_of_lens_array, sizeof(char));
if (!batch_and_batch_arrs) {
log_fatal("send", "could not allocate memory for packet batch");
}
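// resulting layout: [batch_t header][packets: capacity * MAX_PACKET_SIZE]
//                   [ips: capacity * sizeof(uint32_t)][lens: capacity * sizeof(int)]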
// chunk off parts of batch
batch_t* batch = batch_and_batch_arrs;
batch->packets = (char *)batch + sizeof(batch_t);
batch->ips = (uint32_t *)(batch->packets + size_of_packet_array);
batch->lens = (int *) ((char *)batch->ips + size_of_ips_array);
batch->capacity = capacity;
batch->len = 0;
return batch;
}
void free_packet_batch(batch_t* batch) {
// batch was created with a single calloc, so this will free all the component arrays too
free(batch);
}