1 # Copyright (c) 2014 Rackspace Hosting
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
21 from ovs import jsonrpc
22 from ovs import poller
23 from ovs import reconnect
24 from ovs import stream
25 from ovs import timeval
26 from ovs.db import idl
28 from ryu.base import app_manager
29 from ryu.lib import hub
30 from ryu.services.protocols.ovsdb import event
31 from ryu.services.protocols.ovsdb import model
def _uuid_to_row(atom, base):
    # Resolve a UUID atom to the row it references via the column's
    # ref_table, collapsing an idl.Row result to its uuid string so the
    # value is plain-Python serializable.
    # NOTE(review): the no-ref_table branch and the final return appear
    # to be elided from this view -- confirm against the full source.
    value = base.ref_table.rows.get(atom)

    if isinstance(value, idl.Row):
        value = str(value.uuid)
    # NOTE(review): the enclosing def (apparently building a plain dict
    # snapshot of an idl row) is outside this view, as is the `result`
    # initializer and return.
    # Convert each ovs Datum to a Python value, resolving row
    # references to uuid strings via _uuid_to_row.
    for key, value in row._data.items():
        result[key] = value.to_python(_uuid_to_row)
def transact_block(request, connection):
    """Emulate jsonrpc.Connection.transact_block without blocking eventlet.

    Sends *request* on *connection*, then polls cooperatively (via an
    ovs poller) until a reply whose id matches the request arrives or a
    receive error other than EAGAIN occurs.

    NOTE(review): the surrounding while-loop, break statements and the
    final return appear to be elided from this view.
    """
    error = connection.send(request)

    # Poll cooperatively so other greenthreads can run while waiting.
    ovs_poller = poller.Poller()
    ovs_poller.immediate_wake()
    error, reply = connection.recv()

    # EAGAIN just means "nothing received yet"; any other error ends
    # the wait.
    if error != errno.EAGAIN:
    # A reply matches when its id equals the request id and it is a
    # reply or error message.
    reply.id == request.id and
    reply.type in (jsonrpc.Message.T_REPLY,
                   jsonrpc.Message.T_ERROR)):
    # Register read interest on the connection before blocking.
    connection.wait(ovs_poller)
    connection.recv_wait(ovs_poller)
def discover_schemas(connection):
    """Fetch the database schemas exposed by the OVSDB peer.

    NOTE(review): the `schemas` list initializer, error-branch bodies,
    `continue` and the final return appear elided from this view.
    """
    # NOTE(jkoelker) currently only the Open_vSwitch schema
    # TODO(jkoelker) support arbitrary schemas
    req = jsonrpc.Message.create_request('list_dbs', [])
    error, reply = transact_block(req, connection)

    if error or reply.error:

    for db in reply.result:
        # Skip every database except Open_vSwitch (see NOTE above).
        if db != 'Open_vSwitch':

        req = jsonrpc.Message.create_request('get_schema', [db])
        error, reply = transact_block(req, connection)

        if error or reply.error:
            # TODO(jkoelker) Error handling

        schemas.append(reply.result)
def discover_system_id(idl):
    """Poll the Open_vSwitch table until the daemon's 'system-id'
    external id is visible (or the session disconnects).

    NOTE(review): the `system_id` initializer, the idl.run()/sleep calls
    inside the loop and the final return appear elided from this view.
    """
    while system_id is None and idl._session.is_connected():
        openvswitch = idl.tables['Open_vSwitch'].rows

        # The Open_vSwitch table has a single root row; read its
        # external_ids map.
        row = openvswitch.get(list(openvswitch.keys())[0])
        system_id = row.external_ids.get('system-id')
def _filter_schemas(schemas, schema_tables, exclude_table_columns):
    """Wrapper method for _filter_schema to filter multiple schemas."""
    # NOTE(review): the comprehension's trailing `for s in schemas]`
    # clause appears elided from this view.
    return [_filter_schema(s, schema_tables, exclude_table_columns)
def _filter_schema(schema, schema_tables, exclude_table_columns):
    """Filters a schema to only include the specified tables in the
    schema_tables parameter. This will also filter out any columns for
    included tables that reference tables that are not included
    in the schema_tables parameter

    :param schema: Schema dict to be filtered
    :param schema_tables: List of table names to filter on.
        EX: ['Bridge', 'Controller', 'Interface']
        NOTE: This list is case sensitive.
    :param exclude_table_columns: Dict mapping a table name to a list
        of column names to drop from that table.
    :return: Schema dict:
        filtered if the schema_table parameter contains table names,
        else the original schema dict
    """
    # NOTE(review): the `tables`/`columns` dict initializers, the
    # `continue` statements and the truncated isinstance(...) calls
    # appear elided from this view.
    for tbl_name, tbl_data in schema['tables'].items():
        if not schema_tables or tbl_name in schema_tables:
            exclude_columns = exclude_table_columns.get(tbl_name, [])
            for col_name, col_data in tbl_data['columns'].items():
                # Drop explicitly excluded columns.
                if col_name in exclude_columns:

                # NOTE(Alan Quillin) Needs to check and remove
                # and columns that have references to tables that
                # are not to be configured
                type_ = col_data.get('type')

                if type_ and isinstance(type_, dict):
                    # Both the key and the value side of a column type
                    # may carry a refTable pointing at another table.
                    key = type_.get('key')
                    if key and isinstance(key, dict):
                        ref_tbl = key.get('refTable')
                        if ref_tbl and isinstance(ref_tbl,
                            if ref_tbl not in schema_tables:
                    value = type_.get('value')
                    if value and isinstance(value, dict):
                        ref_tbl = value.get('refTable')
                        if ref_tbl and isinstance(ref_tbl,
                            if ref_tbl not in schema_tables:
                # Keep the surviving column.
                columns[col_name] = col_data

            tbl_data['columns'] = columns
            tables[tbl_name] = tbl_data

    schema['tables'] = tables
198 # NOTE(jkoelker) Wrap ovs's Idl to accept an existing session, and
199 # trigger callbacks on changes
    def __init__(self, session, schema):
        """Initialize idl state over an existing jsonrpc *session*.

        :param session: an established ovs jsonrpc.Session to reuse.
        :param schema: an idl.SchemaHelper, or raw schema JSON -- in the
            latter case all tables are registered.

        NOTE(review): the event-buffer initializer and some per-table
        setup lines (e.g. the body of the hasattr check) appear elided
        from this view.
        """
        if not isinstance(schema, idl.SchemaHelper):
            schema = idl.SchemaHelper(schema_json=schema)
            schema.register_all()

        schema = schema.get_idl_schema()

        # NOTE(jkoelker) event buffer
        self.tables = schema.tables
        self.readonly = schema.readonly
        self._session = session
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()
        # IDL_S_INITIAL comes from the base idl.Idl class.
        self.state = self.IDL_S_INITIAL

        self.lock_name = None  # Name of lock we need, None if none.
        self.has_lock = False  # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None  # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self._outstanding_txns = {}

        for table in schema.tables.values():
            for column in table.columns.values():
                if not hasattr(column, 'alert'):
            table.need_table = False
            table.cond_changed = False
243 events = self._events
    def __process_update(self, table, uuid, old, new):
        """Apply a row update via the base class and buffer a
        corresponding (event class, args) tuple in self._events.

        NOTE(review): the branch structure selecting between the
        delete/insert/update events appears elided from this view.
        """
        # Snapshot the pre-update row as a plain model.Row so it can be
        # reported alongside the new state.
        old_row = table.rows.get(uuid)
        if old_row is not None:
            old_row = model.Row(dictify(old_row))
            old_row['_uuid'] = uuid

        # NOTE(review): this call presumably relies on the enclosing
        # class also being named Idl, so Python's name mangling resolves
        # it to the parent's private method -- confirm against the full
        # source.
        changed = idl.Idl.__process_update(self, table, uuid, old, new)

        ev = (event.EventRowDelete, (table.name, old_row))

        new_row = model.Row(dictify(table.rows.get(uuid)))
        new_row['_uuid'] = uuid
        ev = (event.EventRowInsert, (table.name, new_row))

        new_row = model.Row(dictify(table.rows.get(uuid)))
        new_row['_uuid'] = uuid
        ev = (event.EventRowUpdate, (table.name, old_row, new_row))

        self._events.append(ev)
class RemoteOvsdb(app_manager.RyuApp):
    """Ryu app representing one remote OVSDB connection."""

    # Events this app may broadcast to observers: generic row events
    # plus per-table (Interface/Port) convenience events.
    _EVENTS = [event.EventRowUpdate,
               event.EventRowDelete,
               event.EventRowInsert,
               event.EventInterfaceDeleted,
               event.EventInterfaceInserted,
               event.EventInterfaceUpdated,
               event.EventPortDeleted,
               event.EventPortInserted,
               event.EventPortUpdated]
    def factory(cls, sock, address, probe_interval=None, min_backoff=None,
                max_backoff=None, schema_tables=None,
                schema_exclude_columns=None, *args, **kwargs):
        """Build a fully wired RemoteOvsdb app for one connection:
        discover and filter schemas, set up the reconnect FSM and
        jsonrpc session, then (re)instantiate the app under a
        system-id-derived name.

        NOTE(review): this appears to be a classmethod whose decorator,
        several guard/early-return lines (empty schemas, missing
        system_id, old_app checks) and the kwargs['idl'] assignment are
        elided from this view.
        """
        schema_exclude_columns = schema_exclude_columns or {}
        ovs_stream = stream.Stream(sock, None, None)
        connection = jsonrpc.Connection(ovs_stream)
        schemas = discover_schemas(connection)

        if schema_tables or schema_exclude_columns:
            schemas = _filter_schemas(schemas, schema_tables,
                                      schema_exclude_columns)

        fsm = reconnect.Reconnect(now())
        fsm.set_name('%s:%s' % address[:2])
        # Passive: the switch initiated the connection; retry forever.
        fsm.set_passive(True, now())
        fsm.set_max_tries(-1)

        if probe_interval is not None:
            fsm.set_probe_interval(probe_interval)

        if min_backoff is None:
            min_backoff = fsm.get_min_backoff()

        if max_backoff is None:
            max_backoff = fsm.get_max_backoff()

        if min_backoff and max_backoff:
            fsm.set_backoff(min_backoff, max_backoff)

        session = jsonrpc.Session(fsm, connection)
        idl = Idl(session, schemas[0])

        system_id = discover_system_id(idl)

        # Name every moving part after the discovered system-id so logs
        # and service lookups line up.
        name = cls.instance_name(system_id)
        ovs_stream.name = name
        connection.name = name
        # Copy before mutating -- the caller may reuse its kwargs.
        kwargs = kwargs.copy()
        kwargs['socket'] = sock
        kwargs['address'] = address
        kwargs['name'] = name
        kwargs['system_id'] = system_id

        app_mgr = app_manager.AppManager.get_instance()

        # If an app for this system-id already exists, carry its
        # buffered events over to the replacement instance.
        old_app = app_manager.lookup_service_brick(name)
        old_events = old_app.events
        app_mgr.uninstantiate(name)

        app = app_mgr.instantiate(cls, *args, **kwargs)
        app.events = old_events
358 def instance_name(cls, system_id):
359 return '%s-%s' % (cls.__name__, system_id)
361 def __init__(self, *args, **kwargs):
362 super(RemoteOvsdb, self).__init__(*args, **kwargs)
363 self.socket = kwargs['socket']
364 self.address = kwargs['address']
365 self._idl = kwargs['idl']
366 self.system_id = kwargs['system_id']
367 self.name = kwargs['name']
368 self._txn_q = collections.deque()
    def _event_proxy_loop(self):
        """Drain buffered idl events and re-publish each one to this
        app's observers.

        NOTE(review): the queue-empty/sleep handling and the loop that
        unpacks each (event class, args) tuple appear elided from this
        view.
        """
        while self.is_active:
            events = self._idl.events

            self._submit_event(ev(self.system_id, *args))
    def _submit_event(self, ev):
        """Send *ev* to observers, plus a table-specific proxy event
        (e.g. EventInterfaceUpdated) when one exists in the event module.

        NOTE(review): the try/except and the `if proxy_ev_cls:` guard
        around the proxy dispatch appear elided from this view.
        """
        # Always deliver the generic row event first.
        self.send_event_to_observers(ev)

        # Build the specific event class name from the row's table and
        # change type, e.g. 'Event' + 'Interface' + 'Updated'.
        ev_cls_name = 'Event' + ev.table + ev.event_type
        proxy_ev_cls = getattr(event, ev_cls_name, None)
        self.send_event_to_observers(proxy_ev_cls(ev))
        self.logger.exception(
            'Error submitting specific event for OVSDB %s', self.system_id)
        # NOTE(review): the enclosing def (presumably the idl run loop)
        # and its try body are outside this view; only the loop header
        # and the error log remain.
        while self.is_active:
            self.logger.exception('Error running IDL for system_id %s' %
    def _run_thread(self, func, *args, **kwargs):
        """Thin wrapper used as the hub.spawn target for app loops.

        NOTE(review): a surrounding try/except appears elided from this
        view.
        """
        func(*args, **kwargs)
    def _transactions(self):
        """Transaction-processing loop body.

        NOTE(review): the loop/dequeue logic appears elided from this
        view.
        """
        # NOTE(jkoelker) possibly run multiple transactions per loop?
    def _transaction(self):
        """Run the oldest queued modify request as an OVSDB transaction
        and reply to the requester with the outcome.

        NOTE(review): the `insert_uuids` initializer, the generator's
        `for` clause, and the success/error branch structure appear
        elided from this view.  Also note the generator apparently uses
        `uuid` as a loop variable, shadowing the uuid module.
        """
        req = self._txn_q.popleft()
        txn = idl.Transaction(self._idl)

        # The request's function performs the actual modifications and
        # may return the uuid(s) of rows it inserted.
        uuids = req.func(self._idl.tables, txn.insert)
        status = txn.commit_block()

        # Map temporary insert uuids to the real committed uuids.
        if status in (idl.Transaction.SUCCESS,
                      idl.Transaction.UNCHANGED):
            if isinstance(uuids, uuid.UUID):
                insert_uuids[uuids] = txn.get_insert_uuid(uuids)

            insert_uuids = dict((uuid, txn.get_insert_uuid(uuid))

        err_msg = txn.get_error()

        rep = event.EventModifyReply(self.system_id, status, insert_uuids,
        self.reply_to_request(req, rep)
448 def modify_request_handler(self, ev):
449 self._txn_q.append(ev)
    def read_request_handler(self, ev, bulk=False):
        """Run the request's read function over the idl tables and
        either return (bulk) or reply with an EventReadReply.

        NOTE(review): the `if bulk:` guard in front of the return
        appears elided from this view.
        """
        result = ev.func(self._idl.tables)

        # NOTE(jkoelker) If this was a bulk request, the parent OVSDB app is
        #                responsible for the reply
        return (self.system_id, result)

        rep = event.EventReadReply(self.system_id, result)
        self.reply_to_request(ev, rep)
        # NOTE(review): the enclosing `def start(self):` appears to
        # begin just above this view.
        super(RemoteOvsdb, self).start()
        # Run the idl loop and the event proxy loop as green threads,
        # tracked in self.threads so stop() can join them.
        t = hub.spawn(self._run_thread, self._idl_loop)
        self.threads.append(t)

        t = hub.spawn(self._run_thread, self._event_proxy_loop)
        self.threads.append(t)
        # NOTE(review): the enclosing `def stop(self):` appears to begin
        # just above this view.
        # NOTE(jkoelker) Stop the idl and event_proxy threads first
        #                letting them finish their current loop.
        self.is_active = False
        # Wait for both loops to observe is_active and exit before the
        # base class tears the app down.
        hub.joinall(self.threads)

        super(RemoteOvsdb, self).stop()