1 # Copyright (C) 2014 Nippon Telegraph and Telephone Corporation.
2 # Copyright (C) 2014 YAMAMOTO Takashi <yamamoto at valinux co jp>
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
21 from ryu.base import app_manager
23 from ryu.controller import ofp_event
24 from ryu.controller.handler import CONFIG_DISPATCHER, MAIN_DISPATCHER,\
26 from ryu.controller.handler import set_ev_cls
29 from . import exception
32 class _SwitchInfo(object):
33 def __init__(self, datapath):
34 self.datapath = datapath
40 class OfctlService(app_manager.RyuApp):
41 def __init__(self, *args, **kwargs):
42 super(OfctlService, self).__init__(*args, **kwargs)
43 self.name = 'ofctl_service'
45 self._observing_events = {}
def _observe_msg(self, msg_cls):
    """Take one reference on the reply events generated for ``msg_cls``.

    The message class is translated to its event class with
    ``ofp_event.ofp_msg_to_ev_cls`` and a per-event-class counter is kept
    in ``self._observing_events``.  When that counter is still zero,
    ``self._handle_reply`` is registered as the handler for the event
    class and the event class is observed.  The counter is incremented
    on every call.

    :param msg_cls: message class whose events should be observed;
                    must not be None.
    """
    assert msg_cls is not None
    event_class = ofp_event.ofp_msg_to_ev_cls(msg_cls)

    # setdefault seeds a never-seen event class with a zero count and
    # hands back the current count in the same step.
    current_refs = self._observing_events.setdefault(event_class, 0)
    if current_refs == 0:
        # First interested request: hook the handler up before counting.
        self.logger.debug('ofctl: start observing %s', event_class)
        self.register_handler(event_class, self._handle_reply)
        self.observe_event(event_class)

    self._observing_events[event_class] += 1
def _unobserve_msg(self, msg_cls):
    """Drop one reference on the reply events generated for ``msg_cls``.

    The message class is translated to its event class with
    ``ofp_event.ofp_msg_to_ev_cls`` and the matching counter in
    ``self._observing_events`` is decremented.  When the counter reaches
    zero, ``self._handle_reply`` is unregistered for the event class and
    the event class is no longer observed.

    :param msg_cls: message class whose events were being observed;
                    must not be None, and its counter must be positive.
    """
    assert msg_cls is not None
    event_class = ofp_event.ofp_msg_to_ev_cls(msg_cls)

    ref_counts = self._observing_events
    assert ref_counts[event_class] > 0
    ref_counts[event_class] -= 1

    # Other requests still depend on this event class: keep the handler.
    if ref_counts[event_class] != 0:
        return

    self.unregister_handler(event_class, self._handle_reply)
    self.unobserve_event(event_class)
    self.logger.debug('ofctl: stop observing %s', event_class)
67 def _cancel(self, info, barrier_xid, exception):
68 xid = info.barriers.pop(barrier_xid)
69 req = info.xids.pop(xid)
71 datapath = msg.datapath
72 parser = datapath.ofproto_parser
73 is_barrier = isinstance(msg, parser.OFPBarrierRequest)
77 if not is_barrier and req.reply_cls is not None:
78 self._unobserve_msg(req.reply_cls)
80 self.logger.error('failed to send message <%s>', req.msg)
81 self.reply_to_request(req, event.Reply(exception=exception))
85 return (ofp_event.ofp_msg_to_ev_cls(type(msg)) ==
86 ofp_event.EventOFPErrorMsg)
88 @set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
89 def _switch_features_handler(self, ev):
90 datapath = ev.msg.datapath
92 assert isinstance(id, numbers.Integral)
93 old_info = self._switches.get(id, None)
94 new_info = _SwitchInfo(datapath=datapath)
95 self.logger.debug('add dpid %s datapath %s new_info %s old_info %s',
96 id, datapath, new_info, old_info)
97 self._switches[id] = new_info
99 old_info.datapath.close()
100 for xid in list(old_info.barriers):
102 old_info, xid, exception.InvalidDatapath(result=id))
104 @set_ev_cls(ofp_event.EventOFPStateChange, DEAD_DISPATCHER)
105 def _handle_dead(self, ev):
106 datapath = ev.datapath
108 self.logger.debug('del dpid %s datapath %s', id, datapath)
112 info = self._switches[id]
115 if info.datapath is datapath:
116 self.logger.debug('forget info %s', info)
117 self._switches.pop(id)
118 for xid in list(info.barriers):
119 self._cancel(info, xid, exception.InvalidDatapath(result=id))
121 @set_ev_cls(event.GetDatapathRequest, MAIN_DISPATCHER)
122 def _handle_get_datapath(self, req):
125 result = [v.datapath for v in self._switches.values()]
127 if req.dpid in self._switches:
128 result = self._switches[req.dpid].datapath
129 self.reply_to_request(req, event.Reply(result=result))
131 @set_ev_cls(event.SendMsgRequest, MAIN_DISPATCHER)
132 def _handle_send_msg(self, req):
134 datapath = msg.datapath
135 parser = datapath.ofproto_parser
136 is_barrier = isinstance(msg, parser.OFPBarrierRequest)
139 si = self._switches[datapath.id]
141 self.logger.error('unknown dpid %s' % (datapath.id,))
142 rep = event.Reply(exception=exception.
143 InvalidDatapath(result=datapath.id))
144 self.reply_to_request(req, rep)
147 def _store_xid(xid, barrier_xid):
148 assert xid not in si.results
149 assert xid not in si.xids
150 assert barrier_xid not in si.barriers
153 si.barriers[barrier_xid] = xid
157 datapath.set_xid(barrier)
158 _store_xid(barrier.xid, barrier.xid)
160 if req.reply_cls is not None:
161 self._observe_msg(req.reply_cls)
162 datapath.set_xid(msg)
163 barrier = datapath.ofproto_parser.OFPBarrierRequest(datapath)
164 datapath.set_xid(barrier)
165 _store_xid(msg.xid, barrier.xid)
166 if not datapath.send_msg(msg):
169 exception.InvalidDatapath(result=datapath.id))
171 if not datapath.send_msg(barrier):
174 exception.InvalidDatapath(result=datapath.id))
176 @set_ev_cls(ofp_event.EventOFPBarrierReply, MAIN_DISPATCHER)
177 def _handle_barrier(self, ev):
179 datapath = msg.datapath
180 parser = datapath.ofproto_parser
182 si = self._switches[datapath.id]
184 self.logger.error('unknown dpid %s', datapath.id)
187 xid = si.barriers.pop(msg.xid)
189 self.logger.error('unknown barrier xid %s', msg.xid)
191 result = si.results.pop(xid)
192 req = si.xids.pop(xid)
193 is_barrier = isinstance(req.msg, parser.OFPBarrierRequest)
194 if req.reply_cls is not None and not is_barrier:
195 self._unobserve_msg(req.reply_cls)
196 if is_barrier and req.reply_cls == parser.OFPBarrierReply:
197 rep = event.Reply(result=ev.msg)
198 elif any(self._is_error(r) for r in result):
199 rep = event.Reply(exception=exception.OFError(result=result))
200 elif req.reply_multi:
201 rep = event.Reply(result=result)
202 elif len(result) == 0:
204 elif len(result) == 1:
205 rep = event.Reply(result=result[0])
207 rep = event.Reply(exception=exception.
208 UnexpectedMultiReply(result=result))
209 self.reply_to_request(req, rep)
211 @set_ev_cls(ofp_event.EventOFPErrorMsg, MAIN_DISPATCHER)
212 def _handle_reply(self, ev):
214 datapath = msg.datapath
216 si = self._switches[datapath.id]
218 self.logger.error('unknown dpid %s', datapath.id)
221 req = si.xids[msg.xid]
223 self.logger.error('unknown error xid %s', msg.xid)
225 if ((not isinstance(ev, ofp_event.EventOFPErrorMsg)) and
226 (req.reply_cls is None or not isinstance(ev.msg, req.reply_cls))):
227 self.logger.error('unexpected reply %s for xid %s', ev, msg.xid)
230 si.results[msg.xid].append(ev.msg)
232 self.logger.error('unknown error xid %s', msg.xid)