2 * SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0
4 * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
6 * This software is available to you under a choice of one of two
7 * licenses. You may choose to be licensed under the terms of the GNU
8 * General Public License (GPL) Version 2, available from the file
9 * COPYING in the main directory of this source tree, or the
10 * OpenIB.org BSD license below:
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
16 * - Redistributions of source code must retain the above
17 * copyright notice, this list of conditions and the following
20 * - Redistributions in binary form must reproduce the above
21 * copyright notice, this list of conditions and the following
22 * disclaimer in the documentation and/or other materials
23 * provided with the distribution.
25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
/*
 * Module tunables for receive buffer sizing.
 * rcvbuf_initial_size seeds the per-socket receive buffer size;
 * rcvbuf_scale multiplies the byte budget used by
 * sdp_post_recvs_needed() when deciding how many receives to post.
 */
36 SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
37 "Receive buffer initial size in bytes.");
38 SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
39 "Receive buffer size scale factor.");
41 /* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
/*
 * Peer sent a DISCONNECT: shut down the receive side of the socket and
 * advance the TCP-like state machine.  Caller must hold the sdp_sock
 * write lock (asserted below).
 *
 * NOTE(review): this chunk is gap-sampled; statements between the
 * numbered lines below are not visible here.  Only comments were added.
 */
43 sdp_handle_disconn(struct sdp_sock *ssk)
46 sdp_dbg(ssk->socket, "%s\n", __func__);
48 SDP_WLOCK_ASSERT(ssk);
/* First FIN seen for this connection: stop further socket receives. */
49 if (TCPS_HAVERCVDFIN(ssk->state) == 0)
50 socantrcvmore(ssk->socket);
/* State-machine dispatch (enclosing switch header not visible here). */
53 case TCPS_SYN_RECEIVED:
54 case TCPS_ESTABLISHED:
/* Passive close: peer FIN first, we move to CLOSE_WAIT. */
55 ssk->state = TCPS_CLOSE_WAIT;
59 /* Received a reply FIN - start Infiniband tear down */
61 "%s: Starting Infiniband tear down sending DREQ\n",
/* We no longer need to wait for the peer's DREQ - cancel the timer. */
64 sdp_cancel_dreq_wait_timeout(ssk);
67 struct rdma_cm_id *id;
75 "%s: ssk->id is NULL\n", __func__);
80 /* This is a mutual close situation and we've got the DREQ from
81 the peer before the SDP_MID_DISCONNECT */
84 /* FIN arrived after IB teardown started - do nothing */
85 sdp_dbg(ssk->socket, "%s: fin in state %s\n",
86 __func__, sdp_state_str(ssk->state));
/* Default case: a FIN in any other state is a protocol violation. */
90 "%s: FIN in unexpected state. state=%d\n",
91 __func__, ssk->state);
/*
 * Allocate one receive buffer (an mbuf chain of ssk->recv_bytes bytes),
 * DMA-map each mbuf in the chain into an SGE, and post it to the QP as
 * a single receive work request tagged with the ring-head id.
 *
 * NOTE(review): gap-sampled chunk - error-path and bookkeeping lines
 * between the numbered statements are not visible; comments only.
 */
97 sdp_post_recv(struct sdp_sock *ssk)
99 struct sdp_buf *rx_req;
102 struct ib_device *dev;
103 struct ib_recv_wr rx_wr = { NULL };
104 struct ib_sge ibsge[SDP_MAX_RECV_SGES];
105 struct ib_sge *sge = ibsge;
106 struct ib_recv_wr *bad_wr;
/* The ring head doubles as the work-request id for this posting. */
109 int id = ring_head(ssk->rx_ring);
111 /* Now, allocate and repost recv */
112 sdp_prf(ssk->socket, mb, "Posting mb");
113 mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
115 /* Retry so we can't stall out with no memory. */
/* Nothing posted at all: defer the retry to the rx completion work. */
116 if (!rx_ring_posted(ssk))
117 queue_work(rx_comp_wq, &ssk->rx_comp_work);
/* Expand every mbuf to its full size and accumulate the pkthdr len. */
120 for (m = mb; m != NULL; m = m->m_next) {
121 m->m_len = M_SIZE(m);
122 mb->m_pkthdr.len += m->m_len;
124 h = mtod(mb, struct sdp_bsdh *);
/* Slot in the power-of-two rx ring for this id. */
125 rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
127 dev = ssk->ib_device;
/* One SGE per mbuf segment; mappings saved for later unmap. */
128 for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
129 addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
131 /* TODO: proper error handling */
132 BUG_ON(ib_dma_mapping_error(dev, addr));
133 BUG_ON(i >= SDP_MAX_RECV_SGES);
134 rx_req->mapping[i] = addr;
136 sge->length = mb->m_len;
137 sge->lkey = ssk->sdp_dev->pd->local_dma_lkey;
/* SDP_OP_RECV bit lets the completion path recognize recv wr_ids. */
141 rx_wr.wr_id = id | SDP_OP_RECV;
142 rx_wr.sg_list = ibsge;
144 rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
/* Post failed: unmap, then reset the connection. */
146 sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);
148 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
151 sdp_notify(ssk, ECONNRESET);
/* Success: advance the ring head past the slot just posted. */
156 atomic_inc(&ssk->rx_ring.head);
157 SDPSTATS_COUNTER_INC(post_recv);
/*
 * Decide whether another receive buffer should be posted: true while the
 * bytes tied up in posted receives plus data already queued in the socket
 * buffer stay below a budget derived from the socket high-water mark and
 * scaled by the rcvbuf_scale tunable.  Bounded by SDP_RX_SIZE ring slots,
 * with a floor of SDP_MIN_TX_CREDITS posted buffers.
 */
163 sdp_post_recvs_needed(struct sdp_sock *ssk)
165 unsigned long bytes_in_process;
166 unsigned long max_bytes;
/* No QP or socket: nothing to post against. */
170 if (!ssk->qp_active || !ssk->socket)
173 posted = rx_ring_posted(ssk);
/* Ring full - cannot post more. */
174 if (posted >= SDP_RX_SIZE)
/* Below the credit floor - must post (return value line not visible). */
176 if (posted < SDP_MIN_TX_CREDITS)
179 buffer_size = ssk->recv_bytes;
180 max_bytes = max(ssk->socket->so_rcv.sb_hiwat,
181 (1 + SDP_MIN_TX_CREDITS) * buffer_size);
182 max_bytes *= rcvbuf_scale;
184 * Compute bytes in the receive queue and socket buffer.
186 bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
187 bytes_in_process += sbused(&ssk->socket->so_rcv);
189 return bytes_in_process < max_bytes;
/*
 * Keep posting receive buffers until the budget in
 * sdp_post_recvs_needed() is exhausted or a post fails.
 */
193 sdp_post_recvs(struct sdp_sock *ssk)
196 while (sdp_post_recvs_needed(ssk))
197 if (sdp_post_recv(ssk))
/*
 * Queue a received data mbuf onto the socket receive buffer and advance
 * rcv_nxt.  SDP_MID_SRCAVAIL messages additionally allocate and populate
 * an rx_srcavail_state describing the peer's RDMA source buffer
 * (rkey/vaddr/len) before queueing.
 *
 * NOTE(review): gap-sampled; allocation-failure handling and some locking
 * lines are not visible here.  Comments only.
 */
201 static inline struct mbuf *
202 sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
204 struct sdp_sock *ssk = sdp_sk(sk);
207 h = mtod(mb, struct sdp_bsdh *);
/* Stamp the mbuf with the sequence number it occupies in the stream. */
210 SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
211 if (h->mid == SDP_MID_SRCAVAIL) {
/* SrcAvail header follows the BSDH in the same buffer. */
212 struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
213 struct rx_srcavail_state *rx_sa;
215 ssk->srcavail_cancel_mseq = 0;
217 ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
218 sizeof(struct rx_srcavail_state), M_NOWAIT);
/* Record the peer's advertised RDMA region for the later read. */
220 rx_sa->mseq = ntohl(h->mseq);
222 rx_sa->len = mb_len = ntohl(srcah->len);
223 rx_sa->rkey = ntohl(srcah->rkey);
224 rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
/* Crossing SrcAvails: wake the sender blocked on its own SrcAvail. */
228 sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
229 "for TX SrcAvail. waking up TX SrcAvail"
231 wake_up(sk->sk_sleep);
234 atomic_add(mb->len, &ssk->rcv_nxt);
235 sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
236 mb_len, rx_sa->vaddr);
/* Plain data path: account the whole packet length. */
240 atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
/* Strip the SDP header before handing data to the socket buffer. */
243 m_adj(mb, SDP_HEAD_SIZE);
244 SOCKBUF_LOCK(&sk->so_rcv);
245 if (unlikely(h->flags & SDP_OOB_PRES))
247 sbappend_locked(&sk->so_rcv, mb, 0);
248 sorwakeup_locked(sk);
/* Clamp a requested receive buffer size to the SDP maximum packet size. */
253 sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
256 return MIN(new_size, SDP_MAX_PACKET);
/* Set the initial per-socket receive buffer size (clamped). */
260 sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
263 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
/*
 * Grow the receive buffer size to new_size if it is strictly larger than
 * the current size and within SDP_MAX_PACKET.  Shrinking is not done.
 */
270 sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
272 u32 curr_size = ssk->recv_bytes;
273 u32 max_size = SDP_MAX_PACKET;
275 if (new_size > curr_size && new_size <= max_size) {
276 ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
/*
 * Peer asked us to change our receive buffer size (CHRCVBUF).  Try the
 * resize; record the ring position after which the new size takes effect
 * (head+1 on success, tail otherwise) and flag that an ACK must be sent.
 */
283 sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
285 if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
286 ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
288 ssk->recv_request_head = ring_tail(ssk->rx_ring);
289 ssk->recv_request = 1;
/*
 * Peer acknowledged our resize request (CHRCVBUF_ACK): raise our transmit
 * size goal to the size the peer accepted, never lowering it.
 */
293 sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
295 u32 new_size = ntohl(buf->size);
297 if (new_size > ssk->xmit_size_goal)
298 ssk->xmit_size_goal = new_size;
/*
 * Reclaim the rx ring slot for a completed receive: verify the completion
 * id matches the ring tail (completions must arrive in order), unmap the
 * buffer's DMA mappings, and advance tail / consume a remote credit.
 * Returns the mbuf for the slot (return lines not visible in this chunk).
 */
302 sdp_recv_completion(struct sdp_sock *ssk, int id)
304 struct sdp_buf *rx_req;
305 struct ib_device *dev;
/* Out-of-order completion indicates a serious bug - warn loudly. */
308 if (unlikely(id != ring_tail(ssk->rx_ring))) {
309 printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
310 id, ring_tail(ssk->rx_ring));
314 dev = ssk->ib_device;
315 rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
317 sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
319 atomic_inc(&ssk->rx_ring.tail);
320 atomic_dec(&ssk->remote_credits);
/*
 * Dispatch a control-queue mbuf by its SDP message id (BSDH mid).
 * Runs under the sdp_sock write lock.  Handles disconnects, resize
 * negotiation, aborts, SrcAvail cancellation and a few RDMA-read
 * control messages; unknown mids are logged.
 *
 * NOTE(review): gap-sampled; the switch header, break statements and
 * mbuf-free lines are not visible here.  Comments only.
 */
325 sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
330 SDP_WLOCK_ASSERT(ssk);
333 h = mtod(mb, struct sdp_bsdh *);
/* Data-carrying mids only land here after rcv shutdown (see
 * sdp_process_rx_mb), hence the diagnostics below. */
336 case SDP_MID_SRCAVAIL:
337 sdp_dbg(sk, "DATA after socket rcv was shutdown\n");
339 /* got data in RCV_SHUTDOWN */
/* Peer sent data after our FIN: treat as connection reset. */
340 if (ssk->state == TCPS_FIN_WAIT_1) {
341 sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
342 sdp_notify(ssk, ECONNRESET);
347 case SDP_MID_RDMARDCOMPL:
350 sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
352 case SDP_MID_SRCAVAIL_CANCEL:
353 sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
354 sdp_prf(sk, NULL, "Handling SrcAvailCancel");
/* Abort any in-progress SrcAvail and remember the cancel mseq. */
356 ssk->srcavail_cancel_mseq = ntohl(h->mseq);
357 ssk->rx_sa->flags |= RX_SA_ABORTED;
358 ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
359 the dirty logic from recvmsg */
361 sdp_dbg(sk, "Got SrcAvailCancel - "
362 "but no SrcAvail in process\n");
365 case SDP_MID_SINKAVAIL:
366 sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
367 sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
/* ABORT case label not visible in this chunk. */
371 sdp_dbg_data(sk, "Handling ABORT\n");
372 sdp_prf(sk, NULL, "Handling ABORT");
373 sdp_notify(ssk, ECONNRESET);
375 case SDP_MID_DISCONN:
376 sdp_dbg_data(sk, "Handling DISCONN\n");
377 sdp_prf(sk, NULL, "Handling DISCONN");
378 sdp_handle_disconn(ssk);
380 case SDP_MID_CHRCVBUF:
381 sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
382 sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
384 case SDP_MID_CHRCVBUF_ACK:
385 sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
386 sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
389 /* TODO: Handle other messages */
390 sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
/*
 * First-stage processing of a received mbuf: update tx credits from the
 * peer's mseq_ack, then route the message - control messages (and any
 * message after a received FIN) go to the rxctlq for sdp_do_posts();
 * plain data is queued straight onto the socket receive buffer.
 *
 * NOTE(review): gap-sampled; several branch bodies and frees between the
 * numbered lines are not visible.  Comments only.
 */
397 sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
401 unsigned long mseq_ack;
404 h = mtod(mb, struct sdp_bsdh *);
407 * If another thread is in so_pcbfree this may be partially torn
408 * down but no further synchronization is required as the destroying
409 * thread will wait for receive to shutdown before discarding the
417 SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
/* Recompute our tx credits from the peer's acknowledged sequence. */
419 mseq_ack = ntohl(h->mseq_ack);
420 credits_before = tx_credits(ssk);
421 atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
/* The nagle-held packet has been acked; clear the marker. */
423 if (mseq_ack >= ssk->nagle_last_unacked)
424 ssk->nagle_last_unacked = 0;
426 sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
427 mid2str(h->mid), ntohs(h->bufs), credits_before,
428 tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
/* Header-only DATA message: pure credit update, no payload to queue. */
430 if (unlikely(h->mid == SDP_MID_DATA &&
431 mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
432 /* Credit update is valid even after RCV_SHUTDOWN */
/* Non-data mids - or anything after FIN - go to the control queue. */
437 if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
438 TCPS_HAVERCVDFIN(ssk->state)) {
439 sdp_prf(sk, NULL, "Control mb - queing to control queue");
/* Cancel mseq must be latched immediately, before queueing. */
441 if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
442 sdp_dbg_data(sk, "Got SrcAvailCancel. "
443 "seq: 0x%d seq_ack: 0x%d\n",
444 ntohl(h->mseq), ntohl(h->mseq_ack));
445 ssk->srcavail_cancel_mseq = ntohl(h->mseq);
449 if (h->mid == SDP_MID_RDMARDCOMPL) {
450 struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
451 sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
452 sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
456 if (mbufq_enqueue(&ssk->rxctlq, mb) != 0)
/* Data path: hand the mbuf to the socket receive buffer. */
461 sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
462 mb = sdp_sock_queue_rcv_mb(sk, mb);
468 /* called only from irq */
/*
 * Turn one receive work completion into a validated mbuf: reclaim the
 * ring slot, reject error/short completions, trim the mbuf chain to the
 * actual byte count, account rx bytes, and record the peer's mseq.
 * Returns the mbuf (return/error lines not visible in this chunk).
 */
470 sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
474 struct socket *sk = ssk->socket;
477 mb = sdp_recv_completion(ssk, wc->wr_id);
/* Completion-with-error path: log only while the QP is still active. */
481 if (unlikely(wc->status)) {
482 if (ssk->qp_active && sk) {
483 sdp_dbg(sk, "Recv completion with error. "
484 "Status %d, vendor: %d\n",
485 wc->status, wc->vendor_err);
493 sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
494 (int)wc->wr_id, wc->byte_len);
/* Every valid SDP message carries at least a BSDH header. */
495 if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
496 sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
497 wc->byte_len, sizeof(struct sdp_bsdh));
501 /* Use m_adj to trim the tail of data we didn't use. */
502 m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
503 h = mtod(mb, struct sdp_bsdh *);
505 SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);
508 ssk->rx_bytes += mb->m_pkthdr.len;
/* The wr_id encodes the expected mseq - mismatch means a protocol bug. */
510 mseq = ntohl(h->mseq);
511 atomic_set(&ssk->mseq_ack, mseq);
512 if (mseq != (int)wc->wr_id)
513 sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
514 mseq, (int)wc->wr_id);
519 /* Wakeup writers if we now have credits. */
521 sdp_bzcopy_write_space(struct sdp_sock *ssk)
523 struct socket *sk = ssk->socket;
/* Only wake once enough credits accumulated to make progress. */
525 if (tx_credits(ssk) >= ssk->min_bufs && sk)
529 /* only from interrupt. */
/*
 * Drain the receive completion queue in SDP_NUM_WC-sized batches,
 * processing each completion into an mbuf and then into the rx path.
 * Loops while full batches keep arriving; finally wakes blocked writers
 * in case the drained completions freed tx credits.
 */
531 sdp_poll_rx_cq(struct sdp_sock *ssk)
533 struct ib_cq *cq = ssk->rx_ring.cq;
534 struct ib_wc ibwc[SDP_NUM_WC];
536 int wc_processed = 0;
540 n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
541 for (i = 0; i < n; ++i) {
542 struct ib_wc *wc = &ibwc[i];
/* Only receive completions may appear on this CQ. */
544 BUG_ON(!(wc->wr_id & SDP_OP_RECV));
545 mb = sdp_process_rx_wc(ssk, wc);
549 sdp_process_rx_mb(ssk, mb);
/* A full batch means more completions may still be pending. */
552 } while (n == SDP_NUM_WC);
555 sdp_bzcopy_write_space(ssk);
/*
 * Deferred rx completion handler (runs on rx_comp_wq).  Bails out if the
 * QP or CQ has already been destroyed.  If CQ polling is not yet enabled
 * (!poll_cq), the first completion signals connection establishment to
 * the RDMA CM instead.
 */
561 sdp_rx_comp_work(struct work_struct *work)
563 struct sdp_sock *ssk = container_of(work, struct sdp_sock,
566 sdp_prf(ssk->socket, NULL, "%s", __func__);
569 if (unlikely(!ssk->qp)) {
570 sdp_prf(ssk->socket, NULL, "qp was destroyed");
573 if (unlikely(!ssk->rx_ring.cq)) {
574 sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
578 if (unlikely(!ssk->poll_cq)) {
579 struct rdma_cm_id *id = ssk->id;
/* First traffic seen: tell the CM the connection is established. */
581 rdma_notify(id, IB_EVENT_COMM_EST);
/*
 * Post-receive housekeeping run under the sdp_sock write lock: drain the
 * deferred control queue, then poll/post on the tx side and send a credit
 * update if needed.  Skipped entirely once the QP is deactivated or in
 * TIME_WAIT, or before both CQs exist.
 */
591 sdp_do_posts(struct sdp_sock *ssk)
593 struct socket *sk = ssk->socket;
597 SDP_WLOCK_ASSERT(ssk);
598 if (!ssk->qp_active) {
599 sdp_dbg(sk, "QP is deactivated\n");
/* Process control messages deferred from interrupt context. */
603 while ((mb = mbufq_dequeue(&ssk->rxctlq)) != NULL)
604 sdp_process_rx_ctl_mb(ssk, mb);
606 if (ssk->state == TCPS_TIME_WAIT)
609 if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
614 if (tx_ring_posted(ssk))
615 sdp_xmit_poll(ssk, 1);
617 sdp_post_sends(ssk, M_NOWAIT);
/* Running low on credits forces a tx poll to reclaim slots. */
619 xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;
621 if (credit_update_needed(ssk) || xmit_poll_force) {
622 /* if has pending tx because run out of tx_credits - xmit it */
623 sdp_prf(sk, NULL, "Processing to free pending sends");
624 sdp_xmit_poll(ssk, xmit_poll_force);
625 sdp_prf(sk, NULL, "Sending credit update");
626 sdp_post_sends(ssk, M_NOWAIT);
/*
 * Poll the rx CQ under the ring trylock and return the number of work
 * completions processed.  If the ring is being destroyed the trylock
 * fails and polling is skipped; follow-up work (credit updates etc.) is
 * deferred to rx_comp_wq.
 */
632 sdp_process_rx(struct sdp_sock *ssk)
634 int wc_processed = 0;
/* Ring teardown in progress - do not touch the CQ. */
637 if (!rx_ring_trylock(&ssk->rx_ring)) {
638 sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
642 credits_before = tx_credits(ssk);
644 wc_processed = sdp_poll_rx_cq(ssk);
645 sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);
648 sdp_prf(ssk->socket, NULL, "credits: %d -> %d",
649 credits_before, tx_credits(ssk));
/* Heavier follow-up work is pushed to the workqueue. */
650 queue_work(rx_comp_wq, &ssk->rx_comp_work);
654 rx_ring_unlock(&ssk->rx_ring);
656 return (wc_processed);
/*
 * Receive CQ interrupt handler (registered as the CQ comp handler in
 * sdp_rx_ring_create).  Sanity-checks the CQ, bumps the interrupt
 * counter; further processing lines are not visible in this chunk.
 */
660 sdp_rx_irq(struct ib_cq *cq, void *cq_context)
662 struct sdp_sock *ssk;
665 KASSERT(cq == ssk->rx_ring.cq,
666 ("%s: mismatched cq on %p", __func__, ssk));
668 SDPSTATS_COUNTER_INC(rx_int_count);
670 sdp_prf(sk, NULL, "rx irq");
/*
 * Reclaim (and presumably free - freeing lines not visible here) every
 * receive buffer still posted on the rx ring, draining tail up to head.
 */
676 void sdp_rx_ring_purge(struct sdp_sock *ssk)
678 while (rx_ring_posted(ssk) > 0) {
680 mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
/* Initialize rx ring bookkeeping: no buffer yet, not destroyed, fresh
 * destroy lock. */
688 sdp_rx_ring_init(struct sdp_sock *ssk)
690 ssk->rx_ring.buffer = NULL;
691 ssk->rx_ring.destroyed = 0;
692 rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
/* Async event handler for the rx CQ (body not visible in this chunk;
 * registered with ib_create_cq below). */
696 sdp_rx_cq_event_handler(struct ib_event *event, void *data)
/*
 * Create the receive ring: init the completion work item, set head/tail
 * to 1, allocate the SDP_RX_SIZE buffer-descriptor array, and create the
 * rx CQ with sdp_rx_irq as its completion handler.  On CQ failure the
 * buffer array is freed again (error-label lines not fully visible).
 */
701 sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
703 struct ib_cq_init_attr rx_cq_attr = {
711 sdp_dbg(ssk->socket, "rx ring created");
712 INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
/* head == tail == 1: ring starts empty; ids double as mseq values. */
713 atomic_set(&ssk->rx_ring.head, 1);
714 atomic_set(&ssk->rx_ring.tail, 1);
716 ssk->rx_ring.buffer = malloc(sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE,
719 rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
723 sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
727 sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
/* Error path: release the descriptor array allocated above. */
733 free(ssk->rx_ring.buffer, M_SDP);
734 ssk->rx_ring.buffer = NULL;
/*
 * Tear down the receive ring: cancel pending completion work, take the
 * destroy lock, purge and free posted buffers, then destroy the CQ.
 * Pointers are NULLed after release so teardown is idempotent; head must
 * equal tail afterwards (everything reclaimed).
 */
739 sdp_rx_ring_destroy(struct sdp_sock *ssk)
742 cancel_work_sync(&ssk->rx_comp_work);
743 rx_ring_destroy_lock(&ssk->rx_ring);
745 if (ssk->rx_ring.buffer) {
746 sdp_rx_ring_purge(ssk);
747 free(ssk->rx_ring.buffer, M_SDP);
748 ssk->rx_ring.buffer = NULL;
751 if (ssk->rx_ring.cq) {
752 if (ib_destroy_cq(ssk->rx_ring.cq)) {
753 sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
756 ssk->rx_ring.cq = NULL;
760 WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));