Prepare Open vSwitch 1.1.2 release.
[sliver-openvswitch.git] / lib / sflow_receiver.c
1 /* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
2 /* http://www.inmon.com/technology/sflowlicense.txt */
3
4 #include <assert.h>
5 #include "sflow_api.h"
6
7 static void resetSampleCollector(SFLReceiver *receiver);
8 static void sendSample(SFLReceiver *receiver);
9 static void sflError(SFLReceiver *receiver, char *errm);
10 inline static void putNet32(SFLReceiver *receiver, u_int32_t val);
11 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
12 #ifdef SFLOW_DO_SOCKET
13 static void initSocket(SFLReceiver *receiver);
14 #endif
15
16 /*_________________--------------------------__________________
17   _________________    sfl_receiver_init     __________________
18   -----------------__________________________------------------
19 */
20
21 void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
22 {
23     /* first clear everything */
24     memset(receiver, 0, sizeof(*receiver));
25
26     /* now copy in the parameters */
27     receiver->agent = agent;
28
29     /* set defaults */
30     receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
31     receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
32
33 #ifdef SFLOW_DO_SOCKET
34     /* initialize the socket address */
35     initSocket(receiver);
36 #endif
37
38     /* preset some of the header fields */
39     receiver->sampleCollector.datap = receiver->sampleCollector.data;
40     putNet32(receiver, SFLDATAGRAM_VERSION5);
41     putAddress(receiver, &agent->myIP);
42     putNet32(receiver, agent->subId);
43
44     /* prepare to receive the first sample */
45     resetSampleCollector(receiver);
46 }
47
48 /*_________________---------------------------__________________
49   _________________      reset                __________________
50   -----------------___________________________------------------
51
52   called on timeout, or when owner string is cleared
53 */
54
55 static void reset(SFLReceiver *receiver) {
56     // ask agent to tell samplers and pollers to stop sending samples
57     sfl_agent_resetReceiver(receiver->agent, receiver);
58     // reinitialize
59     sfl_receiver_init(receiver, receiver->agent);
60 }
61
62 #ifdef SFLOW_DO_SOCKET
63 /*_________________---------------------------__________________
64   _________________      initSocket           __________________
65   -----------------___________________________------------------
66 */
67
68 static void initSocket(SFLReceiver *receiver) {
69     if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
70         struct sockaddr_in6 *sa6 = &receiver->receiver6;
71         sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort);
72         sa6->sin6_family = AF_INET6;
73         sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
74     }
75     else {
76         struct sockaddr_in *sa4 = &receiver->receiver4;
77         sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort);
78         sa4->sin_family = AF_INET;
79         sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
80     }
81 }
82 #endif
83
84 /*_________________----------------------------------------_____________
85   _________________          MIB Vars                      _____________
86   -----------------________________________________________-------------
87 */
88
89 char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
90     return receiver->sFlowRcvrOwner;
91 }
92 void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
93     receiver->sFlowRcvrOwner = sFlowRcvrOwner;
94     if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
95         // reset condition! owner string was cleared
96         reset(receiver);
97     }
98 }
99 time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
100     return receiver->sFlowRcvrTimeout;
101 }
102 void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
103     receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
104 }
105 u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
106     return receiver->sFlowRcvrMaximumDatagramSize;
107 }
108 void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) {
109     u_int32_t mdz = sFlowRcvrMaximumDatagramSize;
110     if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
111     receiver->sFlowRcvrMaximumDatagramSize = mdz;
112 }
113 SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
114     return &receiver->sFlowRcvrAddress;
115 }
116 void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
117     if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy
118 #ifdef SFLOW_DO_SOCKET
119     initSocket(receiver);
120 #endif
121 }
122 u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
123     return receiver->sFlowRcvrPort;
124 }
125 void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) {
126     receiver->sFlowRcvrPort = sFlowRcvrPort;
127     // update the socket structure
128 #ifdef SFLOW_DO_SOCKET
129     initSocket(receiver);
130 #endif
131 }
132
133 /*_________________---------------------------__________________
134   _________________   sfl_receiver_tick       __________________
135   -----------------___________________________------------------
136 */
137
138 void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
139 {
140     // if there are any samples to send, flush them now
141     if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
142     // check the timeout
143     if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
144         // count down one tick and reset if we reach 0
145         if(--receiver->sFlowRcvrTimeout == 0) reset(receiver);
146     }
147 }
148
149 /*_________________-----------------------------__________________
150   _________________   receiver write utilities  __________________
151   -----------------_____________________________------------------
152 */
153
154 inline static void put32(SFLReceiver *receiver, u_int32_t val)
155 {
156     *receiver->sampleCollector.datap++ = val;
157 }
158
159 inline static void putNet32(SFLReceiver *receiver, u_int32_t val)
160 {
161     *receiver->sampleCollector.datap++ = htonl(val);
162 }
163
164 inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads)
165 {
166     u_int32_t *from = (u_int32_t *)obj;
167     while(quads--) putNet32(receiver, *from++);
168 }
169
170 inline static void putNet64(SFLReceiver *receiver, u_int64_t val64)
171 {
172     u_int32_t *firstQuadPtr = receiver->sampleCollector.datap;
173     // first copy the bytes in
174     memcpy((u_char *)firstQuadPtr, &val64, 8);
175     if(htonl(1) != 1) {
176         // swap the bytes, and reverse the quads too
177         u_int32_t tmp = *receiver->sampleCollector.datap++;
178         *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
179         *receiver->sampleCollector.datap++ = htonl(tmp);
180     }
181     else receiver->sampleCollector.datap += 2;
182 }
183
184 inline static void put128(SFLReceiver *receiver, u_char *val)
185 {
186     memcpy(receiver->sampleCollector.datap, val, 16);
187     receiver->sampleCollector.datap += 4;
188 }
189
190 inline static void putString(SFLReceiver *receiver, SFLString *s)
191 {
192     putNet32(receiver, s->len);
193     memcpy(receiver->sampleCollector.datap, s->str, s->len);
194     receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
195 }
196
197 inline static u_int32_t stringEncodingLength(SFLString *s) {
198     // answer in bytes,  so remember to mulitply by 4 after rounding up to nearest 4-byte boundary
199     return 4 + (((s->len + 3) / 4) * 4);
200 }
201
202 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
203 {
204     // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error?
205     if(addr->type == 0) {
206         putNet32(receiver, SFLADDRESSTYPE_IP_V4);
207         put32(receiver, 0);
208     }
209     else {
210         putNet32(receiver, addr->type);
211         if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
212         else put128(receiver, addr->address.ip_v6.addr);
213     }
214 }
215
216 inline static u_int32_t addressEncodingLength(SFLAddress *addr) {
217     return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8;  // type + address (unspecified == IPV4)
218 }
219
220 inline static void putMACAddress(SFLReceiver *receiver, u_int8_t *mac)
221 {
222     memcpy(receiver->sampleCollector.datap, mac, 6);
223     receiver->sampleCollector.datap += 2;
224 }
225
226 inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw)
227 {
228     putNet32(receiver, sw->src_vlan);
229     putNet32(receiver, sw->src_priority);
230     putNet32(receiver, sw->dst_vlan);
231     putNet32(receiver, sw->dst_priority);
232 }
233
234 inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router)
235 {
236     putAddress(receiver, &router->nexthop);
237     putNet32(receiver, router->src_mask);
238     putNet32(receiver, router->dst_mask);
239 }
240
241 inline static u_int32_t routerEncodingLength(SFLExtended_router *router) {
242     return addressEncodingLength(&router->nexthop) + 8;
243 }
244
245 inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw)
246 {
247     putAddress(receiver, &gw->nexthop);
248     putNet32(receiver, gw->as);
249     putNet32(receiver, gw->src_as);
250     putNet32(receiver, gw->src_peer_as);
251     putNet32(receiver, gw->dst_as_path_segments);
252     {
253         u_int32_t seg = 0;
254         for(; seg < gw->dst_as_path_segments; seg++) {
255             putNet32(receiver, gw->dst_as_path[seg].type);
256             putNet32(receiver, gw->dst_as_path[seg].length);
257             putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length);
258         }
259     }
260     putNet32(receiver, gw->communities_length);
261     putNet32_run(receiver, gw->communities, gw->communities_length);
262     putNet32(receiver, gw->localpref);
263 }
264
265 inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) {
266     u_int32_t elemSiz = addressEncodingLength(&gw->nexthop);
267     u_int32_t seg = 0;
268     elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments
269     for(; seg < gw->dst_as_path_segments; seg++) {
270         elemSiz += 8; // type, length
271         elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes
272     }
273     elemSiz += 4; // communities_length
274     elemSiz += 4 * gw->communities_length; // communities
275     elemSiz += 4; // localpref
276     return elemSiz;
277 }
278
279 inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user)
280 {
281     putNet32(receiver, user->src_charset);
282     putString(receiver, &user->src_user);
283     putNet32(receiver, user->dst_charset);
284     putString(receiver, &user->dst_user);
285 }
286
287 inline static u_int32_t userEncodingLength(SFLExtended_user *user) {
288     return 4
289         + stringEncodingLength(&user->src_user)
290         + 4
291         + stringEncodingLength(&user->dst_user);
292 }
293
294 inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url)
295 {
296     putNet32(receiver, url->direction);
297     putString(receiver, &url->url);
298     putString(receiver, &url->host);
299 }
300
301 inline static u_int32_t urlEncodingLength(SFLExtended_url *url) {
302     return 4
303         + stringEncodingLength(&url->url)
304         + stringEncodingLength(&url->host);
305 }
306
307 inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack)
308 {
309     putNet32(receiver, labelStack->depth);
310     putNet32_run(receiver, labelStack->stack, labelStack->depth);
311 }
312
313 inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) {
314     return 4 + (4 * labelStack->depth);
315 }
316
317 inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls)
318 {
319     putAddress(receiver, &mpls->nextHop);
320     putLabelStack(receiver, &mpls->in_stack);
321     putLabelStack(receiver, &mpls->out_stack);
322 }
323
324 inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) {
325     return addressEncodingLength(&mpls->nextHop)
326         + labelStackEncodingLength(&mpls->in_stack)
327         + labelStackEncodingLength(&mpls->out_stack);
328 }
329
330 inline static void putNat(SFLReceiver *receiver, SFLExtended_nat *nat)
331 {
332     putAddress(receiver, &nat->src);
333     putAddress(receiver, &nat->dst);
334 }
335
336 inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) {
337     return addressEncodingLength(&nat->src)
338         + addressEncodingLength(&nat->dst);
339 }
340
341 inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel)
342 {
343     putString(receiver, &tunnel->tunnel_lsp_name);
344     putNet32(receiver, tunnel->tunnel_id);
345     putNet32(receiver, tunnel->tunnel_cos);
346 }
347
348 inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) {
349     return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8;
350 }
351
352 inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc)
353 {
354     putString(receiver, &vc->vc_instance_name);
355     putNet32(receiver, vc->vll_vc_id);
356     putNet32(receiver, vc->vc_label_cos);
357 }
358
359 inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) {
360     return stringEncodingLength( &vc->vc_instance_name) + 8;
361 }
362
363 inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn)
364 {
365     putString(receiver, &ftn->mplsFTNDescr);
366     putNet32(receiver, ftn->mplsFTNMask);
367 }
368
369 inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) {
370     return stringEncodingLength( &ftn->mplsFTNDescr) + 4;
371 }
372
373 inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec)
374 {
375     putNet32(receiver, ldpfec->mplsFecAddrPrefixLength);
376 }
377
378 inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) {
379     return 4;
380 }
381
382 inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel)
383 {
384     putLabelStack(receiver, &vlanTunnel->stack);
385 }
386
387 inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) {
388     return labelStackEncodingLength(&vlanTunnel->stack);
389 }
390
391
392 inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters)
393 {
394     putNet32(receiver, counters->ifIndex);
395     putNet32(receiver, counters->ifType);
396     putNet64(receiver, counters->ifSpeed);
397     putNet32(receiver, counters->ifDirection);
398     putNet32(receiver, counters->ifStatus);
399     putNet64(receiver, counters->ifInOctets);
400     putNet32(receiver, counters->ifInUcastPkts);
401     putNet32(receiver, counters->ifInMulticastPkts);
402     putNet32(receiver, counters->ifInBroadcastPkts);
403     putNet32(receiver, counters->ifInDiscards);
404     putNet32(receiver, counters->ifInErrors);
405     putNet32(receiver, counters->ifInUnknownProtos);
406     putNet64(receiver, counters->ifOutOctets);
407     putNet32(receiver, counters->ifOutUcastPkts);
408     putNet32(receiver, counters->ifOutMulticastPkts);
409     putNet32(receiver, counters->ifOutBroadcastPkts);
410     putNet32(receiver, counters->ifOutDiscards);
411     putNet32(receiver, counters->ifOutErrors);
412     putNet32(receiver, counters->ifPromiscuousMode);
413 }
414
415
416 /*_________________-----------------------------__________________
417   _________________      computeFlowSampleSize  __________________
418   -----------------_____________________________------------------
419 */
420
421 static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
422 {
423     SFLFlow_sample_element *elem = fs->elements;
424 #ifdef SFL_USE_32BIT_INDEX
425     u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
426                        sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
427 #else
428     u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
429                        sample_pool, drops, input, output, number of elements */
430 #endif
431
432     fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
433     for(; elem != NULL; elem = elem->nxt) {
434         u_int elemSiz = 0;
435         fs->num_elements++;
436         siz += 8; /* tag, length */
437         switch(elem->tag) {
438         case SFLFLOW_HEADER:
439             elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */
440             elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */
441             break;
442         case SFLFLOW_ETHERNET: elemSiz = sizeof(SFLSampled_ethernet); break;
443         case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break;
444         case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break;
445         case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break;
446         case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break;
447         case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break;
448         case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break;
449         case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break;
450         case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break;
451         case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break;
452         case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break;
453         case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break;
454         case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break;
455         case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break;
456         case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break;
457         default:
458             sflError(receiver, "unexpected packet_data_tag");
459             return -1;
460             break;
461         }
462         // cache the element size, and accumulate it into the overall FlowSample size
463         elem->length = elemSiz;
464         siz += elemSiz;
465     }
466
467     return siz;
468 }
469
470 /*_________________-------------------------------__________________
471   _________________ sfl_receiver_writeFlowSample  __________________
472   -----------------_______________________________------------------
473 */
474
475 int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
476 {
477     int packedSize;
478     if(fs == NULL) return -1;
479     if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
480
481     // check in case this one sample alone is too big for the datagram
482     // in fact - if it is even half as big then we should ditch it. Very
483     // important to avoid overruning the packet buffer.
484     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
485         sflError(receiver, "flow sample too big for datagram");
486         return -1;
487     }
488
489     // if the sample pkt is full enough so that this sample might put
490     // it over the limit, then we should send it now before going on.
491     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
492         sendSample(receiver);
493
494     receiver->sampleCollector.numSamples++;
495
496 #ifdef SFL_USE_32BIT_INDEX
497     putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
498 #else
499     putNet32(receiver, SFLFLOW_SAMPLE);
500 #endif
501
502     putNet32(receiver, packedSize - 8); // don't include tag and len
503     putNet32(receiver, fs->sequence_number);
504
505 #ifdef SFL_USE_32BIT_INDEX
506     putNet32(receiver, fs->ds_class);
507     putNet32(receiver, fs->ds_index);
508 #else
509     putNet32(receiver, fs->source_id);
510 #endif
511
512     putNet32(receiver, fs->sampling_rate);
513     putNet32(receiver, fs->sample_pool);
514     putNet32(receiver, fs->drops);
515
516 #ifdef SFL_USE_32BIT_INDEX
517     putNet32(receiver, fs->inputFormat);
518     putNet32(receiver, fs->input);
519     putNet32(receiver, fs->outputFormat);
520     putNet32(receiver, fs->output);
521 #else
522     putNet32(receiver, fs->input);
523     putNet32(receiver, fs->output);
524 #endif
525
526     putNet32(receiver, fs->num_elements);
527
528     {
529         SFLFlow_sample_element *elem = fs->elements;
530         for(; elem != NULL; elem = elem->nxt) {
531
532             putNet32(receiver, elem->tag);
533             putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()
534
535             switch(elem->tag) {
536             case SFLFLOW_HEADER:
537                 putNet32(receiver, elem->flowType.header.header_protocol);
538                 putNet32(receiver, elem->flowType.header.frame_length);
539                 putNet32(receiver, elem->flowType.header.stripped);
540                 putNet32(receiver, elem->flowType.header.header_length);
541                 /* the header */
542                 memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length);
543                 /* round up to multiple of 4 to preserve alignment */
544                 receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4);
545                 break;
546             case SFLFLOW_ETHERNET:
547                 putNet32(receiver, elem->flowType.ethernet.eth_len);
548                 putMACAddress(receiver, elem->flowType.ethernet.src_mac);
549                 putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
550                 putNet32(receiver, elem->flowType.ethernet.eth_type);
551                 break;
552             case SFLFLOW_IPV4:
553                 putNet32(receiver, elem->flowType.ipv4.length);
554                 putNet32(receiver, elem->flowType.ipv4.protocol);
555                 put32(receiver, elem->flowType.ipv4.src_ip.addr);
556                 put32(receiver, elem->flowType.ipv4.dst_ip.addr);
557                 putNet32(receiver, elem->flowType.ipv4.src_port);
558                 putNet32(receiver, elem->flowType.ipv4.dst_port);
559                 putNet32(receiver, elem->flowType.ipv4.tcp_flags);
560                 putNet32(receiver, elem->flowType.ipv4.tos);
561                 break;
562             case SFLFLOW_IPV6:
563                 putNet32(receiver, elem->flowType.ipv6.length);
564                 putNet32(receiver, elem->flowType.ipv6.protocol);
565                 put128(receiver, elem->flowType.ipv6.src_ip.addr);
566                 put128(receiver, elem->flowType.ipv6.dst_ip.addr);
567                 putNet32(receiver, elem->flowType.ipv6.src_port);
568                 putNet32(receiver, elem->flowType.ipv6.dst_port);
569                 putNet32(receiver, elem->flowType.ipv6.tcp_flags);
570                 putNet32(receiver, elem->flowType.ipv6.priority);
571                 break;
572             case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
573             case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
574             case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
575             case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
576             case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
577             case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
578             case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
579             case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
580             case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
581             case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
582             case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
583             case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
584             default:
585                 sflError(receiver, "unexpected packet_data_tag");
586                 return -1;
587                 break;
588             }
589         }
590     }
591
592     // sanity check
593     assert(((u_char *)receiver->sampleCollector.datap
594             - (u_char *)receiver->sampleCollector.data
595             - receiver->sampleCollector.pktlen)  == (u_int32_t)packedSize);
596
597     // update the pktlen
598     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
599     return packedSize;
600 }
601
602 /*_________________-----------------------------__________________
603   _________________ computeCountersSampleSize   __________________
604   -----------------_____________________________------------------
605 */
606
607 static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
608 {
609     SFLCounters_sample_element *elem = cs->elements;
610 #ifdef SFL_USE_32BIT_INDEX
611     u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
612 #else
613     u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */
614 #endif
615
616     cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
617     for(; elem != NULL; elem = elem->nxt) {
618         u_int elemSiz = 0;
619         cs->num_elements++;
620         siz += 8; /* tag, length */
621         switch(elem->tag) {
622         case SFLCOUNTERS_GENERIC:  elemSiz = sizeof(elem->counterBlock.generic); break;
623         case SFLCOUNTERS_ETHERNET: elemSiz = sizeof(elem->counterBlock.ethernet); break;
624         case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break;
625         case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break;
626         case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break;
627         default:
628             sflError(receiver, "unexpected counters_tag");
629             return -1;
630             break;
631         }
632         // cache the element size, and accumulate it into the overall FlowSample size
633         elem->length = elemSiz;
634         siz += elemSiz;
635     }
636     return siz;
637 }
638
639 /*_________________----------------------------------__________________
640   _________________ sfl_receiver_writeCountersSample __________________
641   -----------------__________________________________------------------
642 */
643
644 int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
645 {
646     int packedSize;
647     if(cs == NULL) return -1;
648     // if the sample pkt is full enough so that this sample might put
649     // it over the limit, then we should send it now.
650     if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
651
652     // check in case this one sample alone is too big for the datagram
653     // in fact - if it is even half as big then we should ditch it. Very
654     // important to avoid overruning the packet buffer.
655     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
656         sflError(receiver, "counters sample too big for datagram");
657         return -1;
658     }
659
660     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
661         sendSample(receiver);
662
663     receiver->sampleCollector.numSamples++;
664
665 #ifdef SFL_USE_32BIT_INDEX
666     putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
667 #else
668     putNet32(receiver, SFLCOUNTERS_SAMPLE);
669 #endif
670
671     putNet32(receiver, packedSize - 8); // tag and length not included
672     putNet32(receiver, cs->sequence_number);
673
674 #ifdef SFL_USE_32BIT_INDEX
675     putNet32(receiver, cs->ds_class);
676     putNet32(receiver, cs->ds_index);
677 #else
678     putNet32(receiver, cs->source_id);
679 #endif
680
681     putNet32(receiver, cs->num_elements);
682
683     {
684         SFLCounters_sample_element *elem = cs->elements;
685         for(; elem != NULL; elem = elem->nxt) {
686
687             putNet32(receiver, elem->tag);
688             putNet32(receiver, elem->length); // length cached in computeCountersSampleSize()
689
690             switch(elem->tag) {
691             case SFLCOUNTERS_GENERIC:
692                 putGenericCounters(receiver, &(elem->counterBlock.generic));
693                 break;
694             case SFLCOUNTERS_ETHERNET:
695                 // all these counters are 32-bit
696                 putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4);
697                 break;
698             case SFLCOUNTERS_TOKENRING:
699                 // all these counters are 32-bit
700                 putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4);
701                 break;
702             case SFLCOUNTERS_VG:
703                 // mixed sizes
704                 putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames);
705                 putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets);
706                 putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames);
707                 putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets);
708                 putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors);
709                 putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors);
710                 putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors);
711                 putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames);
712                 putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames);
713                 putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets);
714                 putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings);
715                 putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets);
716                 putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets);
717                 putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets);
718                 break;
719             case SFLCOUNTERS_VLAN:
720                 // mixed sizes
721                 putNet32(receiver, elem->counterBlock.vlan.vlan_id);
722                 putNet64(receiver, elem->counterBlock.vlan.octets);
723                 putNet32(receiver, elem->counterBlock.vlan.ucastPkts);
724                 putNet32(receiver, elem->counterBlock.vlan.multicastPkts);
725                 putNet32(receiver, elem->counterBlock.vlan.broadcastPkts);
726                 putNet32(receiver, elem->counterBlock.vlan.discards);
727                 break;
728             default:
729                 sflError(receiver, "unexpected counters_tag");
730                 return -1;
731                 break;
732             }
733         }
734     }
735     // sanity check
736     assert(((u_char *)receiver->sampleCollector.datap
737             - (u_char *)receiver->sampleCollector.data
738             - receiver->sampleCollector.pktlen)  == (u_int32_t)packedSize);
739
740     // update the pktlen
741     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
742     return packedSize;
743 }
744
745 /*_________________---------------------------------__________________
746   _________________ sfl_receiver_samplePacketsSent  __________________
747   -----------------_________________________________------------------
748 */
749
750 u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
751 {
752     return receiver->sampleCollector.packetSeqNo;
753 }
754
755 /*_________________---------------------------__________________
756   _________________     sendSample            __________________
757   -----------------___________________________------------------
758 */
759
760 static void sendSample(SFLReceiver *receiver)
761 {
762     /* construct and send out the sample, then reset for the next one... */
763     /* first fill in the header with the latest values */
764     /* version, agent_address and sub_agent_id were pre-set. */
765     u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4;
766     receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */
767     receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
768     receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */
769     /* send */
770     if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic,
771                                                            receiver->agent,
772                                                            receiver,
773                                                            (u_char *)receiver->sampleCollector.data,
774                                                            receiver->sampleCollector.pktlen);
775     else {
776 #ifdef SFLOW_DO_SOCKET
777         /* send it myself */
778         if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
779             u_int32_t soclen = sizeof(struct sockaddr_in6);
780             int result = sendto(receiver->agent->receiverSocket6,
781                                 receiver->sampleCollector.data,
782                                 receiver->sampleCollector.pktlen,
783                                 0,
784                                 (struct sockaddr *)&receiver->receiver6,
785                                 soclen);
786             if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
787             if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
788         }
789         else {
790             u_int32_t soclen = sizeof(struct sockaddr_in);
791             int result = sendto(receiver->agent->receiverSocket4,
792                                 receiver->sampleCollector.data,
793                                 receiver->sampleCollector.pktlen,
794                                 0,
795                                 (struct sockaddr *)&receiver->receiver4,
796                                 soclen);
797             if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
798             if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
799         }
800 #endif
801     }
802
803     /* reset for the next time */
804     resetSampleCollector(receiver);
805 }
806
807 /*_________________---------------------------__________________
808   _________________   resetSampleCollector    __________________
809   -----------------___________________________------------------
810 */
811
812 static void resetSampleCollector(SFLReceiver *receiver)
813 {
814     receiver->sampleCollector.pktlen = 0;
815     receiver->sampleCollector.numSamples = 0;
816     /* point the datap to just after the header */
817     receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
818         (receiver->sampleCollector.data + 10) :  (receiver->sampleCollector.data + 7);
819
820     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
821 }
822
823 /*_________________---------------------------__________________
824   _________________         sflError          __________________
825   -----------------___________________________------------------
826 */
827
828 static void sflError(SFLReceiver *receiver, char *msg)
829 {
830     sfl_agent_error(receiver->agent, "receiver", msg);
831     resetSampleCollector(receiver);
832 }