From: Matthew Dillon Date: Thu, 9 Sep 2010 05:53:49 +0000 (-0700) Subject: kernel - MPSAFE the protocol drain routines X-Git-Tag: v2.9.0~250 X-Git-Url: https://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/2d23a8bebdf3b83c06495aceae0d5f64f404b9d0 kernel - MPSAFE the protocol drain routines * The ip fragment drain was not MPSAFE at all. Use a token to protect the ipq[] queues. * The tcp reassembly code was only partially MPSAFE due to being per-cpu. Finish it up. Use atomic ops for the tcp_reass_qsize global. * Add port assertions in the TCP input and output paths. If we are not in the correct thread we panic, period. * Code cleanup. --- diff --git a/sys/netinet/ip_input.c b/sys/netinet/ip_input.c index 226361401d..bbef6735af 100644 --- a/sys/netinet/ip_input.c +++ b/sys/netinet/ip_input.c @@ -206,6 +206,8 @@ static int ip_checkinterface = 0; SYSCTL_INT(_net_inet_ip, OID_AUTO, check_interface, CTLFLAG_RW, &ip_checkinterface, 0, "Verify packet arrives on correct interface"); +static struct lwkt_token ipq_token = LWKT_TOKEN_MP_INITIALIZER(ipq_token); + #ifdef DIAGNOSTIC static int ipprintfs = 0; #endif @@ -984,12 +986,14 @@ ip_reass(struct mbuf *m) /* * Look for queue of fragments of this datagram. */ - for (fp = ipq[sum].next; fp != &ipq[sum]; fp = fp->next) + lwkt_gettoken(&ipq_token); + for (fp = ipq[sum].next; fp != &ipq[sum]; fp = fp->next) { if (ip->ip_id == fp->ipq_id && ip->ip_src.s_addr == fp->ipq_src.s_addr && ip->ip_dst.s_addr == fp->ipq_dst.s_addr && ip->ip_p == fp->ipq_p) goto found; + } fp = NULL; @@ -1032,11 +1036,12 @@ found: if (ip->ip_len == 0 || (ip->ip_len & 0x7) != 0) { ipstat.ips_toosmall++; /* XXX */ m_freem(m); - return NULL; + goto done; } m->m_flags |= M_FRAG; - } else + } else { m->m_flags &= ~M_FRAG; + } ip->ip_off <<= 3; ipstat.ips_fragments++; @@ -1085,9 +1090,10 @@ found: /* * Find a segment which begins after this one does. */ - for (p = NULL, q = fp->ipq_frags; q; p = q, q = q->m_nextpkt) + for (p = NULL, q = fp->ipq_frags; q; p = q, q = q->m_nextpkt) { if (GETIP(q)->ip_off > ip->ip_off) break; + } /* * If there is a preceding segment, it may provide some of @@ -1155,7 +1161,7 @@ inserted: ipstat.ips_fragdropped += fp->ipq_nfrags; ip_freef(fp); } - return (NULL); + goto done; } next += GETIP(q)->ip_len; } @@ -1165,7 +1171,7 @@ inserted: ipstat.ips_fragdropped += fp->ipq_nfrags; ip_freef(fp); } - return (NULL); + goto done; } /* @@ -1177,7 +1183,7 @@ inserted: ipstat.ips_toolong++; ipstat.ips_fragdropped += fp->ipq_nfrags; ip_freef(fp); - return (NULL); + goto done; } /* @@ -1231,6 +1237,7 @@ inserted: } ipstat.ips_reassembled++; + lwkt_reltoken(&ipq_token); return (m); dropfrag: @@ -1238,6 +1245,8 @@ dropfrag: if (fp != NULL) fp->ipq_nfrags--; m_freem(m); +done: + lwkt_reltoken(&ipq_token); return (NULL); #undef GETIP @@ -1246,19 +1255,28 @@ dropfrag: /* * Free a fragment reassembly header and all * associated datagrams. + * + * Called with ipq_token held. */ static void ip_freef(struct ipq *fp) { struct mbuf *q; + /* + * Remove first to protect against blocking + */ + remque(fp); + + /* + * Clean out at our leisure + */ while (fp->ipq_frags) { q = fp->ipq_frags; fp->ipq_frags = q->m_nextpkt; q->m_nextpkt = NULL; m_freem(q); } - remque(fp); mpipe_free(&ipq_mpipe, fp); nipq--; } @@ -1274,7 +1292,7 @@ ip_slowtimo(void) struct ipq *fp; int i; - crit_enter(); + lwkt_gettoken(&ipq_token); for (i = 0; i < IPREASS_NHASH; i++) { fp = ipq[i].next; if (fp == NULL) @@ -1303,8 +1321,8 @@ ip_slowtimo(void) } } } + lwkt_reltoken(&ipq_token); ipflow_slowtimo(); - crit_exit(); } /* @@ -1315,12 +1333,14 @@ ip_drain(void) { int i; + lwkt_gettoken(&ipq_token); for (i = 0; i < IPREASS_NHASH; i++) { while (ipq[i].next != &ipq[i]) { ipstat.ips_fragdropped += ipq[i].next->ipq_nfrags; ip_freef(ipq[i].next); } } + lwkt_reltoken(&ipq_token); in_rtqdrain(); } diff --git a/sys/netinet/tcp_input.c b/sys/netinet/tcp_input.c index ab9f352ecd..30695214d8 100644 --- a/sys/netinet/tcp_input.c +++ b/sys/netinet/tcp_input.c @@ -308,7 +308,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m) tp->reportblk.rblk_start = tp->reportblk.rblk_end; return (0); } - tcp_reass_qsize++; + atomic_add_int(&tcp_reass_qsize, 1); /* * Find a segment which begins after this one does. @@ -341,7 +341,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m) tcpstat.tcps_rcvdupbyte += *tlenp; m_freem(m); kfree(te, M_TSEGQ); - tcp_reass_qsize--; + atomic_add_int(&tcp_reass_qsize, -1); /* * Try to present any queued data * at the left window edge to the user. @@ -396,7 +396,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m) LIST_REMOVE(q, tqe_q); m_freem(q->tqe_m); kfree(q, M_TSEGQ); - tcp_reass_qsize--; + atomic_add_int(&tcp_reass_qsize, -1); q = nq; } @@ -422,7 +422,7 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m) tp->reportblk.rblk_end = tend; LIST_REMOVE(q, tqe_q); kfree(q, M_TSEGQ); - tcp_reass_qsize--; + atomic_add_int(&tcp_reass_qsize, -1); } if (p == NULL) { @@ -440,9 +440,10 @@ tcp_reass(struct tcpcb *tp, struct tcphdr *th, int *tlenp, struct mbuf *m) if (!(tp->t_flags & TF_DUPSEG)) tp->reportblk.rblk_start = p->tqe_th->th_seq; kfree(te, M_TSEGQ); - tcp_reass_qsize--; - } else + atomic_add_int(&tcp_reass_qsize, -1); + } else { LIST_INSERT_AFTER(p, te, tqe_q); + } } present: @@ -472,7 +473,7 @@ present: else ssb_appendstream(&so->so_rcv, q->tqe_m); kfree(q, M_TSEGQ); - tcp_reass_qsize--; + atomic_add_int(&tcp_reass_qsize, -1); ND6_HINT(tp); sorwakeup(so); return (flags); @@ -894,13 +895,21 @@ findpcb: rstreason = BANDLIM_RST_OPENPORT; goto dropwithreset; } + + /* + * Could not complete 3-way handshake, + * connection is being closed down, and + * syncache will free mbuf. + */ if (so == NULL) - /* - * Could not complete 3-way handshake, - * connection is being closed down, and - * syncache will free mbuf. - */ return; + + /* + * We must be in the correct protocol thread + * for this connection. + */ + KKASSERT(so->so_port == &curthread->td_msgport); + /* * Socket is created in state SYN_RECEIVED. * Continue processing segment. @@ -1024,12 +1033,20 @@ findpcb: tcp_dooptions(&to, optp, optlen, TRUE); if (!syncache_add(&inc, &to, th, &so, m)) goto drop; + + /* + * Entry added to syncache, mbuf used to + * send SYN,ACK packet. + */ if (so == NULL) - /* - * Entry added to syncache, mbuf used to - * send SYN,ACK packet. - */ return; + + /* + * We must be in the correct protocol thread for + * this connection. + */ + KKASSERT(so->so_port == &curthread->td_msgport); + inp = so->so_pcb; tp = intotcpcb(inp); tp->snd_wnd = tiwin; @@ -1061,10 +1078,16 @@ findpcb: } goto drop; } -after_listen: - /* should not happen - syncache should pick up these connections */ +after_listen: + /* + * Should not happen - syncache should pick up these connections. + * + * Once we are past handling listen sockets we must be in the + * correct protocol processing thread. + */ KASSERT(tp->t_state != TCPS_LISTEN, ("tcp_input: TCPS_LISTEN state")); + KKASSERT(so->so_port == &curthread->td_msgport); /* * This is the second part of the MSS DoS prevention code (after diff --git a/sys/netinet/tcp_output.c b/sys/netinet/tcp_output.c index f14f7bb2da..37bd350de3 100644 --- a/sys/netinet/tcp_output.c +++ b/sys/netinet/tcp_output.c @@ -170,6 +170,8 @@ tcp_output(struct tcpcb *tp) const boolean_t isipv6 = FALSE; #endif + KKASSERT(so->so_port == &curthread->td_msgport); + /* * Determine length of data that should be transmitted, * and flags that will be used. diff --git a/sys/netinet/tcp_subr.c b/sys/netinet/tcp_subr.c index f64b97cb09..7f07a7d70d 100644 --- a/sys/netinet/tcp_subr.c +++ b/sys/netinet/tcp_subr.c @@ -984,7 +984,7 @@ no_valid_rt: LIST_REMOVE(q, tqe_q); m_freem(q->tqe_m); FREE(q, M_TSEGQ); - tcp_reass_qsize--; + atomic_add_int(&tcp_reass_qsize, -1); } /* throw away SACK blocks in scoreboard*/ if (TCP_DO_SACK(tp)) @@ -1040,22 +1040,34 @@ no_valid_rt: static __inline void tcp_drain_oncpu(struct inpcbhead *head) { + struct inpcb *marker; struct inpcb *inpb; struct tcpcb *tcpb; struct tseg_qent *te; - LIST_FOREACH(inpb, head, inp_list) { - if (inpb->inp_flags & INP_PLACEMARKER) - continue; - if ((tcpb = intotcpcb(inpb))) { - while ((te = LIST_FIRST(&tcpb->t_segq)) != NULL) { - LIST_REMOVE(te, tqe_q); - m_freem(te->tqe_m); - FREE(te, M_TSEGQ); - tcp_reass_qsize--; - } + /* + * Allows us to block while running the list + */ + marker = kmalloc(sizeof(struct inpcb), M_TEMP, M_WAITOK|M_ZERO); + marker->inp_flags |= INP_PLACEMARKER; + LIST_INSERT_HEAD(head, marker, inp_list); + + while ((inpb = LIST_NEXT(marker, inp_list)) != NULL) { + if ((inpb->inp_flags & INP_PLACEMARKER) == 0 && + (tcpb = intotcpcb(inpb)) != NULL && + (te = LIST_FIRST(&tcpb->t_segq)) != NULL) { + LIST_REMOVE(te, tqe_q); + m_freem(te->tqe_m); + FREE(te, M_TSEGQ); + atomic_add_int(&tcp_reass_qsize, -1); + /* retry */ + } else { + LIST_REMOVE(marker, inp_list); + LIST_INSERT_AFTER(inpb, marker, inp_list); } } + LIST_REMOVE(marker, inp_list); + kfree(marker, M_TEMP); } #ifdef SMP diff --git a/sys/sys/mbuf.h b/sys/sys/mbuf.h index f796d9aa64..0e14fe06ca 100644 --- a/sys/sys/mbuf.h +++ b/sys/sys/mbuf.h @@ -431,7 +431,7 @@ struct mbstat { _mm->m_len += _mplen; \ } else \ _mm = m_prepend(_mm, _mplen, __mhow); \ - if (_mm != NULL && _mm->m_flags & M_PKTHDR) \ + if (_mm != NULL && (_mm->m_flags & M_PKTHDR)) \ _mm->m_pkthdr.len += _mplen; \ *_mmp = _mm; \ } while (0)