From 515447a309664af21c011e801f432a3029d07fbf Mon Sep 17 00:00:00 2001 From: pjd Date: Fri, 8 Jul 2005 21:28:26 +0000 Subject: [PATCH] Reimplement ggatec/ggated applications. Change communication protocol to be much more resistant to network problems and to allow for much better performance. Better performance is achieved by creating two connections between ggatec and ggated, one for sending the data and one for receiving it. Every connection is handled by a separate thread, so there is no more synchronous data flow (send and wait for response); now one thread sends all requests and another receives the data. Use two threads in ggatec(8): - sendtd, which takes I/O requests from the kernel and sends them to the ggated daemon on the other end; - recvtd, which waits for ggated responses and forwards them to the kernel. Use three threads in ggated(8): - recvtd, which waits for I/O requests and puts them onto the incoming queue; - disktd, which takes requests from the incoming queue, does disk operations and puts finished requests onto the outgoing queue; - sendtd, which takes finished requests from the outgoing queue and sends responses back to ggatec. Because there were major changes in the communication protocol, there is no backward compatibility; from now on, both client and server have to run on 5.x or 6.x (or at least ggated should be from the same FreeBSD version on which ggatec is running). For Gbit networks some buffers need to be increased. I use these settings: kern.ipc.maxsockbuf=16777216 net.inet.tcp.sendspace=8388608 net.inet.tcp.recvspace=8388608 and I use '-S 4194304 -R 4194304' options for both ggatec and ggated. 
Approved by: re (scottl) --- ggatec/Makefile | 4 +- ggatec/ggatec.c | 516 ++++++++++++++++++----------- ggated/Makefile | 3 + ggated/ggated.c | 866 +++++++++++++++++++++++++++++++++++------------- shared/ggate.c | 82 ++++- shared/ggate.h | 78 ++++- 6 files changed, 1094 insertions(+), 455 deletions(-) diff --git a/ggatec/Makefile b/ggatec/Makefile index 5674aa1..c49cfe8 100644 --- a/ggatec/Makefile +++ b/ggatec/Makefile @@ -9,7 +9,7 @@ SRCS= ggatec.c ggate.c CFLAGS+= -DLIBGEOM CFLAGS+= -I${.CURDIR}/../shared -DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL} -LDADD= -lgeom -lsbuf -lbsdxml -lutil +DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL} ${LIBPTHREAD} +LDADD= -lgeom -lsbuf -lbsdxml -lutil -lpthread .include diff --git a/ggatec/ggatec.c b/ggatec/ggatec.c index 84aa60e..70f667d 100644 --- a/ggatec/ggatec.c +++ b/ggatec/ggatec.c @@ -34,8 +34,12 @@ #include #include #include +#include +#include #include #include +#include + #include #include #include @@ -51,21 +55,22 @@ #include "ggate.h" -enum { UNSET, ATTACH, CREATE, DESTROY, LIST } action = UNSET; +enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET; static const char *path = NULL; static const char *host = NULL; static int unit = -1; static unsigned flags = 0; static int force = 0; -static int nagle = 1; static unsigned queue_size = G_GATE_QUEUE_SIZE; static unsigned port = G_GATE_PORT; static off_t mediasize; static unsigned sectorsize = 0; static unsigned timeout = G_GATE_TIMEOUT; -static unsigned rcvbuf = G_GATE_RCVBUF; -static unsigned sndbuf = G_GATE_SNDBUF; +static int sendfd, recvfd; +static uint32_t token; +static pthread_t sendtd, recvtd; +static int reconnect; static void usage(void) @@ -74,129 +79,30 @@ usage(void) fprintf(stderr, "usage: %s create [-nv] [-o ] [-p port] " "[-q queue_size] [-R rcvbuf] [-S sndbuf] [-s sectorsize] " "[-t timeout] [-u unit] \n", getprogname()); - fprintf(stderr, " %s attach [-nv] [-o ] [-p port] " + fprintf(stderr, " %s rescue [-nv] [-o ] [-p port] " 
"[-R rcvbuf] [-S sndbuf] <-u unit> \n", getprogname()); fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname()); fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname()); exit(EXIT_FAILURE); } -static int -handshake(void) -{ - struct g_gate_cinit cinit; - struct g_gate_sinit sinit; - struct sockaddr_in serv; - struct timeval tv; - size_t bsize; - int sfd; - - /* - * Do the network stuff. - */ - bzero(&serv, sizeof(serv)); - serv.sin_family = AF_INET; - serv.sin_addr.s_addr = g_gate_str2ip(host); - if (serv.sin_addr.s_addr == INADDR_NONE) { - g_gate_log(LOG_ERR, "Invalid IP/host name: %s.", host); - return (-1); - } - serv.sin_port = htons(port); - sfd = socket(AF_INET, SOCK_STREAM, 0); - if (sfd == -1) - g_gate_xlog("Can't open socket: %s.", strerror(errno)); - /* - * Some trivial network optimalization. - * This should be much more advanced. - */ - if (nagle) { - int on = 1; - - if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, - sizeof(on)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - } - bsize = rcvbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - bsize = sndbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - tv.tv_sec = timeout; - tv.tv_usec = 0; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 || - setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) { - g_gate_log(LOG_ERR, "Can't connect to server: %s.", - strerror(errno)); - return (-1); - } - - g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port); - - /* - * Creating and sending initial packet. 
- */ - if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >= - sizeof(cinit.gc_path)) { - g_gate_xlog("Path name too long."); - } - cinit.gc_flags = flags; - g_gate_log(LOG_DEBUG, "Sending initial packet."); - g_gate_swap2n_cinit(&cinit); - if (send(sfd, &cinit, sizeof(cinit), 0) == -1) { - g_gate_log(LOG_ERR, "Error while sending initial packet: %s.", - strerror(errno)); - return (-1); - } - g_gate_swap2h_cinit(&cinit); - - /* - * Receiving initial packet from server. - */ - g_gate_log(LOG_DEBUG, "Receiving initial packet."); - if (recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) { - g_gate_log(LOG_ERR, "Error while receiving data: %s.", - strerror(errno)); - return (-1); - } - g_gate_swap2h_sinit(&sinit); - if (sinit.gs_error != 0) - g_gate_xlog("Error from server: %s.", strerror(sinit.gs_error)); - - mediasize = sinit.gs_mediasize; - if (sectorsize == 0) - sectorsize = sinit.gs_sectorsize; - return (sfd); -} - -static int -serve(int sfd) +static void * +send_thread(void *arg __unused) { struct g_gate_ctl_io ggio; - size_t bsize; - char *buf; - - bsize = G_GATE_BUFSIZE_START; - buf = malloc(bsize); - if (buf == NULL) { - if (action == CREATE) - g_gate_destroy(unit, 1); - g_gate_xlog("No enough memory"); - } + struct g_gate_hdr hdr; + char buf[MAXPHYS]; + ssize_t data; + int error; + + g_gate_log(LOG_NOTICE, "%s: started!", __func__); ggio.gctl_version = G_GATE_VERSION; ggio.gctl_unit = unit; - bsize = sectorsize; - ggio.gctl_data = malloc(bsize); + ggio.gctl_data = buf; + for (;;) { - struct g_gate_hdr hdr; - int data, error; -once_again: - ggio.gctl_length = bsize; + ggio.gctl_length = sizeof(buf); ggio.gctl_error = 0; g_gate_ioctl(G_GATE_CMD_START, &ggio); error = ggio.gctl_error; @@ -204,11 +110,12 @@ once_again: case 0: break; case ECANCELED: + if (reconnect) + break; /* Exit gracefully. */ - free(ggio.gctl_data); g_gate_close_device(); - close(sfd); exit(EXIT_SUCCESS); +#if 0 case ENOMEM: /* Buffer too small. 
*/ ggio.gctl_data = realloc(ggio.gctl_data, @@ -218,87 +125,232 @@ once_again: goto once_again; } /* FALLTHROUGH */ +#endif case ENXIO: default: g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME, strerror(error)); } - hdr.gh_cmd = ggio.gctl_cmd; + if (reconnect) + break; + + switch (ggio.gctl_cmd) { + case BIO_READ: + hdr.gh_cmd = GGATE_CMD_READ; + break; + case BIO_WRITE: + hdr.gh_cmd = GGATE_CMD_WRITE; + break; + } + hdr.gh_seq = ggio.gctl_seq; hdr.gh_offset = ggio.gctl_offset; hdr.gh_length = ggio.gctl_length; hdr.gh_error = 0; g_gate_swap2n_hdr(&hdr); - data = send(sfd, &hdr, sizeof(hdr), 0); + + data = g_gate_send(sendfd, &hdr, sizeof(hdr), MSG_NOSIGNAL); g_gate_log(LOG_DEBUG, "Sent hdr packet."); g_gate_swap2h_hdr(&hdr); + if (reconnect) + break; if (data != sizeof(hdr)) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 1."); + reconnect = 1; + pthread_kill(recvtd, SIGUSR1); + break; } - if (ggio.gctl_cmd == BIO_DELETE || ggio.gctl_cmd == BIO_WRITE) { - data = send(sfd, ggio.gctl_data, ggio.gctl_length, 0); - g_gate_log(LOG_DEBUG, "Sent data packet."); + + if (hdr.gh_cmd == GGATE_CMD_WRITE) { + data = g_gate_send(sendfd, ggio.gctl_data, + ggio.gctl_length, MSG_NOSIGNAL); + if (reconnect) + break; if (data != ggio.gctl_length) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 2 (%zd != %zd).", data, (ssize_t)ggio.gctl_length); + reconnect = 1; + pthread_kill(recvtd, SIGUSR1); + break; } - g_gate_log(LOG_DEBUG, "Sent %d bytes (offset=%llu, " + g_gate_log(LOG_DEBUG, "Sent %zd bytes (offset=%llu, " "size=%u).", data, hdr.gh_offset, hdr.gh_length); } - data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL); - g_gate_log(LOG_DEBUG, "Received hdr packet."); + } + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + return (NULL); +} + +static void * +recv_thread(void *arg __unused) +{ + struct g_gate_ctl_io ggio; + struct g_gate_hdr hdr; + char buf[MAXPHYS]; + ssize_t data; + + g_gate_log(LOG_NOTICE, "%s: 
started!", __func__); + + ggio.gctl_version = G_GATE_VERSION; + ggio.gctl_unit = unit; + ggio.gctl_data = buf; + + for (;;) { + data = g_gate_recv(recvfd, &hdr, sizeof(hdr), MSG_WAITALL); + if (reconnect) + break; g_gate_swap2h_hdr(&hdr); if (data != sizeof(hdr)) { - ggio.gctl_error = EIO; - goto done; + if (data == -1 && errno == EAGAIN) + continue; + g_gate_log(LOG_ERR, "Lost connection 3."); + reconnect = 1; + pthread_kill(sendtd, SIGUSR1); + break; } - if (ggio.gctl_cmd == BIO_READ) { - if (bsize < (size_t)ggio.gctl_length) { - ggio.gctl_data = realloc(ggio.gctl_data, - ggio.gctl_length); - if (ggio.gctl_data != NULL) - bsize = ggio.gctl_length; - else - g_gate_xlog("No memory."); - } - data = recv(sfd, ggio.gctl_data, ggio.gctl_length, - MSG_WAITALL); + g_gate_log(LOG_DEBUG, "Received hdr packet."); + + ggio.gctl_seq = hdr.gh_seq; + ggio.gctl_cmd = hdr.gh_cmd; + ggio.gctl_offset = hdr.gh_offset; + ggio.gctl_length = hdr.gh_length; + ggio.gctl_error = hdr.gh_error; + + if (ggio.gctl_error == 0 && ggio.gctl_cmd == GGATE_CMD_READ) { + data = g_gate_recv(recvfd, ggio.gctl_data, + ggio.gctl_length, MSG_WAITALL); + if (reconnect) + break; g_gate_log(LOG_DEBUG, "Received data packet."); if (data != ggio.gctl_length) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 4."); + reconnect = 1; + pthread_kill(sendtd, SIGUSR1); + break; } g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%ju, " "size=%zu).", data, (uintmax_t)hdr.gh_offset, (size_t)hdr.gh_length); } -done: + g_gate_ioctl(G_GATE_CMD_DONE, &ggio); - if (ggio.gctl_error == EAGAIN) - return (ggio.gctl_error); } - /* NOTREACHED */ - return (0); + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + pthread_exit(NULL); } -static void -serve_loop(int sfd) +static int +handshake(int dir) { + struct g_gate_version ver; + struct g_gate_cinit cinit; + struct g_gate_sinit sinit; + struct sockaddr_in serv; + int sfd; - for (;;) { - int error; + /* + * Do the network stuff. 
+ */ + bzero(&serv, sizeof(serv)); + serv.sin_family = AF_INET; + serv.sin_addr.s_addr = g_gate_str2ip(host); + if (serv.sin_addr.s_addr == INADDR_NONE) { + g_gate_log(LOG_DEBUG, "Invalid IP/host name: %s.", host); + return (-1); + } + serv.sin_port = htons(port); + sfd = socket(AF_INET, SOCK_STREAM, 0); + if (sfd == -1) { + g_gate_log(LOG_DEBUG, "Cannot open socket: %s.", + strerror(errno)); + return (-1); + } + + g_gate_socket_settings(sfd); - error = serve(sfd); + if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) { + g_gate_log(LOG_DEBUG, "Cannot connect to server: %s.", + strerror(errno)); close(sfd); - if (error != EAGAIN) - g_gate_xlog("%s.", strerror(error)); - sfd = handshake(); - if (sfd == -1) { - sleep(2); - continue; - } + return (-1); + } + + g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port); + + /* + * Create and send version packet. + */ + g_gate_log(LOG_DEBUG, "Sending version packet."); + assert(strlen(GGATE_MAGIC) == sizeof(ver.gv_magic)); + bcopy(GGATE_MAGIC, ver.gv_magic, sizeof(ver.gv_magic)); + ver.gv_version = GGATE_VERSION; + ver.gv_error = 0; + g_gate_swap2n_version(&ver); + if (g_gate_send(sfd, &ver, sizeof(ver), MSG_NOSIGNAL) == -1) { + g_gate_log(LOG_DEBUG, "Error while sending version packet: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + bzero(&ver, sizeof(ver)); + if (g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL) == -1) { + g_gate_log(LOG_DEBUG, "Error while receiving data: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + if (ver.gv_error != 0) { + g_gate_log(LOG_DEBUG, "Version verification problem: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + + /* + * Create and send initial packet. 
+ */ + g_gate_log(LOG_DEBUG, "Sending initial packet."); + if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >= + sizeof(cinit.gc_path)) { + g_gate_log(LOG_DEBUG, "Path name too long."); + close(sfd); + return (-1); + } + cinit.gc_flags = flags | dir; + cinit.gc_token = token; + cinit.gc_nconn = 2; + g_gate_swap2n_cinit(&cinit); + if (g_gate_send(sfd, &cinit, sizeof(cinit), MSG_NOSIGNAL) == -1) { + g_gate_log(LOG_DEBUG, "Error while sending initial packet: %s.", + strerror(errno)); + close(sfd); + return (-1); } + g_gate_swap2h_cinit(&cinit); + + /* + * Receiving initial packet from server. + */ + g_gate_log(LOG_DEBUG, "Receiving initial packet."); + if (g_gate_recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) { + g_gate_log(LOG_DEBUG, "Error while receiving data: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + g_gate_swap2h_sinit(&sinit); + if (sinit.gs_error != 0) { + g_gate_log(LOG_DEBUG, "Error from server: %s.", + strerror(sinit.gs_error)); + close(sfd); + return (-1); + } + g_gate_log(LOG_DEBUG, "Received initial packet."); + + mediasize = sinit.gs_mediasize; + if (sectorsize == 0) + sectorsize = sinit.gs_sectorsize; + + return (sfd); } static void @@ -314,26 +366,87 @@ mydaemon(void) err(EXIT_FAILURE, "Cannot daemonize"); } +static int +g_gatec_connect(void) +{ + + token = arc4random(); + /* + * Our receive descriptor is connected to the send descriptor on the + * server side. + */ + recvfd = handshake(GGATE_FLAG_SEND); + if (recvfd == -1) + return (0); + /* + * Our send descriptor is connected to the receive descriptor on the + * server side. 
+ */ + sendfd = handshake(GGATE_FLAG_RECV); + if (sendfd == -1) + return (0); + return (1); +} + static void -g_gatec_attach(void) +g_gatec_start(void) { - int sfd; + int error; - sfd = handshake(); - g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid()); - mydaemon(); - serve_loop(sfd); + reconnect = 0; + error = pthread_create(&recvtd, NULL, recv_thread, NULL); + if (error != 0) { + g_gate_destroy(unit, 1); + g_gate_xlog("pthread_create(recv_thread): %s.", + strerror(error)); + } + sendtd = pthread_self(); + send_thread(NULL); + /* Disconnected. */ + close(sendfd); + close(recvfd); +} + +static void +signop(int sig __unused) +{ + + /* Do nothing. */ +} + +static void +g_gatec_loop(void) +{ + struct g_gate_ctl_cancel ggioc; + + signal(SIGUSR1, signop); + for (;;) { + g_gatec_start(); + g_gate_log(LOG_NOTICE, "Disconnected [%s %s]. Connecting...", + host, path); + while (!g_gatec_connect()) { + sleep(2); + g_gate_log(LOG_NOTICE, "Connecting [%s %s]...", host, + path); + } + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_unit = unit; + ggioc.gctl_seq = 0; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + } } static void g_gatec_create(void) { struct g_gate_ctl_create ggioc; - int sfd; - sfd = handshake(); - if (sfd == -1) - exit(EXIT_FAILURE); + if (!g_gatec_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + /* + * Ok, got both sockets, time to create provider. 
+ */ ggioc.gctl_version = G_GATE_VERSION; ggioc.gctl_mediasize = mediasize; ggioc.gctl_sectorsize = sectorsize; @@ -344,12 +457,29 @@ g_gatec_create(void) snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s:%u %s", host, port, path); g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc); - g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid()); if (unit == -1) printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit); unit = ggioc.gctl_unit; + mydaemon(); - serve_loop(sfd); + g_gatec_loop(); +} + +static void +g_gatec_rescue(void) +{ + struct g_gate_ctl_cancel ggioc; + + if (!g_gatec_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_unit = unit; + ggioc.gctl_seq = 0; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + + mydaemon(); + g_gatec_loop(); } int @@ -358,14 +488,14 @@ main(int argc, char *argv[]) if (argc < 2) usage(); - if (strcasecmp(argv[1], "attach") == 0) - action = ATTACH; - else if (strcasecmp(argv[1], "create") == 0) + if (strcasecmp(argv[1], "create") == 0) action = CREATE; else if (strcasecmp(argv[1], "destroy") == 0) action = DESTROY; else if (strcasecmp(argv[1], "list") == 0) action = LIST; + else if (strcasecmp(argv[1], "rescue") == 0) + action = RESCUE; else usage(); argc -= 1; @@ -383,12 +513,12 @@ main(int argc, char *argv[]) force = 1; break; case 'n': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); nagle = 0; break; case 'o': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); if (strcasecmp("ro", optarg) == 0) flags = G_GATE_FLAG_READONLY; @@ -402,7 +532,7 @@ main(int argc, char *argv[]) } break; case 'p': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; port = strtoul(optarg, NULL, 10); @@ -418,7 +548,7 @@ main(int argc, char *argv[]) errx(EXIT_FAILURE, "Invalid queue_size."); break; case 'R': - if (action != ATTACH && action != CREATE) 
+ if (action != CREATE && action != RESCUE) usage(); errno = 0; rcvbuf = strtoul(optarg, NULL, 10); @@ -426,7 +556,7 @@ main(int argc, char *argv[]) errx(EXIT_FAILURE, "Invalid rcvbuf."); break; case 'S': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; sndbuf = strtoul(optarg, NULL, 10); @@ -468,18 +598,6 @@ main(int argc, char *argv[]) argv += optind; switch (action) { - case ATTACH: - if (argc != 2) - usage(); - if (unit == -1) { - fprintf(stderr, "Required unit number.\n"); - usage(); - } - g_gate_open_device(); - host = argv[0]; - path = argv[1]; - g_gatec_attach(); - break; case CREATE: if (argc != 2) usage(); @@ -501,6 +619,18 @@ main(int argc, char *argv[]) case LIST: g_gate_list(unit, g_gate_verbose); break; + case RESCUE: + if (argc != 2) + usage(); + if (unit == -1) { + fprintf(stderr, "Required unit number.\n"); + usage(); + } + g_gate_open_device(); + host = argv[0]; + path = argv[1]; + g_gatec_rescue(); + break; case UNSET: default: usage(); diff --git a/ggated/Makefile b/ggated/Makefile index fa5c0fb..4e7708e 100644 --- a/ggated/Makefile +++ b/ggated/Makefile @@ -6,6 +6,9 @@ PROG= ggated MAN= ggated.8 SRCS= ggated.c ggate.c +DPADD= ${LIBPTHREAD} +LDADD= -lpthread + CFLAGS+= -I${.CURDIR}/../shared .include diff --git a/ggated/ggated.c b/ggated/ggated.c index a0628ea..82d66e1 100644 --- a/ggated/ggated.c +++ b/ggated/ggated.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -44,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -51,32 +53,58 @@ #include #include -#include #include "ggate.h" -#define G_GATED_EXPORT_FILE "/etc/gg.exports" -#define G_GATED_DEBUG(...) 
\ - if (g_gate_verbose) { \ - printf(__VA_ARGS__); \ - printf("\n"); \ - } +#define GGATED_EXPORT_FILE "/etc/gg.exports" + +struct ggd_connection { + off_t c_mediasize; + off_t c_sectorsize; + unsigned c_flags; /* flags (RO/RW) */ + int c_diskfd; + int c_sendfd; + int c_recvfd; + time_t c_birthtime; + char *c_path; + uint64_t c_token; + in_addr_t c_srcip; + LIST_ENTRY(ggd_connection) c_next; +}; -static const char *exports = G_GATED_EXPORT_FILE; -static int got_sighup = 0; -static int nagle = 1; -static unsigned rcvbuf = G_GATE_RCVBUF; -static unsigned sndbuf = G_GATE_SNDBUF; +struct ggd_request { + struct g_gate_hdr r_hdr; + char *r_data; + TAILQ_ENTRY(ggd_request) r_next; +}; +#define r_cmd r_hdr.gh_cmd +#define r_offset r_hdr.gh_offset +#define r_length r_hdr.gh_length +#define r_error r_hdr.gh_error -struct export { +struct ggd_export { char *e_path; /* path to device/file */ in_addr_t e_ip; /* remote IP address */ in_addr_t e_mask; /* IP mask */ unsigned e_flags; /* flags (RO/RW) */ - SLIST_ENTRY(export) e_next; + SLIST_ENTRY(ggd_export) e_next; }; -static SLIST_HEAD(, export) exports_list = - SLIST_HEAD_INITIALIZER(&exports_list); + +static const char *exports_file = GGATED_EXPORT_FILE; +static int got_sighup = 0; +in_addr_t bindaddr; + +static TAILQ_HEAD(, ggd_request) inqueue = TAILQ_HEAD_INITIALIZER(inqueue); +static TAILQ_HEAD(, ggd_request) outqueue = TAILQ_HEAD_INITIALIZER(outqueue); +pthread_mutex_t inqueue_mtx, outqueue_mtx; +pthread_cond_t inqueue_cond, outqueue_cond; + +static SLIST_HEAD(, ggd_export) exports = SLIST_HEAD_INITIALIZER(&exports); +static LIST_HEAD(, ggd_connection) connections = LIST_HEAD_INITIALIZER(&connection); + +static void *recv_thread(void *arg); +static void *disk_thread(void *arg); +static void *send_thread(void *arg); static void usage(void) @@ -118,7 +146,7 @@ countmask(unsigned m) static void line_parse(char *line, unsigned lineno) { - struct export *ex; + struct ggd_export *ex; char *word, *path, *sflags; unsigned flags, 
i, vmask; in_addr_t ip, mask; @@ -193,7 +221,7 @@ line_parse(char *line, unsigned lineno) ex->e_mask = mask; ex->e_flags = flags; - SLIST_INSERT_HEAD(&exports_list, ex, e_next); + SLIST_INSERT_HEAD(&exports, ex, e_next); g_gate_log(LOG_DEBUG, "Added %s/%u %s %s to exports list.", ip2str(ex->e_ip), vmask, path, sflags); @@ -202,11 +230,11 @@ line_parse(char *line, unsigned lineno) static void exports_clear(void) { - struct export *ex; + struct ggd_export *ex; - while (!SLIST_EMPTY(&exports_list)) { - ex = SLIST_FIRST(&exports_list); - SLIST_REMOVE_HEAD(&exports_list, e_next); + while (!SLIST_EMPTY(&exports)) { + ex = SLIST_FIRST(&exports); + SLIST_REMOVE_HEAD(&exports, e_next); free(ex); } } @@ -221,13 +249,13 @@ exports_get(void) exports_clear(); - fd = fopen(exports, "r"); + fd = fopen(exports_file, "r"); if (fd == NULL) { - g_gate_xlog("Cannot open exports file (%s): %s.", exports, + g_gate_xlog("Cannot open exports file (%s): %s.", exports_file, strerror(errno)); } - g_gate_log(LOG_INFO, "Reading exports file (%s).", exports); + g_gate_log(LOG_INFO, "Reading exports file (%s).", exports_file); for (;;) { if (fgets(buf, sizeof(buf), fd) == NULL) { @@ -270,236 +298,636 @@ exports_get(void) g_gate_log(LOG_INFO, "Exporting %u object(s).", objs); } -static struct export * -exports_find(struct sockaddr *s, const char *path) +static int +exports_check(struct ggd_export *ex, struct g_gate_cinit *cinit, + struct ggd_connection *conn) +{ + char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ + int error = 0, flags; + + strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); + strlcat(ipmask, "/", sizeof(ipmask)); + strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); + if ((cinit->gc_flags & GGATE_FLAG_RDONLY) != 0) { + if (ex->e_flags == O_WRONLY) { + g_gate_log(LOG_WARNING, "Read-only access requested, " + "but %s (%s) is exported write-only.", ex->e_path, + ipmask); + return (EPERM); + } else { + conn->c_flags |= GGATE_FLAG_RDONLY; + } + } else if 
((cinit->gc_flags & GGATE_FLAG_WRONLY) != 0) { + if (ex->e_flags == O_RDONLY) { + g_gate_log(LOG_WARNING, "Write-only access requested, " + "but %s (%s) is exported read-only.", ex->e_path, + ipmask); + return (EPERM); + } else { + conn->c_flags |= GGATE_FLAG_WRONLY; + } + } else { + if (ex->e_flags == O_RDONLY) { + g_gate_log(LOG_WARNING, "Read-write access requested, " + "but %s (%s) is exported read-only.", ex->e_path, + ipmask); + return (EPERM); + } else if (ex->e_flags == O_WRONLY) { + g_gate_log(LOG_WARNING, "Read-write access requested, " + "but %s (%s) is exported write-only.", ex->e_path, + ipmask); + return (EPERM); + } + } + if ((conn->c_flags & GGATE_FLAG_RDONLY) != 0) + flags = O_RDONLY; + else if ((conn->c_flags & GGATE_FLAG_WRONLY) != 0) + flags = O_WRONLY; + else + flags = O_RDWR; + conn->c_diskfd = open(ex->e_path, flags); + if (conn->c_diskfd == -1) { + error = errno; + g_gate_log(LOG_ERR, "Cannot open %s: %s.", ex->e_path, + strerror(error)); + return (error); + } + return (0); +} + +static struct ggd_export * +exports_find(struct sockaddr *s, struct g_gate_cinit *cinit, + struct ggd_connection *conn) { - struct export *ex; + struct ggd_export *ex; in_addr_t ip; + int error; ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); - SLIST_FOREACH(ex, &exports_list, e_next) { - if ((ip & ex->e_mask) != ex->e_ip) + SLIST_FOREACH(ex, &exports, e_next) { + if ((ip & ex->e_mask) != ex->e_ip) { + g_gate_log(LOG_DEBUG, "exports[%s]: IP mismatch.", + ex->e_path); continue; - if (path != NULL && strcmp(path, ex->e_path) != 0) + } + if (strcmp(cinit->gc_path, ex->e_path) != 0) { + g_gate_log(LOG_DEBUG, "exports[%s]: Path mismatch.", + ex->e_path); continue; - - g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); - return (ex); + } + error = exports_check(ex, cinit, conn); + if (error == 0) + return (ex); + else { + errno = error; + return (NULL); + } } - g_gate_log(LOG_INFO, "Unauthorized connection from: %s.", ip2str(ip)); - + 
g_gate_log(LOG_WARNING, "Unauthorized connection from: %s.", + ip2str(ip)); + errno = EPERM; return (NULL); } +/* + * Remove timed out connections. + */ static void -sendfail(int sfd, int error, const char *fmt, ...) +connection_cleanups(void) { - struct g_gate_sinit sinit; - va_list ap; - int data; - - sinit.gs_error = error; - g_gate_swap2n_sinit(&sinit); - data = send(sfd, &sinit, sizeof(sinit), 0); - g_gate_swap2h_sinit(&sinit); - if (data == -1) { - g_gate_xlog("Error while sending initial packet: %s.", - strerror(errno)); - } - if (fmt != NULL) { - va_start(ap, fmt); - g_gate_xvlog(fmt, ap); - /* NOTREACHED */ - va_end(ap); + struct ggd_connection *conn, *tconn; + time_t now; + + time(&now); + LIST_FOREACH_SAFE(conn, &connections, c_next, tconn) { + if (now - conn->c_birthtime > 10) { + LIST_REMOVE(conn, c_next); + g_gate_log(LOG_NOTICE, + "Connection from %s [%s] removed.", + ip2str(conn->c_srcip), conn->c_path); + close(conn->c_diskfd); + close(conn->c_sendfd); + close(conn->c_recvfd); + free(conn->c_path); + free(conn); + } } - exit(EXIT_FAILURE); } -static void -serve(int sfd, struct sockaddr *s) +static struct ggd_connection * +connection_find(struct g_gate_cinit *cinit) { - struct g_gate_cinit cinit; - struct g_gate_sinit sinit; - struct g_gate_hdr hdr; - struct export *ex; - char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ - size_t bufsize; - int32_t error; - int fd, flags; - ssize_t data; - char *buf; + struct ggd_connection *conn; - g_gate_log(LOG_DEBUG, "Receiving initial packet."); - data = recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); - g_gate_swap2h_cinit(&cinit); - if (data == -1) { - g_gate_xlog("Error while receiving initial packet: %s.", - strerror(errno)); + LIST_FOREACH(conn, &connections, c_next) { + if (conn->c_token == cinit->gc_token) + break; } + return (conn); +} - ex = exports_find(s, cinit.gc_path); - if (ex == NULL) { - sendfail(sfd, EINVAL, "Requested path isn't exported: %s.", - strerror(errno)); +static 
struct ggd_connection * +connection_new(struct g_gate_cinit *cinit, struct sockaddr *s, int sfd) +{ + struct ggd_connection *conn; + in_addr_t ip; + + /* + * First, look for old connections. + * We probably should do it every X seconds, but what for? + * It is only dangerous if an attacker wants to overload connections + * queue, so here is a good place to do the cleanups. + */ + connection_cleanups(); + + conn = malloc(sizeof(*conn)); + if (conn == NULL) + return (NULL); + conn->c_path = strdup(cinit->gc_path); + if (conn->c_path == NULL) { + free(conn); + return (NULL); } + conn->c_token = cinit->gc_token; + ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); + conn->c_srcip = ip; + conn->c_sendfd = conn->c_recvfd = -1; + if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) + conn->c_sendfd = sfd; + else + conn->c_recvfd = sfd; + conn->c_mediasize = 0; + conn->c_sectorsize = 0; + time(&conn->c_birthtime); + conn->c_flags = cinit->gc_flags; + LIST_INSERT_HEAD(&connections, conn, c_next); + g_gate_log(LOG_DEBUG, "Connection created [%s, %s].", ip2str(ip), + conn->c_path); + return (conn); +} - error = 0; - strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); - strlcat(ipmask, "/", sizeof(ipmask)); - strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); - if ((cinit.gc_flags & G_GATE_FLAG_READONLY) != 0) { - if (ex->e_flags == O_WRONLY) { - g_gate_log(LOG_ERR, "Read-only access requested, but " - "%s (%s) is exported write-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = G_GATE_FLAG_READONLY; - } - } else if ((cinit.gc_flags & G_GATE_FLAG_WRITEONLY) != 0) { - if (ex->e_flags == O_RDONLY) { - g_gate_log(LOG_ERR, "Write-only access requested, but " - "%s (%s) is exported read-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = G_GATE_FLAG_WRITEONLY; +static int +connection_add(struct ggd_connection *conn, struct g_gate_cinit *cinit, + struct sockaddr *s, int sfd) +{ + in_addr_t ip; + + ip = htonl(((struct 
sockaddr_in *)(void *)s)->sin_addr.s_addr); + if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) { + if (conn->c_sendfd != -1) { + g_gate_log(LOG_WARNING, + "Send socket already exists [%s, %s].", ip2str(ip), + conn->c_path); + return (EEXIST); } + conn->c_sendfd = sfd; } else { - if (ex->e_flags == O_RDONLY) { - g_gate_log(LOG_ERR, "Read-write access requested, but " - "%s (%s) is exported read-only.", ex->e_path, - ipmask); - error = EPERM; - } else if (ex->e_flags == O_WRONLY) { - g_gate_log(LOG_ERR, "Read-write access requested, but " - "%s (%s) is exported write-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = 0; + if (conn->c_recvfd != -1) { + g_gate_log(LOG_WARNING, + "Receive socket already exists [%s, %s].", + ip2str(ip), conn->c_path); + return (EEXIST); } + conn->c_recvfd = sfd; } - if (error != 0) - sendfail(sfd, error, NULL); - flags = g_gate_openflags(sinit.gs_flags); - fd = open(ex->e_path, flags); - if (fd < 0) { - sendfail(sfd, errno, "Error while opening %s: %s.", ex->e_path, - strerror(errno)); + g_gate_log(LOG_DEBUG, "Connection added [%s, %s].", ip2str(ip), + conn->c_path); + return (0); +} + +/* + * Remove one socket from the given connection or the whole + * connection if sfd == -1. 
+ */ +static void +connection_remove(struct ggd_connection *conn) +{ + + LIST_REMOVE(conn, c_next); + g_gate_log(LOG_DEBUG, "Connection removed [%s %s].", + ip2str(conn->c_srcip), conn->c_path); + if (conn->c_sendfd != -1) + close(conn->c_sendfd); + if (conn->c_recvfd != -1) + close(conn->c_recvfd); + free(conn->c_path); + free(conn); +} + +static int +connection_ready(struct ggd_connection *conn) +{ + + return (conn->c_sendfd != -1 && conn->c_recvfd != -1); +} + +static void +connection_launch(struct ggd_connection *conn) +{ + pthread_t td; + int error, pid; + + pid = fork(); + if (pid > 0) + return; + else if (pid == -1) { + g_gate_log(LOG_ERR, "Cannot fork: %s.", strerror(errno)); + return; } + g_gate_log(LOG_DEBUG, "Process created [%s].", conn->c_path); - g_gate_log(LOG_DEBUG, "Sending initial packet."); /* - * This field isn't used by ggc(8) for now. - * It should be used in future when user don't give device size. + * Create condition variables and mutexes for in-queue and out-queue + * synchronization. 
*/ - sinit.gs_mediasize = g_gate_mediasize(fd); - sinit.gs_sectorsize = g_gate_sectorsize(fd); - sinit.gs_error = 0; + error = pthread_mutex_init(&inqueue_mtx, NULL); + if (error != 0) { + g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", + strerror(error)); + } + error = pthread_cond_init(&inqueue_cond, NULL); + if (error != 0) { + g_gate_xlog("pthread_cond_init(inqueue_cond): %s.", + strerror(error)); + } + error = pthread_mutex_init(&outqueue_mtx, NULL); + if (error != 0) { + g_gate_xlog("pthread_mutex_init(outqueue_mtx): %s.", + strerror(error)); + } + error = pthread_cond_init(&outqueue_cond, NULL); + if (error != 0) { + g_gate_xlog("pthread_cond_init(outqueue_cond): %s.", + strerror(error)); + } + + /* + * Create threads: + * recvtd - thread for receiving I/O request + * diskio - thread for doing I/O request + * sendtd - thread for sending I/O requests back + */ + error = pthread_create(&td, NULL, send_thread, conn); + if (error != 0) { + g_gate_xlog("pthread_create(send_thread): %s.", + strerror(error)); + } + error = pthread_create(&td, NULL, recv_thread, conn); + if (error != 0) { + g_gate_xlog("pthread_create(recv_thread): %s.", + strerror(error)); + } + disk_thread(conn); +} + +static void +sendfail(int sfd, int error, const char *fmt, ...) 
+{ + struct g_gate_sinit sinit; + va_list ap; + ssize_t data; + + sinit.gs_error = error; g_gate_swap2n_sinit(&sinit); - data = send(sfd, &sinit, sizeof(sinit), 0); + data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); g_gate_swap2h_sinit(&sinit); - if (data == -1) { - sendfail(sfd, errno, "Error while sending initial packet: %s.", + if (data != sizeof(sinit)) { + g_gate_log(LOG_WARNING, "Cannot send initial packet: %s.", strerror(errno)); + return; + } + if (fmt != NULL) { + va_start(ap, fmt); + g_gate_vlog(LOG_WARNING, fmt, ap); + va_end(ap); } +} - bufsize = G_GATE_BUFSIZE_START; - buf = malloc(bufsize); - if (buf == NULL) - g_gate_xlog("No enough memory."); +static void * +malloc_waitok(size_t size) +{ + void *p; - g_gate_log(LOG_DEBUG, "New process: %u.", getpid()); + while ((p = malloc(size)) == NULL) { + g_gate_log(LOG_DEBUG, "Cannot allocate %zu bytes.", size); + sleep(1); + } + return (p); +} + +static void * +recv_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_recvfd; for (;;) { /* - * Receive request. + * Get header packet. */ - data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL); + req = malloc_waitok(sizeof(*req)); + data = g_gate_recv(fd, &req->r_hdr, sizeof(req->r_hdr), + MSG_WAITALL); if (data == 0) { g_gate_log(LOG_DEBUG, "Process %u exiting.", getpid()); exit(EXIT_SUCCESS); } else if (data == -1) { g_gate_xlog("Error while receiving hdr packet: %s.", strerror(errno)); - } else if (data != sizeof(hdr)) { + } else if (data != sizeof(req->r_hdr)) { g_gate_xlog("Malformed hdr packet received."); } g_gate_log(LOG_DEBUG, "Received hdr packet."); - g_gate_swap2h_hdr(&hdr); + g_gate_swap2h_hdr(&req->r_hdr); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); /* - * Increase buffer if there is need to. + * Allocate memory for data. 
*/ - if (hdr.gh_length > bufsize) { - bufsize = hdr.gh_length; - g_gate_log(LOG_DEBUG, "Increasing buffer to %u.", - bufsize); - buf = realloc(buf, bufsize); - if (buf == NULL) - g_gate_xlog("No enough memory."); - } + req->r_data = malloc_waitok(req->r_length); - if (hdr.gh_cmd == BIO_READ) { - if (pread(fd, buf, hdr.gh_length, - hdr.gh_offset) == -1) { - error = errno; - g_gate_log(LOG_ERR, "Error while reading data " - "(offset=%ju, size=%zu): %s.", - (uintmax_t)hdr.gh_offset, - (size_t)hdr.gh_length, strerror(error)); - } else { - error = 0; - } - hdr.gh_error = error; - g_gate_swap2n_hdr(&hdr); - if (send(sfd, &hdr, sizeof(hdr), 0) == -1) { - g_gate_xlog("Error while sending status: %s.", - strerror(errno)); - } - g_gate_swap2h_hdr(&hdr); - /* Send data only if there was no error while pread(). */ - if (error == 0) { - data = send(sfd, buf, hdr.gh_length, 0); - if (data == -1) { - g_gate_xlog("Error while sending data: " - "%s.", strerror(errno)); - } - g_gate_log(LOG_DEBUG, "Sent %d bytes " - "(offset=%ju, size=%zu).", data, - (uintmax_t)hdr.gh_offset, - (size_t)hdr.gh_length); - } - } else /* if (hdr.gh_cmd == BIO_WRITE) */ { + /* + * Receive data to write for WRITE request. + */ + if (req->r_cmd == GGATE_CMD_WRITE) { g_gate_log(LOG_DEBUG, "Waiting for %u bytes of data...", - hdr.gh_length); - data = recv(sfd, buf, hdr.gh_length, MSG_WAITALL); + req->r_length); + data = g_gate_recv(fd, req->r_data, req->r_length, + MSG_WAITALL); if (data == -1) { g_gate_xlog("Error while receiving data: %s.", strerror(errno)); } - if (pwrite(fd, buf, hdr.gh_length, hdr.gh_offset) == -1) { - error = errno; - g_gate_log(LOG_ERR, "Error while writing data " - "(offset=%llu, size=%u): %s.", - hdr.gh_offset, hdr.gh_length, - strerror(error)); - } else { - error = 0; + } + + /* + * Put the request onto the incoming queue. 
+ */ + error = pthread_mutex_lock(&inqueue_mtx); + assert(error == 0); + TAILQ_INSERT_TAIL(&inqueue, req, r_next); + error = pthread_cond_signal(&inqueue_cond); + assert(error == 0); + error = pthread_mutex_unlock(&inqueue_mtx); + assert(error == 0); + } +} + +static void * +disk_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_diskfd; + for (;;) { + /* + * Get a request from the incoming queue. + */ + error = pthread_mutex_lock(&inqueue_mtx); + assert(error == 0); + while ((req = TAILQ_FIRST(&inqueue)) == NULL) { + error = pthread_cond_wait(&inqueue_cond, &inqueue_mtx); + assert(error == 0); + } + TAILQ_REMOVE(&inqueue, req, r_next); + error = pthread_mutex_unlock(&inqueue_mtx); + assert(error == 0); + + /* + * Check the request. + */ + assert(req->r_cmd == GGATE_CMD_READ || req->r_cmd == GGATE_CMD_WRITE); + assert(req->r_offset + req->r_length <= (uintmax_t)conn->c_mediasize); + assert((req->r_offset % conn->c_sectorsize) == 0); + assert((req->r_length % conn->c_sectorsize) == 0); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); + + /* + * Do the request. + */ + data = 0; + switch (req->r_cmd) { + case GGATE_CMD_READ: + data = pread(fd, req->r_data, req->r_length, + req->r_offset); + break; + case GGATE_CMD_WRITE: + data = pwrite(fd, req->r_data, req->r_length, + req->r_offset); + /* Free data memory here - better sooner. */ + free(req->r_data); + req->r_data = NULL; + break; + } + if (data != (ssize_t)req->r_length) { + /* Report short reads/writes as I/O errors. 
*/ + if (errno == 0) + errno = EIO; + g_gate_log(LOG_ERR, "Disk error: %s", strerror(errno)); + req->r_error = errno; + if (req->r_data != NULL) { + free(req->r_data); + req->r_data = NULL; } - hdr.gh_error = error; - g_gate_swap2n_hdr(&hdr); - if (send(sfd, &hdr, sizeof(hdr), 0) == -1) { - g_gate_xlog("Error while sending status: %s.", + } + + /* + * Put the request onto the outgoing queue. + */ + error = pthread_mutex_lock(&outqueue_mtx); + assert(error == 0); + TAILQ_INSERT_TAIL(&outqueue, req, r_next); + error = pthread_cond_signal(&outqueue_cond); + assert(error == 0); + error = pthread_mutex_unlock(&outqueue_mtx); + assert(error == 0); + } +} + +static void * +send_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_sendfd; + for (;;) { + /* + * Get a request from the outgoing queue. + */ + error = pthread_mutex_lock(&outqueue_mtx); + assert(error == 0); + while ((req = TAILQ_FIRST(&outqueue)) == NULL) { + error = pthread_cond_wait(&outqueue_cond, + &outqueue_mtx); + assert(error == 0); + } + TAILQ_REMOVE(&outqueue, req, r_next); + error = pthread_mutex_unlock(&outqueue_mtx); + assert(error == 0); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); + + /* + * Send the request. 
+ */ + g_gate_swap2n_hdr(&req->r_hdr); + if (g_gate_send(fd, &req->r_hdr, sizeof(req->r_hdr), 0) == -1) { + g_gate_xlog("Error while sending hdr packet: %s.", + strerror(errno)); + } + g_gate_log(LOG_DEBUG, "Sent hdr packet."); + g_gate_swap2h_hdr(&req->r_hdr); + if (req->r_data != NULL) { + data = g_gate_send(fd, req->r_data, req->r_length, 0); + if (data != (ssize_t)req->r_length) { + g_gate_xlog("Error while sending data: %s.", strerror(errno)); } - g_gate_swap2h_hdr(&hdr); - g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%llu, " - "size=%u).", data, hdr.gh_offset, hdr.gh_length); + g_gate_log(LOG_DEBUG, + "Sent %zd bytes (offset=%ju, size=%zu).", data, + (uintmax_t)req->r_offset, (size_t)req->r_length); + free(req->r_data); + } + free(req); + } +} + +static void +log_connection(struct sockaddr *from) +{ + in_addr_t ip; + + ip = htonl(((struct sockaddr_in *)(void *)from)->sin_addr.s_addr); + g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); +} + +static int +handshake(struct sockaddr *from, int sfd) +{ + struct g_gate_version ver; + struct g_gate_cinit cinit; + struct g_gate_sinit sinit; + struct ggd_connection *conn; + struct ggd_export *ex; + ssize_t data; + + log_connection(from); + /* + * Phase 1: Version verification. 
+ */ + g_gate_log(LOG_DEBUG, "Receiving version packet."); + data = g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL); + g_gate_swap2h_version(&ver); + if (data != sizeof(ver)) { + g_gate_log(LOG_WARNING, "Malformed version packet."); + return (0); + } + g_gate_log(LOG_DEBUG, "Version packet received."); + if (memcmp(ver.gv_magic, GGATE_MAGIC, strlen(GGATE_MAGIC)) != 0) { + g_gate_log(LOG_WARNING, "Invalid magic field."); + return (0); + } + if (ver.gv_version != GGATE_VERSION) { + g_gate_log(LOG_WARNING, "Version %u is not supported.", + ver.gv_version); + return (0); + } + ver.gv_error = 0; + g_gate_swap2n_version(&ver); + data = g_gate_send(sfd, &ver, sizeof(ver), 0); + g_gate_swap2h_version(&ver); + if (data == -1) { + sendfail(sfd, errno, "Error while sending version packet: %s.", + strerror(errno)); + return (0); + } + + /* + * Phase 2: Request verification. + */ + g_gate_log(LOG_DEBUG, "Receiving initial packet."); + data = g_gate_recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); + g_gate_swap2h_cinit(&cinit); + if (data != sizeof(cinit)) { + g_gate_log(LOG_WARNING, "Malformed initial packet."); + return (0); + } + g_gate_log(LOG_DEBUG, "Initial packet received."); + conn = connection_find(&cinit); + if (conn != NULL) { + /* + * Connection should already exists. + */ + g_gate_log(LOG_DEBUG, "Found existing connection (token=%lu).", + (unsigned long)conn->c_token); + if (connection_add(conn, &cinit, from, sfd) == -1) { + connection_remove(conn); + return (0); + } + } else { + /* + * New connection, allocate space. 
+ */ + conn = connection_new(&cinit, from, sfd); + if (conn == NULL) { + sendfail(sfd, ENOMEM, + "Cannot allocate new connection."); + return (0); } - g_gate_log(LOG_DEBUG, "Tick."); + g_gate_log(LOG_DEBUG, "New connection created (token=%lu).", + (unsigned long)conn->c_token); + } + + ex = exports_find(from, &cinit, conn); + if (ex == NULL) { + connection_remove(conn); + sendfail(sfd, errno, NULL); + return (0); + } + if (conn->c_mediasize == 0) { + conn->c_mediasize = g_gate_mediasize(conn->c_diskfd); + conn->c_sectorsize = g_gate_sectorsize(conn->c_diskfd); + } + sinit.gs_mediasize = conn->c_mediasize; + sinit.gs_sectorsize = conn->c_sectorsize; + sinit.gs_error = 0; + + g_gate_log(LOG_DEBUG, "Sending initial packet."); + + g_gate_swap2n_sinit(&sinit); + data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); + g_gate_swap2h_sinit(&sinit); + if (data == -1) { + sendfail(sfd, errno, "Error while sending initial packet: %s.", + strerror(errno)); + return (0); } + + if (connection_ready(conn)) { + connection_launch(conn); + connection_remove(conn); + } + return (1); } static void @@ -514,12 +942,9 @@ main(int argc, char *argv[]) { struct sockaddr_in serv; struct sockaddr from; - in_addr_t bindaddr; socklen_t fromlen; - struct timeval tv; - int on, sfd, tmpsfd; - pid_t childpid; - unsigned bsize, port; + int sfd, tmpsfd; + unsigned port; bindaddr = htonl(INADDR_ANY); port = G_GATE_PORT; @@ -570,45 +995,27 @@ main(int argc, char *argv[]) argv += optind; if (argv[0] != NULL) - exports = argv[0]; + exports_file = argv[0]; exports_get(); if (!g_gate_verbose) { /* Run in daemon mode. 
*/ if (daemon(0, 0) == -1) - g_gate_xlog("Can't daemonize: %s", strerror(errno)); + g_gate_xlog("Cannot daemonize: %s", strerror(errno)); } signal(SIGCHLD, SIG_IGN); sfd = socket(AF_INET, SOCK_STREAM, 0); if (sfd == -1) - g_gate_xlog("Can't open stream socket: %s.", strerror(errno)); + g_gate_xlog("Cannot open stream socket: %s.", strerror(errno)); bzero(&serv, sizeof(serv)); serv.sin_family = AF_INET; serv.sin_addr.s_addr = bindaddr; serv.sin_port = htons(port); - on = 1; - if (nagle) { - if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, - sizeof(on)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - } - if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - bsize = rcvbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - bsize = sndbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - tv.tv_sec = 10; - tv.tv_usec = 0; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 || - setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } + + g_gate_socket_settings(sfd); + if (bind(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) g_gate_xlog("bind(): %s.", strerror(errno)); if (listen(sfd, 5) == -1) @@ -629,21 +1036,8 @@ main(int argc, char *argv[]) exports_get(); } - if (exports_find(&from, NULL) == NULL) { + if (!handshake(&from, tmpsfd)) close(tmpsfd); - continue; - } - - childpid = fork(); - if (childpid < 0) { - g_gate_xlog("Cannot create child process: %s.", - strerror(errno)); - } else if (childpid == 0) { - close(sfd); - serve(tmpsfd, &from); - /* NOTREACHED */ - } - close(tmpsfd); } close(sfd); exit(EXIT_SUCCESS); diff --git a/shared/ggate.c b/shared/ggate.c index 08f13d7..c8428a6 100644 --- a/shared/ggate.c +++ 
b/shared/ggate.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -206,17 +207,6 @@ g_gate_destroy(int unit, int force) g_gate_ioctl(G_GATE_CMD_DESTROY, &ggio); } -int -g_gate_openflags(unsigned ggflags) -{ - - if ((ggflags & G_GATE_FLAG_READONLY) != 0) - return (O_RDONLY); - else if ((ggflags & G_GATE_FLAG_WRITEONLY) != 0) - return (O_WRONLY); - return (O_RDWR); -} - void g_gate_load_module(void) { @@ -232,6 +222,76 @@ g_gate_load_module(void) } } +ssize_t +g_gate_send(int s, const void *buf, size_t len, int flags) +{ + ssize_t done = 0, done2; + const unsigned char *p = buf; + + while (len > 0) { + done2 = send(s, p, len, flags); + if (done2 == 0) + break; + else if (done2 == -1) { + if (errno == EAGAIN) { + printf("%s: EAGAIN\n", __func__); + continue; + } + done = -1; + break; + } + done += done2; + p += done2; + len -= done2; + } + return (done); +} + +ssize_t +g_gate_recv(int s, void *buf, size_t len, int flags) +{ + + return (recv(s, buf, len, flags)); +} + +int nagle = 1; +unsigned rcvbuf = G_GATE_RCVBUF; +unsigned sndbuf = G_GATE_SNDBUF; + +void +g_gate_socket_settings(int sfd) +{ + struct timeval tv; + int bsize, on; + + /* Socket settings. 
*/ + on = 1; + if (nagle) { + if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, + sizeof(on)) == -1) { + g_gate_xlog("setsockopt() error: %s.", strerror(errno)); + } + } + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) + g_gate_xlog("setsockopt(SO_REUSEADDR): %s.", strerror(errno)); + bsize = rcvbuf; + if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) + g_gate_xlog("setsockopt(SO_RCVBUF): %s.", strerror(errno)); + bsize = sndbuf; + if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) + g_gate_xlog("setsockopt(SO_SNDBUF): %s.", strerror(errno)); + tv.tv_sec = 1; + tv.tv_usec = 0; + if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) { + g_gate_log(LOG_ERR, "setsockopt(SO_SNDTIMEO) error: %s.", + strerror(errno)); + } + if (setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { + g_gate_log(LOG_ERR, "setsockopt(SO_RCVTIMEO) error: %s.", + strerror(errno)); + } +} + #ifdef LIBGEOM static struct gclass * find_class(struct gmesh *mesh, const char *name) diff --git a/shared/ggate.h b/shared/ggate.h index 12dfe6d..acbdaaa 100644 --- a/shared/ggate.h +++ b/shared/ggate.h @@ -32,22 +32,49 @@ #include #include -#define G_GATE_BUFSIZE_START 65536 #define G_GATE_PORT 3080 #define G_GATE_RCVBUF 131072 #define G_GATE_SNDBUF 131072 #define G_GATE_QUEUE_SIZE 1024 -#define G_GATE_TIMEOUT 30 +#define G_GATE_TIMEOUT 0 + +#define GGATE_MAGIC "GEOM_GATE " +#define GGATE_VERSION 0 + +#define GGATE_FLAG_RDONLY 0x0001 +#define GGATE_FLAG_WRONLY 0x0002 +/* + * If GGATE_FLAG_SEND not GGATE_FLAG_RECV flag is set, this is initial + * connection. + * If GGATE_FLAG_SEND flag is set - this is socket to send data. + * If GGATE_FLAG_RECV flag is set - this is socket to receive data. 
+ */ +#define GGATE_FLAG_SEND 0x0004 +#define GGATE_FLAG_RECV 0x0008 + +#define GGATE_CMD_READ 0 +#define GGATE_CMD_WRITE 1 extern int g_gate_devfd; extern int g_gate_verbose; +extern int nagle; +extern unsigned rcvbuf, sndbuf; + +struct g_gate_version { + char gv_magic[16]; + uint16_t gv_version; + uint16_t gv_error; +} __packed; + /* Client's initial packet. */ struct g_gate_cinit { - char gc_path[PATH_MAX + 1]; - uint8_t gc_flags; -}; + char gc_path[PATH_MAX + 1]; + uint64_t gc_flags; + uint16_t gc_nconn; + uint32_t gc_token; +} __packed; /* Server's initial packet. */ struct g_gate_sinit { @@ -55,15 +82,16 @@ struct g_gate_sinit { uint64_t gs_mediasize; uint32_t gs_sectorsize; uint16_t gs_error; -}; +} __packed; /* Control struct. */ struct g_gate_hdr { uint8_t gh_cmd; /* command */ uint64_t gh_offset; /* device offset */ uint32_t gh_length; /* size of block */ - int16_t gh_error; /* error value (0 if ok) */ -}; + uint64_t gh_seq; /* request number */ + uint16_t gh_error; /* error value (0 if ok) */ +} __packed; void g_gate_vlog(int priority, const char *message, va_list ap); void g_gate_log(int priority, const char *message, ...); @@ -75,8 +103,10 @@ void g_gate_open_device(void); void g_gate_close_device(void); void g_gate_ioctl(unsigned long req, void *data); void g_gate_destroy(int unit, int force); -int g_gate_openflags(unsigned ggflags); void g_gate_load_module(void); +ssize_t g_gate_recv(int s, void *buf, size_t len, int flags); +ssize_t g_gate_send(int s, const void *buf, size_t len, int flags); +void g_gate_socket_settings(int sfd); #ifdef LIBGEOM void g_gate_list(int unit, int verbose); #endif @@ -89,17 +119,37 @@ in_addr_t g_gate_str2ip(const char *str); */ static __inline void -g_gate_swap2h_cinit(struct g_gate_cinit *cinit __unused) +g_gate_swap2h_version(struct g_gate_version *ver) +{ + + ver->gv_version = be16toh(ver->gv_version); + ver->gv_error = be16toh(ver->gv_error); +} + +static __inline void +g_gate_swap2n_version(struct g_gate_version 
*ver) +{ + + ver->gv_version = htobe16(ver->gv_version); + ver->gv_error = htobe16(ver->gv_error); +} + +static __inline void +g_gate_swap2h_cinit(struct g_gate_cinit *cinit) { - /* Nothing here for now. */ + cinit->gc_flags = be64toh(cinit->gc_flags); + cinit->gc_nconn = be16toh(cinit->gc_nconn); + cinit->gc_token = be32toh(cinit->gc_token); } static __inline void -g_gate_swap2n_cinit(struct g_gate_cinit *cinit __unused) +g_gate_swap2n_cinit(struct g_gate_cinit *cinit) { - /* Nothing here for now. */ + cinit->gc_flags = htobe64(cinit->gc_flags); + cinit->gc_nconn = htobe16(cinit->gc_nconn); + cinit->gc_token = htobe32(cinit->gc_token); } static __inline void @@ -129,6 +179,7 @@ g_gate_swap2h_hdr(struct g_gate_hdr *hdr) /* Swap only used fields. */ hdr->gh_offset = be64toh(hdr->gh_offset); hdr->gh_length = be32toh(hdr->gh_length); + hdr->gh_seq = be64toh(hdr->gh_seq); hdr->gh_error = be16toh(hdr->gh_error); } @@ -139,6 +190,7 @@ g_gate_swap2n_hdr(struct g_gate_hdr *hdr) /* Swap only used fields. */ hdr->gh_offset = htobe64(hdr->gh_offset); hdr->gh_length = htobe32(hdr->gh_length); + hdr->gh_seq = htobe64(hdr->gh_seq); hdr->gh_error = htobe16(hdr->gh_error); } #endif /* _GGATE_H_ */