geom_gate userland utility improvements
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

1065 lines
26 KiB

  1. /*-
  2. * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
  3. *
  4. * Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org>
  5. * All rights reserved.
  6. * Copyright 2020 John-Mark Gurney <jmg@FreeBSD.org>
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions
  10. * are met:
  11. * 1. Redistributions of source code must retain the above copyright
  12. * notice, this list of conditions and the following disclaimer.
  13. * 2. Redistributions in binary form must reproduce the above copyright
  14. * notice, this list of conditions and the following disclaimer in the
  15. * documentation and/or other materials provided with the distribution.
  16. *
  17. * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
  18. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20. * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
  21. * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  22. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  23. * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  24. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  25. * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  26. * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  27. * SUCH DAMAGE.
  28. *
  29. * $FreeBSD$
  30. */
  31. #include <stdio.h>
  32. #include <stdlib.h>
  33. #include <fcntl.h>
  34. #include <libutil.h>
  35. #include <paths.h>
  36. #include <pthread.h>
  37. #include <pthread_np.h>
  38. #include <err.h>
  39. #include <errno.h>
  40. #include <assert.h>
  41. #include <sys/param.h>
  42. #include <sys/ioctl.h>
  43. #include <sys/queue.h>
  44. #include <sys/socket.h>
  45. #include <sys/syslog.h>
  46. #include <sys/bio.h>
  47. #include <netdb.h>
  48. #include <semaphore.h>
  49. #include <libssh2.h>
  50. #include <libssh2_sftp.h>
  51. #include <geom/gate/g_gate.h>
  52. #include "ggate.h"
  /* Sub-command chosen by the first command-line word; UNSET until main() parses it. */
  static enum {
      UNSET,
      CREATE,
      DESTROY,
      LIST,
      RESCUE
  } action = UNSET;
  54. struct ggs_connection {
  55. int c_fd;
  56. LIBSSH2_SESSION *c_session;
  57. LIBSSH2_SFTP *c_sftp_session;
  58. LIBSSH2_SFTP_HANDLE *c_handle;
  59. };
  60. static struct pidfh *pfh;
  61. static const char *username;
  62. static const char *hostname;
  63. static const char *imgpath;
  64. static const char *identityfile;
  65. static const char *pubkeyfile;
  66. static const char *sshport = "22";
  67. static char *ggatessh_pidfile;
  68. static int unit = G_GATE_UNIT_AUTO;
  69. static unsigned flags = 0;
  70. static int force = 0;
  71. static unsigned queue_size = G_GATE_QUEUE_SIZE;
  72. static off_t mediasize;
  73. static unsigned sectorsize = 4096;
  74. static unsigned timeout = G_GATE_TIMEOUT;
  75. static int pushfd, popfd; /* work semaphore */
  76. static pthread_t reqtd, proctd, mediatd;
  77. static unsigned maxconnections = 32;
  78. static struct ggs_connection start_conn; /* only used once/first */
  79. struct ggs_sess_cache {
  80. LIBSSH2_SESSION *sc_ssh_session;
  81. LIBSSH2_SFTP *sc_session;
  82. LIBSSH2_SFTP_HANDLE *sc_handle;
  83. TAILQ_ENTRY(ggs_sess_cache) sc_next;
  84. };
  85. struct ggs_req {
  86. struct g_gate_ctl_io r_ggio;
  87. #define r_ssh_session r_sesscache->sc_ssh_session
  88. #define r_session r_sesscache->sc_session
  89. #define r_handle r_sesscache->sc_handle
  90. struct ggs_sess_cache *r_sesscache;
  91. size_t r_bufoff;
  92. int r_didseek;
  93. TAILQ_ENTRY(ggs_req) r_next;
  94. };
  /* Requests handed from req_thread() to proc_thread(); guarded by procqueue_mtx. */
  static TAILQ_HEAD(ggs_reqqueue, ggs_req) procqueue =
      TAILQ_HEAD_INITIALIZER(procqueue);

  /* Idle SFTP sessions available for the next request. */
  static TAILQ_HEAD(ggs_sessqueue, ggs_sess_cache) session_cache =
      TAILQ_HEAD_INITIALIZER(session_cache);

  /* Counts free request slots; initialised to maxconnections. */
  static sem_t nconn_sem;

  /* Protects procqueue. */
  static pthread_mutex_t procqueue_mtx;
  /*
   * Print the command synopsis to stderr and exit with EXIT_FAILURE.
   * This function does not return.
   */
  static void
  usage(void)
  {
      const char *prog = getprogname();

      /* "-i" takes the SSH identity (private key) file. */
      fprintf(stderr, "usage: %s create [-v] [-o <ro|wo|rw>] "
          "[-F pidfile] [-i identityfile] "
          "[-l username] [-p port] "
          "[-q queue_size] [-s sectorsize] [-r nrequests] "
          "[-t timeout] [-u unit] <host> <path>\n", prog);
      fprintf(stderr, " %s rescue [-v] [-o <ro|wo|rw>] "
          "[-F pidfile] [-i identityfile] "
          "[-l username] [-p port] "
          "[-r nrequests] <-u unit> <host> <path>\n", prog);
      fprintf(stderr, " %s destroy [-f] <-u unit>\n", prog);
      fprintf(stderr, " %s list [-v] [-u unit]\n", prog);
      exit(EXIT_FAILURE);
  }
  115. static void
  116. libssh2_errorx(LIBSSH2_SESSION *session, const char *info)
  117. {
  118. char *errmsg;
  119. int rc;
  120. rc = libssh2_session_last_error(session, &errmsg, NULL, 0);
  121. g_gate_xlog("%s: %s", info, errmsg);
  122. }
  /*
   * Connect to the service (or port number) on host.
   *
   * Somewhat copied from freebsd/lib/libfetch/fetch_common.c:fetch_connect.
   *
   * host    - name or numeric address to resolve
   * service - service name or decimal port
   * af      - address family passed to getaddrinfo() (0 for any)
   *
   * Returns a connected socket descriptor, or -1 with errno set.  errno is
   * ENOENT when the name/service lookup failed; otherwise it is the errno of
   * the last socket()/connect() attempt.
   */
  static int
  tcp_connect(const char *host, const char *service, int af)
  {
      struct addrinfo hints, *sai, *sai0;
      int gai, saved_errno, sd;

      hints = (struct addrinfo){
          .ai_family = af,
          .ai_socktype = SOCK_STREAM,
          .ai_flags = AI_ADDRCONFIG,
      };

      /*
       * Resolve server address.  getaddrinfo() reports failure with a
       * non-zero EAI_* code, not -1, and only sets errno for EAI_SYSTEM.
       */
      sai0 = NULL;
      gai = getaddrinfo(host, service, &hints, &sai0);
      if (gai != 0) {
          if (gai != EAI_SYSTEM)
              errno = ENOENT;
          return -1;
      }
      if (sai0 == NULL) {
          errno = ENOENT;
          return -1;
      }

      sd = -1;
      saved_errno = 0;

      /* try each server address in turn */
      for (sai = sai0; sai != NULL; sai = sai->ai_next) {
          /* open socket; an unsupported family should not stop the search */
          sd = socket(sai->ai_family, sai->ai_socktype, sai->ai_protocol);
          if (sd == -1) {
              saved_errno = errno;
              continue;
          }

          /* attempt to connect to server address */
          if (connect(sd, sai->ai_addr, sai->ai_addrlen) == 0)
              break;

          /* remember why it failed before close() can change errno */
          saved_errno = errno;
          close(sd);
          sd = -1;
      }

      freeaddrinfo(sai0);

      if (sd == -1)
          errno = saved_errno;

      return sd;
  }
  169. static struct ggs_connection
  170. make_connection(void)
  171. {
  172. LIBSSH2_SESSION *session;
  173. LIBSSH2_SFTP *sftp_session;
  174. LIBSSH2_SFTP_HANDLE *handle;
  175. char *tmp;
  176. int sockfd;
  177. int rc;
  178. sockfd = tcp_connect(hostname, sshport, 0);
  179. if (sockfd == -1) {
  180. if (errno == ENOENT)
  181. g_gate_xlog("tcp_connect: failed to lookup %s", hostname);
  182. g_gate_xlog("tcp_connect: %s.", strerror(errno));
  183. }
  184. session = libssh2_session_init();
  185. if (session == NULL)
  186. libssh2_errorx(session, "libssh2_session_init");
  187. if (g_gate_verbose) {
  188. //libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_TRANS|LIBSSH2_TRACE_KEX|LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY);
  189. libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_KEX|LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY);
  190. //libssh2_trace(session, LIBSSH2_TRACE_KEX|LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY);
  191. }
  192. /* XXX - libssh2_session_flag to enable compression */
  193. rc = libssh2_session_handshake(session, sockfd);
  194. if (rc)
  195. libssh2_errorx(session, "libssh2_session_handshake");
  196. libssh2_session_set_blocking(session, 1);
  197. /* XXX - known hosts handling */
  198. if (identityfile == NULL) {
  199. tmp = NULL;
  200. asprintf(&tmp, "%s/.ssh/id_rsa", getenv("HOME"));
  201. identityfile = tmp;
  202. tmp = NULL;
  203. }
  204. asprintf(&tmp, "%s.pub", identityfile);
  205. pubkeyfile = tmp;
  206. tmp = NULL;
  207. g_gate_log(LOG_DEBUG, "trying identity file: %s", identityfile);
  208. rc = libssh2_userauth_publickey_fromfile(session, username, pubkeyfile, identityfile, NULL);
  209. //rc = libssh2_userauth_password(session, "freebsd", "freebsd");
  210. if (rc) {
  211. g_gate_log(LOG_ERR, "identity file: %s", identityfile);
  212. libssh2_errorx(session, "libssh2_userauth_publickey_fromfile");
  213. }
  214. /* always need at least one */
  215. sftp_session = libssh2_sftp_init(session);
  216. if (sftp_session == NULL)
  217. g_gate_xlog("libssh2_sftp_init");
  218. handle = libssh2_sftp_open(sftp_session, imgpath, LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE, 0);
  219. if (handle == NULL) {
  220. g_gate_log(LOG_ERR, "image file: %s", imgpath);
  221. libssh2_errorx(session, "libssh2_sftp_open");
  222. }
  223. return (struct ggs_connection){
  224. .c_fd = sockfd,
  225. .c_session = session,
  226. .c_sftp_session = sftp_session,
  227. .c_handle = handle,
  228. };
  229. }
  230. /*
  231. * Resize is required to run in a thread because the resize requires
  232. * that I/O is able to complete before it can return.
  233. */
  234. static void *
  235. mediachg(void *arg __unused)
  236. {
  237. struct g_gate_ctl_modify ggiom;
  238. /* update mediasize, it may have changed */
  239. ggiom = (struct g_gate_ctl_modify){
  240. .gctl_version = G_GATE_VERSION,
  241. .gctl_unit = unit,
  242. .gctl_modify = GG_MODIFY_MEDIASIZE,
  243. .gctl_mediasize = mediasize,
  244. };
  245. g_gate_ioctl(G_GATE_CMD_MODIFY, &ggiom);
  246. g_gate_log(LOG_DEBUG, "updated ggate%d mediasize to %zd", unit, mediasize);
  247. return NULL;
  248. }
  249. static void *
  250. req_thread(void *arg __unused)
  251. {
  252. struct ggs_req *greq;
  253. static char *buf;
  254. int buflen = 1024*1024;
  255. int error;
  256. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  257. greq = NULL;
  258. for (;;) {
  259. if (greq == NULL)
  260. greq = malloc(sizeof *greq);
  261. if (buf == NULL)
  262. buf = malloc(buflen);
  263. if (greq == NULL || buf == NULL) {
  264. /* XXX */
  265. g_gate_log(LOG_ERR, "Unable to allocate memory.");
  266. exit(1);
  267. }
  268. *greq = (struct ggs_req){
  269. .r_ggio = (struct g_gate_ctl_io){
  270. .gctl_version = G_GATE_VERSION,
  271. .gctl_unit = unit,
  272. .gctl_data = buf,
  273. .gctl_length = buflen,
  274. .gctl_error = 0,
  275. },
  276. };
  277. //g_gate_log(LOG_DEBUG, "waiting for ioctl");
  278. g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio);
  279. //g_gate_log(LOG_DEBUG, "got ioctl");
  280. error = greq->r_ggio.gctl_error;
  281. switch (error) {
  282. case 0:
  283. break;
  284. case ECANCELED:
  285. /* Exit gracefully. */
  286. g_gate_close_device();
  287. exit(EXIT_SUCCESS);
  288. case ENXIO:
  289. default:
  290. g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
  291. strerror(error));
  292. }
  293. g_gate_log(LOG_DEBUG, "ggio(%p), ver: %u, unit: %d, seq: %llu, "
  294. "cmd: %u, offset: %llu, len: %llu", greq,
  295. greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit,
  296. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd,
  297. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  298. switch (greq->r_ggio.gctl_cmd) {
  299. case BIO_READ:
  300. /* use a correctly sized allocation */
  301. greq->r_ggio.gctl_data =
  302. malloc(greq->r_ggio.gctl_length);
  303. break;
  304. case BIO_WRITE:
  305. /* r_ggio takes ownership of buf now */
  306. buf = NULL;
  307. break;
  308. case BIO_FLUSH:
  309. greq->r_ggio.gctl_data = NULL;
  310. break;
  311. case BIO_DELETE:
  312. default:
  313. greq->r_ggio.gctl_error = EOPNOTSUPP;
  314. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  315. continue; /* return EOPNOTSUPP */
  316. break;
  317. }
  318. //g_gate_log(LOG_DEBUG, "waiting for slot");
  319. sem_wait(&nconn_sem);
  320. #if 0
  321. int semval;
  322. sem_getvalue(&nconn_sem, &semval);
  323. g_gate_log(LOG_DEBUG, "slots: %d", semval);
  324. #endif
  325. error = pthread_mutex_lock(&procqueue_mtx);
  326. assert(error == 0);
  327. TAILQ_INSERT_TAIL(&procqueue, greq, r_next);
  328. error = pthread_mutex_unlock(&procqueue_mtx);
  329. assert(error == 0);
  330. /* notify processing thread a request is waiting */
  331. error = write(pushfd, "T", 1);
  332. if (error != 1)
  333. g_gate_xlog("write pushfd: %d, error: %s.", error,
  334. strerror(error));
  335. /* pass ownership */
  336. greq = NULL;
  337. }
  338. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  339. return (NULL);
  340. }
  341. static int
  342. process_pending(struct ggs_reqqueue *req_pending, struct ggs_sessqueue *sessqueue)
  343. {
  344. struct ggs_req *greq, *greq2;
  345. char *errmsg;
  346. int rc;
  347. int didwork;
  348. didwork = 0;
  349. /* Work on each pending request */
  350. TAILQ_FOREACH_SAFE(greq, req_pending, r_next, greq2) {
  351. again:
  352. switch (greq->r_ggio.gctl_cmd) {
  353. case BIO_READ:
  354. g_gate_log(LOG_DEBUG, "sftp_read(%p): %d(%d), rem: %d", greq, greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length, greq->r_ggio.gctl_length - greq->r_bufoff);
  355. if (greq->r_didseek == 0) {
  356. libssh2_sftp_seek64(greq->r_handle, greq->r_ggio.gctl_offset);
  357. greq->r_didseek = 1;
  358. }
  359. rc = libssh2_sftp_read(greq->r_handle, (char *)greq->r_ggio.gctl_data + greq->r_bufoff, greq->r_ggio.gctl_length - greq->r_bufoff);
  360. g_gate_log(LOG_DEBUG, "sftp_read ret: %d", rc);
  361. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  362. g_gate_log(LOG_ERR, "libssh2_sftp_read");
  363. break;
  364. case BIO_WRITE:
  365. g_gate_log(LOG_DEBUG, "sftp_write(%p): %d(%d), rem: %d", greq, greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length, greq->r_ggio.gctl_length - greq->r_bufoff);
  366. if (greq->r_didseek == 0) {
  367. libssh2_sftp_seek64(greq->r_handle, greq->r_ggio.gctl_offset);
  368. greq->r_didseek = 1;
  369. }
  370. rc = libssh2_sftp_write(greq->r_handle, (char *)greq->r_ggio.gctl_data + greq->r_bufoff, greq->r_ggio.gctl_length - greq->r_bufoff);
  371. g_gate_log(LOG_DEBUG, "sftp_write ret: %d", rc);
  372. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  373. libssh2_errorx(greq->r_ssh_session, "libssh2_sftp_write");
  374. break;
  375. case BIO_FLUSH:
  376. g_gate_log(LOG_DEBUG, "sftp_flush(%p)", greq);
  377. rc = libssh2_sftp_fsync(greq->r_handle);
  378. didwork = 1; /* assume this always does work */
  379. switch (rc) {
  380. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  381. greq->r_ggio.gctl_error = EOPNOTSUPP;
  382. goto completeio;
  383. case LIBSSH2_ERROR_EAGAIN:
  384. continue;
  385. case 0: /* success */
  386. goto completeio;
  387. default:
  388. libssh2_session_last_error(greq->r_ssh_session, &errmsg, NULL, 0);
  389. g_gate_log(LOG_ERR, "sftp_flush(%p) ret %d: %s", greq, rc, errmsg);
  390. greq->r_ggio.gctl_error = EIO;
  391. goto completeio;
  392. }
  393. /* NOTREACHABLE */
  394. break;
  395. default:
  396. rc = 0;
  397. g_gate_log(LOG_ERR, "unhandled op: %d", greq->r_ggio.gctl_cmd);
  398. continue;
  399. }
  400. if (rc > 0) {
  401. didwork = 1;
  402. greq->r_bufoff += rc;
  403. /* try again on partial read/write, might have more data pending */
  404. if ((off_t)greq->r_bufoff != greq->r_ggio.gctl_length)
  405. goto again;
  406. }
  407. if ((off_t)greq->r_bufoff == greq->r_ggio.gctl_length) {
  408. /* complete */
  409. completeio:
  410. g_gate_log(LOG_DEBUG, "cmd complete: seq: %d, cmd: %d", greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd);
  411. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  412. TAILQ_REMOVE(req_pending, greq, r_next);
  413. TAILQ_INSERT_HEAD(sessqueue, greq->r_sesscache, sc_next);
  414. free(greq->r_ggio.gctl_data);
  415. free(greq);
  416. /* release this slot */
  417. sem_post(&nconn_sem);
  418. }
  419. }
  420. return didwork;
  421. }
  422. /*
  423. * sftp session management is a bit tricky.
  424. * if there is an entry in sessioncache, use that one.
  425. * if we are waiting for a new session (gsc_pend != NULL),
  426. * establish session, then open handle
  427. * when the new session completes, process the work queue
  428. */
  429. static void *
  430. proc_thread(void *arg __unused)
  431. {
  432. char scratch[32];
  433. struct ggs_reqqueue req_pending;
  434. struct timeval to;
  435. struct ggs_sess_cache *gsc, *gsc_pending;
  436. struct ggs_req *greq;
  437. LIBSSH2_SESSION *session;
  438. fd_set fdread;
  439. fd_set fdwrite;
  440. fd_set fdexcep;
  441. int sockfd;
  442. int maxfd;
  443. int error;
  444. int dir;
  445. int rc;
  446. int didwork; /* did libssh2 do any work? */
  447. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  448. TAILQ_INIT(&req_pending);
  449. /* make sure we don't block on reading */
  450. fcntl(popfd, F_SETFL, O_NONBLOCK);
  451. sockfd = start_conn.c_fd;
  452. session = start_conn.c_session;
  453. gsc = malloc(sizeof *gsc);
  454. gsc->sc_ssh_session = start_conn.c_session;
  455. gsc->sc_session = start_conn.c_sftp_session;
  456. gsc->sc_handle = start_conn.c_handle;
  457. TAILQ_INSERT_HEAD(&session_cache, gsc, sc_next);
  458. gsc = NULL;
  459. gsc_pending = NULL;
  460. didwork = 0;
  461. libssh2_session_set_blocking(session, 0);
  462. for (;;) {
  463. //g_gate_log(LOG_DEBUG, "looping");
  464. if (!didwork) {
  465. /* setup polling loop */
  466. maxfd = -1;
  467. FD_ZERO(&fdread);
  468. FD_ZERO(&fdwrite);
  469. FD_ZERO(&fdexcep);
  470. dir = libssh2_session_block_directions(session);
  471. if (dir & LIBSSH2_SESSION_BLOCK_INBOUND || gsc_pending != NULL)
  472. FD_SET(sockfd, &fdread);
  473. if (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND)
  474. FD_SET(sockfd, &fdwrite);
  475. /* add in the pop descriptor */
  476. FD_SET(popfd, &fdread);
  477. maxfd = MAX(popfd, sockfd);
  478. #if 0
  479. /* we need to be kj */
  480. if (gsc_pending != NULL)
  481. FD_SET(sockfd, &fdread);
  482. #endif
  483. g_gate_log(LOG_DEBUG, "selecting: %s %s, read: sockfd: %d, popfd: %d, write: sockfd: %d", (dir & LIBSSH2_SESSION_BLOCK_INBOUND) ? "inbound" : "", (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) ? "outbound" : "", FD_ISSET(sockfd, &fdread), FD_ISSET(popfd, &fdread), FD_ISSET(sockfd, &fdwrite));
  484. to = (struct timeval){ .tv_sec = 1, .tv_usec = 1000 };
  485. (void)to;
  486. rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep, NULL);
  487. switch (rc) {
  488. case -1:
  489. g_gate_log(LOG_ERR, "%s: select failed: %s", __func__,
  490. strerror(errno));
  491. break;
  492. case 0:
  493. default:
  494. g_gate_log(LOG_DEBUG, "select: %d, read: sockfd: %d, popfd: %d, write: sockfd: %d", rc, FD_ISSET(sockfd, &fdread), FD_ISSET(popfd, &fdread), FD_ISSET(sockfd, &fdwrite));
  495. break;
  496. }
  497. }
  498. didwork = 0;
  499. /* process pending, so any completed can be reused */
  500. didwork |= process_pending(&req_pending, &session_cache);
  501. if (FD_ISSET(popfd, &fdread)) {
  502. /* read off the tokens */
  503. g_gate_log(LOG_DEBUG, "popping");
  504. read(popfd, scratch, sizeof scratch);
  505. for (;;) {
  506. procreq:
  507. /* get the request */
  508. error = pthread_mutex_lock(&procqueue_mtx);
  509. assert(error == 0);
  510. greq = TAILQ_FIRST(&procqueue);
  511. g_gate_log(LOG_DEBUG, "greq: %p", greq);
  512. if (greq != NULL)
  513. TAILQ_REMOVE(&procqueue, greq, r_next);
  514. error = pthread_mutex_unlock(&procqueue_mtx);
  515. assert(error == 0);
  516. /* no more to process */
  517. if (greq == NULL)
  518. break;
  519. gsc = TAILQ_FIRST(&session_cache);
  520. if (gsc == NULL) {
  521. if (gsc_pending == NULL) {
  522. /* need new session */
  523. g_gate_log(LOG_DEBUG, "need new session");
  524. gsc_pending = malloc(sizeof *gsc);
  525. gsc_pending->sc_ssh_session = session;
  526. gsc_pending->sc_session = NULL;
  527. gsc_pending->sc_handle = NULL;
  528. }
  529. /* put back request */
  530. error = pthread_mutex_lock(&procqueue_mtx);
  531. assert(error == 0);
  532. TAILQ_INSERT_HEAD(&procqueue, greq, r_next);
  533. error = pthread_mutex_unlock(&procqueue_mtx);
  534. assert(error == 0);
  535. break;
  536. } else {
  537. /* process request */
  538. TAILQ_REMOVE(&session_cache, gsc, sc_next);
  539. greq->r_sesscache = gsc;
  540. gsc = NULL;
  541. greq->r_bufoff = 0;
  542. TAILQ_INSERT_TAIL(&req_pending, greq, r_next);
  543. greq = NULL;
  544. }
  545. }
  546. }
  547. if (gsc_pending != NULL) {
  548. /* we are creating a new session */
  549. if (gsc_pending->sc_session == NULL) {
  550. didwork = 1;
  551. gsc_pending->sc_session = libssh2_sftp_init(session);
  552. }
  553. if (gsc_pending->sc_session != NULL) {
  554. didwork = 1;
  555. gsc_pending->sc_handle = libssh2_sftp_open(gsc_pending->sc_session, "fstest/data.img", LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE, 0);
  556. }
  557. g_gate_log(LOG_DEBUG, "pending: session: %p, handle: %p", gsc_pending->sc_session, gsc_pending->sc_handle);
  558. /* we have a fully initalized entry, use it */
  559. if (gsc_pending->sc_handle != NULL) {
  560. g_gate_log(LOG_DEBUG, "new session created");
  561. TAILQ_INSERT_HEAD(&session_cache, gsc_pending, sc_next);
  562. gsc_pending = NULL;
  563. didwork = 1;
  564. goto procreq;
  565. }
  566. }
  567. /* kick of any queued requests from above */
  568. didwork |= process_pending(&req_pending, &session_cache);
  569. }
  570. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  571. pthread_exit(NULL);
  572. }
  573. static void
  574. ggatessh_makepidfile(void)
  575. {
  576. pid_t otherpid;
  577. if (!g_gate_verbose) {
  578. if (ggatessh_pidfile == NULL) {
  579. asprintf(&ggatessh_pidfile, _PATH_VARRUN "/ggatessh.ggate%d.pid", unit);
  580. err(EXIT_FAILURE, "Cannot allocate memory for pidfile");
  581. }
  582. pfh = pidfile_open(ggatessh_pidfile, 0600, &otherpid);
  583. if (pfh == NULL) {
  584. if (errno == EEXIST) {
  585. errx(EXIT_FAILURE, "Daemon already running, pid: %jd.",
  586. (intmax_t)otherpid);
  587. }
  588. err(EXIT_FAILURE, "Cannot open/create pidfile");
  589. }
  590. }
  591. }
  592. static void
  593. mydaemon(void)
  594. {
  595. if (g_gate_verbose > 0)
  596. return;
  597. if (daemon(0, 0) == 0)
  598. return;
  599. if (action == CREATE)
  600. g_gate_destroy(unit, 1);
  601. err(EXIT_FAILURE, "Cannot daemonize");
  602. }
  603. static int
  604. g_gatessh_connect(void)
  605. {
  606. struct ggs_connection conn;
  607. LIBSSH2_SFTP_ATTRIBUTES attrs;
  608. int rc;
  609. /* get the remote's size */
  610. conn = make_connection();
  611. rc = libssh2_sftp_fstat(conn.c_handle, &attrs);
  612. /* only allow regular and char devices */
  613. if (!(LIBSSH2_SFTP_S_ISREG(attrs.flags) ||
  614. !LIBSSH2_SFTP_S_ISCHR(attrs.flags))) {
  615. g_gate_xlog("remote file not a regular file");
  616. }
  617. mediasize = attrs.filesize;
  618. g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize);
  619. start_conn = conn; /* cache to use later */
  620. return 1;
  621. }
  622. static void
  623. g_gatessh_start(void)
  624. {
  625. int filedes[2];
  626. int error;
  627. pipe(filedes);
  628. pushfd = filedes[1];
  629. popfd = filedes[0];
  630. error = pthread_mutex_init(&procqueue_mtx, NULL);
  631. if (error != 0) {
  632. g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.",
  633. strerror(error));
  634. }
  635. sem_init(&nconn_sem, 0, maxconnections);
  636. error = pthread_create(&proctd, NULL, proc_thread, NULL);
  637. if (error != 0) {
  638. g_gate_destroy(unit, 1); /* XXX - remove */
  639. g_gate_xlog("pthread_create(proc_thread): %s.",
  640. strerror(error));
  641. }
  642. pthread_set_name_np(proctd, "proc");
  643. reqtd = pthread_self();
  644. pthread_set_name_np(reqtd, "req");
  645. req_thread(NULL);
  646. /* Disconnected. */
  647. close(pushfd);
  648. close(popfd);
  649. }
  650. static void
  651. signop(int sig __unused)
  652. {
  653. /* Do nothing. */
  654. }
  655. static void
  656. g_gatessh_loop(void)
  657. {
  658. struct g_gate_ctl_cancel ggioc;
  659. signal(SIGUSR1, signop);
  660. for (;;) {
  661. g_gatessh_start();
  662. g_gate_log(LOG_NOTICE, "Disconnected [%s@%s:%s]. Connecting...",
  663. username, hostname, imgpath);
  664. ggioc.gctl_version = G_GATE_VERSION;
  665. ggioc.gctl_unit = unit;
  666. ggioc.gctl_seq = 0;
  667. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  668. }
  669. }
  670. static void
  671. g_gatessh_create(void)
  672. {
  673. struct g_gate_ctl_create ggioc;
  674. if (!g_gatessh_connect())
  675. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  676. /*
  677. * Ok, got both sockets, time to create provider.
  678. */
  679. memset(&ggioc, 0, sizeof(ggioc));
  680. ggioc.gctl_version = G_GATE_VERSION;
  681. ggioc.gctl_mediasize = mediasize;
  682. ggioc.gctl_sectorsize = sectorsize;
  683. ggioc.gctl_flags = flags;
  684. ggioc.gctl_maxcount = queue_size;
  685. ggioc.gctl_timeout = timeout;
  686. ggioc.gctl_unit = unit;
  687. snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s@%s:%s", username, hostname, imgpath);
  688. g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
  689. if (unit == -1) {
  690. printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
  691. fflush(stdout);
  692. }
  693. unit = ggioc.gctl_unit;
  694. ggatessh_makepidfile();
  695. mydaemon();
  696. if (pfh != NULL)
  697. pidfile_write(pfh);
  698. g_gatessh_loop();
  699. }
  700. static void
  701. g_gatessh_rescue(void)
  702. {
  703. struct g_gate_ctl_cancel ggioc;
  704. int error;
  705. g_gate_log(LOG_ERR, "a");
  706. if (!g_gatessh_connect())
  707. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  708. g_gate_log(LOG_ERR, "b");
  709. ggioc = (struct g_gate_ctl_cancel){
  710. .gctl_version = G_GATE_VERSION,
  711. .gctl_unit = unit,
  712. .gctl_seq = 0,
  713. };
  714. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  715. ggatessh_makepidfile();
  716. mydaemon();
  717. pidfile_write(pfh);
  718. error = pthread_create(&mediatd, NULL, mediachg, NULL);
  719. if (error != 0)
  720. g_gate_xlog("unable to create mediasize change thread", strerror(errno));
  721. g_gatessh_loop();
  722. }
  723. /*
  724. * handle two methods of specifying things.
  725. * if only one arg, in the future split it how ssh does:
  726. * [user[:password]@]<host>:<img>
  727. * and URI:
  728. * sftp://[user[:password]@]<host>[:<port>]/<img>
  729. *
  730. * If both are specified, it's the same as above, but split
  731. * on the :.
  732. */
  733. static void
  734. handle_params(int argc, char *argv[])
  735. {
  736. if (username == NULL) {
  737. username = getenv("USER");
  738. if (username == NULL) {
  739. err(EXIT_FAILURE,
  740. "USER environment variable not present, set, "
  741. "or specify via -l argument.");
  742. }
  743. }
  744. if (argc != 2)
  745. usage();
  746. hostname = argv[0];
  747. imgpath = argv[1];
  748. }
  749. int
  750. main(int argc, char *argv[])
  751. {
  752. int rc;
  753. if (argc < 2)
  754. usage();
  755. if (strcasecmp(argv[1], "create") == 0)
  756. action = CREATE;
  757. else if (strcasecmp(argv[1], "destroy") == 0)
  758. action = DESTROY;
  759. else if (strcasecmp(argv[1], "list") == 0)
  760. action = LIST;
  761. else if (strcasecmp(argv[1], "rescue") == 0)
  762. action = RESCUE;
  763. else
  764. usage();
  765. argc -= 1;
  766. argv += 1;
  767. for (;;) {
  768. int ch;
  769. ch = getopt(argc, argv, "fF:i:l:o:p:q:r:s:t:u:v");
  770. if (ch == -1)
  771. break;
  772. switch (ch) {
  773. case 'f':
  774. if (action != DESTROY)
  775. usage();
  776. force = 1;
  777. break;
  778. case 'F':
  779. ggatessh_pidfile = optarg;
  780. break;
  781. case 'i':
  782. identityfile = optarg;
  783. break;
  784. case 'l':
  785. username = optarg;
  786. break;
  787. case 'o':
  788. if (action != CREATE && action != RESCUE)
  789. usage();
  790. if (strcasecmp("ro", optarg) == 0)
  791. flags = G_GATE_FLAG_READONLY;
  792. else if (strcasecmp("wo", optarg) == 0)
  793. flags = G_GATE_FLAG_WRITEONLY;
  794. else if (strcasecmp("rw", optarg) == 0)
  795. flags = 0;
  796. else {
  797. errx(EXIT_FAILURE,
  798. "Invalid argument for '-o' option.");
  799. }
  800. break;
  801. case 'p':
  802. sshport = optarg;
  803. break;
  804. case 'q':
  805. if (action != CREATE)
  806. usage();
  807. errno = 0;
  808. queue_size = strtoul(optarg, NULL, 10);
  809. if (queue_size == 0 && errno != 0)
  810. errx(EXIT_FAILURE, "Invalid queue_size.");
  811. break;
  812. case 'r':
  813. if (action != CREATE && action != RESCUE)
  814. usage();
  815. errno = 0;
  816. maxconnections = strtoul(optarg, NULL, 10);
  817. if (maxconnections == 0 && errno != 0)
  818. errx(EXIT_FAILURE, "Invalid queue_size.");
  819. break;
  820. case 's':
  821. if (action != CREATE)
  822. usage();
  823. errno = 0;
  824. sectorsize = strtoul(optarg, NULL, 10);
  825. if (sectorsize == 0 && errno != 0)
  826. errx(EXIT_FAILURE, "Invalid sectorsize.");
  827. break;
  828. case 't':
  829. if (action != CREATE)
  830. usage();
  831. errno = 0;
  832. timeout = strtoul(optarg, NULL, 10);
  833. if (timeout == 0 && errno != 0)
  834. errx(EXIT_FAILURE, "Invalid timeout.");
  835. break;
  836. case 'u':
  837. errno = 0;
  838. unit = strtol(optarg, NULL, 10);
  839. if (unit == 0 && errno != 0)
  840. errx(EXIT_FAILURE, "Invalid unit number.");
  841. break;
  842. case 'v':
  843. if (action == DESTROY)
  844. usage();
  845. g_gate_verbose++;
  846. break;
  847. default:
  848. usage();
  849. }
  850. }
  851. argc -= optind;
  852. argv += optind;
  853. g_gate_log(LOG_DEBUG, "libssh2_init");
  854. rc = libssh2_init(0);
  855. if (rc != 0) {
  856. fprintf(stderr, "libssh2 initialization failed (%d)\n", rc);
  857. return 1;
  858. }
  859. switch (action) {
  860. case CREATE:
  861. if (argc < 1 || argc > 2)
  862. usage();
  863. handle_params(argc, argv);
  864. g_gate_load_module();
  865. g_gate_open_device();
  866. g_gatessh_create();
  867. break;
  868. case DESTROY:
  869. if (argc != 0)
  870. usage();
  871. if (unit == -1) {
  872. fprintf(stderr, "Required unit number.\n");
  873. usage();
  874. }
  875. g_gate_verbose = 1;
  876. g_gate_open_device();
  877. g_gate_destroy(unit, force);
  878. break;
  879. case LIST:
  880. g_gate_list(unit, g_gate_verbose);
  881. break;
  882. case RESCUE:
  883. if (argc < 1 || argc > 2)
  884. usage();
  885. if (unit == -1) {
  886. fprintf(stderr, "Required unit number.\n");
  887. usage();
  888. }
  889. handle_params(argc, argv);
  890. g_gate_open_device();
  891. g_gatessh_rescue();
  892. break;
  893. case UNSET:
  894. default:
  895. usage();
  896. }
  897. g_gate_close_device();
  898. exit(EXIT_SUCCESS);
  899. }