2 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
4 * Copyright (c) 2006 Mellanox Technologies Ltd. All rights reserved.
6 * This software is available to you under a choice of one of two
7 * licenses. You may choose to be licensed under the terms of the GNU
8 * General Public License (GPL) Version 2, available from the file
9 * COPYING in the main directory of this source tree, or the
10 * OpenIB.org BSD license below:
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
16 * - Redistributions of source code must retain the above
17 * copyright notice, this list of conditions and the following
20 * - Redistributions in binary form must reproduce the above
21 * copyright notice, this list of conditions and the following
22 * disclaimer in the documentation and/or other materials
23 * provided with the distribution.
25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
34 #include <linux/tcp.h>
35 #include <asm/ioctls.h>
36 #include <linux/workqueue.h>
37 #include <linux/net.h>
38 #include <linux/socket.h>
39 #include <net/protocol.h>
40 #include <net/inet_common.h>
41 #include <rdma/rdma_cm.h>
42 #include <rdma/ib_verbs.h>
43 #include <rdma/ib_fmr_pool.h>
44 #include <rdma/ib_umem.h>
45 #include <net/tcp.h> /* for memcpy_toiovec */
47 #include <asm/uaccess.h>
48 #include <linux/delay.h>
51 static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
53 struct sdp_sock *ssk = sdp_sk(sk);
56 struct page *payload_pg;
58 struct ib_umem_chunk *chunk;
63 BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
65 BUG_ON(!tx_sa->umem->chunk_list.next);
67 chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
70 off = tx_sa->umem->offset;
71 len = tx_sa->umem->length;
73 tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
75 mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
79 sdp_dbg_data(sk, "sending SrcAvail\n");
81 TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hanged on the mb
82 * but continue to live after mb is freed */
85 /* must have payload inlined in SrcAvail packet in combined mode */
86 payload_len = MIN(tx_sa->umem->page_size - off, len);
87 payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
88 payload_pg = sg_page(&chunk->page_list[0]);
91 sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
92 off, payload_pg, payload_len);
94 mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
95 payload_pg, off, payload_len);
97 mb->len += payload_len;
98 mb->data_len = payload_len;
99 mb->truesize += payload_len;
100 // sk->sk_wmem_queued += payload_len;
101 // sk->sk_forward_alloc -= payload_len;
103 mb_entail(sk, ssk, mb);
105 ssk->write_seq += payload_len;
106 SDP_SKB_CB(mb)->end_seq += payload_len;
108 tx_sa->bytes_sent = tx_sa->umem->length;
109 tx_sa->bytes_acked = payload_len;
111 /* TODO: pushing the mb into the tx_queue should be enough */
116 static int sdp_post_srcavail_cancel(struct socket *sk)
118 struct sdp_sock *ssk = sdp_sk(sk);
121 sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");
123 mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
124 mb_entail(sk, ssk, mb);
126 sdp_post_sends(ssk, 0);
128 schedule_delayed_work(&ssk->srcavail_cancel_work,
129 SDP_SRCAVAIL_CANCEL_TIMEOUT);
134 void srcavail_cancel_timeout(struct work_struct *work)
136 struct sdp_sock *ssk =
137 container_of(work, struct sdp_sock, srcavail_cancel_work.work);
138 struct socket *sk = ssk->socket;
142 sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout."
143 " closing connection\n");
144 sdp_set_error(sk, -ECONNRESET);
150 static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
153 struct socket *sk = ssk->socket;
156 long current_timeo = *timeo_p;
157 struct tx_srcavail_state *tx_sa = ssk->tx_sa;
160 sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
161 sdp_prf1(sk, NULL, "Going to sleep");
162 while (ssk->qp_active) {
163 prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
165 if (unlikely(!*timeo_p)) {
167 tx_sa->abort_flags |= TX_SA_TIMEDOUT;
168 sdp_prf1(sk, NULL, "timeout");
169 SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
173 else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
175 sdp_dbg_data(sk, "acked bytes > sent bytes\n");
176 tx_sa->abort_flags |= TX_SA_ERROR;
180 if (tx_sa->abort_flags & TX_SA_SENDSM) {
181 sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
182 SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
187 if (!ignore_signals) {
188 if (signal_pending(current)) {
190 sdp_prf1(sk, NULL, "signalled");
191 tx_sa->abort_flags |= TX_SA_INTRRUPTED;
195 if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
196 sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
197 tx_sa->abort_flags |= TX_SA_CROSS_SEND;
198 SDPSTATS_COUNTER_INC(zcopy_cross_send);
204 posts_handler_put(ssk);
206 sk_wait_event(sk, ¤t_timeo,
207 tx_sa->abort_flags &&
209 (tx_sa->bytes_acked < tx_sa->bytes_sent) &&
211 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
213 posts_handler_get(ssk);
215 if (tx_sa->bytes_acked == tx_sa->bytes_sent)
219 vm_wait -= current_timeo;
220 current_timeo = *timeo_p;
221 if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
222 (current_timeo -= vm_wait) < 0)
226 *timeo_p = current_timeo;
229 finish_wait(sk->sk_sleep, &wait);
231 sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
232 tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);
234 if (!ssk->qp_active) {
235 sdp_dbg(sk, "QP destroyed while waiting\n");
241 static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
243 struct socket *sk = ssk->socket;
244 long timeo = HZ * 5; /* Timeout for for RDMA read */
247 sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
249 prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);
251 if (!ssk->tx_ring.rdma_inflight->busy) {
252 sdp_dbg_data(sk, "got rdma cqe\n");
256 if (!ssk->qp_active) {
257 sdp_dbg_data(sk, "QP destroyed\n");
262 sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
267 posts_handler_put(ssk);
269 sdp_prf1(sk, NULL, "Going to sleep");
270 sk_wait_event(sk, &timeo,
271 !ssk->tx_ring.rdma_inflight->busy);
272 sdp_prf1(sk, NULL, "Woke up");
273 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
275 posts_handler_get(ssk);
278 finish_wait(sk->sk_sleep, &wait);
280 sdp_dbg_data(sk, "Finished waiting\n");
283 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
284 struct rx_srcavail_state *rx_sa)
287 int copied = rx_sa->used - rx_sa->reported;
289 if (rx_sa->used <= rx_sa->reported)
292 mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
294 rx_sa->reported += copied;
296 /* TODO: What if no tx_credits available? */
297 sdp_post_send(ssk, mb);
302 int sdp_post_sendsm(struct socket *sk)
304 struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);
306 sdp_post_send(sdp_sk(sk), mb);
311 static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
313 sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
316 int copy = min_t(unsigned int, iov->iov_len, len);
318 iov->iov_len -= copy;
319 iov->iov_base += copy;
327 static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
331 while (sge_cnt > 0) {
332 bytes += sge->length;
339 void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
341 struct socket *sk = ssk->socket;
344 spin_lock_irqsave(&ssk->tx_sa_lock, flags);
347 sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
351 if (ssk->tx_sa->mseq > mseq_ack) {
352 sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
353 "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
354 mseq_ack, ssk->tx_sa->mseq);
358 sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");
360 ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
361 cancel_delayed_work(&ssk->srcavail_cancel_work);
363 wake_up(sk->sk_sleep);
364 sdp_dbg_data(sk, "woke up sleepers\n");
367 spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
370 void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
373 struct socket *sk = ssk->socket;
376 sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
377 sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
379 spin_lock_irqsave(&ssk->tx_sa_lock, flags);
384 sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
388 if (ssk->tx_sa->mseq > mseq_ack) {
389 sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
390 "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
391 mseq_ack, ssk->tx_sa->mseq);
395 ssk->tx_sa->bytes_acked += bytes_completed;
397 wake_up(sk->sk_sleep);
398 sdp_dbg_data(sk, "woke up sleepers\n");
401 spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
405 static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
408 unsigned long lock_limit;
410 if (capable(CAP_IPC_LOCK))
413 lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
414 avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);
416 return avail - offset;
419 static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
420 struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
422 struct ib_pool_fmr *fmr;
423 struct ib_umem *umem;
424 struct ib_device *dev;
426 struct ib_umem_chunk *chunk;
429 unsigned long max_lockable_bytes;
431 if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
432 sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
433 len, SDP_MAX_RDMA_READ_LEN);
434 len = SDP_MAX_RDMA_READ_LEN;
437 max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
438 if (unlikely(len > max_lockable_bytes)) {
439 sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
440 len, max_lockable_bytes);
441 len = max_lockable_bytes;
444 sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
445 uaddr, len, max_lockable_bytes);
447 umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
448 IB_ACCESS_REMOTE_WRITE, 0);
452 sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
453 sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
454 current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
455 current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
456 capable(CAP_IPC_LOCK));
460 sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
461 umem->offset, umem->length);
463 pages = (u64 *) __get_free_page(GFP_KERNEL);
465 goto err_pages_alloc;
469 dev = sdp_sk(sk)->ib_device;
470 list_for_each_entry(chunk, &umem->chunk_list, list) {
471 for (j = 0; j < chunk->nmap; ++j) {
472 len = ib_sg_dma_len(dev,
473 &chunk->page_list[j]) >> PAGE_SHIFT;
475 for (k = 0; k < len; ++k) {
476 pages[n++] = ib_sg_dma_address(dev,
477 &chunk->page_list[j]) +
484 fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
486 sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
490 free_page((unsigned long) pages);
498 free_page((unsigned long) pages);
501 ib_umem_release(umem);
508 void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
510 if (!sdp_sk(sk)->qp_active)
513 ib_fmr_pool_unmap(*_fmr);
516 ib_umem_release(*_umem);
520 static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
522 struct sdp_sock *ssk = sdp_sk(sk);
523 struct ib_send_wr *bad_wr;
524 struct ib_send_wr wr = { NULL };
527 wr.opcode = IB_WR_RDMA_READ;
529 wr.wr_id = SDP_OP_RDMA;
530 wr.wr.rdma.rkey = rx_sa->rkey;
533 ssk->tx_ring.rdma_inflight = rx_sa;
535 sge.addr = rx_sa->umem->offset;
536 sge.length = rx_sa->umem->length;
537 sge.lkey = rx_sa->fmr->fmr->lkey;
539 wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
544 wr.send_flags = IB_SEND_SIGNALED;
546 return ib_post_send(ssk->qp, &wr, &bad_wr);
549 int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
552 struct sdp_sock *ssk = sdp_sk(sk);
553 struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
554 int got_srcavail_cancel;
559 sdp_dbg_data(ssk->socket, "preparing RDMA read."
560 " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
562 sock_hold(sk, SOCK_REF_RDMA_RD);
564 if (len > rx_sa->len) {
565 sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
570 rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
572 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
576 rc = sdp_post_rdma_read(sk, rx_sa);
578 sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
579 sdp_set_error(ssk->socket, -ECONNRESET);
584 sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);
586 got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
590 sdp_wait_rdma_wr_finished(ssk);
592 sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
593 if (!ssk->qp_active) {
594 sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
599 copied = rx_sa->umem->length;
601 sdp_update_iov_used(sk, iov, copied);
602 rx_sa->used += copied;
603 atomic_add(copied, &ssk->rcv_nxt);
606 ssk->tx_ring.rdma_inflight = NULL;
609 sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
612 if (rc && ssk->qp_active) {
613 sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
614 rx_sa->flags |= RX_SA_ABORTED;
617 sock_put(sk, SOCK_REF_RDMA_RD);
622 static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
624 struct sdp_sock *ssk = sdp_sk(sk);
626 int credits_needed = 1;
628 sdp_dbg_data(sk, "Wait for mem\n");
630 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
632 SDPSTATS_COUNTER_INC(send_wait_for_mem);
636 sdp_xmit_poll(ssk, 1);
638 ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
643 static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
644 struct iovec *iov, long *timeo)
646 struct sdp_sock *ssk = sdp_sk(sk);
648 unsigned long lock_flags;
650 rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
651 &tx_sa->fmr, &tx_sa->umem);
653 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
657 if (tx_slots_free(ssk) == 0) {
658 rc = wait_for_sndbuf(sk, timeo);
660 sdp_warn(sk, "Couldn't get send buffer\n");
661 goto err_no_tx_slots;
665 rc = sdp_post_srcavail(sk, tx_sa);
667 sdp_dbg(sk, "Error posting SrcAvail\n");
671 rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
673 enum tx_sa_flag f = tx_sa->abort_flags;
675 if (f & TX_SA_SENDSM) {
676 sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
677 } else if (f & TX_SA_ERROR) {
678 sdp_dbg_data(sk, "SrcAvail error completion\n");
680 SDPSTATS_COUNTER_INC(zcopy_tx_error);
681 } else if (ssk->qp_active) {
682 sdp_post_srcavail_cancel(sk);
684 /* Wait for RdmaRdCompl/SendSM to
685 * finish the transaction */
687 sdp_dbg_data(sk, "Waiting for SendSM\n");
688 sdp_wait_rdmardcompl(ssk, timeo, 1);
689 sdp_dbg_data(sk, "finished waiting\n");
691 cancel_delayed_work(&ssk->srcavail_cancel_work);
693 sdp_dbg_data(sk, "QP was destroyed while waiting\n");
696 sdp_dbg_data(sk, "got RdmaRdCompl\n");
699 spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
701 spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
704 sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
707 sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
713 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
715 struct sdp_sock *ssk = sdp_sk(sk);
718 struct tx_srcavail_state *tx_sa;
720 size_t bytes_to_copy = 0;
723 sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
724 iov->iov_base, iov->iov_len);
725 sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
727 sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
731 sock_hold(ssk->socket, SOCK_REF_ZCOPY);
733 SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
735 timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
737 /* Ok commence sending. */
738 offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
740 tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
742 sdp_warn(sk, "Error allocating zcopy context\n");
743 rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
744 goto err_alloc_tx_sa;
747 bytes_to_copy = iov->iov_len;
751 rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
753 if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
754 sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
758 } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
762 copied = bytes_to_copy - iov->iov_len;
764 sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
766 sock_put(ssk->socket, SOCK_REF_ZCOPY);
768 if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
774 void sdp_abort_srcavail(struct socket *sk)
776 struct sdp_sock *ssk = sdp_sk(sk);
777 struct tx_srcavail_state *tx_sa = ssk->tx_sa;
783 cancel_delayed_work(&ssk->srcavail_cancel_work);
784 flush_scheduled_work();
786 spin_lock_irqsave(&ssk->tx_sa_lock, flags);
788 sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
792 spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
795 void sdp_abort_rdma_read(struct socket *sk)
797 struct sdp_sock *ssk = sdp_sk(sk);
798 struct rx_srcavail_state *rx_sa = ssk->rx_sa;
803 sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);