Browse Source

make ggatessh work.. it uses libssh2, but it is recommended to use

a patched version that does not do read ahead.  It also supports
resizing when rescuing and the file has changed sizes..
tags/ggatessh-v1.0.0
John-Mark Gurney 4 years ago
parent
commit
1c8b4c849f
12 changed files with 1439 additions and 994 deletions
  1. +2
    -2
      .gitignore
  2. +2
    -2
      Makefile
  3. +11
    -15
      README.md
  4. +0
    -19
      ggatehttp/Makefile
  5. +0
    -770
      ggatehttp/ggatehttp.c
  6. +19
    -0
      ggatessh/Makefile
  7. +0
    -0
      ggatessh/Makefile.depend
  8. +52
    -34
      ggatessh/ggatessh.8
  9. +1064
    -0
      ggatessh/ggatessh.c
  10. +1
    -1
      tests/Makefile
  11. +0
    -151
      tests/ggatehttp_test.sh
  12. +288
    -0
      tests/ggatessh_test.sh

+ 2
- 2
.gitignore View File

@@ -3,6 +3,6 @@
*.8.gz
*.full
*.debug
ggatehttp/ggatehttp
ggatessh/ggatessh
tests/Kyuafile
tests/ggatehttp_test
tests/ggatessh_test

+ 2
- 2
Makefile View File

@@ -1,9 +1,9 @@
# $FreeBSD$

SUBDIR= ${_ggatehttp} \
SUBDIR= ${_ggatessh} \
tests

_ggatehttp= ggatehttp
_ggatessh= ggatessh

.PHONY: devtest



+ 11
- 15
README.md View File

@@ -1,20 +1,16 @@
https ggate working tree
========================
ggatessh working tree
=====================

This is a working tree for ggate work.

This is a variant of ggatec using http(s) GET/PUT instead of talking
to ggated.
This is a variant of ggatec using sftp instead of talking to ggated.

Note that when I started on this project, that this would be completely
standards complaint. After running into an issue w/ the wsgidav server
not supporting partial PUTs, I did some research, and came across this
[post](https://blog.sphere.chronosempire.org.uk/2012/11/21/webdav-and-the-http-patch-nightmare)
that talks about how the IETF intentionally broke partial PUTs, despite
having the same problems w/ other parts of their spec.
It uses the libssh2 library with a few modifications. The build system
will be updated to compile and use libssh2. There is at least one minor
modification needed to turn off read-ahead for sftp support.

Servers known to work:
- apache 2.2.x: can truncate file under some conditions
Services known to not work:
- wsgidav (Python): Does not implement partial PUT
By default, libssh2 assumes that you'll read a whole file sequentially
in blocking mode, and if it does not do this, there will be pipeline
stalls. In our case, this would result in lots of wasted bandwith as
we will be seeking around the file to read and write, and so this
feature needs to be disabled.

+ 0
- 19
ggatehttp/Makefile View File

@@ -1,19 +0,0 @@
# $FreeBSD$

.PATH: ${.CURDIR:H}/shared

PROG= ggatehttp
MAN= ggatehttp.8
SRCS= ggatehttp.c ggate.c

CFLAGS+= -DMAX_SEND_SIZE=32768
CFLAGS+= -DLIBGEOM
CFLAGS+= -I${.CURDIR:H}/shared
CFLAGS+= -I/usr/local/include
#CFLAGS+= -O0
#CFLAGS+= -fprofile-instr-generate

LDFLAGS+= -L/usr/local/lib
LDADD= -lgeom -lutil -lpthread -lcurl

.include <bsd.prog.mk>

+ 0
- 770
ggatehttp/ggatehttp.c View File

@@ -1,770 +0,0 @@
/*-
* SPDX-License-Identifier: BSD-2-Clause-FreeBSD
*
* Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org>
* All rights reserved.
* Copyright 2020 John-Mark Gurney <jmg@FreeBSD.org>
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $FreeBSD$
*/

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <ctype.h>
#include <libgen.h>
#include <pthread.h>
#include <pthread_np.h>
#include <signal.h>
#include <err.h>
#include <errno.h>
#include <assert.h>

#include <sys/param.h>
#include <sys/ioctl.h>
#include <sys/queue.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <sys/syslog.h>
#include <sys/time.h>
#include <sys/bio.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>

#include <semaphore.h>

#include <curl/curl.h>
#include <curl/multi.h>

#include <geom/gate/g_gate.h>
#include "ggate.h"

static enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET;

static const char *url = NULL;
static int unit = G_GATE_UNIT_AUTO;
static unsigned flags = 0;
static int force = 0;
static unsigned queue_size = G_GATE_QUEUE_SIZE;
static off_t mediasize;
static unsigned sectorsize = 4096;
static unsigned timeout = G_GATE_TIMEOUT;
static int pushfd, popfd; /* work semaphore */
static pthread_t reqtd, proctd;
static unsigned maxconnections = 32;

struct ggh_req {
struct g_gate_ctl_io r_ggio;
CURL *r_chandle;
size_t r_bufoff;
TAILQ_ENTRY(ggh_req) r_next;
};

static TAILQ_HEAD(, ggh_req) procqueue = TAILQ_HEAD_INITIALIZER(procqueue);
static sem_t nconn_sem;
static pthread_mutex_t procqueue_mtx;

static void
usage(void)
{

fprintf(stderr, "usage: %s create [-v] [-o <ro|wo|rw>] "
"[-q queue_size] [-s sectorsize] [-r nrequests] "
"[-t timeout] [-u unit] <url>\n", getprogname());
fprintf(stderr, " %s rescue [-v] [-o <ro|wo|rw>] "
"[-r nrequests] <-u unit> <url>\n", getprogname());
fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname());
fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname());
exit(EXIT_FAILURE);
}

static void *
req_thread(void *arg __unused)
{
struct ggh_req *greq;
static char *buf;
int buflen = 1024*1024;
int error;

g_gate_log(LOG_NOTICE, "%s: started!", __func__);

greq = NULL;

for (;;) {
if (greq == NULL)
greq = malloc(sizeof *greq);

if (buf == NULL)
buf = malloc(buflen);

if (greq == NULL || buf == NULL) {
/* XXX */
g_gate_log(LOG_ERR, "Unable to allocate memory.");
exit(1);
}

greq->r_ggio.gctl_version = G_GATE_VERSION;
greq->r_ggio.gctl_unit = unit;
greq->r_ggio.gctl_data = buf;
greq->r_ggio.gctl_length = buflen;
greq->r_ggio.gctl_error = 0;

//g_gate_log(LOG_DEBUG, "waiting for ioctl");
g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio);
//g_gate_log(LOG_DEBUG, "got ioctl");

error = greq->r_ggio.gctl_error;
switch (error) {
case 0:
break;
case ECANCELED:
/* Exit gracefully. */
g_gate_close_device();
exit(EXIT_SUCCESS);
case ENXIO:
default:
g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
strerror(error));
}

g_gate_log(LOG_DEBUG, "ggio, ver: %u, unit: %d, seq: %llu, "
"cmd: %u, offset: %llu, len: %llu",
greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit,
greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd,
greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);

switch (greq->r_ggio.gctl_cmd) {
case BIO_READ:
/* use a correctly sized allocation */
greq->r_ggio.gctl_data =
malloc(greq->r_ggio.gctl_length);
break;
case BIO_WRITE:
/* r_ggio takes ownership of buf now */
buf = NULL;
break;

case BIO_DELETE:
case BIO_FLUSH:
default:
greq->r_ggio.gctl_error = EOPNOTSUPP;
g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
continue; /* return EOPNOTSUPP */
break;
}

//g_gate_log(LOG_DEBUG, "waiting for slot");
sem_wait(&nconn_sem);
#if 0
int semval;
sem_getvalue(&nconn_sem, &semval);
g_gate_log(LOG_DEBUG, "slots: %d", semval);
#endif

error = pthread_mutex_lock(&procqueue_mtx);
assert(error == 0);
TAILQ_INSERT_TAIL(&procqueue, greq, r_next);
error = pthread_mutex_unlock(&procqueue_mtx);
assert(error == 0);

/* notify processing thread a request is waiting */
error = write(pushfd, "T", 1);
if (error != 1)
g_gate_xlog("write pushfd: %d, error: %s.", error,
strerror(error));

/* pass ownership */
greq = NULL;
}
g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
return (NULL);
}

/*
* To support any auth:
* https://curl.haxx.se/libcurl/c/anyauthput.html
*/
static curlioerr
curl_ioctl(CURL *hndl, curliocmd cmd, void *userdata)
{
struct ggh_req *greq;

(void)hndl;

greq = (struct ggh_req *)userdata;

switch (cmd) {
case CURLIOCMD_RESTARTREAD:
greq->r_bufoff = 0;
break;

default:
return CURLIOE_UNKNOWNCMD;
}

return CURLIOE_OK;
}

/*
* file the curl buffer with data to send to remote server.
*/
static size_t
curl_readfun(char *buffer, size_t size, size_t nitems, void *userdata)
{
struct ggh_req *greq;
size_t cnt;

greq = (struct ggh_req *)userdata;

cnt = MIN(size * nitems, greq->r_ggio.gctl_length - greq->r_bufoff);

//g_gate_log(LOG_DEBUG, "sending %zd bytes on %p", cnt, greq);

memcpy(buffer, (char *)greq->r_ggio.gctl_data + greq->r_bufoff, cnt);

greq->r_bufoff += cnt;

return cnt;
}

static size_t
curl_writefun(char *buffer, size_t size, size_t nitems, void *userdata)
{
struct ggh_req *greq;
size_t cnt;

greq = (struct ggh_req *)userdata;

cnt = size * nitems;

assert((off_t)(greq->r_bufoff + cnt) <= greq->r_ggio.gctl_length);

memcpy((char *)greq->r_ggio.gctl_data + greq->r_bufoff, buffer, cnt);

greq->r_bufoff += cnt;

return cnt;
}

static void
process_greq(CURLM *cmulti, struct ggh_req *greq)
{
char range_header[256];
off_t start, length, end;

/* start processing */
greq->r_chandle = curl_easy_init();

curl_easy_setopt(greq->r_chandle, CURLOPT_URL, url);
curl_easy_setopt(greq->r_chandle, CURLOPT_PRIVATE, (char *)greq);
//curl_easy_setopt(greq->r_chandle, CURLOPT_VERBOSE, (long)1);

start = greq->r_ggio.gctl_offset;
length = greq->r_ggio.gctl_length;
end = start + length;

greq->r_bufoff = 0;
switch (greq->r_ggio.gctl_cmd) {
case BIO_READ:
curl_easy_setopt(greq->r_chandle, CURLOPT_WRITEFUNCTION,
curl_writefun);
curl_easy_setopt(greq->r_chandle, CURLOPT_WRITEDATA, greq);

sprintf(range_header, "%zd-%zd", start, end - 1);
g_gate_log(LOG_DEBUG, "read range: %s", range_header);
curl_easy_setopt(greq->r_chandle, CURLOPT_RANGE, range_header);
curl_multi_add_handle(cmulti, greq->r_chandle);
break;

case BIO_WRITE:
curl_easy_setopt(greq->r_chandle, CURLOPT_IOCTLFUNCTION,
curl_ioctl);
curl_easy_setopt(greq->r_chandle, CURLOPT_IOCTLDATA, greq);
curl_easy_setopt(greq->r_chandle, CURLOPT_READFUNCTION,
curl_readfun);
curl_easy_setopt(greq->r_chandle, CURLOPT_READDATA, greq);
curl_easy_setopt(greq->r_chandle, CURLOPT_UPLOAD, (long)1);
/* XXX - support more than basic */
//curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPAUTH, (long)CURLAUTH_ANY);
curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPAUTH,
(long)CURLAUTH_BASIC);

//curl_easy_setopt(greq->r_chandle, CURLOPT_VERBOSE, (long)1);

/* https://curl.haxx.se/mail/lib-2019-05/0012.html */
curl_easy_setopt(greq->r_chandle, CURLOPT_INFILESIZE_LARGE,
(curl_off_t)length);
/* we don't need resume from as we don't seek */
//curl_easy_setopt(greq->r_chandle, CURLOPT_RESUME_FROM_LARGE, (curl_off_t)start);
sprintf(range_header, "Content-Range: bytes %zd-%zd/%zd",
start, end - 1, mediasize);
g_gate_log(LOG_DEBUG, "write range: %s", range_header);

struct curl_slist *header_list;
header_list = curl_slist_append(NULL, range_header);
curl_easy_setopt(greq->r_chandle, CURLOPT_HTTPHEADER,
header_list);

#if 1
curl_multi_add_handle(cmulti, greq->r_chandle);
#else
CURLcode res;
res = curl_easy_perform(greq->r_chandle);
curl_easy_getinfo(greq->r_chandle, CURLINFO_RESPONSE_CODE,
&code);
if (code != 200) {
g_gate_log(LOG_ERR,
"Got invalid response, HTTP code %03d.", code);
}
#endif
break;
}

/* start processing */
//curl_multi_add_handle(cmulti, greq->r_chandle);
}

static void *
proc_thread(void *arg __unused)
{
char scratch[32];
struct timeval to;
CURLMsg *m;
CURLM *cmulti;
struct ggh_req *greq;
fd_set fdread;
fd_set fdwrite;
fd_set fdexcep;
CURLMcode mc;
long curl_timeo;
long code;
int rc;
int maxfd;
int error;
int still_running;

g_gate_log(LOG_NOTICE, "%s: started!", __func__);

/* make sure we don't block on reading */
fcntl(popfd, F_SETFL, O_NONBLOCK);

cmulti = curl_multi_init();
//mc = curl_multi_setopt(cmulti, CURLOPT_VERBOSE, (long)1);
for (;;) {
//g_gate_log(LOG_DEBUG, "looping");

/* setup polling loop */
maxfd = -1;
FD_ZERO(&fdread);
FD_ZERO(&fdwrite);
FD_ZERO(&fdexcep);
to = (struct timeval){ .tv_sec = 1 };
curl_timeo = -1;
curl_multi_timeout(cmulti, &curl_timeo);
if (curl_timeo >= 0) {
to.tv_sec = curl_timeo / 1000;
if (to.tv_sec > 1)
to.tv_sec = 1;
else
to.tv_usec = (curl_timeo % 1000) * 1000;
}
mc = curl_multi_fdset(cmulti, &fdread, &fdwrite, &fdexcep, &maxfd);
if (mc != CURLM_OK) {
g_gate_log(LOG_ERR, "%s: fdset failed.", __func__);
break;
}

/* add in the pop descriptor */
FD_SET(popfd, &fdread);
maxfd = MAX(popfd, maxfd);

rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, &to);
switch (rc) {
case -1:
g_gate_log(LOG_ERR, "%s: select failed: %s", __func__,
strerror(errno));
break;
case 0:
default:
curl_multi_perform(cmulti, &still_running);
break;
}

/* Check for completed requests */
do {
int msgq = 0;
m = curl_multi_info_read(cmulti, &msgq);
if (m != NULL && m->msg == CURLMSG_DONE) {
CURL *e = m->easy_handle;

curl_easy_getinfo(e, CURLINFO_PRIVATE,
(char *)&greq);
curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE,
&code);
g_gate_log(LOG_DEBUG, "request code: %d", code);
if (code != 206 && code != 204) {
g_gate_log(LOG_ERR,
"request failed: %d", code);
greq->r_ggio.gctl_error = EIO;
}

g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);

//g_gate_log(LOG_DEBUG, "releasing slot");
sem_post(&nconn_sem);

curl_multi_remove_handle(cmulti, e);
curl_easy_cleanup(e);

free(greq->r_ggio.gctl_data);
free(greq);
} else if (m != NULL) {
g_gate_log(LOG_ERR, "unknown curl msg: %d",
m->msg);
}
} while (m != NULL);

if (FD_ISSET(popfd, &fdread)) {
/* read off the tokens */
read(popfd, scratch, sizeof scratch);

do {
/* get the request */
error = pthread_mutex_lock(&procqueue_mtx);
assert(error == 0);
greq = TAILQ_FIRST(&procqueue);
if (greq != NULL)
TAILQ_REMOVE(&procqueue, greq, r_next);
error = pthread_mutex_unlock(&procqueue_mtx);
assert(error == 0);

/* no more to process */
if (greq == NULL)
break;

process_greq(cmulti, greq);
} while (greq != NULL);
}
}

curl_multi_cleanup(cmulti);
g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
pthread_exit(NULL);
}

static void
mydaemon(void)
{

if (g_gate_verbose > 0)
return;
if (daemon(0, 0) == 0)
return;
if (action == CREATE)
g_gate_destroy(unit, 1);
err(EXIT_FAILURE, "Cannot daemonize");
}

static int
g_gatehttp_connect(void)
{
CURL *hndl;
CURLcode cc;
long code;
curl_off_t cl;

/* get the remote's size */
hndl = curl_easy_init();
curl_easy_setopt(hndl, CURLOPT_URL, url);
curl_easy_setopt(hndl, CURLOPT_NOBODY, (long)1);
//curl_easy_setopt(hndl, CURLOPT_VERBOSE, (long)1);

cc = curl_easy_perform(hndl);

if (cc != CURLE_OK) {
g_gate_log(LOG_ERR, "curl request failed.");
return 0;
}

curl_easy_getinfo(hndl, CURLINFO_RESPONSE_CODE, &code);
if (code != 200) {
g_gate_log(LOG_ERR, "Got invalid response, HTTP code %03d.", code);
return 0;
}

curl_easy_getinfo(hndl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &cl);
mediasize = cl;
g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize);

curl_easy_cleanup(hndl);

return 1;
}

static void
g_gatehttp_start(void)
{
int filedes[2];
int error;

pipe(filedes);
pushfd = filedes[1];
popfd = filedes[0];

error = pthread_mutex_init(&procqueue_mtx, NULL);
if (error != 0) {
g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.",
strerror(error));
}

sem_init(&nconn_sem, 0, maxconnections);

error = pthread_create(&proctd, NULL, proc_thread, NULL);
if (error != 0) {
g_gate_destroy(unit, 1);
g_gate_xlog("pthread_create(proc_thread): %s.",
strerror(error));
}
pthread_set_name_np(proctd, "proc");

reqtd = pthread_self();
pthread_set_name_np(reqtd, "req");
req_thread(NULL);

/* Disconnected. */
close(pushfd);
close(popfd);
}

static void
signop(int sig __unused)
{

/* Do nothing. */
}

static void
g_gatehttp_loop(void)
{
struct g_gate_ctl_cancel ggioc;

signal(SIGUSR1, signop);
for (;;) {
g_gatehttp_start();
g_gate_log(LOG_NOTICE, "Disconnected [%s]. Connecting...",
url);

ggioc.gctl_version = G_GATE_VERSION;
ggioc.gctl_unit = unit;
ggioc.gctl_seq = 0;
g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
}
}

static void
g_gatehttp_create(void)
{
struct g_gate_ctl_create ggioc;

if (!g_gatehttp_connect())
g_gate_xlog("Cannot connect: %s.", strerror(errno));

/*
* Ok, got both sockets, time to create provider.
*/
memset(&ggioc, 0, sizeof(ggioc));
ggioc.gctl_version = G_GATE_VERSION;
ggioc.gctl_mediasize = mediasize;
ggioc.gctl_sectorsize = sectorsize;
ggioc.gctl_flags = flags;
ggioc.gctl_maxcount = queue_size;
ggioc.gctl_timeout = timeout;
ggioc.gctl_unit = unit;
snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s", url);
g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
if (unit == -1) {
printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
fflush(stdout);
}
unit = ggioc.gctl_unit;

mydaemon();
g_gatehttp_loop();
}

static void
g_gatehttp_rescue(void)
{
struct g_gate_ctl_cancel ggioc;

if (!g_gatehttp_connect())
g_gate_xlog("Cannot connect: %s.", strerror(errno));

ggioc.gctl_version = G_GATE_VERSION;
ggioc.gctl_unit = unit;
ggioc.gctl_seq = 0;
g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);

mydaemon();
g_gatehttp_loop();
}

int
main(int argc, char *argv[])
{

if (argc < 2)
usage();
if (strcasecmp(argv[1], "create") == 0)
action = CREATE;
else if (strcasecmp(argv[1], "destroy") == 0)
action = DESTROY;
else if (strcasecmp(argv[1], "list") == 0)
action = LIST;
else if (strcasecmp(argv[1], "rescue") == 0)
action = RESCUE;
else
usage();
argc -= 1;
argv += 1;
for (;;) {
int ch;

ch = getopt(argc, argv, "fo:q:r:s:t:u:v");
if (ch == -1)
break;
switch (ch) {
case 'f':
if (action != DESTROY)
usage();
force = 1;
break;
case 'o':
if (action != CREATE && action != RESCUE)
usage();
if (strcasecmp("ro", optarg) == 0)
flags = G_GATE_FLAG_READONLY;
else if (strcasecmp("wo", optarg) == 0)
flags = G_GATE_FLAG_WRITEONLY;
else if (strcasecmp("rw", optarg) == 0)
flags = 0;
else {
errx(EXIT_FAILURE,
"Invalid argument for '-o' option.");
}
break;
case 'q':
if (action != CREATE)
usage();
errno = 0;
queue_size = strtoul(optarg, NULL, 10);
if (queue_size == 0 && errno != 0)
errx(EXIT_FAILURE, "Invalid queue_size.");
break;
case 'r':
if (action != CREATE && action != RESCUE)
usage();
errno = 0;
maxconnections = strtoul(optarg, NULL, 10);
if (maxconnections == 0 && errno != 0)
errx(EXIT_FAILURE, "Invalid queue_size.");
break;
case 's':
if (action != CREATE)
usage();
errno = 0;
sectorsize = strtoul(optarg, NULL, 10);
if (sectorsize == 0 && errno != 0)
errx(EXIT_FAILURE, "Invalid sectorsize.");
break;
case 't':
if (action != CREATE)
usage();
errno = 0;
timeout = strtoul(optarg, NULL, 10);
if (timeout == 0 && errno != 0)
errx(EXIT_FAILURE, "Invalid timeout.");
break;
case 'u':
errno = 0;
unit = strtol(optarg, NULL, 10);
if (unit == 0 && errno != 0)
errx(EXIT_FAILURE, "Invalid unit number.");
break;
case 'v':
if (action == DESTROY)
usage();
g_gate_verbose++;
break;
default:
usage();
}
}
argc -= optind;
argv += optind;

switch (action) {
case CREATE:
if (argc != 1)
usage();
g_gate_load_module();
g_gate_open_device();
url = argv[0];
g_gatehttp_create();
break;
case DESTROY:
if (unit == -1) {
fprintf(stderr, "Required unit number.\n");
usage();
}
g_gate_verbose = 1;
g_gate_open_device();
g_gate_destroy(unit, force);
break;
case LIST:
g_gate_list(unit, g_gate_verbose);
break;
case RESCUE:
if (argc != 1)
usage();
if (unit == -1) {
fprintf(stderr, "Required unit number.\n");
usage();
}
g_gate_open_device();
url = argv[0];
g_gatehttp_rescue();
break;
case UNSET:
default:
usage();
}
g_gate_close_device();
exit(EXIT_SUCCESS);
}

+ 19
- 0
ggatessh/Makefile View File

@@ -0,0 +1,19 @@
# $FreeBSD$

.PATH: ${.CURDIR:H}/shared

PROG= ggatessh
MAN= ggatessh.8
SRCS= ggatessh.c ggate.c

CFLAGS+= -DMAX_SEND_SIZE=32768
CFLAGS+= -DLIBGEOM
CFLAGS+= -I${.CURDIR:H}/shared
CFLAGS+= -I/home/freebsd/libssh2/include
#CFLAGS+= -O0 -g
#CFLAGS+= -fprofile-instr-generate

LDFLAGS+= -L/home/freebsd/libssh2/src/.libs
LDADD= -lgeom -lutil -lpthread -lssh2

.include <bsd.prog.mk>

ggatehttp/Makefile.depend → ggatessh/Makefile.depend View File


ggatehttp/ggatehttp.8 → ggatessh/ggatessh.8 View File

@@ -24,35 +24,37 @@
.\"
.\" $FreeBSD$
.\"
.Dd September 8, 2016
.Dt GGATEC 8
.Dd October 21, 2020
.Dt GGATESSH 8
.Os
.Sh NAME
.Nm ggatec
.Nd "GEOM Gate network client and control utility"
.Nm ggatessh
.Nd "GEOM Gate SSH/SFTP client and control utility"
.Sh SYNOPSIS
.Nm
.Cm create
.Op Fl n
.Op Fl v
.Op Fl o Cm ro | wo | rw
.Op Fl F Ar pidfile
.Op Fl i Ar identifyfile
.Op Fl l Ar username
.Op Fl p Ar port
.Op Fl q Ar queue_size
.Op Fl R Ar rcvbuf
.Op Fl S Ar sndbuf
.Op Fl s Ar sectorsize
.Op Fl r Ar nrequests
.Op Fl t Ar timeout
.Op Fl u Ar unit
.Ar host
.Ar path
.Nm
.Cm rescue
.Op Fl n
.Op Fl v
.Op Fl o Cm ro | wo | rw
.Op Fl F Ar pidfile
.Op Fl i Ar identifyfile
.Op Fl l Ar username
.Op Fl p Ar port
.Op Fl R Ar rcvbuf
.Op Fl S Ar sndbuf
.Op Fl r Ar nrequests
.Fl u Ar unit
.Ar host
.Ar path
@@ -70,26 +72,28 @@ The
utility is a network client for the GEOM Gate class.
It is responsible for the creation of
.Nm ggate
devices and forwarding I/O requests between the
.Nm GEOM Gate
kernel subsystem and the
.Xr ggated 8
network daemon.
devices and forwarding I/O requests between the GEOM Gate
kernel subsystem and an
.Xr sftp-server 8
over SSH.
.Pp
Available commands:
.Bl -tag -width ".Cm destroy"
.It Cm create
Connect to a
.Xr ggated 8
daemon on the specified host and create a
Connect to an
.Xr sftp-server 8
via SSH on the specified host and create a
.Nm ggate
provider for the specified remote file or device.
.It Cm rescue
Create a new connection after the
.Nm
process has died or been killed.
process has died or has been killed.
The new connection to the
.Xr ggated 8
daemon handles pending and future requests.
.Xr sftp-server 8
handles pending and future requests.
If the remote file has changed in size, the ggate device
will be resized to match the new remote size.
.It Cm destroy
Destroy the given
.Nm ggate
@@ -102,14 +106,28 @@ providers.
.Pp
Available options:
.Bl -tag -width ".Fl s Cm ro | wo | rw"
.It Fl F Ar pidfile
Write out the daemon's pid to
.Ar pidfile .
The default is
.Pa /var/run/ggatessh.ggate<unit>.pid .
.It Fl f
Forcibly destroy
.Nm ggate
provider (cancels all pending requests).
.It Fl n
Do not use
.Dv TCP_NODELAY
option on TCP sockets.
.It Fl i Ar identityfile
The path to the identity file to use for ssh public key authentication.
If the
.Ar identityfile
is not specified, the default of
.Pa $HOME/.ssh/id_rsa
will be used.
.It Fl l Ar username
The user name to authentice to the server with.
If
.Ar username
is not specified, the default will be
.Ev $USER .
.It Fl o Cm ro | wo | rw
Specify permissions to use when opening the file or device: read-only
.Pq Cm ro ,
@@ -121,17 +139,17 @@ Default is
.Cm rw .
.It Fl p Ar port
Port to connect to on the remote host.
Default is 3080.
Default is 22.
.It Fl q Ar queue_size
Number of pending I/O requests that can be queued before they will
start to be canceled.
Default is 1024.
.It Fl R Ar rcvbuf
Size of receive buffer to use.
When not specified, the system default is used.
.It Fl S Ar sndbuf
Size of send buffer to use.
When not specified, the system default is used.
.It Fl r Ar nrequests
Specifies how many requests may be outstanding at a single time.
This determines that maximum number of connections to the
.Xr sftp-server 8
that will be established at a time.
If unspecified, the default is 32.
.It Fl s Ar sectorsize
Sector size for
.Nm ggate
@@ -146,9 +164,9 @@ Unit number to use.
Do not fork, run in foreground and print debug information on standard
output.
.It Ar host
Remote host to connect to.
Remote SSH server to connect to.
.It Ar path
Path to a regular file or device.
Path to a regular file or device on the remote host.
.El
.Sh EXIT STATUS
Exit status is 0 on success, or 1 if the command fails.

+ 1064
- 0
ggatessh/ggatessh.c
File diff suppressed because it is too large
View File


+ 1
- 1
tests/Makefile View File

@@ -4,6 +4,6 @@ PACKAGE= tests

TESTSDIR= ${TESTSBASE}/sys/geom/class/gate

ATF_TESTS_SH+= ggatehttp_test
ATF_TESTS_SH+= ggatessh_test

.include <bsd.test.mk>

+ 0
- 151
tests/ggatehttp_test.sh View File

@@ -1,151 +0,0 @@
# $FreeBSD$

PIDFILE=ggated.pid
TESTURL="$GGATEHTTP_URL"
TEMPFILE="random.data"

atf_test_case ggatehttp cleanup
ggatehttp_head()
{
atf_set "descr" "ggatehttp can proxy to http"
atf_set "require.progs" "ggatehttp"
atf_set "require.user" "root"
atf_set "timeout" 10
}

ggatehttp_body()
{
load_ggate
us=$(alloc_ggate_dev)
src=$(alloc_md)

n1mchunks=10

atf_check -e ignore -o ignore \
dd if=/dev/random of="$TEMPFILE" bs=1m count=$n1mchunks conv=notrunc

atf_check ggatehttp create -u $us "$TESTURL"

ggate_dev=/dev/ggate${us}

wait_for_ggate_device ${ggate_dev}

# Test writing
atf_check -e ignore -o ignore \
dd if="$TEMPFILE" of=${ggate_dev} bs=1m count=$n1mchunks conv=notrunc

# Test reading
atf_check -e ignore -o ignore \
dd of="$TEMPFILE"2 if=${ggate_dev} bs=1m count=$n1mchunks conv=notrunc

ls -l "$TEMPFILE" "$TEMPFILE"2

# Verify that we read what we wrote
atf_check cmp "$TEMPFILE" "$TEMPFILE"2

rm "$TEMPFILE" "$TEMPFILE"2
}

ggatehttp_cleanup()
{
common_cleanup
}

atf_init_test_cases()
{
atf_add_test_case ggatehttp
}

alloc_ggate_dev()
{
local us

us=0
while [ -c /dev/ggate${us} ]; do
: $(( us += 1 ))
done
echo ${us} > ggate.devs
echo ${us}
}

alloc_md()
{
local md

md=$(mdconfig -a -t malloc -s 1M) || \
atf_fail "failed to allocate md device"
echo ${md} >> md.devs
echo ${md}
}

checksum()
{
local src work
src=$1
work=$2

src_checksum=$(md5 -q $src)
work_checksum=$(md5 -q $work)

if [ "$work_checksum" != "$src_checksum" ]; then
atf_fail "work md5 checksum didn't match"
fi

ggate_checksum=$(md5 -q /dev/ggate${us})
if [ "$ggate_checksum" != "$src_checksum" ]; then
atf_fail "ggate md5 checksum didn't match"
fi
}

common_cleanup()
{
if [ -f "ggate.devs" ]; then
while read test_ggate; do
ggatec destroy -f -u $test_ggate >/dev/null
done < ggate.devs
rm ggate.devs
fi

if [ -f "$PIDFILE" ]; then
pkill -F "$PIDFILE"
rm $PIDFILE
fi

if [ -f "PLAINFILES" ]; then
while read f; do
rm -f ${f}
done < ${PLAINFILES}
rm ${PLAINFILES}
fi

if [ -f "md.devs" ]; then
while read test_md; do
mdconfig -d -u $test_md 2>/dev/null
done < md.devs
rm md.devs
fi
true
}

load_ggate()
{
local class=gate

# If the geom class isn't already loaded, try loading it.
if ! kldstat -q -m g_${class}; then
if ! geom ${class} load; then
atf_skip "could not load module for geom class=${class}"
fi
fi
}

# Bug 204616: ggatel(8) creates /dev/ggate* asynchronously if `ggatel create`
# isn't called with `-v`.
wait_for_ggate_device()
{
ggate_device=$1

while [ ! -c $ggate_device ]; do
sleep 0.5
done
}

+ 288
- 0
tests/ggatessh_test.sh View File

@@ -0,0 +1,288 @@
# $FreeBSD$

PIDFILE=ggatessh.pid
TESTIMG="test.img"
TEMPFILE="random.data"
SFTPSERVER="/usr/libexec/sftp-server"
PORT=2222

atf_test_case ggatessh cleanup
ggatessh_head()
{
atf_set "descr" "ggatessh can proxy to sftp"
atf_set "require.progs" "ggatessh"
atf_set "require.user" "root"
atf_set "timeout" 20
}

ggatessh_body()
{
load_ggate
us=$(alloc_ggate_dev)

n1mchunks=3
secsize=4096

atf_check -e ignore -o ignore \
dd if=/dev/random of="$TEMPFILE" bs=1m count=$n1mchunks conv=notrunc

startup_sshd

truncate -s ${n1mchunks}m "$TESTIMG"
# sshd authenticates and switches to USER
chown "$USER" "$TESTIMG"

echo 'WARNING: ggatessh error messages goes to syslog (aka /var/log/messages)'

atf_check ggatessh create -i "$(pwd)/id_rsa" -p "$PORT" -F "$PIDFILE" -u $us -l "$USER" 127.0.0.1 "$(pwd)/$TESTIMG"

ggate_dev=/dev/ggate${us}

wait_for_ggate_device ${ggate_dev}

# make sure it has correct size and sector sizekj
read _dev _secsize _size _nsecs _stripesize _stripeoff <<EOF
$(diskinfo /dev/ggate$us)
EOF
atf_check_equal "$_secsize" $secsize
atf_check_equal "$_size" $(($n1mchunks * 1024 * 1024))
atf_check_equal "$_nsecs" $(($n1mchunks * 1024 * 1024 / $secsize))

# Test writing
atf_check -e ignore -o ignore \
dd if="$TEMPFILE" of=${ggate_dev} bs=1m count=$n1mchunks conv=notrunc

# Test reading
atf_check -e ignore -o ignore \
dd of="$TEMPFILE"2 if=${ggate_dev} bs=1m count=$n1mchunks conv=notrunc

# Verify that we read what we wrote
atf_check cmp "$TEMPFILE" "$TEMPFILE"2

# Verify that the image matches
atf_check cmp "$TEMPFILE" "$TESTIMG"

rm "$TEMPFILE" "$TEMPFILE"2
}

ggatessh_cleanup()
{

common_cleanup
}

atf_test_case ggatessh_resize cleanup
ggatessh_resize_head()
{
atf_set "descr" "ggatessh will resize the devices"
atf_set "require.progs" "ggatessh"
atf_set "require.user" "root"
atf_set "timeout" 20
}

ggatessh_resize_body()
{

n1mchunks=4
secsize=4096
us=$(alloc_ggate_dev)

startup_sshd

truncate -s ${n1mchunks}m "$TESTIMG"
# sshd authenticates and switches to USER
chown "$USER" "$TESTIMG"

echo 'WARNING: ggatessh error messages goes to syslog (aka /var/log/messages)'

atf_check ggatessh create -i "$(pwd)/id_rsa" -p "$PORT" -F "$PIDFILE" -u $us -l "$USER" 127.0.0.1 "$(pwd)/$TESTIMG"

ggate_dev=/dev/ggate${us}

wait_for_ggate_device ${ggate_dev}

# make sure it has correct size and sector sizekj
read _dev _secsize _size _nsecs _stripesize _stripeoff <<EOF
$(diskinfo /dev/ggate$us)
EOF
atf_check_equal "$_secsize" $secsize
atf_check_equal "$_size" $(($n1mchunks * 1024 * 1024))
atf_check_equal "$_nsecs" $(($n1mchunks * 1024 * 1024 / $secsize))

# kill off old ggate
pkill -F "$PIDFILE"

# Test resizing
n1mchunks=6
truncate -s ${n1mchunks}m "$TESTIMG"

ps auxwww | grep ggatessh
cat "$PIDFILE"

sleep 1

# restart ggate
atf_check ggatessh rescue -v -i "$(pwd)/id_rsa" -p "$PORT" -F "$PIDFILE" -u $us -l "$USER" 127.0.0.1 "$(pwd)/$TESTIMG" &

sleep 1

# make sure it has correct size and sector size
read _dev _secsize _size _nsecs _stripesize _stripeoff <<EOF
$(diskinfo /dev/ggate$us)
EOF
atf_check_equal "$_secsize" $secsize
atf_check_equal "$_size" $(($n1mchunks * 1024 * 1024))
atf_check_equal "$_nsecs" $(($n1mchunks * 1024 * 1024 / $secsize))

dd if=/dev/ggate$us of=/dev/null bs=1m
}

ggatessh_resize_cleanup()
{

common_cleanup
}

atf_init_test_cases()
{
atf_add_test_case ggatessh
atf_add_test_case ggatessh_resize
}

alloc_ggate_dev()
{
local us

us=0
while [ -c /dev/ggate${us} ]; do
: $(( us += 1 ))
done
echo ${us} > ggate.devs
echo ${us}
}

alloc_md()
{
local md

md=$(mdconfig -a -t malloc -s 1M) || \
atf_fail "failed to allocate md device"
echo ${md} >> md.devs
echo ${md}
}

# slightly modified from:
# https://serverfault.com/questions/344295/is-it-possible-to-run-sshd-as-a-normal-user
startup_sshd()
{
# Host keys
ssh-keygen -f ssh_host_rsa_key -N '' -t rsa > /dev/null

# user key
ssh-keygen -f id_rsa -N '' -t rsa > /dev/null

(echo -n 'command="/usr/libexec/sftp-server" '; cat id_rsa.pub) > authorized_keys

cat > sshd_config <<EOF
ListenAddress 127.0.0.1:$PORT
HostKey /home/freebsd/custom_ssh/ssh_host_rsa_key
AuthorizedKeysFile $(pwd)/authorized_keys
ChallengeResponseAuthentication no
PasswordAuthentication no
# to allow writable tmp w/ sticky bit
StrictModes no
UsePAM no
Subsystem sftp ${SFTPSERVER}
PidFile $(pwd)/sshd.pid
EOF

if ! :; then
/usr/sbin/sshd -dD -f $(pwd)/sshd_config &
sleep .2
else
/usr/sbin/sshd -f $(pwd)/sshd_config
while ! [ -f sshd.pid ]; do
sleep .2
done
fi
}

checksum()
{
local src work
src=$1
work=$2

src_checksum=$(md5 -q $src)
work_checksum=$(md5 -q $work)

if [ "$work_checksum" != "$src_checksum" ]; then
atf_fail "work md5 checksum didn't match"
fi

ggate_checksum=$(md5 -q /dev/ggate${us})
if [ "$ggate_checksum" != "$src_checksum" ]; then
atf_fail "ggate md5 checksum didn't match"
fi
}

common_cleanup()
{
if [ -f "ggate.devs" ]; then
while read test_ggate; do
ggatessh destroy -f -u $test_ggate >/dev/null
done < ggate.devs
rm ggate.devs
fi

if [ -f "sshd.pid" ]; then
pkill -F sshd.pid
# clean up after startup_sshd
rm ssh_host_rsa_key
rm id_rsa id_rsa.pub
rm authorized_keys
fi

if [ -f "$PIDFILE" ]; then
pkill -F "$PIDFILE"
rm $PIDFILE
fi

if [ -f "PLAINFILES" ]; then
while read f; do
rm -f ${f}
done < ${PLAINFILES}
rm ${PLAINFILES}
fi

if [ -f "md.devs" ]; then
while read test_md; do
mdconfig -d -u $test_md 2>/dev/null
done < md.devs
rm md.devs
fi
true
}

load_ggate()
{
local class=gate

# If the geom class isn't already loaded, try loading it.
if ! kldstat -q -m g_${class}; then
if ! geom ${class} load; then
atf_skip "could not load module for geom class=${class}"
fi
fi
}

# Bug 204616: ggatel(8) creates /dev/ggate* asynchronously if `ggatel create`
# isn't called with `-v`.
wait_for_ggate_device()
{
ggate_device=$1

while [ ! -c $ggate_device ]; do
sleep 0.5
done
}

Loading…
Cancel
Save