Merge "master" into "wdp".
[sliver-openvswitch.git] / ofproto / ofproto-sflow.c
1 /*
2  * Copyright (c) 2009, 2010 InMon Corp.
3  * Copyright (c) 2009 Nicira Networks.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at:
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17
18 #include <config.h>
19 #include "ofproto-sflow.h"
20 #include <inttypes.h>
21 #include <stdlib.h>
22 #include "collectors.h"
23 #include "compiler.h"
24 #include "hash.h"
25 #include "hmap.h"
26 #include "netdev.h"
27 #include "ofpbuf.h"
28 #include "ofproto.h"
29 #include "packets.h"
30 #include "poll-loop.h"
31 #include "sflow_api.h"
32 #include "socket-util.h"
33 #include "timeval.h"
34 #include "vlog.h"
35 #include "wdp.h"
36 #include "xfif.h"
37
38 VLOG_DEFINE_THIS_MODULE(sflow)
39
40 struct ofproto_sflow_port {
41     struct hmap_node hmap_node; /* In struct ofproto_sflow's "ports" hmap. */
42     struct netdev *netdev;      /* Underlying network device, for stats. */
43     SFLDataSource_instance dsi; /* sFlow library's notion of port number. */
44     uint16_t xflow_port;        /* xflow port number. */
45 };
46
47 struct ofproto_sflow {
48     struct ofproto *ofproto;
49     struct collectors *collectors;
50     SFLAgent *sflow_agent;
51     struct ofproto_sflow_options *options;
52     struct wdp *wdp;
53     time_t next_tick;
54     size_t n_flood, n_all;
55     struct hmap ports;          /* Contains "struct ofproto_sflow_port"s. */
56 };
57
58 static void ofproto_sflow_del_port__(struct ofproto_sflow *,
59                                      struct ofproto_sflow_port *);
60
61 #define RECEIVER_INDEX 1
62
63 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
64
65 static bool
66 nullable_string_is_equal(const char *a, const char *b)
67 {
68     return a ? b && !strcmp(a, b) : !b;
69 }
70
71 static bool
72 ofproto_sflow_options_equal(const struct ofproto_sflow_options *a,
73                             const struct ofproto_sflow_options *b)
74 {
75     return (svec_equal(&a->targets, &b->targets)
76             && a->sampling_rate == b->sampling_rate
77             && a->polling_interval == b->polling_interval
78             && a->header_len == b->header_len
79             && a->sub_id == b->sub_id
80             && nullable_string_is_equal(a->agent_device, b->agent_device)
81             && nullable_string_is_equal(a->control_ip, b->control_ip));
82 }
83
84 static struct ofproto_sflow_options *
85 ofproto_sflow_options_clone(const struct ofproto_sflow_options *old)
86 {
87     struct ofproto_sflow_options *new = xmemdup(old, sizeof *old);
88     svec_clone(&new->targets, &old->targets);
89     new->agent_device = old->agent_device ? xstrdup(old->agent_device) : NULL;
90     new->control_ip = old->control_ip ? xstrdup(old->control_ip) : NULL;
91     return new;
92 }
93
94 static void
95 ofproto_sflow_options_destroy(struct ofproto_sflow_options *options)
96 {
97     if (options) {
98         svec_destroy(&options->targets);
99         free(options->agent_device);
100         free(options->control_ip);
101         free(options);
102     }
103 }
104
105 /* sFlow library callback to allocate memory. */
106 static void *
107 sflow_agent_alloc_cb(void *magic OVS_UNUSED, SFLAgent *agent OVS_UNUSED,
108                      size_t bytes)
109 {
110     return calloc(1, bytes);
111 }
112
113 /* sFlow library callback to free memory. */
114 static int
115 sflow_agent_free_cb(void *magic OVS_UNUSED, SFLAgent *agent OVS_UNUSED,
116                     void *obj)
117 {
118     free(obj);
119     return 0;
120 }
121
122 /* sFlow library callback to report error. */
123 static void
124 sflow_agent_error_cb(void *magic OVS_UNUSED, SFLAgent *agent OVS_UNUSED,
125                      char *msg)
126 {
127     VLOG_WARN("sFlow agent error: %s", msg);
128 }
129
130 /* sFlow library callback to send datagram. */
131 static void
132 sflow_agent_send_packet_cb(void *os_, SFLAgent *agent OVS_UNUSED,
133                            SFLReceiver *receiver OVS_UNUSED, u_char *pkt,
134                            uint32_t pktLen)
135 {
136     struct ofproto_sflow *os = os_;
137     collectors_send(os->collectors, pkt, pktLen);
138 }
139
140 static struct ofproto_sflow_port *
141 ofproto_sflow_find_port(const struct ofproto_sflow *os, uint16_t xflow_port)
142 {
143     struct ofproto_sflow_port *osp;
144
145     HMAP_FOR_EACH_IN_BUCKET (osp, hmap_node,
146                              hash_int(xflow_port, 0), &os->ports) {
147         if (osp->xflow_port == xflow_port) {
148             return osp;
149         }
150     }
151     return NULL;
152 }
153
154 static void
155 sflow_agent_get_counters(void *os_, SFLPoller *poller,
156                          SFL_COUNTERS_SAMPLE_TYPE *cs)
157 {
158     struct ofproto_sflow *os = os_;
159     SFLCounters_sample_element elem;
160     struct ofproto_sflow_port *osp;
161     SFLIf_counters *counters;
162     struct netdev_stats stats;
163     enum netdev_flags flags;
164     uint32_t current;
165
166     osp = ofproto_sflow_find_port(os, poller->bridgePort);
167     if (!osp) {
168         return;
169     }
170
171     elem.tag = SFLCOUNTERS_GENERIC;
172     counters = &elem.counterBlock.generic;
173     counters->ifIndex = SFL_DS_INDEX(poller->dsi);
174     counters->ifType = 6;
175     if (!netdev_get_features(osp->netdev, &current, NULL, NULL, NULL)) {
176       /* The values of ifDirection come from MAU MIB (RFC 2668): 0 = unknown,
177          1 = full-duplex, 2 = half-duplex, 3 = in, 4=out */
178         counters->ifSpeed = netdev_features_to_bps(current);
179         counters->ifDirection = (netdev_features_is_full_duplex(current)
180                                  ? 1 : 2);
181     } else {
182         counters->ifSpeed = 100000000;
183         counters->ifDirection = 0;
184     }
185     if (!netdev_get_flags(osp->netdev, &flags) && flags & NETDEV_UP) {
186         bool carrier;
187
188         counters->ifStatus = 1; /* ifAdminStatus up. */
189         if (!netdev_get_carrier(osp->netdev, &carrier) && carrier) {
190             counters->ifStatus |= 2; /* ifOperStatus us. */
191         }
192     } else {
193         counters->ifStatus = 0;  /* Down. */
194     }
195
196     /* XXX
197        1. Is the multicast counter filled in?
198        2. Does the multicast counter include broadcasts?
199        3. Does the rx_packets counter include multicasts/broadcasts?
200     */
201     netdev_get_stats(osp->netdev, &stats);
202     counters->ifInOctets = stats.rx_bytes;
203     counters->ifInUcastPkts = stats.rx_packets;
204     counters->ifInMulticastPkts = stats.multicast;
205     counters->ifInBroadcastPkts = -1;
206     counters->ifInDiscards = stats.rx_dropped;
207     counters->ifInErrors = stats.rx_errors;
208     counters->ifInUnknownProtos = -1;
209     counters->ifOutOctets = stats.tx_bytes;
210     counters->ifOutUcastPkts = stats.tx_packets;
211     counters->ifOutMulticastPkts = -1;
212     counters->ifOutBroadcastPkts = -1;
213     counters->ifOutDiscards = stats.tx_dropped;
214     counters->ifOutErrors = stats.tx_errors;
215     counters->ifPromiscuousMode = 0;
216
217     SFLADD_ELEMENT(cs, &elem);
218     sfl_poller_writeCountersSample(poller, cs);
219 }
220
221 /* Obtains an address to use for the local sFlow agent and stores it into
222  * '*agent_addr'.  Returns true if successful, false on failure.
223  *
224  * The sFlow agent address should be a local IP address that is persistent and
225  * reachable over the network, if possible.  The IP address associated with
226  * 'agent_device' is used if it has one, and otherwise 'control_ip', the IP
227  * address used to talk to the controller. */
228 static bool
229 sflow_choose_agent_address(const char *agent_device, const char *control_ip,
230                            SFLAddress *agent_addr)
231 {
232     struct in_addr in4;
233
234     memset(agent_addr, 0, sizeof *agent_addr);
235     agent_addr->type = SFLADDRESSTYPE_IP_V4;
236
237     if (agent_device) {
238         struct netdev *netdev;
239
240         if (!netdev_open_default(agent_device, &netdev)) {
241             int error = netdev_get_in4(netdev, &in4, NULL);
242             netdev_close(netdev);
243             if (!error) {
244                 goto success;
245             }
246         }
247     }
248
249     if (control_ip && !lookup_ip(control_ip, &in4)) {
250         goto success;
251     }
252
253     VLOG_ERR("could not determine IP address for sFlow agent");
254     return false;
255
256 success:
257     agent_addr->address.ip_v4.addr = in4.s_addr;
258     return true;
259 }
260
261 void
262 ofproto_sflow_clear(struct ofproto_sflow *os)
263 {
264     if (os->sflow_agent) {
265         sfl_agent_release(os->sflow_agent);
266         os->sflow_agent = NULL;
267     }
268     collectors_destroy(os->collectors);
269     os->collectors = NULL;
270     ofproto_sflow_options_destroy(os->options);
271     os->options = NULL;
272
273     /* Turn off sampling to save CPU cycles. */
274     wdp_set_sflow_probability(os->wdp, 0);
275 }
276
277 bool
278 ofproto_sflow_is_enabled(const struct ofproto_sflow *os)
279 {
280     return os->collectors != NULL;
281 }
282
283 struct ofproto_sflow *
284 ofproto_sflow_create(struct wdp *wdp)
285 {
286     struct ofproto_sflow *os;
287
288     os = xcalloc(1, sizeof *os);
289     os->wdp = wdp;
290     os->next_tick = time_now() + 1;
291     hmap_init(&os->ports);
292     return os;
293 }
294
295 void
296 ofproto_sflow_destroy(struct ofproto_sflow *os)
297 {
298     if (os) {
299         struct ofproto_sflow_port *osp, *next;
300
301         ofproto_sflow_clear(os);
302         HMAP_FOR_EACH_SAFE (osp, next, hmap_node, &os->ports) {
303             ofproto_sflow_del_port__(os, osp);
304         }
305         hmap_destroy(&os->ports);
306         free(os);
307     }
308 }
309
310 static void
311 ofproto_sflow_add_poller(struct ofproto_sflow *os,
312                          struct ofproto_sflow_port *osp, uint16_t xflow_port)
313 {
314     SFLPoller *poller = sfl_agent_addPoller(os->sflow_agent, &osp->dsi, os,
315                                             sflow_agent_get_counters);
316     sfl_poller_set_sFlowCpInterval(poller, os->options->polling_interval);
317     sfl_poller_set_sFlowCpReceiver(poller, RECEIVER_INDEX);
318     sfl_poller_set_bridgePort(poller, xflow_port);
319 }
320
321 static void
322 ofproto_sflow_add_sampler(struct ofproto_sflow *os,
323                           struct ofproto_sflow_port *osp)
324 {
325     SFLSampler *sampler = sfl_agent_addSampler(os->sflow_agent, &osp->dsi);
326     sfl_sampler_set_sFlowFsPacketSamplingRate(sampler, os->options->sampling_rate);
327     sfl_sampler_set_sFlowFsMaximumHeaderSize(sampler, os->options->header_len);
328     sfl_sampler_set_sFlowFsReceiver(sampler, RECEIVER_INDEX);
329 }
330
331 void
332 ofproto_sflow_add_port(struct ofproto_sflow *os, uint16_t xflow_port,
333                        const char *netdev_name)
334 {
335     struct ofproto_sflow_port *osp;
336     struct netdev *netdev;
337     uint32_t ifindex;
338     int error;
339
340     ofproto_sflow_del_port(os, xflow_port);
341
342     /* Open network device. */
343     error = netdev_open_default(netdev_name, &netdev);
344     if (error) {
345         VLOG_WARN_RL(&rl, "failed to open network device \"%s\": %s",
346                      netdev_name, strerror(error));
347         return;
348     }
349
350     /* Add to table of ports. */
351     osp = xmalloc(sizeof *osp);
352     osp->netdev = netdev;
353     ifindex = netdev_get_ifindex(netdev);
354     if (ifindex <= 0) {
355         ifindex = (os->sflow_agent->subId << 16) + xflow_port;
356     }
357     SFL_DS_SET(osp->dsi, 0, ifindex, 0);
358     osp->xflow_port = xflow_port;
359     hmap_insert(&os->ports, &osp->hmap_node, hash_int(xflow_port, 0));
360
361     /* Add poller and sampler. */
362     if (os->sflow_agent) {
363         ofproto_sflow_add_poller(os, osp, xflow_port);
364         ofproto_sflow_add_sampler(os, osp);
365     }
366 }
367
368 static void
369 ofproto_sflow_del_port__(struct ofproto_sflow *os,
370                          struct ofproto_sflow_port *osp)
371 {
372     if (os->sflow_agent) {
373         sfl_agent_removePoller(os->sflow_agent, &osp->dsi);
374         sfl_agent_removeSampler(os->sflow_agent, &osp->dsi);
375     }
376     netdev_close(osp->netdev);
377     hmap_remove(&os->ports, &osp->hmap_node);
378     free(osp);
379 }
380
381 void
382 ofproto_sflow_del_port(struct ofproto_sflow *os, uint16_t xflow_port)
383 {
384     struct ofproto_sflow_port *osp = ofproto_sflow_find_port(os, xflow_port);
385     if (osp) {
386         ofproto_sflow_del_port__(os, osp);
387     }
388 }
389
390 void
391 ofproto_sflow_set_options(struct ofproto_sflow *os,
392                           const struct ofproto_sflow_options *options)
393 {
394     struct ofproto_sflow_port *osp;
395     bool options_changed;
396     SFLReceiver *receiver;
397     SFLAddress agentIP;
398     time_t now;
399
400     if (!options->targets.n || !options->sampling_rate) {
401         /* No point in doing any work if there are no targets or nothing to
402          * sample. */
403         ofproto_sflow_clear(os);
404         return;
405     }
406
407     options_changed = (!os->options
408                        || !ofproto_sflow_options_equal(options, os->options));
409
410     /* Configure collectors if options have changed or if we're shortchanged in
411      * collectors (which indicates that opening one or more of the configured
412      * collectors failed, so that we should retry). */
413     if (options_changed
414         || collectors_count(os->collectors) < options->targets.n) {
415         collectors_destroy(os->collectors);
416         collectors_create(&options->targets, SFL_DEFAULT_COLLECTOR_PORT,
417                           &os->collectors);
418         if (os->collectors == NULL) {
419             VLOG_WARN_RL(&rl, "no collectors could be initialized, "
420                          "sFlow disabled");
421             ofproto_sflow_clear(os);
422             return;
423         }
424     }
425
426     /* Avoid reconfiguring if options didn't change. */
427     if (!options_changed) {
428         return;
429     }
430     ofproto_sflow_options_destroy(os->options);
431     os->options = ofproto_sflow_options_clone(options);
432
433     /* Choose agent IP address. */
434     if (!sflow_choose_agent_address(options->agent_device,
435                                     options->control_ip, &agentIP)) {
436         ofproto_sflow_clear(os);
437         return;
438     }
439
440     /* Create agent. */
441     VLOG_INFO("creating sFlow agent %d", options->sub_id);
442     if (os->sflow_agent) {
443         sfl_agent_release(os->sflow_agent);
444     }
445     os->sflow_agent = xcalloc(1, sizeof *os->sflow_agent);
446     now = time_wall();
447     sfl_agent_init(os->sflow_agent,
448                    &agentIP,
449                    options->sub_id,
450                    now,         /* Boot time. */
451                    now,         /* Current time. */
452                    os,          /* Pointer supplied to callbacks. */
453                    sflow_agent_alloc_cb,
454                    sflow_agent_free_cb,
455                    sflow_agent_error_cb,
456                    sflow_agent_send_packet_cb);
457
458     receiver = sfl_agent_addReceiver(os->sflow_agent);
459     sfl_receiver_set_sFlowRcvrOwner(receiver, "Open vSwitch sFlow");
460     sfl_receiver_set_sFlowRcvrTimeout(receiver, 0xffffffff);
461
462     /* Set the sampling_rate down in the datapath. */
463     wdp_set_sflow_probability(os->wdp,
464                               MAX(1, UINT32_MAX / options->sampling_rate));
465
466     /* Add samplers and pollers for the currently known ports. */
467     HMAP_FOR_EACH (osp, hmap_node, &os->ports) {
468         ofproto_sflow_add_poller(os, osp, osp->xflow_port);
469         ofproto_sflow_add_sampler(os, osp);
470     }
471 }
472
473 static int
474 ofproto_sflow_xflow_port_to_ifindex(const struct ofproto_sflow *os,
475                                     uint16_t xflow_port)
476 {
477     struct ofproto_sflow_port *osp = ofproto_sflow_find_port(os, xflow_port);
478     return osp ? SFL_DS_INDEX(osp->dsi) : 0;
479 }
480
481 void
482 ofproto_sflow_received(struct ofproto_sflow *os, struct xflow_msg *msg)
483 {
484     SFL_FLOW_SAMPLE_TYPE fs;
485     SFLFlow_sample_element hdrElem;
486     SFLSampled_header *header;
487     SFLFlow_sample_element switchElem;
488     SFLSampler *sampler;
489     const struct xflow_sflow_sample_header *hdr;
490     const union xflow_action *actions;
491     struct ofpbuf payload;
492     size_t n_actions, n_outputs;
493     size_t min_size;
494     flow_t flow;
495     size_t i;
496
497     /* Get xflow_sflow_sample_header. */
498     min_size = sizeof *msg + sizeof *hdr;
499     if (min_size > msg->length) {
500         VLOG_WARN_RL(&rl, "sFlow packet too small (%"PRIu32" < %zu)",
501                      msg->length, min_size);
502         return;
503     }
504     hdr = (const struct xflow_sflow_sample_header *) (msg + 1);
505
506     /* Get actions. */
507     n_actions = hdr->n_actions;
508     if (n_actions > 65536 / sizeof *actions) {
509         VLOG_WARN_RL(&rl, "too many actions in sFlow packet (%zu > %zu)",
510                      65536 / sizeof *actions, n_actions);
511         return;
512     }
513     min_size += n_actions * sizeof *actions;
514     if (min_size > msg->length) {
515         VLOG_WARN_RL(&rl, "sFlow packet with %zu actions too small "
516                      "(%"PRIu32" < %zu)",
517                      n_actions, msg->length, min_size);
518         return;
519     }
520     actions = (const union xflow_action *) (hdr + 1);
521
522     /* Get packet payload and extract flow. */
523     payload.data = (union xflow_action *) (actions + n_actions);
524     payload.size = msg->length - min_size;
525     flow_extract(&payload, 0, msg->port, &flow);
526
527     /* Build a flow sample */
528     memset(&fs, 0, sizeof fs);
529     fs.input = ofproto_sflow_xflow_port_to_ifindex(os, msg->port);
530     fs.output = 0;              /* Filled in correctly below. */
531     fs.sample_pool = hdr->sample_pool;
532
533     /* We are going to give it to the sampler that represents this input port.
534      * By implementing "ingress-only" sampling like this we ensure that we
535      * never have to offer the same sample to more than one sampler. */
536     sampler = sfl_agent_getSamplerByIfIndex(os->sflow_agent, fs.input);
537     if (!sampler) {
538         VLOG_WARN_RL(&rl, "no sampler for input ifIndex (%"PRIu32")",
539                      fs.input);
540         return;
541     }
542
543     /* Sampled header. */
544     memset(&hdrElem, 0, sizeof hdrElem);
545     hdrElem.tag = SFLFLOW_HEADER;
546     header = &hdrElem.flowType.header;
547     header->header_protocol = SFLHEADER_ETHERNET_ISO8023;
548     /* The frame_length should include the Ethernet FCS (4 bytes),
549        but it has already been stripped,  so we need to add 4 here. */
550     header->frame_length = payload.size + 4;
551     /* Ethernet FCS stripped off. */
552     header->stripped = 4;
553     header->header_length = MIN(payload.size,
554                                 sampler->sFlowFsMaximumHeaderSize);
555     header->header_bytes = payload.data;
556
557     /* Add extended switch element. */
558     memset(&switchElem, 0, sizeof(switchElem));
559     switchElem.tag = SFLFLOW_EX_SWITCH;
560     switchElem.flowType.sw.src_vlan = ntohs(flow.dl_vlan);
561     switchElem.flowType.sw.src_priority = -1; /* XXX */
562      /* Initialize the output VLAN and priority to be the same as the input,
563         but these fields can be overriden below if affected by an action. */
564     switchElem.flowType.sw.dst_vlan = switchElem.flowType.sw.src_vlan;
565     switchElem.flowType.sw.dst_priority = switchElem.flowType.sw.src_priority;
566
567     /* Figure out the output ports. */
568     n_outputs = 0;
569     for (i = 0; i < n_actions; i++) {
570         const union xflow_action *a = &actions[i];
571
572         switch (a->type) {
573         case XFLOWAT_OUTPUT:
574             fs.output = ofproto_sflow_xflow_port_to_ifindex(os, a->output.port);
575             n_outputs++;
576             break;
577
578         case XFLOWAT_OUTPUT_GROUP:
579 #if 0
580             n_outputs += (a->output_group.group == DP_GROUP_FLOOD ? os->n_flood
581                           : a->output_group.group == DP_GROUP_ALL ? os->n_all
582                           : 0);
583 #endif
584             break;
585
586         case XFLOWAT_SET_DL_TCI:
587             if (a->dl_tci.mask & htons(VLAN_VID_MASK)) {
588                 switchElem.flowType.sw.dst_vlan = vlan_tci_to_vid(a->dl_tci.tci);
589             }
590             if (a->dl_tci.mask & htons(VLAN_PCP_MASK)) {
591                 switchElem.flowType.sw.dst_priority = vlan_tci_to_pcp(a->dl_tci.tci);
592             }
593             break;
594
595         default:
596             break;
597         }
598     }
599
600     /* Set output port, as defined by http://www.sflow.org/sflow_version_5.txt
601        (search for "Input/output port information"). */
602     if (!n_outputs) {
603         /* This value indicates that the packet was dropped for an unknown
604          * reason. */
605         fs.output = 0x40000000 | 256;
606     } else if (n_outputs > 1 || !fs.output) {
607         /* Setting the high bit means "multiple output ports". */
608         fs.output = 0x80000000 | n_outputs;
609     }
610
611     /* Submit the flow sample to be encoded into the next datagram. */
612     SFLADD_ELEMENT(&fs, &hdrElem);
613     SFLADD_ELEMENT(&fs, &switchElem);
614     sfl_sampler_writeFlowSample(sampler, &fs);
615 }
616
617 void
618 ofproto_sflow_set_group_sizes(struct ofproto_sflow *os,
619                               size_t n_flood, size_t n_all)
620 {
621     os->n_flood = n_flood;
622     os->n_all = n_all;
623 }
624
625 void
626 ofproto_sflow_run(struct ofproto_sflow *os)
627 {
628     if (ofproto_sflow_is_enabled(os)) {
629         time_t now = time_now();
630         if (now >= os->next_tick) {
631             sfl_agent_tick(os->sflow_agent, time_wall());
632             os->next_tick = now + 1;
633         }
634     }
635 }
636
637 void
638 ofproto_sflow_wait(struct ofproto_sflow *os)
639 {
640     if (ofproto_sflow_is_enabled(os)) {
641         poll_timer_wait_until(os->next_tick * 1000LL);
642     }
643 }