Using non-blocking and asynchronous I/O (C10k problem) in Linux and Windows (with epoll, IOCP, libevent/libev/libuv/boost.asio and librt/libaio)

The C10k problem (C10k problem@wiki) is the problem of optimizing network sockets to handle a large number of clients at the same time.

A thread per client scales only to a certain number of clients for a given amount of RAM. If you want to scale beyond that, you have to minimize the state you keep per client.
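
As a rough back-of-the-envelope sketch of why per-client state matters, the helper below divides a memory budget by the cost of one client. The name `max_clients` is hypothetical, and the 8 MiB figure used in the note afterwards is an assumption (a common default thread stack reservation on Linux, and it is address space rather than resident memory).

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: how many clients fit into budget_bytes if every
   client costs per_client_bytes of state (a whole thread stack in the
   thread-per-client model, or a small struct in an event-driven one). */
static uint64_t max_clients(uint64_t budget_bytes, uint64_t per_client_bytes) {
    assert(per_client_bytes > 0);   /* avoid division by zero */
    return budget_bytes / per_client_bytes;
}
```

Under these assumptions, a 4 GiB budget holds 512 clients at 8 MiB each, but about 16 million clients at 256 bytes each.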

On most UNIXes, that number is around 300. On Windows, it’s around 800. I personally would only recommend it for applications that plan to handle 100 clients or fewer, or on platforms where you know the threading library works well this way.

Converting threaded programs to pure async is a disaster. For one thing, you can never, ever block under any circumstances on pain of total disaster. This means every single line of code is performance critical. For all but the most trivial applications, this alone is a deal killer.
from lkml.org

One thread per client doesn’t scale. We must serve many clients with each thread.

In non-blocking I/O (O_NONBLOCK) you start an operation, get an error (EWOULDBLOCK) if it would block, and use readiness notification (poll, ...) to learn when it is OK to start the next operation. Usable for network I/O but not for disk I/O.
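
A minimal sketch of that cycle using a plain descriptor and poll(2). The helper names (`set_nonblocking`, `try_read`, `wait_readable`) and the -2 return convention are assumptions made for illustration, not part of any library.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

/* Put a descriptor into non-blocking mode; 0 on success, -1 on error. */
static int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Start a read without blocking.
   Returns bytes read (0 means end of file), -1 on a real error,
   or -2 if nothing is available yet (EAGAIN/EWOULDBLOCK). */
static ssize_t try_read(int fd, void *buf, size_t len) {
    assert(buf != NULL && len > 0);
    ssize_t n = read(fd, buf, len);
    if (n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) return -2;
    return n;
}

/* Readiness notification: wait up to timeout_ms for fd to be readable.
   Returns 1 if readable, 0 on timeout, -1 on error. */
static int wait_readable(int fd, int timeout_ms) {
    struct pollfd pfd = { .fd = fd, .events = POLLIN };
    int rc = poll(&pfd, 1, timeout_ms);
    if (rc <= 0) return rc;
    return (pfd.revents & (POLLIN | POLLHUP)) ? 1 : -1;
}
```

A caller would loop: try_read(), and when it reports "not yet", go back to wait_readable() (or, in a real server, to a poll over all client descriptors at once).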

In asynchronous/completion I/O you start an operation and get a completion notification (a signal or a completion port) to know when it has finished. Works for both network and disk I/O.

Edge-triggered readiness notification means you give the kernel a file descriptor, and later, when that descriptor transitions from not ready to ready, the kernel notifies you somehow. It then assumes you know the file descriptor is ready, and will not send any more readiness notifications of that type for that file descriptor until you do something that causes the file descriptor to no longer be ready (e.g. until you receive the EWOULDBLOCK error on a send, recv, or accept call, or a send or recv transfers less than the requested number of bytes).
from lkml.org
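
To make the difference from level-triggered notification concrete, here is a small experiment, written as a sketch: one byte is written to a pipe and never read, and epoll is asked twice whether the read end is ready. The function name `count_notifications` is hypothetical.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns how many of two consecutive epoll_wait() calls report the
   unread pipe as ready, or -1 if the setup fails. */
static int count_notifications(int edge_triggered) {
    int p[2], reported = 0;
    struct epoll_event ev = {0}, out;

    assert(edge_triggered == 0 || edge_triggered == 1);
    if (pipe(p) == -1) return -1;
    int epfd = epoll_create1(0);
    if (epfd == -1) { close(p[0]); close(p[1]); return -1; }

    ev.events = EPOLLIN;
    if (edge_triggered) ev.events |= EPOLLET;
    ev.data.fd = p[0];
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, p[0], &ev) == -1 ||
        write(p[1], "x", 1) != 1) {
        reported = -1;
    } else {
        for (int i = 0; i < 2; ++i) {
            /* timeout 0: check the ready list without blocking */
            if (epoll_wait(epfd, &out, 1, 0) == 1) ++reported;
        }
    }
    close(epfd); close(p[0]); close(p[1]);
    return reported;
}
```

Level-triggered mode (argument 0) reports the descriptor both times because the data is still there; edge-triggered mode (argument 1) reports it only once, for the transition from not ready to ready.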

/* using edge-trigger epoll */

void setnonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* set up listening socket, 'listen_sock' (socket(), bind(), listen()) */
epollfd = epoll_create(10);
ev.events = EPOLLIN;
ev.data.fd = listen_sock;
epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev);

for (;;) {
    /* block until at least one event happens */
    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
    for (n = 0; n < nfds; ++n) {
        if (events[n].data.fd == listen_sock) {
            conn_sock = accept(listen_sock, (struct sockaddr *) &local, &addrlen);
            setnonblocking(conn_sock);
            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = conn_sock;
            epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev);
        } else {
            do_use_fd(events[n].data.fd);
        }
    }
}

from epoll@man
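
The excerpt leaves do_use_fd() undefined. With EPOLLET the handler has to keep reading until the kernel answers EWOULDBLOCK, otherwise leftover data produces no further notification. A minimal sketch of such a drain loop follows; `drain_fd` and `make_nonblocking` are hypothetical names, and a real handler would process the bytes instead of only counting them.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Same idea as setnonblocking() above, with error reporting. */
static int make_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Read from a non-blocking descriptor until no more data is available.
   Returns the total number of bytes consumed, or -1 on a real error.
   *eof is set to 1 if the peer closed its end. */
static long drain_fd(int fd, int *eof) {
    char buf[4096];
    long total = 0;

    assert(eof != NULL);
    *eof = 0;
    for (;;) {
        ssize_t n = read(fd, buf, sizeof buf);
        if (n > 0) { total += n; continue; }       /* more may follow */
        if (n == 0) { *eof = 1; return total; }    /* end of stream */
        if (errno == EINTR) continue;              /* interrupted, retry */
        if (errno == EAGAIN || errno == EWOULDBLOCK)
            return total;                          /* fully drained */
        return -1;
    }
}
```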

  • Asynchronous I/O completion port@wiki: Windows/Solaris only. You start some operation asynchronously, and receive a notification when that operation has completed. Works for both network and disk I/O.

There is a notify on ready model in Windows as well (select or WSAWaitForMultipleEvents) but it can’t scale to large numbers of sockets, so it’s not suitable for high-performance network applications.

The fundamental variation is that in a Unix you generally ask the kernel to wait for state change in a file descriptor’s readability or writability. With overlapped I/O and IOCPs the programmer waits for asynchronous function calls to complete. For example, instead of waiting for a socket to become writable and then using send(2) on it, as you commonly would do in a Unix, with overlapped I/O you would rather WSASend() the data and then wait for it to have been sent.
from Asynchronous I/O in Windows for Unix Programmers
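
For comparison, the Unix side of that contrast can be sketched as follows: write what the kernel accepts, and on EWOULDBLOCK wait until the descriptor is writable again. `write_all` is a hypothetical helper. It waits in poll(2) itself to keep the example short, whereas an event-driven server would register interest in writability and return to its main loop instead.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <unistd.h>

/* Readiness-style send. Returns 0 once all bytes are written, -1 on error. */
static int write_all(int fd, const char *buf, size_t len) {
    assert(buf != NULL);
    while (len > 0) {
        ssize_t n = write(fd, buf, len);
        if (n >= 0) {                 /* partial writes are normal */
            buf += n;
            len -= (size_t)n;
            continue;
        }
        if (errno == EINTR) continue;
        if (errno != EAGAIN && errno != EWOULDBLOCK) return -1;

        /* kernel buffer is full: wait until fd is writable again */
        struct pollfd pfd = { .fd = fd, .events = POLLOUT };
        if (poll(&pfd, 1, -1) == -1 && errno != EINTR) return -1;
    }
    return 0;
}
```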

// TCP echo-server

DWORD WINAPI ServerWorkerThread(LPVOID CompletionPortID) {
    while(TRUE) {
        GetQueuedCompletionStatus((HANDLE)CompletionPortID, &BytesTransferred, (PULONG_PTR)&PerHandleData, (LPOVERLAPPED *) &PerIoData, INFINITE);
        // ...

        // continue sending until all bytes are sent
        if (PerIoData->BytesRECV > PerIoData->BytesSEND) {
            WSASend(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &SendBytes, 0, &(PerIoData->Overlapped), NULL);
        } else {
            WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);
        }
    }
}

int main(int argc, char **argv) {
    // setup an I/O completion port
    HANDLE CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

    // create a server worker thread and pass the completion port to the thread
    HANDLE ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort,  0, &ThreadID);

    // create a listening socket
    // ...

    // accept connections and assign to the completion port
    while(TRUE) {
        SOCKET Accept = WSAAccept(Listen, NULL, NULL, NULL, 0);

        // associate the accepted socket with the original completion port
        LPPER_HANDLE_DATA PerHandleData = (LPPER_HANDLE_DATA) GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA));
        PerHandleData->Socket = Accept;
        CreateIoCompletionPort((HANDLE) Accept, CompletionPort, (ULONG_PTR) PerHandleData, 0);

        // create per I/O socket information structure to associate with the WSARecv
        LPPER_IO_OPERATION_DATA PerIoData = (LPPER_IO_OPERATION_DATA) GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));
        // ...
        WSARecv(Accept, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL);    
    }
}

from IOComplete

  • libevent/libevent@wiki replaces the main event loop and runs callbacks when a specific event occurs on a file descriptor or after a timeout.
    It's a wrapper around epoll, kqueue and IOCP.
/* TCP echo-server */

struct client { int fd; struct bufferevent *buf_ev; };

int setnonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    flags |= O_NONBLOCK;
    return fcntl(fd, F_SETFL, flags);
}

void buf_read_callback(struct bufferevent *incoming, void *arg) {
    /* echo back */
    char *req = evbuffer_readline(incoming->input);
    struct evbuffer *evreturn = evbuffer_new();
    evbuffer_add_printf(evreturn, "You said %s\n", req);
    bufferevent_write_buffer(incoming, evreturn);
    evbuffer_free(evreturn);
    free(req);
}

void buf_write_callback(struct bufferevent *bev, void *arg) {}
void buf_error_callback(struct bufferevent *bev, short what, void *arg) {...}

void accept_callback(int fd, short ev, void *arg) {
    /* accept non-blocking client socket */
    int client_fd = accept(fd, (struct sockaddr *)&client_addr, &client_len);
    setnonblock(client_fd);

    /* register callbacks */
    struct client *client = calloc(1, sizeof(*client));
    client->fd = client_fd;
    client->buf_ev = bufferevent_new(client_fd, buf_read_callback, buf_write_callback, buf_error_callback, client);

    bufferevent_enable(client->buf_ev, EV_READ);
}

int main(int argc, char **argv) {
    event_init();

    /* set SO_REUSEADDR before bind(), then bind and listen on a non-blocking socket */
    setsockopt(socketlisten, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
    bind(socketlisten, (struct sockaddr *)&addresslisten, sizeof(addresslisten));
    listen(socketlisten, 5);
    setnonblock(socketlisten);

    /* register callbacks and start loop */
    event_set(&accept_event, socketlisten, EV_READ|EV_PERSIST, accept_callback, NULL);
    event_add(&accept_event, NULL);
    event_dispatch();

    close(socketlisten);
    return 0;
}

from Boost network performance with libevent and libev

  • libev is a rewrite of libevent that also uses epoll/kqueue but has no IOCP support. It focuses on Unix I/O multiplexers.
    For disk I/O use libeio, which provides asynchronous read, write, open, close, stat, unlink, fdatasync, mknod, readdir, etc.
/* TCP echo server */

void accept_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    int client_sd = accept(watcher->fd, (struct sockaddr *)&client_addr, &client_len);

    /* allocate, initialize and start a watcher to read client requests */
    struct ev_io *w_client = malloc(sizeof(struct ev_io));
    ev_io_init(w_client, read_cb, client_sd, EV_READ);
    ev_io_start(loop, w_client);
}

void read_cb(struct ev_loop *loop, struct ev_io *watcher, int revents) {
    char buffer[BUFFER_SIZE];

    /* receive message from client socket */
    ssize_t nread = recv(watcher->fd, buffer, BUFFER_SIZE, 0);
    if (nread <= 0) {
        /* stop and free the watcher if the client closed the socket or recv failed */
        ev_io_stop(loop, watcher);
        close(watcher->fd);
        free(watcher);
        return;
    }

    /* send message back to the client */
    send(watcher->fd, buffer, nread, 0);
}

int main() {
    struct ev_loop *loop = ev_default_loop(0);

    /* bind and listen ... */

    /* initialize and start a watcher to accept client requests */
    ev_io_init(&w_accept, accept_cb, sd, EV_READ);
    ev_io_start(loop, &w_accept);

    while (1) ev_loop(loop, 0);

    return 0;
}

from libev tcp echo server

  • libuv is a cross-platform asynchronous I/O library (originally written for Node.js) built on epoll, kqueue and IOCP.

/* TCP echo server */

uv_loop_t *loop;

void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    buf->base = (char*) malloc(suggested_size);
    buf->len = suggested_size;
}

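void echo_write(uv_write_t *req, int status) {
    /* the buffer must stay valid until the write has completed */
    free(req->data);
    free(req);
}

void echo_read(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
    if (nread < 0) {
        free(buf->base);
        uv_close((uv_handle_t*) client, NULL);
        return;
    }
    if (nread == 0) {
        /* nothing to echo this time */
        free(buf->base);
        return;
    }

    uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
    req->data = buf->base;   /* freed in echo_write */
    uv_buf_t wrbuf = uv_buf_init(buf->base, nread);
    uv_write(req, client, &wrbuf, 1, echo_write);
}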

void on_new_connection(uv_stream_t *server, int status) {
    if (status < 0) return;

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
    }
    else {
        uv_close((uv_handle_t*) client, NULL);
    }
}

int main() {
    loop = uv_default_loop();

    uv_tcp_t server;
    uv_tcp_init(loop, &server);

    struct sockaddr_in addr;
    uv_ip4_addr("0.0.0.0", DEFAULT_PORT, &addr);

    uv_tcp_bind(&server, (const struct sockaddr*)&addr, 0);
    int r = uv_listen((uv_stream_t*) &server, DEFAULT_BACKLOG, on_new_connection);
    if (r) return 1;

    return uv_run(loop, UV_RUN_DEFAULT);
}

from uvbook

  • Boost.Asio is a cross-platform C++ library for network and low-level I/O built around asynchronous operations with completion handlers; it uses epoll or kqueue on Unix and IOCP on Windows.

/* TCP echo server */
using boost::asio::ip::tcp;

class session : public std::enable_shared_from_this<session> {
public:
    session(tcp::socket socket) : socket_(std::move(socket)) { }
    void start() { do_read(); }
private:
    void do_read() {
        auto self(shared_from_this());
        socket_.async_read_some(boost::asio::buffer(data_, max_length), 
            [this, self](boost::system::error_code ec, std::size_t length) {
                if (!ec) { do_write(length); }
            });
    }
    void do_write(std::size_t length) {
        auto self(shared_from_this());
        boost::asio::async_write(socket_, boost::asio::buffer(data_, length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec) { do_read(); }
            });
    }
    tcp::socket socket_;
    enum { max_length = 1024 };
    char data_[max_length];
};

class server {
public:
  server(boost::asio::io_service& io_service, short port)
    : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)), socket_(io_service) {
    do_accept();
  }
private:
    void do_accept() {
        acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
            if (!ec) { std::make_shared<session>(std::move(socket_))->start(); }
            do_accept();
        });
    }
    tcp::acceptor acceptor_;
    tcp::socket socket_;
};

int main(int argc, char* argv[]) {
    boost::asio::io_service io_service;
    server s(io_service, std::atoi(argv[1]));
    io_service.run();
    return 0;
}

from boost samples

  • POSIX asynchronous I/O: implemented on Linux as aio_*@man in GNU libc using pthreads; link with librt (-lrt). Works for both network and disk I/O. It works on files with buffering enabled (no need for O_DIRECT), but the number of outstanding operations is limited by the number of threads.
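#include <aio.h>
#include <signal.h>
#include <stdlib.h>
#include <strings.h>

/* using signals as notification for AIO requests */
void aio_completion_handler( int signo, siginfo_t *info, void *context );

void setup_io( ... ) {
    int fd, ret;
    struct sigaction sig_act;
    /* static: the control block must outlive this function, because the
       request is still in flight when setup_io() returns */
    static struct aiocb my_aiocb;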

    /* set up the signal handler */
    sigemptyset(&sig_act.sa_mask);
    sig_act.sa_flags = SA_SIGINFO;
    sig_act.sa_sigaction = aio_completion_handler;

    /* set up the AIO request */
    bzero( (char *)&my_aiocb, sizeof(struct aiocb) );
    my_aiocb.aio_fildes = fd;
    my_aiocb.aio_buf = malloc(BUF_SIZE+1);
    my_aiocb.aio_nbytes = BUF_SIZE;
    my_aiocb.aio_offset = next_offset;

    /* link the AIO request with the signal handler */
    my_aiocb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
    my_aiocb.aio_sigevent.sigev_signo = SIGIO;
    my_aiocb.aio_sigevent.sigev_value.sival_ptr = &my_aiocb;

    /* map the signal to the signal handler */
    ret = sigaction( SIGIO, &sig_act, NULL );

    ret = aio_read( &my_aiocb );
}

void aio_completion_handler( int signo, siginfo_t *info, void *context ) {
    struct aiocb *req;
    ssize_t ret;
    if (info->si_signo == SIGIO) {
        req = (struct aiocb *)info->si_value.sival_ptr;
        /* did the request complete? */
        if (aio_error( req ) == 0) {
            /* request completed successfully, get the return status */
            ret = aio_return( req );
        }
    }
    return;
}

from Using POSIX AIO API

  • Native (kernel) Linux asynchronous I/O: exposed through libaio; link with -laio. Works only for disk I/O (and only with O_DIRECT), not for network I/O. It will silently fall back to synchronous operation if the underlying I/O doesn’t support it.

It lets you overlap I/O operations with other processing by providing one interface, io_submit, for submitting one or more I/O requests in a single system call without waiting for completion, and a separate interface, io_getevents, for reaping completed I/O operations associated with a given completion group.

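#define _GNU_SOURCE           /* for syscall() */
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <linux/aio_abi.h>

/* thin wrappers around the raw system calls
   (libaio offers similar functions via <libaio.h>, with its own types) */
int io_setup(unsigned nr, aio_context_t *ctxp) {
    return syscall(__NR_io_setup, nr, ctxp);
}
int io_destroy(aio_context_t ctx) {
    return syscall(__NR_io_destroy, ctx);
}
int io_submit(aio_context_t ctx, long nr,  struct iocb **iocbpp) {
    return syscall(__NR_io_submit, ctx, nr, iocbpp);
}
int io_getevents(aio_context_t ctx, long min_nr, long max_nr, struct io_event *events, struct timespec *timeout) {
    return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout);
}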

int main() {
    aio_context_t ctx = 0;
    struct iocb cb;
    struct iocb *cbs[1];
    char data[4096];
    struct io_event events[1];
    int ret, fd;

    fd = open("/tmp/testfile", O_RDWR | O_CREAT, 0644);
    ret = io_setup(128, &ctx);

    /* setup I/O control block */
    memset(&cb, 0, sizeof(cb));
    cb.aio_fildes = fd;
    cb.aio_lio_opcode = IOCB_CMD_PWRITE;
    cb.aio_buf = (uint64_t)data;
    cb.aio_offset = 0;
    cb.aio_nbytes = 4096;
    cbs[0] = &cb;

    ret = io_submit(ctx, 1, cbs);

    /* get the reply */
    ret = io_getevents(ctx, 1, 1, events, NULL);
    printf("%d\n", ret);

    ret = io_destroy(ctx);
    return 0;
}

from Linux Asynchronous I/O Explained and AIOUserGuide
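
Since O_DIRECT generally requires the buffer address, the file offset and the transfer size to be suitably aligned, a request that really bypasses the page cache needs an aligned buffer rather than a plain stack array. A minimal sketch of such an allocation follows; `dio_alloc` is a hypothetical helper, and the 4096-byte alignment is an assumption, because the actual requirement depends on the file system and the block device.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Assumed alignment; query the device for the real requirement. */
#define DIO_ALIGN 4096u

/* Allocate a zeroed buffer whose address and size are both multiples of
   DIO_ALIGN. Returns NULL on failure; release the buffer with free(). */
static void *dio_alloc(size_t min_bytes, size_t *actual_bytes) {
    void *p = NULL;
    size_t rounded;

    assert(actual_bytes != NULL);
    /* round the size up to the next multiple of the alignment */
    rounded = (min_bytes + DIO_ALIGN - 1) / DIO_ALIGN * DIO_ALIGN;
    if (rounded == 0) rounded = DIO_ALIGN;
    if (posix_memalign(&p, DIO_ALIGN, rounded) != 0) return NULL;
    memset(p, 0, rounded);
    *actual_bytes = rounded;
    return p;
}
```

The returned pointer could then be stored in cb.aio_buf, with the rounded size in cb.aio_nbytes, for a file opened with O_DIRECT.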
