This tutorial is designed for macOS or BSD systems.

What is kqueue?

kqueue is an event notification mechanism in macOS (and BSD systems such as FreeBSD). If you are writing a server, but you don’t want to allocate a thread for each client; you can use kqueue to listen for events from clients, for example:

  • Who just connected?
  • Who disconnected?
  • Which client has sent a message?

The system notifies you automatically. Multiple clients can be served by a single thread. This is I/O Multiplexing. kqueue is similar to epoll in Linux and iocp in Windows. This will save resources.

How to use kqueue?

We need define a base code struct.

#ifdef __APPLE__

// Write code to use kqueue

#endif

int main() {
#ifdef __APPLE__
    // init socket
#endif
    return 0;
}

This is a client/server architecture, and the first thing we need to do is define the list of clients.

fd represents a file descriptor. It is similar to phone number.

#define MAX_CLIENT_QTY 1024

struct client_data
{
    // If client_data[i].fd > 0, this isn't empty
    // If client_data[i].fd == 0, this is empty
    int fd;
} client_data[MAX_CLIENT_QTY];

The socket server needs to be initialized before use.

Open and listen a port (Now, we are listening 127.0.0.1:25560).

int socket_listen()
{
    // Define address info data structure
    struct addrinfo *addr;
    struct addrinfo hints;
    // Reset memory data to 0
    memset(&hints, 0, sizeof(hints));
    // AI_PASSIVE: This is a server address
    hints.ai_flags = AI_PASSIVE;
    // PF_UNSPEC: No protocol family specified, IPv4 or IPv6
    hints.ai_family = PF_UNSPEC;
    // SOCK_STREAM: Use TCP byte stream
    hints.ai_socktype = SOCK_STREAM;
    // 127.0.0.1: Only allow local address
    // 25560: Port number
    getaddrinfo("127.0.0.1", "25560", &hints, &addr);
    // ai_family: Protocol family
    // ai_socktype: Socket type, is SOCK_STREAM
    // ai_protocol: Protocol number, 0 -> TCP
    // This line code indicates that a doorknob is being added to the "port" door
    const int socket_s = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
    // Give the "socket" door a house number
    bind(socket_s, addr->ai_addr, addr->ai_addrlen);
    // There are 5 chairs lined up at the entrance; if too many new clients arrive, they'll have to wait
    listen(socket_s, 5);
    // Return the handle of this socket
    return socket_s;
}

Start listener.

int socket_init()
{
    // Initialize the handle of socket
    const int socket_s = socket_listen();
    // Create an event listener queue
    const int kq = kqueue();
    // Create an event
    struct kevent kev;
    // kev: Event struct
    // socket_s: the file descriptor (or handle) of socket
    // EVFILT_READ: Listen for "readable events" (new connection or data arrival)
    // EV_ADD: Add this event to kqueue
    // Other parameters use default value (0 or NULL)
    // This line code mean: Call me when someone comes
    EV_SET(&kev, socket_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
    // Registry event
    // kq: Queue
    // kev: Event
    // 1: Event quatity
    // Other parameters: Only registry
    kevent(kq, &kev, 1, NULL, 0, NULL);
    // Start event listener loop (To be implemented later)
    // "Waiters begin working 24/7"
    run_event_loop(kq, socket_s);
    return EXIT_SUCCESS;
}

Implement event listener loop.

void run_event_loop(const int kq, const int socket_s)
{
    // `evs`: A temporary variable used to register or remove events from `kqueue`
    struct kevent evs;
    // `addr` and `addr_len`: Used to store client address information (used during `accept()`)
    struct sockaddr_storage addr;
    socklen_t addr_len = sizeof(addr);
    // Infinite loop listening for events
    while (1)
    {

        struct kevent events[MAX_CLIENT_QTY];
        // The server (program) arrives at the main service desk (kq), s
        // tands still (permanently blocked), and waits for a customer (file descriptor/signal) 
        // to call for service (an event occurs).
        // Once a call occurs, the service desk records all past call information, 
        // up to a maximum of the number of pages in the logbook (MAX_CLIENT_QTY), 
        // into your logbook (the `events` array), and then tells you the total number 
        // of calls recorded (ne).
        const int ne = kevent(kq, NULL, 0, events, MAX_CLIENT_QTY, NULL);
        // Iterate through each event
        for (int i = 0; i < ne; ++i)
        {
            // New connect event
            if (events[i].ident == socket_s)
            {
                // Accept connection
                const int clint_s = accept(events[i].ident, (struct sockaddr *) &addr, &addr_len);
                // Successfully: Add clint_s to the list of clients (To be implemented later)
                if (create_connect(clint_s) == 0)
                {
                    // Registry event to kqueue
                    EV_SET(&evs, clint_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
                    kevent(kq, &evs, 1, NULL, 0, NULL);
                    // Send welcome message to the client (To be implemented later)
                    send_welcome_msg(clint_s);
                }
                // Failed
                else
                {
                    printf("accept error");
                    // Close connection
                    close(clint_s);
                }
            }
            // Client disconnect
            else if (events[i].flags & EV_EOF)
            {
                // Get file descriptor
                const int fd = events[i].ident;
                printf("fd %d closed\n", fd);
                // Remove event from kqueue
                EV_SET(&evs, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                kevent(kq, &evs, 1, NULL, 0, NULL);
                // Clear the client from the list of client (To be implemented later)
                remove_connect(fd);
            }
            // Client sent message
            else if (events[i].filter == EVFILT_READ)
            {
                // Receive message (To be implemented later)
                recv_msg(events[i].ident);
            }
        }
    }
}

Get connect by fd

int get_connect(const int fd)
{
    for (int i = 0; i < MAX_CLIENT_QTY; i++)
    {
        if (client_data[i].fd == fd)
        {
            return i;
        }
    }
    return -1;
}

int get_empty_connect()
{
    // 0: 0 is empty connect
    // Find first empty connect location
    return get_connect(0);
}

Create connect and insert into the list of client.

int create_connect(const int fd)
{
    // Check if fd is valid
    // fd > 0 -> valid
    // fd <= 0 -> invalid
    if (fd <= 0)
    {
        // Failed
        return -1;
    }
    // Get empty connect
    const int res = get_empty_connect();
    // If full
    if (res == -1)
    {
        // Failed
        return -1;
    }
    // Not full
    client_data[res].fd = fd;
    return 0;
}

Send welcome message to client.

// s: client number or fd
void send_welcome_msg(const int s)
{
    char msg[80];
    sprintf(msg, "welcome! you are client #%d!\n", get_connect(s));
    // Send message to the the client
    // s: client number or fd
    // msg: Message content
    // strlen(msg): the length of the message content
    // 0: default send method
    send(s, msg, strlen(msg), 0);
}

Remove connect from the list of client.

int remove_connect(const int fd)
{
    if (fd <= 0)
    {
        return -1;
    }
    const int res = get_connect(fd);
    if (res == -1)
    {
        return -1;
    }
    // Set 0 to clear the client
    client_data[res].fd = 0;
    return 0;
}

Receive message.

void recv_msg(const int s)
{
    // Create a memory buffer, max 1023 byte (0 - 1022), because buf[1023] is '\0'
    char buf[MAX_CLIENT_QTY];
    // Read data from socket
    // Max size: buf length - 1
    // 0: default read method
    // bytes_read: the number of bytes actually read
    const int bytes_read = recv(s, buf, sizeof(buf) - 1, 0);
    // Append '\0'
    buf[bytes_read] = 0;
    printf("client #%d: %s", get_connect(s), buf);
    // Force the output to be printed to the screen immediately
    fflush(stdout);
}

Now you can use kqueue in a basic way.

Extended content.

How can we obtain input from stdin and implement the corresponding functionality?

Since we need to perform some string operations here, we need to create some functions first.

These are not the focus of this discussion, so they will not be covered for now.

int start_with(const char *prefix, const char *str)
{
    size_t prefix_len = strlen(prefix);
    size_t str_len = strlen(str);
    return str_len < prefix_len ? 0 : !memcmp(prefix, str, prefix_len);
}
char** split_by_space(const char *str) {
    char *copy = strdup(str);
    char *token = strtok(copy, " ");
    int capacity = 10;
    char **result = malloc(capacity * sizeof(char*));
    int count = 0;
    while (token) {
        if (count >= capacity - 1) {
            capacity *= 2;
            result = realloc(result, capacity * sizeof(char*));
        }
        result[count++] = strdup(token);
        token = strtok(NULL, " ");
    }
    result[count] = NULL;
    free(copy);
    return result;
}
char* str_replace(const char *src, const char *old, const char *new_str) {
    if (!src || !old || !new_str) return NULL;

    size_t src_len = strlen(src);
    size_t old_len = strlen(old);
    size_t new_len = strlen(new_str);

    if (old_len == 0) return strdup(src);

    size_t count = 0;
    const char *pos = src;
    while ((pos = strstr(pos, old)) != NULL) {
        count++;
        pos += old_len;
    }

    size_t new_total_len = src_len + (new_len - old_len) * count + 1;
    char *result = malloc(new_total_len);
    if (!result) return NULL;

    const char *src_ptr = src;
    char *dst_ptr = result;

    while ((pos = strstr(src_ptr, old)) != NULL) {
        size_t bytes_to_copy = pos - src_ptr;
        memcpy(dst_ptr, src_ptr, bytes_to_copy);
        dst_ptr += bytes_to_copy;

        memcpy(dst_ptr, new_str, new_len);
        dst_ptr += new_len;

        src_ptr = pos + old_len;
    }

    strcpy(dst_ptr, src_ptr);

    return result;
}

How to read stdin async? (Or why we need to read async?)

Based on the previous code, we know that the run_event_loop function is an infinite loop function and cannot block the run_event_loop loop. Therefore, we cannot read it in the conventional way, but can only read it async.

Q: So how do we implement asynchronous reading?

A: It’s very simple, we can use multithreading to read.

Let’s first create an empty function.

void *stdin_reader()
{
    // Write code
}

Modify the code in the socket_init function.

Before:

int socket_init()
{
    const int socket_s = socket_listen();
    const int kq = kqueue();
    struct kevent kev;
    EV_SET(&kev, socket_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
    kevent(kq, &kev, 1, NULL, 0, NULL);
    run_event_loop(kq, socket_s);
    return EXIT_SUCCESS;
}

After:

int socket_init()
{
    const int socket_s = socket_listen();
    const int kq = kqueue();
    struct kevent kev;
    EV_SET(&kev, socket_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
    kevent(kq, &kev, 1, NULL, 0, NULL);
    // Create a handle of thread
    pthread_t ptr;
    // Create a new thread
    // &ptr: a handle of thread
    // NULL: default thread attr
    // stdin_reader: this is the function we want to use to read `stdin`
    // NULL: the parameters passed to `stdin_reader` are not needed here
    pthread_create(&ptr, NULL, stdin_reader, NULL);
    // Infinite loop
    run_event_loop(kq, socket_s);
    // End of the thread
    pthread_cancel(ptr);
    // Waiting for the thread to finish (reclaim resources)
    pthread_join(ptr, NULL);
    return EXIT_SUCCESS;
}

Complete stdin_reader function.

void send_all_msg(const char *msg)
{
    for (int i = 0; i < MAX_CLIENT_QTY; i++)
    {
        if (client_data[i].fd > 0)
        {
            send(client_data[i].fd, msg, strlen(msg), 0);
        }
    }
}

void *stdin_reader()
{
    // Memeory buffer, max 256 byte
    char s[256];
    // Use `fgets()` to read an entire line from stdin (keyboard)
    // Tips: In operating systems like Linux and BSD, there is an important concept - everything is a file,
    //       so, stdin is a file.
    while (fgets(s, sizeof(s), stdin))
    {
        // Append '\0' to s
        s[strcspn(s, "\n") + 1] = 0;
        // Check if s is "/quit\n"
        if (strcmp(s, "/quit\n") == 0)
        {
            // Execute quit function
            exit(0);
        }
        // Check if s is "/help\n"
        if (strcmp(s, "/help\n") == 0)
        {
            // Output help info
            printf("Available commands:\n");
            printf("/help - Show this help message\n");
            printf("/quit - Quit the server\n");
            printf("/say <client_id> <message> - Send a message to a specific client\n");
            printf("/list - List all connected clients\n");
            continue;
        }
        // Check if s is start with "/list"
        if (start_with("/list", s))
        {
            printf("Connected clients:\n");
            for (int i = 0; i < MAX_CLIENT_QTY; i++)
            {
                // Output all exists clients
                if (client_data[i].fd > 0)
                {
                    printf("#%d\n", i);
                }
            }
            printf("\n");
            continue;
        }
        // Check if s is start with "/say"
        if (start_with("/say", s))
        {
            // Split s by space
            char** tokens = split_by_space(s);
            // Get tokens length
            int count = 0;
            for (int i = 0; tokens[i] != NULL; i++)
            {
                count++;
            }
            // Check if count is lt 3
            if (count < 3)
            {
                // Output help info
                printf("Usage: /say <client_id> <message>\n");
                continue;
            }
            // Get and check client id
            int client_id = atoi(tokens[1]);
            if (client_id < 0 || client_id >= MAX_CLIENT_QTY)
            {
                printf("Invalid client id\n");
                continue;
            }
            // Get command header, for example:
            // /say 0 Hello, I am a server.
            // Command header is "/say 0 "
            char* cmd_header = strcat(strcat(tokens[0], " "), strcat(tokens[1], " "));
            // Clear command header to get all message content
            char* msg = str_replace(s, cmd_header, "");
            // Send message to the client
            send(client_data[client_id].fd, msg, strlen(msg), 0);
            // Free pointer
            free(tokens);
        }
        else
        {
            send_all_msg(s);
        }
    }
    return NULL;
}

The complete code.

#ifdef __APPLE__
#include <stdio.h>
#include <unistd.h>
#include <_stdlib.h>
#include <sys/event.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <string.h>
#include <pthread.h>

#define MAX_CLIENT_QTY 1024

struct client_data
{
    int fd;
} client_data[MAX_CLIENT_QTY];

int get_connect(const int fd)
{
    for (int i = 0; i < MAX_CLIENT_QTY; i++)
    {
        if (client_data[i].fd == fd)
        {
            return i;
        }
    }
    return -1;
}

int get_empty_connect()
{
    return get_connect(0);
}

int create_connect(const int fd)
{
    if (fd <= 0)
    {
        return -1;
    }
    const int res = get_empty_connect();
    if (res == -1)
    {
        return -1;
    }
    client_data[res].fd = fd;
    return 0;
}

int remove_connect(const int fd)
{
    if (fd <= 0)
    {
        return -1;
    }
    const int res = get_connect(fd);
    if (res == -1)
    {
        return -1;
    }
    client_data[res].fd = 0;
    return 0;
}

void recv_msg(const int s)
{
    char buf[MAX_CLIENT_QTY];
    const int bytes_read = recv(s, buf, sizeof(buf) - 1, 0);
    buf[bytes_read] = 0;
    printf("client #%d: %s", get_connect(s), buf);
    fflush(stdout);
}

void send_welcome_msg(const int s)
{
    char msg[80];
    sprintf(msg, "welcome! you are client #%d!\n", get_connect(s));
    send(s, msg, strlen(msg), 0);
}

void send_all_msg(const char *msg)
{
    for (int i = 0; i < MAX_CLIENT_QTY; i++)
    {
        if (client_data[i].fd > 0)
        {
            send(client_data[i].fd, msg, strlen(msg), 0);
        }
    }
}

void run_event_loop(const int kq, const int socket_s)
{
    struct kevent evs;
    struct sockaddr_storage addr;
    socklen_t addr_len = sizeof(addr);
    while (1)
    {
        struct kevent events[MAX_CLIENT_QTY];
        const int ne = kevent(kq, NULL, 0, events, MAX_CLIENT_QTY, NULL);
        for (int i = 0; i < ne; ++i)
        {
            if (events[i].ident == socket_s)
            {
                const int clint_s = accept(events[i].ident, (struct sockaddr *) &addr, &addr_len);
                if (create_connect(clint_s) == 0)
                {
                    EV_SET(&evs, clint_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
                    kevent(kq, &evs, 1, NULL, 0, NULL);
                    send_welcome_msg(clint_s);
                } else
                {
                    printf("accept error");
                    close(clint_s);
                }
            }
            else if (events[i].flags & EV_EOF)
            {
                const int fd = events[i].ident;
                printf("fd %d closed\n", fd);
                EV_SET(&evs, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
                kevent(kq, &evs, 1, NULL, 0, NULL);
                remove_connect(fd);
            }
            else if (events[i].filter == EVFILT_READ)
            {
                recv_msg(events[i].ident);
            }
        }
    }
}

int start_with(const char *prefix, const char *str)
{
    size_t prefix_len = strlen(prefix);
    size_t str_len = strlen(str);
    return str_len < prefix_len ? 0 : !memcmp(prefix, str, prefix_len);
}

char** split_by_space(const char *str) {
    char *copy = strdup(str);
    char *token = strtok(copy, " ");
    int capacity = 10;
    char **result = malloc(capacity * sizeof(char*));
    int count = 0;
    while (token) {
        if (count >= capacity - 1) {
            capacity *= 2;
            result = realloc(result, capacity * sizeof(char*));
        }
        result[count++] = strdup(token);
        token = strtok(NULL, " ");
    }
    result[count] = NULL;
    free(copy);
    return result;
}

char* str_replace(const char *src, const char *old, const char *new_str) {
    if (!src || !old || !new_str) return NULL;

    size_t src_len = strlen(src);
    size_t old_len = strlen(old);
    size_t new_len = strlen(new_str);

    if (old_len == 0) return strdup(src);

    size_t count = 0;
    const char *pos = src;
    while ((pos = strstr(pos, old)) != NULL) {
        count++;
        pos += old_len;
    }

    size_t new_total_len = src_len + (new_len - old_len) * count + 1;
    char *result = malloc(new_total_len);
    if (!result) return NULL;

    const char *src_ptr = src;
    char *dst_ptr = result;

    while ((pos = strstr(src_ptr, old)) != NULL) {
        size_t bytes_to_copy = pos - src_ptr;
        memcpy(dst_ptr, src_ptr, bytes_to_copy);
        dst_ptr += bytes_to_copy;

        memcpy(dst_ptr, new_str, new_len);
        dst_ptr += new_len;

        src_ptr = pos + old_len;
    }

    strcpy(dst_ptr, src_ptr);

    return result;
}

void *stdin_reader()
{
    char s[256];
    while (fgets(s, sizeof(s), stdin))
    {
        s[strcspn(s, "\n") + 1] = 0;
        if (strcmp(s, "/quit\n") == 0)
        {
            exit(EXIT_SUCCESS);
        }
        if (start_with("/help", s))
        {
            printf("Available commands:\n");
            printf("/help - Show this help message\n");
            printf("/quit - Quit the server\n");
            printf("/say <client_id> <message> - Send a message to a specific client\n");
            printf("/list - List all connected clients\n");
            continue;
        }
        if (start_with("/list", s))
        {
            printf("Connected clients:\n");
            for (int i = 0; i < MAX_CLIENT_QTY; i++)
            {
                if (client_data[i].fd > 0)
                {
                    printf("#%d\n", i);
                }
            }
            printf("\n");
            continue;
        }
        if (start_with("/say", s))
        {
            char** tokens = split_by_space(s);
            int count = 0;
            for (int i = 0; tokens[i] != NULL; i++)
            {
                count++;
            }
            if (count < 3)
            {
                printf("Usage: /say <client_id> <message>\n");
                continue;
            }
            int client_id = atoi(tokens[1]);
            if (client_id < 0 || client_id >= MAX_CLIENT_QTY)
            {
                printf("Invalid client id\n");
                continue;
            }
            char* cmd_header = strcat(strcat(tokens[0], " "), strcat(tokens[1], " "));
            char* msg = str_replace(s, cmd_header, "");
            send(client_data[client_id].fd, msg, strlen(msg), 0);
            free(tokens);
        }
        else
        {
            send_all_msg(s);
        }
    }
    return NULL;
}

int socket_listen()
{
    struct addrinfo *addr;
    struct addrinfo hints;
    memset(&hints, 0, sizeof(hints));
    hints.ai_flags = AI_PASSIVE;
    hints.ai_family = PF_UNSPEC;
    hints.ai_socktype = SOCK_STREAM;
    getaddrinfo("127.0.0.1", "25560", &hints, &addr);
    const int socket_s = socket(addr->ai_family, addr->ai_socktype, addr->ai_protocol);
    bind(socket_s, addr->ai_addr, addr->ai_addrlen);
    listen(socket_s, 5);
    return socket_s;
}

int socket_init()
{
    const int socket_s = socket_listen();
    const int kq = kqueue();
    struct kevent kev;
    EV_SET(&kev, socket_s, EVFILT_READ, EV_ADD, 0, 0, NULL);
    kevent(kq, &kev, 1, NULL, 0, NULL);
    pthread_t ptr;
    pthread_create(&ptr, NULL, stdin_reader, NULL);
    run_event_loop(kq, socket_s);
    pthread_cancel(ptr);
    pthread_join(ptr, NULL);
    return EXIT_SUCCESS;
}

#endif

int main() {
#ifdef __APPLE__
    socket_init();
#endif
    return 0;
}

Screenshot of running.

Server

Client