kernel - MPSAFE the protocol drain routines
author Matthew Dillon <dillon@apollo.backplane.com>
Thu, 9 Sep 2010 05:53:49 +0000 (22:53 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Thu, 9 Sep 2010 05:53:49 +0000 (22:53 -0700)
* The IP fragment drain was not MPSAFE at all.  Use a token (ipq_token)
  to protect the ipq[] fragment reassembly queues.

* The TCP reassembly code was only partially MPSAFE; it relied on
  running per-cpu.  Finish the job, and use atomic ops to update the
  global tcp_reass_qsize counter.

* Add message-port assertions to the TCP input and output paths.  If we
  are not running in the connection's protocol thread, we panic, period.

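* Code cleanup: brace single-statement loop and if/else bodies, move
  comments out of unbraced if bodies, and parenthesize an m_flags test
  in sys/mbuf.h.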

sys/netinet/ip_input.c
sys/netinet/tcp_input.c
sys/netinet/tcp_output.c
sys/netinet/tcp_subr.c
sys/sys/mbuf.h

index 2263614..bbef673 100644 (file)
@@ -206,6 +206,8 @@ static int ip_checkinterface = 0;
 SYSCTL_INT(_net_inet_ip, OID_AUTO, check_interface, CTLFLAG_RW,
     &ip_checkinterface, 0, "Verify packet arrives on correct interface");
 
+static struct lwkt_token ipq_token = LWKT_TOKEN_MP_INITIALIZER(ipq_token);
+
 #ifdef DIAGNOSTIC
 static int ipprintfs = 0;
 #endif
@@ -984,12 +986,14 @@ ip_reass(struct mbuf *m)
        /*
         * Look for queue of fragments of this datagram.
         */
-       for (fp = ipq[sum].next; fp != &ipq[sum]; fp = fp->next)
+       lwkt_gettoken(&ipq_token);
+       for (fp = ipq[sum].next; fp != &ipq[sum]; fp = fp->next) {
                if (ip->ip_id == fp->ipq_id &&
                    ip->ip_src.s_addr == fp->ipq_src.s_addr &&
                    ip->ip_dst.s_addr == fp->ipq_dst.s_addr &&
                    ip->ip_p == fp->ipq_p)
                        goto found;
+       }
 
        fp = NULL;
 
@@ -1032,11 +1036,12 @@ found:
                if (ip->ip_len == 0 || (ip->ip_len & 0x7) != 0) {
                        ipstat.ips_toosmall++; /* XXX */
                        m_freem(m);
-                       return NULL;
+                       goto done;
                }
                m->m_flags |= M_FRAG;
-       } else
+       } else {
                m->m_flags &= ~M_FRAG;
+       }
        ip->ip_off <<= 3;
 
        ipstat.ips_fragments++;
@@ -1085,9 +1090,10 @@ found:
        /*
         * Find a segment which begins after this one does.
         */
-       for (p = NULL, q = fp->ipq_frags; q; p = q, q = q->m_nextpkt)
+       for (p = NULL, q = fp->ipq_frags; q; p = q, q = q->m_nextpkt) {
                if (GETIP(q)->ip_off > ip->ip_off)
                        break;
+       }
 
        /*
         * If there is a preceding segment, it may provide some of
@@ -1155,7 +1161,7 @@ inserted:
                                ipstat.ips_fragdropped += fp->ipq_nfrags;
                                ip_freef(fp);
                        }
-                       return (NULL);
+                       goto done;
                }
                next += GETIP(q)->ip_len;
        }
@@ -1165,7 +1171,7 @@ inserted:
                        ipstat.ips_fragdropped += fp->ipq_nfrags;
                        ip_freef(fp);
                }
-               return (NULL);
+               goto done;
        }
 
        /*
@@ -1177,7 +1183,7 @@ inserted:
                ipstat.ips_toolong++;
                ipstat.ips_fragdropped += fp->ipq_nfrags;
                ip_freef(fp);
-               return (NULL);
+               goto done;
        }
 
        /*
@@ -1231,6 +1237,7 @@ inserted:
        }
 
        ipstat.ips_reassembled++;
+       lwkt_reltoken(&ipq_token);
        return (m);
 
 dropfrag:
@@ -1238,6 +1245,8 @@ dropfrag:
        if (fp != NULL)
                fp->ipq_nfrags--;
        m_freem(m);
+done:
+       lwkt_reltoken(&ipq_token);
        return (NULL);
 
 #undef GETIP
@@ -1246,19 +1255,28 @@ dropfrag:
 /*
  * Free a fragment reassembly header and all
  * associated datagrams.
+ *
+ * Called with ipq_token held.
  */
 static void
 ip_freef(struct ipq *fp)
 {
        struct mbuf *q;
 
+       /*
+        * Remove first to protect against blocking
+        */
+       remque(fp);
+
+       /*
+        * Clean out at our leisure
+        */
        while (fp->ipq_frags) {
                q = fp->ipq_frags;
                fp->ipq_frags = q->m_nextpkt;
                q->m_nextpkt = NULL;
                m_freem(q);
        }
-       remque(fp);
        mpipe_free(&ipq_mpipe, fp);
        nipq--;
 }
@@ -1274,7 +1292,7 @@ ip_slowtimo(void)
        struct ipq *fp;
        int i;
 
-       crit_enter();
+       lwkt_gettoken(&ipq_token);
        for (i = 0; i < IPREASS_NHASH; i++) {
                fp = ipq[i].next;
                if (fp == NULL)
@@ -1303,8 +1321,8 @@ ip_slowtimo(void)
                        }
                }
        }
+       lwkt_reltoken(&ipq_token);
        ipflow_slowtimo();
-       crit_exit();
 }
 
 /*
@@ -1315,12 +1333,14 @@ ip_drain(void)
 {
        int i;
 
+       lwkt_gettoken(&ipq_token);
        for (i = 0; i < IPREASS_NHASH; i++) {
                while (ipq[i].next != &ipq[i]) {
                        ipstat.ips_fragdropped += ipq[i].next->ipq_nfrags;
                        ip_freef(ipq[i].next);
                }
        }
+       lwkt_reltoken(&ipq_token);
        in_rtqdrain();
 }
 
index ab9f352..3069521 100644 (file)
@@ -308,7 +308,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m)
                tp->reportblk.rblk_start = tp->reportblk.rblk_end;
                return (0);
        }
-       tcp_reass_qsize++;
+       atomic_add_int(&tcp_reass_qsize, 1);
 
        /*
         * Find a segment which begins after this one does.
@@ -341,7 +341,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m)
                                tcpstat.tcps_rcvdupbyte += *tlenp;
                                m_freem(m);
                                kfree(te, M_TSEGQ);
-                               tcp_reass_qsize--;
+                               atomic_add_int(&tcp_reass_qsize, -1);
                                /*
                                 * Try to present any queued data
                                 * at the left window edge to the user.
@@ -396,7 +396,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m)
                LIST_REMOVE(q, tqe_q);
                m_freem(q->tqe_m);
                kfree(q, M_TSEGQ);
-               tcp_reass_qsize--;
+               atomic_add_int(&tcp_reass_qsize, -1);
                q = nq;
        }
 
@@ -422,7 +422,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m)
                        tp->reportblk.rblk_end = tend;
                LIST_REMOVE(q, tqe_q);
                kfree(q, M_TSEGQ);
-               tcp_reass_qsize--;
+               atomic_add_int(&tcp_reass_qsize, -1);
        }
 
        if (p == NULL) {
@@ -440,9 +440,10 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m)
                        if (!(tp->t_flags & TF_DUPSEG))
                                tp->reportblk.rblk_start = p->tqe_th->th_seq;
                        kfree(te, M_TSEGQ);
-                       tcp_reass_qsize--;
-               } else
+                       atomic_add_int(&tcp_reass_qsize, -1);
+               } else {
                        LIST_INSERT_AFTER(p, te, tqe_q);
+               }
        }
 
 present:
@@ -472,7 +473,7 @@ present:
        else
                ssb_appendstream(&so->so_rcv, q->tqe_m);
        kfree(q, M_TSEGQ);
-       tcp_reass_qsize--;
+       atomic_add_int(&tcp_reass_qsize, -1);
        ND6_HINT(tp);
        sorwakeup(so);
        return (flags);
@@ -894,13 +895,21 @@ findpcb:
                                        rstreason = BANDLIM_RST_OPENPORT;
                                        goto dropwithreset;
                                }
+
+                               /*
+                                * Could not complete 3-way handshake,
+                                * connection is being closed down, and
+                                * syncache will free mbuf.
+                                */
                                if (so == NULL)
-                                       /*
-                                        * Could not complete 3-way handshake,
-                                        * connection is being closed down, and
-                                        * syncache will free mbuf.
-                                        */
                                        return;
+
+                               /*
+                                * We must be in the correct protocol thread
+                                * for this connection.
+                                */
+                               KKASSERT(so->so_port == &curthread->td_msgport);
+
                                /*
                                 * Socket is created in state SYN_RECEIVED.
                                 * Continue processing segment.
@@ -1024,12 +1033,20 @@ findpcb:
                        tcp_dooptions(&to, optp, optlen, TRUE);
                        if (!syncache_add(&inc, &to, th, &so, m))
                                goto drop;
+
+                       /*
+                        * Entry added to syncache, mbuf used to
+                        * send SYN,ACK packet.
+                        */
                        if (so == NULL)
-                               /*
-                                * Entry added to syncache, mbuf used to
-                                * send SYN,ACK packet.
-                                */
                                return;
+
+                       /*
+                        * We must be in the correct protocol thread for
+                        * this connection.
+                        */
+                       KKASSERT(so->so_port == &curthread->td_msgport);
+
                        inp = so->so_pcb;
                        tp = intotcpcb(inp);
                        tp->snd_wnd = tiwin;
@@ -1061,10 +1078,16 @@ findpcb:
                }
                goto drop;
        }
-after_listen:
 
-       /* should not happen - syncache should pick up these connections */
+after_listen:
+       /*
+        * Should not happen - syncache should pick up these connections.
+        *
+        * Once we are past handling listen sockets we must be in the
+        * correct protocol processing thread.
+        */
        KASSERT(tp->t_state != TCPS_LISTEN, ("tcp_input: TCPS_LISTEN state"));
+       KKASSERT(so->so_port == &curthread->td_msgport);
 
        /*
         * This is the second part of the MSS DoS prevention code (after
index f14f7bb..37bd350 100644 (file)
@@ -170,6 +170,8 @@ tcp_output(struct tcpcb *tp)
        const boolean_t isipv6 = FALSE;
 #endif
 
+       KKASSERT(so->so_port == &curthread->td_msgport);
+
        /*
         * Determine length of data that should be transmitted,
         * and flags that will be used.
index f64b97c..7f07a7d 100644 (file)
@@ -984,7 +984,7 @@ no_valid_rt:
                LIST_REMOVE(q, tqe_q);
                m_freem(q->tqe_m);
                FREE(q, M_TSEGQ);
-               tcp_reass_qsize--;
+               atomic_add_int(&tcp_reass_qsize, -1);
        }
        /* throw away SACK blocks in scoreboard*/
        if (TCP_DO_SACK(tp))
@@ -1040,22 +1040,34 @@ no_valid_rt:
 static __inline void
 tcp_drain_oncpu(struct inpcbhead *head)
 {
+       struct inpcb *marker;
        struct inpcb *inpb;
        struct tcpcb *tcpb;
        struct tseg_qent *te;
 
-       LIST_FOREACH(inpb, head, inp_list) {
-               if (inpb->inp_flags & INP_PLACEMARKER)
-                       continue;
-               if ((tcpb = intotcpcb(inpb))) {
-                       while ((te = LIST_FIRST(&tcpb->t_segq)) != NULL) {
-                               LIST_REMOVE(te, tqe_q);
-                               m_freem(te->tqe_m);
-                               FREE(te, M_TSEGQ);
-                               tcp_reass_qsize--;
-                       }
+       /*
+        * Allows us to block while running the list
+        */
+       marker = kmalloc(sizeof(struct inpcb), M_TEMP, M_WAITOK|M_ZERO);
+       marker->inp_flags |= INP_PLACEMARKER;
+       LIST_INSERT_HEAD(head, marker, inp_list);
+
+       while ((inpb = LIST_NEXT(marker, inp_list)) != NULL) {
+               if ((inpb->inp_flags & INP_PLACEMARKER) == 0 &&
+                   (tcpb = intotcpcb(inpb)) != NULL &&
+                   (te = LIST_FIRST(&tcpb->t_segq)) != NULL) {
+                       LIST_REMOVE(te, tqe_q);
+                       m_freem(te->tqe_m);
+                       FREE(te, M_TSEGQ);
+                       atomic_add_int(&tcp_reass_qsize, -1);
+                       /* retry */
+               } else {
+                       LIST_REMOVE(marker, inp_list);
+                       LIST_INSERT_AFTER(inpb, marker, inp_list);
                }
        }
+       LIST_REMOVE(marker, inp_list);
+       kfree(marker, M_TEMP);
 }
 
 #ifdef SMP
index f796d9a..0e14fe0 100644 (file)
@@ -431,7 +431,7 @@ struct mbstat {
                _mm->m_len += _mplen;                                   \
        } else                                                          \
                _mm = m_prepend(_mm, _mplen, __mhow);                   \
-       if (_mm != NULL && _mm->m_flags & M_PKTHDR)                     \
+       if (_mm != NULL && (_mm->m_flags & M_PKTHDR))                   \
                _mm->m_pkthdr.len += _mplen;                            \
        *_mmp = _mm;                                                    \
 } while (0)