Skip to content
zig 版本:0.12.0

Echo TCP Server

我们来进行编写一个小小的示例———— Echo TCP Server(TCP 回显 server),帮助我们理解更多的内容。

代码一共也就一百行左右,简洁但不简单!

前置知识

Socket

Socket(套接字)是计算机网络中用于实现不同计算机或同一台计算机上的不同进程之间的通信的一种技术。它提供了一种标准的 API,程序员可以使用这个 API 来编写网络应用程序。

一个Socket由三个部分组成:协议本地地址远程地址,协议决定了Socket的类型和通信方式,例如TCP或UDP,本地地址是Socket绑定的网络接口和端口号,远程地址是Socket连接的目标网络接口和端口号。

除了常见的 TCPUDP 外,还有一种叫做 Unix Socket,用于在同一台机器上的不同进程间进行通信,并不使用网络协议栈,而是直接在内核中传递数据,比 TCP 和 UDP 更加高效。

IO多路复用

I/O 多路复用是一种允许一个进程同时监视多个 I/O 通道(例如,socket文件描述符等),并知道哪个通道可以进行读写操作的技术。这样,一个进程就可以同时处理多个 I/O 操作,而无需为每个 I/O 操作启动一个新的线程或进程。

I/O 多路复用的主要优点是提高了程序的效率。如果没有 I/O 多路复用,程序可能需要为每个 I/O 操作创建一个新的线程或进程,这会消耗大量的系统资源。通过使用 I/O 多路复用,程序可以在一个单独的线程或进程中处理多个 I/O 操作,从而减少了系统资源的使用。

I/O 多路复用的常见实现包括 select、poll 和 epoll 等系统调用。这些系统调用允许程序指定一个文件描述符列表,并等待其中任何一个文件描述符准备好进行 I/O 操作。当一个或多个文件描述符准备好时,系统调用返回,程序就可以进行相应的读或写操作。

思路讲解

目标:实现一个单线程基于 pollecho server

常规的 socket 编程流程为:

  1. socket( )
  2. bind( )
  3. listen( )
  4. accept( )
  5. read( )
  6. write( )
  7. close( )

tcp

以上是一个常规的 TCP server 的运作图,但是缺点也很明显,那就是这样运行的话 server 一次只能处理一个连接,无法实现并发连接。

故我们引入 poll,它是 POSIX 标准之一,允许我们通知内核替我们监听多个描述符(此处指代 socket 描述符),以一种订阅的方案来监听一组描述符,直到描述符可读或写时通知进程就绪的描述符数量。

🅿️ 提示

严格来说,poll 已经算是一门“过时”的技术,在 linux 平台它被 epoll 取代,BSD 系统(包括 mac )则使用 kqueue,而 windows 使用 IOCP(I/O Completion Ports)Overlapped I/O

WSAPoll For Windows: WSAPoll function

Poll For Linux: poll(2) — Linux manual page

以下是使用 poll 后的运作图:

tcp

实战

为了同时兼容 linux 和 windows,我们需要利用一下 zig 的 builtin 包来判断构建目标来决定使用的函数(poll 在 windows 上的实现不完全标准)。

完整的代码在 Github,测试用的客户端可以使用 telent (windows、linux、mac 均可用)。

server 监听端口的实现:

zig
// 解析地址
const port = 8080;
const address = try net.Address.parseIp4("127.0.0.1", port);
// 初始化一个server,这里就包含了 socket() 和 bind() 两个过程
var server = try address.listen(.{ .reuse_port = true });
defer server.deinit();

定义一些必要的数据:

zig
// 定义最大连接数
const max_sockets = 1000;
// buffer 用于存储 client 发过来的数据
var buf: [1024]u8 = std.mem.zeroes([1024]u8);
// 存储 accept 拿到的 connections
var connections: [max_sockets]?net.Server.Connection = undefined;
// sockfds 用于存储 pollfd, 用于传递给 poll 函数
var sockfds: [max_sockets]if (builtin.os.tag == .windows) windows.ws2_32.pollfd else std.posix.pollfd = undefined;

处理客户端发送的数据的实现:

zig
// 遍历所有的连接,处理事件
for (1..max_sockets) |i| {
    // 这里的 nums 是 poll 返回的事件数量
    // 在windows下,WSApoll允许返回0,未超时且没有套接字处于指定的状态
    if (nums == 0) {
        break;
    }
    const sockfd = sockfds[i];

    // 检查是否是无效的 socket
    if (sockfd.fd == context.INVALID_SOCKET) {
        continue;
    }

    // 由于 windows 针对无效的socket也会触发POLLNVAL
    // 当前 sock 有 IO 事件时,处理完后将 nums 减一
    defer if (sockfd.revents != 0) {
        nums -= 1;
    };

    // 检查是否是 POLLIN 事件,即是否有数据可读
    if (sockfd.revents & (context.POLLIN) != 0) {
        const c = connections[i];
        if (c) |connection| {
            const len = try connection.stream.read(&buf);
            // 如果连接已经断开,那么关闭连接
            // 这是因为如果已经 close 的连接,读取的时候会返回0
            if (len == 0) {
                // 但为了保险起见,我们还是调用 close
                // 因为有可能是连接没有断开,但是出现了错误
                connection.stream.close();
                // 将 pollfd 和 connection 置为无效
                sockfds[i].fd = context.INVALID_SOCKET;
                std.log.info("client from {any} close!", .{
                    connection.address,
                });
                connections[i] = null;
            } else {
                // 如果读取到了数据,那么将数据写回去
                // 但仅仅这样写一次并不安全
                // 最优解应该是使用for循环检测写入的数据大小是否等于buf长度
                // 如果不等于就继续写入
                // 这是因为 TCP 是一个面向流的协议,它并不保证一次 write 调用能够发送所有的数据
                // 作为示例,我们不检查是否全部写入
                _ = try connection.stream.write(buf[0..len]);
            }
        }
    }
    // 检查是否是 POLLNVAL | POLLERR | POLLHUP 事件,即是否有错误发生,或者连接断开
    else if (sockfd.revents & (context.POLLNVAL | context.POLLERR | context.POLLHUP) != 0) {
        // 将 pollfd 和 connection 置为无效
        sockfds[i].fd = context.INVALID_SOCKET;
        connections[i] = null;
        std.log.info("client {} close", .{i});
    }
}

处理新连接的实现:

zig
// 检查是否有新的连接
// 这里的 sockfds[0] 是 server 的 pollfd
// 这里的 nums 检查可有可无,因为我们只关心是否有新的连接,POLLIN 就足够了
if (sockfds[0].revents & context.POLLIN != 0 and nums > 0) {
    std.log.info("new client", .{});
    // 如果有新的连接,那么调用 accept
    const client = try server.accept();
    for (1..max_sockets) |i| {
        // 找到一个空的 pollfd,将新的连接放进去
        if (sockfds[i].fd == context.INVALID_SOCKET) {
            sockfds[i].fd = client.stream.handle;
            connections[i] = client;
            std.log.info("new client {} comes", .{i});
            break;
        }
        // 如果没有找到空的 pollfd,那么说明连接数已经达到了最大值
        if (i == max_sockets - 1) {
            @panic("too many clients");
        }
    }
}