sys: general adoption of SPDX licensing ID tags.
[freebsd.git] / sys / ofed / drivers / infiniband / ulp / sdp / sdp_zcopy.c
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
3  *
4  * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the
10  * OpenIB.org BSD license below:
11  *
12  *     Redistribution and use in source and binary forms, with or
13  *     without modification, are permitted provided that the following
14  *     conditions are met:
15  *
16  *      - Redistributions of source code must retain the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer.
19  *
20  *      - Redistributions in binary form must reproduce the above
21  *        copyright notice, this list of conditions and the following
22  *        disclaimer in the documentation and/or other materials
23  *        provided with the distribution.
24  *
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32  * SOFTWARE.
33  */
34 #include <linux/tcp.h>
35 #include <asm/ioctls.h>
36 #include <linux/workqueue.h>
37 #include <linux/net.h>
38 #include <linux/socket.h>
39 #include <net/protocol.h>
40 #include <net/inet_common.h>
41 #include <rdma/rdma_cm.h>
42 #include <rdma/ib_verbs.h>
43 #include <rdma/ib_fmr_pool.h>
44 #include <rdma/ib_umem.h> 
45 #include <net/tcp.h> /* for memcpy_toiovec */
46 #include <asm/io.h>
47 #include <asm/uaccess.h>
48 #include <linux/delay.h>
49 #include "sdp.h"
50
51 static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
52 {
53         struct sdp_sock *ssk = sdp_sk(sk);
54         struct mbuf *mb;
55         int payload_len;
56         struct page *payload_pg;
57         int off, len;
58         struct ib_umem_chunk *chunk;
59
60         WARN_ON(ssk->tx_sa);
61
62         BUG_ON(!tx_sa);
63         BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
64         BUG_ON(!tx_sa->umem);
65         BUG_ON(!tx_sa->umem->chunk_list.next);
66
67         chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
68         BUG_ON(!chunk->nmap);
69
70         off = tx_sa->umem->offset;
71         len = tx_sa->umem->length;
72
73         tx_sa->bytes_sent = tx_sa->bytes_acked = 0;
74
75         mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
76         if (!mb) {
77                 return -ENOMEM;
78         }
79         sdp_dbg_data(sk, "sending SrcAvail\n");
80                 
81         TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hanged on the mb 
82                                          * but continue to live after mb is freed */
83         ssk->tx_sa = tx_sa;
84
85         /* must have payload inlined in SrcAvail packet in combined mode */
86         payload_len = MIN(tx_sa->umem->page_size - off, len);
87         payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
88         payload_pg  = sg_page(&chunk->page_list[0]);
89         get_page(payload_pg);
90
91         sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
92                 off, payload_pg, payload_len);
93
94         mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
95                         payload_pg, off, payload_len);
96
97         mb->len             += payload_len;
98         mb->data_len         = payload_len;
99         mb->truesize        += payload_len;
100 //      sk->sk_wmem_queued   += payload_len;
101 //      sk->sk_forward_alloc -= payload_len;
102
103         mb_entail(sk, ssk, mb);
104         
105         ssk->write_seq += payload_len;
106         SDP_SKB_CB(mb)->end_seq += payload_len;
107
108         tx_sa->bytes_sent = tx_sa->umem->length;
109         tx_sa->bytes_acked = payload_len;
110
111         /* TODO: pushing the mb into the tx_queue should be enough */
112
113         return 0;
114 }
115
116 static int sdp_post_srcavail_cancel(struct socket *sk)
117 {
118         struct sdp_sock *ssk = sdp_sk(sk);
119         struct mbuf *mb;
120
121         sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");
122
123         mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
124         mb_entail(sk, ssk, mb);
125
126         sdp_post_sends(ssk, 0);
127
128         schedule_delayed_work(&ssk->srcavail_cancel_work,
129                         SDP_SRCAVAIL_CANCEL_TIMEOUT);
130
131         return 0;
132 }
133
134 void srcavail_cancel_timeout(struct work_struct *work)
135 {
136         struct sdp_sock *ssk =
137                 container_of(work, struct sdp_sock, srcavail_cancel_work.work);
138         struct socket *sk = ssk->socket;
139
140         lock_sock(sk);
141
142         sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout."
143                         " closing connection\n");
144         sdp_set_error(sk, -ECONNRESET);
145         wake_up(&ssk->wq);
146
147         release_sock(sk);
148 }
149
150 static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
151                 int ignore_signals)
152 {
153         struct socket *sk = ssk->socket;
154         int err = 0;
155         long vm_wait = 0;
156         long current_timeo = *timeo_p;
157         struct tx_srcavail_state *tx_sa = ssk->tx_sa;
158         DEFINE_WAIT(wait);
159
160         sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
161         sdp_prf1(sk, NULL, "Going to sleep");
162         while (ssk->qp_active) {
163                 prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
164
165                 if (unlikely(!*timeo_p)) {
166                         err = -ETIME;
167                         tx_sa->abort_flags |= TX_SA_TIMEDOUT;
168                         sdp_prf1(sk, NULL, "timeout");
169                         SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
170                         break;
171                 }
172
173                 else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
174                         err = -EINVAL;
175                         sdp_dbg_data(sk, "acked bytes > sent bytes\n");
176                         tx_sa->abort_flags |= TX_SA_ERROR;
177                         break;
178                 }
179
180                 if (tx_sa->abort_flags & TX_SA_SENDSM) {
181                         sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
182                         SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
183                         err = -EAGAIN;
184                         break ;
185                 }
186
187                 if (!ignore_signals) {
188                         if (signal_pending(current)) {
189                                 err = -EINTR;
190                                 sdp_prf1(sk, NULL, "signalled");
191                                 tx_sa->abort_flags |= TX_SA_INTRRUPTED;
192                                 break;
193                         }
194
195                         if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
196                                 sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
197                                 tx_sa->abort_flags |= TX_SA_CROSS_SEND;
198                                 SDPSTATS_COUNTER_INC(zcopy_cross_send);
199                                 err = -ETIME;
200                                 break ;
201                         }
202                 }
203
204                 posts_handler_put(ssk);
205
206                 sk_wait_event(sk, &current_timeo,
207                                 tx_sa->abort_flags &&
208                                 ssk->rx_sa &&
209                                 (tx_sa->bytes_acked < tx_sa->bytes_sent) && 
210                                 vm_wait);
211                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
212
213                 posts_handler_get(ssk);
214
215                 if (tx_sa->bytes_acked == tx_sa->bytes_sent)
216                         break;
217
218                 if (vm_wait) {
219                         vm_wait -= current_timeo;
220                         current_timeo = *timeo_p;
221                         if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
222                             (current_timeo -= vm_wait) < 0)
223                                 current_timeo = 0;
224                         vm_wait = 0;
225                 }
226                 *timeo_p = current_timeo;
227         }
228
229         finish_wait(sk->sk_sleep, &wait);
230
231         sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
232                         tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);
233
234         if (!ssk->qp_active) {
235                 sdp_dbg(sk, "QP destroyed while waiting\n");
236                 return -EINVAL;
237         }
238         return err;
239 }
240
241 static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
242 {
243         struct socket *sk = ssk->socket;
244         long timeo = HZ * 5; /* Timeout for for RDMA read */
245         DEFINE_WAIT(wait);
246
247         sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
248         while (1) {
249                 prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);
250
251                 if (!ssk->tx_ring.rdma_inflight->busy) {
252                         sdp_dbg_data(sk, "got rdma cqe\n");
253                         break;
254                 }
255
256                 if (!ssk->qp_active) {
257                         sdp_dbg_data(sk, "QP destroyed\n");
258                         break;
259                 }
260
261                 if (!timeo) {
262                         sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
263                         WARN_ON(1);
264                         break;
265                 }
266
267                 posts_handler_put(ssk);
268
269                 sdp_prf1(sk, NULL, "Going to sleep");
270                 sk_wait_event(sk, &timeo, 
271                         !ssk->tx_ring.rdma_inflight->busy);
272                 sdp_prf1(sk, NULL, "Woke up");
273                 sdp_dbg_data(ssk->socket, "woke up sleepers\n");
274
275                 posts_handler_get(ssk);
276         }
277
278         finish_wait(sk->sk_sleep, &wait);
279
280         sdp_dbg_data(sk, "Finished waiting\n");
281 }
282
283 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
284                 struct rx_srcavail_state *rx_sa)
285 {
286         struct mbuf *mb;
287         int copied = rx_sa->used - rx_sa->reported;
288
289         if (rx_sa->used <= rx_sa->reported)
290                 return 0;
291
292         mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
293
294         rx_sa->reported += copied;
295
296         /* TODO: What if no tx_credits available? */
297         sdp_post_send(ssk, mb);
298
299         return 0;
300 }
301
302 int sdp_post_sendsm(struct socket *sk)
303 {
304         struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);
305
306         sdp_post_send(sdp_sk(sk), mb);
307
308         return 0;
309 }
310
311 static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
312 {
313         sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
314         while (len > 0) {
315                 if (iov->iov_len) {
316                         int copy = min_t(unsigned int, iov->iov_len, len);
317                         len -= copy;
318                         iov->iov_len -= copy;
319                         iov->iov_base += copy;
320                 }
321                 iov++;
322         }
323
324         return 0;
325 }
326
327 static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
328 {
329         int bytes = 0;
330
331         while (sge_cnt > 0) {
332                 bytes += sge->length;
333                 sge++;
334                 sge_cnt--;
335         }
336
337         return bytes;
338 }
339 void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
340 {
341         struct socket *sk = ssk->socket;
342         unsigned long flags;
343
344         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
345
346         if (!ssk->tx_sa) {
347                 sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
348                 goto out;
349         }
350
351         if (ssk->tx_sa->mseq > mseq_ack) {
352                 sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
353                         "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
354                         mseq_ack, ssk->tx_sa->mseq);
355                 goto out;
356         }
357
358         sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");
359
360         ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
361         cancel_delayed_work(&ssk->srcavail_cancel_work);
362
363         wake_up(sk->sk_sleep);
364         sdp_dbg_data(sk, "woke up sleepers\n");
365
366 out:
367         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
368 }
369
370 void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
371                 u32 bytes_completed)
372 {
373         struct socket *sk = ssk->socket;
374         unsigned long flags;
375
376         sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
377         sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
378
379         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
380
381         BUG_ON(!ssk);
382
383         if (!ssk->tx_sa) {
384                 sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
385                 goto out;
386         }
387
388         if (ssk->tx_sa->mseq > mseq_ack) {
389                 sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
390                         "SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
391                         mseq_ack, ssk->tx_sa->mseq);
392                 goto out;
393         }
394
395         ssk->tx_sa->bytes_acked += bytes_completed;
396
397         wake_up(sk->sk_sleep);
398         sdp_dbg_data(sk, "woke up sleepers\n");
399
400 out:
401         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
402         return;
403 }
404
405 static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
406 {
407         unsigned long avail;
408         unsigned long lock_limit;
409
410         if (capable(CAP_IPC_LOCK))
411                 return ULONG_MAX;
412
413         lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
414         avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);
415
416         return avail - offset;
417 }
418
419 static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
420         struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
421 {
422         struct ib_pool_fmr *fmr;
423         struct ib_umem *umem;
424         struct ib_device *dev;
425         u64 *pages;
426         struct ib_umem_chunk *chunk;
427         int n, j, k;
428         int rc = 0;
429         unsigned long max_lockable_bytes;
430
431         if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
432                 sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
433                         len, SDP_MAX_RDMA_READ_LEN);
434                 len = SDP_MAX_RDMA_READ_LEN;
435         }
436
437         max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
438         if (unlikely(len > max_lockable_bytes)) {
439                 sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
440                         len, max_lockable_bytes);
441                 len = max_lockable_bytes;
442         }
443
444         sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
445                         uaddr, len, max_lockable_bytes);
446
447         umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
448                 IB_ACCESS_REMOTE_WRITE, 0);
449
450         if (IS_ERR(umem)) {
451                 rc = PTR_ERR(umem);
452                 sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
453                 sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
454                                 current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
455                                 current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
456                                 capable(CAP_IPC_LOCK));
457                 goto err_umem_get;
458         }
459
460         sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
461                 umem->offset, umem->length);
462
463         pages = (u64 *) __get_free_page(GFP_KERNEL);
464         if (!pages)
465                 goto err_pages_alloc;
466
467         n = 0;
468
469         dev = sdp_sk(sk)->ib_device;
470         list_for_each_entry(chunk, &umem->chunk_list, list) {
471                 for (j = 0; j < chunk->nmap; ++j) {
472                         len = ib_sg_dma_len(dev,
473                                         &chunk->page_list[j]) >> PAGE_SHIFT;
474
475                         for (k = 0; k < len; ++k) {
476                                 pages[n++] = ib_sg_dma_address(dev,
477                                                 &chunk->page_list[j]) +
478                                         umem->page_size * k;
479
480                         }
481                 }
482         }
483
484         fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
485         if (IS_ERR(fmr)) {
486                 sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
487                 goto err_fmr_alloc;
488         }
489
490         free_page((unsigned long) pages);
491
492         *_umem = umem;
493         *_fmr = fmr;
494
495         return 0;
496
497 err_fmr_alloc:  
498         free_page((unsigned long) pages);
499
500 err_pages_alloc:
501         ib_umem_release(umem);
502
503 err_umem_get:
504
505         return rc;
506 }
507
508 void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
509 {
510         if (!sdp_sk(sk)->qp_active)
511                 return;
512
513         ib_fmr_pool_unmap(*_fmr);
514         *_fmr = NULL;
515
516         ib_umem_release(*_umem);
517         *_umem = NULL;
518 }
519
520 static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
521 {
522         struct sdp_sock *ssk = sdp_sk(sk);
523         struct ib_send_wr *bad_wr;
524         struct ib_send_wr wr = { NULL };
525         struct ib_sge sge;
526
527         wr.opcode = IB_WR_RDMA_READ;
528         wr.next = NULL;
529         wr.wr_id = SDP_OP_RDMA;
530         wr.wr.rdma.rkey = rx_sa->rkey;
531         wr.send_flags = 0;
532
533         ssk->tx_ring.rdma_inflight = rx_sa;
534
535         sge.addr = rx_sa->umem->offset;
536         sge.length = rx_sa->umem->length;
537         sge.lkey = rx_sa->fmr->fmr->lkey;
538
539         wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
540         wr.num_sge = 1;
541         wr.sg_list = &sge;
542         rx_sa->busy++;
543
544         wr.send_flags = IB_SEND_SIGNALED;
545
546         return ib_post_send(ssk->qp, &wr, &bad_wr);
547 }
548
549 int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
550                 unsigned long *used)
551 {
552         struct sdp_sock *ssk = sdp_sk(sk);
553         struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
554         int got_srcavail_cancel;
555         int rc = 0;
556         int len = *used;
557         int copied;
558
559         sdp_dbg_data(ssk->socket, "preparing RDMA read."
560                 " len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);
561
562         sock_hold(sk, SOCK_REF_RDMA_RD);
563
564         if (len > rx_sa->len) {
565                 sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
566                 WARN_ON(1);
567                 len = rx_sa->len;
568         }
569
570         rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
571         if (rc) {
572                 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
573                 goto err_alloc_fmr;
574         }
575
576         rc = sdp_post_rdma_read(sk, rx_sa);
577         if (unlikely(rc)) {
578                 sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
579                 sdp_set_error(ssk->socket, -ECONNRESET);
580                 wake_up(&ssk->wq);
581                 goto err_post_send;
582         }
583
584         sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);
585
586         got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;
587
588         sdp_arm_tx_cq(sk);
589
590         sdp_wait_rdma_wr_finished(ssk);
591
592         sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
593         if (!ssk->qp_active) {
594                 sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
595                 rc = -EPIPE;
596                 goto err_post_send;
597         }
598
599         copied = rx_sa->umem->length;
600
601         sdp_update_iov_used(sk, iov, copied);
602         rx_sa->used += copied;
603         atomic_add(copied, &ssk->rcv_nxt);
604         *used = copied;
605
606         ssk->tx_ring.rdma_inflight = NULL;
607
608 err_post_send:
609         sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
610
611 err_alloc_fmr:
612         if (rc && ssk->qp_active) {
613                 sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
614                 rx_sa->flags |= RX_SA_ABORTED;
615         }
616
617         sock_put(sk, SOCK_REF_RDMA_RD);
618
619         return rc;
620 }
621
622 static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
623 {
624         struct sdp_sock *ssk = sdp_sk(sk);
625         int ret = 0;
626         int credits_needed = 1;
627
628         sdp_dbg_data(sk, "Wait for mem\n");
629
630         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
631
632         SDPSTATS_COUNTER_INC(send_wait_for_mem);
633
634         sdp_do_posts(ssk);
635
636         sdp_xmit_poll(ssk, 1);
637
638         ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
639
640         return ret;
641 }
642
643 static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
644                 struct iovec *iov, long *timeo)
645 {
646         struct sdp_sock *ssk = sdp_sk(sk);
647         int rc = 0;
648         unsigned long lock_flags;
649
650         rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
651                         &tx_sa->fmr, &tx_sa->umem);
652         if (rc) {
653                 sdp_warn(sk, "Error allocating fmr: %d\n", rc);
654                 goto err_alloc_fmr;
655         }
656
657         if (tx_slots_free(ssk) == 0) {
658                 rc = wait_for_sndbuf(sk, timeo);
659                 if (rc) {
660                         sdp_warn(sk, "Couldn't get send buffer\n");
661                         goto err_no_tx_slots;
662                 }
663         }
664
665         rc = sdp_post_srcavail(sk, tx_sa);
666         if (rc) {
667                 sdp_dbg(sk, "Error posting SrcAvail\n");
668                 goto err_abort_send;
669         }
670
671         rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
672         if (unlikely(rc)) {
673                 enum tx_sa_flag f = tx_sa->abort_flags;
674
675                 if (f & TX_SA_SENDSM) {
676                         sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
677                 } else if (f & TX_SA_ERROR) {
678                         sdp_dbg_data(sk, "SrcAvail error completion\n");
679                         sdp_reset(sk);
680                         SDPSTATS_COUNTER_INC(zcopy_tx_error);
681                 } else if (ssk->qp_active) {
682                         sdp_post_srcavail_cancel(sk);
683
684                         /* Wait for RdmaRdCompl/SendSM to
685                          * finish the transaction */
686                         *timeo = 2 * HZ;
687                         sdp_dbg_data(sk, "Waiting for SendSM\n");
688                         sdp_wait_rdmardcompl(ssk, timeo, 1);
689                         sdp_dbg_data(sk, "finished waiting\n");
690
691                         cancel_delayed_work(&ssk->srcavail_cancel_work);
692                 } else {
693                         sdp_dbg_data(sk, "QP was destroyed while waiting\n");
694                 }
695         } else {
696                 sdp_dbg_data(sk, "got RdmaRdCompl\n");
697         }
698
699         spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
700         ssk->tx_sa = NULL;
701         spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);
702
703 err_abort_send:
704         sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);
705
706 err_no_tx_slots:
707         sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
708
709 err_alloc_fmr:
710         return rc;      
711 }
712
713 int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
714 {
715         struct sdp_sock *ssk = sdp_sk(sk);
716         int rc = 0;
717         long timeo;
718         struct tx_srcavail_state *tx_sa;
719         int offset;
720         size_t bytes_to_copy = 0;
721         int copied = 0;
722
723         sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
724                         iov->iov_base, iov->iov_len);
725         sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
726         if (ssk->rx_sa) {
727                 sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
728                 return 0;
729         }
730
731         sock_hold(ssk->socket, SOCK_REF_ZCOPY);
732
733         SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);
734
735         timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;
736
737         /* Ok commence sending. */
738         offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);
739
740         tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
741         if (!tx_sa) {
742                 sdp_warn(sk, "Error allocating zcopy context\n");
743                 rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
744                 goto err_alloc_tx_sa;
745         }
746
747         bytes_to_copy = iov->iov_len;
748         do {
749                 tx_sa_reset(tx_sa);
750
751                 rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);
752
753                 if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
754                         sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
755                                 iov->iov_len);
756                         break;
757                 }
758         } while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);
759
760         kfree(tx_sa);
761 err_alloc_tx_sa:
762         copied = bytes_to_copy - iov->iov_len;
763
764         sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);
765
766         sock_put(ssk->socket, SOCK_REF_ZCOPY);
767
768         if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
769                 return rc;
770
771         return copied;
772 }
773
774 void sdp_abort_srcavail(struct socket *sk)
775 {
776         struct sdp_sock *ssk = sdp_sk(sk);
777         struct tx_srcavail_state *tx_sa = ssk->tx_sa;
778         unsigned long flags;
779
780         if (!tx_sa)
781                 return;
782
783         cancel_delayed_work(&ssk->srcavail_cancel_work);
784         flush_scheduled_work();
785
786         spin_lock_irqsave(&ssk->tx_sa_lock, flags);
787
788         sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);
789
790         ssk->tx_sa = NULL;
791
792         spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
793 }
794
795 void sdp_abort_rdma_read(struct socket *sk)
796 {
797         struct sdp_sock *ssk = sdp_sk(sk);
798         struct rx_srcavail_state *rx_sa = ssk->rx_sa;
799
800         if (!rx_sa)
801                 return;
802
803         sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
804
805         ssk->rx_sa = NULL;
806 }