1 /* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
2 /* http://www.inmon.com/technology/sflowlicense.txt */
7 static void resetSampleCollector(SFLReceiver *receiver);
8 static void sendSample(SFLReceiver *receiver);
9 static void sflError(SFLReceiver *receiver, char *errm);
10 inline static void putNet32(SFLReceiver *receiver, u_int32_t val);
11 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
12 #ifdef SFLOW_DO_SOCKET
13 static void initSocket(SFLReceiver *receiver);
16 /*_________________--------------------------__________________
17 _________________ sfl_receiver_init __________________
18 -----------------__________________________------------------
21 void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
23 /* first clear everything */
24 memset(receiver, 0, sizeof(*receiver));
26 /* now copy in the parameters */
27 receiver->agent = agent;
30 receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
31 receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
33 #ifdef SFLOW_DO_SOCKET
34 /* initialize the socket address */
38 /* preset some of the header fields */
39 receiver->sampleCollector.datap = receiver->sampleCollector.data;
40 putNet32(receiver, SFLDATAGRAM_VERSION5);
41 putAddress(receiver, &agent->myIP);
42 putNet32(receiver, agent->subId);
44 /* prepare to receive the first sample */
45 resetSampleCollector(receiver);
48 /*_________________---------------------------__________________
49 _________________ reset __________________
50 -----------------___________________________------------------
52 called on timeout, or when owner string is cleared
55 static void reset(SFLReceiver *receiver) {
56 // ask agent to tell samplers and pollers to stop sending samples
57 sfl_agent_resetReceiver(receiver->agent, receiver);
59 sfl_receiver_init(receiver, receiver->agent);
62 #ifdef SFLOW_DO_SOCKET
63 /*_________________---------------------------__________________
64 _________________ initSocket __________________
65 -----------------___________________________------------------
68 static void initSocket(SFLReceiver *receiver) {
69 if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
70 struct sockaddr_in6 *sa6 = &receiver->receiver6;
71 sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort);
72 sa6->sin6_family = AF_INET6;
73 sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
76 struct sockaddr_in *sa4 = &receiver->receiver4;
77 sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort);
78 sa4->sin_family = AF_INET;
79 sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
84 /*_________________----------------------------------------_____________
85 _________________ MIB Vars _____________
86 -----------------________________________________________-------------
89 char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
90 return receiver->sFlowRcvrOwner;
92 void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
93 receiver->sFlowRcvrOwner = sFlowRcvrOwner;
94 if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
95 // reset condition! owner string was cleared
99 time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
100 return receiver->sFlowRcvrTimeout;
102 void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
103 receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
105 u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
106 return receiver->sFlowRcvrMaximumDatagramSize;
108 void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) {
109 u_int32_t mdz = sFlowRcvrMaximumDatagramSize;
110 if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
111 receiver->sFlowRcvrMaximumDatagramSize = mdz;
113 SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
114 return &receiver->sFlowRcvrAddress;
116 void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
117 if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy
118 #ifdef SFLOW_DO_SOCKET
119 initSocket(receiver);
122 u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
123 return receiver->sFlowRcvrPort;
125 void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) {
126 receiver->sFlowRcvrPort = sFlowRcvrPort;
127 // update the socket structure
128 #ifdef SFLOW_DO_SOCKET
129 initSocket(receiver);
133 /*_________________---------------------------__________________
134 _________________ sfl_receiver_tick __________________
135 -----------------___________________________------------------
138 void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
140 // if there are any samples to send, flush them now
141 if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
143 if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
144 // count down one tick and reset if we reach 0
145 if(--receiver->sFlowRcvrTimeout == 0) reset(receiver);
149 /*_________________-----------------------------__________________
150 _________________ receiver write utilities __________________
151 -----------------_____________________________------------------
154 inline static void put32(SFLReceiver *receiver, u_int32_t val)
156 *receiver->sampleCollector.datap++ = val;
159 inline static void putNet32(SFLReceiver *receiver, u_int32_t val)
161 *receiver->sampleCollector.datap++ = htonl(val);
164 inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads)
166 u_int32_t *from = (u_int32_t *)obj;
167 while(quads--) putNet32(receiver, *from++);
170 inline static void putNet64(SFLReceiver *receiver, u_int64_t val64)
172 u_int32_t *firstQuadPtr = receiver->sampleCollector.datap;
173 // first copy the bytes in
174 memcpy((u_char *)firstQuadPtr, &val64, 8);
176 // swap the bytes, and reverse the quads too
177 u_int32_t tmp = *receiver->sampleCollector.datap++;
178 *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
179 *receiver->sampleCollector.datap++ = htonl(tmp);
181 else receiver->sampleCollector.datap += 2;
184 inline static void put128(SFLReceiver *receiver, u_char *val)
186 memcpy(receiver->sampleCollector.datap, val, 16);
187 receiver->sampleCollector.datap += 4;
190 inline static void putString(SFLReceiver *receiver, SFLString *s)
192 putNet32(receiver, s->len);
193 memcpy(receiver->sampleCollector.datap, s->str, s->len);
194 receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
197 inline static u_int32_t stringEncodingLength(SFLString *s) {
198 // answer in bytes, so remember to mulitply by 4 after rounding up to nearest 4-byte boundary
199 return 4 + (((s->len + 3) / 4) * 4);
202 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
204 // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error?
205 if(addr->type == 0) {
206 putNet32(receiver, SFLADDRESSTYPE_IP_V4);
210 putNet32(receiver, addr->type);
211 if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
212 else put128(receiver, addr->address.ip_v6.addr);
216 inline static u_int32_t addressEncodingLength(SFLAddress *addr) {
217 return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8; // type + address (unspecified == IPV4)
220 inline static void putMACAddress(SFLReceiver *receiver, u_int8_t *mac)
222 memcpy(receiver->sampleCollector.datap, mac, 6);
223 receiver->sampleCollector.datap += 2;
226 inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw)
228 putNet32(receiver, sw->src_vlan);
229 putNet32(receiver, sw->src_priority);
230 putNet32(receiver, sw->dst_vlan);
231 putNet32(receiver, sw->dst_priority);
234 inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router)
236 putAddress(receiver, &router->nexthop);
237 putNet32(receiver, router->src_mask);
238 putNet32(receiver, router->dst_mask);
241 inline static u_int32_t routerEncodingLength(SFLExtended_router *router) {
242 return addressEncodingLength(&router->nexthop) + 8;
245 inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw)
247 putAddress(receiver, &gw->nexthop);
248 putNet32(receiver, gw->as);
249 putNet32(receiver, gw->src_as);
250 putNet32(receiver, gw->src_peer_as);
251 putNet32(receiver, gw->dst_as_path_segments);
254 for(; seg < gw->dst_as_path_segments; seg++) {
255 putNet32(receiver, gw->dst_as_path[seg].type);
256 putNet32(receiver, gw->dst_as_path[seg].length);
257 putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length);
260 putNet32(receiver, gw->communities_length);
261 putNet32_run(receiver, gw->communities, gw->communities_length);
262 putNet32(receiver, gw->localpref);
265 inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) {
266 u_int32_t elemSiz = addressEncodingLength(&gw->nexthop);
268 elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments
269 for(; seg < gw->dst_as_path_segments; seg++) {
270 elemSiz += 8; // type, length
271 elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes
273 elemSiz += 4; // communities_length
274 elemSiz += 4 * gw->communities_length; // communities
275 elemSiz += 4; // localpref
279 inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user)
281 putNet32(receiver, user->src_charset);
282 putString(receiver, &user->src_user);
283 putNet32(receiver, user->dst_charset);
284 putString(receiver, &user->dst_user);
287 inline static u_int32_t userEncodingLength(SFLExtended_user *user) {
289 + stringEncodingLength(&user->src_user)
291 + stringEncodingLength(&user->dst_user);
294 inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url)
296 putNet32(receiver, url->direction);
297 putString(receiver, &url->url);
298 putString(receiver, &url->host);
301 inline static u_int32_t urlEncodingLength(SFLExtended_url *url) {
303 + stringEncodingLength(&url->url)
304 + stringEncodingLength(&url->host);
307 inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack)
309 putNet32(receiver, labelStack->depth);
310 putNet32_run(receiver, labelStack->stack, labelStack->depth);
313 inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) {
314 return 4 + (4 * labelStack->depth);
317 inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls)
319 putAddress(receiver, &mpls->nextHop);
320 putLabelStack(receiver, &mpls->in_stack);
321 putLabelStack(receiver, &mpls->out_stack);
324 inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) {
325 return addressEncodingLength(&mpls->nextHop)
326 + labelStackEncodingLength(&mpls->in_stack)
327 + labelStackEncodingLength(&mpls->out_stack);
330 inline static void putNat(SFLReceiver *receiver, SFLExtended_nat *nat)
332 putAddress(receiver, &nat->src);
333 putAddress(receiver, &nat->dst);
336 inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) {
337 return addressEncodingLength(&nat->src)
338 + addressEncodingLength(&nat->dst);
341 inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel)
343 putString(receiver, &tunnel->tunnel_lsp_name);
344 putNet32(receiver, tunnel->tunnel_id);
345 putNet32(receiver, tunnel->tunnel_cos);
348 inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) {
349 return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8;
352 inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc)
354 putString(receiver, &vc->vc_instance_name);
355 putNet32(receiver, vc->vll_vc_id);
356 putNet32(receiver, vc->vc_label_cos);
359 inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) {
360 return stringEncodingLength( &vc->vc_instance_name) + 8;
363 inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn)
365 putString(receiver, &ftn->mplsFTNDescr);
366 putNet32(receiver, ftn->mplsFTNMask);
369 inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) {
370 return stringEncodingLength( &ftn->mplsFTNDescr) + 4;
373 inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec)
375 putNet32(receiver, ldpfec->mplsFecAddrPrefixLength);
378 inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) {
382 inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel)
384 putLabelStack(receiver, &vlanTunnel->stack);
387 inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) {
388 return labelStackEncodingLength(&vlanTunnel->stack);
392 inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters)
394 putNet32(receiver, counters->ifIndex);
395 putNet32(receiver, counters->ifType);
396 putNet64(receiver, counters->ifSpeed);
397 putNet32(receiver, counters->ifDirection);
398 putNet32(receiver, counters->ifStatus);
399 putNet64(receiver, counters->ifInOctets);
400 putNet32(receiver, counters->ifInUcastPkts);
401 putNet32(receiver, counters->ifInMulticastPkts);
402 putNet32(receiver, counters->ifInBroadcastPkts);
403 putNet32(receiver, counters->ifInDiscards);
404 putNet32(receiver, counters->ifInErrors);
405 putNet32(receiver, counters->ifInUnknownProtos);
406 putNet64(receiver, counters->ifOutOctets);
407 putNet32(receiver, counters->ifOutUcastPkts);
408 putNet32(receiver, counters->ifOutMulticastPkts);
409 putNet32(receiver, counters->ifOutBroadcastPkts);
410 putNet32(receiver, counters->ifOutDiscards);
411 putNet32(receiver, counters->ifOutErrors);
412 putNet32(receiver, counters->ifPromiscuousMode);
416 /*_________________-----------------------------__________________
417 _________________ computeFlowSampleSize __________________
418 -----------------_____________________________------------------
421 static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
423 SFLFlow_sample_element *elem = fs->elements;
424 #ifdef SFL_USE_32BIT_INDEX
425 u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
426 sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
428 u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
429 sample_pool, drops, input, output, number of elements */
432 fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
433 for(; elem != NULL; elem = elem->nxt) {
436 siz += 8; /* tag, length */
439 elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */
440 elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */
442 case SFLFLOW_ETHERNET: elemSiz = sizeof(SFLSampled_ethernet); break;
443 case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break;
444 case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break;
445 case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break;
446 case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break;
447 case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break;
448 case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break;
449 case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break;
450 case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break;
451 case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break;
452 case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break;
453 case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break;
454 case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break;
455 case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break;
456 case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break;
458 sflError(receiver, "unexpected packet_data_tag");
462 // cache the element size, and accumulate it into the overall FlowSample size
463 elem->length = elemSiz;
470 /*_________________-------------------------------__________________
471 _________________ sfl_receiver_writeFlowSample __________________
472 -----------------_______________________________------------------
475 int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
478 if(fs == NULL) return -1;
479 if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
481 // check in case this one sample alone is too big for the datagram
482 // in fact - if it is even half as big then we should ditch it. Very
483 // important to avoid overruning the packet buffer.
484 if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
485 sflError(receiver, "flow sample too big for datagram");
489 // if the sample pkt is full enough so that this sample might put
490 // it over the limit, then we should send it now before going on.
491 if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
492 sendSample(receiver);
494 receiver->sampleCollector.numSamples++;
496 #ifdef SFL_USE_32BIT_INDEX
497 putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
499 putNet32(receiver, SFLFLOW_SAMPLE);
502 putNet32(receiver, packedSize - 8); // don't include tag and len
503 putNet32(receiver, fs->sequence_number);
505 #ifdef SFL_USE_32BIT_INDEX
506 putNet32(receiver, fs->ds_class);
507 putNet32(receiver, fs->ds_index);
509 putNet32(receiver, fs->source_id);
512 putNet32(receiver, fs->sampling_rate);
513 putNet32(receiver, fs->sample_pool);
514 putNet32(receiver, fs->drops);
516 #ifdef SFL_USE_32BIT_INDEX
517 putNet32(receiver, fs->inputFormat);
518 putNet32(receiver, fs->input);
519 putNet32(receiver, fs->outputFormat);
520 putNet32(receiver, fs->output);
522 putNet32(receiver, fs->input);
523 putNet32(receiver, fs->output);
526 putNet32(receiver, fs->num_elements);
529 SFLFlow_sample_element *elem = fs->elements;
530 for(; elem != NULL; elem = elem->nxt) {
532 putNet32(receiver, elem->tag);
533 putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()
537 putNet32(receiver, elem->flowType.header.header_protocol);
538 putNet32(receiver, elem->flowType.header.frame_length);
539 putNet32(receiver, elem->flowType.header.stripped);
540 putNet32(receiver, elem->flowType.header.header_length);
542 memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length);
543 /* round up to multiple of 4 to preserve alignment */
544 receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4);
546 case SFLFLOW_ETHERNET:
547 putNet32(receiver, elem->flowType.ethernet.eth_len);
548 putMACAddress(receiver, elem->flowType.ethernet.src_mac);
549 putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
550 putNet32(receiver, elem->flowType.ethernet.eth_type);
553 putNet32(receiver, elem->flowType.ipv4.length);
554 putNet32(receiver, elem->flowType.ipv4.protocol);
555 put32(receiver, elem->flowType.ipv4.src_ip.addr);
556 put32(receiver, elem->flowType.ipv4.dst_ip.addr);
557 putNet32(receiver, elem->flowType.ipv4.src_port);
558 putNet32(receiver, elem->flowType.ipv4.dst_port);
559 putNet32(receiver, elem->flowType.ipv4.tcp_flags);
560 putNet32(receiver, elem->flowType.ipv4.tos);
563 putNet32(receiver, elem->flowType.ipv6.length);
564 putNet32(receiver, elem->flowType.ipv6.protocol);
565 put128(receiver, elem->flowType.ipv6.src_ip.addr);
566 put128(receiver, elem->flowType.ipv6.dst_ip.addr);
567 putNet32(receiver, elem->flowType.ipv6.src_port);
568 putNet32(receiver, elem->flowType.ipv6.dst_port);
569 putNet32(receiver, elem->flowType.ipv6.tcp_flags);
570 putNet32(receiver, elem->flowType.ipv6.priority);
572 case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
573 case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
574 case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
575 case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
576 case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
577 case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
578 case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
579 case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
580 case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
581 case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
582 case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
583 case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
585 sflError(receiver, "unexpected packet_data_tag");
593 assert(((u_char *)receiver->sampleCollector.datap
594 - (u_char *)receiver->sampleCollector.data
595 - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
598 receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
602 /*_________________-----------------------------__________________
603 _________________ computeCountersSampleSize __________________
604 -----------------_____________________________------------------
607 static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
609 SFLCounters_sample_element *elem = cs->elements;
610 #ifdef SFL_USE_32BIT_INDEX
611 u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
613 u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */
616 cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
617 for(; elem != NULL; elem = elem->nxt) {
620 siz += 8; /* tag, length */
622 case SFLCOUNTERS_GENERIC: elemSiz = sizeof(elem->counterBlock.generic); break;
623 case SFLCOUNTERS_ETHERNET: elemSiz = sizeof(elem->counterBlock.ethernet); break;
624 case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break;
625 case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break;
626 case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break;
628 sflError(receiver, "unexpected counters_tag");
632 // cache the element size, and accumulate it into the overall FlowSample size
633 elem->length = elemSiz;
639 /*_________________----------------------------------__________________
640 _________________ sfl_receiver_writeCountersSample __________________
641 -----------------__________________________________------------------
644 int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
647 if(cs == NULL) return -1;
648 // if the sample pkt is full enough so that this sample might put
649 // it over the limit, then we should send it now.
650 if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
652 // check in case this one sample alone is too big for the datagram
653 // in fact - if it is even half as big then we should ditch it. Very
654 // important to avoid overruning the packet buffer.
655 if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
656 sflError(receiver, "counters sample too big for datagram");
660 if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
661 sendSample(receiver);
663 receiver->sampleCollector.numSamples++;
665 #ifdef SFL_USE_32BIT_INDEX
666 putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
668 putNet32(receiver, SFLCOUNTERS_SAMPLE);
671 putNet32(receiver, packedSize - 8); // tag and length not included
672 putNet32(receiver, cs->sequence_number);
674 #ifdef SFL_USE_32BIT_INDEX
675 putNet32(receiver, cs->ds_class);
676 putNet32(receiver, cs->ds_index);
678 putNet32(receiver, cs->source_id);
681 putNet32(receiver, cs->num_elements);
684 SFLCounters_sample_element *elem = cs->elements;
685 for(; elem != NULL; elem = elem->nxt) {
687 putNet32(receiver, elem->tag);
688 putNet32(receiver, elem->length); // length cached in computeCountersSampleSize()
691 case SFLCOUNTERS_GENERIC:
692 putGenericCounters(receiver, &(elem->counterBlock.generic));
694 case SFLCOUNTERS_ETHERNET:
695 // all these counters are 32-bit
696 putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4);
698 case SFLCOUNTERS_TOKENRING:
699 // all these counters are 32-bit
700 putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4);
704 putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames);
705 putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets);
706 putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames);
707 putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets);
708 putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors);
709 putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors);
710 putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors);
711 putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames);
712 putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames);
713 putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets);
714 putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings);
715 putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets);
716 putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets);
717 putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets);
719 case SFLCOUNTERS_VLAN:
721 putNet32(receiver, elem->counterBlock.vlan.vlan_id);
722 putNet64(receiver, elem->counterBlock.vlan.octets);
723 putNet32(receiver, elem->counterBlock.vlan.ucastPkts);
724 putNet32(receiver, elem->counterBlock.vlan.multicastPkts);
725 putNet32(receiver, elem->counterBlock.vlan.broadcastPkts);
726 putNet32(receiver, elem->counterBlock.vlan.discards);
729 sflError(receiver, "unexpected counters_tag");
736 assert(((u_char *)receiver->sampleCollector.datap
737 - (u_char *)receiver->sampleCollector.data
738 - receiver->sampleCollector.pktlen) == (u_int32_t)packedSize);
741 receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
745 /*_________________---------------------------------__________________
746 _________________ sfl_receiver_samplePacketsSent __________________
747 -----------------_________________________________------------------
750 u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
752 return receiver->sampleCollector.packetSeqNo;
755 /*_________________---------------------------__________________
756 _________________ sendSample __________________
757 -----------------___________________________------------------
760 static void sendSample(SFLReceiver *receiver)
762 /* construct and send out the sample, then reset for the next one... */
763 /* first fill in the header with the latest values */
764 /* version, agent_address and sub_agent_id were pre-set. */
765 u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4;
766 receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */
767 receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
768 receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */
770 if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic,
773 (u_char *)receiver->sampleCollector.data,
774 receiver->sampleCollector.pktlen);
776 #ifdef SFLOW_DO_SOCKET
778 if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
779 u_int32_t soclen = sizeof(struct sockaddr_in6);
780 int result = sendto(receiver->agent->receiverSocket6,
781 receiver->sampleCollector.data,
782 receiver->sampleCollector.pktlen,
784 (struct sockaddr *)&receiver->receiver6,
786 if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
787 if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
790 u_int32_t soclen = sizeof(struct sockaddr_in);
791 int result = sendto(receiver->agent->receiverSocket4,
792 receiver->sampleCollector.data,
793 receiver->sampleCollector.pktlen,
795 (struct sockaddr *)&receiver->receiver4,
797 if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
798 if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
803 /* reset for the next time */
804 resetSampleCollector(receiver);
807 /*_________________---------------------------__________________
808 _________________ resetSampleCollector __________________
809 -----------------___________________________------------------
812 static void resetSampleCollector(SFLReceiver *receiver)
814 receiver->sampleCollector.pktlen = 0;
815 receiver->sampleCollector.numSamples = 0;
816 /* point the datap to just after the header */
817 receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
818 (receiver->sampleCollector.data + 10) : (receiver->sampleCollector.data + 7);
820 receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
823 /*_________________---------------------------__________________
824 _________________ sflError __________________
825 -----------------___________________________------------------
828 static void sflError(SFLReceiver *receiver, char *msg)
830 sfl_agent_error(receiver->agent, "receiver", msg);
831 resetSampleCollector(receiver);