Update and rename MantenerFIFO to MantenerFIFO.md
[vsorcdistro/.git] / mininet / build / lib.linux-armv6l-2.7 / mininet / examples / cluster.py
1 #!/usr/bin/python
2
3 """
4 cluster.py: prototyping/experimentation for distributed Mininet,
5             aka Mininet: Cluster Edition
6
7 Author: Bob Lantz
8
9 Core classes:
10
11     RemoteNode: a Node() running on a remote server
12     RemoteOVSSwitch(): an OVSSwitch() running on a remote server
13     RemoteLink: a Link() on a remote server
14     Tunnel: a Link() between a local Node() and a RemoteNode()
15
16 These are largely interoperable with local objects.
17
18 - One Mininet to rule them all
19
20 It is important that the same topologies, APIs, and CLI can be used
21 with minimal or no modification in both local and distributed environments.
22
23 - Multiple placement models
24
25 Placement should be as easy as possible. We should provide basic placement
26 support and also allow for explicit placement.
27
28 Questions:
29
30 What is the basic communication mechanism?
31
32 To start with? Probably a single multiplexed ssh connection between each
33 pair of mininet servers that needs to communicate.
34
35 How are tunnels created?
36
37 We have several options including ssh, GRE, OF capsulator, socat, VDE, l2tp,
38 etc..  It's not clear what the best one is.  For now, we use ssh tunnels since
39 they are encrypted and semi-automatically shared.  We will probably want to
40 support GRE as well because it's very easy to set up with OVS.
41
42 How are tunnels destroyed?
43
44 They are destroyed when the links are deleted in Mininet.stop()
45
46 How does RemoteNode.popen() work?
47
48 It opens a shared ssh connection to the remote server and attaches to
49 the namespace using mnexec -a -g.
50
51 Is there any value to using Paramiko vs. raw ssh?
52
53 Maybe, but it doesn't seem to support L2 tunneling.
54
55 Should we preflight the entire network, including all server-to-server
56 connections?
57
58 Yes! We don't yet do this with remote server-to-server connections yet.
59
60 Should we multiplex the link ssh connections?
61
62 Yes, this is done automatically with ControlMaster=auto.
63
64 Note on ssh and DNS:
65 Please add UseDNS: no to your /etc/ssh/sshd_config!!!
66
67 Things to do:
68
69 - asynchronous/pipelined/parallel startup
70 - ssh debugging/profiling
71 - make connections into real objects
72 - support for other tunneling schemes
73 - tests and benchmarks
74 - hifi support (e.g. delay compensation)
75 """
76
77
78 from mininet.node import Node, Host, OVSSwitch, Controller
79 from mininet.link import Link, Intf
80 from mininet.net import Mininet
81 from mininet.topo import LinearTopo
82 from mininet.topolib import TreeTopo
83 from mininet.util import quietRun, errRun
84 from mininet.examples.clustercli import CLI
85 from mininet.log import setLogLevel, debug, info, error
86 from mininet.clean import addCleanupCallback
87
88 from signal import signal, SIGINT, SIG_IGN
89 from subprocess import Popen, PIPE, STDOUT
90 import os
91 from random import randrange
92 import sys
93 import re
94 from itertools import groupby
95 from operator import attrgetter
96 from distutils.version import StrictVersion
97
98
99 def findUser():
100     "Try to return logged-in (usually non-root) user"
101     return (
102             # If we're running sudo
103             os.environ.get( 'SUDO_USER', False ) or
104             # Logged-in user (if we have a tty)
105             ( quietRun( 'who am i' ).split() or [ False ] )[ 0 ] or
106             # Give up and return effective user
107             quietRun( 'whoami' ).strip() )
108
109
110 class ClusterCleanup( object ):
111     "Cleanup callback"
112
113     inited = False
114     serveruser = {}
115
116     @classmethod
117     def add( cls, server, user='' ):
118         "Add an entry to server: user dict"
119         if not cls.inited:
120             addCleanupCallback( cls.cleanup )
121         if not user:
122             user = findUser()
123         cls.serveruser[ server ] = user
124
125     @classmethod
126     def cleanup( cls ):
127         "Clean up"
128         info( '*** Cleaning up cluster\n' )
129         for server, user in cls.serveruser.items():
130             if server == 'localhost':
131                 # Handled by mininet.clean.cleanup()
132                 continue
133             else:
134                 cmd = [ 'su', user, '-c',
135                         'ssh %s@%s sudo mn -c' % ( user, server ) ]
136                 info( cmd, '\n' )
137                 info( quietRun( cmd ) )
138
139 # BL note: so little code is required for remote nodes,
140 # we will probably just want to update the main Node()
141 # class to enable it for remote access! However, there
142 # are a large number of potential failure conditions with
143 # remote nodes which we may want to detect and handle.
144 # Another interesting point is that we could put everything
145 # in a mix-in class and easily add cluster mode to 2.0.
146
147 class RemoteMixin( object ):
148
149     "A mix-in class to turn local nodes into remote nodes"
150
151     # ssh base command
152     # -q: don't print stupid diagnostic messages
153     # BatchMode yes: don't ask for password
154     # ForwardAgent yes: forward authentication credentials
155     sshbase = [ 'ssh', '-q',
156                 '-o', 'BatchMode=yes',
157                 '-o', 'ForwardAgent=yes', '-tt' ]
158
159     def __init__( self, name, server='localhost', user=None, serverIP=None,
160                   controlPath=False, splitInit=False, **kwargs):
161         """Instantiate a remote node
162            name: name of remote node
163            server: remote server (optional)
164            user: user on remote server (optional)
165            controlPath: specify shared ssh control path (optional)
166            splitInit: split initialization?
167            **kwargs: see Node()"""
168         # We connect to servers by IP address
169         self.server = server if server else 'localhost'
170         self.serverIP = ( serverIP if serverIP
171                           else self.findServerIP( self.server ) )
172         self.user = user if user else findUser()
173         ClusterCleanup.add( server=server, user=user )
174         if controlPath is True:
175             # Set a default control path for shared SSH connections
176             controlPath = '/tmp/mn-%r@%h:%p'
177         self.controlPath = controlPath
178         self.splitInit = splitInit
179         if self.user and self.server != 'localhost':
180             self.dest = '%s@%s' % ( self.user, self.serverIP )
181             self.sshcmd = [ 'sudo', '-E', '-u', self.user ] + self.sshbase
182             if self.controlPath:
183                 self.sshcmd += [ '-o', 'ControlPath=' + self.controlPath,
184                                  '-o', 'ControlMaster=auto',
185                                  '-o', 'ControlPersist=' + '1' ]
186             self.sshcmd += [ self.dest ]
187             self.isRemote = True
188         else:
189             self.dest = None
190             self.sshcmd = []
191             self.isRemote = False
192         # Satisfy pylint
193         self.shell, self.pid = None, None
194         super( RemoteMixin, self ).__init__( name, **kwargs )
195
196     # Determine IP address of local host
197     _ipMatchRegex = re.compile( r'\d+\.\d+\.\d+\.\d+' )
198
199     @classmethod
200     def findServerIP( cls, server ):
201         "Return our server's IP address"
202         # First, check for an IP address
203         ipmatch = cls._ipMatchRegex.findall( server )
204         if ipmatch:
205             return ipmatch[ 0 ]
206         # Otherwise, look up remote server
207         output = quietRun( 'getent ahostsv4 %s' % server )
208         ips = cls._ipMatchRegex.findall( output )
209         ip = ips[ 0 ] if ips else None
210         return ip
211
212     # Command support via shell process in namespace
213     def startShell( self, *args, **kwargs ):
214         "Start a shell process for running commands"
215         if self.isRemote:
216             kwargs.update( mnopts='-c' )
217         super( RemoteMixin, self ).startShell( *args, **kwargs )
218         # Optional split initialization
219         self.sendCmd( 'echo $$' )
220         if not self.splitInit:
221             self.finishInit()
222
223     def finishInit( self ):
224         "Wait for split initialization to complete"
225         self.pid = int( self.waitOutput() )
226
227     def rpopen( self, *cmd, **opts ):
228         "Return a Popen object on underlying server in root namespace"
229         params = { 'stdin': PIPE,
230                    'stdout': PIPE,
231                    'stderr': STDOUT,
232                    'sudo': True }
233         params.update( opts )
234         return self._popen( *cmd, **params )
235
236     def rcmd( self, *cmd, **opts):
237         """rcmd: run a command on underlying server
238            in root namespace
239            args: string or list of strings
240            returns: stdout and stderr"""
241         popen = self.rpopen( *cmd, **opts )
242         # info( 'RCMD: POPEN:', popen, '\n' )
243         # These loops are tricky to get right.
244         # Once the process exits, we can read
245         # EOF twice if necessary.
246         result = ''
247         while True:
248             poll = popen.poll()
249             result += popen.stdout.read()
250             if poll is not None:
251                 break
252         return result
253
254     @staticmethod
255     def _ignoreSignal():
256         "Detach from process group to ignore all signals"
257         os.setpgrp()
258
259     def _popen( self, cmd, sudo=True, tt=True, **params):
260         """Spawn a process on a remote node
261             cmd: remote command to run (list)
262             **params: parameters to Popen()
263             returns: Popen() object"""
264         if type( cmd ) is str:
265             cmd = cmd.split()
266         if self.isRemote:
267             if sudo:
268                 cmd = [ 'sudo', '-E' ] + cmd
269             if tt:
270                 cmd = self.sshcmd + cmd
271             else:
272                 # Hack: remove -tt
273                 sshcmd = list( self.sshcmd )
274                 sshcmd.remove( '-tt' )
275                 cmd = sshcmd + cmd
276         else:
277             if self.user and not sudo:
278                 # Drop privileges
279                 cmd = [ 'sudo', '-E', '-u', self.user ] + cmd
280         params.update( preexec_fn=self._ignoreSignal )
281         debug( '_popen', cmd, '\n' )
282         popen = super( RemoteMixin, self )._popen( cmd, **params )
283         return popen
284
285     def popen( self, *args, **kwargs ):
286         "Override: disable -tt"
287         return super( RemoteMixin, self).popen( *args, tt=False, **kwargs )
288
289     def addIntf( self, *args, **kwargs ):
290         "Override: use RemoteLink.moveIntf"
291         # kwargs.update( moveIntfFn=RemoteLink.moveIntf )
292         return super( RemoteMixin, self).addIntf( *args, **kwargs )
293
294
295 class RemoteNode( RemoteMixin, Node ):
296     "A node on a remote server"
297     pass
298
299
300 class RemoteHost( RemoteNode ):
301     "A RemoteHost is simply a RemoteNode"
302     pass
303
304
305 class RemoteOVSSwitch( RemoteMixin, OVSSwitch ):
306     "Remote instance of Open vSwitch"
307
308     OVSVersions = {}
309
310     def __init__( self, *args, **kwargs ):
311         # No batch startup yet
312         kwargs.update( batch=True )
313         super( RemoteOVSSwitch, self ).__init__( *args, **kwargs )
314
315     def isOldOVS( self ):
316         "Is remote switch using an old OVS version?"
317         cls = type( self )
318         if self.server not in cls.OVSVersions:
319             # pylint: disable=not-callable
320             vers = self.cmd( 'ovs-vsctl --version' )
321             # pylint: enable=not-callable
322             cls.OVSVersions[ self.server ] = re.findall(
323                 r'\d+\.\d+', vers )[ 0 ]
324         return ( StrictVersion( cls.OVSVersions[ self.server ] ) <
325                  StrictVersion( '1.10' ) )
326
327     @classmethod
328     def batchStartup( cls, switches, **_kwargs ):
329         "Start up switches in per-server batches"
330         key = attrgetter( 'server' )
331         for server, switchGroup in groupby( sorted( switches, key=key ), key ):
332             info( '(%s)' % server )
333             group = tuple( switchGroup )
334             switch = group[ 0 ]
335             OVSSwitch.batchStartup( group, run=switch.cmd )
336         return switches
337
338     @classmethod
339     def batchShutdown( cls, switches, **_kwargs ):
340         "Stop switches in per-server batches"
341         key = attrgetter( 'server' )
342         for server, switchGroup in groupby( sorted( switches, key=key ), key ):
343             info( '(%s)' % server )
344             group = tuple( switchGroup )
345             switch = group[ 0 ]
346             OVSSwitch.batchShutdown( group, run=switch.rcmd )
347         return switches
348
349
350 class RemoteLink( Link ):
351     "A RemoteLink is a link between nodes which may be on different servers"
352
353     def __init__( self, node1, node2, **kwargs ):
354         """Initialize a RemoteLink
355            see Link() for parameters"""
356         # Create links on remote node
357         self.node1 = node1
358         self.node2 = node2
359         self.tunnel = None
360         kwargs.setdefault( 'params1', {} )
361         kwargs.setdefault( 'params2', {} )
362         self.cmd = None  # satisfy pylint
363         Link.__init__( self, node1, node2, **kwargs )
364
365     def stop( self ):
366         "Stop this link"
367         if self.tunnel:
368             self.tunnel.terminate()
369             self.intf1.delete()
370             self.intf2.delete()
371         else:
372             Link.stop( self )
373         self.tunnel = None
374
375     def makeIntfPair( self, intfname1, intfname2, addr1=None, addr2=None,
376                       node1=None, node2=None, deleteIntfs=True   ):
377         """Create pair of interfaces
378             intfname1: name of interface 1
379             intfname2: name of interface 2
380             (override this method [and possibly delete()]
381             to change link type)"""
382         node1 = self.node1 if node1 is None else node1
383         node2 = self.node2 if node2 is None else node2
384         server1 = getattr( node1, 'server', 'localhost' )
385         server2 = getattr( node2, 'server', 'localhost' )
386         if server1 == server2:
387             # Link within same server
388             return Link.makeIntfPair( intfname1, intfname2, addr1, addr2,
389                                       node1, node2, deleteIntfs=deleteIntfs )
390         # Otherwise, make a tunnel
391         self.tunnel = self.makeTunnel( node1, node2, intfname1, intfname2,
392                                        addr1, addr2 )
393         return self.tunnel
394
395     @staticmethod
396     def moveIntf( intf, node ):
397         """Move remote interface from root ns to node
398             intf: string, interface
399             dstNode: destination Node
400             srcNode: source Node or None (default) for root ns"""
401         intf = str( intf )
402         cmd = 'ip link set %s netns %s' % ( intf, node.pid )
403         result = node.rcmd( cmd )
404         if result:
405             raise Exception('error executing command %s' % cmd)
406         return True
407
408     def makeTunnel( self, node1, node2, intfname1, intfname2,
409                     addr1=None, addr2=None ):
410         "Make a tunnel across switches on different servers"
411         # We should never try to create a tunnel to ourselves!
412         assert node1.server != node2.server
413         # And we can't ssh into this server remotely as 'localhost',
414         # so try again swappping node1 and node2
415         if node2.server == 'localhost':
416             return self.makeTunnel( node2, node1, intfname2, intfname1,
417                                     addr2, addr1 )
418         debug( '\n*** Make SSH tunnel ' + node1.server + ':' + intfname1 +
419                ' == ' + node2.server + ':' + intfname2 )
420         # 1. Create tap interfaces
421         for node in node1, node2:
422             # For now we are hard-wiring tap9, which we will rename
423             cmd = 'ip tuntap add dev tap9 mode tap user ' + node.user
424             result = node.rcmd( cmd )
425             if result:
426                 raise Exception( 'error creating tap9 on %s: %s' %
427                                  ( node, result ) )
428         # 2. Create ssh tunnel between tap interfaces
429         # -n: close stdin
430         dest = '%s@%s' % ( node2.user, node2.serverIP )
431         cmd = [ 'ssh', '-n', '-o', 'Tunnel=Ethernet', '-w', '9:9',
432                 dest, 'echo @' ]
433         self.cmd = cmd
434         tunnel = node1.rpopen( cmd, sudo=False )
435         # When we receive the character '@', it means that our
436         # tunnel should be set up
437         debug( '\nWaiting for tunnel to come up...\n' )
438         ch = tunnel.stdout.read( 1 )
439         if ch != '@':
440             raise Exception( 'makeTunnel:\n',
441                              'Tunnel setup failed for',
442                              '%s:%s' % ( node1, node1.dest ), 'to',
443                              '%s:%s\n' % ( node2, node2.dest ),
444                              'command was:', cmd, '\n' )
445         # 3. Move interfaces if necessary
446         for node in node1, node2:
447             if not self.moveIntf( 'tap9', node ):
448                 raise Exception( 'interface move failed on node %s' % node )
449         # 4. Rename tap interfaces to desired names
450         for node, intf, addr in ( ( node1, intfname1, addr1 ),
451                                   ( node2, intfname2, addr2 ) ):
452             if not addr:
453                 result = node.cmd( 'ip link set tap9 name', intf )
454             else:
455                 result = node.cmd( 'ip link set tap9 name', intf,
456                                    'address', addr )
457             if result:
458                 raise Exception( 'error renaming %s: %s' % ( intf, result ) )
459         return tunnel
460
461     def status( self ):
462         "Detailed representation of link"
463         if self.tunnel:
464             if self.tunnel.poll() is not None:
465                 status = "Tunnel EXITED %s" % self.tunnel.returncode
466             else:
467                 status = "Tunnel Running (%s: %s)" % (
468                     self.tunnel.pid, self.cmd )
469         else:
470             status = "OK"
471         result = "%s %s" % ( Link.status( self ), status )
472         return result
473
474
475 class RemoteSSHLink( RemoteLink ):
476     "Remote link using SSH tunnels"
477     def __init__(self, node1, node2, **kwargs):
478         RemoteLink.__init__( self, node1, node2, **kwargs )
479
480
481 class RemoteGRELink( RemoteLink ):
482     "Remote link using GRE tunnels"
483
484     GRE_KEY = 0
485
486     def __init__(self, node1, node2, **kwargs):
487         RemoteLink.__init__( self, node1, node2, **kwargs )
488
489     def stop( self ):
490         "Stop this link"
491         if self.tunnel:
492             self.intf1.delete()
493             self.intf2.delete()
494         else:
495             Link.stop( self )
496         self.tunnel = None
497
498     def makeIntfPair( self, intfname1, intfname2, addr1=None, addr2=None,
499                       node1=None, node2=None, deleteIntfs=True   ):
500         """Create pair of interfaces
501             intfname1: name of interface 1
502             intfname2: name of interface 2
503             (override this method [and possibly delete()]
504             to change link type)"""
505         node1 = self.node1 if node1 is None else node1
506         node2 = self.node2 if node2 is None else node2
507         server1 = getattr( node1, 'server', 'localhost' )
508         server2 = getattr( node2, 'server', 'localhost' )
509         if server1 == server2:
510             # Link within same server
511             Link.makeIntfPair( intfname1, intfname2, addr1, addr2,
512                                node1, node2, deleteIntfs=deleteIntfs )
513             # Need to reduce the MTU of all emulated hosts to 1450 for GRE
514             # tunneling, otherwise packets larger than 1400 bytes cannot be
515             # successfully transmitted through the tunnel.
516             node1.cmd('ip link set dev %s mtu 1450' % intfname1)
517             node2.cmd('ip link set dev %s mtu 1450' % intfname2)
518         else:
519             # Otherwise, make a tunnel
520             self.makeTunnel( node1, node2, intfname1, intfname2, addr1, addr2 )
521             self.tunnel = 1
522
523     def makeTunnel(self, node1, node2, intfname1, intfname2,
524                        addr1=None, addr2=None):
525         "Make a tunnel across switches on different servers"
526         # We should never try to create a tunnel to ourselves!
527         assert node1.server != node2.server
528         if node2.server == 'localhost':
529             return self.makeTunnel( node2, node1, intfname2, intfname1,
530                                     addr2, addr1 )
531         IP1, IP2 = node1.serverIP, node2.serverIP
532         # GRE tunnel needs to be set up with the IP of the local interface
533         # that connects the remote node, NOT '127.0.0.1' of localhost
534         if node1.server == 'localhost':
535             output = quietRun('ip route get %s' % node2.serverIP)
536             IP1 = output.split(' src ')[1].split()[0]
537         debug( '\n*** Make GRE tunnel ' + node1.server + ':' + intfname1 +
538                ' == ' + node2.server + ':' + intfname2 )
539         tun1 = 'local ' + IP1 + ' remote ' + IP2
540         tun2 = 'local ' + IP2 + ' remote ' + IP1
541         self.__class__.GRE_KEY += 1
542         for (node, intfname, addr, tun) in [(node1, intfname1, addr1, tun1),
543                                             (node2, intfname2, addr2, tun2)]:
544             node.rcmd('ip link delete ' + intfname)
545             result = node.rcmd('ip link add name ' + intfname + ' type gretap '
546                                + tun + ' ttl 64 key '
547                                + str( self.__class__.GRE_KEY) )
548             if result:
549                 raise Exception('error creating gretap on %s: %s'
550                                 % (node, result))
551             if addr:
552                 node.rcmd('ip link set %s address %s' % (intfname, addr))
553
554             node.rcmd('ip link set dev %s up' % intfname)
555             node.rcmd('ip link set dev %s mtu 1450' % intfname)
556             if not self.moveIntf(intfname, node):
557                 raise Exception('interface move failed on node %s' % node)
558
559
560 # Some simple placement algorithms for MininetCluster
561
562 class Placer( object ):
563     "Node placement algorithm for MininetCluster"
564
565     def __init__( self, servers=None, nodes=None, hosts=None,
566                   switches=None, controllers=None, links=None ):
567         """Initialize placement object
568            servers: list of servers
569            nodes: list of all nodes
570            hosts: list of hosts
571            switches: list of switches
572            controllers: list of controllers
573            links: list of links
574            (all arguments are optional)
575            returns: server"""
576         self.servers = servers or []
577         self.nodes = nodes or []
578         self.hosts = hosts or []
579         self.switches = switches or []
580         self.controllers = controllers or []
581         self.links = links or []
582
583     def place( self, node ):
584         "Return server for a given node"
585         assert self, node  # satisfy pylint
586         # Default placement: run locally
587         return 'localhost'
588
589
590 class RandomPlacer( Placer ):
591     "Random placement"
592     def place( self, nodename ):
593         """Random placement function
594             nodename: node name"""
595         assert nodename  # please pylint
596         # This may be slow with lots of servers
597         return self.servers[ randrange( 0, len( self.servers ) ) ]
598
599
600 class RoundRobinPlacer( Placer ):
601     """Round-robin placement
602        Note this will usually result in cross-server links between
603        hosts and switches"""
604
605     def __init__( self, *args, **kwargs ):
606         Placer.__init__( self, *args, **kwargs )
607         self.next = 0
608
609     def place( self, nodename ):
610         """Round-robin placement function
611             nodename: node name"""
612         assert nodename  # please pylint
613         # This may be slow with lots of servers
614         server = self.servers[ self.next ]
615         self.next = ( self.next + 1 ) % len( self.servers )
616         return server
617
618
619 class SwitchBinPlacer( Placer ):
620     """Place switches (and controllers) into evenly-sized bins,
621        and attempt to co-locate hosts and switches"""
622
623     def __init__( self, *args, **kwargs ):
624         Placer.__init__( self, *args, **kwargs )
625         # Easy lookup for servers and node sets
626         self.servdict = dict( enumerate( self.servers ) )
627         self.hset = frozenset( self.hosts )
628         self.sset = frozenset( self.switches )
629         self.cset = frozenset( self.controllers )
630         # Server and switch placement indices
631         self.placement = self.calculatePlacement()
632
633     @staticmethod
634     def bin( nodes, servers ):
635         "Distribute nodes evenly over servers"
636         # Calculate base bin size
637         nlen = len( nodes )
638         slen = len( servers )
639         # Basic bin size
640         quotient = int( nlen / slen )
641         binsizes = { server: quotient for server in servers }
642         # Distribute remainder
643         remainder = nlen % slen
644         for server in servers[ 0 : remainder ]:
645             binsizes[ server ] += 1
646         # Create binsize[ server ] tickets for each server
647         tickets = sum( [ binsizes[ server ] * [ server ]
648                          for server in servers ], [] )
649         # And assign one ticket to each node
650         return { node: ticket for node, ticket in zip( nodes, tickets ) }
651
652     def calculatePlacement( self ):
653         "Pre-calculate node placement"
654         placement = {}
655         # Create host-switch connectivity map,
656         # associating host with last switch that it's
657         # connected to
658         switchFor = {}
659         for src, dst in self.links:
660             if src in self.hset and dst in self.sset:
661                 switchFor[ src ] = dst
662             if dst in self.hset and src in self.sset:
663                 switchFor[ dst ] = src
664         # Place switches
665         placement = self.bin( self.switches, self.servers )
666         # Place controllers and merge into placement dict
667         placement.update( self.bin( self.controllers, self.servers ) )
668         # Co-locate hosts with their switches
669         for h in self.hosts:
670             if h in placement:
671                 # Host is already placed - leave it there
672                 continue
673             if h in switchFor:
674                 placement[ h ] = placement[ switchFor[ h ] ]
675             else:
676                 raise Exception(
677                         "SwitchBinPlacer: cannot place isolated host " + h )
678         return placement
679
680     def place( self, node ):
681         """Simple placement algorithm:
682            place switches into evenly sized bins,
683            and place hosts near their switches"""
684         return self.placement[ node ]
685
686
687 class HostSwitchBinPlacer( Placer ):
688     """Place switches *and hosts* into evenly-sized bins
689        Note that this will usually result in cross-server
690        links between hosts and switches"""
691
692     def __init__( self, *args, **kwargs ):
693         Placer.__init__( self, *args, **kwargs )
694         # Calculate bin sizes
695         scount = len( self.servers )
696         self.hbin = max( int( len( self.hosts ) / scount ), 1 )
697         self.sbin = max( int( len( self.switches ) / scount ), 1 )
698         self.cbin = max( int( len( self.controllers ) / scount ), 1 )
699         info( 'scount:', scount )
700         info( 'bins:', self.hbin, self.sbin, self.cbin, '\n' )
701         self.servdict = dict( enumerate( self.servers ) )
702         self.hset = frozenset( self.hosts )
703         self.sset = frozenset( self.switches )
704         self.cset = frozenset( self.controllers )
705         self.hind, self.sind, self.cind = 0, 0, 0
706
707     def place( self, nodename ):
708         """Simple placement algorithm:
709             place nodes into evenly sized bins"""
710         # Place nodes into bins
711         if nodename in self.hset:
712             server = self.servdict[ self.hind / self.hbin ]
713             self.hind += 1
714         elif nodename in self.sset:
715             server = self.servdict[ self.sind / self.sbin ]
716             self.sind += 1
717         elif nodename in self.cset:
718             server = self.servdict[ self.cind / self.cbin ]
719             self.cind += 1
720         else:
721             info( 'warning: unknown node', nodename )
722             server = self.servdict[ 0 ]
723         return server
724
725
726 # The MininetCluster class is not strictly necessary.
727 # However, it has several purposes:
728 # 1. To set up ssh connection sharing/multiplexing
729 # 2. To pre-flight the system so that everything is more likely to work
730 # 3. To allow connection/connectivity monitoring
731 # 4. To support pluggable placement algorithms
732
733 class MininetCluster( Mininet ):
734
735     "Cluster-enhanced version of Mininet class"
736
737     # Default ssh command
738     # BatchMode yes: don't ask for password
739     # ForwardAgent yes: forward authentication credentials
740     sshcmd = [ 'ssh', '-o', 'BatchMode=yes', '-o', 'ForwardAgent=yes' ]
741
742     def __init__( self, *args, **kwargs ):
743         """servers: a list of servers to use (note: include
744            localhost or None to use local system as well)
745            user: user name for server ssh
746            placement: Placer() subclass"""
747         params = { 'host': RemoteHost,
748                    'switch': RemoteOVSSwitch,
749                    'link': RemoteLink,
750                    'precheck': True }
751         params.update( kwargs )
752         servers = params.pop( 'servers', [ 'localhost' ] )
753         servers = [ s if s else 'localhost' for s in servers ]
754         self.servers = servers
755         self.serverIP = params.pop( 'serverIP', {} )
756         if not self.serverIP:
757             self.serverIP = { server: RemoteMixin.findServerIP( server )
758                               for server in self.servers }
759         self.user = params.pop( 'user', findUser() )
760         if params.pop( 'precheck' ):
761             self.precheck()
762         self.connections = {}
763         self.placement = params.pop( 'placement', SwitchBinPlacer )
764         # Make sure control directory exists
765         self.cdir = os.environ[ 'HOME' ] + '/.ssh/mn'
766         errRun( [ 'mkdir', '-p', self.cdir ] )
767         Mininet.__init__( self, *args, **params )
768
769     def popen( self, cmd ):
770         "Popen() for server connections"
771         assert self  # please pylint
772         old = signal( SIGINT, SIG_IGN )
773         conn = Popen( cmd, stdin=PIPE, stdout=PIPE, close_fds=True )
774         signal( SIGINT, old )
775         return conn
776
777     def baddLink( self, *args, **kwargs ):
778         "break addlink for testing"
779         pass
780
781     def precheck( self ):
782         """Pre-check to make sure connection works and that
783            we can call sudo without a password"""
784         result = 0
785         info( '*** Checking servers\n' )
786         for server in self.servers:
787             ip = self.serverIP[ server ]
788             if not server or server == 'localhost':
789                 continue
790             info( server, '' )
791             dest = '%s@%s' % ( self.user, ip )
792             cmd = [ 'sudo', '-E', '-u', self.user ]
793             cmd += self.sshcmd + [ '-n', dest, 'sudo true' ]
794             debug( ' '.join( cmd ), '\n' )
795             _out, _err, code = errRun( cmd )
796             if code != 0:
797                 error( '\nstartConnection: server connection check failed '
798                        'to %s using command:\n%s\n'
799                         % ( server, ' '.join( cmd ) ) )
800             result |= code
801         if result:
802             error( '*** Server precheck failed.\n'
803                    '*** Make sure that the above ssh command works'
804                    ' correctly.\n'
805                    '*** You may also need to run mn -c on all nodes, and/or\n'
806                    '*** use sudo -E.\n' )
807             sys.exit( 1 )
808         info( '\n' )
809
810     def modifiedaddHost( self, *args, **kwargs ):
811         "Slightly modify addHost"
812         assert self  # please pylint
813         kwargs[ 'splitInit' ] = True
814         return Mininet.addHost( *args, **kwargs )
815
816     def placeNodes( self ):
817         """Place nodes on servers (if they don't have a server), and
818            start shell processes"""
819         if not self.servers or not self.topo:
820             # No shirt, no shoes, no service
821             return
822         nodes = self.topo.nodes()
823         placer = self.placement( servers=self.servers,
824                                  nodes=self.topo.nodes(),
825                                  hosts=self.topo.hosts(),
826                                  switches=self.topo.switches(),
827                                  links=self.topo.links() )
828         for node in nodes:
829             config = self.topo.nodeInfo( node )
830             # keep local server name consistent accross nodes
831             if 'server' in config.keys() and config[ 'server' ] is None:
832                 config[ 'server' ] = 'localhost'
833             server = config.setdefault( 'server', placer.place( node ) )
834             if server:
835                 config.setdefault( 'serverIP', self.serverIP[ server ] )
836             info( '%s:%s ' % ( node, server ) )
837             key = ( None, server )
838             _dest, cfile, _conn = self.connections.get(
839                         key, ( None, None, None ) )
840             if cfile:
841                 config.setdefault( 'controlPath', cfile )
842
843     def addController( self, *args, **kwargs ):
844         "Patch to update IP address to global IP address"
845         controller = Mininet.addController( self, *args, **kwargs )
846         loopback = '127.0.0.1'
847         if ( not isinstance( controller, Controller ) or
848              controller.IP() != loopback ):
849             return
850         # Find route to a different server IP address
851         serverIPs = [ ip for ip in self.serverIP.values()
852                       if ip is not controller.IP() ]
853         if not serverIPs:
854             return  # no remote servers - loopback is fine
855         remoteIP = serverIPs[ 0 ]
856         # Route should contain 'dev <intfname>'
857         route = controller.cmd( 'ip route get', remoteIP,
858                                 r'| egrep -o "dev\s[^[:space:]]+"' )
859         if not route:
860             raise Exception('addController: no route from', controller,
861                             'to', remoteIP )
862         intf = route.split()[ 1 ].strip()
863         debug( 'adding', intf, 'to', controller )
864         Intf( intf, node=controller ).updateIP()
865         debug( controller, 'IP address updated to', controller.IP() )
866         return controller
867
868     def buildFromTopo( self, *args, **kwargs ):
869         "Start network"
870         info( '*** Placing nodes\n' )
871         self.placeNodes()
872         info( '\n' )
873         Mininet.buildFromTopo( self, *args, **kwargs )
874
875
876 def testNsTunnels( remote='ubuntu2', link=RemoteGRELink ):
877     "Test tunnels between nodes in namespaces"
878     net = Mininet( host=RemoteHost, link=link )
879     h1 = net.addHost( 'h1')
880     h2 = net.addHost( 'h2', server=remote )
881     net.addLink( h1, h2 )
882     net.start()
883     net.pingAll()
884     net.stop()
885
886 # Manual topology creation with net.add*()
887 #
888 # This shows how node options may be used to manage
889 # cluster placement using the net.add*() API
890
891 def testRemoteNet( remote='ubuntu2', link=RemoteGRELink ):
892     "Test remote Node classes"
893     info( '*** Remote Node Test\n' )
894     net = Mininet( host=RemoteHost, switch=RemoteOVSSwitch, link=link )
895     c0 = net.addController( 'c0' )
896     # Make sure controller knows its non-loopback address
897     Intf( 'eth0', node=c0 ).updateIP()
898     info( "*** Creating local h1\n" )
899     h1 = net.addHost( 'h1' )
900     info( "*** Creating remote h2\n" )
901     h2 = net.addHost( 'h2', server=remote )
902     info( "*** Creating local s1\n" )
903     s1 = net.addSwitch( 's1' )
904     info( "*** Creating remote s2\n" )
905     s2 = net.addSwitch( 's2', server=remote )
906     info( "*** Adding links\n" )
907     net.addLink( h1, s1 )
908     net.addLink( s1, s2 )
909     net.addLink( h2, s2 )
910     net.start()
911     info( 'Mininet is running on', quietRun( 'hostname' ).strip(), '\n' )
912     for node in c0, h1, h2, s1, s2:
913         info( 'Node', node, 'is running on',
914               node.cmd( 'hostname' ).strip(), '\n' )
915     net.pingAll()
916     CLI( net )
917     net.stop()
918
919
920 # High-level/Topo API example
921 #
922 # This shows how existing Mininet topologies may be used in cluster
923 # mode by creating node placement functions and a controller which
924 # can be accessed remotely. This implements a very compatible version
925 # of cluster edition with a minimum of code!
926
927 remoteHosts = [ 'h2' ]
928 remoteSwitches = [ 's2' ]
929 remoteServer = 'ubuntu2'
930
931 def HostPlacer( name, *args, **params ):
932     "Custom Host() constructor which places hosts on servers"
933     if name in remoteHosts:
934         return RemoteHost( name, *args, server=remoteServer, **params )
935     else:
936         return Host( name, *args, **params )
937
938 def SwitchPlacer( name, *args, **params ):
939     "Custom Switch() constructor which places switches on servers"
940     if name in remoteSwitches:
941         return RemoteOVSSwitch( name, *args, server=remoteServer, **params )
942     else:
943         return RemoteOVSSwitch( name, *args, **params )
944
945 def ClusterController( *args, **kwargs):
946     "Custom Controller() constructor which updates its eth0 IP address"
947     controller = Controller( *args, **kwargs )
948     # Find out its IP address so that cluster switches can connect
949     Intf( 'eth0', node=controller ).updateIP()
950     return controller
951
952 def testRemoteTopo( link=RemoteGRELink ):
953     "Test remote Node classes using Mininet()/Topo() API"
954     topo = LinearTopo( 2 )
955     net = Mininet( topo=topo, host=HostPlacer, switch=SwitchPlacer,
956                    link=link, controller=ClusterController )
957     net.start()
958     net.pingAll()
959     net.stop()
960
961 # Need to test backwards placement, where each host is on
962 # a server other than its switch!! But seriously we could just
963 # do random switch placement rather than completely random
964 # host placement.
965
966 def testRemoteSwitches( remote='ubuntu2', link=RemoteGRELink ):
967     "Test with local hosts and remote switches"
968     servers = [ 'localhost', remote]
969     topo = TreeTopo( depth=4, fanout=2 )
970     net = MininetCluster( topo=topo, servers=servers, link=link,
971                           placement=RoundRobinPlacer )
972     net.start()
973     net.pingAll()
974     net.stop()
975
976
977 # For testing and demo purposes it would be nice to draw the
978 # network graph and color it based on server.
979
980 # The MininetCluster() class integrates pluggable placement
981 # functions, for maximum ease of use. MininetCluster() also
982 # pre-flights and multiplexes server connections.
983
984 def testMininetCluster( remote='ubuntu2', link=RemoteGRELink ):
985     "Test MininetCluster()"
986     servers = [ 'localhost', remote ]
987     topo = TreeTopo( depth=3, fanout=3 )
988     net = MininetCluster( topo=topo, servers=servers, link=link,
989                           placement=SwitchBinPlacer )
990     net.start()
991     net.pingAll()
992     net.stop()
993
994 def signalTest( remote='ubuntu2'):
995     "Make sure hosts are robust to signals"
996     h = RemoteHost( 'h0', server=remote )
997     h.shell.send_signal( SIGINT )
998     h.shell.poll()
999     if h.shell.returncode is None:
1000         info( 'signalTest: SUCCESS: ', h, 'has not exited after SIGINT', '\n' )
1001     else:
1002         info( 'signalTest: FAILURE:', h, 'exited with code',
1003               h.shell.returncode, '\n' )
1004     h.stop()
1005
1006
1007 if __name__ == '__main__':
1008     setLogLevel( 'info' )
1009     remoteServer = 'ubuntu2'
1010     remoteLink = RemoteSSHLink
1011     testRemoteTopo(link=remoteLink)
1012     testNsTunnels( remote=remoteServer, link=remoteLink )
1013     testRemoteNet( remote=remoteServer, link=remoteLink)
1014     testMininetCluster( remote=remoteServer, link=remoteLink)
1015     testRemoteSwitches( remote=remoteServer, link=remoteLink)
1016     signalTest( remote=remoteServer )