diff --git a/ggatec/Makefile b/ggatec/Makefile index 5674aa1..c49cfe8 100644 --- a/ggatec/Makefile +++ b/ggatec/Makefile @@ -9,7 +9,7 @@ SRCS= ggatec.c ggate.c CFLAGS+= -DLIBGEOM CFLAGS+= -I${.CURDIR}/../shared -DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL} -LDADD= -lgeom -lsbuf -lbsdxml -lutil +DPADD= ${LIBGEOM} ${LIBSBUF} ${LIBBSDXML} ${LIBUTIL} ${LIBPTHREAD} +LDADD= -lgeom -lsbuf -lbsdxml -lutil -lpthread .include diff --git a/ggatec/ggatec.c b/ggatec/ggatec.c index 84aa60e..70f667d 100644 --- a/ggatec/ggatec.c +++ b/ggatec/ggatec.c @@ -34,8 +34,12 @@ #include #include #include +#include +#include #include #include +#include + #include #include #include @@ -51,21 +55,22 @@ #include "ggate.h" -enum { UNSET, ATTACH, CREATE, DESTROY, LIST } action = UNSET; +enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET; static const char *path = NULL; static const char *host = NULL; static int unit = -1; static unsigned flags = 0; static int force = 0; -static int nagle = 1; static unsigned queue_size = G_GATE_QUEUE_SIZE; static unsigned port = G_GATE_PORT; static off_t mediasize; static unsigned sectorsize = 0; static unsigned timeout = G_GATE_TIMEOUT; -static unsigned rcvbuf = G_GATE_RCVBUF; -static unsigned sndbuf = G_GATE_SNDBUF; +static int sendfd, recvfd; +static uint32_t token; +static pthread_t sendtd, recvtd; +static int reconnect; static void usage(void) @@ -74,129 +79,30 @@ usage(void) fprintf(stderr, "usage: %s create [-nv] [-o ] [-p port] " "[-q queue_size] [-R rcvbuf] [-S sndbuf] [-s sectorsize] " "[-t timeout] [-u unit] \n", getprogname()); - fprintf(stderr, " %s attach [-nv] [-o ] [-p port] " + fprintf(stderr, " %s rescue [-nv] [-o ] [-p port] " "[-R rcvbuf] [-S sndbuf] <-u unit> \n", getprogname()); fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname()); fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname()); exit(EXIT_FAILURE); } -static int -handshake(void) -{ - struct g_gate_cinit cinit; - struct g_gate_sinit sinit; - struct sockaddr_in serv; - struct timeval tv; - size_t bsize; - int sfd; - - /* - * Do the network stuff. - */ - bzero(&serv, sizeof(serv)); - serv.sin_family = AF_INET; - serv.sin_addr.s_addr = g_gate_str2ip(host); - if (serv.sin_addr.s_addr == INADDR_NONE) { - g_gate_log(LOG_ERR, "Invalid IP/host name: %s.", host); - return (-1); - } - serv.sin_port = htons(port); - sfd = socket(AF_INET, SOCK_STREAM, 0); - if (sfd == -1) - g_gate_xlog("Can't open socket: %s.", strerror(errno)); - /* - * Some trivial network optimalization. - * This should be much more advanced. - */ - if (nagle) { - int on = 1; - - if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, - sizeof(on)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - } - bsize = rcvbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - bsize = sndbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - tv.tv_sec = timeout; - tv.tv_usec = 0; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 || - setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) { - g_gate_log(LOG_ERR, "Can't connect to server: %s.", - strerror(errno)); - return (-1); - } - - g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port); - - /* - * Creating and sending initial packet. - */ - if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >= - sizeof(cinit.gc_path)) { - g_gate_xlog("Path name too long."); - } - cinit.gc_flags = flags; - g_gate_log(LOG_DEBUG, "Sending initial packet."); - g_gate_swap2n_cinit(&cinit); - if (send(sfd, &cinit, sizeof(cinit), 0) == -1) { - g_gate_log(LOG_ERR, "Error while sending initial packet: %s.", - strerror(errno)); - return (-1); - } - g_gate_swap2h_cinit(&cinit); - - /* - * Receiving initial packet from server. - */ - g_gate_log(LOG_DEBUG, "Receiving initial packet."); - if (recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) { - g_gate_log(LOG_ERR, "Error while receiving data: %s.", - strerror(errno)); - return (-1); - } - g_gate_swap2h_sinit(&sinit); - if (sinit.gs_error != 0) - g_gate_xlog("Error from server: %s.", strerror(sinit.gs_error)); - - mediasize = sinit.gs_mediasize; - if (sectorsize == 0) - sectorsize = sinit.gs_sectorsize; - return (sfd); -} - -static int -serve(int sfd) +static void * +send_thread(void *arg __unused) { struct g_gate_ctl_io ggio; - size_t bsize; - char *buf; - - bsize = G_GATE_BUFSIZE_START; - buf = malloc(bsize); - if (buf == NULL) { - if (action == CREATE) - g_gate_destroy(unit, 1); - g_gate_xlog("No enough memory"); - } + struct g_gate_hdr hdr; + char buf[MAXPHYS]; + ssize_t data; + int error; + + g_gate_log(LOG_NOTICE, "%s: started!", __func__); ggio.gctl_version = G_GATE_VERSION; ggio.gctl_unit = unit; - bsize = sectorsize; - ggio.gctl_data = malloc(bsize); + ggio.gctl_data = buf; + for (;;) { - struct g_gate_hdr hdr; - int data, error; -once_again: - ggio.gctl_length = bsize; + ggio.gctl_length = sizeof(buf); ggio.gctl_error = 0; g_gate_ioctl(G_GATE_CMD_START, &ggio); error = ggio.gctl_error; @@ -204,11 +110,12 @@ once_again: case 0: break; case ECANCELED: + if (reconnect) + break; /* Exit gracefully. */ - free(ggio.gctl_data); g_gate_close_device(); - close(sfd); exit(EXIT_SUCCESS); +#if 0 case ENOMEM: /* Buffer too small. */ ggio.gctl_data = realloc(ggio.gctl_data, @@ -218,87 +125,232 @@ once_again: goto once_again; } /* FALLTHROUGH */ +#endif case ENXIO: default: g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME, strerror(error)); } - hdr.gh_cmd = ggio.gctl_cmd; + if (reconnect) + break; + + switch (ggio.gctl_cmd) { + case BIO_READ: + hdr.gh_cmd = GGATE_CMD_READ; + break; + case BIO_WRITE: + hdr.gh_cmd = GGATE_CMD_WRITE; + break; + } + hdr.gh_seq = ggio.gctl_seq; hdr.gh_offset = ggio.gctl_offset; hdr.gh_length = ggio.gctl_length; hdr.gh_error = 0; g_gate_swap2n_hdr(&hdr); - data = send(sfd, &hdr, sizeof(hdr), 0); + + data = g_gate_send(sendfd, &hdr, sizeof(hdr), MSG_NOSIGNAL); g_gate_log(LOG_DEBUG, "Sent hdr packet."); g_gate_swap2h_hdr(&hdr); + if (reconnect) + break; if (data != sizeof(hdr)) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 1."); + reconnect = 1; + pthread_kill(recvtd, SIGUSR1); + break; } - if (ggio.gctl_cmd == BIO_DELETE || ggio.gctl_cmd == BIO_WRITE) { - data = send(sfd, ggio.gctl_data, ggio.gctl_length, 0); - g_gate_log(LOG_DEBUG, "Sent data packet."); + + if (hdr.gh_cmd == GGATE_CMD_WRITE) { + data = g_gate_send(sendfd, ggio.gctl_data, + ggio.gctl_length, MSG_NOSIGNAL); + if (reconnect) + break; if (data != ggio.gctl_length) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 2 (%zd != %zd).", data, (ssize_t)ggio.gctl_length); + reconnect = 1; + pthread_kill(recvtd, SIGUSR1); + break; } - g_gate_log(LOG_DEBUG, "Sent %d bytes (offset=%llu, " + g_gate_log(LOG_DEBUG, "Sent %zd bytes (offset=%llu, " "size=%u).", data, hdr.gh_offset, hdr.gh_length); } - data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL); - g_gate_log(LOG_DEBUG, "Received hdr packet."); + } + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + return (NULL); +} + +static void * +recv_thread(void *arg __unused) +{ + struct g_gate_ctl_io ggio; + struct g_gate_hdr hdr; + char buf[MAXPHYS]; + ssize_t data; + + g_gate_log(LOG_NOTICE, "%s: started!", __func__); + + ggio.gctl_version = G_GATE_VERSION; + ggio.gctl_unit = unit; + ggio.gctl_data = buf; + + for (;;) { + data = g_gate_recv(recvfd, &hdr, sizeof(hdr), MSG_WAITALL); + if (reconnect) + break; g_gate_swap2h_hdr(&hdr); if (data != sizeof(hdr)) { - ggio.gctl_error = EIO; - goto done; + if (data == -1 && errno == EAGAIN) + continue; + g_gate_log(LOG_ERR, "Lost connection 3."); + reconnect = 1; + pthread_kill(sendtd, SIGUSR1); + break; } - if (ggio.gctl_cmd == BIO_READ) { - if (bsize < (size_t)ggio.gctl_length) { - ggio.gctl_data = realloc(ggio.gctl_data, - ggio.gctl_length); - if (ggio.gctl_data != NULL) - bsize = ggio.gctl_length; - else - g_gate_xlog("No memory."); - } - data = recv(sfd, ggio.gctl_data, ggio.gctl_length, - MSG_WAITALL); + g_gate_log(LOG_DEBUG, "Received hdr packet."); + + ggio.gctl_seq = hdr.gh_seq; + ggio.gctl_cmd = hdr.gh_cmd; + ggio.gctl_offset = hdr.gh_offset; + ggio.gctl_length = hdr.gh_length; + ggio.gctl_error = hdr.gh_error; + + if (ggio.gctl_error == 0 && ggio.gctl_cmd == GGATE_CMD_READ) { + data = g_gate_recv(recvfd, ggio.gctl_data, + ggio.gctl_length, MSG_WAITALL); + if (reconnect) + break; g_gate_log(LOG_DEBUG, "Received data packet."); if (data != ggio.gctl_length) { - ggio.gctl_error = EAGAIN; - goto done; + g_gate_log(LOG_ERR, "Lost connection 4."); + reconnect = 1; + pthread_kill(sendtd, SIGUSR1); + break; } g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%ju, " "size=%zu).", data, (uintmax_t)hdr.gh_offset, (size_t)hdr.gh_length); } -done: + g_gate_ioctl(G_GATE_CMD_DONE, &ggio); - if (ggio.gctl_error == EAGAIN) - return (ggio.gctl_error); } - /* NOTREACHED */ - return (0); + g_gate_log(LOG_DEBUG, "%s: Died.", __func__); + pthread_exit(NULL); } -static void -serve_loop(int sfd) +static int +handshake(int dir) { + struct g_gate_version ver; + struct g_gate_cinit cinit; + struct g_gate_sinit sinit; + struct sockaddr_in serv; + int sfd; - for (;;) { - int error; + /* + * Do the network stuff. + */ + bzero(&serv, sizeof(serv)); + serv.sin_family = AF_INET; + serv.sin_addr.s_addr = g_gate_str2ip(host); + if (serv.sin_addr.s_addr == INADDR_NONE) { + g_gate_log(LOG_DEBUG, "Invalid IP/host name: %s.", host); + return (-1); + } + serv.sin_port = htons(port); + sfd = socket(AF_INET, SOCK_STREAM, 0); + if (sfd == -1) { + g_gate_log(LOG_DEBUG, "Cannot open socket: %s.", + strerror(errno)); + return (-1); + } + + g_gate_socket_settings(sfd); - error = serve(sfd); + if (connect(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) { + g_gate_log(LOG_DEBUG, "Cannot connect to server: %s.", + strerror(errno)); close(sfd); - if (error != EAGAIN) - g_gate_xlog("%s.", strerror(error)); - sfd = handshake(); - if (sfd == -1) { - sleep(2); - continue; - } + return (-1); + } + + g_gate_log(LOG_INFO, "Connected to the server: %s:%d.", host, port); + + /* + * Create and send version packet. + */ + g_gate_log(LOG_DEBUG, "Sending version packet."); + assert(strlen(GGATE_MAGIC) == sizeof(ver.gv_magic)); + bcopy(GGATE_MAGIC, ver.gv_magic, sizeof(ver.gv_magic)); + ver.gv_version = GGATE_VERSION; + ver.gv_error = 0; + g_gate_swap2n_version(&ver); + if (g_gate_send(sfd, &ver, sizeof(ver), MSG_NOSIGNAL) == -1) { + g_gate_log(LOG_DEBUG, "Error while sending version packet: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + bzero(&ver, sizeof(ver)); + if (g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL) == -1) { + g_gate_log(LOG_DEBUG, "Error while receiving data: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + if (ver.gv_error != 0) { + g_gate_log(LOG_DEBUG, "Version verification problem: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + + /* + * Create and send initial packet. + */ + g_gate_log(LOG_DEBUG, "Sending initial packet."); + if (strlcpy(cinit.gc_path, path, sizeof(cinit.gc_path)) >= + sizeof(cinit.gc_path)) { + g_gate_log(LOG_DEBUG, "Path name too long."); + close(sfd); + return (-1); + } + cinit.gc_flags = flags | dir; + cinit.gc_token = token; + cinit.gc_nconn = 2; + g_gate_swap2n_cinit(&cinit); + if (g_gate_send(sfd, &cinit, sizeof(cinit), MSG_NOSIGNAL) == -1) { + g_gate_log(LOG_DEBUG, "Error while sending initial packet: %s.", + strerror(errno)); + close(sfd); + return (-1); } + g_gate_swap2h_cinit(&cinit); + + /* + * Receiving initial packet from server. + */ + g_gate_log(LOG_DEBUG, "Receiving initial packet."); + if (g_gate_recv(sfd, &sinit, sizeof(sinit), MSG_WAITALL) == -1) { + g_gate_log(LOG_DEBUG, "Error while receiving data: %s.", + strerror(errno)); + close(sfd); + return (-1); + } + g_gate_swap2h_sinit(&sinit); + if (sinit.gs_error != 0) { + g_gate_log(LOG_DEBUG, "Error from server: %s.", + strerror(sinit.gs_error)); + close(sfd); + return (-1); + } + g_gate_log(LOG_DEBUG, "Received initial packet."); + + mediasize = sinit.gs_mediasize; + if (sectorsize == 0) + sectorsize = sinit.gs_sectorsize; + + return (sfd); } static void @@ -314,26 +366,87 @@ mydaemon(void) err(EXIT_FAILURE, "Cannot daemonize"); } +static int +g_gatec_connect(void) +{ + + token = arc4random(); + /* + * Our receive descriptor is connected to the send descriptor on the + * server side. + */ + recvfd = handshake(GGATE_FLAG_SEND); + if (recvfd == -1) + return (0); + /* + * Our send descriptor is connected to the receive descriptor on the + * server side. + */ + sendfd = handshake(GGATE_FLAG_RECV); + if (sendfd == -1) + return (0); + return (1); +} + static void -g_gatec_attach(void) +g_gatec_start(void) { - int sfd; + int error; - sfd = handshake(); - g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid()); - mydaemon(); - serve_loop(sfd); + reconnect = 0; + error = pthread_create(&recvtd, NULL, recv_thread, NULL); + if (error != 0) { + g_gate_destroy(unit, 1); + g_gate_xlog("pthread_create(recv_thread): %s.", + strerror(error)); + } + sendtd = pthread_self(); + send_thread(NULL); + /* Disconnected. */ + close(sendfd); + close(recvfd); +} + +static void +signop(int sig __unused) +{ + + /* Do nothing. */ +} + +static void +g_gatec_loop(void) +{ + struct g_gate_ctl_cancel ggioc; + + signal(SIGUSR1, signop); + for (;;) { + g_gatec_start(); + g_gate_log(LOG_NOTICE, "Disconnected [%s %s]. Connecting...", + host, path); + while (!g_gatec_connect()) { + sleep(2); + g_gate_log(LOG_NOTICE, "Connecting [%s %s]...", host, + path); + } + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_unit = unit; + ggioc.gctl_seq = 0; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + } } static void g_gatec_create(void) { struct g_gate_ctl_create ggioc; - int sfd; - sfd = handshake(); - if (sfd == -1) - exit(EXIT_FAILURE); + if (!g_gatec_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + /* + * Ok, got both sockets, time to create provider. + */ ggioc.gctl_version = G_GATE_VERSION; ggioc.gctl_mediasize = mediasize; ggioc.gctl_sectorsize = sectorsize; @@ -344,12 +457,29 @@ g_gatec_create(void) snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s:%u %s", host, port, path); g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc); - g_gate_log(LOG_DEBUG, "Worker created: %u.", getpid()); if (unit == -1) printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit); unit = ggioc.gctl_unit; + mydaemon(); - serve_loop(sfd); + g_gatec_loop(); +} + +static void +g_gatec_rescue(void) +{ + struct g_gate_ctl_cancel ggioc; + + if (!g_gatec_connect()) + g_gate_xlog("Cannot connect: %s.", strerror(errno)); + + ggioc.gctl_version = G_GATE_VERSION; + ggioc.gctl_unit = unit; + ggioc.gctl_seq = 0; + g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); + + mydaemon(); + g_gatec_loop(); } int @@ -358,14 +488,14 @@ main(int argc, char *argv[]) if (argc < 2) usage(); - if (strcasecmp(argv[1], "attach") == 0) - action = ATTACH; - else if (strcasecmp(argv[1], "create") == 0) + if (strcasecmp(argv[1], "create") == 0) action = CREATE; else if (strcasecmp(argv[1], "destroy") == 0) action = DESTROY; else if (strcasecmp(argv[1], "list") == 0) action = LIST; + else if (strcasecmp(argv[1], "rescue") == 0) + action = RESCUE; else usage(); argc -= 1; @@ -383,12 +513,12 @@ main(int argc, char *argv[]) force = 1; break; case 'n': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); nagle = 0; break; case 'o': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); if (strcasecmp("ro", optarg) == 0) flags = G_GATE_FLAG_READONLY; @@ -402,7 +532,7 @@ main(int argc, char *argv[]) } break; case 'p': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; port = strtoul(optarg, NULL, 10); @@ -418,7 +548,7 @@ main(int argc, char *argv[]) errx(EXIT_FAILURE, "Invalid queue_size."); break; case 'R': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; rcvbuf = strtoul(optarg, NULL, 10); @@ -426,7 +556,7 @@ main(int argc, char *argv[]) errx(EXIT_FAILURE, "Invalid rcvbuf."); break; case 'S': - if (action != ATTACH && action != CREATE) + if (action != CREATE && action != RESCUE) usage(); errno = 0; sndbuf = strtoul(optarg, NULL, 10); @@ -468,18 +598,6 @@ main(int argc, char *argv[]) argv += optind; switch (action) { - case ATTACH: - if (argc != 2) - usage(); - if (unit == -1) { - fprintf(stderr, "Required unit number.\n"); - usage(); - } - g_gate_open_device(); - host = argv[0]; - path = argv[1]; - g_gatec_attach(); - break; case CREATE: if (argc != 2) usage(); @@ -501,6 +619,18 @@ main(int argc, char *argv[]) case LIST: g_gate_list(unit, g_gate_verbose); break; + case RESCUE: + if (argc != 2) + usage(); + if (unit == -1) { + fprintf(stderr, "Required unit number.\n"); + usage(); + } + g_gate_open_device(); + host = argv[0]; + path = argv[1]; + g_gatec_rescue(); + break; case UNSET: default: usage(); diff --git a/ggated/Makefile b/ggated/Makefile index fa5c0fb..4e7708e 100644 --- a/ggated/Makefile +++ b/ggated/Makefile @@ -6,6 +6,9 @@ PROG= ggated MAN= ggated.8 SRCS= ggated.c ggate.c +DPADD= ${LIBPTHREAD} +LDADD= -lpthread + CFLAGS+= -I${.CURDIR}/../shared .include diff --git a/ggated/ggated.c b/ggated/ggated.c index a0628ea..82d66e1 100644 --- a/ggated/ggated.c +++ b/ggated/ggated.c @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -44,6 +45,7 @@ #include #include #include +#include #include #include #include @@ -51,32 +53,58 @@ #include #include -#include #include "ggate.h" -#define G_GATED_EXPORT_FILE "/etc/gg.exports" -#define G_GATED_DEBUG(...) \ - if (g_gate_verbose) { \ - printf(__VA_ARGS__); \ - printf("\n"); \ - } +#define GGATED_EXPORT_FILE "/etc/gg.exports" + +struct ggd_connection { + off_t c_mediasize; + off_t c_sectorsize; + unsigned c_flags; /* flags (RO/RW) */ + int c_diskfd; + int c_sendfd; + int c_recvfd; + time_t c_birthtime; + char *c_path; + uint64_t c_token; + in_addr_t c_srcip; + LIST_ENTRY(ggd_connection) c_next; +}; -static const char *exports = G_GATED_EXPORT_FILE; -static int got_sighup = 0; -static int nagle = 1; -static unsigned rcvbuf = G_GATE_RCVBUF; -static unsigned sndbuf = G_GATE_SNDBUF; +struct ggd_request { + struct g_gate_hdr r_hdr; + char *r_data; + TAILQ_ENTRY(ggd_request) r_next; +}; +#define r_cmd r_hdr.gh_cmd +#define r_offset r_hdr.gh_offset +#define r_length r_hdr.gh_length +#define r_error r_hdr.gh_error -struct export { +struct ggd_export { char *e_path; /* path to device/file */ in_addr_t e_ip; /* remote IP address */ in_addr_t e_mask; /* IP mask */ unsigned e_flags; /* flags (RO/RW) */ - SLIST_ENTRY(export) e_next; + SLIST_ENTRY(ggd_export) e_next; }; -static SLIST_HEAD(, export) exports_list = - SLIST_HEAD_INITIALIZER(&exports_list); + +static const char *exports_file = GGATED_EXPORT_FILE; +static int got_sighup = 0; +in_addr_t bindaddr; + +static TAILQ_HEAD(, ggd_request) inqueue = TAILQ_HEAD_INITIALIZER(inqueue); +static TAILQ_HEAD(, ggd_request) outqueue = TAILQ_HEAD_INITIALIZER(outqueue); +pthread_mutex_t inqueue_mtx, outqueue_mtx; +pthread_cond_t inqueue_cond, outqueue_cond; + +static SLIST_HEAD(, ggd_export) exports = SLIST_HEAD_INITIALIZER(&exports); +static LIST_HEAD(, ggd_connection) connections = LIST_HEAD_INITIALIZER(&connection); + +static void *recv_thread(void *arg); +static void *disk_thread(void *arg); +static void *send_thread(void *arg); static void usage(void) @@ -118,7 +146,7 @@ countmask(unsigned m) static void line_parse(char *line, unsigned lineno) { - struct export *ex; + struct ggd_export *ex; char *word, *path, *sflags; unsigned flags, i, vmask; in_addr_t ip, mask; @@ -193,7 +221,7 @@ line_parse(char *line, unsigned lineno) ex->e_mask = mask; ex->e_flags = flags; - SLIST_INSERT_HEAD(&exports_list, ex, e_next); + SLIST_INSERT_HEAD(&exports, ex, e_next); g_gate_log(LOG_DEBUG, "Added %s/%u %s %s to exports list.", ip2str(ex->e_ip), vmask, path, sflags); @@ -202,11 +230,11 @@ line_parse(char *line, unsigned lineno) static void exports_clear(void) { - struct export *ex; + struct ggd_export *ex; - while (!SLIST_EMPTY(&exports_list)) { - ex = SLIST_FIRST(&exports_list); - SLIST_REMOVE_HEAD(&exports_list, e_next); + while (!SLIST_EMPTY(&exports)) { + ex = SLIST_FIRST(&exports); + SLIST_REMOVE_HEAD(&exports, e_next); free(ex); } } @@ -221,13 +249,13 @@ exports_get(void) exports_clear(); - fd = fopen(exports, "r"); + fd = fopen(exports_file, "r"); if (fd == NULL) { - g_gate_xlog("Cannot open exports file (%s): %s.", exports, + g_gate_xlog("Cannot open exports file (%s): %s.", exports_file, strerror(errno)); } - g_gate_log(LOG_INFO, "Reading exports file (%s).", exports); + g_gate_log(LOG_INFO, "Reading exports file (%s).", exports_file); for (;;) { if (fgets(buf, sizeof(buf), fd) == NULL) { @@ -270,236 +298,636 @@ exports_get(void) g_gate_log(LOG_INFO, "Exporting %u object(s).", objs); } -static struct export * -exports_find(struct sockaddr *s, const char *path) +static int +exports_check(struct ggd_export *ex, struct g_gate_cinit *cinit, + struct ggd_connection *conn) +{ + char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ + int error = 0, flags; + + strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); + strlcat(ipmask, "/", sizeof(ipmask)); + strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); + if ((cinit->gc_flags & GGATE_FLAG_RDONLY) != 0) { + if (ex->e_flags == O_WRONLY) { + g_gate_log(LOG_WARNING, "Read-only access requested, " + "but %s (%s) is exported write-only.", ex->e_path, + ipmask); + return (EPERM); + } else { + conn->c_flags |= GGATE_FLAG_RDONLY; + } + } else if ((cinit->gc_flags & GGATE_FLAG_WRONLY) != 0) { + if (ex->e_flags == O_RDONLY) { + g_gate_log(LOG_WARNING, "Write-only access requested, " + "but %s (%s) is exported read-only.", ex->e_path, + ipmask); + return (EPERM); + } else { + conn->c_flags |= GGATE_FLAG_WRONLY; + } + } else { + if (ex->e_flags == O_RDONLY) { + g_gate_log(LOG_WARNING, "Read-write access requested, " + "but %s (%s) is exported read-only.", ex->e_path, + ipmask); + return (EPERM); + } else if (ex->e_flags == O_WRONLY) { + g_gate_log(LOG_WARNING, "Read-write access requested, " + "but %s (%s) is exported write-only.", ex->e_path, + ipmask); + return (EPERM); + } + } + if ((conn->c_flags & GGATE_FLAG_RDONLY) != 0) + flags = O_RDONLY; + else if ((conn->c_flags & GGATE_FLAG_WRONLY) != 0) + flags = O_WRONLY; + else + flags = O_RDWR; + conn->c_diskfd = open(ex->e_path, flags); + if (conn->c_diskfd == -1) { + error = errno; + g_gate_log(LOG_ERR, "Cannot open %s: %s.", ex->e_path, + strerror(error)); + return (error); + } + return (0); +} + +static struct ggd_export * +exports_find(struct sockaddr *s, struct g_gate_cinit *cinit, + struct ggd_connection *conn) { - struct export *ex; + struct ggd_export *ex; in_addr_t ip; + int error; ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); - SLIST_FOREACH(ex, &exports_list, e_next) { - if ((ip & ex->e_mask) != ex->e_ip) + SLIST_FOREACH(ex, &exports, e_next) { + if ((ip & ex->e_mask) != ex->e_ip) { + g_gate_log(LOG_DEBUG, "exports[%s]: IP mismatch.", + ex->e_path); continue; - if (path != NULL && strcmp(path, ex->e_path) != 0) + } + if (strcmp(cinit->gc_path, ex->e_path) != 0) { + g_gate_log(LOG_DEBUG, "exports[%s]: Path mismatch.", + ex->e_path); continue; - - g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); - return (ex); + } + error = exports_check(ex, cinit, conn); + if (error == 0) + return (ex); + else { + errno = error; + return (NULL); + } } - g_gate_log(LOG_INFO, "Unauthorized connection from: %s.", ip2str(ip)); - + g_gate_log(LOG_WARNING, "Unauthorized connection from: %s.", + ip2str(ip)); + errno = EPERM; return (NULL); } +/* + * Remove timed out connections. + */ static void -sendfail(int sfd, int error, const char *fmt, ...) +connection_cleanups(void) { - struct g_gate_sinit sinit; - va_list ap; - int data; - - sinit.gs_error = error; - g_gate_swap2n_sinit(&sinit); - data = send(sfd, &sinit, sizeof(sinit), 0); - g_gate_swap2h_sinit(&sinit); - if (data == -1) { - g_gate_xlog("Error while sending initial packet: %s.", - strerror(errno)); - } - if (fmt != NULL) { - va_start(ap, fmt); - g_gate_xvlog(fmt, ap); - /* NOTREACHED */ - va_end(ap); + struct ggd_connection *conn, *tconn; + time_t now; + + time(&now); + LIST_FOREACH_SAFE(conn, &connections, c_next, tconn) { + if (now - conn->c_birthtime > 10) { + LIST_REMOVE(conn, c_next); + g_gate_log(LOG_NOTICE, + "Connection from %s [%s] removed.", + ip2str(conn->c_srcip), conn->c_path); + close(conn->c_diskfd); + close(conn->c_sendfd); + close(conn->c_recvfd); + free(conn->c_path); + free(conn); + } } - exit(EXIT_FAILURE); } -static void -serve(int sfd, struct sockaddr *s) +static struct ggd_connection * +connection_find(struct g_gate_cinit *cinit) { - struct g_gate_cinit cinit; - struct g_gate_sinit sinit; - struct g_gate_hdr hdr; - struct export *ex; - char ipmask[32]; /* 32 == strlen("xxx.xxx.xxx.xxx/xxx.xxx.xxx.xxx")+1 */ - size_t bufsize; - int32_t error; - int fd, flags; - ssize_t data; - char *buf; + struct ggd_connection *conn; - g_gate_log(LOG_DEBUG, "Receiving initial packet."); - data = recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); - g_gate_swap2h_cinit(&cinit); - if (data == -1) { - g_gate_xlog("Error while receiving initial packet: %s.", - strerror(errno)); + LIST_FOREACH(conn, &connections, c_next) { + if (conn->c_token == cinit->gc_token) + break; } + return (conn); +} - ex = exports_find(s, cinit.gc_path); - if (ex == NULL) { - sendfail(sfd, EINVAL, "Requested path isn't exported: %s.", - strerror(errno)); +static struct ggd_connection * +connection_new(struct g_gate_cinit *cinit, struct sockaddr *s, int sfd) +{ + struct ggd_connection *conn; + in_addr_t ip; + + /* + * First, look for old connections. + * We probably should do it every X seconds, but what for? + * It is only dangerous if an attacker wants to overload connections + * queue, so here is a good place to do the cleanups. + */ + connection_cleanups(); + + conn = malloc(sizeof(*conn)); + if (conn == NULL) + return (NULL); + conn->c_path = strdup(cinit->gc_path); + if (conn->c_path == NULL) { + free(conn); + return (NULL); } + conn->c_token = cinit->gc_token; + ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); + conn->c_srcip = ip; + conn->c_sendfd = conn->c_recvfd = -1; + if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) + conn->c_sendfd = sfd; + else + conn->c_recvfd = sfd; + conn->c_mediasize = 0; + conn->c_sectorsize = 0; + time(&conn->c_birthtime); + conn->c_flags = cinit->gc_flags; + LIST_INSERT_HEAD(&connections, conn, c_next); + g_gate_log(LOG_DEBUG, "Connection created [%s, %s].", ip2str(ip), + conn->c_path); + return (conn); +} - error = 0; - strlcpy(ipmask, ip2str(ex->e_ip), sizeof(ipmask)); - strlcat(ipmask, "/", sizeof(ipmask)); - strlcat(ipmask, ip2str(ex->e_mask), sizeof(ipmask)); - if ((cinit.gc_flags & G_GATE_FLAG_READONLY) != 0) { - if (ex->e_flags == O_WRONLY) { - g_gate_log(LOG_ERR, "Read-only access requested, but " - "%s (%s) is exported write-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = G_GATE_FLAG_READONLY; - } - } else if ((cinit.gc_flags & G_GATE_FLAG_WRITEONLY) != 0) { - if (ex->e_flags == O_RDONLY) { - g_gate_log(LOG_ERR, "Write-only access requested, but " - "%s (%s) is exported read-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = G_GATE_FLAG_WRITEONLY; +static int +connection_add(struct ggd_connection *conn, struct g_gate_cinit *cinit, + struct sockaddr *s, int sfd) +{ + in_addr_t ip; + + ip = htonl(((struct sockaddr_in *)(void *)s)->sin_addr.s_addr); + if ((cinit->gc_flags & GGATE_FLAG_SEND) != 0) { + if (conn->c_sendfd != -1) { + g_gate_log(LOG_WARNING, + "Send socket already exists [%s, %s].", ip2str(ip), + conn->c_path); + return (EEXIST); } + conn->c_sendfd = sfd; } else { - if (ex->e_flags == O_RDONLY) { - g_gate_log(LOG_ERR, "Read-write access requested, but " - "%s (%s) is exported read-only.", ex->e_path, - ipmask); - error = EPERM; - } else if (ex->e_flags == O_WRONLY) { - g_gate_log(LOG_ERR, "Read-write access requested, but " - "%s (%s) is exported write-only.", ex->e_path, - ipmask); - error = EPERM; - } else { - sinit.gs_flags = 0; + if (conn->c_recvfd != -1) { + g_gate_log(LOG_WARNING, + "Receive socket already exists [%s, %s].", + ip2str(ip), conn->c_path); + return (EEXIST); } + conn->c_recvfd = sfd; } - if (error != 0) - sendfail(sfd, error, NULL); - flags = g_gate_openflags(sinit.gs_flags); - fd = open(ex->e_path, flags); - if (fd < 0) { - sendfail(sfd, errno, "Error while opening %s: %s.", ex->e_path, - strerror(errno)); + g_gate_log(LOG_DEBUG, "Connection added [%s, %s].", ip2str(ip), + conn->c_path); + return (0); +} + +/* + * Remove one socket from the given connection or the whole + * connection if sfd == -1. + */ +static void +connection_remove(struct ggd_connection *conn) +{ + + LIST_REMOVE(conn, c_next); + g_gate_log(LOG_DEBUG, "Connection removed [%s %s].", + ip2str(conn->c_srcip), conn->c_path); + if (conn->c_sendfd != -1) + close(conn->c_sendfd); + if (conn->c_recvfd != -1) + close(conn->c_recvfd); + free(conn->c_path); + free(conn); +} + +static int +connection_ready(struct ggd_connection *conn) +{ + + return (conn->c_sendfd != -1 && conn->c_recvfd != -1); +} + +static void +connection_launch(struct ggd_connection *conn) +{ + pthread_t td; + int error, pid; + + pid = fork(); + if (pid > 0) + return; + else if (pid == -1) { + g_gate_log(LOG_ERR, "Cannot fork: %s.", strerror(errno)); + return; } + g_gate_log(LOG_DEBUG, "Process created [%s].", conn->c_path); - g_gate_log(LOG_DEBUG, "Sending initial packet."); /* - * This field isn't used by ggc(8) for now. - * It should be used in future when user don't give device size. + * Create condition variables and mutexes for in-queue and out-queue + * synchronization. */ - sinit.gs_mediasize = g_gate_mediasize(fd); - sinit.gs_sectorsize = g_gate_sectorsize(fd); - sinit.gs_error = 0; + error = pthread_mutex_init(&inqueue_mtx, NULL); + if (error != 0) { + g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", + strerror(error)); + } + error = pthread_cond_init(&inqueue_cond, NULL); + if (error != 0) { + g_gate_xlog("pthread_cond_init(inqueue_cond): %s.", + strerror(error)); + } + error = pthread_mutex_init(&outqueue_mtx, NULL); + if (error != 0) { + g_gate_xlog("pthread_mutex_init(outqueue_mtx): %s.", + strerror(error)); + } + error = pthread_cond_init(&outqueue_cond, NULL); + if (error != 0) { + g_gate_xlog("pthread_cond_init(outqueue_cond): %s.", + strerror(error)); + } + + /* + * Create threads: + * recvtd - thread for receiving I/O request + * diskio - thread for doing I/O request + * sendtd - thread for sending I/O requests back + */ + error = pthread_create(&td, NULL, send_thread, conn); + if (error != 0) { + g_gate_xlog("pthread_create(send_thread): %s.", + strerror(error)); + } + error = pthread_create(&td, NULL, recv_thread, conn); + if (error != 0) { + g_gate_xlog("pthread_create(recv_thread): %s.", + strerror(error)); + } + disk_thread(conn); +} + +static void +sendfail(int sfd, int error, const char *fmt, ...) +{ + struct g_gate_sinit sinit; + va_list ap; + ssize_t data; + + sinit.gs_error = error; g_gate_swap2n_sinit(&sinit); - data = send(sfd, &sinit, sizeof(sinit), 0); + data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); g_gate_swap2h_sinit(&sinit); - if (data == -1) { - sendfail(sfd, errno, "Error while sending initial packet: %s.", + if (data != sizeof(sinit)) { + g_gate_log(LOG_WARNING, "Cannot send initial packet: %s.", strerror(errno)); + return; + } + if (fmt != NULL) { + va_start(ap, fmt); + g_gate_vlog(LOG_WARNING, fmt, ap); + va_end(ap); } +} - bufsize = G_GATE_BUFSIZE_START; - buf = malloc(bufsize); - if (buf == NULL) - g_gate_xlog("No enough memory."); +static void * +malloc_waitok(size_t size) +{ + void *p; - g_gate_log(LOG_DEBUG, "New process: %u.", getpid()); + while ((p = malloc(size)) == NULL) { + g_gate_log(LOG_DEBUG, "Cannot allocate %zu bytes.", size); + sleep(1); + } + return (p); +} + +static void * +recv_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_recvfd; for (;;) { /* - * Receive request. + * Get header packet. */ - data = recv(sfd, &hdr, sizeof(hdr), MSG_WAITALL); + req = malloc_waitok(sizeof(*req)); + data = g_gate_recv(fd, &req->r_hdr, sizeof(req->r_hdr), + MSG_WAITALL); if (data == 0) { g_gate_log(LOG_DEBUG, "Process %u exiting.", getpid()); exit(EXIT_SUCCESS); } else if (data == -1) { g_gate_xlog("Error while receiving hdr packet: %s.", strerror(errno)); - } else if (data != sizeof(hdr)) { + } else if (data != sizeof(req->r_hdr)) { g_gate_xlog("Malformed hdr packet received."); } g_gate_log(LOG_DEBUG, "Received hdr packet."); - g_gate_swap2h_hdr(&hdr); + g_gate_swap2h_hdr(&req->r_hdr); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); /* - * Increase buffer if there is need to. + * Allocate memory for data. */ - if (hdr.gh_length > bufsize) { - bufsize = hdr.gh_length; - g_gate_log(LOG_DEBUG, "Increasing buffer to %u.", - bufsize); - buf = realloc(buf, bufsize); - if (buf == NULL) - g_gate_xlog("No enough memory."); - } + req->r_data = malloc_waitok(req->r_length); - if (hdr.gh_cmd == BIO_READ) { - if (pread(fd, buf, hdr.gh_length, - hdr.gh_offset) == -1) { - error = errno; - g_gate_log(LOG_ERR, "Error while reading data " - "(offset=%ju, size=%zu): %s.", - (uintmax_t)hdr.gh_offset, - (size_t)hdr.gh_length, strerror(error)); - } else { - error = 0; - } - hdr.gh_error = error; - g_gate_swap2n_hdr(&hdr); - if (send(sfd, &hdr, sizeof(hdr), 0) == -1) { - g_gate_xlog("Error while sending status: %s.", - strerror(errno)); - } - g_gate_swap2h_hdr(&hdr); - /* Send data only if there was no error while pread(). */ - if (error == 0) { - data = send(sfd, buf, hdr.gh_length, 0); - if (data == -1) { - g_gate_xlog("Error while sending data: " - "%s.", strerror(errno)); - } - g_gate_log(LOG_DEBUG, "Sent %d bytes " - "(offset=%ju, size=%zu).", data, - (uintmax_t)hdr.gh_offset, - (size_t)hdr.gh_length); - } - } else /* if (hdr.gh_cmd == BIO_WRITE) */ { + /* + * Receive data to write for WRITE request. + */ + if (req->r_cmd == GGATE_CMD_WRITE) { g_gate_log(LOG_DEBUG, "Waiting for %u bytes of data...", - hdr.gh_length); - data = recv(sfd, buf, hdr.gh_length, MSG_WAITALL); + req->r_length); + data = g_gate_recv(fd, req->r_data, req->r_length, + MSG_WAITALL); if (data == -1) { g_gate_xlog("Error while receiving data: %s.", strerror(errno)); } - if (pwrite(fd, buf, hdr.gh_length, hdr.gh_offset) == -1) { - error = errno; - g_gate_log(LOG_ERR, "Error while writing data " - "(offset=%llu, size=%u): %s.", - hdr.gh_offset, hdr.gh_length, - strerror(error)); - } else { - error = 0; + } + + /* + * Put the request onto the incoming queue. + */ + error = pthread_mutex_lock(&inqueue_mtx); + assert(error == 0); + TAILQ_INSERT_TAIL(&inqueue, req, r_next); + error = pthread_cond_signal(&inqueue_cond); + assert(error == 0); + error = pthread_mutex_unlock(&inqueue_mtx); + assert(error == 0); + } +} + +static void * +disk_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_diskfd; + for (;;) { + /* + * Get a request from the incoming queue. + */ + error = pthread_mutex_lock(&inqueue_mtx); + assert(error == 0); + while ((req = TAILQ_FIRST(&inqueue)) == NULL) { + error = pthread_cond_wait(&inqueue_cond, &inqueue_mtx); + assert(error == 0); + } + TAILQ_REMOVE(&inqueue, req, r_next); + error = pthread_mutex_unlock(&inqueue_mtx); + assert(error == 0); + + /* + * Check the request. + */ + assert(req->r_cmd == GGATE_CMD_READ || req->r_cmd == GGATE_CMD_WRITE); + assert(req->r_offset + req->r_length <= (uintmax_t)conn->c_mediasize); + assert((req->r_offset % conn->c_sectorsize) == 0); + assert((req->r_length % conn->c_sectorsize) == 0); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); + + /* + * Do the request. + */ + data = 0; + switch (req->r_cmd) { + case GGATE_CMD_READ: + data = pread(fd, req->r_data, req->r_length, + req->r_offset); + break; + case GGATE_CMD_WRITE: + data = pwrite(fd, req->r_data, req->r_length, + req->r_offset); + /* Free data memory here - better sooner. */ + free(req->r_data); + req->r_data = NULL; + break; + } + if (data != (ssize_t)req->r_length) { + /* Report short reads/writes as I/O errors. */ + if (errno == 0) + errno = EIO; + g_gate_log(LOG_ERR, "Disk error: %s", strerror(errno)); + req->r_error = errno; + if (req->r_data != NULL) { + free(req->r_data); + req->r_data = NULL; } - hdr.gh_error = error; - g_gate_swap2n_hdr(&hdr); - if (send(sfd, &hdr, sizeof(hdr), 0) == -1) { - g_gate_xlog("Error while sending status: %s.", + } + + /* + * Put the request onto the outgoing queue. + */ + error = pthread_mutex_lock(&outqueue_mtx); + assert(error == 0); + TAILQ_INSERT_TAIL(&outqueue, req, r_next); + error = pthread_cond_signal(&outqueue_cond); + assert(error == 0); + error = pthread_mutex_unlock(&outqueue_mtx); + assert(error == 0); + } +} + +static void * +send_thread(void *arg) +{ + struct ggd_connection *conn; + struct ggd_request *req; + ssize_t data; + int error, fd; + + conn = arg; + g_gate_log(LOG_NOTICE, "%s: started [%s]!", __func__, conn->c_path); + fd = conn->c_sendfd; + for (;;) { + /* + * Get a request from the outgoing queue. + */ + error = pthread_mutex_lock(&outqueue_mtx); + assert(error == 0); + while ((req = TAILQ_FIRST(&outqueue)) == NULL) { + error = pthread_cond_wait(&outqueue_cond, + &outqueue_mtx); + assert(error == 0); + } + TAILQ_REMOVE(&outqueue, req, r_next); + error = pthread_mutex_unlock(&outqueue_mtx); + assert(error == 0); + + g_gate_log(LOG_DEBUG, "%s: offset=%jd length=%u", __func__, + (intmax_t)req->r_offset, (unsigned)req->r_length); + + /* + * Send the request. + */ + g_gate_swap2n_hdr(&req->r_hdr); + if (g_gate_send(fd, &req->r_hdr, sizeof(req->r_hdr), 0) == -1) { + g_gate_xlog("Error while sending hdr packet: %s.", + strerror(errno)); + } + g_gate_log(LOG_DEBUG, "Sent hdr packet."); + g_gate_swap2h_hdr(&req->r_hdr); + if (req->r_data != NULL) { + data = g_gate_send(fd, req->r_data, req->r_length, 0); + if (data != (ssize_t)req->r_length) { + g_gate_xlog("Error while sending data: %s.", strerror(errno)); } - g_gate_swap2h_hdr(&hdr); - g_gate_log(LOG_DEBUG, "Received %d bytes (offset=%llu, " - "size=%u).", data, hdr.gh_offset, hdr.gh_length); + g_gate_log(LOG_DEBUG, + "Sent %zd bytes (offset=%ju, size=%zu).", data, + (uintmax_t)req->r_offset, (size_t)req->r_length); + free(req->r_data); + } + free(req); + } +} + +static void +log_connection(struct sockaddr *from) +{ + in_addr_t ip; + + ip = htonl(((struct sockaddr_in *)(void *)from)->sin_addr.s_addr); + g_gate_log(LOG_INFO, "Connection from: %s.", ip2str(ip)); +} + +static int +handshake(struct sockaddr *from, int sfd) +{ + struct g_gate_version ver; + struct g_gate_cinit cinit; + struct g_gate_sinit sinit; + struct ggd_connection *conn; + struct ggd_export *ex; + ssize_t data; + + log_connection(from); + /* + * Phase 1: Version verification. + */ + g_gate_log(LOG_DEBUG, "Receiving version packet."); + data = g_gate_recv(sfd, &ver, sizeof(ver), MSG_WAITALL); + g_gate_swap2h_version(&ver); + if (data != sizeof(ver)) { + g_gate_log(LOG_WARNING, "Malformed version packet."); + return (0); + } + g_gate_log(LOG_DEBUG, "Version packet received."); + if (memcmp(ver.gv_magic, GGATE_MAGIC, strlen(GGATE_MAGIC)) != 0) { + g_gate_log(LOG_WARNING, "Invalid magic field."); + return (0); + } + if (ver.gv_version != GGATE_VERSION) { + g_gate_log(LOG_WARNING, "Version %u is not supported.", + ver.gv_version); + return (0); + } + ver.gv_error = 0; + g_gate_swap2n_version(&ver); + data = g_gate_send(sfd, &ver, sizeof(ver), 0); + g_gate_swap2h_version(&ver); + if (data == -1) { + sendfail(sfd, errno, "Error while sending version packet: %s.", + strerror(errno)); + return (0); + } + + /* + * Phase 2: Request verification. + */ + g_gate_log(LOG_DEBUG, "Receiving initial packet."); + data = g_gate_recv(sfd, &cinit, sizeof(cinit), MSG_WAITALL); + g_gate_swap2h_cinit(&cinit); + if (data != sizeof(cinit)) { + g_gate_log(LOG_WARNING, "Malformed initial packet."); + return (0); + } + g_gate_log(LOG_DEBUG, "Initial packet received."); + conn = connection_find(&cinit); + if (conn != NULL) { + /* + * Connection should already exists. + */ + g_gate_log(LOG_DEBUG, "Found existing connection (token=%lu).", + (unsigned long)conn->c_token); + if (connection_add(conn, &cinit, from, sfd) == -1) { + connection_remove(conn); + return (0); + } + } else { + /* + * New connection, allocate space. + */ + conn = connection_new(&cinit, from, sfd); + if (conn == NULL) { + sendfail(sfd, ENOMEM, + "Cannot allocate new connection."); + return (0); } - g_gate_log(LOG_DEBUG, "Tick."); + g_gate_log(LOG_DEBUG, "New connection created (token=%lu).", + (unsigned long)conn->c_token); + } + + ex = exports_find(from, &cinit, conn); + if (ex == NULL) { + connection_remove(conn); + sendfail(sfd, errno, NULL); + return (0); + } + if (conn->c_mediasize == 0) { + conn->c_mediasize = g_gate_mediasize(conn->c_diskfd); + conn->c_sectorsize = g_gate_sectorsize(conn->c_diskfd); + } + sinit.gs_mediasize = conn->c_mediasize; + sinit.gs_sectorsize = conn->c_sectorsize; + sinit.gs_error = 0; + + g_gate_log(LOG_DEBUG, "Sending initial packet."); + + g_gate_swap2n_sinit(&sinit); + data = g_gate_send(sfd, &sinit, sizeof(sinit), 0); + g_gate_swap2h_sinit(&sinit); + if (data == -1) { + sendfail(sfd, errno, "Error while sending initial packet: %s.", + strerror(errno)); + return (0); } + + if (connection_ready(conn)) { + connection_launch(conn); + connection_remove(conn); + } + return (1); } static void @@ -514,12 +942,9 @@ main(int argc, char *argv[]) { struct sockaddr_in serv; struct sockaddr from; - in_addr_t bindaddr; socklen_t fromlen; - struct timeval tv; - int on, sfd, tmpsfd; - pid_t childpid; - unsigned bsize, port; + int sfd, tmpsfd; + unsigned port; bindaddr = htonl(INADDR_ANY); port = G_GATE_PORT; @@ -570,45 +995,27 @@ main(int argc, char *argv[]) argv += optind; if (argv[0] != NULL) - exports = argv[0]; + exports_file = argv[0]; exports_get(); if (!g_gate_verbose) { /* Run in daemon mode. */ if (daemon(0, 0) == -1) - g_gate_xlog("Can't daemonize: %s", strerror(errno)); + g_gate_xlog("Cannot daemonize: %s", strerror(errno)); } signal(SIGCHLD, SIG_IGN); sfd = socket(AF_INET, SOCK_STREAM, 0); if (sfd == -1) - g_gate_xlog("Can't open stream socket: %s.", strerror(errno)); + g_gate_xlog("Cannot open stream socket: %s.", strerror(errno)); bzero(&serv, sizeof(serv)); serv.sin_family = AF_INET; serv.sin_addr.s_addr = bindaddr; serv.sin_port = htons(port); - on = 1; - if (nagle) { - if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, - sizeof(on)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } - } - if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - bsize = rcvbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - bsize = sndbuf; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) - g_gate_xlog("setsockopt(): %s.", strerror(errno)); - tv.tv_sec = 10; - tv.tv_usec = 0; - if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1 || - setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { - g_gate_xlog("setsockopt() error: %s.", strerror(errno)); - } + + g_gate_socket_settings(sfd); + if (bind(sfd, (struct sockaddr *)&serv, sizeof(serv)) == -1) g_gate_xlog("bind(): %s.", strerror(errno)); if (listen(sfd, 5) == -1) @@ -629,21 +1036,8 @@ main(int argc, char *argv[]) exports_get(); } - if (exports_find(&from, NULL) == NULL) { + if (!handshake(&from, tmpsfd)) close(tmpsfd); - continue; - } - - childpid = fork(); - if (childpid < 0) { - g_gate_xlog("Cannot create child process: %s.", - strerror(errno)); - } else if (childpid == 0) { - close(sfd); - serve(tmpsfd, &from); - /* NOTREACHED */ - } - close(tmpsfd); } close(sfd); exit(EXIT_SUCCESS); diff --git a/shared/ggate.c b/shared/ggate.c index 08f13d7..c8428a6 100644 --- a/shared/ggate.c +++ b/shared/ggate.c @@ -38,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -206,17 +207,6 @@ g_gate_destroy(int unit, int force) g_gate_ioctl(G_GATE_CMD_DESTROY, &ggio); } -int -g_gate_openflags(unsigned ggflags) -{ - - if ((ggflags & G_GATE_FLAG_READONLY) != 0) - return (O_RDONLY); - else if ((ggflags & G_GATE_FLAG_WRITEONLY) != 0) - return (O_WRONLY); - return (O_RDWR); -} - void g_gate_load_module(void) { @@ -232,6 +222,76 @@ g_gate_load_module(void) } } +ssize_t +g_gate_send(int s, const void *buf, size_t len, int flags) +{ + ssize_t done = 0, done2; + const unsigned char *p = buf; + + while (len > 0) { + done2 = send(s, p, len, flags); + if (done2 == 0) + break; + else if (done2 == -1) { + if (errno == EAGAIN) { + printf("%s: EAGAIN\n", __func__); + continue; + } + done = -1; + break; + } + done += done2; + p += done2; + len -= done2; + } + return (done); +} + +ssize_t +g_gate_recv(int s, void *buf, size_t len, int flags) +{ + + return (recv(s, buf, len, flags)); +} + +int nagle = 1; +unsigned rcvbuf = G_GATE_RCVBUF; +unsigned sndbuf = G_GATE_SNDBUF; + +void +g_gate_socket_settings(int sfd) +{ + struct timeval tv; + int bsize, on; + + /* Socket settings. */ + on = 1; + if (nagle) { + if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &on, + sizeof(on)) == -1) { + g_gate_xlog("setsockopt() error: %s.", strerror(errno)); + } + } + if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) + g_gate_xlog("setsockopt(SO_REUSEADDR): %s.", strerror(errno)); + bsize = rcvbuf; + if (setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &bsize, sizeof(bsize)) == -1) + g_gate_xlog("setsockopt(SO_RCVBUF): %s.", strerror(errno)); + bsize = sndbuf; + if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &bsize, sizeof(bsize)) == -1) + g_gate_xlog("setsockopt(SO_SNDBUF): %s.", strerror(errno)); + tv.tv_sec = 1; + tv.tv_usec = 0; + if (setsockopt(sfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)) == -1) { + g_gate_log(LOG_ERR, "setsockopt(SO_SNDTIMEO) error: %s.", + strerror(errno)); + } + if (setsockopt(sfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { + g_gate_log(LOG_ERR, "setsockopt(SO_RCVTIMEO) error: %s.", + strerror(errno)); + } +} + #ifdef LIBGEOM static struct gclass * find_class(struct gmesh *mesh, const char *name) diff --git a/shared/ggate.h b/shared/ggate.h index 12dfe6d..acbdaaa 100644 --- a/shared/ggate.h +++ b/shared/ggate.h @@ -32,22 +32,49 @@ #include #include -#define G_GATE_BUFSIZE_START 65536 #define G_GATE_PORT 3080 #define G_GATE_RCVBUF 131072 #define G_GATE_SNDBUF 131072 #define G_GATE_QUEUE_SIZE 1024 -#define G_GATE_TIMEOUT 30 +#define G_GATE_TIMEOUT 0 + +#define GGATE_MAGIC "GEOM_GATE " +#define GGATE_VERSION 0 + +#define GGATE_FLAG_RDONLY 0x0001 +#define GGATE_FLAG_WRONLY 0x0002 +/* + * If GGATE_FLAG_SEND not GGATE_FLAG_RECV flag is set, this is initial + * connection. + * If GGATE_FLAG_SEND flag is set - this is socket to send data. + * If GGATE_FLAG_RECV flag is set - this is socket to receive data. + */ +#define GGATE_FLAG_SEND 0x0004 +#define GGATE_FLAG_RECV 0x0008 + +#define GGATE_CMD_READ 0 +#define GGATE_CMD_WRITE 1 extern int g_gate_devfd; extern int g_gate_verbose; +extern int nagle; +extern unsigned rcvbuf, sndbuf; + +struct g_gate_version { + char gv_magic[16]; + uint16_t gv_version; + uint16_t gv_error; +} __packed; + /* Client's initial packet. */ struct g_gate_cinit { - char gc_path[PATH_MAX + 1]; - uint8_t gc_flags; -}; + char gc_path[PATH_MAX + 1]; + uint64_t gc_flags; + uint16_t gc_nconn; + uint32_t gc_token; +} __packed; /* Server's initial packet. */ struct g_gate_sinit { @@ -55,15 +82,16 @@ struct g_gate_sinit { uint64_t gs_mediasize; uint32_t gs_sectorsize; uint16_t gs_error; -}; +} __packed; /* Control struct. */ struct g_gate_hdr { uint8_t gh_cmd; /* command */ uint64_t gh_offset; /* device offset */ uint32_t gh_length; /* size of block */ - int16_t gh_error; /* error value (0 if ok) */ -}; + uint64_t gh_seq; /* request number */ + uint16_t gh_error; /* error value (0 if ok) */ +} __packed; void g_gate_vlog(int priority, const char *message, va_list ap); void g_gate_log(int priority, const char *message, ...); @@ -75,8 +103,10 @@ void g_gate_open_device(void); void g_gate_close_device(void); void g_gate_ioctl(unsigned long req, void *data); void g_gate_destroy(int unit, int force); -int g_gate_openflags(unsigned ggflags); void g_gate_load_module(void); +ssize_t g_gate_recv(int s, void *buf, size_t len, int flags); +ssize_t g_gate_send(int s, const void *buf, size_t len, int flags); +void g_gate_socket_settings(int sfd); #ifdef LIBGEOM void g_gate_list(int unit, int verbose); #endif @@ -89,17 +119,37 @@ in_addr_t g_gate_str2ip(const char *str); */ static __inline void -g_gate_swap2h_cinit(struct g_gate_cinit *cinit __unused) +g_gate_swap2h_version(struct g_gate_version *ver) +{ + + ver->gv_version = be16toh(ver->gv_version); + ver->gv_error = be16toh(ver->gv_error); +} + +static __inline void +g_gate_swap2n_version(struct g_gate_version *ver) +{ + + ver->gv_version = htobe16(ver->gv_version); + ver->gv_error = htobe16(ver->gv_error); +} + +static __inline void +g_gate_swap2h_cinit(struct g_gate_cinit *cinit) { - /* Nothing here for now. */ + cinit->gc_flags = be64toh(cinit->gc_flags); + cinit->gc_nconn = be16toh(cinit->gc_nconn); + cinit->gc_token = be32toh(cinit->gc_token); } static __inline void -g_gate_swap2n_cinit(struct g_gate_cinit *cinit __unused) +g_gate_swap2n_cinit(struct g_gate_cinit *cinit) { - /* Nothing here for now. */ + cinit->gc_flags = htobe64(cinit->gc_flags); + cinit->gc_nconn = htobe16(cinit->gc_nconn); + cinit->gc_token = htobe32(cinit->gc_token); } static __inline void @@ -129,6 +179,7 @@ g_gate_swap2h_hdr(struct g_gate_hdr *hdr) /* Swap only used fields. */ hdr->gh_offset = be64toh(hdr->gh_offset); hdr->gh_length = be32toh(hdr->gh_length); + hdr->gh_seq = be64toh(hdr->gh_seq); hdr->gh_error = be16toh(hdr->gh_error); } @@ -139,6 +190,7 @@ g_gate_swap2n_hdr(struct g_gate_hdr *hdr) /* Swap only used fields. */ hdr->gh_offset = htobe64(hdr->gh_offset); hdr->gh_length = htobe32(hdr->gh_length); + hdr->gh_seq = htobe64(hdr->gh_seq); hdr->gh_error = htobe16(hdr->gh_error); } #endif /* _GGATE_H_ */