libutp源码简析

libutp是uTorrent使用的类似TCP的传输层实现。它基于UDP提供可信的、有序的点对点的传输,并具有最少的时延。需要注意的是utp和另一个基于UDP的传输协议QUIC的实现思路和优势并不一样,例如QUIC更强调连接建立的快速性,建立连接时只需要1至2次握手,又如QUIC将TLS整合到协议中,实现了0RTT,而基于HTTP2的协议需要2/3RTT。QUIC还允许在用户层面配置各种拥塞控制算法。
网上有关libutp实现的介绍几乎没有,因此我打算就其源码做一个简单的分析。这里要注意UTP是基于包的而不像TCP是基于流的,虽然它提供的API还是基于流的。这样有一些影响,例如黏包问题的处理、缓冲区的管理(例如可以去掉PUSH标记)、窗口管理、重新分组等方面。

uTP源码简介

utp.h以 C89 的形式提供接口。例如 utp_write 是以 proactive 的方式实现的。
utp 相关的实现大多在 utp_internal.cpp 文件中。
utp_packedsockaddr.cpp 中封装了 sockaddr_in 结构。
ucat.c 基于 uTP 框架构建了一个基础的应用。
uTP 的设计主要是异步的,应用代码不会阻塞在异步 IO 操作上,而是指定回调函数并立即返回。utp_callbacks.cpp中注册了各种回调函数,utp 向外界传输消息都是以这里回调的形式开展的。例如当收到数据包时,ctx->callbacks[UTP_ON_READ] 这个回调函数就会被调用。

使用回调函数也体现了 libutp 总体的设计思路:

  1. 回调函数能够屏蔽掉套接字 API 的细节
    一个可靠通信协议的主要任务是在不可靠的设施上建立可靠的传输通道,至于使用哪一种不可靠的传输方式并不是核心问题。uTP 协议的内部实现能够与 UDP 套接字等做到隔离,utp 不是继承或者封装了 UDP 套接字描述符,然后提供一个 TCP 的“鸭子协议”。而是完全工作在 UDP 上层的用户层,打包了一些对 UDP 的操作,方便用户调用。
    例如 uTP 剥离了发送下层包的实现,用户自己选择使用 send 还是 sendto 还是 write 还是 sendmsg,然后写成回调,uTP 只需要在它需要发送数据报时调用这个回调就好了。
    又例如从 UDP 套接口收到一个消息时,它并不是直接处理,而是调用 utp_process_udp 函数。对于一个已连接的套接字,这个函数会找到对应的 UTPSocket 结构,并调用 utp_process_incoming 函数,该函数是个非常大的函数,里面 uTP 协议根据自己的报头处理了相关消息之后,调用用户设置的回调函数通知收到了消息。
    对于连接请求,我们将在下面的被动连接上详细讨论。
  2. 回调函数方便实现 proactive 和 reactive 风格的 API
    常见的反射式(reactive)异步IO模型包括select、poll、kqueue、Java NIO等,只会通知到某IO设备上产生了IO事件,然后由用户来发起IO请求,例如调用readrecv等。前摄式(proactive)包括IOCP、Boost Asio等,用户主动发送IO请求(即使现在IO设备还没有准备好)并提前向系统注册一个回调函数,当实际的IO事件发生时由系统处理该IO操作,并在完成后触发指定的回调函数,因此前摄式能够避免用户将数据从内核取回来的开销。因此前摄式强调的是对未来读取事件的预期,抽象程度要高一点,用户可以利用Proactor的回调构造一条执行顺序链,而Reactor必须手动维护接受的状态。
  3. 回调函数能减少处理并发问题的难度

鉴于以上的这几点,在分析 uTP 协议时必须要将 ucat.c 纳入考虑范围,不然很难搞懂原理。

ucat简介

ucat 使用了 poll 来维护了两个 fd,stdin 和套接口,并且设置了500ms的超时时间。

uTP重要数据结构

utp_context

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// utp.h
typedef struct struct_utp_context utp_context;
// utp_internal.h
struct struct_utp_context {
void *userdata;
utp_callback_t* callbacks[UTP_ARRAY_SIZE];

uint64 current_ms;
utp_context_stats context_stats;
UTPSocket *last_utp_socket;
Array<UTPSocket*> ack_sockets;
Array<RST_Info> rst_info;
UTPSocketHT *utp_sockets;
size_t target_delay;
size_t opt_sndbuf;
size_t opt_rcvbuf;
uint64 last_check;

// utp_api.cpp
struct_utp_context::struct_utp_context()
: userdata(NULL), current_ms(0), last_utp_socket(NULL), log_normal(false), log_mtu(false), log_debug(false) {
memset(&context_stats, 0, sizeof(context_stats));
memset(callbacks, 0, sizeof(callbacks));
target_delay = CCONTROL_TARGET;
utp_sockets = new UTPSocketHT;

callbacks[UTP_GET_UDP_MTU] = &utp_default_get_udp_mtu;
callbacks[UTP_GET_UDP_OVERHEAD] = &utp_default_get_udp_overhead;
callbacks[UTP_GET_MILLISECONDS] = &utp_default_get_milliseconds;
callbacks[UTP_GET_MICROSECONDS] = &utp_default_get_microseconds;
callbacks[UTP_GET_RANDOM] = &utp_default_get_random;

// 1 MB of receive buffer (i.e. max bandwidth delay product)
// means that from a peer with 200 ms RTT, we cannot receive
// faster than 5 MB/s
// from a peer with 10 ms RTT, we cannot receive faster than
// 100 MB/s. This is assumed to be good enough, since bandwidth
// often is proportional to RTT anyway
// when setting a download rate limit, all sockets should have
// their receive buffer set much lower, to say 60 kiB or so
opt_rcvbuf = opt_sndbuf = 1024 * 1024;
last_check = 0;
}
~struct_utp_context(){
delete this->utp_sockets;
}

void log(int level, utp_socket *socket, char const *fmt, ...);
void log_unchecked(utp_socket *socket, char const *fmt, ...);
bool would_log(int level);

bool log_normal:1; // log normal events?
bool log_mtu:1; // log MTU related events?
bool log_debug:1; // log debugging events? (Must also compile with UTP_DEBUG_LOGGING defined)
};

utp_context的成员

  1. utp_sockets
    utp_sockets指向一个UTPSocketHT : utpHashTable<UTPSocketKey, UTPSocketKeyData>哈希表。这个哈希表维护了所有的套接字:

    • utp_sockets析构时调用UTP_FreeAll释放所有的套接字。
    • 当UDP包被接受时,会调用utp_process_udp这个处理程序。此时我们仅能获得对应的套接字地址const struct sockaddr *,因此需要能够通过这个指针找到对应的套接字。

    UTPSocketHT中的键UTPSocketKey和值UTPSocketKeyData的构造如下:

    • UTPSocketKey中存放了对应UTPSocket中的PackedSockAddr addr以及recv_id字段。
      PackedSockAddr addr字段是在utp_initialize_socket设置的,表示指向的目标地址。
      recv_id对应着套接字里面的conn_id_recv字段,是在utp_initialize_socket中随机生成的。这里的recv_id的主要功能是作为ATP协议中“host端的端口号”来使用。查看相关代码我们可以发现conn_id_send始终比conn_id_recv要大1。

      1
      2
      3
      4
      5
      6
      // utp_process_udp
      utp_initialize_socket(conn, to, tolen, false, id, id+1, id);
      // void utp_initialize_socket(utp_socket *conn, const struct sockaddr *addr, socklen_t addrlen, bool need_seed_gen, uint32 conn_seed, uint32 conn_id_recv, uint32 conn_id_send){
      // ...
      conn_id_recv += conn_seed;
      conn_id_send += conn_seed;

      下面讨论了几个重要的问题

      • 为什么我们要使用id而不直接使用四元组呢?
        使用id能够方便地实现以下的机制(虽然libutp并不一定实现了)
        1. 使用三次握手和四次挥手的很高创建成本很高,使用ID能够复用已经创建好的连接。当然这样的复用可能带来队头阻塞问题,需要小心处理。
        2. 连接概念独立于四元组概念,方便隔离底层,从而实现连接迁移。QUIC实现了这个特性。
      • 为什么要有两个id呢?
        这是因为在同一个UDP port上会存在多个uTP连接,因此我们需要增设一个ID字段来区分这些连接。对于每一个套接口,uTP选择它的发送和接受都设置一个ID。当utp_process_udp接受到UDP包的时候,他获得的是一个sockaddr地址,所以需要找到对应的UTPSocket套接字,当套接字不存在时,需要发送RST包。当套接字关闭时,需要它来维护2MSL的等待时间,实际上由于UDP的UTPSocketKey包含了recv_id,所以2MSL是不必要的,在UTPSocket::check_timeouts代码中看到只等到rto_timeout就行。
      • 为什么选择conn_id_recv而不是conn_id_send来作为哈希值呢?
        这是因为当数据报到达时,要通过里面的recv_id找到具有特定conn_id_recv的套接字。
    • UTPSocketKeyData中主要持有了对应的UTPSocket *的指针。

  2. opt_sndbufopt_rcvbuf
    这两个size_t表示发送缓冲区和接收缓冲区的默认大小。缓冲区的大小与窗口大小形成协同。在创建套接字时,套接字的opt_sndbufopt_rcvbuf会“继承自”对应的context。

  3. target_delay单位为微秒,初始值为CCONTROL_TARGET = 100 * 1000

  4. current_ms的作用是用来保存当前时间,这样可以避免多次调用获取时间函数的开销。

  5. context_stats是一个utp_context_stats类型的结构,用来统计不同大小的uTP包的数量。

  6. ack_sockets与schedule_ack机制有关,详见超时重传部分。

  7. rst_info维护了RST_INFO_LIMIT个reset信息,详见连接重置部分。

utp_context的用途

  1. 方便集中管理的UTP套接口UTPSocket
    从上面的结构中看到所有的UTPSocket被放到一个哈希表里面。当UTPSocket销毁时,要将哈希表中对应的<UTPSocketKey, UTPSocketKeyData>键值对删掉,在utp_initialize_socket函数中要往context里面注册自己,这些操作实际上都是为了方便集中管理套接字。
    utp_check_timeouts函数为例,这个函数作为每次“时钟中断”的入口,接受的是一个utp_context而不是一个UTPSocket,context里面对所有的UTPSocket调用了check_timeouts,这样避免了为每一个套接字维护一个时钟信号的开销。
  2. 方便实现UTP服务
    libutp是工作在用户态的,所以并不能向外提供系统调用,因此每一个进程会维护一个utp_context。

utp_socket

1
2
3
4
5
6
// utp.h
typedef struct UTPSocket utp_socket;
// utp_internal.cpp
struct UTPSocket {
// ...
};

UTPSocket类型用来维护一个套接字的上下文,里面东西比较多,将在下面展开讨论。

OutgoingPacket

对于一个(将要)被发出去的包,有一个OutgoingPacket与其对应。

1
2
3
4
5
6
7
8
struct OutgoingPacket{
size_t length; // 总长
size_t payload; // 有效载荷
uint64_t time_sent; // microseconds
uint32_t transmissions; // 总传输次数
bool need_resend;
char data[1];
};

这里的data是个 Flex Array(这里勘误一下,VLA是另外一个C99特性)。实际上是包头+数据包的全部内容。注意到最好不要将包头和数据包分开存放,不然又要多一次复制的开销。

PacketFormatV1/PacketFormatAckV1

首先查看基础的PacketFormatV1,这是一个uTP常规数据报的报头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// utp_internal.cpp
struct PACKED_ATTRIBUTE PacketFormatV1 {
// packet_type (4 high bits)
// protocol version (4 low bits)
byte ver_type;
byte version() const { return ver_type & 0xf; }
byte type() const { return ver_type >> 4; }
void set_version(byte v) { ver_type = (ver_type & 0xf0) | (v & 0xf); }
void set_type(byte t) { ver_type = (ver_type & 0xf) | (t << 4); }

// Type of the first extension header
byte ext;
// connection ID
uint16_big connid;
uint32_big tv_usec;
uint32_big reply_micro;
// receive window size in bytes
uint32_big windowsize;
// Sequence number
uint16_big seq_nr;
// Acknowledgment number
uint16_big ack_nr;
};
  1. ver_type
    ver_type标志了数据报的类型,这个类似于压缩了后的TCP报头中的flags字段,节约了一些空间,并且更加直观。包含下面的5种。

    1
    2
    3
    4
    5
    6
    7
    8
    enum {
    ST_DATA = 0, // Data packet.
    ST_FIN = 1, // Finalize the connection. This is the last packet.
    ST_STATE = 2, // State packet. Used to transmit an ACK with no data.
    ST_RESET = 3, // Terminate connection forcefully.
    ST_SYN = 4, // Connect SYN
    ST_NUM_STATES, // used for bounds checking
    };

    这里ST_STATE即一个不带数据的探查包,因此并不会增加seq_nr

  2. ext
    这个表示扩展号,默认是0,设为1时表示使用了EACK的扩展,对应着扩展后的PacketFormatAckV1类型的数据包。

  3. connid
    connid的用途已在前面的utp_context进行了论述。

  4. tv_usec
    tv_usec是一个时间戳,表示数据包的发送时间,在send_data中被设置。可以看到相比TCP则保守地用了TCP Timestamps Option这个选项,UTP中强制将其整合了进来。
    时间戳的作用是非常大的,可以更精确地计算出RTT。否则我们对非重传的数据包进行采样(Karn算法)。
    时间戳还能方便对高带宽下序号迅速耗尽进行PAWS(Protect Againest Wrapped Sequence numbers)防范,不过我检查下代码发现UTP里面并没有PAWS的机制(详见后面收包的部分)。

  5. reply_micro
    reply_microutp_process_incoming中被设置,当A发送数据包给B时,B提取收到数据包中的tv_usec字段并和自己的本地时间作差,得到更新的reply_micro。这个值会随着下一个数据包被传送给对端(send_data函数)。

这里再提一下PACKED_ATTRIBUTE这个属性,在utp_internal.cpp中已经使用了#pragma pack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// utp_internal.cpp
#if (defined(__SVR4) && defined(__sun))
#pragma pack(1)
#else
#pragma pack(push,1)
#endif
// utp_types.h
// Allow libutp consumers or prerequisites to override PACKED_ATTRIBUTE
#ifndef PACKED_ATTRIBUTE
#if defined BROKEN_GCC_STRUCTURE_PACKING && defined __GNUC__
// Used for gcc tool chains accepting but not supporting pragma pack
// See http://gcc.gnu.org/onlinedocs/gcc/Type-Attributes.html
#define PACKED_ATTRIBUTE __attribute__((__packed__))
#else
#define PACKED_ATTRIBUTE
#endif // defined BROKEN_GCC_STRUCTURE_PACKING && defined __GNUC__

PacketFormatAckV1这个包表示当这个包是EACK包时的附加数据,EACK包是类似于SACK的一种机制,用于选择性确认。在UTPSocket::send_ack函数中能看到EACK将acr_nr前最多32个报文的接受情况按位放到长度为4的字节数组里面,这是一个非常巧妙的方法。

1
2
3
4
5
6
struct PACKED_ATTRIBUTE PacketFormatAckV1 {
PacketFormatV1 pf;
byte ext_next;
byte ext_len;
byte acks[4];
};

SizableCircularBuffer

这是一个环形缓冲区,值得注意的是这个缓冲区并不是线程安全的,不过libutp的接口不是线程安全的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct SizableCircularBuffer {
// This is the mask. Since it's always a power of 2, adding 1 to this value will return the size.
size_t mask;
// This is the elements that the circular buffer points to
void **elements;

void *get(size_t i) const { assert(elements); return elements ? elements[i & mask] : NULL; }
void put(size_t i, void *data) { assert(elements); elements[i&mask] = data; }

void grow(size_t item, size_t index){
// Figure out the new size.
size_t size = mask + 1; do size *= 2; while (index >= size);
// Allocate the new buffer
void **buf = (void**)calloc(size, sizeof(void*));
size--;
// Copy elements from the old buffer to the new buffer
for (size_t i = 0; i <= mask; i++) {
buf[(item - index + i) & size] = get(item - index + i);
}
// Swap to the newly allocated buffer
mask = size; free(elements); elements = buf;
}
void ensure_size(size_t item, size_t index) { if (index > mask) grow(item, index); }
size_t size() { return mask + 1; }
};

环形缓冲区的大小size始终是2的整数幂,这里的mask等于size - 1,因此全部为1。mask起到类似取模的作用。
这里的ensure_sizegrow的参数有点奇怪,其实查看调用情况可以发现item表示要插入的元素的编号,如seq_nr;而index表示当前队列中元素的个数,如cur_window_packets,这样队列就不会出现假溢出的现象。如果说队列中元素比容量size = mask + 1要多了,那么就要扩展队列。由于扩展队列变了模数,不同余了,所以要按照模前的数(item - index + i)进行复制。

环形缓冲区的增长

grow需要提供itemindex,两个变量可以分别理解为缓冲区中最旧的序号和最新的序号,其中item - index表示最老的未确认的ATPPacket的序号。
对于缓冲区中的序号$seq$,有$seq , mod , m1 = x$,当mask从$m1$增长到$m2$时,需要求出在$seq$未知的情况下求出$seq , mod , m2 = y$。
其实有个简单的开销较大的办法,就是用一个std::pair把原始的序号计算出来。

套接字的连接关闭与读写操作

创建套接字

utp_create_socket用来创建一个套接字,在创建套接字时并不向context进行注册,这也是因为目前已有信息无法计算出哈希值的缘故。
state = CS_UNINITIALIZED

定位套接字

在uTP中,当我们收到一个UDP数据包时会调用utp_process_udp函数通告context,context会根据端口和报头来定位到具体套接字。

主动连接

实现在函数utp_connect内。

  1. 首先调用utp_initialize_socket
    utp_initialize_socket的作用初始化套接字,这里的操作包括设置dest端的地址/端口,初始化conn_id_recvconn_id_send,初始化套接字的部分字段。向context注册自己。
    在初始化之后,套接字具有状态state = CS_IDLE
  2. 然后调用UTPSocket::send_packet
    这个函数详见下面

被动连接

uTP对被动连接的处理在utp_process_udp函数中。首先context会判断是否接受连接,条件如下:

  1. 是否设置了UTP_ON_ACCEPT回调
  2. 是否已存在该连接
  3. context中保持的连接数是否已超过3000
    可以看出,uTP对连接的限制还是比较放松的,在TCP协议中还会对在连接队列中(三次握手尚未完成)的套接口有限制,即listenbacklog参数。事实上协议实现会分别维护连接中的队列so_q0和已连接的队列so_q,并保持一个so_head指向accept套接字指针。
  4. 来自防火墙回调utp_call_on_firewall的反馈

发包操作

UTPSocket::send_packet是主要的发包函数

  1. UTPSocket::send_packetUTPSocket::send_acksend_rst
    send_packet函数用来发送构造好的OutgoingPacket::data
    send_ack会就地构造一个ACK包,然后调用send_data发送。
    send_rst直接调用更基础的send_to_addr发送RST包。
  2. UTPSocket::send_data
    这个函数的存在主要是处理一些UTPSocket::send_packetUTPSocket::send_ack的共同部分
  3. send_to_addr
    这个函数位于调用链的最下端,调用了注册的callback函数来发送数据包,同时调用utp_register_sent_packetutp_context::context_stats报告了发送长度用来统计。这个统计信息可以被API函数utp_get_context_stats取得,以供用户分析。

写操作

utp_write

utp_write被作为utp_writev的一个特例来处理。

utp_writev

同UNIX套接口函数writev一样,utp_writev接受一个指向iovec数组的指针iovec_input

1
2
3
4
struct iovec{
void *iov_base;
size_t iov_len;
};

utp_writev按照iovec_input[0 .. num_iovecs-1]的顺序从缓冲区发送数据,并返回成功发送的总字节数。
utp_writev主要做一些检查,如num_iovecs是否超过了UTP_IOV_MAX。然后将iovec_input复制到自己的一块缓存iovec里面(为啥呢),计算所有iovec的大小的总和到bytes。我们实际发送的数据量num_to_sendbytes和连接最多允许的数据包大小packet_size(由MTU决定)两者的最小值。当bytes过大时,就需要分批发送,如下面的代码所示。

1
2
3
4
5
6
size_t packet_size = conn->get_packet_size();
size_t num_to_send = min<size_t>(bytes, packet_size);
while (!conn->is_full(num_to_send)) {
bytes -= num_to_send;
sent += num_to_send;
conn->write_outgoing_packet(num_to_send, ST_DATA, iovec, num_iovecs);

这里的is_full由窗口决定。
utp_writev下面会调用write_outgoing_packet(size_t payload, uint flags, struct utp_iovec *iovec, size_t num_iovecs)。我们知道OutgoingPacket用来描述一个数据包的上下文,在第一次握手时由于没有数据需要传输,所以直接调用的utp_send_packet,相当于只发送了一个包头。而对于utp_writev来说,需要在包头后面加上数据。
下面的代码将每个iov[i]中的iov_base复制到。

1
2
3
4
5
6
7
8
9
10
11
12
13
for (size_t i = 0; i < num_iovecs && needed; i++) {
if (iovec[i].iov_len == 0)
continue;

size_t num = min<size_t>(needed, iovec[i].iov_len);
memcpy(p, iovec[i].iov_base, num);

p += num;

iovec[i].iov_len -= num;
iovec[i].iov_base = (byte*)iovec[i].iov_base + num; // iovec[i].iov_base += num, but without void* pointers
needed -= num;
}

write_outgoing_packet

write_outgoing_packet接受一个utp_iovec数组,然后组织数据包结构,并将其放入发送缓存。注意write_outgoing_packet函数并不会直接调用send_packet发送数据包。
write_outgoing_packet主要是一个大循环

1
2
3
4
5
do {
// ...
payload -= added;
} while (payload);
flush_packets();

首先write_outgoing_packet在发送缓冲区中取出前一个seq_nr - 1序号的pkt,试图重用它。注意到如果窗口小于等于0,那么实际的pkt就是NULL,这是为了保证当窗口小于等于0时,不会再往当前的pkt里面放东西了,否则pkt的容量超过窗口导致被缓存。相反的,我们重新开一个包放超出窗口的数据,这样只有这个新开的包会被缓存。

1
2
3
if (cur_window_packets > 0) {
pkt = (OutgoingPacket*)outbuf.get(seq_nr - 1);
}

下面的代码的上半部分,当数据包pkt尚未满载,并且尚未发送时,在本次循环中会重新使用它,在它的后面续上added长度的空间,供添加数据使用。sizeof(OutgoingPacket) - 1是减去VLA的一个字节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// if there's any room left in the last packet in the window
// and it hasn't been sent yet, fill that frame first
if (payload && pkt && !pkt->transmissions && pkt->payload < packet_size) {
// Use the previous unsent packet
added = min(payload + pkt->payload, max<size_t>(packet_size, pkt->payload)) - pkt->payload;
pkt = (OutgoingPacket*)realloc(pkt, (sizeof(OutgoingPacket) - 1) + header_size + pkt->payload + added);
outbuf.put(seq_nr - 1, pkt);
append = false;
assert(!pkt->need_resend);
} else {
// Create the packet to send.
added = payload;
pkt = (OutgoingPacket*)malloc((sizeof(OutgoingPacket) - 1) + header_size + added);
pkt->payload = 0;
pkt->transmissions = 0;
pkt->need_resend = false;
}

下面的代码紧接着上面,为pkt添加added长度的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (added) {
assert(flags == ST_DATA);

// Fill it with data from the upper layer.
unsigned char *p = pkt->data + header_size + pkt->payload;
size_t needed = added;

for (size_t i = 0; i < num_iovecs && needed; i++) {
if (iovec[i].iov_len == 0)
continue;

size_t num = min<size_t>(needed, iovec[i].iov_len);
memcpy(p, iovec[i].iov_base, num);

p += num;

iovec[i].iov_len -= num;
iovec[i].iov_base = (byte*)iovec[i].iov_base + num; // iovec[i].iov_base += num, but without void* pointers
needed -= num;
}

assert(needed == 0);
}

append表示是否是一个需要被加入缓冲区的新数据包,在上面的if块中被设置

1
2
3
4
5
6
7
8
if (append) {
// Remember the message in the outgoing queue.
outbuf.ensure_size(seq_nr, cur_window_packets);
outbuf.put(seq_nr, pkt);
p1->seq_nr = seq_nr;
seq_nr++;
cur_window_packets++;
}

接下来write_outgoing_packet调用flush_packets来刷新缓冲区

flush_packets

flush_packets函数在发包时和重传计时器超时时被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool UTPSocket::flush_packets()
{
size_t packet_size = get_packet_size();

// send packets that are waiting on the pacer to be sent
// i has to be an unsigned 16 bit counter to wrap correctly
// signed types are not guaranteed to wrap the way you expect
for (uint16 i = seq_nr - cur_window_packets; i != seq_nr; ++i) {
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(i);
if (pkt == 0 || (pkt->transmissions > 0 && pkt->need_resend == false)) continue;
// have we run out of quota?
if (is_full()) return true;

// Nagle check
// don't send the last packet if we have one packet in-flight
// and the current packet is still smaller than packet_size.
if (i != ((seq_nr - 1) & ACK_NR_MASK) ||
cur_window_packets == 1 ||
pkt->payload >= packet_size) {
send_packet(pkt);
}
}
return false;
}

读操作

ucat.c中,当context收到UDP包之后会交给对应socket的udp_process_incoming函数进行处理,这个函数中涉及和uTP实现可靠传输有关的很多内容,因此与之相关的内容将放在下面的章节中进行讨论,本节主要讨论uTP处理出数据并返回上层的相关内容。
首先我们看到udp_process_incoming调用了utp_register_recv_packet,这是一个统计用的函数,我们可以不做考虑。

主动关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void utp_close(UTPSocket *conn)
{
switch(conn->state) {
case CS_CONNECTED:
case CS_CONNECTED_FULL:
conn->state = CS_FIN_SENT;
conn->write_outgoing_packet(0, ST_FIN, NULL, 0);
break;

case CS_SYN_SENT:
conn->rto_timeout = utp_call_get_milliseconds(conn->ctx, conn) + min<uint>(conn->rto * 2, 60);
// fall through
case CS_GOT_FIN:
conn->state = CS_DESTROY_DELAY;
break;
case CS_SYN_RECV:
// fall through
default:
conn->state = CS_DESTROY;
break;
}
}

超时重传与可靠传输

引起发送方超时重传的原因有大致三种:

  1. 分组丢失。这里指报文并没有顺利到达接收方,因此需要发送发进行重传。
  2. 确认丢失。这里报文顺利传送到接收方,但接收方返回的ACK报文丢失了。这种情况下发送方很可能会在超时之后重新发送该分组,而接收方应该选择丢弃并重新确认。
  3. 经受延迟。这里报文和ACK都顺利传送,但整个过程耗时超过了Timeout,这时发送方也会进行重传。

对于发送出去的每个数据包设置一个定时器,等定时器超时之后触发回调进行重传是开销很大的。实际上可以维护一个超时重传计时器,当对方有数据包过来时就重置这个计时器,否则当计时器超时时,就重传发送队列中的所有数据包。

发送ACK

当对方封包过来时,需要根据其序号更新自己的确认号,并进行相关处理,如发送ACK、处理乱序包等。这是一个复杂的流程,本部分介绍如何向对方发送一个ACK。而在此之前的例如接受封包,更新自己的ack_nr则在下面的数据包接收(确认部分)进行介绍。

延迟确认

uTP使用了schedule_ack的机制来实现Delayed ACK特性。首先使用UTPSocket::schedule_ack向context注册socket自己,表示请不要立即发送一个空的ACK包,而是尝试将ACK放到带用户数据的包里面。当计时器超时时,utp_issue_deferred_acks函数会被调用(在ucat.c里面)。这个函数调用ack_sockets里面所有注册了的socket的send_ack()方法,发送ACK包。

1
2
3
4
5
for (size_t i = 0; i < ctx->ack_sockets.GetCount(); i++) {
UTPSocket *conn = ctx->ack_sockets[i];
conn->send_ack();
i--;
}

当新的ACK能够随着带用户数据的包一同发送时,就能免于发送一个空ACK包的开销。这时候会调用removeSocketFromAckList函数将这个socket从ack_sockets列表中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void removeSocketFromAckList(UTPSocket *conn)
{
if (conn->ida >= 0)
{
UTPSocket *last = conn->ctx->ack_sockets[conn->ctx->ack_sockets.GetCount() - 1];

assert(last->ida < (int)(conn->ctx->ack_sockets.GetCount()));
assert(conn->ctx->ack_sockets[last->ida] == last);
last->ida = conn->ida;
conn->ctx->ack_sockets[conn->ida] = last;
conn->ida = -1;

// Decrease the count
conn->ctx->ack_sockets.SetCount(conn->ctx->ack_sockets.GetCount() - 1);
}
}

这个函数会将需要移除的指针和队尾指针互换,并弹出队尾。
需要注意的是延迟确认存在很多特殊情况:

  1. 保活心跳包立即发送
  2. SYN和FIN等关键包立即发送
  3. 当窗口变为0立即发送,因为窗口为0表示一段时间内不能向对方发送数据了

具体实现

从具体实现上看,libutp在void UTPSocket::ack_packet(uint16 seq)函数里面处理对方发过来的ACK,在void UTPSocket::send_ack(bool synack)里面向对方发送自己对对方的ACK。
ack_packet函数有3个返回值,返回0表示正常ACK,返回1表示这个包已经被ACK过了,返回2表示这个包还没有被发送。

数据包接收(确认部分)

TCP协议中使用的是后退N重传(Go-Back-N)协议,即从第一个未确认的包开始全部传送。TCP用ACK号表示小于ACK号的所有字节都已经被接受到。例如A发送了1/2/3/4四个数据包,如果截止到A的RTO超时,B只接受到了1/3,那么它只能ACK到1。这时候A就必须重传2/3/4三个数据包,但其实3是可以不重传的。此时在传输过程中发生了乱序,这里数据包3号早于数据包2号到达了。
uTP使用reorder_count记录数据包乱序抵达的情况。我们查看在utp_process_incoming中有关处理对方序号的部分

1
2
// seqnr is the number of packets past the expected packet this is. ack_nr is the last acked, seq_nr is the current. Subtracring 1 makes 0 mean "this is the next expected packet".
const uint seqnr = (pk_seq_nr - conn->ack_nr - 1) & SEQ_NR_MASK;

这里的pk_seq_nr指的是数据包包头中的seq_nr字段,而conn->ack_nr表示我们最后确认的序号,因此seqnr为0时表示这个包是序号紧接着的数据包。注意这个是能够正确处理溢出的情况的。
接下来跳过若干行,在utp_process_incoming函数的最后,对当前的数据包进行确认工作,并调用utp_call_on_read回调。查看代码,这里对seqnr是否为0,也就是是否为乱序包展开了讨论,首先查看不是乱序包的情况,我们直接在代码中进行注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
if (seqnr == 0) {
size_t count = packet_end - data;
if (count > 0 && conn->state != CS_FIN_SENT) {
// Post bytes to the upper layer
utp_call_on_read(conn->ctx, conn, data, count);
}
conn->ack_nr++;

// Check if the next packet has been received too, but waiting in the reorder buffer.
// 这里检查是否可以释放缓存着的乱序包,例如seqnr==1的包可能已经到达,但由于当前seqnr==0的包还未到达,所以无法确认,只能缓存着
for (;;) {

if (conn->got_fin && conn->eof_pkt == conn->ack_nr) {
if (conn->state != CS_FIN_SENT) {
conn->state = CS_GOT_FIN;
conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
}
// if the other end wants to close, ack
conn->send_ack();
// reorder_count is not necessarily 0 at this point. even though it is most of the time, the other end may have sent packets with higher sequence numbers than what later end up being eof_pkt since we have received all packets up to eof_pkt just ignore the ones after it.
conn->reorder_count = 0;
}
// 当已经没有乱序包了,就直接退出循环。这里和后面的assert联动
// Quick get-out in case there is nothing to reorder
if (conn->reorder_count == 0)
break;
// Check if there are additional buffers in the reorder buffers
// that need delivery.
byte *p = (byte*)conn->inbuf.get(conn->ack_nr+1);
if (p == NULL)
break;
conn->inbuf.put(conn->ack_nr+1, NULL);
count = *(uint*)p;
if (count > 0 && conn->state != CS_FIN_SENT) {
// Pass the bytes to the upper layer
utp_call_on_read(conn->ctx, conn, p + sizeof(uint), count);
}
conn->ack_nr++;

// Free the element from the reorder buffer
free(p);
assert(conn->reorder_count > 0);
conn->reorder_count--;
}
// 向context注册一个延迟确认
conn->schedule_ack();
}

下面查看是乱序包的情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// if we have received a FIN packet, and the EOF-sequence number is lower than the sequence number of the packet we just received something is wrong.
if (conn->got_fin && pk_seq_nr > conn->eof_pkt) {
return 0;
}

// 如果这里接受到一个序号距离ack_nr偏移非常严重的包,选择直接丢弃。注意到这里实际上也处理了一个序号在pk_seq_nr前的包的情况
// if the sequence number is entirely off the expected one, just drop it. We can't allocate buffer space in the inbuf entirely based on untrusted input
if (seqnr > 0x3ff) {
return 0;
}

// we need to grow the circle buffer before we check if the packet is already in here, so that we don't end up looking at an older packet (since the indices wraps around).
conn->inbuf.ensure_size(pk_seq_nr + 1, seqnr + 1);

// 一个提前抵达的包同样可能已经被处理过
// Has this packet already been received? (i.e. a duplicate) If that is the case, just discard it.
if (conn->inbuf.get(pk_seq_nr) != NULL) {
return 0;
}

// Allocate memory to fit the packet that needs to re-ordered
byte *mem = (byte*)malloc((packet_end - data) + sizeof(uint));
*(uint*)mem = (uint)(packet_end - data);
memcpy(mem + sizeof(uint), data, packet_end - data);

// Insert into reorder buffer and increment the count of # of packets to be reordered. we add one to seqnr in order to leave the last entry empty, that way the assert in send_ack is valid. we have to add one to seqnr too, in order to make the circular buffer grow around the correct point (which is conn->ack_nr + 1).
assert(conn->inbuf.get(pk_seq_nr) == NULL);
assert((pk_seq_nr & conn->inbuf.mask) != ((conn->ack_nr+1) & conn->inbuf.mask));
conn->inbuf.put(pk_seq_nr, mem);
conn->reorder_count++;

#if UTP_DEBUG_LOGGING
conn->log(UTP_LOG_DEBUG, "0x%08x: Got out of order data reorder_count:%u len:%u (rb:%u)",
conn->reorder_count, (uint)(packet_end - data), (uint)utp_call_get_read_buffer_size(conn->ctx, conn));
#endif

// 向context注册一个延迟确认
conn->schedule_ack();

RTT与RTO的计算

RTT(Round-Trip Time)即往返时间。受到链路的传播时间、终端系统的处理时间、路由器缓存与处理时间的影响。如果我们使用发送时间戳TS和收到ACK的时间戳TR来计算$RTT = TR - TS$,那么这个值是偏大的,因为它包含了链路中的来回时间,但是也包含了对端在收到数据包之后到发送ACK包之间的包括缓存、处理的时间(称为ack delay)。因此一个较好的RTT计算方式应该是$RTT = TR - TS - ACKDELAY$
RTO(Retransmission TimeOut),超时重传时间,与RTT有关。RFC793中使用低通过滤器对RTT进行平滑,然后再乘上一个因子$\beta$得到初次重传RTO。此外在往返时间变化起伏较大是,还要根据均值和方差计算RTO。RTO随着重传次数是按照指数增长的,即第二次超时则重传时间变为2倍的RTO。在新的RFC2988/6298中又更新了相关的算法,在此不详述。

uTP中RTT和初始RTO的计算实现在ack_packet函数里面。ack_packet是作用在发送队列上的,当数据包没有被重传的时候,使用当前时间减去它的发送时间来计算出ertt,然后计算出rttrto。而rto_timeout指的是超时的时刻,初始化时有rto_timeout = ctx->current_ms + retransmit_timeout。当ctx->current_ms - rto_timeout时则超时条件触发。下面是ack_packet中具体的代码。注意我们只对没有重传的包计算RTT,这是因为如果数据包经历了重传,并且我们收到了来自对端的ACK,我们无法知道这个ACK是对原始包还是被重传包的响应。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 我们只对没有重传的包计算RTT
if (pkt->transmissions == 1) {
// Estimate the round trip time.
const uint32 ertt = (uint32)((utp_call_get_microseconds(this->ctx, this) - pkt->time_sent) / 1000);
if (rtt == 0) {
// First round trip time sample
rtt = ertt;
rtt_var = ertt / 2;
// sanity check. rtt should never be more than 6 seconds
// assert(rtt < 6000);
} else {
// Compute new round trip times
const int delta = (int)rtt - ertt;
rtt_var = rtt_var + (int)(abs(delta) - rtt_var) / 4;
rtt = rtt - rtt/8 + ertt/8;
// sanity check. rtt should never be more than 6 seconds
// assert(rtt < 6000);
rtt_hist.add_sample(ertt, ctx->current_ms);
}
rto = max<uint>(rtt + rtt_var * 4, 1000);
}
retransmit_timeout = rto;
rto_timeout = ctx->current_ms + rto;

QUIC的解决方案

QUIC协议的一个非常不同的地方在于它虽然也是按照Packet编号的,称为Packet Number。但是它的编号是严格递增的。这么做也是为了解决TCP中存在的重传歧义问题,但显而易见我们无法维护有序性了,因此对于承载的流数据,QUIC提供了stream offset来维护其顺序和可靠性。

超时重传

TCP 的实现超时重传一般是设置一个超时重传定时器 icsk->icsk_retransmit_timer,通过 inet_csk_init_xmit_timers来注册。

常用的超时方式有使用 alarm 信号、select、设置 SO_RCVTIMEOSO_SNDTIMEO 字段、使用 Linux 提供的定时器即 setitimer 等。但在 uTP 的实现里面我并没有发现使用上面定时器的痕迹,这个定时器在哪里呢?

uTP 的超时重传实现在 void UTPSocket::check_timeouts 函数里,而这个函数只被 utp_check_timeouts 调用。utp_check_timeouts是作为 uTP 的 API 函数,在应用程序ucat.c中,每次 network_loop 中的 poll 函数超时时,utp_check_timeouts就被调用。其实 libutp 框架更类似于一个个中断处理程序,而不是一个服务,它需要来自外部的信号才能驱动。
下面查看UTPSocket::check_timeouts这个方法。这个方法中出发了对坚持定时器zerowindow_time、重传定时器rto_timeout、保活定时器last_sent_packet和时间等待定时器rto_timeout(这里复用了)。
首先查看最基本的重传定时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
...
case CS_SYN_SENT:
case CS_SYN_RECV:
case CS_CONNECTED_FULL:
case CS_CONNECTED:
case CS_FIN_SENT: {
if ((int)(ctx->current_ms - rto_timeout) >= 0 && rto_timeout > 0) {
// 这里删去了处理mtu探测的部分内容,移到专门的章节

// 这里删除了一段被注释了的重传次数大于4就重新计算rtt的策略

// Increase RTO
const uint new_timeout = ignore_loss ? retransmit_timeout : retransmit_timeout * 2;

// 这里防范恶意连接的情况,当第三次握手超时时就直接关闭连接
if (state == CS_SYN_RECV) {
state = CS_DESTROY;
utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
return;
}

// 这里删去了处理连接超时的部分内容,移到专门章节

retransmit_timeout = new_timeout;
rto_timeout = ctx->current_ms + new_timeout;

if (!ignore_loss) {
// 此时连接超时,ignore_loss只有当执行mtu探测任务时才为true
duplicate_ack = 0;
int packet_size = get_packet_size();

if ((cur_window_packets == 0) && ((int)max_window > packet_size)) {
// 这时连接处于闲置状态,并不急切需要重置拥塞窗口
max_window = max(max_window * 2 / 3, size_t(packet_size));
} else {
// 此时延迟非常大,因此将拥塞窗口缩小到1个数据包,并开始慢启动算法
max_window = packet_size;
slow_start = true;
}
}

// 这个时候使用后退N协议全部重传
for (int i = 0; i < cur_window_packets; ++i) {
OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - i - 1);
if (pkt == 0 || pkt->transmissions == 0 || pkt->need_resend) continue;

// uTP使用`need_resend`来描述一个包是否需要被重传。
pkt->need_resend = true;
assert(cur_window >= pkt->payload);
cur_window -= pkt->payload;
}

if (cur_window_packets > 0) {
retransmit_count++;
fast_timeout = true;
timeout_seq_nr = seq_nr;

OutgoingPacket *pkt = (OutgoingPacket*)outbuf.get(seq_nr - cur_window_packets);
assert(pkt);

// Re-send the packet.
send_packet(pkt);
}
}

// Mark the socket as writable. If the cwnd has grown, or if the number of
// bytes in-flight is lower than cwnd, we need to make the socket writable again
// in case it isn't
if (state == CS_CONNECTED_FULL && !is_full()) {
state = CS_CONNECTED;

#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Socket writable. max_window:%u cur_window:%u packet_size:%u",
(uint)max_window, (uint)cur_window, (uint)get_packet_size());
#endif
utp_call_on_state_change(this->ctx, this, UTP_STATE_WRITABLE);
}

if (state >= CS_CONNECTED && state < CS_GOT_FIN) {
if ((int)(ctx->current_ms - last_sent_packet) >= KEEPALIVE_INTERVAL) {
send_keep_alive();
}
}
break;
...

注意这里对于一个 socket 而不是一个数据包维护一个 retransmit_count
下面查看时间等待定时器,这里并不需要等 2MSL 的时间,而是 3*RTO 和 60 之间的较小值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// check_timeouts
case CS_GOT_FIN:
case CS_DESTROY_DELAY:
if ((int)(ctx->current_ms - rto_timeout) >= 0) {
state = (state == CS_DESTROY_DELAY) ? CS_DESTROY : CS_RESET;
if (cur_window_packets > 0) {
utp_call_on_error(ctx, this, UTP_ECONNRESET);
}
}
break;

// utp_process_incoming
if (conn->state != CS_FIN_SENT) {
conn->state = CS_GOT_FIN;
conn->rto_timeout = conn->ctx->current_ms + min<uint>(conn->rto * 3, 60);
utp_call_on_state_change(conn->ctx, conn, UTP_STATE_EOF);
}

Fast retransmit

Fast retransmit虽然也是超时重传行为,但实际上是拥塞避免算法中的一部分。因此将在拥塞控制部分论述。

Selective Acknowledgment

在 TCP 协议中使用 SACK 选项进行选择确认,使用若干组 [start, end) 来表示已经接受到数据的区间。SACK 能够有效减少重传数据包的数量,对于带宽紧张的网络十分有用。不过需要注意恶意使用 SACK 对 CPU 资源造成的损害。
在先前的数据包头构造部分已经提到了 UTP 的 EACK 机制,这是一个非常巧妙的方案:
用一个32位的比特串来表示从 ack_nr + 2 开始的 32 个未确认的包中有哪些是已经收到了在缓存里的,这里不从 ack_nr + 1 开始,因为可以默认这个包丢了,不然的话 ack_nr 就至少会到ack_nr + 1的值了。有关 SACK 的代码在 UTPSocket::send_ack 方法里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
if (reorder_count != 0 && state < CS_GOT_FIN) {
// if reorder count > 0, send an EACK. reorder count should always be 0 for synacks, so this should not be as synack
assert(!synack);
pfa.pf.ext = 1;
pfa.ext_next = 0;
pfa.ext_len = 4;
uint m = 0;

// reorder count should only be non-zero if the packet ack_nr + 1 has not yet been received
assert(inbuf.get(ack_nr + 1) == NULL);
size_t window = min<size_t>(14+16, inbuf.size());
// Generate bit mask of segments received.
for (size_t i = 0; i < window; i++) {
if (inbuf.get(ack_nr + i + 2) != NULL) {
m |= 1 << i;
}
}
pfa.acks[0] = (byte)m;
pfa.acks[1] = (byte)(m >> 8);
pfa.acks[2] = (byte)(m >> 16);
pfa.acks[3] = (byte)(m >> 24);
len += 4 + 2;
}

首先看到条件是 reorder_count != 0 && state < CS_GOT_FIN,这表明在非连接建立/关闭时,当出现数据包乱序抵达时,启动 EACK 机制。
EACK 是不允许 Reneging 的,

连接的异常终止

重传失败

在 Linux 中使用 tcp_retries1 = 3tcp_retries2 = 15(计算得到的一个时间戳)来限定普通包的重传次数。
特别地,在握手时设有专门的 tcp_syn_retries,这为了防止 SYN Flood 攻击,所以有个专门的值。
在UTP中的重传失败机制比较简单。

1
2
3
4
5
6
7
8
9
10
11
if (retransmit_count >= 4 || (state == CS_SYN_SENT && retransmit_count >= 2)) {
// 4 consecutive transmissions have timed out. Kill it. If we
// haven't even connected yet, give up after only 2 consecutive
// failed transmissions.
if (state == CS_FIN_SENT)
state = CS_DESTROY;
else
state = CS_RESET;
utp_call_on_error(ctx, this, UTP_ETIMEDOUT);
return;
}

连接重置

保活定时器

TCP 的保活机制详见TCP的可靠传输.
在 libutp 中设定了一个 KEEPALIVE_INTERVAL 29000 的阈值,表示29秒后会启动保活探测,它同时指出这个阈值来自对很多家庭 NAT 设备的计量。保活探测很简单,就是发送一个重复的 ACK。

1
2
3
4
5
6
7
8
9
10
11
void UTPSocket::send_keep_alive()
{
ack_nr--;

#if UTP_DEBUG_LOGGING
log(UTP_LOG_DEBUG, "Sending KeepAlive ACK %u [%u]", ack_nr, conn_id_send);
#endif

send_ack();
ack_nr++;
}

流量控制和拥塞控制

TCP的流量控制以及拥塞控制算法

被迁移到这一篇文章中

uTP拥塞控制概述

uTP 拥塞控制是一种 LEDBAT 算法,它选择了丢包率和单向缓冲时延(one way buffer delay)进行度量。在链路中存在一些设备能够缓存几秒钟内通过的数据,但 uTP 希望实现0字节的发送缓存。在实践中 uTP 的目标延迟在 100ms,即当套接字侦测到自己发送数据包经受了 100ms 以上的延迟时,它就会调整拥塞窗口。下面查看实现。

在 uTP 报文头部可以看到 tv_usecreply_micro,通过这两个值 uTP 可以维护一个两分钟内的最小延迟值 delay_base,将它作为两个服务器之间的延迟的基线,如果实际延迟超过基线 100ms,那么就认为网络发生了拥塞。delay_baseDelayHist::add_sample 中更新,DelayHist 使用数组 cur_delay_hist 维护所有 delay 值相对于 delay_base 的偏移。在下面的时间测量部分继续讨论 DelayHist 的相关实现。
UTPSocket::apply_ccontrol 中,首先取 our_delayDelayHist::get_value() 获得的所有 cur_delay_hist 的最小值,注意由于没有样本时返回 UINT_MAX,所以此时 our_delay 取 RTT。

1
2
3
4
5
6
7
8
9
10
// DelayHist::get_value()
uint32 get_value()
{
uint32 value = UINT_MAX;
for (size_t i = 0; i < CUR_DELAY_SIZE; i++) {
value = min<uint32>(cur_delay_hist[i], value);
}
// value could be UINT_MAX if we have no samples yet...
return value;
}

在套接字中定义 target_delay 默认为 100000 微秒即 100 毫秒,我们希望每个套接字发送端的延时不超过 100ms。

uTP的窗口

libutp定义了一些有关窗口的变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// the number of packets in the send queue. Packets that haven't
// yet been sent count as well as packets marked as needing resend
// the oldest un-acked packet in the send queue is seq_nr - cur_window_packets
uint16 cur_window_packets;

// how much of the window is used, number of bytes in-flight
// packets that have not yet been sent do not count, packets
// that are marked as needing to be re-sent (due to a timeout)
// don't count either
size_t cur_window;
// maximum window size, in bytes
size_t max_window;

// max receive window for other end, in bytes
size_t max_window_user;

其中 cur_window_packets 表示数据包的滑动窗口窗口,包含所有在发送队列的数据包,无论是否已经被发送,或者是否需要重传。因此最旧的未被对方确认的序号是 seq_nr - cur_window_packets。UTP 对 cur_window_packets 的限制是一定要小于 OUTGOING_BUFFER_MAX_SIZE

cur_window 就是按字节算的通常意义上的滑动窗口,在计算时不包含需要重传的包。max_window 表示最大的窗口,它和 max_window_user 不同的是 max_window 还包含了拥塞窗口。max_window_user 来自对方,表示对方缓冲区的大小。
下面的 is_full 判断从 cur_window_packetscur_window 角度窗口是否饱和。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool UTPSocket::is_full(int bytes)
{
size_t packet_size = get_packet_size();
if (bytes < 0) bytes = packet_size;
else if (bytes > (int)packet_size) bytes = (int)packet_size;
size_t max_send = min(max_window, opt_sndbuf, max_window_user);

// subtract one to save space for the FIN packet
if (cur_window_packets >= OUTGOING_BUFFER_MAX_SIZE - 1) {
last_maxed_out_window = ctx->current_ms;
return true;
}
if (cur_window + bytes > max_send) {
last_maxed_out_window = ctx->current_ms;
return true;
}
return false;
}

Fast retransmit

当发送方连续收到3次相同的 ACK,那么就重传可能被丢了的包。为什么至少收到3次而不是2次?因为丢包情况下发送方至少会收到三次重复的ACK,否则无法区分 reorder 和 loss。从实现上看,有的快速重传选择只重传最初被丢的包,有的选择重传所有被丢的包。uTP 使用 duplicate_ack 来记录收到重复 ACK 的次数。在 utp_process_incoming 函数中对 duplicate_ack 进行更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
if (conn->cur_window_packets > 0) {
// 当ack_nr等于最后被对方确认的序号时,这里`conn->seq_nr - conn->cur_window_packets`等于第一个没被对方确认的包
if (pk_ack_nr == ((conn->seq_nr - conn->cur_window_packets - 1) & ACK_NR_MASK)
// 这里作者强调了当数据包带上了用户数据后,就不应该算入重复ACK中,这和BSD4.4 TCP实现是一致的
&& pk_flags == ST_STATE) {
++conn->duplicate_ack;
if (conn->duplicate_ack == DUPLICATE_ACKS_BEFORE_RESEND && conn->mtu_probe_seq) {
// It's likely that the probe was rejected due to its size, but we haven't got an ICMP report back yet
if (pk_ack_nr == ((conn->mtu_probe_seq - 1) & ACK_NR_MASK)) {
conn->mtu_ceiling = conn->mtu_probe_size - 1;
conn->mtu_search_update();
} else {
// A non-probe was blocked before our probe. Can't conclude much, send a new probe
conn->mtu_probe_seq = conn->mtu_probe_size = 0;
}
}
} else {
conn->duplicate_ack = 0;
}
// TODO: if duplicate_ack == DUPLICATE_ACK_BEFORE_RESEND and fast_resend_seq_nr <= ack_nr + 1, resend ack_nr + 1 also call maybe_decay_win()
}

报文分段

MTU(Maximum Transmission Unit),最大传输单元。通常地,以太网的MTU是1500,而IP是65535(包括头部),Internet的标准MTU是576。所以对于较大的IP包,如果在以太网上传输就需要进行分片。而TCP协议提供了MSS选项用来在建立连接时写上MSS大小,也就是TCP的最大的分段大小,超过这个MSS的数据就需要进行分段传输。MSS的协商在前两次的SYN握手时处理。
而UDP是不带有分片功能的,所以对于较大的数据包是采用IP进行分片的。这其中带来一些问题,IP分片后只有第一个分片带有UDP头部,因此只要有一个IP数据报传输失败,那么整个UDP报文就无法交付(校验和和长度都通不过)。而IP协议本身并没有重传功能,且分片可能发生在链路上的任一路由器上,实际上根本无法知道原数据包是怎么被分片的。因此如果在UDP上层的实现要求重传(UDP本身不带重传),必须整个UDP数据报全部重传。

所以重新考虑 UDP 协议,会发现它头部的2字节的长度显得很不必要,因为根本不会发这么长的数据报。事实上按照 TCP 的按字节编码省掉一个长度字段也是方便的。因此基于 UDP 实现的传输层协议首先要做的就是避免IP层为我们分片,这样就能保证每个 IP 数据报中都要带有 UDP 头和我们的协议头,是个完整的传输层协议包。所以需要让 MSS + HEAD + UDP_HEAD + IP_HEAD 小于可能的链路层的 MTU。

MTU探测

UTP 通过截取 ICMP Type3 Code4(fragmentation needed) 来获得分片情况,即在 IP 首部设置了不可分片标志,但如果 UDP 报文达到 MTU 上限则会丢弃该 IP 报,返回 ICMP 不可达差错。UTP 通过这个机制使用二分法来找到一个合适的 MTU。
UTPSocket::mtu_reset 函数中,预置了 MTU 搜寻空间为 [576, udp_mtu],也就是 default IP maximum datagem size。不过由于以太网的流行,所以将 576 作为下限,此时对应于 TCP 的 MSS 为 536。

1
2
3
4
5
6
void UTPSocket::mtu_reset()
{
mtu_ceiling = get_udp_mtu();
mtu_floor = 576;
mtu_discover_time = utp_call_get_milliseconds(this->ctx, this) + 30 * 60 * 1000;
}

时间测量

本章将详细讨论在前面拥塞控制等章节遇到的各类延迟的计算。

延迟测量

在数据报报头的tv_usecreply_micro两个字段用来测量延迟。
发送端S设置tv_usec表示发送时间S1

1
2
3
// send_data
uint64 time = utp_call_get_microseconds(ctx, this);
b1->tv_usec = (uint32)time;

接收端R计算reply_micro表示接受时间R1与发送时间S1的差,粗略地估计了从S到R的经历的时间。注意由于两个主机的时钟不一定一致,所以这个值不精确。

1
2
3
4
5
6
7
// utp_process_incoming
uint64 p = pf1->tv_usec;

// get delay in both directions
// record the delay to report back
const uint32 their_delay = (uint32)(p == 0 ? 0 : time - p);
conn->reply_micro = their_delay;

为了消除误差,UTP借助了NTP授时协议的机制。这里需要假设S与R之间的网络状况是对等的(这是一个很强的假设),即从S到R的速度不至于显著慢或快于从R到S的速度。这时候从R往S端发送一个回复的数据包,记录下这次的发送时间R2和接收时间S2。可以计算得到仅由网络原因造成的延时为$(S2 - S1) - (R2 - R1)$,还能得到S和R两个主机之间的时间差是$\frac{(R1 - S1) + (R2 - S2)}{2}$。

1
2
// send_data
b1->reply_micro = reply_micro;

DelayHist类型

DelayHist记录了时间的延迟,具有以下的方法

  1. shift
    用来将所有的delay_base_hist向右偏移一段时间长度

clock drift问题

由于UTP被用来实现一些BT下载软件,这个机制是UTP用来防止用户故意调慢时钟从而霸占带宽设计的,并且不会产生误报(False positive)。

uTP数据包统计

utp_context_statsutp_context_stats中进行context和socket级别的统计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Returned by utp_get_context_stats()
typedef struct {
uint32 _nraw_recv[5]; // total packets recieved less than 300/600/1200/MTU bytes fpr all connections (context-wide)
uint32 _nraw_send[5]; // total packets sent less than 300/600/1200/MTU bytes for all connections (context-wide)
} utp_context_stats;

// Returned by utp_get_stats()
typedef struct {
uint64 nbytes_recv; // total bytes received
uint64 nbytes_xmit; // total bytes transmitted
uint32 rexmit; // retransmit counter
uint32 fastrexmit; // fast retransmit counter
uint32 nxmit; // transmit counter
uint32 nrecv; // receive counter (total)
uint32 nduprecv; // duplicate receive counter
uint32 mtu_guess; // Best guess at MTU
} utp_socket_stats;

此外在utp_context中也维护了rst_info等信息。

序号溢出问题

TCP中使用了32位的序号,并且具有PAWS机制防止在大带宽的情况下序号被迅速耗尽后产生回绕。如前文所展示的,在uTP的实现中利用了无符号整数的溢出来避免回绕时序号变为0的问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// compare if lhs is less than rhs, taking wrapping
// into account. if lhs is close to UINT_MAX and rhs
// is close to 0, lhs is assumed to have wrapped and
// considered smaller
bool wrapping_compare_less(uint32 lhs, uint32 rhs, uint32 mask)
{
// distance walking from lhs to rhs, downwards
const uint32 dist_down = (lhs - rhs) & mask;
// distance walking from lhs to rhs, upwards
const uint32 dist_up = (rhs - lhs) & mask;

// if the distance walking up is shorter, lhs
// is less than rhs. If the distance walking down
// is shorter, then rhs is less than lhs
return dist_up < dist_down;
}

Reference