Merge 'next' into 'master'.
[sliver-openvswitch.git] / lib / sflow_receiver.c
1 /* Copyright (c) 2002-2009 InMon Corp. Licensed under the terms of the InMon sFlow licence: */
2 /* http://www.inmon.com/technology/sflowlicense.txt */
3
4 #ifndef __CHECKER__            /* Don't run sparse on anything in this file. */
5
6 #include <assert.h>
7 #include "sflow_api.h"
8
9 static void resetSampleCollector(SFLReceiver *receiver);
10 static void sendSample(SFLReceiver *receiver);
11 static void sflError(SFLReceiver *receiver, char *errm);
12 inline static void putNet32(SFLReceiver *receiver, u_int32_t val);
13 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr);
14 #ifdef SFLOW_DO_SOCKET
15 static void initSocket(SFLReceiver *receiver);
16 #endif
17
18 /*_________________--------------------------__________________
19   _________________    sfl_receiver_init     __________________
20   -----------------__________________________------------------
21 */
22
23 void sfl_receiver_init(SFLReceiver *receiver, SFLAgent *agent)
24 {
25     /* first clear everything */
26     memset(receiver, 0, sizeof(*receiver));
27
28     /* now copy in the parameters */
29     receiver->agent = agent;
30
31     /* set defaults */
32     receiver->sFlowRcvrMaximumDatagramSize = SFL_DEFAULT_DATAGRAM_SIZE;
33     receiver->sFlowRcvrPort = SFL_DEFAULT_COLLECTOR_PORT;
34
35 #ifdef SFLOW_DO_SOCKET
36     /* initialize the socket address */
37     initSocket(receiver);
38 #endif
39
40     /* preset some of the header fields */
41     receiver->sampleCollector.datap = receiver->sampleCollector.data;
42     putNet32(receiver, SFLDATAGRAM_VERSION5);
43     putAddress(receiver, &agent->myIP);
44     putNet32(receiver, agent->subId);
45
46     /* prepare to receive the first sample */
47     resetSampleCollector(receiver);
48 }
49
50 /*_________________---------------------------__________________
51   _________________      reset                __________________
52   -----------------___________________________------------------
53
54   called on timeout, or when owner string is cleared
55 */
56
57 static void reset(SFLReceiver *receiver) {
58     // ask agent to tell samplers and pollers to stop sending samples
59     sfl_agent_resetReceiver(receiver->agent, receiver);
60     // reinitialize
61     sfl_receiver_init(receiver, receiver->agent);
62 }
63
64 #ifdef SFLOW_DO_SOCKET
65 /*_________________---------------------------__________________
66   _________________      initSocket           __________________
67   -----------------___________________________------------------
68 */
69
70 static void initSocket(SFLReceiver *receiver) {
71     if(receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
72         struct sockaddr_in6 *sa6 = &receiver->receiver6;
73         sa6->sin6_port = htons((u_int16_t)receiver->sFlowRcvrPort);
74         sa6->sin6_family = AF_INET6;
75         sa6->sin6_addr = receiver->sFlowRcvrAddress.address.ip_v6;
76     }
77     else {
78         struct sockaddr_in *sa4 = &receiver->receiver4;
79         sa4->sin_port = htons((u_int16_t)receiver->sFlowRcvrPort);
80         sa4->sin_family = AF_INET;
81         sa4->sin_addr = receiver->sFlowRcvrAddress.address.ip_v4;
82     }
83 }
84 #endif
85
86 /*_________________----------------------------------------_____________
87   _________________          MIB Vars                      _____________
88   -----------------________________________________________-------------
89 */
90
91 char * sfl_receiver_get_sFlowRcvrOwner(SFLReceiver *receiver) {
92     return receiver->sFlowRcvrOwner;
93 }
94 void sfl_receiver_set_sFlowRcvrOwner(SFLReceiver *receiver, char *sFlowRcvrOwner) {
95     receiver->sFlowRcvrOwner = sFlowRcvrOwner;
96     if(sFlowRcvrOwner == NULL || sFlowRcvrOwner[0] == '\0') {
97         // reset condition! owner string was cleared
98         reset(receiver);
99     }
100 }
101 time_t sfl_receiver_get_sFlowRcvrTimeout(SFLReceiver *receiver) {
102     return receiver->sFlowRcvrTimeout;
103 }
104 void sfl_receiver_set_sFlowRcvrTimeout(SFLReceiver *receiver, time_t sFlowRcvrTimeout) {
105     receiver->sFlowRcvrTimeout =sFlowRcvrTimeout;
106 }
107 u_int32_t sfl_receiver_get_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver) {
108     return receiver->sFlowRcvrMaximumDatagramSize;
109 }
110 void sfl_receiver_set_sFlowRcvrMaximumDatagramSize(SFLReceiver *receiver, u_int32_t sFlowRcvrMaximumDatagramSize) {
111     u_int32_t mdz = sFlowRcvrMaximumDatagramSize;
112     if(mdz < SFL_MIN_DATAGRAM_SIZE) mdz = SFL_MIN_DATAGRAM_SIZE;
113     receiver->sFlowRcvrMaximumDatagramSize = mdz;
114 }
115 SFLAddress *sfl_receiver_get_sFlowRcvrAddress(SFLReceiver *receiver) {
116     return &receiver->sFlowRcvrAddress;
117 }
118 void sfl_receiver_set_sFlowRcvrAddress(SFLReceiver *receiver, SFLAddress *sFlowRcvrAddress) {
119     if(sFlowRcvrAddress) receiver->sFlowRcvrAddress = *sFlowRcvrAddress; // structure copy
120 #ifdef SFLOW_DO_SOCKET
121     initSocket(receiver);
122 #endif
123 }
124 u_int32_t sfl_receiver_get_sFlowRcvrPort(SFLReceiver *receiver) {
125     return receiver->sFlowRcvrPort;
126 }
127 void sfl_receiver_set_sFlowRcvrPort(SFLReceiver *receiver, u_int32_t sFlowRcvrPort) {
128     receiver->sFlowRcvrPort = sFlowRcvrPort;
129     // update the socket structure
130 #ifdef SFLOW_DO_SOCKET
131     initSocket(receiver);
132 #endif
133 }
134
135 /*_________________---------------------------__________________
136   _________________   sfl_receiver_tick       __________________
137   -----------------___________________________------------------
138 */
139
140 void sfl_receiver_tick(SFLReceiver *receiver, time_t now)
141 {
142     // if there are any samples to send, flush them now
143     if(receiver->sampleCollector.numSamples > 0) sendSample(receiver);
144     // check the timeout
145     if(receiver->sFlowRcvrTimeout && (u_int32_t)receiver->sFlowRcvrTimeout != 0xFFFFFFFF) {
146         // count down one tick and reset if we reach 0
147         if(--receiver->sFlowRcvrTimeout == 0) reset(receiver);
148     }
149 }
150
151 /*_________________-----------------------------__________________
152   _________________   receiver write utilities  __________________
153   -----------------_____________________________------------------
154 */
155
156 inline static void put32(SFLReceiver *receiver, u_int32_t val)
157 {
158     *receiver->sampleCollector.datap++ = val;
159 }
160
161 inline static void putNet32(SFLReceiver *receiver, u_int32_t val)
162 {
163     *receiver->sampleCollector.datap++ = htonl(val);
164 }
165
166 inline static void putNet32_run(SFLReceiver *receiver, void *obj, size_t quads)
167 {
168     u_int32_t *from = (u_int32_t *)obj;
169     while(quads--) putNet32(receiver, *from++);
170 }
171
172 inline static void putNet64(SFLReceiver *receiver, u_int64_t val64)
173 {
174     u_int32_t *firstQuadPtr = receiver->sampleCollector.datap;
175     // first copy the bytes in
176     memcpy((u_char *)firstQuadPtr, &val64, 8);
177     if(htonl(1) != 1) {
178         // swap the bytes, and reverse the quads too
179         u_int32_t tmp = *receiver->sampleCollector.datap++;
180         *firstQuadPtr = htonl(*receiver->sampleCollector.datap);
181         *receiver->sampleCollector.datap++ = htonl(tmp);
182     }
183     else receiver->sampleCollector.datap += 2;
184 }
185
186 inline static void put128(SFLReceiver *receiver, u_char *val)
187 {
188     memcpy(receiver->sampleCollector.datap, val, 16);
189     receiver->sampleCollector.datap += 4;
190 }
191
192 inline static void putString(SFLReceiver *receiver, SFLString *s)
193 {
194     putNet32(receiver, s->len);
195     memcpy(receiver->sampleCollector.datap, s->str, s->len);
196     receiver->sampleCollector.datap += (s->len + 3) / 4; /* pad to 4-byte boundary */
197 }
198
199 inline static u_int32_t stringEncodingLength(SFLString *s) {
200     // answer in bytes,  so remember to mulitply by 4 after rounding up to nearest 4-byte boundary
201     return 4 + (((s->len + 3) / 4) * 4);
202 }
203
204 inline static void putAddress(SFLReceiver *receiver, SFLAddress *addr)
205 {
206     // encode unspecified addresses as IPV4:0.0.0.0 - or should we flag this as an error?
207     if(addr->type == 0) {
208         putNet32(receiver, SFLADDRESSTYPE_IP_V4);
209         put32(receiver, 0);
210     }
211     else {
212         putNet32(receiver, addr->type);
213         if(addr->type == SFLADDRESSTYPE_IP_V4) put32(receiver, addr->address.ip_v4.addr);
214         else put128(receiver, addr->address.ip_v6.addr);
215     }
216 }
217
218 inline static u_int32_t addressEncodingLength(SFLAddress *addr) {
219     return (addr->type == SFLADDRESSTYPE_IP_V6) ? 20 : 8;  // type + address (unspecified == IPV4)
220 }
221
222 inline static void putMACAddress(SFLReceiver *receiver, u_int8_t *mac)
223 {
224     memcpy(receiver->sampleCollector.datap, mac, 6);
225     receiver->sampleCollector.datap += 2;
226 }
227
228 inline static void putSwitch(SFLReceiver *receiver, SFLExtended_switch *sw)
229 {
230     putNet32(receiver, sw->src_vlan);
231     putNet32(receiver, sw->src_priority);
232     putNet32(receiver, sw->dst_vlan);
233     putNet32(receiver, sw->dst_priority);
234 }
235
236 inline static void putRouter(SFLReceiver *receiver, SFLExtended_router *router)
237 {
238     putAddress(receiver, &router->nexthop);
239     putNet32(receiver, router->src_mask);
240     putNet32(receiver, router->dst_mask);
241 }
242
243 inline static u_int32_t routerEncodingLength(SFLExtended_router *router) {
244     return addressEncodingLength(&router->nexthop) + 8;
245 }
246
247 inline static void putGateway(SFLReceiver *receiver, SFLExtended_gateway *gw)
248 {
249     putAddress(receiver, &gw->nexthop);
250     putNet32(receiver, gw->as);
251     putNet32(receiver, gw->src_as);
252     putNet32(receiver, gw->src_peer_as);
253     putNet32(receiver, gw->dst_as_path_segments);
254     {
255         u_int32_t seg = 0;
256         for(; seg < gw->dst_as_path_segments; seg++) {
257             putNet32(receiver, gw->dst_as_path[seg].type);
258             putNet32(receiver, gw->dst_as_path[seg].length);
259             putNet32_run(receiver, gw->dst_as_path[seg].as.seq, gw->dst_as_path[seg].length);
260         }
261     }
262     putNet32(receiver, gw->communities_length);
263     putNet32_run(receiver, gw->communities, gw->communities_length);
264     putNet32(receiver, gw->localpref);
265 }
266
267 inline static u_int32_t gatewayEncodingLength(SFLExtended_gateway *gw) {
268     u_int32_t elemSiz = addressEncodingLength(&gw->nexthop);
269     u_int32_t seg = 0;
270     elemSiz += 16; // as, src_as, src_peer_as, dst_as_path_segments
271     for(; seg < gw->dst_as_path_segments; seg++) {
272         elemSiz += 8; // type, length
273         elemSiz += 4 * gw->dst_as_path[seg].length; // set/seq bytes
274     }
275     elemSiz += 4; // communities_length
276     elemSiz += 4 * gw->communities_length; // communities
277     elemSiz += 4; // localpref
278     return elemSiz;
279 }
280
281 inline static void putUser(SFLReceiver *receiver, SFLExtended_user *user)
282 {
283     putNet32(receiver, user->src_charset);
284     putString(receiver, &user->src_user);
285     putNet32(receiver, user->dst_charset);
286     putString(receiver, &user->dst_user);
287 }
288
289 inline static u_int32_t userEncodingLength(SFLExtended_user *user) {
290     return 4
291         + stringEncodingLength(&user->src_user)
292         + 4
293         + stringEncodingLength(&user->dst_user);
294 }
295
296 inline static void putUrl(SFLReceiver *receiver, SFLExtended_url *url)
297 {
298     putNet32(receiver, url->direction);
299     putString(receiver, &url->url);
300     putString(receiver, &url->host);
301 }
302
303 inline static u_int32_t urlEncodingLength(SFLExtended_url *url) {
304     return 4
305         + stringEncodingLength(&url->url)
306         + stringEncodingLength(&url->host);
307 }
308
309 inline static void putLabelStack(SFLReceiver *receiver, SFLLabelStack *labelStack)
310 {
311     putNet32(receiver, labelStack->depth);
312     putNet32_run(receiver, labelStack->stack, labelStack->depth);
313 }
314
315 inline static u_int32_t labelStackEncodingLength(SFLLabelStack *labelStack) {
316     return 4 + (4 * labelStack->depth);
317 }
318
319 inline static void putMpls(SFLReceiver *receiver, SFLExtended_mpls *mpls)
320 {
321     putAddress(receiver, &mpls->nextHop);
322     putLabelStack(receiver, &mpls->in_stack);
323     putLabelStack(receiver, &mpls->out_stack);
324 }
325
326 inline static u_int32_t mplsEncodingLength(SFLExtended_mpls *mpls) {
327     return addressEncodingLength(&mpls->nextHop)
328         + labelStackEncodingLength(&mpls->in_stack)
329         + labelStackEncodingLength(&mpls->out_stack);
330 }
331
332 inline static void putNat(SFLReceiver *receiver, SFLExtended_nat *nat)
333 {
334     putAddress(receiver, &nat->src);
335     putAddress(receiver, &nat->dst);
336 }
337
338 inline static u_int32_t natEncodingLength(SFLExtended_nat *nat) {
339     return addressEncodingLength(&nat->src)
340         + addressEncodingLength(&nat->dst);
341 }
342
343 inline static void putMplsTunnel(SFLReceiver *receiver, SFLExtended_mpls_tunnel *tunnel)
344 {
345     putString(receiver, &tunnel->tunnel_lsp_name);
346     putNet32(receiver, tunnel->tunnel_id);
347     putNet32(receiver, tunnel->tunnel_cos);
348 }
349
350 inline static u_int32_t mplsTunnelEncodingLength(SFLExtended_mpls_tunnel *tunnel) {
351     return stringEncodingLength(&tunnel->tunnel_lsp_name) + 8;
352 }
353
354 inline static void putMplsVc(SFLReceiver *receiver, SFLExtended_mpls_vc *vc)
355 {
356     putString(receiver, &vc->vc_instance_name);
357     putNet32(receiver, vc->vll_vc_id);
358     putNet32(receiver, vc->vc_label_cos);
359 }
360
361 inline static u_int32_t mplsVcEncodingLength(SFLExtended_mpls_vc *vc) {
362     return stringEncodingLength( &vc->vc_instance_name) + 8;
363 }
364
365 inline static void putMplsFtn(SFLReceiver *receiver, SFLExtended_mpls_FTN *ftn)
366 {
367     putString(receiver, &ftn->mplsFTNDescr);
368     putNet32(receiver, ftn->mplsFTNMask);
369 }
370
371 inline static u_int32_t mplsFtnEncodingLength(SFLExtended_mpls_FTN *ftn) {
372     return stringEncodingLength( &ftn->mplsFTNDescr) + 4;
373 }
374
375 inline static void putMplsLdpFec(SFLReceiver *receiver, SFLExtended_mpls_LDP_FEC *ldpfec)
376 {
377     putNet32(receiver, ldpfec->mplsFecAddrPrefixLength);
378 }
379
380 inline static u_int32_t mplsLdpFecEncodingLength(SFLExtended_mpls_LDP_FEC *ldpfec) {
381     return 4;
382 }
383
384 inline static void putVlanTunnel(SFLReceiver *receiver, SFLExtended_vlan_tunnel *vlanTunnel)
385 {
386     putLabelStack(receiver, &vlanTunnel->stack);
387 }
388
389 inline static u_int32_t vlanTunnelEncodingLength(SFLExtended_vlan_tunnel *vlanTunnel) {
390     return labelStackEncodingLength(&vlanTunnel->stack);
391 }
392
393
394 inline static void putGenericCounters(SFLReceiver *receiver, SFLIf_counters *counters)
395 {
396     putNet32(receiver, counters->ifIndex);
397     putNet32(receiver, counters->ifType);
398     putNet64(receiver, counters->ifSpeed);
399     putNet32(receiver, counters->ifDirection);
400     putNet32(receiver, counters->ifStatus);
401     putNet64(receiver, counters->ifInOctets);
402     putNet32(receiver, counters->ifInUcastPkts);
403     putNet32(receiver, counters->ifInMulticastPkts);
404     putNet32(receiver, counters->ifInBroadcastPkts);
405     putNet32(receiver, counters->ifInDiscards);
406     putNet32(receiver, counters->ifInErrors);
407     putNet32(receiver, counters->ifInUnknownProtos);
408     putNet64(receiver, counters->ifOutOctets);
409     putNet32(receiver, counters->ifOutUcastPkts);
410     putNet32(receiver, counters->ifOutMulticastPkts);
411     putNet32(receiver, counters->ifOutBroadcastPkts);
412     putNet32(receiver, counters->ifOutDiscards);
413     putNet32(receiver, counters->ifOutErrors);
414     putNet32(receiver, counters->ifPromiscuousMode);
415 }
416
417
418 /*_________________-----------------------------__________________
419   _________________      computeFlowSampleSize  __________________
420   -----------------_____________________________------------------
421 */
422
423 static int computeFlowSampleSize(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
424 {
425     SFLFlow_sample_element *elem = fs->elements;
426 #ifdef SFL_USE_32BIT_INDEX
427     u_int siz = 52; /* tag, length, sequence_number, ds_class, ds_index, sampling_rate,
428                        sample_pool, drops, inputFormat, input, outputFormat, output, number of elements */
429 #else
430     u_int siz = 40; /* tag, length, sequence_number, source_id, sampling_rate,
431                        sample_pool, drops, input, output, number of elements */
432 #endif
433
434     fs->num_elements = 0; /* we're going to count them again even if this was set by the client */
435     for(; elem != NULL; elem = elem->nxt) {
436         u_int elemSiz = 0;
437         fs->num_elements++;
438         siz += 8; /* tag, length */
439         switch(elem->tag) {
440         case SFLFLOW_HEADER:
441             elemSiz = 16; /* header_protocol, frame_length, stripped, header_length */
442             elemSiz += ((elem->flowType.header.header_length + 3) / 4) * 4; /* header, rounded up to nearest 4 bytes */
443             break;
444         case SFLFLOW_ETHERNET: elemSiz = sizeof(SFLSampled_ethernet); break;
445         case SFLFLOW_IPV4: elemSiz = sizeof(SFLSampled_ipv4); break;
446         case SFLFLOW_IPV6: elemSiz = sizeof(SFLSampled_ipv6); break;
447         case SFLFLOW_EX_SWITCH: elemSiz = sizeof(SFLExtended_switch); break;
448         case SFLFLOW_EX_ROUTER: elemSiz = routerEncodingLength(&elem->flowType.router); break;
449         case SFLFLOW_EX_GATEWAY: elemSiz = gatewayEncodingLength(&elem->flowType.gateway); break;
450         case SFLFLOW_EX_USER: elemSiz = userEncodingLength(&elem->flowType.user); break;
451         case SFLFLOW_EX_URL: elemSiz = urlEncodingLength(&elem->flowType.url); break;
452         case SFLFLOW_EX_MPLS: elemSiz = mplsEncodingLength(&elem->flowType.mpls); break;
453         case SFLFLOW_EX_NAT: elemSiz = natEncodingLength(&elem->flowType.nat); break;
454         case SFLFLOW_EX_MPLS_TUNNEL: elemSiz = mplsTunnelEncodingLength(&elem->flowType.mpls_tunnel); break;
455         case SFLFLOW_EX_MPLS_VC: elemSiz = mplsVcEncodingLength(&elem->flowType.mpls_vc); break;
456         case SFLFLOW_EX_MPLS_FTN: elemSiz = mplsFtnEncodingLength(&elem->flowType.mpls_ftn); break;
457         case SFLFLOW_EX_MPLS_LDP_FEC: elemSiz = mplsLdpFecEncodingLength(&elem->flowType.mpls_ldp_fec); break;
458         case SFLFLOW_EX_VLAN_TUNNEL: elemSiz = vlanTunnelEncodingLength(&elem->flowType.vlan_tunnel); break;
459         default:
460             sflError(receiver, "unexpected packet_data_tag");
461             return -1;
462             break;
463         }
464         // cache the element size, and accumulate it into the overall FlowSample size
465         elem->length = elemSiz;
466         siz += elemSiz;
467     }
468
469     return siz;
470 }
471
472 /*_________________-------------------------------__________________
473   _________________ sfl_receiver_writeFlowSample  __________________
474   -----------------_______________________________------------------
475 */
476
477 int sfl_receiver_writeFlowSample(SFLReceiver *receiver, SFL_FLOW_SAMPLE_TYPE *fs)
478 {
479     int packedSize;
480     if(fs == NULL) return -1;
481     if((packedSize = computeFlowSampleSize(receiver, fs)) == -1) return -1;
482
483     // check in case this one sample alone is too big for the datagram
484     // in fact - if it is even half as big then we should ditch it. Very
485     // important to avoid overruning the packet buffer.
486     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
487         sflError(receiver, "flow sample too big for datagram");
488         return -1;
489     }
490
491     // if the sample pkt is full enough so that this sample might put
492     // it over the limit, then we should send it now before going on.
493     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
494         sendSample(receiver);
495
496     receiver->sampleCollector.numSamples++;
497
498 #ifdef SFL_USE_32BIT_INDEX
499     putNet32(receiver, SFLFLOW_SAMPLE_EXPANDED);
500 #else
501     putNet32(receiver, SFLFLOW_SAMPLE);
502 #endif
503
504     putNet32(receiver, packedSize - 8); // don't include tag and len
505     putNet32(receiver, fs->sequence_number);
506
507 #ifdef SFL_USE_32BIT_INDEX
508     putNet32(receiver, fs->ds_class);
509     putNet32(receiver, fs->ds_index);
510 #else
511     putNet32(receiver, fs->source_id);
512 #endif
513
514     putNet32(receiver, fs->sampling_rate);
515     putNet32(receiver, fs->sample_pool);
516     putNet32(receiver, fs->drops);
517
518 #ifdef SFL_USE_32BIT_INDEX
519     putNet32(receiver, fs->inputFormat);
520     putNet32(receiver, fs->input);
521     putNet32(receiver, fs->outputFormat);
522     putNet32(receiver, fs->output);
523 #else
524     putNet32(receiver, fs->input);
525     putNet32(receiver, fs->output);
526 #endif
527
528     putNet32(receiver, fs->num_elements);
529
530     {
531         SFLFlow_sample_element *elem = fs->elements;
532         for(; elem != NULL; elem = elem->nxt) {
533
534             putNet32(receiver, elem->tag);
535             putNet32(receiver, elem->length); // length cached in computeFlowSampleSize()
536
537             switch(elem->tag) {
538             case SFLFLOW_HEADER:
539                 putNet32(receiver, elem->flowType.header.header_protocol);
540                 putNet32(receiver, elem->flowType.header.frame_length);
541                 putNet32(receiver, elem->flowType.header.stripped);
542                 putNet32(receiver, elem->flowType.header.header_length);
543                 /* the header */
544                 memcpy(receiver->sampleCollector.datap, elem->flowType.header.header_bytes, elem->flowType.header.header_length);
545                 /* round up to multiple of 4 to preserve alignment */
546                 receiver->sampleCollector.datap += ((elem->flowType.header.header_length + 3) / 4);
547                 break;
548             case SFLFLOW_ETHERNET:
549                 putNet32(receiver, elem->flowType.ethernet.eth_len);
550                 putMACAddress(receiver, elem->flowType.ethernet.src_mac);
551                 putMACAddress(receiver, elem->flowType.ethernet.dst_mac);
552                 putNet32(receiver, elem->flowType.ethernet.eth_type);
553                 break;
554             case SFLFLOW_IPV4:
555                 putNet32(receiver, elem->flowType.ipv4.length);
556                 putNet32(receiver, elem->flowType.ipv4.protocol);
557                 put32(receiver, elem->flowType.ipv4.src_ip.addr);
558                 put32(receiver, elem->flowType.ipv4.dst_ip.addr);
559                 putNet32(receiver, elem->flowType.ipv4.src_port);
560                 putNet32(receiver, elem->flowType.ipv4.dst_port);
561                 putNet32(receiver, elem->flowType.ipv4.tcp_flags);
562                 putNet32(receiver, elem->flowType.ipv4.tos);
563                 break;
564             case SFLFLOW_IPV6:
565                 putNet32(receiver, elem->flowType.ipv6.length);
566                 putNet32(receiver, elem->flowType.ipv6.protocol);
567                 put128(receiver, elem->flowType.ipv6.src_ip.addr);
568                 put128(receiver, elem->flowType.ipv6.dst_ip.addr);
569                 putNet32(receiver, elem->flowType.ipv6.src_port);
570                 putNet32(receiver, elem->flowType.ipv6.dst_port);
571                 putNet32(receiver, elem->flowType.ipv6.tcp_flags);
572                 putNet32(receiver, elem->flowType.ipv6.priority);
573                 break;
574             case SFLFLOW_EX_SWITCH: putSwitch(receiver, &elem->flowType.sw); break;
575             case SFLFLOW_EX_ROUTER: putRouter(receiver, &elem->flowType.router); break;
576             case SFLFLOW_EX_GATEWAY: putGateway(receiver, &elem->flowType.gateway); break;
577             case SFLFLOW_EX_USER: putUser(receiver, &elem->flowType.user); break;
578             case SFLFLOW_EX_URL: putUrl(receiver, &elem->flowType.url); break;
579             case SFLFLOW_EX_MPLS: putMpls(receiver, &elem->flowType.mpls); break;
580             case SFLFLOW_EX_NAT: putNat(receiver, &elem->flowType.nat); break;
581             case SFLFLOW_EX_MPLS_TUNNEL: putMplsTunnel(receiver, &elem->flowType.mpls_tunnel); break;
582             case SFLFLOW_EX_MPLS_VC: putMplsVc(receiver, &elem->flowType.mpls_vc); break;
583             case SFLFLOW_EX_MPLS_FTN: putMplsFtn(receiver, &elem->flowType.mpls_ftn); break;
584             case SFLFLOW_EX_MPLS_LDP_FEC: putMplsLdpFec(receiver, &elem->flowType.mpls_ldp_fec); break;
585             case SFLFLOW_EX_VLAN_TUNNEL: putVlanTunnel(receiver, &elem->flowType.vlan_tunnel); break;
586             default:
587                 sflError(receiver, "unexpected packet_data_tag");
588                 return -1;
589                 break;
590             }
591         }
592     }
593
594     // sanity check
595     assert(((u_char *)receiver->sampleCollector.datap
596             - (u_char *)receiver->sampleCollector.data
597             - receiver->sampleCollector.pktlen)  == (u_int32_t)packedSize);
598
599     // update the pktlen
600     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
601     return packedSize;
602 }
603
604 /*_________________-----------------------------__________________
605   _________________ computeCountersSampleSize   __________________
606   -----------------_____________________________------------------
607 */
608
609 static int computeCountersSampleSize(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
610 {
611     SFLCounters_sample_element *elem = cs->elements;
612 #ifdef SFL_USE_32BIT_INDEX
613     u_int siz = 24; /* tag, length, sequence_number, ds_class, ds_index, number of elements */
614 #else
615     u_int siz = 20; /* tag, length, sequence_number, source_id, number of elements */
616 #endif
617
618     cs->num_elements = 0; /* we're going to count them again even if this was set by the client */
619     for(; elem != NULL; elem = elem->nxt) {
620         u_int elemSiz = 0;
621         cs->num_elements++;
622         siz += 8; /* tag, length */
623         switch(elem->tag) {
624         case SFLCOUNTERS_GENERIC:  elemSiz = sizeof(elem->counterBlock.generic); break;
625         case SFLCOUNTERS_ETHERNET: elemSiz = sizeof(elem->counterBlock.ethernet); break;
626         case SFLCOUNTERS_TOKENRING: elemSiz = sizeof(elem->counterBlock.tokenring); break;
627         case SFLCOUNTERS_VG: elemSiz = sizeof(elem->counterBlock.vg); break;
628         case SFLCOUNTERS_VLAN: elemSiz = sizeof(elem->counterBlock.vlan); break;
629         default:
630             sflError(receiver, "unexpected counters_tag");
631             return -1;
632             break;
633         }
634         // cache the element size, and accumulate it into the overall FlowSample size
635         elem->length = elemSiz;
636         siz += elemSiz;
637     }
638     return siz;
639 }
640
641 /*_________________----------------------------------__________________
642   _________________ sfl_receiver_writeCountersSample __________________
643   -----------------__________________________________------------------
644 */
645
646 int sfl_receiver_writeCountersSample(SFLReceiver *receiver, SFL_COUNTERS_SAMPLE_TYPE *cs)
647 {
648     int packedSize;
649     if(cs == NULL) return -1;
650     // if the sample pkt is full enough so that this sample might put
651     // it over the limit, then we should send it now.
652     if((packedSize = computeCountersSampleSize(receiver, cs)) == -1) return -1;
653
654     // check in case this one sample alone is too big for the datagram
655     // in fact - if it is even half as big then we should ditch it. Very
656     // important to avoid overruning the packet buffer.
657     if(packedSize > (int)(receiver->sFlowRcvrMaximumDatagramSize / 2)) {
658         sflError(receiver, "counters sample too big for datagram");
659         return -1;
660     }
661
662     if((receiver->sampleCollector.pktlen + packedSize) >= receiver->sFlowRcvrMaximumDatagramSize)
663         sendSample(receiver);
664
665     receiver->sampleCollector.numSamples++;
666
667 #ifdef SFL_USE_32BIT_INDEX
668     putNet32(receiver, SFLCOUNTERS_SAMPLE_EXPANDED);
669 #else
670     putNet32(receiver, SFLCOUNTERS_SAMPLE);
671 #endif
672
673     putNet32(receiver, packedSize - 8); // tag and length not included
674     putNet32(receiver, cs->sequence_number);
675
676 #ifdef SFL_USE_32BIT_INDEX
677     putNet32(receiver, cs->ds_class);
678     putNet32(receiver, cs->ds_index);
679 #else
680     putNet32(receiver, cs->source_id);
681 #endif
682
683     putNet32(receiver, cs->num_elements);
684
685     {
686         SFLCounters_sample_element *elem = cs->elements;
687         for(; elem != NULL; elem = elem->nxt) {
688
689             putNet32(receiver, elem->tag);
690             putNet32(receiver, elem->length); // length cached in computeCountersSampleSize()
691
692             switch(elem->tag) {
693             case SFLCOUNTERS_GENERIC:
694                 putGenericCounters(receiver, &(elem->counterBlock.generic));
695                 break;
696             case SFLCOUNTERS_ETHERNET:
697                 // all these counters are 32-bit
698                 putNet32_run(receiver, &elem->counterBlock.ethernet, sizeof(elem->counterBlock.ethernet) / 4);
699                 break;
700             case SFLCOUNTERS_TOKENRING:
701                 // all these counters are 32-bit
702                 putNet32_run(receiver, &elem->counterBlock.tokenring, sizeof(elem->counterBlock.tokenring) / 4);
703                 break;
704             case SFLCOUNTERS_VG:
705                 // mixed sizes
706                 putNet32(receiver, elem->counterBlock.vg.dot12InHighPriorityFrames);
707                 putNet64(receiver, elem->counterBlock.vg.dot12InHighPriorityOctets);
708                 putNet32(receiver, elem->counterBlock.vg.dot12InNormPriorityFrames);
709                 putNet64(receiver, elem->counterBlock.vg.dot12InNormPriorityOctets);
710                 putNet32(receiver, elem->counterBlock.vg.dot12InIPMErrors);
711                 putNet32(receiver, elem->counterBlock.vg.dot12InOversizeFrameErrors);
712                 putNet32(receiver, elem->counterBlock.vg.dot12InDataErrors);
713                 putNet32(receiver, elem->counterBlock.vg.dot12InNullAddressedFrames);
714                 putNet32(receiver, elem->counterBlock.vg.dot12OutHighPriorityFrames);
715                 putNet64(receiver, elem->counterBlock.vg.dot12OutHighPriorityOctets);
716                 putNet32(receiver, elem->counterBlock.vg.dot12TransitionIntoTrainings);
717                 putNet64(receiver, elem->counterBlock.vg.dot12HCInHighPriorityOctets);
718                 putNet64(receiver, elem->counterBlock.vg.dot12HCInNormPriorityOctets);
719                 putNet64(receiver, elem->counterBlock.vg.dot12HCOutHighPriorityOctets);
720                 break;
721             case SFLCOUNTERS_VLAN:
722                 // mixed sizes
723                 putNet32(receiver, elem->counterBlock.vlan.vlan_id);
724                 putNet64(receiver, elem->counterBlock.vlan.octets);
725                 putNet32(receiver, elem->counterBlock.vlan.ucastPkts);
726                 putNet32(receiver, elem->counterBlock.vlan.multicastPkts);
727                 putNet32(receiver, elem->counterBlock.vlan.broadcastPkts);
728                 putNet32(receiver, elem->counterBlock.vlan.discards);
729                 break;
730             default:
731                 sflError(receiver, "unexpected counters_tag");
732                 return -1;
733                 break;
734             }
735         }
736     }
737     // sanity check
738     assert(((u_char *)receiver->sampleCollector.datap
739             - (u_char *)receiver->sampleCollector.data
740             - receiver->sampleCollector.pktlen)  == (u_int32_t)packedSize);
741
742     // update the pktlen
743     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
744     return packedSize;
745 }
746
747 /*_________________---------------------------------__________________
748   _________________ sfl_receiver_samplePacketsSent  __________________
749   -----------------_________________________________------------------
750 */
751
752 u_int32_t sfl_receiver_samplePacketsSent(SFLReceiver *receiver)
753 {
754     return receiver->sampleCollector.packetSeqNo;
755 }
756
757 /*_________________---------------------------__________________
758   _________________     sendSample            __________________
759   -----------------___________________________------------------
760 */
761
762 static void sendSample(SFLReceiver *receiver)
763 {
764     /* construct and send out the sample, then reset for the next one... */
765     /* first fill in the header with the latest values */
766     /* version, agent_address and sub_agent_id were pre-set. */
767     u_int32_t hdrIdx = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ? 7 : 4;
768     receiver->sampleCollector.data[hdrIdx++] = htonl(++receiver->sampleCollector.packetSeqNo); /* seq no */
769     receiver->sampleCollector.data[hdrIdx++] = htonl((receiver->agent->now - receiver->agent->bootTime) * 1000); /* uptime */
770     receiver->sampleCollector.data[hdrIdx++] = htonl(receiver->sampleCollector.numSamples); /* num samples */
771     /* send */
772     if(receiver->agent->sendFn) (*receiver->agent->sendFn)(receiver->agent->magic,
773                                                            receiver->agent,
774                                                            receiver,
775                                                            (u_char *)receiver->sampleCollector.data,
776                                                            receiver->sampleCollector.pktlen);
777     else {
778 #ifdef SFLOW_DO_SOCKET
779         /* send it myself */
780         if (receiver->sFlowRcvrAddress.type == SFLADDRESSTYPE_IP_V6) {
781             u_int32_t soclen = sizeof(struct sockaddr_in6);
782             int result = sendto(receiver->agent->receiverSocket6,
783                                 receiver->sampleCollector.data,
784                                 receiver->sampleCollector.pktlen,
785                                 0,
786                                 (struct sockaddr *)&receiver->receiver6,
787                                 soclen);
788             if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "IPv6 socket sendto error");
789             if(result == 0) sfl_agent_error(receiver->agent, "receiver", "IPv6 socket sendto returned 0");
790         }
791         else {
792             u_int32_t soclen = sizeof(struct sockaddr_in);
793             int result = sendto(receiver->agent->receiverSocket4,
794                                 receiver->sampleCollector.data,
795                                 receiver->sampleCollector.pktlen,
796                                 0,
797                                 (struct sockaddr *)&receiver->receiver4,
798                                 soclen);
799             if(result == -1 && errno != EINTR) sfl_agent_sysError(receiver->agent, "receiver", "socket sendto error");
800             if(result == 0) sfl_agent_error(receiver->agent, "receiver", "socket sendto returned 0");
801         }
802 #endif
803     }
804
805     /* reset for the next time */
806     resetSampleCollector(receiver);
807 }
808
809 /*_________________---------------------------__________________
810   _________________   resetSampleCollector    __________________
811   -----------------___________________________------------------
812 */
813
814 static void resetSampleCollector(SFLReceiver *receiver)
815 {
816     receiver->sampleCollector.pktlen = 0;
817     receiver->sampleCollector.numSamples = 0;
818     /* point the datap to just after the header */
819     receiver->sampleCollector.datap = (receiver->agent->myIP.type == SFLADDRESSTYPE_IP_V6) ?
820         (receiver->sampleCollector.data + 10) :  (receiver->sampleCollector.data + 7);
821
822     receiver->sampleCollector.pktlen = (u_char *)receiver->sampleCollector.datap - (u_char *)receiver->sampleCollector.data;
823 }
824
825 /*_________________---------------------------__________________
826   _________________         sflError          __________________
827   -----------------___________________________------------------
828 */
829
830 static void sflError(SFLReceiver *receiver, char *msg)
831 {
832     sfl_agent_error(receiver->agent, "receiver", msg);
833     resetSampleCollector(receiver);
834 }
835
836 #endif  /* !__CHECKER__ */