sys: general adoption of SPDX licensing ID tags.
[freebsd.git] / sys / ofed / drivers / infiniband / ulp / sdp / sdp_rx.c
1 /*-
2  * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
3  *
4  * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the
10  * OpenIB.org BSD license below:
11  *
12  *     Redistribution and use in source and binary forms, with or
13  *     without modification, are permitted provided that the following
14  *     conditions are met:
15  *
16  *      - Redistributions of source code must retain the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer.
19  *
20  *      - Redistributions in binary form must reproduce the above
21  *        copyright notice, this list of conditions and the following
22  *        disclaimer in the documentation and/or other materials
23  *        provided with the distribution.
24  *
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32  * SOFTWARE.
33  */
34 #include "sdp.h"
35
36 SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
37                 "Receive buffer initial size in bytes.");
38 SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
39                 "Receive buffer size scale factor.");
40
41 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
42 static void
43 sdp_handle_disconn(struct sdp_sock *ssk)
44 {
45
46         sdp_dbg(ssk->socket, "%s\n", __func__);
47
48         SDP_WLOCK_ASSERT(ssk);
49         if (TCPS_HAVERCVDFIN(ssk->state) == 0)
50                 socantrcvmore(ssk->socket);
51
52         switch (ssk->state) {
53         case TCPS_SYN_RECEIVED:
54         case TCPS_ESTABLISHED:
55                 ssk->state = TCPS_CLOSE_WAIT;
56                 break;
57
58         case TCPS_FIN_WAIT_1:
59                 /* Received a reply FIN - start Infiniband tear down */
60                 sdp_dbg(ssk->socket,
61                     "%s: Starting Infiniband tear down sending DREQ\n",
62                     __func__);
63
64                 sdp_cancel_dreq_wait_timeout(ssk);
65                 ssk->qp_active = 0;
66                 if (ssk->id) {
67                         struct rdma_cm_id *id;
68
69                         id = ssk->id;
70                         SDP_WUNLOCK(ssk);
71                         rdma_disconnect(id);
72                         SDP_WLOCK(ssk);
73                 } else {
74                         sdp_warn(ssk->socket,
75                             "%s: ssk->id is NULL\n", __func__);
76                         return;
77                 }
78                 break;
79         case TCPS_TIME_WAIT:
80                 /* This is a mutual close situation and we've got the DREQ from
81                    the peer before the SDP_MID_DISCONNECT */
82                 break;
83         case TCPS_CLOSED:
84                 /* FIN arrived after IB teardown started - do nothing */
85                 sdp_dbg(ssk->socket, "%s: fin in state %s\n",
86                     __func__, sdp_state_str(ssk->state));
87                 return;
88         default:
89                 sdp_warn(ssk->socket,
90                     "%s: FIN in unexpected state. state=%d\n",
91                     __func__, ssk->state);
92                 break;
93         }
94 }
95
96 static int
97 sdp_post_recv(struct sdp_sock *ssk)
98 {
99         struct sdp_buf *rx_req;
100         int i, rc;
101         u64 addr;
102         struct ib_device *dev;
103         struct ib_recv_wr rx_wr = { NULL };
104         struct ib_sge ibsge[SDP_MAX_RECV_SGES];
105         struct ib_sge *sge = ibsge;
106         struct ib_recv_wr *bad_wr;
107         struct mbuf *mb, *m;
108         struct sdp_bsdh *h;
109         int id = ring_head(ssk->rx_ring);
110
111         /* Now, allocate and repost recv */
112         sdp_prf(ssk->socket, mb, "Posting mb");
113         mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
114         if (mb == NULL) {
115                 /* Retry so we can't stall out with no memory. */
116                 if (!rx_ring_posted(ssk))
117                         queue_work(rx_comp_wq, &ssk->rx_comp_work);
118                 return -1;
119         }
120         for (m = mb; m != NULL; m = m->m_next) {
121                 m->m_len = M_SIZE(m);
122                 mb->m_pkthdr.len += m->m_len;
123         }
124         h = mtod(mb, struct sdp_bsdh *);
125         rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
126         rx_req->mb = mb;
127         dev = ssk->ib_device;
128         for (i = 0;  mb != NULL; i++, mb = mb->m_next, sge++) {
129                 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
130                     DMA_TO_DEVICE);
131                 /* TODO: proper error handling */
132                 BUG_ON(ib_dma_mapping_error(dev, addr));
133                 BUG_ON(i >= SDP_MAX_RECV_SGES);
134                 rx_req->mapping[i] = addr;
135                 sge->addr = addr;
136                 sge->length = mb->m_len;
137                 sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
138         }
139
140         rx_wr.next = NULL;
141         rx_wr.wr_id = id | SDP_OP_RECV;
142         rx_wr.sg_list = ibsge;
143         rx_wr.num_sge = i;
144         rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
145         if (unlikely(rc)) {
146                 sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
147
148                 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
149                 m_freem(mb);
150
151                 sdp_notify(ssk, ECONNRESET);
152
153                 return -1;
154         }
155
156         atomic_inc(&ssk->rx_ring.head);
157         SDPSTATS_COUNTER_INC(post_recv);
158
159         return 0;
160 }
161
162 static inline int
163 sdp_post_recvs_needed(struct sdp_sock *ssk)
164 {
165         unsigned long bytes_in_process;
166         unsigned long max_bytes;
167         int buffer_size;
168         int posted;
169
170         if (!ssk->qp_active || !ssk->socket)
171                 return 0;
172
173         posted = rx_ring_posted(ssk);
174         if (posted >= SDP_RX_SIZE)
175                 return 0;
176         if (posted < SDP_MIN_TX_CREDITS)
177                 return 1;
178
179         buffer_size = ssk->recv_bytes;
180         max_bytes = max(ssk->socket->so_rcv.sb_hiwat,
181             (1 + SDP_MIN_TX_CREDITS) * buffer_size);
182         max_bytes *= rcvbuf_scale;
183         /*
184          * Compute bytes in the receive queue and socket buffer.
185          */
186         bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
187         bytes_in_process += sbused(&ssk->socket->so_rcv);
188
189         return bytes_in_process < max_bytes;
190 }
191
192 static inline void
193 sdp_post_recvs(struct sdp_sock *ssk)
194 {
195
196         while (sdp_post_recvs_needed(ssk))
197                 if (sdp_post_recv(ssk))
198                         return;
199 }
200
201 static inline struct mbuf *
202 sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
203 {
204         struct sdp_sock *ssk = sdp_sk(sk);
205         struct sdp_bsdh *h;
206
207         h = mtod(mb, struct sdp_bsdh *);
208
209 #ifdef SDP_ZCOPY
210         SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
211         if (h->mid == SDP_MID_SRCAVAIL) {
212                 struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
213                 struct rx_srcavail_state *rx_sa;
214                 
215                 ssk->srcavail_cancel_mseq = 0;
216
217                 ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
218                                 sizeof(struct rx_srcavail_state), M_NOWAIT);
219
220                 rx_sa->mseq = ntohl(h->mseq);
221                 rx_sa->used = 0;
222                 rx_sa->len = mb_len = ntohl(srcah->len);
223                 rx_sa->rkey = ntohl(srcah->rkey);
224                 rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
225                 rx_sa->flags = 0;
226
227                 if (ssk->tx_sa) {
228                         sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
229                                         "for TX SrcAvail. waking up TX SrcAvail"
230                                         "to be aborted\n");
231                         wake_up(sk->sk_sleep);
232                 }
233
234                 atomic_add(mb->len, &ssk->rcv_nxt);
235                 sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
236                         mb_len, rx_sa->vaddr);
237         } else
238 #endif
239         {
240                 atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
241         }
242
243         m_adj(mb, SDP_HEAD_SIZE);
244         SOCKBUF_LOCK(&sk->so_rcv);
245         if (unlikely(h->flags & SDP_OOB_PRES))
246                 sdp_urg(ssk, mb);
247         sbappend_locked(&sk->so_rcv, mb, 0);
248         sorwakeup_locked(sk);
249         return mb;
250 }
251
252 static int
253 sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
254 {
255
256         return MIN(new_size, SDP_MAX_PACKET);
257 }
258
259 int
260 sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
261 {
262
263         ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
264         sdp_post_recvs(ssk);
265
266         return 0;
267 }
268
269 int
270 sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
271 {
272         u32 curr_size = ssk->recv_bytes;
273         u32 max_size = SDP_MAX_PACKET;
274
275         if (new_size > curr_size && new_size <= max_size) {
276                 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
277                 return 0;
278         }
279         return -1;
280 }
281
282 static void
283 sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
284 {
285         if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
286                 ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
287         else
288                 ssk->recv_request_head = ring_tail(ssk->rx_ring);
289         ssk->recv_request = 1;
290 }
291
292 static void
293 sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
294 {
295         u32 new_size = ntohl(buf->size);
296
297         if (new_size > ssk->xmit_size_goal)
298                 ssk->xmit_size_goal = new_size;
299 }
300
301 static struct mbuf *
302 sdp_recv_completion(struct sdp_sock *ssk, int id)
303 {
304         struct sdp_buf *rx_req;
305         struct ib_device *dev;
306         struct mbuf *mb;
307
308         if (unlikely(id != ring_tail(ssk->rx_ring))) {
309                 printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
310                         id, ring_tail(ssk->rx_ring));
311                 return NULL;
312         }
313
314         dev = ssk->ib_device;
315         rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
316         mb = rx_req->mb;
317         sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
318
319         atomic_inc(&ssk->rx_ring.tail);
320         atomic_dec(&ssk->remote_credits);
321         return mb;
322 }
323
324 static void
325 sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
326 {
327         struct sdp_bsdh *h;
328         struct socket *sk;
329
330         SDP_WLOCK_ASSERT(ssk);
331
332         sk = ssk->socket;
333         h = mtod(mb, struct sdp_bsdh *);
334         switch (h->mid) {
335         case SDP_MID_DATA:
336         case SDP_MID_SRCAVAIL:
337                 sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
338
339                 /* got data in RCV_SHUTDOWN */
340                 if (ssk->state == TCPS_FIN_WAIT_1) {
341                         sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
342                         sdp_notify(ssk, ECONNRESET);
343                 }
344
345                 break;
346 #ifdef SDP_ZCOPY
347         case SDP_MID_RDMARDCOMPL:
348                 break;
349         case SDP_MID_SENDSM:
350                 sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
351                 break;
352         case SDP_MID_SRCAVAIL_CANCEL:
353                 sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
354                 sdp_prf(sk, NULL, "Handling SrcAvailCancel");
355                 if (ssk->rx_sa) {
356                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
357                         ssk->rx_sa->flags |= RX_SA_ABORTED;
358                         ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get 
359                                               the dirty logic from recvmsg */
360                 } else {
361                         sdp_dbg(sk, "Got SrcAvailCancel - "
362                                         "but no SrcAvail in process\n");
363                 }
364                 break;
365         case SDP_MID_SINKAVAIL:
366                 sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
367                 sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
368                 /* FALLTHROUGH */
369 #endif
370         case SDP_MID_ABORT:
371                 sdp_dbg_data(sk, "Handling ABORT\n");
372                 sdp_prf(sk, NULL, "Handling ABORT");
373                 sdp_notify(ssk, ECONNRESET);
374                 break;
375         case SDP_MID_DISCONN:
376                 sdp_dbg_data(sk, "Handling DISCONN\n");
377                 sdp_prf(sk, NULL, "Handling DISCONN");
378                 sdp_handle_disconn(ssk);
379                 break;
380         case SDP_MID_CHRCVBUF:
381                 sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
382                 sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
383                 break;
384         case SDP_MID_CHRCVBUF_ACK:
385                 sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
386                 sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
387                 break;
388         default:
389                 /* TODO: Handle other messages */
390                 sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
391                 break;
392         }
393         m_freem(mb);
394 }
395
396 static int
397 sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
398 {
399         struct socket *sk;
400         struct sdp_bsdh *h;
401         unsigned long mseq_ack;
402         int credits_before;
403
404         h = mtod(mb, struct sdp_bsdh *);
405         sk = ssk->socket;
406         /*
407          * If another thread is in so_pcbfree this may be partially torn
408          * down but no further synchronization is required as the destroying
409          * thread will wait for receive to shutdown before discarding the
410          * socket.
411          */
412         if (sk == NULL) {
413                 m_freem(mb);
414                 return 0;
415         }
416
417         SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
418
419         mseq_ack = ntohl(h->mseq_ack);
420         credits_before = tx_credits(ssk);
421         atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
422                         1 + ntohs(h->bufs));
423         if (mseq_ack >= ssk->nagle_last_unacked)
424                 ssk->nagle_last_unacked = 0;
425
426         sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
427                 mid2str(h->mid), ntohs(h->bufs), credits_before,
428                 tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
429
430         if (unlikely(h->mid == SDP_MID_DATA &&
431             mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
432                 /* Credit update is valid even after RCV_SHUTDOWN */
433                 m_freem(mb);
434                 return 0;
435         }
436
437         if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
438             TCPS_HAVERCVDFIN(ssk->state)) {
439                 sdp_prf(sk, NULL, "Control mb - queing to control queue");
440 #ifdef SDP_ZCOPY
441                 if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
442                         sdp_dbg_data(sk, "Got SrcAvailCancel. "
443                                         "seq: 0x%d seq_ack: 0x%d\n",
444                                         ntohl(h->mseq), ntohl(h->mseq_ack));
445                         ssk->srcavail_cancel_mseq = ntohl(h->mseq);
446                 }
447
448
449                 if (h->mid == SDP_MID_RDMARDCOMPL) {
450                         struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
451                         sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
452                         sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
453                                         ntohl(rrch->len));
454                 }
455 #endif
456                 if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
457                         m_freem(mb);
458                 return (0);
459         }
460
461         sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
462         mb = sdp_sock_queue_rcv_mb(sk, mb);
463
464
465         return 0;
466 }
467
468 /* called only from irq */
469 static struct mbuf *
470 sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
471 {
472         struct mbuf *mb;
473         struct sdp_bsdh *h;
474         struct socket *sk = ssk->socket;
475         int mseq;
476
477         mb = sdp_recv_completion(ssk, wc->wr_id);
478         if (unlikely(!mb))
479                 return NULL;
480
481         if (unlikely(wc->status)) {
482                 if (ssk->qp_active && sk) {
483                         sdp_dbg(sk, "Recv completion with error. "
484                                         "Status %d, vendor: %d\n",
485                                 wc->status, wc->vendor_err);
486                         sdp_abort(sk);
487                         ssk->qp_active = 0;
488                 }
489                 m_freem(mb);
490                 return NULL;
491         }
492
493         sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
494                         (int)wc->wr_id, wc->byte_len);
495         if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
496                 sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
497                                 wc->byte_len, sizeof(struct sdp_bsdh));
498                 m_freem(mb);
499                 return NULL;
500         }
501         /* Use m_adj to trim the tail of data we didn't use. */
502         m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
503         h = mtod(mb, struct sdp_bsdh *);
504
505         SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
506
507         ssk->rx_packets++;
508         ssk->rx_bytes += mb->m_pkthdr.len;
509
510         mseq = ntohl(h->mseq);
511         atomic_set(&ssk->mseq_ack, mseq);
512         if (mseq != (int)wc->wr_id)
513                 sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
514                                 mseq, (int)wc->wr_id);
515
516         return mb;
517 }
518
519 /* Wakeup writers if we now have credits. */
520 static void
521 sdp_bzcopy_write_space(struct sdp_sock *ssk)
522 {
523         struct socket *sk = ssk->socket;
524
525         if (tx_credits(ssk) >= ssk->min_bufs && sk)
526                 sowwakeup(sk);
527 }
528
529 /* only from interrupt. */
530 static int
531 sdp_poll_rx_cq(struct sdp_sock *ssk)
532 {
533         struct ib_cq *cq = ssk->rx_ring.cq;
534         struct ib_wc ibwc[SDP_NUM_WC];
535         int n, i;
536         int wc_processed = 0;
537         struct mbuf *mb;
538
539         do {
540                 n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
541                 for (i = 0; i < n; ++i) {
542                         struct ib_wc *wc = &ibwc[i];
543
544                         BUG_ON(!(wc->wr_id & SDP_OP_RECV));
545                         mb = sdp_process_rx_wc(ssk, wc);
546                         if (!mb)
547                                 continue;
548
549                         sdp_process_rx_mb(ssk, mb);
550                         wc_processed++;
551                 }
552         } while (n == SDP_NUM_WC);
553
554         if (wc_processed)
555                 sdp_bzcopy_write_space(ssk);
556
557         return wc_processed;
558 }
559
560 static void
561 sdp_rx_comp_work(struct work_struct *work)
562 {
563         struct sdp_sock *ssk = container_of(work, struct sdp_sock,
564                         rx_comp_work);
565
566         sdp_prf(ssk->socket, NULL, "%s", __func__);
567
568         SDP_WLOCK(ssk);
569         if (unlikely(!ssk->qp)) {
570                 sdp_prf(ssk->socket, NULL, "qp was destroyed");
571                 goto out;
572         }
573         if (unlikely(!ssk->rx_ring.cq)) {
574                 sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
575                 goto out;
576         }
577
578         if (unlikely(!ssk->poll_cq)) {
579                 struct rdma_cm_id *id = ssk->id;
580                 if (id && id->qp)
581                         rdma_notify(id, IB_EVENT_COMM_EST);
582                 goto out;
583         }
584
585         sdp_do_posts(ssk);
586 out:
587         SDP_WUNLOCK(ssk);
588 }
589
590 void
591 sdp_do_posts(struct sdp_sock *ssk)
592 {
593         struct socket *sk = ssk->socket;
594         int xmit_poll_force;
595         struct mbuf *mb;
596
597         SDP_WLOCK_ASSERT(ssk);
598         if (!ssk->qp_active) {
599                 sdp_dbg(sk, "QP is deactivated\n");
600                 return;
601         }
602
603         while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
604                 sdp_process_rx_ctl_mb(ssk, mb);
605
606         if (ssk->state == TCPS_TIME_WAIT)
607                 return;
608
609         if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
610                 return;
611
612         sdp_post_recvs(ssk);
613
614         if (tx_ring_posted(ssk))
615                 sdp_xmit_poll(ssk, 1);
616
617         sdp_post_sends(ssk, M_NOWAIT);
618
619         xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
620
621         if (credit_update_needed(ssk) || xmit_poll_force) {
622                 /* if has pending tx because run out of tx_credits - xmit it */
623                 sdp_prf(sk, NULL, "Processing to free pending sends");
624                 sdp_xmit_poll(ssk,  xmit_poll_force);
625                 sdp_prf(sk, NULL, "Sending credit update");
626                 sdp_post_sends(ssk, M_NOWAIT);
627         }
628
629 }
630
631 int
632 sdp_process_rx(struct sdp_sock *ssk)
633 {
634         int wc_processed = 0;
635         int credits_before;
636
637         if (!rx_ring_trylock(&ssk->rx_ring)) {
638                 sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
639                 return 0;
640         }
641
642         credits_before = tx_credits(ssk);
643
644         wc_processed = sdp_poll_rx_cq(ssk);
645         sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
646
647         if (wc_processed) {
648                 sdp_prf(ssk->socket, NULL, "credits:  %d -> %d",
649                                 credits_before, tx_credits(ssk));
650                 queue_work(rx_comp_wq, &ssk->rx_comp_work);
651         }
652         sdp_arm_rx_cq(ssk);
653
654         rx_ring_unlock(&ssk->rx_ring);
655
656         return (wc_processed);
657 }
658
659 static void
660 sdp_rx_irq(struct ib_cq *cq, void *cq_context)
661 {
662         struct sdp_sock *ssk;
663
664         ssk = cq_context;
665         KASSERT(cq == ssk->rx_ring.cq,
666             ("%s: mismatched cq on %p", __func__, ssk));
667
668         SDPSTATS_COUNTER_INC(rx_int_count);
669
670         sdp_prf(sk, NULL, "rx irq");
671
672         sdp_process_rx(ssk);
673 }
674
675 static
676 void sdp_rx_ring_purge(struct sdp_sock *ssk)
677 {
678         while (rx_ring_posted(ssk) > 0) {
679                 struct mbuf *mb;
680                 mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
681                 if (!mb)
682                         break;
683                 m_freem(mb);
684         }
685 }
686
687 void
688 sdp_rx_ring_init(struct sdp_sock *ssk)
689 {
690         ssk->rx_ring.buffer = NULL;
691         ssk->rx_ring.destroyed = 0;
692         rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
693 }
694
695 static void
696 sdp_rx_cq_event_handler(struct ib_event *event, void *data)
697 {
698 }
699
700 int
701 sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
702 {
703         struct ib_cq_init_attr rx_cq_attr = {
704                 .cqe = SDP_RX_SIZE,
705                 .comp_vector = 0,
706                 .flags = 0,
707         };
708         struct ib_cq *rx_cq;
709         int rc = 0;
710
711         sdp_dbg(ssk->socket, "rx ring created");
712         INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
713         atomic_set(&ssk->rx_ring.head, 1);
714         atomic_set(&ssk->rx_ring.tail, 1);
715
716         ssk->rx_ring.buffer = malloc(sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE,
717             M_SDP, M_WAITOK);
718
719         rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
720             ssk, &rx_cq_attr);
721         if (IS_ERR(rx_cq)) {
722                 rc = PTR_ERR(rx_cq);
723                 sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
724                 goto err_cq;
725         }
726
727         sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
728         sdp_arm_rx_cq(ssk);
729
730         return 0;
731
732 err_cq:
733         free(ssk->rx_ring.buffer, M_SDP);
734         ssk->rx_ring.buffer = NULL;
735         return rc;
736 }
737
738 void
739 sdp_rx_ring_destroy(struct sdp_sock *ssk)
740 {
741
742         cancel_work_sync(&ssk->rx_comp_work);
743         rx_ring_destroy_lock(&ssk->rx_ring);
744
745         if (ssk->rx_ring.buffer) {
746                 sdp_rx_ring_purge(ssk);
747                 free(ssk->rx_ring.buffer, M_SDP);
748                 ssk->rx_ring.buffer = NULL;
749         }
750
751         if (ssk->rx_ring.cq) {
752                 if (ib_destroy_cq(ssk->rx_ring.cq)) {
753                         sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
754                                 ssk->rx_ring.cq);
755                 } else {
756                         ssk->rx_ring.cq = NULL;
757                 }
758         }
759
760         WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
761 }