geom_gate userland utility improvements
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

1261 lines
29 KiB

  1. /*-
  2. * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
  3. *
  4. * Copyright (c) 2004 Pawel Jakub Dawidek <pjd@FreeBSD.org>
  5. * All rights reserved.
  6. * Copyright 2020 John-Mark Gurney <jmg@FreeBSD.org>
  7. *
  8. * Redistribution and use in source and binary forms, with or without
  9. * modification, are permitted provided that the following conditions
  10. * are met:
  11. * 1. Redistributions of source code must retain the above copyright
  12. * notice, this list of conditions and the following disclaimer.
  13. * 2. Redistributions in binary form must reproduce the above copyright
  14. * notice, this list of conditions and the following disclaimer in the
  15. * documentation and/or other materials provided with the distribution.
  16. *
  17. * THIS SOFTWARE IS PROVIDED BY THE AUTHORS AND CONTRIBUTORS ``AS IS'' AND
  18. * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  19. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  20. * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE LIABLE
  21. * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  22. * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  23. * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  24. * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  25. * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  26. * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  27. * SUCH DAMAGE.
  28. *
  29. * $FreeBSD$
  30. */
  31. #include <stdio.h>
  32. #include <stdlib.h>
  33. #include <fcntl.h>
  34. #include <libutil.h>
  35. #include <paths.h>
  36. #include <pthread.h>
  37. #include <pthread_np.h>
  38. #include <err.h>
  39. #include <errno.h>
  40. #include <assert.h>
  41. #include <sys/param.h>
  42. #include <sys/ioctl.h>
  43. #include <sys/queue.h>
  44. #include <sys/socket.h>
  45. #include <sys/syslog.h>
  46. #include <sys/bio.h>
  47. #include <netdb.h>
  48. #include <semaphore.h>
  49. #include <libssh2.h>
  50. #include <libssh2_sftp.h>
  51. #include <geom/gate/g_gate.h>
  52. #include "ggate.h"
  53. static enum { UNSET, CREATE, DESTROY, LIST, RESCUE } action = UNSET;
  54. struct ggs_connection {
  55. int c_fd;
  56. int *c_didwork; /* allocated memory */
  57. LIBSSH2_SESSION *c_session;
  58. LIBSSH2_SFTP *c_sftp_session;
  59. LIBSSH2_SFTP_HANDLE *c_handle;
  60. };
  61. static struct pidfh *pfh;
  62. static const char *username;
  63. static const char *hostname;
  64. static const char *imgpath;
  65. static const char *identityfile;
  66. static const char *pubkeyfile;
  67. static const char *sshport = "22";
  68. static char *ggatessh_pidfile;
  69. static int unit = G_GATE_UNIT_AUTO;
  70. static unsigned flags = 0;
  71. static int force = 0;
  72. static unsigned queue_size = G_GATE_QUEUE_SIZE;
  73. static off_t mediasize;
  74. static unsigned sectorsize = 4096;
  75. static unsigned timeout = G_GATE_TIMEOUT;
  76. static int pushfd, popfd; /* work semaphore */
  77. static pthread_t reqtd, proctd, mediatd;
  78. static unsigned maxconnections = 32;
  79. static struct ggs_connection start_conn; /* only used once/first */
  80. struct ggs_sess_cache {
  81. LIBSSH2_SESSION *sc_ssh_session;
  82. LIBSSH2_SFTP *sc_session;
  83. LIBSSH2_SFTP_HANDLE *sc_handle;
  84. TAILQ_ENTRY(ggs_sess_cache) sc_next;
  85. };
  86. struct ggs_req {
  87. struct g_gate_ctl_io r_ggio;
  88. #define r_ssh_session r_sesscache->sc_ssh_session
  89. #define r_session r_sesscache->sc_session
  90. #define r_handle r_sesscache->sc_handle
  91. struct ggs_sess_cache *r_sesscache;
  92. size_t r_bufoff;
  93. int r_didseek;
  94. TAILQ_ENTRY(ggs_req) r_next;
  95. };
  96. static TAILQ_HEAD(ggs_reqqueue, ggs_req) procqueue =
  97. TAILQ_HEAD_INITIALIZER(procqueue);
  98. static TAILQ_HEAD(ggs_sessqueue, ggs_sess_cache) session_cache =
  99. TAILQ_HEAD_INITIALIZER(session_cache);
  100. static sem_t nconn_sem;
  101. static pthread_mutex_t procqueue_mtx;
  102. static void
  103. usage(void)
  104. {
  105. fprintf(stderr, "usage: %s create [-v] [-o <ro|wo|rw>] "
  106. "[-F pidfile] [-i identifyfile] "
  107. "[-l username] [-p port] "
  108. "[-q queue_size] [-s sectorsize] [-r nrequests] "
  109. "[-t timeout] [-u unit] <host> <path>\n", getprogname());
  110. fprintf(stderr, " %s rescue [-v] [-o <ro|wo|rw>] "
  111. "[-F pidfile] [-i identifyfile] "
  112. "[-l username] [-p port] "
  113. "[-r nrequests] <-u unit> <host> <path>\n", getprogname());
  114. fprintf(stderr, " %s destroy [-f] <-u unit>\n", getprogname());
  115. fprintf(stderr, " %s list [-v] [-u unit]\n", getprogname());
  116. exit(EXIT_FAILURE);
  117. }
  118. #if 0
  119. static void
  120. libssh2_debug_error(LIBSSH2_SESSION *session, const char *info)
  121. {
  122. char *errmsg;
  123. int error;
  124. int rc;
  125. error = libssh2_session_last_errno(session);
  126. rc = libssh2_session_last_error(session, &errmsg, NULL, 0);
  127. g_gate_log(LOG_DEBUG, "%s: %s(%d)", info, errmsg, error);
  128. }
  129. #endif
  130. static void
  131. libssh2_errorx(LIBSSH2_SESSION *session, const char *info)
  132. {
  133. char *errmsg;
  134. int rc;
  135. rc = libssh2_session_last_error(session, &errmsg, NULL, 0);
  136. g_gate_xlog("%s: %s", info, errmsg);
  137. }
  138. /*
  139. * Connect to the service (or port number) on host.
  140. *
  141. * Somewhat copied from freebsd/lib/libfetch/fetch_common.c:fetch_connect.
  142. */
  143. static int
  144. tcp_connect(const char *host, const char *service, int af)
  145. {
  146. struct addrinfo hints, *sai, *sai0;
  147. int err, sd;
  148. hints = (struct addrinfo){
  149. .ai_family = af,
  150. .ai_socktype = SOCK_STREAM,
  151. .ai_flags = AI_ADDRCONFIG,
  152. };
  153. /* resolve server address */
  154. if (getaddrinfo(host, service, &hints, &sai0) == -1)
  155. return -1;
  156. if (sai0 == NULL) {
  157. errno = ENOENT;
  158. return -1;
  159. }
  160. sd = -1;
  161. err = -1;
  162. /* try each server address in turn */
  163. for (sai = sai0; sai != NULL; sai = sai->ai_next) {
  164. /* open socket */
  165. if ((sd = socket(sai->ai_family, sai->ai_socktype,
  166. sai->ai_protocol)) == -1)
  167. break;
  168. /* attempt to connect to server address */
  169. if ((err = connect(sd, sai->ai_addr, sai->ai_addrlen)) == 0)
  170. break;
  171. /* clean up before next attempt */
  172. close(sd);
  173. sd = -1;
  174. }
  175. err = errno;
  176. fflush(stdout);
  177. freeaddrinfo(sai0);
  178. /* Fully close if it was opened; otherwise just don't leak the fd. */
  179. if (err == -1 && sd >= 0)
  180. close(sd);
  181. errno = err;
  182. return sd;
  183. }
  184. static int
  185. get_open_flags()
  186. {
  187. switch (flags) {
  188. case G_GATE_FLAG_READONLY:
  189. return LIBSSH2_FXF_READ;
  190. case G_GATE_FLAG_WRITEONLY:
  191. return LIBSSH2_FXF_WRITE;
  192. default:
  193. return LIBSSH2_FXF_READ|LIBSSH2_FXF_WRITE;
  194. }
  195. }
  196. static struct ggs_connection
  197. make_connection(void)
  198. {
  199. LIBSSH2_SESSION *session;
  200. LIBSSH2_SFTP *sftp_session;
  201. LIBSSH2_SFTP_HANDLE *handle;
  202. char *tmp;
  203. int *didworkp;
  204. int sockfd;
  205. int rc;
  206. sockfd = tcp_connect(hostname, sshport, 0);
  207. if (sockfd == -1) {
  208. if (errno == ENOENT)
  209. g_gate_xlog("tcp_connect: failed to lookup %s",
  210. hostname);
  211. g_gate_xlog("tcp_connect: %s.", strerror(errno));
  212. }
  213. didworkp = malloc(sizeof *didworkp);
  214. if (didworkp == NULL)
  215. g_gate_xlog("malloc failed.");
  216. /* session = libssh2_session_init(); */
  217. session = libssh2_session_init_ex(NULL, NULL, NULL, didworkp);
  218. if (session == NULL)
  219. libssh2_errorx(session, "libssh2_session_init");
  220. if (g_gate_verbose)
  221. libssh2_trace(session, LIBSSH2_TRACE_SOCKET|LIBSSH2_TRACE_KEX|
  222. LIBSSH2_TRACE_AUTH|LIBSSH2_TRACE_CONN|LIBSSH2_TRACE_SFTP|
  223. LIBSSH2_TRACE_ERROR|LIBSSH2_TRACE_PUBLICKEY);
  224. /* XXX - libssh2_session_flag to enable compression */
  225. rc = libssh2_session_handshake(session, sockfd);
  226. if (rc)
  227. libssh2_errorx(session, "libssh2_session_handshake");
  228. libssh2_session_set_blocking(session, 1);
  229. /* XXX - known hosts handling */
  230. if (identityfile == NULL) {
  231. tmp = NULL;
  232. asprintf(&tmp, "%s/.ssh/id_rsa", getenv("HOME"));
  233. identityfile = tmp;
  234. tmp = NULL;
  235. }
  236. asprintf(&tmp, "%s.pub", identityfile);
  237. pubkeyfile = tmp;
  238. tmp = NULL;
  239. g_gate_log(LOG_DEBUG, "trying identity file: %s", identityfile);
  240. rc = libssh2_userauth_publickey_fromfile(session, username, pubkeyfile,
  241. identityfile, NULL);
  242. //rc = libssh2_userauth_password(session, "freebsd", "freebsd");
  243. if (rc) {
  244. g_gate_log(LOG_ERR, "identity file: %s", identityfile);
  245. libssh2_errorx(session, "libssh2_userauth_publickey_fromfile");
  246. }
  247. /* always need at least one */
  248. sftp_session = libssh2_sftp_init(session);
  249. if (sftp_session == NULL)
  250. g_gate_xlog("libssh2_sftp_init");
  251. handle = libssh2_sftp_open(sftp_session, imgpath, get_open_flags(), 0);
  252. if (handle == NULL) {
  253. g_gate_log(LOG_ERR, "image file: %s", imgpath);
  254. libssh2_errorx(session, "libssh2_sftp_open");
  255. }
  256. return (struct ggs_connection){
  257. .c_fd = sockfd,
  258. .c_didwork = didworkp,
  259. .c_session = session,
  260. .c_sftp_session = sftp_session,
  261. .c_handle = handle,
  262. };
  263. }
  264. /*
  265. * Resize is required to run in a thread because the resize requires
  266. * that I/O is able to complete before it can return.
  267. */
  268. static void *
  269. mediachg(void *arg __unused)
  270. {
  271. struct g_gate_ctl_modify ggiom;
  272. /* update mediasize, it may have changed */
  273. ggiom = (struct g_gate_ctl_modify){
  274. .gctl_version = G_GATE_VERSION,
  275. .gctl_unit = unit,
  276. .gctl_modify = GG_MODIFY_MEDIASIZE,
  277. .gctl_mediasize = mediasize,
  278. };
  279. g_gate_ioctl(G_GATE_CMD_MODIFY, &ggiom);
  280. g_gate_log(LOG_DEBUG, "updated ggate%d mediasize to %zd", unit,
  281. mediasize);
  282. return NULL;
  283. }
  284. static void *
  285. req_thread(void *arg __unused)
  286. {
  287. struct ggs_req *greq;
  288. static char *buf;
  289. int buflen = 1024*1024;
  290. int error;
  291. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  292. greq = NULL;
  293. for (;;) {
  294. if (greq == NULL)
  295. greq = malloc(sizeof *greq);
  296. if (buf == NULL)
  297. buf = malloc(buflen);
  298. if (greq == NULL || buf == NULL) {
  299. /* XXX */
  300. g_gate_log(LOG_ERR, "Unable to allocate memory.");
  301. exit(1);
  302. }
  303. *greq = (struct ggs_req){
  304. .r_ggio = (struct g_gate_ctl_io){
  305. .gctl_version = G_GATE_VERSION,
  306. .gctl_unit = unit,
  307. .gctl_data = buf,
  308. .gctl_length = buflen,
  309. .gctl_error = 0,
  310. },
  311. };
  312. //g_gate_log(LOG_DEBUG, "waiting for ioctl");
  313. g_gate_ioctl(G_GATE_CMD_START, &greq->r_ggio);
  314. //g_gate_log(LOG_DEBUG, "got ioctl");
  315. error = greq->r_ggio.gctl_error;
  316. switch (error) {
  317. case 0:
  318. break;
  319. case ECANCELED:
  320. /* Exit gracefully. */
  321. g_gate_close_device();
  322. exit(EXIT_SUCCESS);
  323. case ENXIO:
  324. default:
  325. g_gate_xlog("ioctl(/dev/%s): %s.", G_GATE_CTL_NAME,
  326. strerror(error));
  327. }
  328. g_gate_log(LOG_DEBUG, "ggio(%p), ver: %u, unit: %d, seq: %llu, "
  329. "cmd: %u, offset: %llu, len: %llu", greq,
  330. greq->r_ggio.gctl_version, greq->r_ggio.gctl_unit,
  331. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd,
  332. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  333. switch (greq->r_ggio.gctl_cmd) {
  334. case BIO_READ:
  335. /* use a correctly sized allocation */
  336. greq->r_ggio.gctl_data =
  337. malloc(greq->r_ggio.gctl_length);
  338. break;
  339. case BIO_WRITE:
  340. /* r_ggio takes ownership of buf now */
  341. buf = NULL;
  342. break;
  343. case BIO_DELETE:
  344. case BIO_FLUSH:
  345. greq->r_ggio.gctl_data = NULL;
  346. break;
  347. default:
  348. greq->r_ggio.gctl_error = EOPNOTSUPP;
  349. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  350. continue; /* return EOPNOTSUPP */
  351. break;
  352. }
  353. //g_gate_log(LOG_DEBUG, "waiting for slot");
  354. sem_wait(&nconn_sem);
  355. #if 0
  356. int semval;
  357. sem_getvalue(&nconn_sem, &semval);
  358. g_gate_log(LOG_DEBUG, "slots: %d", semval);
  359. #endif
  360. error = pthread_mutex_lock(&procqueue_mtx);
  361. assert(error == 0);
  362. TAILQ_INSERT_TAIL(&procqueue, greq, r_next);
  363. error = pthread_mutex_unlock(&procqueue_mtx);
  364. assert(error == 0);
  365. /* notify processing thread a request is waiting */
  366. error = write(pushfd, "T", 1);
  367. if (error != 1)
  368. g_gate_xlog("write pushfd: %d, error: %s.", error,
  369. strerror(error));
  370. /* pass ownership */
  371. greq = NULL;
  372. }
  373. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  374. return (NULL);
  375. }
  376. static const char *
  377. sftperrno_str(int err)
  378. {
  379. const char *strs[] = {
  380. [0] = "ok",
  381. [1] = "eof",
  382. [2] = "no such file",
  383. [3] = "permission denied",
  384. [4] = "failure",
  385. [5] = "bad message",
  386. [6] = "no connection",
  387. [7] = "connection lost",
  388. [8] = "op unsupported",
  389. };
  390. if (err < 0 || err >= (int)nitems(strs))
  391. return "invalid errno";
  392. return strs[err];
  393. }
  394. static int
  395. process_pending(struct ggs_reqqueue *req_pending,
  396. struct ggs_sessqueue *sessqueue)
  397. {
  398. struct ggs_req *greq, *greq2;
  399. char *errmsg;
  400. int rc;
  401. int sftperrno;
  402. int didwork;
  403. didwork = 0;
  404. /* Work on each pending request */
  405. TAILQ_FOREACH_SAFE(greq, req_pending, r_next, greq2) {
  406. again:
  407. switch (greq->r_ggio.gctl_cmd) {
  408. case BIO_READ:
  409. g_gate_log(LOG_DEBUG, "sftp_read(%p): %d(%d), rem: %d",
  410. greq, greq->r_ggio.gctl_offset,
  411. greq->r_ggio.gctl_length,
  412. greq->r_ggio.gctl_length - greq->r_bufoff);
  413. if (greq->r_didseek == 0) {
  414. libssh2_sftp_seek64(greq->r_handle,
  415. greq->r_ggio.gctl_offset);
  416. greq->r_didseek = 1;
  417. }
  418. rc = libssh2_sftp_read(greq->r_handle,
  419. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  420. greq->r_ggio.gctl_length - greq->r_bufoff);
  421. g_gate_log(LOG_DEBUG, "sftp_read ret: %d", rc);
  422. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  423. g_gate_log(LOG_ERR, "libssh2_sftp_read");
  424. break;
  425. case BIO_WRITE:
  426. g_gate_log(LOG_DEBUG, "sftp_write(%p): %d(%d), rem: %d",
  427. greq, greq->r_ggio.gctl_offset,
  428. greq->r_ggio.gctl_length,
  429. greq->r_ggio.gctl_length - greq->r_bufoff);
  430. if (greq->r_didseek == 0) {
  431. libssh2_sftp_seek64(greq->r_handle,
  432. greq->r_ggio.gctl_offset);
  433. greq->r_didseek = 1;
  434. }
  435. rc = libssh2_sftp_write(greq->r_handle,
  436. (char *)greq->r_ggio.gctl_data + greq->r_bufoff,
  437. greq->r_ggio.gctl_length - greq->r_bufoff);
  438. g_gate_log(LOG_DEBUG, "sftp_write ret: %d", rc);
  439. if (rc < 0 && rc != LIBSSH2_ERROR_EAGAIN)
  440. libssh2_errorx(greq->r_ssh_session,
  441. "libssh2_sftp_write");
  442. break;
  443. case BIO_FLUSH:
  444. g_gate_log(LOG_DEBUG, "sftp_flush(%p)", greq);
  445. rc = libssh2_sftp_fsync(greq->r_handle);
  446. didwork = 1; /* assume this always does work */
  447. switch (rc) {
  448. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  449. greq->r_ggio.gctl_error = EOPNOTSUPP;
  450. goto completeio;
  451. case LIBSSH2_ERROR_EAGAIN:
  452. continue;
  453. case 0: /* success */
  454. goto completeio;
  455. default:
  456. libssh2_session_last_error(greq->r_ssh_session,
  457. &errmsg, NULL, 0);
  458. g_gate_log(LOG_ERR, "sftp_flush(%p) ret %d: %s",
  459. greq, rc, errmsg);
  460. greq->r_ggio.gctl_error = EIO;
  461. goto completeio;
  462. }
  463. /* NOTREACHABLE */
  464. break;
  465. case BIO_DELETE:
  466. g_gate_log(LOG_DEBUG, "sftp_punchhole(%p)", greq);
  467. rc = libssh2_sftp_punchhole(greq->r_handle,
  468. greq->r_ggio.gctl_offset, greq->r_ggio.gctl_length);
  469. didwork = 1; /* assume this always does work */
  470. switch (rc) {
  471. case LIBSSH2_ERROR_SFTP_PROTOCOL:
  472. greq->r_ggio.gctl_error = EOPNOTSUPP;
  473. sftperrno = libssh2_sftp_last_error(
  474. greq->r_session);
  475. g_gate_log(LOG_DEBUG, "sftp_punchhole(%p) errno: %s(%d)", greq,
  476. sftperrno_str(sftperrno), sftperrno);
  477. goto completeio;
  478. case LIBSSH2_ERROR_EAGAIN:
  479. continue;
  480. case 0: /* success */
  481. goto completeio;
  482. default:
  483. libssh2_session_last_error(greq->r_ssh_session,
  484. &errmsg, NULL, 0);
  485. g_gate_log(LOG_ERR, "sftp_punchhole(%p) ret %d: %s",
  486. greq, rc, errmsg);
  487. greq->r_ggio.gctl_error = EIO;
  488. goto completeio;
  489. }
  490. /* NOTREACHABLE */
  491. break;
  492. default:
  493. rc = 0;
  494. g_gate_log(LOG_ERR, "unhandled op: %d",
  495. greq->r_ggio.gctl_cmd);
  496. continue;
  497. }
  498. if (rc > 0) {
  499. didwork = 1;
  500. greq->r_bufoff += rc;
  501. /*
  502. * try again on partial read/write,
  503. * might have more data pending
  504. */
  505. if ((off_t)greq->r_bufoff != greq->r_ggio.gctl_length)
  506. goto again;
  507. }
  508. if ((off_t)greq->r_bufoff == greq->r_ggio.gctl_length) {
  509. /* complete */
  510. completeio:
  511. g_gate_log(LOG_DEBUG, "cmd complete: seq: %d, cmd: %d",
  512. greq->r_ggio.gctl_seq, greq->r_ggio.gctl_cmd);
  513. g_gate_ioctl(G_GATE_CMD_DONE, &greq->r_ggio);
  514. TAILQ_REMOVE(req_pending, greq, r_next);
  515. TAILQ_INSERT_HEAD(sessqueue, greq->r_sesscache,
  516. sc_next);
  517. free(greq->r_ggio.gctl_data);
  518. free(greq);
  519. /* release this slot */
  520. sem_post(&nconn_sem);
  521. }
  522. }
  523. return didwork;
  524. }
  525. /*
  526. * XXX - expose this, this is to try to silence the loop that can
  527. * happen when there is data waiting (oob or window info), but no
  528. * outstanding requests are pending.
  529. */
  530. int _libssh2_transport_read(LIBSSH2_SESSION *session);
  531. /*
  532. * libssh2 does not have a good way to handle detection when it's
  533. * truly time to sleep. Writing is an easy case, as if there's space
  534. * to write, and the _BLOCK_OUTBOUND flag is set, select will return
  535. * and the data will get processed.
  536. *
  537. * In the case of reading, it is more complicated, as a later request
  538. * could read data for a previous request, and there is no known way
  539. * to know when this is done. The solution I came up with is to wrap
  540. * the recv function, and continue to iterate through the pending
  541. * requests until no more data is read.
  542. */
  543. static ssize_t
  544. ggatessh_recv_libssh2_hack(libssh2_socket_t fd, void *buf,
  545. size_t len, int recv_flags, void **abstract)
  546. {
  547. int *didworkp = *abstract;
  548. ssize_t ret;
  549. ret = recv(fd, buf, len, recv_flags);
  550. if (ret > 0)
  551. *didworkp = 1;
  552. if (ret == -1 && errno == EAGAIN)
  553. return -EAGAIN;
  554. return ret;
  555. }
  556. /*
  557. * sftp session management is a bit tricky.
  558. * if there is an entry in sessioncache, use that one.
  559. * if we are waiting for a new session (gsc_pend != NULL),
  560. * establish session, then open handle
  561. * when the new session completes, process the work queue
  562. */
  563. static void *
  564. proc_thread(void *arg __unused)
  565. {
  566. char scratch[32];
  567. struct ggs_reqqueue req_pending;
  568. struct timeval to;
  569. struct ggs_sess_cache *gsc, *gsc_pending;
  570. struct ggs_req *greq;
  571. LIBSSH2_SESSION *session;
  572. int *didworkp; /* was any reads done, rescan work */
  573. fd_set fdread;
  574. fd_set fdwrite;
  575. fd_set fdexcep;
  576. int sockfd;
  577. int maxfd;
  578. int error;
  579. int dir;
  580. int rc;
  581. g_gate_log(LOG_NOTICE, "%s: started!", __func__);
  582. TAILQ_INIT(&req_pending);
  583. /* make sure we don't block on reading */
  584. fcntl(popfd, F_SETFL, O_NONBLOCK);
  585. sockfd = start_conn.c_fd;
  586. didworkp = start_conn.c_didwork;
  587. session = start_conn.c_session;
  588. gsc = malloc(sizeof *gsc);
  589. *gsc = (struct ggs_sess_cache){
  590. .sc_ssh_session = start_conn.c_session,
  591. .sc_session = start_conn.c_sftp_session,
  592. .sc_handle = start_conn.c_handle,
  593. };
  594. TAILQ_INSERT_HEAD(&session_cache, gsc, sc_next);
  595. gsc = NULL;
  596. gsc_pending = NULL;
  597. *didworkp = 0;
  598. libssh2_session_set_blocking(session, 0);
  599. libssh2_session_callback_set(session, LIBSSH2_CALLBACK_RECV,
  600. ggatessh_recv_libssh2_hack);
  601. for (;;) {
  602. //g_gate_log(LOG_DEBUG, "looping");
  603. if (!*didworkp) {
  604. /* setup polling loop */
  605. maxfd = -1;
  606. FD_ZERO(&fdread);
  607. FD_ZERO(&fdwrite);
  608. FD_ZERO(&fdexcep);
  609. dir = libssh2_session_block_directions(session);
  610. if (dir & LIBSSH2_SESSION_BLOCK_INBOUND ||
  611. gsc_pending != NULL)
  612. FD_SET(sockfd, &fdread);
  613. if (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND)
  614. FD_SET(sockfd, &fdwrite);
  615. /* add in the pop descriptor */
  616. FD_SET(popfd, &fdread);
  617. maxfd = MAX(popfd, sockfd);
  618. g_gate_log(LOG_DEBUG, "selecting: %s %s, " \
  619. "read: sockfd: %d, popfd: %d, write: sockfd: %d",
  620. (dir & LIBSSH2_SESSION_BLOCK_INBOUND) ? "inbound" :
  621. "", (dir & LIBSSH2_SESSION_BLOCK_OUTBOUND) ?
  622. "outbound" : "", FD_ISSET(sockfd, &fdread),
  623. FD_ISSET(popfd, &fdread),
  624. FD_ISSET(sockfd, &fdwrite));
  625. to = (struct timeval){ .tv_sec = 1, .tv_usec = 1000 };
  626. (void)to;
  627. rc = select(maxfd + 1, &fdread, &fdwrite, &fdexcep,
  628. NULL);
  629. switch (rc) {
  630. case -1:
  631. g_gate_log(LOG_ERR, "%s: select failed: %s",
  632. __func__, strerror(errno));
  633. break;
  634. case 0:
  635. default:
  636. g_gate_log(LOG_DEBUG, "select: %d, " \
  637. "read: sockfd: %d, popfd: %d, " \
  638. "write: sockfd: %d", rc,
  639. FD_ISSET(sockfd, &fdread),
  640. FD_ISSET(popfd, &fdread),
  641. FD_ISSET(sockfd, &fdwrite));
  642. break;
  643. }
  644. }
  645. _libssh2_transport_read(session);
  646. *didworkp = 0;
  647. /* process pending, so any completed can be reused */
  648. process_pending(&req_pending, &session_cache);
  649. if (FD_ISSET(popfd, &fdread)) {
  650. /* read off the tokens */
  651. g_gate_log(LOG_DEBUG, "popping");
  652. read(popfd, scratch, sizeof scratch);
  653. for (;;) {
  654. procreq:
  655. /* get the request */
  656. error = pthread_mutex_lock(&procqueue_mtx);
  657. assert(error == 0);
  658. greq = TAILQ_FIRST(&procqueue);
  659. g_gate_log(LOG_DEBUG, "greq: %p", greq);
  660. if (greq != NULL)
  661. TAILQ_REMOVE(&procqueue, greq, r_next);
  662. error = pthread_mutex_unlock(&procqueue_mtx);
  663. assert(error == 0);
  664. /* no more to process */
  665. if (greq == NULL)
  666. break;
  667. gsc = TAILQ_FIRST(&session_cache);
  668. if (gsc == NULL) {
  669. if (gsc_pending == NULL) {
  670. /* need new session */
  671. g_gate_log(LOG_DEBUG,
  672. "need new session");
  673. gsc_pending =
  674. malloc(sizeof *gsc);
  675. gsc_pending->sc_ssh_session =
  676. session;
  677. gsc_pending->sc_session = NULL;
  678. gsc_pending->sc_handle = NULL;
  679. }
  680. /* put back request */
  681. error =
  682. pthread_mutex_lock(&procqueue_mtx);
  683. assert(error == 0);
  684. TAILQ_INSERT_HEAD(&procqueue, greq,
  685. r_next);
  686. error = pthread_mutex_unlock(
  687. &procqueue_mtx);
  688. assert(error == 0);
  689. break;
  690. } else {
  691. /* process request */
  692. TAILQ_REMOVE(&session_cache, gsc,
  693. sc_next);
  694. greq->r_sesscache = gsc;
  695. gsc = NULL;
  696. greq->r_bufoff = 0;
  697. TAILQ_INSERT_TAIL(&req_pending, greq,
  698. r_next);
  699. greq = NULL;
  700. }
  701. }
  702. }
  703. if (gsc_pending != NULL) {
  704. /* we are creating a new session */
  705. if (gsc_pending->sc_session == NULL) {
  706. *didworkp = 1;
  707. gsc_pending->sc_session =
  708. libssh2_sftp_init(session);
  709. }
  710. if (gsc_pending->sc_session != NULL) {
  711. *didworkp = 1;
  712. gsc_pending->sc_handle = libssh2_sftp_open(
  713. gsc_pending->sc_session, imgpath,
  714. get_open_flags(), 0);
  715. if (gsc_pending->sc_handle == NULL) {
  716. error = libssh2_session_last_errno(
  717. session);
  718. if (error != LIBSSH2_ERROR_EAGAIN)
  719. libssh2_errorx(session,
  720. "sftp_open");
  721. }
  722. }
  723. g_gate_log(LOG_DEBUG,
  724. "pending: session: %p, handle: %p",
  725. gsc_pending->sc_session, gsc_pending->sc_handle);
  726. /* we have a fully initalized entry, use it */
  727. if (gsc_pending->sc_handle != NULL) {
  728. g_gate_log(LOG_DEBUG, "new session created");
  729. TAILQ_INSERT_HEAD(&session_cache, gsc_pending,
  730. sc_next);
  731. gsc_pending = NULL;
  732. *didworkp = 1;
  733. goto procreq;
  734. }
  735. }
  736. /* kick of any queued requests from above */
  737. process_pending(&req_pending, &session_cache);
  738. }
  739. g_gate_log(LOG_DEBUG, "%s: Died.", __func__);
  740. pthread_exit(NULL);
  741. }
  742. static void
  743. ggatessh_makepidfile(void)
  744. {
  745. pid_t otherpid;
  746. if (!g_gate_verbose) {
  747. if (ggatessh_pidfile == NULL) {
  748. asprintf(&ggatessh_pidfile,
  749. _PATH_VARRUN "/ggatessh.ggate%d.pid", unit);
  750. if (ggatessh_pidfile == NULL)
  751. err(EXIT_FAILURE,
  752. "Cannot allocate memory for pidfile");
  753. }
  754. pfh = pidfile_open(ggatessh_pidfile, 0600, &otherpid);
  755. if (pfh == NULL) {
  756. if (errno == EEXIST) {
  757. errx(EXIT_FAILURE,
  758. "Daemon already running, pid: %jd.",
  759. (intmax_t)otherpid);
  760. }
  761. err(EXIT_FAILURE, "Cannot open/create pidfile");
  762. }
  763. }
  764. }
  765. static void
  766. mydaemon(void)
  767. {
  768. if (g_gate_verbose > 0)
  769. return;
  770. if (daemon(0, 0) == 0)
  771. return;
  772. if (action == CREATE)
  773. g_gate_destroy(unit, 1);
  774. err(EXIT_FAILURE, "Cannot daemonize");
  775. }
  776. static int
  777. g_gatessh_connect(void)
  778. {
  779. struct ggs_connection conn;
  780. LIBSSH2_SFTP_ATTRIBUTES attrs;
  781. int rc;
  782. /* get the remote's size */
  783. conn = make_connection();
  784. rc = libssh2_sftp_fstat(conn.c_handle, &attrs);
  785. /* only allow regular and char devices */
  786. if (!(LIBSSH2_SFTP_S_ISREG(attrs.flags) ||
  787. !LIBSSH2_SFTP_S_ISCHR(attrs.flags))) {
  788. g_gate_xlog("remote file not a regular file");
  789. }
  790. mediasize = attrs.filesize;
  791. g_gate_log(LOG_DEBUG, "got mediasize: %zd", mediasize);
  792. start_conn = conn; /* cache to use later */
  793. return 1;
  794. }
  795. static void
  796. g_gatessh_start(void)
  797. {
  798. int filedes[2];
  799. int error;
  800. pipe(filedes);
  801. pushfd = filedes[1];
  802. popfd = filedes[0];
  803. error = pthread_mutex_init(&procqueue_mtx, NULL);
  804. if (error != 0) {
  805. g_gate_xlog("pthread_mutex_init(inqueue_mtx): %s.",
  806. strerror(error));
  807. }
  808. sem_init(&nconn_sem, 0, maxconnections);
  809. error = pthread_create(&proctd, NULL, proc_thread, NULL);
  810. if (error != 0) {
  811. g_gate_destroy(unit, 1); /* XXX - remove */
  812. g_gate_xlog("pthread_create(proc_thread): %s.",
  813. strerror(error));
  814. }
  815. pthread_set_name_np(proctd, "proc");
  816. reqtd = pthread_self();
  817. pthread_set_name_np(reqtd, "req");
  818. req_thread(NULL);
  819. /* Disconnected. */
  820. close(pushfd);
  821. close(popfd);
  822. }
  823. static void
  824. signop(int sig __unused)
  825. {
  826. /* Do nothing. */
  827. }
  828. static void
  829. g_gatessh_loop(void)
  830. {
  831. struct g_gate_ctl_cancel ggioc;
  832. signal(SIGUSR1, signop);
  833. for (;;) {
  834. g_gatessh_start();
  835. g_gate_log(LOG_NOTICE, "Disconnected [%s@%s:%s]. Connecting...",
  836. username, hostname, imgpath);
  837. ggioc.gctl_version = G_GATE_VERSION;
  838. ggioc.gctl_unit = unit;
  839. ggioc.gctl_seq = 0;
  840. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  841. }
  842. }
  843. static void
  844. g_gatessh_create(void)
  845. {
  846. struct g_gate_ctl_create ggioc;
  847. if (!g_gatessh_connect())
  848. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  849. /*
  850. * Ok, got both sockets, time to create provider.
  851. */
  852. memset(&ggioc, 0, sizeof(ggioc));
  853. ggioc.gctl_version = G_GATE_VERSION;
  854. ggioc.gctl_mediasize = mediasize;
  855. ggioc.gctl_sectorsize = sectorsize;
  856. ggioc.gctl_flags = flags;
  857. ggioc.gctl_maxcount = queue_size;
  858. ggioc.gctl_timeout = timeout;
  859. ggioc.gctl_unit = unit;
  860. snprintf(ggioc.gctl_info, sizeof(ggioc.gctl_info), "%s@%s:%s",
  861. username, hostname, imgpath);
  862. g_gate_ioctl(G_GATE_CMD_CREATE, &ggioc);
  863. if (unit == -1) {
  864. printf("%s%u\n", G_GATE_PROVIDER_NAME, ggioc.gctl_unit);
  865. fflush(stdout);
  866. }
  867. unit = ggioc.gctl_unit;
  868. ggatessh_makepidfile();
  869. mydaemon();
  870. if (pfh != NULL)
  871. pidfile_write(pfh);
  872. g_gatessh_loop();
  873. }
  874. static void
  875. g_gatessh_rescue(void)
  876. {
  877. struct g_gate_ctl_cancel ggioc;
  878. int error;
  879. if (!g_gatessh_connect())
  880. g_gate_xlog("Cannot connect: %s.", strerror(errno));
  881. ggioc = (struct g_gate_ctl_cancel){
  882. .gctl_version = G_GATE_VERSION,
  883. .gctl_unit = unit,
  884. .gctl_seq = 0,
  885. };
  886. g_gate_ioctl(G_GATE_CMD_CANCEL, &ggioc);
  887. ggatessh_makepidfile();
  888. mydaemon();
  889. pidfile_write(pfh);
  890. error = pthread_create(&mediatd, NULL, mediachg, NULL);
  891. if (error != 0)
  892. g_gate_xlog("unable to create mediasize change thread",
  893. strerror(errno));
  894. g_gatessh_loop();
  895. }
  896. /*
  897. * handle two methods of specifying things.
  898. * if only one arg, in the future split it how ssh does:
  899. * [user[:password]@]<host>:<img>
  900. * and URI:
  901. * sftp://[user[:password]@]<host>[:<port>]/<img>
  902. *
  903. * If both are specified, it's the same as above, but split
  904. * on the :.
  905. */
  906. static void
  907. handle_params(int argc, char *argv[])
  908. {
  909. if (username == NULL) {
  910. username = getenv("USER");
  911. if (username == NULL) {
  912. err(EXIT_FAILURE,
  913. "USER environment variable not present, set, "
  914. "or specify via -l argument.");
  915. }
  916. }
  917. if (argc != 2)
  918. usage();
  919. hostname = argv[0];
  920. imgpath = argv[1];
  921. }
  922. int
  923. main(int argc, char *argv[])
  924. {
  925. int rc;
  926. if (argc < 2)
  927. usage();
  928. if (strcasecmp(argv[1], "create") == 0)
  929. action = CREATE;
  930. else if (strcasecmp(argv[1], "destroy") == 0)
  931. action = DESTROY;
  932. else if (strcasecmp(argv[1], "list") == 0)
  933. action = LIST;
  934. else if (strcasecmp(argv[1], "rescue") == 0)
  935. action = RESCUE;
  936. else
  937. usage();
  938. argc -= 1;
  939. argv += 1;
  940. for (;;) {
  941. int ch;
  942. ch = getopt(argc, argv, "fF:i:l:o:p:q:r:s:t:u:v");
  943. if (ch == -1)
  944. break;
  945. switch (ch) {
  946. case 'f':
  947. if (action != DESTROY)
  948. usage();
  949. force = 1;
  950. break;
  951. case 'F':
  952. ggatessh_pidfile = optarg;
  953. break;
  954. case 'i':
  955. identityfile = optarg;
  956. break;
  957. case 'l':
  958. username = optarg;
  959. break;
  960. case 'o':
  961. if (action != CREATE && action != RESCUE)
  962. usage();
  963. if (strcasecmp("ro", optarg) == 0)
  964. flags = G_GATE_FLAG_READONLY;
  965. else if (strcasecmp("wo", optarg) == 0)
  966. flags = G_GATE_FLAG_WRITEONLY;
  967. else if (strcasecmp("rw", optarg) == 0)
  968. flags = 0;
  969. else {
  970. errx(EXIT_FAILURE,
  971. "Invalid argument for '-o' option.");
  972. }
  973. break;
  974. case 'p':
  975. sshport = optarg;
  976. break;
  977. case 'q':
  978. if (action != CREATE)
  979. usage();
  980. errno = 0;
  981. queue_size = strtoul(optarg, NULL, 10);
  982. if (queue_size == 0 && errno != 0)
  983. errx(EXIT_FAILURE, "Invalid queue_size.");
  984. break;
  985. case 'r':
  986. if (action != CREATE && action != RESCUE)
  987. usage();
  988. errno = 0;
  989. maxconnections = strtoul(optarg, NULL, 10);
  990. if (maxconnections == 0 && errno != 0)
  991. errx(EXIT_FAILURE, "Invalid queue_size.");
  992. break;
  993. case 's':
  994. if (action != CREATE)
  995. usage();
  996. errno = 0;
  997. sectorsize = strtoul(optarg, NULL, 10);
  998. if (sectorsize == 0 && errno != 0)
  999. errx(EXIT_FAILURE, "Invalid sectorsize.");
  1000. break;
  1001. case 't':
  1002. if (action != CREATE)
  1003. usage();
  1004. errno = 0;
  1005. timeout = strtoul(optarg, NULL, 10);
  1006. if (timeout == 0 && errno != 0)
  1007. errx(EXIT_FAILURE, "Invalid timeout.");
  1008. break;
  1009. case 'u':
  1010. errno = 0;
  1011. unit = strtol(optarg, NULL, 10);
  1012. if (unit == 0 && errno != 0)
  1013. errx(EXIT_FAILURE, "Invalid unit number.");
  1014. break;
  1015. case 'v':
  1016. if (action == DESTROY)
  1017. usage();
  1018. g_gate_verbose++;
  1019. break;
  1020. default:
  1021. usage();
  1022. }
  1023. }
  1024. argc -= optind;
  1025. argv += optind;
  1026. g_gate_log(LOG_DEBUG, "libssh2_init");
  1027. rc = libssh2_init(0);
  1028. if (rc != 0) {
  1029. fprintf(stderr, "libssh2 initialization failed (%d)\n", rc);
  1030. return 1;
  1031. }
  1032. switch (action) {
  1033. case CREATE:
  1034. if (argc < 1 || argc > 2)
  1035. usage();
  1036. handle_params(argc, argv);
  1037. g_gate_load_module();
  1038. g_gate_open_device();
  1039. g_gatessh_create();
  1040. break;
  1041. case DESTROY:
  1042. if (argc != 0)
  1043. usage();
  1044. if (unit == -1) {
  1045. fprintf(stderr, "Required unit number.\n");
  1046. usage();
  1047. }
  1048. g_gate_verbose = 1;
  1049. g_gate_open_device();
  1050. g_gate_destroy(unit, force);
  1051. break;
  1052. case LIST:
  1053. g_gate_list(unit, g_gate_verbose);
  1054. break;
  1055. case RESCUE:
  1056. if (argc < 1 || argc > 2)
  1057. usage();
  1058. if (unit == -1) {
  1059. fprintf(stderr, "Required unit number.\n");
  1060. usage();
  1061. }
  1062. handle_params(argc, argv);
  1063. g_gate_open_device();
  1064. g_gatessh_rescue();
  1065. break;
  1066. case UNSET:
  1067. default:
  1068. usage();
  1069. }
  1070. g_gate_close_device();
  1071. exit(EXIT_SUCCESS);
  1072. }