/*- * SPDX-License-Identifier: BSD-2-Clause-FreeBSD * * Copyright (c) 2004 Pawel Jakub Dawidek * All rights reserved. * Copyright 2020 John-Mark Gurney * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * * $FreeBSD$ */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "ggate.h" static enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET; static const char *url = NULL; static int unit = G_GATE_UNIT_AUTO; static unsigned flags = 0; static int force = 0; static unsigned queue_size = G_GATE_QUEUE_SIZE; static off_t mediasize; static unsigned sectorsize = 4096; static unsigned timeout = G_GATE_TIMEOUT; static int pushfd, popfd; /* work semaphore */ static pthread_t reqtd, proctd; static unsigned maxconnections = 32; struct ggh_req { struct g_gate_ctl_io r_ggio; CURL *r_chandle; size_t r_bufoff; TAILQ_ENTRY(ggh_req) r_next; }; static TAILQ_HEAD(, ggh_req) procqueue = TAILQ_HEAD_INITIALIZER(procqueue); static sem_t nconn_sem; static pthread_mutex_t procqueue_mtx; static void usage(void) { fprintf(stderr, "usage: %s create [-v] [-o ] " "[-q queue_size] [-s sectorsize] [-r nrequests] " "[-t timeout] [-u unit] \n", getprogname()); fprintf(stderr, " %s rescue [-v] [-o ] " "[-r nrequests] <-u unit> \n", getprogname()); fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname()); fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname()); exit(EXIT_FAILURE); } static void * req_thread(void *arg __unused) { struct ggh_req *greq; static char *buf; int buflen = 1024*1024; int error; g_gate_log(LOG_NOTICE, "%s: started!", __func__); greq = NULL; for (;;) { if (greq == NULL) greq = malloc(sizeof *greq); if (buf == NULL) buf = malloc(buflen); if (greq == NULL || buf == NULL) { /* XXX */ g_gate_log(LOG_ERR, "Unable to allocate memory."); exit(1); } greq->r_ggio.gctl_version = G_GATE_VERSION; greq->r_ggio.gctl_unit = unit; greq->r_ggio.gctl_data = buf; greq->r_ggio.gctl_length = buflen; greq->r_ggio.gctl_error = 0; //g_gate_log(LOG_DEBUG, "waiting for ioctl"); g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio); //g_gate_log(LOG_DEBUG, "got ioctl"); error = greq->r_ggio.gctl_error; switch (error) { case 0: break; case ECANCELED: /* Exit gracefully. */ g_gate_close_device(); exit(EXIT_SUCCESS); case ENXIO: default: g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME, strerror(error)); } g_gate_log(LOG_DEBUG, "ggio, ver: %u, unit: %d, seq: %llu, " "cmd: %u, offset: %llu, len: %llu", greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit, greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd, greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length); switch (greq->r_ggio.gctl_cmd) { case BIO_READ: /* use a correctly sized allocation */ greq->r_ggio.gctl_data = malloc(greq->r_ggio.gctl_length); break; case BIO_WRITE: /* r_ggio takes ownership of buf now */ buf = NULL; break; case BIO_DELETE: case BIO_FLUSH: default: greq->r_ggio.gctl_error = EOPNOTSUPP; g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio); continue; /* return EOPNOTSUPP */ break; } //g_gate_log(LOG_DEBUG, "waiting for slot"); sem_wait(&nconn_sem); #if 0 int semval; sem_getvalue(&nconn_sem, &semval); g_gate_log(LOG_DEBUG, "slots: %d", semval); #endif error = pthread_mutex_lock(&procqueue_mtx); assert(error == 0); TAILQ_INSERT_TAIL(&procqueue, greq, r_next); error = pthread_mutex_unlock(&procqueue_mtx); assert(error == 0); /* notify processing thread a request is waiting */ error = write(pushfd, "T", 1); if (error != 1) g_gate_xlog("write pushfd: %d, error: %s.", error, strerror(error)); /* pass ownership */ greq = NULL; } g_gate_log(LOG_DEBUG, "%s: Died.", __func__); return (NULL); } /* * To support any auth: * https://curl.haxx.se/libcurl/c/anyauthput.html */ static curlioerr curl_ioctl(CURL *hndl, curliocmd cmd, void *userdata) { struct ggh_req *greq; (void)hndl; greq = (struct ggh_req *)userdata; switch (cmd) { case CURLIOCMD_RESTARTREAD: greq->r_bufoff = 0; break; default: return CURLIOE_UNKNOWNCMD; } return CURLIOE_OK; } /* * file the curl buffer with data to send to remote server. */ static size_t curl_readfun(char *buffer, size_t size, size_t nitems, void *userdata) { struct ggh_req *greq; size_t cnt; greq = (struct ggh_req *)userdata; cnt = MIN(size * nitems, greq->r_ggio.gctl_length - greq->r_bufoff); //g_gate_log(LOG_DEBUG, "sending %zd bytes on %p", cnt, greq); memcpy(buffer, (char *)greq->r_ggio.gctl_data + greq->r_bufoff, cnt); greq->r_bufoff += cnt; return cnt; } static size_t curl_writefun(char *buffer, size_t size, size_t nitems, void *userdata) { struct ggh_req *greq; size_t cnt; greq = (struct ggh_req *)userdata; cnt = size * nitems; assert((off_t)(greq->r_bufoff + cnt) <= greq->r_ggio.gctl_length); memcpy((char *)greq->r_ggio.gctl_data + greq->r_bufoff, buffer, cnt); greq->r_bufoff += cnt; return cnt; } static void process_greq(CURLM *cmulti, struct ggh_req *greq) { char range_header[256]; off_t start, length, end; /* start processing */ greq->r_chandle = curl_easy_init(); curl_easy_setopt(greq->r_chandle, CURLOPT_URL, url); curl_easy_setopt(greq->r_chandle, CURLOPT_PRIVATE, (char *)greq); //curl_easy_setopt(greq->r_chandle, CURLOPT_VERBOSE, (long)1); start = greq->r_ggio.gctl_offset; length = greq->r_ggio.gctl_length; end = start + length; greq->r_bufoff = 0; switch (greq->r_ggio.gctl_cmd) { case BIO_READ: curl_easy_setopt(greq->r_chandle, CURLOPT_WRITEFUNCTION, curl_writefun); curl_easy_setopt(greq->r_chandle, CURLOPT_WRITEDATA, greq); sprintf(range_header, "%zd-%zd", start, end - 1); g_gate_log(LOG_DEBUG, "read range: %s", range_header); curl_easy_setopt(greq->r_chandle, CURLOPT_RANGE, range_header); curl_multi_add_handle(cmulti, greq->r_chandle); break; case BIO_WRITE: curl_easy_setopt(greq->r_chandle, CURLOPT_IOCTLFUNCTION, curl_ioctl); curl_easy_setopt(greq->r_chandle, CURLOPT_IOCTLDATA, greq); curl_easy_setopt(greq->r_chandle, CURLOPT_READFUNCTION, curl_readfun); curl_easy_setopt(greq->r_chandle, CURLOPT_READDATA, greq); curl_easy_setopt(greq->r_chandle, CURLOPT_UPLOAD, (long)1); /* XXX - support more than basic */ //curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPAUTH, (long)CURLAUTH_ANY); curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPAUTH, (long)CURLAUTH_BASIC); //curl_easy_setopt(greq->r_chandle, CURLOPT_VERBOSE, (long)1); /* https://curl.haxx.se/mail/lib-2019-05/0012.html */ curl_easy_setopt(greq->r_chandle, CURLOPT_INFILESIZE_LARGE, (curl_off_t)length); /* we don't need resume from as we don't seek */ //curl_easy_setopt(greq->r_chandle, CURLOPT_RESUME_FROM_LARGE, (curl_off_t)start); sprintf(range_header, "Content-Range: bytes %zd-%zd/%zd", start, end - 1, mediasize); g_gate_log(LOG_DEBUG, "write range: %s", range_header); struct curl_slist *header_list; header_list = curl_slist_append(NULL, range_header); curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPHEADER, header_list); #if 1 curl_multi_add_handle(cmulti, greq->r_chandle); #else CURLcode res; res = curl_easy_perform(greq->r_chandle); curl_easy_getinfo(greq->r_chandle, CURLINFO_RESPONSE_CODE, &code); if (code != 200) { g_gate_log(LOG_ERR, "Got invalid response, HTTP code %03d.", code); } #endif break; } /* start processing */ //curl_multi_add_handle(cmulti, greq->r_chandle); } static void * proc_thread(void *arg __unused) { char scratch[32]; struct timeval to; CURLMsg *m; CURLM *cmulti; struct ggh_req *greq; fd_set fdread; fd_set fdwrite; fd_set fdexcep; CURLMcode mc; long curl_timeo; long code; int rc; int maxfd; int error; int still_running; g_gate_log(LOG_NOTICE, "%s: started!", __func__); /* make sure we don't block on reading */ fcntl(popfd, F_SETFL, O_NONBLOCK); cmulti = curl_multi_init(); //mc = curl_multi_setopt(cmulti, CURLOPT_VERBOSE, (long)1); for (;;) { //g_gate_log(LOG_DEBUG, "looping"); /* setup polling loop */ maxfd = -1; FD_ZERO(&fdread); FD_ZERO(&fdwrite); FD_ZERO(&fdexcep); to = (struct timeval){ .tv_sec = 1 }; curl_timeo = -1; curl_multi_timeout(cmulti, &curl_timeo); if (curl_timeo >= 0) { to.tv_sec = curl_timeo / 1000; if (to.tv_sec > 1) to.tv_sec = 1; else to.tv_usec = (curl_timeo % 1000) * 1000; } mc = curl_multi_fdset(cmulti, &fdread, &fdwrite, &fdexcep, &maxfd); if (mc != CURLM_OK) { g_gate_log(LOG_ERR, "%s: fdset failed.", __func__); break; } /* add in the pop descriptor */ FD_SET(popfd, &fdread); maxfd = MAX(popfd, maxfd); rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &to); switch (rc) { case -1: g_gate_log(LOG_ERR, "%s: select failed: %s", __func__, strerror(errno)); break; case 0: default: curl_multi_perform(cmulti, &still_running); break; } /* Check for completed requests */ do { int msgq = 0; m = curl_multi_info_read(cmulti, &msgq); if (m != NULL && m->msg == CURLMSG_DONE) { CURL *e = m->easy_handle; curl_easy_getinfo(e, CURLINFO_PRIVATE, (char *)&greq); curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, &code); g_gate_log(LOG_DEBUG, "request code: %d", code); if (code != 206 && code != 204) { g_gate_log(LOG_ERR, "request failed: %d", code); greq->r_ggio.gctl_error = EIO; } g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio); //g_gate_log(LOG_DEBUG, "releasing slot"); sem_post(&nconn_sem); curl_multi_remove_handle(cmulti, e); curl_easy_cleanup(e); free(greq->r_ggio.gctl_data); free(greq); } else if (m != NULL) { g_gate_log(LOG_ERR, "unknown curl msg: %d", m->msg); } } while (m != NULL); if (FD_ISSET(popfd, &fdread)) { /* read off the tokens */ read(popfd, scratch, sizeof scratch); do { /* get the request */ error = pthread_mutex_lock(&procqueue_mtx); assert(error == 0); greq = TAILQ_FIRST(&procqueue); if (greq != NULL) TAILQ_REMOVE(&procqueue, greq, r_next); error = pthread_mutex_unlock(&procqueue_mtx); assert(error == 0); /* no more to process */ if (greq == NULL) break; process_greq(cmulti, greq); } while (greq != NULL); } } curl_multi_cleanup(cmulti); g_gate_log(LOG_DEBUG, "%s: Died.", __func__); pthread_exit(NULL); } static void mydaemon(void) { if (g_gate_verbose > 0) return; if (daemon(0, 0) == 0) return; if (action == CREATE) g_gate_destroy(unit, 1); err(EXIT_FAILURE, "Cannot daemonize"); } static int g_gatehttp_connect(void) { CURL *hndl; CURLcode cc; long code; curl_off_t cl; /* get the remote's size */ hndl = curl_easy_init(); curl_easy_setopt(hndl, CURLOPT_URL, url); curl_easy_setopt(hndl, CURLOPT_NOBODY, (long)1); //curl_easy_setopt(hndl, CURLOPT_VERBOSE, (long)1); cc = curl_easy_perform(hndl); if (cc != CURLE_OK) { g_gate_log(LOG_ERR, "curl request failed."); return 0; } curl_easy_getinfo(hndl, CURLINFO_RESPONSE_CODE, &code); if (code != 200) { g_gate_log(LOG_ERR, "Got invalid response, HTTP code %03d.", code); return 0; } curl_easy_getinfo(hndl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &cl); mediasize = cl; g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize); curl_easy_cleanup(hndl); return 1; } static void g_gatehttp_start(void) { int filedes[2]; int error; pipe(filedes); pushfd = filedes[1]; popfd = filedes[0]; error = pthread_mutex_init(&procqueue_mtx, NULL); if (error != 0) { g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.", strerror(error)); } sem_init(&nconn_sem, 0, maxconnections); error = pthread_create(&proctd, NULL, proc_thread, NULL); if (error != 0) { g_gate_destroy(unit, 1); g_gate_xlog("pthread_create(proc_thread): %s.", strerror(error)); } pthread_set_name_np(proctd, "proc"); reqtd = pthread_self(); pthread_set_name_np(reqtd, "req"); req_thread(NULL); /* Disconnected. */ close(pushfd); close(popfd); } static void signop(int sig __unused) { /* Do nothing. */ } static void g_gatehttp_loop(void) { struct g_gate_ctl_cancel ggioc; signal(SIGUSR1, signop); for (;;) { g_gatehttp_start(); g_gate_log(LOG_NOTICE, "Disconnected [%s]. Connecting...", url); ggioc.gctl_version = G_GATE_VERSION; ggioc.gctl_unit = unit; ggioc.gctl_seq = 0; g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); } } static void g_gatehttp_create(void) { struct g_gate_ctl_create ggioc; if (!g_gatehttp_connect()) g_gate_xlog("Cannot connect: %s.", strerror(errno)); /* * Ok, got both sockets, time to create provider. */ memset(&ggioc, 0, sizeof(ggioc)); ggioc.gctl_version = G_GATE_VERSION; ggioc.gctl_mediasize = mediasize; ggioc.gctl_sectorsize = sectorsize; ggioc.gctl_flags = flags; ggioc.gctl_maxcount = queue_size; ggioc.gctl_timeout = timeout; ggioc.gctl_unit = unit; snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s", url); g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc); if (unit == -1) { printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit); fflush(stdout); } unit = ggioc.gctl_unit; mydaemon(); g_gatehttp_loop(); } static void g_gatehttp_rescue(void) { struct g_gate_ctl_cancel ggioc; if (!g_gatehttp_connect()) g_gate_xlog("Cannot connect: %s.", strerror(errno)); ggioc.gctl_version = G_GATE_VERSION; ggioc.gctl_unit = unit; ggioc.gctl_seq = 0; g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc); mydaemon(); g_gatehttp_loop(); } int main(int argc, char *argv[]) { if (argc < 2) usage(); if (strcasecmp(argv[1], "create") == 0) action = CREATE; else if (strcasecmp(argv[1], "destroy") == 0) action = DESTROY; else if (strcasecmp(argv[1], "list") == 0) action = LIST; else if (strcasecmp(argv[1], "rescue") == 0) action = RESCUE; else usage(); argc -= 1; argv += 1; for (;;) { int ch; ch = getopt(argc, argv, "fo:q:r:s:t:u:v"); if (ch == -1) break; switch (ch) { case 'f': if (action != DESTROY) usage(); force = 1; break; case 'o': if (action != CREATE && action != RESCUE) usage(); if (strcasecmp("ro", optarg) == 0) flags = G_GATE_FLAG_READONLY; else if (strcasecmp("wo", optarg) == 0) flags = G_GATE_FLAG_WRITEONLY; else if (strcasecmp("rw", optarg) == 0) flags = 0; else { errx(EXIT_FAILURE, "Invalid argument for '-o' option."); } break; case 'q': if (action != CREATE) usage(); errno = 0; queue_size = strtoul(optarg, NULL, 10); if (queue_size == 0 && errno != 0) errx(EXIT_FAILURE, "Invalid queue_size."); break; case 'r': if (action != CREATE && action != RESCUE) usage(); errno = 0; maxconnections = strtoul(optarg, NULL, 10); if (maxconnections == 0 && errno != 0) errx(EXIT_FAILURE, "Invalid queue_size."); break; case 's': if (action != CREATE) usage(); errno = 0; sectorsize = strtoul(optarg, NULL, 10); if (sectorsize == 0 && errno != 0) errx(EXIT_FAILURE, "Invalid sectorsize."); break; case 't': if (action != CREATE) usage(); errno = 0; timeout = strtoul(optarg, NULL, 10); if (timeout == 0 && errno != 0) errx(EXIT_FAILURE, "Invalid timeout."); break; case 'u': errno = 0; unit = strtol(optarg, NULL, 10); if (unit == 0 && errno != 0) errx(EXIT_FAILURE, "Invalid unit number."); break; case 'v': if (action == DESTROY) usage(); g_gate_verbose++; break; default: usage(); } } argc -= optind; argv += optind; switch (action) { case CREATE: if (argc != 1) usage(); g_gate_load_module(); g_gate_open_device(); url = argv[0]; g_gatehttp_create(); break; case DESTROY: if (unit == -1) { fprintf(stderr, "Required unit number.\n"); usage(); } g_gate_verbose = 1; g_gate_open_device(); g_gate_destroy(unit, force); break; case LIST: g_gate_list(unit, g_gate_verbose); break; case RESCUE: if (argc != 1) usage(); if (unit == -1) { fprintf(stderr, "Required unit number.\n"); usage(); } g_gate_open_device(); url = argv[0]; g_gatehttp_rescue(); break; case UNSET: default: usage(); } g_gate_close_device(); exit(EXIT_SUCCESS); }