geom_gate userland utility improvements
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1132 lines
26 KiB

  1. /*-
  2. * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
  3. *
  4. * Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org>
  5. * All rights reserved.
  6. * Copyright 2020 John-Mark Gurney <jmg@FreeBSD.org>
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions
  10. * are met:
  11. * 1. Redistributions of source code must retain the above copyright
  12. * notice, this list of conditions and the following disclaimer.
  13. * 2. Redistributions in binary form must reproduce the above copyright
  14. * notice, this list of conditions and the following disclaimer in the
  15. * documentation and/or other materials provided with the distribution.
  16. *
  17. * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
  18. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20. * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
  21. * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  22. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  23. * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  24. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  25. * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  26. * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  27. * SUCH DAMAGE.
  28. *
  29. * $FreeBSD$
  30. */
  31. #include <stdio.h>
  32. #include <stdlib.h>
  33. #include <fcntl.h>
  34. #include <libutil.h>
  35. #include <paths.h>
  36. #include <pthread.h>
  37. #include <pthread_np.h>
  38. #include <err.h>
  39. #include <errno.h>
  40. #include <assert.h>
  41. #include <sys/param.h>
  42. #include <sys/ioctl.h>
  43. #include <sys/queue.h>
  44. #include <sys/socket.h>
  45. #include <sys/syslog.h>
  46. #include <sys/bio.h>
  47. #include <netdb.h>
  48. #include <semaphore.h>
  49. #include <libssh2.h>
  50. #include <libssh2_sftp.h>
  51. #include <geom/gate/g_gate.h>
  52. #include "ggate.h"
  53. static enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET;
  54. struct ggs_connection {
  55. int c_fd;
  56. LIBSSH2_SESSION *c_session;
  57. LIBSSH2_SFTP *c_sftp_session;
  58. LIBSSH2_SFTP_HANDLE *c_handle;
  59. };
  60. static struct pidfh *pfh;
  61. static const char *username;
  62. static const char *hostname;
  63. static const char *imgpath;
  64. static const char *identityfile;
  65. static const char *pubkeyfile;
  66. static const char *sshport = "22";
  67. static char *ggatessh_pidfile;
  68. static int unit = G_GATE_UNIT_AUTO;
  69. static unsigned flags = 0;
  70. static int force = 0;
  71. static unsigned queue_size = G_GATE_QUEUE_SIZE;
  72. static off_t mediasize;
  73. static unsigned sectorsize = 4096;
  74. static unsigned timeout = G_GATE_TIMEOUT;
  75. static int pushfd, popfd; /* work semaphore */
  76. static pthread_t reqtd, proctd, mediatd;
  77. static unsigned maxconnections = 32;
  78. static struct ggs_connection start_conn; /* only used once/first */
  79. struct ggs_sess_cache {
  80. LIBSSH2_SESSION *sc_ssh_session;
  81. LIBSSH2_SFTP *sc_session;
  82. LIBSSH2_SFTP_HANDLE *sc_handle;
  83. TAILQ_ENTRY(ggs_sess_cache) sc_next;
  84. };
  85. struct ggs_req {
  86. struct g_gate_ctl_io r_ggio;
  87. #define r_ssh_session r_sesscache->sc_ssh_session
  88. #define r_session r_sesscache->sc_session
  89. #define r_handle r_sesscache->sc_handle
  90. struct ggs_sess_cache *r_sesscache;
  91. size_t r_bufoff;
  92. int r_didseek;
  93. TAILQ_ENTRY(ggs_req) r_next;
  94. };
  95. static TAILQ_HEAD(ggs_reqqueue, ggs_req) procqueue =
  96. TAILQ_HEAD_INITIALIZER(procqueue);
  97. static TAILQ_HEAD(ggs_sessqueue, ggs_sess_cache) session_cache =
  98. TAILQ_HEAD_INITIALIZER(session_cache);
  99. static sem_t nconn_sem;
  100. static pthread_mutex_t procqueue_mtx;
  101. static void
  102. usage(void)
  103. {
  104. fprintf(stderr, "usage: %s create [-v] [-o <ro|wo|rw>] "
  105. "[-F pidfile] [-i identifyfile] "
  106. "[-l username] [-p port] "
  107. "[-q queue_size] [-s sectorsize] [-r nrequests] "
  108. "[-t timeout] [-u unit] <host> <path>\n", getprogname());
  109. fprintf(stderr, " %s rescue [-v] [-o <ro|wo|rw>] "
  110. "[-F pidfile] [-i identifyfile] "
  111. "[-l username] [-p port] "
  112. "[-r nrequests] <-u unit> <host> <path>\n", getprogname());
  113. fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname());
  114. fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname());
  115. exit(EXIT_FAILURE);
  116. }
  117. static void
  118. libssh2_errorx(LIBSSH2_SESSION *session, const char *info)
  119. {
  120. char *errmsg;
  121. int rc;
  122. rc = libssh2_session_last_error(session, &errmsg, NULL, 0);
  123. g_gate_xlog("%s: %s", info, errmsg);
  124. }
  125. /*
  126. * Connect to the service (or port number) on host.
  127. *
  128. * Somewhat copied from freebsd/lib/libfetch/fetch_common.c:fetch_connect.
  129. */
  130. static int
  131. tcp_connect(const char *host, const char *service, int af)
  132. {
  133. struct addrinfo hints, *sai, *sai0;
  134. int err, sd;
  135. hints = (struct addrinfo){
  136. .ai_family = af,
  137. .ai_socktype = SOCK_STREAM,
  138. .ai_flags = AI_ADDRCONFIG,
  139. };
  140. /* resolve server address */
  141. if (getaddrinfo(host, service, &hints, &sai0) == -1)
  142. return -1;
  143. if (sai0 == NULL) {
  144. errno = ENOENT;
  145. return -1;
  146. }
  147. sd = -1;
  148. err = -1;
  149. /* try each server address in turn */
  150. for (sai = sai0; sai != NULL; sai = sai->ai_next) {
  151. /* open socket */
  152. if ((sd = socket(sai->ai_family, sai->ai_socktype,
  153. sai->ai_protocol)) == -1)
  154. break;
  155. /* attempt to connect to server address */
  156. if ((err = connect(sd, sai->ai_addr, sai->ai_addrlen)) == 0)
  157. break;
  158. /* clean up before next attempt */
  159. close(sd);
  160. sd = -1;
  161. }
  162. err = errno;
  163. fflush(stdout);
  164. freeaddrinfo(sai0);
  165. /* Fully close if it was opened; otherwise just don't leak the fd. */
  166. if (err == -1 && sd >= 0)
  167. close(sd);
  168. errno = err;
  169. return sd;
  170. }
  171. static int
  172. get_open_flags()
  173. {
  174. switch (flags) {
  175. case G_GATE_FLAG_READONLY:
  176. return LIBSSH2_FXF_READ;
  177. case G_GATE_FLAG_WRITEONLY:
  178. return LIBSSH2_FXF_WRITE;
  179. default:
  180. return LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE;
  181. }
  182. }
  183. static struct ggs_connection
  184. make_connection(void)
  185. {
  186. LIBSSH2_SESSION *session;
  187. LIBSSH2_SFTP *sftp_session;
  188. LIBSSH2_SFTP_HANDLE *handle;
  189. char *tmp;
  190. int sockfd;
  191. int rc;
  192. sockfd = tcp_connect(hostname, sshport, 0);
  193. if (sockfd == -1) {
  194. if (errno == ENOENT)
  195. g_gate_xlog("tcp_connect: failed to lookup %s",
  196. hostname);
  197. g_gate_xlog("tcp_connect: %s.", strerror(errno));
  198. }
  199. session = libssh2_session_init();
  200. if (session == NULL)
  201. libssh2_errorx(session, "libssh2_session_init");
  202. if (g_gate_verbose)
  203. libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_KEX|
  204. LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|
  205. LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY);
  206. /* XXX - libssh2_session_flag to enable compression */
  207. rc = libssh2_session_handshake(session, sockfd);
  208. if (rc)
  209. libssh2_errorx(session, "libssh2_session_handshake");
  210. libssh2_session_set_blocking(session, 1);
  211. /* XXX - known hosts handling */
  212. if (identityfile == NULL) {
  213. tmp = NULL;
  214. asprintf(&tmp, "%s/.ssh/id_rsa", getenv("HOME"));
  215. identityfile = tmp;
  216. tmp = NULL;
  217. }
  218. asprintf(&tmp, "%s.pub", identityfile);
  219. pubkeyfile = tmp;
  220. tmp = NULL;
  221. g_gate_log(LOG_DEBUG, "trying identity file: %s", identityfile);
  222. rc = libssh2_userauth_publickey_fromfile(session, username, pubkeyfile,
  223. identityfile, NULL);
  224. //rc = libssh2_userauth_password(session, "freebsd", "freebsd");
  225. if (rc) {
  226. g_gate_log(LOG_ERR, "identity file: %s", identityfile);
  227. libssh2_errorx(session, "libssh2_userauth_publickey_fromfile");
  228. }
  229. /* always need at least one */
  230. sftp_session = libssh2_sftp_init(session);
  231. if (sftp_session == NULL)
  232. g_gate_xlog("libssh2_sftp_init");
  233. handle = libssh2_sftp_open(sftp_session, imgpath, get_open_flags(), 0);
  234. if (handle == NULL) {
  235. g_gate_log(LOG_ERR, "image file: %s", imgpath);
  236. libssh2_errorx(session, "libssh2_sftp_open");
  237. }
  238. return (struct ggs_connection){
  239. .c_fd = sockfd,
  240. .c_session = session,
  241. .c_sftp_session = sftp_session,
  242. .c_handle = handle,
  243. };
  244. }
  245. /*
  246. * Resize is required to run in a thread because the resize requires
  247. * that I/O is able to complete before it can return.
  248. */
  249. static void *
  250. mediachg(void *arg __unused)
  251. {
  252. struct g_gate_ctl_modify ggiom;
  253. /* update mediasize, it may have changed */
  254. ggiom = (struct g_gate_ctl_modify){
  255. .gctl_version = G_GATE_VERSION,
  256. .gctl_unit = unit,
  257. .gctl_modify = GG_MODIFY_MEDIASIZE,
  258. .gctl_mediasize = mediasize,
  259. };
  260. g_gate_ioctl(G_GATE_CMD_MODIFY, &ggiom);
  261. g_gate_log(LOG_DEBUG, "updated ggate%d mediasize to %zd", unit,
  262. mediasize);
  263. return NULL;
  264. }
  265. static void *
  266. req_thread(void *arg __unused)
  267. {
  268. struct ggs_req *greq;
  269. static char *buf;
  270. int buflen = 1024*1024;
  271. int error;
  272. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  273. greq = NULL;
  274. for (;;) {
  275. if (greq == NULL)
  276. greq = malloc(sizeof *greq);
  277. if (buf == NULL)
  278. buf = malloc(buflen);
  279. if (greq == NULL || buf == NULL) {
  280. /* XXX */
  281. g_gate_log(LOG_ERR, "Unable to allocate memory.");
  282. exit(1);
  283. }
  284. *greq = (struct ggs_req){
  285. .r_ggio = (struct g_gate_ctl_io){
  286. .gctl_version = G_GATE_VERSION,
  287. .gctl_unit = unit,
  288. .gctl_data = buf,
  289. .gctl_length = buflen,
  290. .gctl_error = 0,
  291. },
  292. };
  293. //g_gate_log(LOG_DEBUG, "waiting for ioctl");
  294. g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio);
  295. //g_gate_log(LOG_DEBUG, "got ioctl");
  296. error = greq->r_ggio.gctl_error;
  297. switch (error) {
  298. case 0:
  299. break;
  300. case ECANCELED:
  301. /* Exit gracefully. */
  302. g_gate_close_device();
  303. exit(EXIT_SUCCESS);
  304. case ENXIO:
  305. default:
  306. g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
  307. strerror(error));
  308. }
  309. g_gate_log(LOG_DEBUG, "ggio(%p), ver: %u, unit: %d, seq: %llu, "
  310. "cmd: %u, offset: %llu, len: %llu", greq,
  311. greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit,
  312. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd,
  313. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  314. switch (greq->r_ggio.gctl_cmd) {
  315. case BIO_READ:
  316. /* use a correctly sized allocation */
  317. greq->r_ggio.gctl_data =
  318. malloc(greq->r_ggio.gctl_length);
  319. break;
  320. case BIO_WRITE:
  321. /* r_ggio takes ownership of buf now */
  322. buf = NULL;
  323. break;
  324. case BIO_FLUSH:
  325. greq->r_ggio.gctl_data = NULL;
  326. break;
  327. case BIO_DELETE:
  328. default:
  329. greq->r_ggio.gctl_error = EOPNOTSUPP;
  330. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  331. continue; /* return EOPNOTSUPP */
  332. break;
  333. }
  334. //g_gate_log(LOG_DEBUG, "waiting for slot");
  335. sem_wait(&nconn_sem);
  336. #if 0
  337. int semval;
  338. sem_getvalue(&nconn_sem, &semval);
  339. g_gate_log(LOG_DEBUG, "slots: %d", semval);
  340. #endif
  341. error = pthread_mutex_lock(&procqueue_mtx);
  342. assert(error == 0);
  343. TAILQ_INSERT_TAIL(&procqueue, greq, r_next);
  344. error = pthread_mutex_unlock(&procqueue_mtx);
  345. assert(error == 0);
  346. /* notify processing thread a request is waiting */
  347. error = write(pushfd, "T", 1);
  348. if (error != 1)
  349. g_gate_xlog("write pushfd: %d, error: %s.", error,
  350. strerror(error));
  351. /* pass ownership */
  352. greq = NULL;
  353. }
  354. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  355. return (NULL);
  356. }
  357. static int
  358. process_pending(struct ggs_reqqueue *req_pending,
  359. struct ggs_sessqueue *sessqueue)
  360. {
  361. struct ggs_req *greq, *greq2;
  362. char *errmsg;
  363. int rc;
  364. int didwork;
  365. didwork = 0;
  366. /* Work on each pending request */
  367. TAILQ_FOREACH_SAFE(greq, req_pending, r_next, greq2) {
  368. again:
  369. switch (greq->r_ggio.gctl_cmd) {
  370. case BIO_READ:
  371. g_gate_log(LOG_DEBUG, "sftp_read(%p): %d(%d), rem: %d",
  372. greq, greq->r_ggio.gctl_offset,
  373. greq->r_ggio.gctl_length,
  374. greq->r_ggio.gctl_length - greq->r_bufoff);
  375. if (greq->r_didseek == 0) {
  376. libssh2_sftp_seek64(greq->r_handle,
  377. greq->r_ggio.gctl_offset);
  378. greq->r_didseek = 1;
  379. }
  380. rc = libssh2_sftp_read(greq->r_handle,
  381. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  382. greq->r_ggio.gctl_length - greq->r_bufoff);
  383. g_gate_log(LOG_DEBUG, "sftp_read ret: %d", rc);
  384. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  385. g_gate_log(LOG_ERR, "libssh2_sftp_read");
  386. break;
  387. case BIO_WRITE:
  388. g_gate_log(LOG_DEBUG, "sftp_write(%p): %d(%d), rem: %d",
  389. greq, greq->r_ggio.gctl_offset,
  390. greq->r_ggio.gctl_length,
  391. greq->r_ggio.gctl_length - greq->r_bufoff);
  392. if (greq->r_didseek == 0) {
  393. libssh2_sftp_seek64(greq->r_handle,
  394. greq->r_ggio.gctl_offset);
  395. greq->r_didseek = 1;
  396. }
  397. rc = libssh2_sftp_write(greq->r_handle,
  398. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  399. greq->r_ggio.gctl_length - greq->r_bufoff);
  400. g_gate_log(LOG_DEBUG, "sftp_write ret: %d", rc);
  401. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  402. libssh2_errorx(greq->r_ssh_session,
  403. "libssh2_sftp_write");
  404. break;
  405. case BIO_FLUSH:
  406. g_gate_log(LOG_DEBUG, "sftp_flush(%p)", greq);
  407. rc = libssh2_sftp_fsync(greq->r_handle);
  408. didwork = 1; /* assume this always does work */
  409. switch (rc) {
  410. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  411. greq->r_ggio.gctl_error = EOPNOTSUPP;
  412. goto completeio;
  413. case LIBSSH2_ERROR_EAGAIN:
  414. continue;
  415. case 0: /* success */
  416. goto completeio;
  417. default:
  418. libssh2_session_last_error(greq->r_ssh_session,
  419. &errmsg, NULL, 0);
  420. g_gate_log(LOG_ERR, "sftp_flush(%p) ret %d: %s",
  421. greq, rc, errmsg);
  422. greq->r_ggio.gctl_error = EIO;
  423. goto completeio;
  424. }
  425. /* NOTREACHABLE */
  426. break;
  427. default:
  428. rc = 0;
  429. g_gate_log(LOG_ERR, "unhandled op: %d",
  430. greq->r_ggio.gctl_cmd);
  431. continue;
  432. }
  433. if (rc > 0) {
  434. didwork = 1;
  435. greq->r_bufoff += rc;
  436. /*
  437. * try again on partial read/write,
  438. * might have more data pending
  439. */
  440. if ((off_t)greq->r_bufoff != greq->r_ggio.gctl_length)
  441. goto again;
  442. }
  443. if ((off_t)greq->r_bufoff == greq->r_ggio.gctl_length) {
  444. /* complete */
  445. completeio:
  446. g_gate_log(LOG_DEBUG, "cmd complete: seq: %d, cmd: %d",
  447. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd);
  448. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  449. TAILQ_REMOVE(req_pending, greq, r_next);
  450. TAILQ_INSERT_HEAD(sessqueue, greq->r_sesscache,
  451. sc_next);
  452. free(greq->r_ggio.gctl_data);
  453. free(greq);
  454. /* release this slot */
  455. sem_post(&nconn_sem);
  456. }
  457. }
  458. return didwork;
  459. }
  460. /*
  461. * sftp session management is a bit tricky.
  462. * if there is an entry in sessioncache, use that one.
  463. * if we are waiting for a new session (gsc_pend != NULL),
  464. * establish session, then open handle
  465. * when the new session completes, process the work queue
  466. */
  467. static void *
  468. proc_thread(void *arg __unused)
  469. {
  470. char scratch[32];
  471. struct ggs_reqqueue req_pending;
  472. struct timeval to;
  473. struct ggs_sess_cache *gsc, *gsc_pending;
  474. struct ggs_req *greq;
  475. LIBSSH2_SESSION *session;
  476. fd_set fdread;
  477. fd_set fdwrite;
  478. fd_set fdexcep;
  479. int sockfd;
  480. int maxfd;
  481. int error;
  482. int dir;
  483. int rc;
  484. int didwork; /* did libssh2 do any work? */
  485. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  486. TAILQ_INIT(&req_pending);
  487. /* make sure we don't block on reading */
  488. fcntl(popfd, F_SETFL, O_NONBLOCK);
  489. sockfd = start_conn.c_fd;
  490. session = start_conn.c_session;
  491. gsc = malloc(sizeof *gsc);
  492. gsc->sc_ssh_session = start_conn.c_session;
  493. gsc->sc_session = start_conn.c_sftp_session;
  494. gsc->sc_handle = start_conn.c_handle;
  495. TAILQ_INSERT_HEAD(&session_cache, gsc, sc_next);
  496. gsc = NULL;
  497. gsc_pending = NULL;
  498. didwork = 0;
  499. libssh2_session_set_blocking(session, 0);
  500. for (;;) {
  501. //g_gate_log(LOG_DEBUG, "looping");
  502. if (!didwork) {
  503. /* setup polling loop */
  504. maxfd = -1;
  505. FD_ZERO(&fdread);
  506. FD_ZERO(&fdwrite);
  507. FD_ZERO(&fdexcep);
  508. dir = libssh2_session_block_directions(session);
  509. if (dir & LIBSSH2_SESSION_BLOCK_INBOUND ||
  510. gsc_pending != NULL)
  511. FD_SET(sockfd, &fdread);
  512. if (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND)
  513. FD_SET(sockfd, &fdwrite);
  514. /* add in the pop descriptor */
  515. FD_SET(popfd, &fdread);
  516. maxfd = MAX(popfd, sockfd);
  517. g_gate_log(LOG_DEBUG, "selecting: %s %s, " \
  518. "read: sockfd: %d, popfd: %d, write: sockfd: %d",
  519. (dir & LIBSSH2_SESSION_BLOCK_INBOUND) ? "inbound" :
  520. "", (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) ?
  521. "outbound" : "", FD_ISSET(sockfd, &fdread),
  522. FD_ISSET(popfd, &fdread),
  523. FD_ISSET(sockfd, &fdwrite));
  524. to = (struct timeval){ .tv_sec = 1, .tv_usec = 1000 };
  525. (void)to;
  526. rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep,
  527. NULL);
  528. switch (rc) {
  529. case -1:
  530. g_gate_log(LOG_ERR, "%s: select failed: %s",
  531. __func__, strerror(errno));
  532. break;
  533. case 0:
  534. default:
  535. g_gate_log(LOG_DEBUG, "select: %d, " \
  536. "read: sockfd: %d, popfd: %d, " \
  537. "write: sockfd: %d", rc,
  538. FD_ISSET(sockfd, &fdread),
  539. FD_ISSET(popfd, &fdread),
  540. FD_ISSET(sockfd, &fdwrite));
  541. break;
  542. }
  543. }
  544. didwork = 0;
  545. /* process pending, so any completed can be reused */
  546. didwork |= process_pending(&req_pending, &session_cache);
  547. if (FD_ISSET(popfd, &fdread)) {
  548. /* read off the tokens */
  549. g_gate_log(LOG_DEBUG, "popping");
  550. read(popfd, scratch, sizeof scratch);
  551. for (;;) {
  552. procreq:
  553. /* get the request */
  554. error = pthread_mutex_lock(&procqueue_mtx);
  555. assert(error == 0);
  556. greq = TAILQ_FIRST(&procqueue);
  557. g_gate_log(LOG_DEBUG, "greq: %p", greq);
  558. if (greq != NULL)
  559. TAILQ_REMOVE(&procqueue, greq, r_next);
  560. error = pthread_mutex_unlock(&procqueue_mtx);
  561. assert(error == 0);
  562. /* no more to process */
  563. if (greq == NULL)
  564. break;
  565. gsc = TAILQ_FIRST(&session_cache);
  566. if (gsc == NULL) {
  567. if (gsc_pending == NULL) {
  568. /* need new session */
  569. g_gate_log(LOG_DEBUG,
  570. "need new session");
  571. gsc_pending =
  572. malloc(sizeof *gsc);
  573. gsc_pending->sc_ssh_session =
  574. session;
  575. gsc_pending->sc_session = NULL;
  576. gsc_pending->sc_handle = NULL;
  577. }
  578. /* put back request */
  579. error =
  580. pthread_mutex_lock(&procqueue_mtx);
  581. assert(error == 0);
  582. TAILQ_INSERT_HEAD(&procqueue, greq,
  583. r_next);
  584. error = pthread_mutex_unlock(
  585. &procqueue_mtx);
  586. assert(error == 0);
  587. break;
  588. } else {
  589. /* process request */
  590. TAILQ_REMOVE(&session_cache, gsc,
  591. sc_next);
  592. greq->r_sesscache = gsc;
  593. gsc = NULL;
  594. greq->r_bufoff = 0;
  595. TAILQ_INSERT_TAIL(&req_pending, greq,
  596. r_next);
  597. greq = NULL;
  598. }
  599. }
  600. }
  601. if (gsc_pending != NULL) {
  602. /* we are creating a new session */
  603. if (gsc_pending->sc_session == NULL) {
  604. didwork = 1;
  605. gsc_pending->sc_session =
  606. libssh2_sftp_init(session);
  607. }
  608. if (gsc_pending->sc_session != NULL) {
  609. didwork = 1;
  610. gsc_pending->sc_handle = libssh2_sftp_open(
  611. gsc_pending->sc_session, "fstest/data.img",
  612. get_open_flags(), 0);
  613. }
  614. g_gate_log(LOG_DEBUG,
  615. "pending: session: %p, handle: %p",
  616. gsc_pending->sc_session, gsc_pending->sc_handle);
  617. /* we have a fully initalized entry, use it */
  618. if (gsc_pending->sc_handle != NULL) {
  619. g_gate_log(LOG_DEBUG, "new session created");
  620. TAILQ_INSERT_HEAD(&session_cache, gsc_pending,
  621. sc_next);
  622. gsc_pending = NULL;
  623. didwork = 1;
  624. goto procreq;
  625. }
  626. }
  627. /* kick of any queued requests from above */
  628. didwork |= process_pending(&req_pending, &session_cache);
  629. }
  630. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  631. pthread_exit(NULL);
  632. }
  633. static void
  634. ggatessh_makepidfile(void)
  635. {
  636. pid_t otherpid;
  637. if (!g_gate_verbose) {
  638. if (ggatessh_pidfile == NULL) {
  639. asprintf(&ggatessh_pidfile,
  640. _PATH_VARRUN "/ggatessh.ggate%d.pid", unit);
  641. err(EXIT_FAILURE, "Cannot allocate memory for pidfile");
  642. }
  643. pfh = pidfile_open(ggatessh_pidfile, 0600, &otherpid);
  644. if (pfh == NULL) {
  645. if (errno == EEXIST) {
  646. errx(EXIT_FAILURE,
  647. "Daemon already running, pid: %jd.",
  648. (intmax_t)otherpid);
  649. }
  650. err(EXIT_FAILURE, "Cannot open/create pidfile");
  651. }
  652. }
  653. }
  654. static void
  655. mydaemon(void)
  656. {
  657. if (g_gate_verbose > 0)
  658. return;
  659. if (daemon(0, 0) == 0)
  660. return;
  661. if (action == CREATE)
  662. g_gate_destroy(unit, 1);
  663. err(EXIT_FAILURE, "Cannot daemonize");
  664. }
  665. static int
  666. g_gatessh_connect(void)
  667. {
  668. struct ggs_connection conn;
  669. LIBSSH2_SFTP_ATTRIBUTES attrs;
  670. int rc;
  671. /* get the remote's size */
  672. conn = make_connection();
  673. rc = libssh2_sftp_fstat(conn.c_handle, &attrs);
  674. /* only allow regular and char devices */
  675. if (!(LIBSSH2_SFTP_S_ISREG(attrs.flags) ||
  676. !LIBSSH2_SFTP_S_ISCHR(attrs.flags))) {
  677. g_gate_xlog("remote file not a regular file");
  678. }
  679. mediasize = attrs.filesize;
  680. g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize);
  681. start_conn = conn; /* cache to use later */
  682. return 1;
  683. }
  684. static void
  685. g_gatessh_start(void)
  686. {
  687. int filedes[2];
  688. int error;
  689. pipe(filedes);
  690. pushfd = filedes[1];
  691. popfd = filedes[0];
  692. error = pthread_mutex_init(&procqueue_mtx, NULL);
  693. if (error != 0) {
  694. g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.",
  695. strerror(error));
  696. }
  697. sem_init(&nconn_sem, 0, maxconnections);
  698. error = pthread_create(&proctd, NULL, proc_thread, NULL);
  699. if (error != 0) {
  700. g_gate_destroy(unit, 1); /* XXX - remove */
  701. g_gate_xlog("pthread_create(proc_thread): %s.",
  702. strerror(error));
  703. }
  704. pthread_set_name_np(proctd, "proc");
  705. reqtd = pthread_self();
  706. pthread_set_name_np(reqtd, "req");
  707. req_thread(NULL);
  708. /* Disconnected. */
  709. close(pushfd);
  710. close(popfd);
  711. }
  712. static void
  713. signop(int sig __unused)
  714. {
  715. /* Do nothing. */
  716. }
  717. static void
  718. g_gatessh_loop(void)
  719. {
  720. struct g_gate_ctl_cancel ggioc;
  721. signal(SIGUSR1, signop);
  722. for (;;) {
  723. g_gatessh_start();
  724. g_gate_log(LOG_NOTICE, "Disconnected [%s@%s:%s]. Connecting...",
  725. username, hostname, imgpath);
  726. ggioc.gctl_version = G_GATE_VERSION;
  727. ggioc.gctl_unit = unit;
  728. ggioc.gctl_seq = 0;
  729. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  730. }
  731. }
  732. static void
  733. g_gatessh_create(void)
  734. {
  735. struct g_gate_ctl_create ggioc;
  736. if (!g_gatessh_connect())
  737. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  738. /*
  739. * Ok, got both sockets, time to create provider.
  740. */
  741. memset(&ggioc, 0, sizeof(ggioc));
  742. ggioc.gctl_version = G_GATE_VERSION;
  743. ggioc.gctl_mediasize = mediasize;
  744. ggioc.gctl_sectorsize = sectorsize;
  745. ggioc.gctl_flags = flags;
  746. ggioc.gctl_maxcount = queue_size;
  747. ggioc.gctl_timeout = timeout;
  748. ggioc.gctl_unit = unit;
  749. snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s@%s:%s",
  750. username, hostname, imgpath);
  751. g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
  752. if (unit == -1) {
  753. printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
  754. fflush(stdout);
  755. }
  756. unit = ggioc.gctl_unit;
  757. ggatessh_makepidfile();
  758. mydaemon();
  759. if (pfh != NULL)
  760. pidfile_write(pfh);
  761. g_gatessh_loop();
  762. }
  763. static void
  764. g_gatessh_rescue(void)
  765. {
  766. struct g_gate_ctl_cancel ggioc;
  767. int error;
  768. g_gate_log(LOG_ERR, "a");
  769. if (!g_gatessh_connect())
  770. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  771. g_gate_log(LOG_ERR, "b");
  772. ggioc = (struct g_gate_ctl_cancel){
  773. .gctl_version = G_GATE_VERSION,
  774. .gctl_unit = unit,
  775. .gctl_seq = 0,
  776. };
  777. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  778. ggatessh_makepidfile();
  779. mydaemon();
  780. pidfile_write(pfh);
  781. error = pthread_create(&mediatd, NULL, mediachg, NULL);
  782. if (error != 0)
  783. g_gate_xlog("unable to create mediasize change thread",
  784. strerror(errno));
  785. g_gatessh_loop();
  786. }
  787. /*
  788. * handle two methods of specifying things.
  789. * if only one arg, in the future split it how ssh does:
  790. * [user[:password]@]<host>:<img>
  791. * and URI:
  792. * sftp://[user[:password]@]<host>[:<port>]/<img>
  793. *
  794. * If both are specified, it's the same as above, but split
  795. * on the :.
  796. */
  797. static void
  798. handle_params(int argc, char *argv[])
  799. {
  800. if (username == NULL) {
  801. username = getenv("USER");
  802. if (username == NULL) {
  803. err(EXIT_FAILURE,
  804. "USER environment variable not present, set, "
  805. "or specify via -l argument.");
  806. }
  807. }
  808. if (argc != 2)
  809. usage();
  810. hostname = argv[0];
  811. imgpath = argv[1];
  812. }
  813. int
  814. main(int argc, char *argv[])
  815. {
  816. int rc;
  817. if (argc < 2)
  818. usage();
  819. if (strcasecmp(argv[1], "create") == 0)
  820. action = CREATE;
  821. else if (strcasecmp(argv[1], "destroy") == 0)
  822. action = DESTROY;
  823. else if (strcasecmp(argv[1], "list") == 0)
  824. action = LIST;
  825. else if (strcasecmp(argv[1], "rescue") == 0)
  826. action = RESCUE;
  827. else
  828. usage();
  829. argc -= 1;
  830. argv += 1;
  831. for (;;) {
  832. int ch;
  833. ch = getopt(argc, argv, "fF:i:l:o:p:q:r:s:t:u:v");
  834. if (ch == -1)
  835. break;
  836. switch (ch) {
  837. case 'f':
  838. if (action != DESTROY)
  839. usage();
  840. force = 1;
  841. break;
  842. case 'F':
  843. ggatessh_pidfile = optarg;
  844. break;
  845. case 'i':
  846. identityfile = optarg;
  847. break;
  848. case 'l':
  849. username = optarg;
  850. break;
  851. case 'o':
  852. if (action != CREATE && action != RESCUE)
  853. usage();
  854. if (strcasecmp("ro", optarg) == 0)
  855. flags = G_GATE_FLAG_READONLY;
  856. else if (strcasecmp("wo", optarg) == 0)
  857. flags = G_GATE_FLAG_WRITEONLY;
  858. else if (strcasecmp("rw", optarg) == 0)
  859. flags = 0;
  860. else {
  861. errx(EXIT_FAILURE,
  862. "Invalid argument for '-o' option.");
  863. }
  864. break;
  865. case 'p':
  866. sshport = optarg;
  867. break;
  868. case 'q':
  869. if (action != CREATE)
  870. usage();
  871. errno = 0;
  872. queue_size = strtoul(optarg, NULL, 10);
  873. if (queue_size == 0 && errno != 0)
  874. errx(EXIT_FAILURE, "Invalid queue_size.");
  875. break;
  876. case 'r':
  877. if (action != CREATE && action != RESCUE)
  878. usage();
  879. errno = 0;
  880. maxconnections = strtoul(optarg, NULL, 10);
  881. if (maxconnections == 0 && errno != 0)
  882. errx(EXIT_FAILURE, "Invalid queue_size.");
  883. break;
  884. case 's':
  885. if (action != CREATE)
  886. usage();
  887. errno = 0;
  888. sectorsize = strtoul(optarg, NULL, 10);
  889. if (sectorsize == 0 && errno != 0)
  890. errx(EXIT_FAILURE, "Invalid sectorsize.");
  891. break;
  892. case 't':
  893. if (action != CREATE)
  894. usage();
  895. errno = 0;
  896. timeout = strtoul(optarg, NULL, 10);
  897. if (timeout == 0 && errno != 0)
  898. errx(EXIT_FAILURE, "Invalid timeout.");
  899. break;
  900. case 'u':
  901. errno = 0;
  902. unit = strtol(optarg, NULL, 10);
  903. if (unit == 0 && errno != 0)
  904. errx(EXIT_FAILURE, "Invalid unit number.");
  905. break;
  906. case 'v':
  907. if (action == DESTROY)
  908. usage();
  909. g_gate_verbose++;
  910. break;
  911. default:
  912. usage();
  913. }
  914. }
  915. argc -= optind;
  916. argv += optind;
  917. g_gate_log(LOG_DEBUG, "libssh2_init");
  918. rc = libssh2_init(0);
  919. if (rc != 0) {
  920. fprintf(stderr, "libssh2 initialization failed (%d)\n", rc);
  921. return 1;
  922. }
  923. switch (action) {
  924. case CREATE:
  925. if (argc < 1 || argc > 2)
  926. usage();
  927. handle_params(argc, argv);
  928. g_gate_load_module();
  929. g_gate_open_device();
  930. g_gatessh_create();
  931. break;
  932. case DESTROY:
  933. if (argc != 0)
  934. usage();
  935. if (unit == -1) {
  936. fprintf(stderr, "Required unit number.\n");
  937. usage();
  938. }
  939. g_gate_verbose = 1;
  940. g_gate_open_device();
  941. g_gate_destroy(unit, force);
  942. break;
  943. case LIST:
  944. g_gate_list(unit, g_gate_verbose);
  945. break;
  946. case RESCUE:
  947. if (argc < 1 || argc > 2)
  948. usage();
  949. if (unit == -1) {
  950. fprintf(stderr, "Required unit number.\n");
  951. usage();
  952. }
  953. handle_params(argc, argv);
  954. g_gate_open_device();
  955. g_gatessh_rescue();
  956. break;
  957. case UNSET:
  958. default:
  959. usage();
  960. }
  961. g_gate_close_device();
  962. exit(EXIT_SUCCESS);
  963. }