@@ -112,6 +112,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
 	return tipc_net(net)->bcbase;
 }
 
+static struct tipc_link *tipc_bc_sndlink(struct net *net)
+{
+	return tipc_net(net)->bcl;
+}
+
 /**
  * tipc_nmap_equal - test for equality of node maps
  */
@@ -121,6 +126,7 @@ static int tipc_nmap_equal(struct tipc_node_map *nm_a,
 	return !memcmp(nm_a, nm_b, sizeof(*nm_a));
 }
 
+static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
 			   struct tipc_node_map *nm_b,
 			   struct tipc_node_map *nm_diff);
@@ -148,14 +154,14 @@ uint tipc_bcast_get_mtu(void)
 	return MAX_PKT_DEFAULT_MCAST;
 }
 
-static u32 bcbuf_acks(struct sk_buff *buf)
+static u16 bcbuf_acks(struct sk_buff *skb)
 {
-	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
+	return TIPC_SKB_CB(skb)->ackers;
 }
 
-static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
+static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
 {
-	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
+	TIPC_SKB_CB(buf)->ackers = ackers;
 }
 
 static void bcbuf_decr_acks(struct sk_buff *buf)
@@ -166,9 +172,10 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
 void tipc_bclink_add_node(struct net *net, u32 addr)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
+	struct tipc_link *l = tipc_bc_sndlink(net);
 	tipc_bclink_lock(net);
 	tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
+	tipc_link_add_bc_peer(l);
 	tipc_bclink_unlock(net);
 }
@@ -178,6 +185,7 @@ void tipc_bclink_remove_node(struct net *net, u32 addr)
 	tipc_bclink_lock(net);
 	tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
+	tn->bcl->ackers--;
 
 	/* Last node? => reset backlog queue */
 	if (!tn->bcbase->bcast_nodes.count)
@@ -295,7 +303,6 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	if (unlikely(!n_ptr->bclink.recv_permitted))
 		return;
 
 	tipc_bclink_lock(net);
-
 	/* Bail out if tx queue is empty (no clean up is required) */
@@ -324,13 +331,11 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 			    less_eq(acked, n_ptr->bclink.acked))
 				goto exit;
 	}
-
 	/* Skip over packets that node has previously acknowledged */
 	skb_queue_walk(&tn->bcl->transmq, skb) {
 		if (more(buf_seqno(skb), n_ptr->bclink.acked))
 			break;
 	}
-
 	/* Update packets that node is now acknowledging */
 	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
 		if (more(buf_seqno(skb), acked))
@@ -367,6 +372,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 	struct sk_buff *buf;
 	struct net *net = n_ptr->net;
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	struct tipc_link *bcl = tn->bcl;
 
 	/* Ignore "stale" link state info */
 	if (less_eq(last_sent, n_ptr->bclink.last_in))
@@ -375,6 +381,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 	/* Update link synchronization state; quit if in sync */
 	bclink_update_last_sent(n_ptr, last_sent);
 
+	/* This is a good location for statistical profiling */
+	bcl->stats.queue_sz_counts++;
+	bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
+
 	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
 		return;
@@ -468,52 +478,35 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
  */
 int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
-	struct tipc_bc_base *bclink = tn->bcbase;
+	struct tipc_link *l = tipc_bc_sndlink(net);
+	struct sk_buff_head xmitq, inputq, rcvq;
 	int rc = 0;
-	int bc = 0;
-	struct sk_buff *skb;
-	struct sk_buff_head arrvq;
-	struct sk_buff_head inputq;
 
-	/* Prepare clone of message for local node */
-	skb = tipc_msg_reassemble(list);
-	if (unlikely(!skb))
-		return -EHOSTUNREACH;
+	__skb_queue_head_init(&rcvq);
+	__skb_queue_head_init(&xmitq);
+	skb_queue_head_init(&inputq);
 
-	/* Broadcast to all nodes */
-	if (likely(bclink)) {
-		tipc_bclink_lock(net);
-		if (likely(bclink->bcast_nodes.count)) {
-			rc = __tipc_link_xmit(net, bcl, list);
-			if (likely(!rc)) {
-				u32 len = skb_queue_len(&bcl->transmq);
-
-				bclink_set_last_sent(net);
-				bcl->stats.queue_sz_counts++;
-				bcl->stats.accu_queue_sz += len;
-			}
-			bc = 1;
-		}
-		tipc_bclink_unlock(net);
-	}
+	/* Prepare message clone for local node */
+	if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
+		return -EHOSTUNREACH;
 
-	if (unlikely(!bc))
-		__skb_queue_purge(list);
+	tipc_bcast_lock(net);
+	if (tipc_link_bc_peers(l))
+		rc = tipc_link_xmit(l, list, &xmitq);
+	bclink_set_last_sent(net);
+	tipc_bcast_unlock(net);
 
+	/* Don't send to local node if adding to link failed */
 	if (unlikely(rc)) {
-		kfree_skb(skb);
+		__skb_queue_purge(&rcvq);
 		return rc;
 	}
-	/* Deliver message clone */
-	__skb_queue_head_init(&arrvq);
-	skb_queue_head_init(&inputq);
-	__skb_queue_tail(&arrvq, skb);
-	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
-	return rc;
+
+	/* Broadcast to all nodes, including local node */
+	tipc_bcbearer_xmit(net, &xmitq);
+	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
+	__skb_queue_purge(list);
+	return 0;
 }
 
 /**
  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
  *
@@ -564,7 +557,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 	node = tipc_node_find(net, msg_prevnode(msg));
 	if (unlikely(!node))
 		goto exit;
-
 	tipc_node_lock(node);
 	if (unlikely(!node->bclink.recv_permitted))
 		goto unlock;
@@ -589,7 +581,6 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 		tipc_node_put(node);
 		goto exit;
 	}
-
 	/* Handle in-sequence broadcast message */
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
@@ -778,6 +769,19 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
 	return 0;
 }
 
+static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb, *tmp;
+
+	skb_queue_walk_safe(xmitq, skb, tmp) {
+		__skb_dequeue(xmitq);
+		tipc_bcbearer_send(net, skb, NULL, NULL);
+		/* Until we remove cloning in tipc_l2_send_msg(): */
+		kfree_skb(skb);
+	}
+}
+
 /**
  * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
  */