backing up
[vsorcdistro/.git] / ryu / build / lib.linux-armv7l-2.7 / ryu / services / protocols / ovsdb / client.py
1 # Copyright (c) 2014 Rackspace Hosting
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #    http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12 # implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 import collections
17 import errno
18 import six
19 import uuid
20
21 from ovs import jsonrpc
22 from ovs import poller
23 from ovs import reconnect
24 from ovs import stream
25 from ovs import timeval
26 from ovs.db import idl
27
28 from ryu.base import app_manager
29 from ryu.lib import hub
30 from ryu.services.protocols.ovsdb import event
31 from ryu.services.protocols.ovsdb import model
32
33
# Clock callable (alias of ovs.timeval.msec) used to timestamp the
# reconnect FSM calls made in RemoteOvsdb.factory.
now = timeval.msec
35
36
def _uuid_to_row(atom, base):
    """Conversion callback handed to ``to_python`` by :func:`dictify`.

    :param atom: The atom value being converted.
    :param base: Base type of the atom; only its ``ref_table`` attribute
                 is consulted here.
    :return: When ``base.ref_table`` is set, the row found for ``atom`` in
             that table (``None`` if there is none); otherwise ``atom``
             itself.  An ``idl.Row`` result is replaced by the string form
             of its uuid.
    """
    ref_table = base.ref_table
    resolved = ref_table.rows.get(atom) if ref_table else atom

    # Rows are reported by uuid string rather than as live idl objects.
    return str(resolved.uuid) if isinstance(resolved, idl.Row) else resolved
47
48
def dictify(row):
    """Convert a row's ``_data`` mapping into a plain dict.

    :param row: Object exposing a ``_data`` mapping of column name to a
                value with a ``to_python`` method, or ``None``.
    :return: ``None`` when ``row`` is ``None``; otherwise a dict mapping
             each column name to ``value.to_python(_uuid_to_row)``.
    """
    if row is None:
        return None

    converted = {}

    for column, datum in row._data.items():
        converted[column] = datum.to_python(_uuid_to_row)
        # Yield to other green threads after every column.
        hub.sleep(0)

    return converted
60
61
def transact_block(request, connection):
    """Emulate jsonrpc.Connection.transact_block without blocking eventlet.

    Sends ``request`` and then receives messages until the reply (or error
    reply) carrying the same id arrives, or the connection reports an
    error.

    :param request: jsonrpc request message to send; its ``id`` is used to
                    recognise the matching reply.
    :param connection: jsonrpc connection to send on and receive from.
    :return: ``(error, reply)``.  ``error`` is falsy and ``reply`` is the
             matching message on success; otherwise ``error`` is the value
             reported by ``send``/``recv`` (``reply`` is ``None`` when the
             send itself failed).
    """
    error = connection.send(request)

    if error:
        return error, None

    reply_types = (jsonrpc.Message.T_REPLY, jsonrpc.Message.T_ERROR)
    ovs_poller = poller.Poller()

    while True:
        error, reply = connection.recv()

        if error == errno.EAGAIN:
            # Nothing complete to read yet: let the connection make
            # progress, poll without sleeping (immediate_wake), hand
            # control to other green threads and try again.  EAGAIN is
            # "retry", not a failure, so it must not end the loop.
            connection.run()
            ovs_poller.immediate_wake()
            connection.wait(ovs_poller)
            connection.recv_wait(ovs_poller)
            ovs_poller.block()
            hub.sleep(0)
            continue

        if error:
            return error, reply

        if (reply is not None and
                reply.id == request.id and
                reply.type in reply_types):
            return error, reply

        # A message that is not the answer to ``request`` (different id or
        # not a reply): skip it and keep waiting for the real reply.
        hub.sleep(0)
93
94
def discover_schemas(connection):
    """Fetch the schema of the ``Open_vSwitch`` database over ``connection``.

    Issues ``list_dbs`` and then ``get_schema`` for every listed database
    named ``Open_vSwitch``.

    :param connection: jsonrpc connection passed to ``transact_block``.
    :return: ``None`` if the ``list_dbs`` exchange failed; otherwise a list
             of the schema results that could be fetched (possibly empty).
    """
    # NOTE(jkoelker) currently only the Open_vSwitch schema
    #                is supported.
    # TODO(jkoelker) support arbitrary schemas
    list_request = jsonrpc.Message.create_request('list_dbs', [])
    error, listing = transact_block(list_request, connection)

    if error or listing.error:
        return None

    found = []
    for db_name in listing.result:
        if db_name == 'Open_vSwitch':
            schema_request = jsonrpc.Message.create_request('get_schema',
                                                            [db_name])
            error, answer = transact_block(schema_request, connection)

            # TODO(jkoelker) Error handling
            if not (error or answer.error):
                found.append(answer.result)

    return found
120
121
def discover_system_id(idl):
    """Run ``idl`` until the ``system-id`` external id becomes available.

    Keeps calling ``idl.run()`` while the session is connected and reads
    ``external_ids['system-id']`` from the first row of the
    ``Open_vSwitch`` table once that table has rows.

    :param idl: Idl-like object exposing ``_session``, ``run()`` and
                ``tables``.
    :return: The system id, or ``None`` if the session is (or becomes)
             disconnected before one is found.
    """
    while idl._session.is_connected():
        idl.run()
        rows = idl.tables['Open_vSwitch'].rows

        if not rows:
            continue

        first_key = next(iter(rows.keys()))
        found = rows.get(first_key).external_ids.get('system-id')

        if found is not None:
            return found

    return None
134
135
136 def _filter_schemas(schemas, schema_tables, exclude_table_columns):
137     """Wrapper method for _filter_schema to filter multiple schemas."""
138     return [_filter_schema(s, schema_tables, exclude_table_columns)
139             for s in schemas]
140
141
142 def _filter_schema(schema, schema_tables, exclude_table_columns):
143     """Filters a schema to only include the specified tables in the
144        schema_tables parameter.  This will also filter out any colums for
145        included tables that reference tables that are not included
146        in the schema_tables parameter
147
148     :param schema: Schema dict to be filtered
149     :param schema_tables: List of table names to filter on.
150                           EX: ['Bridge', 'Controller', 'Interface']
151                           NOTE: This list is case sensitive.
152     :return: Schema dict:
153                 filtered if the schema_table parameter contains table names,
154                 else the original schema dict
155     """
156
157     tables = {}
158     for tbl_name, tbl_data in schema['tables'].items():
159         if not schema_tables or tbl_name in schema_tables:
160             columns = {}
161
162             exclude_columns = exclude_table_columns.get(tbl_name, [])
163             for col_name, col_data in tbl_data['columns'].items():
164                 if col_name in exclude_columns:
165                     continue
166
167                 # NOTE(Alan Quillin) Needs to check and remove
168                 # and columns that have references to tables that
169                 # are not to be configured
170                 type_ = col_data.get('type')
171                 if type_:
172                     if type_ and isinstance(type_, dict):
173                         key = type_.get('key')
174                         if key and isinstance(key, dict):
175                             ref_tbl = key.get('refTable')
176                             if ref_tbl and isinstance(ref_tbl,
177                                                       six.string_types):
178                                 if ref_tbl not in schema_tables:
179                                     continue
180                         value = type_.get('value')
181                         if value and isinstance(value, dict):
182                             ref_tbl = value.get('refTable')
183                             if ref_tbl and isinstance(ref_tbl,
184                                                       six.string_types):
185                                 if ref_tbl not in schema_tables:
186                                     continue
187
188                 columns[col_name] = col_data
189
190             tbl_data['columns'] = columns
191             tables[tbl_name] = tbl_data
192
193     schema['tables'] = tables
194
195     return schema
196
197
198 # NOTE(jkoelker) Wrap ovs's Idl to accept an existing session, and
199 #                trigger callbacks on changes
class Idl(idl.Idl):
    """``idl.Idl`` subclass built around an already established session.

    The constructor does not call the parent constructor; it sets the
    instance attributes itself and stores the supplied session.  Row
    changes seen by ``__process_update`` are buffered as
    ``(event class, args)`` tuples and handed out through ``events``.
    """

    def __init__(self, session, schema):
        """
        :param session: Existing session object, stored as ``_session``.
        :param schema: Either an ``idl.SchemaHelper`` or schema JSON from
                       which a helper with all tables registered is built.
        """
        helper = schema
        if not isinstance(helper, idl.SchemaHelper):
            helper = idl.SchemaHelper(schema_json=schema)
            helper.register_all()

        db_schema = helper.get_idl_schema()

        # NOTE(jkoelker) event buffer
        self._events = []

        self.tables = db_schema.tables
        self.readonly = db_schema.readonly
        self._db = db_schema
        self._session = session
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()
        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None           # Name of lock we need, None if none.
        self.has_lock = False           # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None    # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        # Per-table bookkeeping; columns get ``alert`` switched on unless
        # they already carry the attribute.
        for tbl in db_schema.tables.values():
            for col in tbl.columns.values():
                if not hasattr(col, 'alert'):
                    col.alert = True
            tbl.need_table = False
            tbl.rows = {}
            tbl.idl = self
            tbl.condition = []
            tbl.cond_changed = False

    @property
    def events(self):
        """Return the buffered events and start a fresh, empty buffer."""
        drained, self._events = self._events, []
        return drained

    def __process_update(self, table, uuid, old, new):
        # The class shares its name with the parent, so this name-mangled
        # method takes the place of the parent's private one; the parent
        # implementation is still invoked explicitly below.

        # Snapshot the row before the parent applies the update.
        previous = table.rows.get(uuid)
        if previous is not None:
            previous = model.Row(dictify(previous))
            previous['_uuid'] = uuid

        changed = idl.Idl.__process_update(self, table, uuid, old, new)

        if not changed:
            return changed

        if not new:
            ev = (event.EventRowDelete, (table.name, previous))
        else:
            current = model.Row(dictify(table.rows.get(uuid)))
            current['_uuid'] = uuid

            if not old:
                ev = (event.EventRowInsert, (table.name, current))
            else:
                ev = (event.EventRowUpdate, (table.name, previous, current))

        self._events.append(ev)

        return changed
273
274
class RemoteOvsdb(app_manager.RyuApp):
    """Ryu application wrapping the OVSDB connection of one remote system.

    Instances are created through :meth:`factory`.  Once started, one green
    thread runs the IDL and queued transactions while another forwards the
    IDL's buffered row events to observers.
    """

    _EVENTS = [event.EventRowUpdate,
               event.EventRowDelete,
               event.EventRowInsert,
               event.EventInterfaceDeleted,
               event.EventInterfaceInserted,
               event.EventInterfaceUpdated,
               event.EventPortDeleted,
               event.EventPortInserted,
               event.EventPortUpdated]

    @classmethod
    def factory(cls, sock, address, probe_interval=None, min_backoff=None,
                max_backoff=None, schema_tables=None,
                schema_exclude_columns=None, *args, **kwargs):
        """Build and instantiate the app for a freshly accepted socket.

        :param sock: Connected socket to the OVSDB peer.
        :param address: Peer address; its first two items name the
                        reconnect FSM until the system id is known.
        :param probe_interval: Optional probe interval for the FSM.
        :param min_backoff: Optional minimum backoff (FSM value if None).
        :param max_backoff: Optional maximum backoff (FSM value if None).
        :param schema_tables: Optional list of tables to keep in the schema.
        :param schema_exclude_columns: Optional dict of table name to
                                       column names to drop.
        :return: The instantiated app, or ``None`` when no schema or no
                 system id could be discovered.
        """
        schema_exclude_columns = schema_exclude_columns or {}
        ovs_stream = stream.Stream(sock, None, None)
        connection = jsonrpc.Connection(ovs_stream)
        schemas = discover_schemas(connection)

        if not schemas:
            return None

        if schema_tables or schema_exclude_columns:
            schemas = _filter_schemas(schemas, schema_tables,
                                      schema_exclude_columns)

        fsm = reconnect.Reconnect(now())
        fsm.set_name('%s:%s' % address[:2])
        fsm.enable(now())
        fsm.set_passive(True, now())
        fsm.set_max_tries(-1)

        if probe_interval is not None:
            fsm.set_probe_interval(probe_interval)

        if min_backoff is None:
            min_backoff = fsm.get_min_backoff()

        if max_backoff is None:
            max_backoff = fsm.get_max_backoff()

        if min_backoff and max_backoff:
            fsm.set_backoff(min_backoff, max_backoff)

        fsm.connected(now())

        session = jsonrpc.Session(fsm, connection)
        ovs_idl = Idl(session, schemas[0])

        system_id = discover_system_id(ovs_idl)

        if not system_id:
            return None

        # Now that the system id is known, rename everything after it.
        name = cls.instance_name(system_id)
        ovs_stream.name = name
        connection.name = name
        fsm.set_name(name)

        kwargs = kwargs.copy()
        kwargs['socket'] = sock
        kwargs['address'] = address
        kwargs['idl'] = ovs_idl
        kwargs['name'] = name
        kwargs['system_id'] = system_id

        app_mgr = app_manager.AppManager.get_instance()

        # Replace an app already registered under this name, carrying its
        # ``events`` attribute over to the new instance.
        old_app = app_manager.lookup_service_brick(name)
        old_events = None
        if old_app:
            old_events = old_app.events
            app_mgr.uninstantiate(name)

        app = app_mgr.instantiate(cls, *args, **kwargs)

        if old_events:
            app.events = old_events

        return app

    @classmethod
    def instance_name(cls, system_id):
        """Return the app name for ``system_id``: ``<class name>-<id>``."""
        return '%s-%s' % (cls.__name__, system_id)

    def __init__(self, *args, **kwargs):
        """Requires the keyword arguments ``socket``, ``address``, ``idl``,
        ``system_id`` and ``name`` (all supplied by :meth:`factory`)."""
        super(RemoteOvsdb, self).__init__(*args, **kwargs)
        self.socket = kwargs['socket']
        self.address = kwargs['address']
        self._idl = kwargs['idl']
        self.system_id = kwargs['system_id']
        self.name = kwargs['name']
        # Pending modify requests, consumed one per IDL loop iteration.
        self._txn_q = collections.deque()

    def _event_proxy_loop(self):
        """Forward buffered IDL events to observers while the app is active."""
        while self.is_active:
            events = self._idl.events

            if not events:
                hub.sleep(0.1)
                continue

            for ev_cls, ev_args in events:
                self._submit_event(ev_cls(self.system_id, *ev_args))

            hub.sleep(0)

    def _submit_event(self, ev):
        """Send ``ev`` and, if one exists, its table-specific proxy event.

        The proxy class is looked up on the ``event`` module under the name
        ``'Event' + ev.table + ev.event_type``.  Failures while building or
        sending the proxy event are logged and otherwise ignored.
        """
        self.send_event_to_observers(ev)
        try:
            ev_cls_name = 'Event' + ev.table + ev.event_type
            proxy_ev_cls = getattr(event, ev_cls_name, None)
            if proxy_ev_cls:
                self.send_event_to_observers(proxy_ev_cls(ev))
        except Exception:
            self.logger.exception(
                'Error submitting specific event for OVSDB %s', self.system_id)

    def _idl_loop(self):
        """Run the IDL and pending transactions while the app is active.

        Any exception is logged and re-raised, which ends the loop.
        """
        while self.is_active:
            try:
                self._idl.run()
                self._transactions()
            except Exception:
                # Lazy logger arguments, consistent with _submit_event.
                self.logger.exception('Error running IDL for system_id %s',
                                      self.system_id)
                raise

            hub.sleep(0)

    def _run_thread(self, func, *args, **kwargs):
        """Run ``func`` and stop the app if it raises anything."""
        try:
            func(*args, **kwargs)

        # Catches everything, BaseException subclasses included, so that
        # any failure in a worker loop stops the app.
        # NOTE(review): confirm this breadth is intended.
        except:
            self.stop()

    def _transactions(self):
        """Process one queued transaction, if any."""
        if not self._txn_q:
            return

        # NOTE(jkoelker) possibly run multiple transactions per loop?
        self._transaction()

    def _transaction(self):
        """Commit the oldest queued modify request and reply to it.

        ``req.func`` is called with the IDL tables and the transaction's
        ``insert`` method and may return a single UUID or an iterable of
        UUIDs for inserted rows.  On SUCCESS/UNCHANGED the reply maps each
        of them to ``txn.get_insert_uuid``; otherwise it carries
        ``txn.get_error()``.
        """
        req = self._txn_q.popleft()
        txn = idl.Transaction(self._idl)

        uuids = req.func(self._idl.tables, txn.insert)
        status = txn.commit_block()

        insert_uuids = {}
        err_msg = None

        if status in (idl.Transaction.SUCCESS,
                      idl.Transaction.UNCHANGED):
            if uuids:
                if isinstance(uuids, uuid.UUID):
                    uuids = [uuids]

                # ``tmp_uuid`` rather than ``uuid`` so the loop variable
                # does not shadow the uuid module used just above.
                insert_uuids = dict((tmp_uuid, txn.get_insert_uuid(tmp_uuid))
                                    for tmp_uuid in uuids)
        else:
            err_msg = txn.get_error()

        rep = event.EventModifyReply(self.system_id, status, insert_uuids,
                                     err_msg)
        self.reply_to_request(req, rep)

    def modify_request_handler(self, ev):
        """Queue a modify request; it is committed later by the IDL loop."""
        self._txn_q.append(ev)

    def read_request_handler(self, ev, bulk=False):
        """Evaluate ``ev.func`` against the IDL tables.

        :param ev: Read request whose ``func`` takes the tables.
        :param bulk: When true, return ``(system_id, result)`` instead of
                     replying.
        """
        result = ev.func(self._idl.tables)

        # NOTE(jkoelker) If this was a bulk request, the parent OVSDB app is
        #                responsible for the reply

        if bulk:
            return (self.system_id, result)

        rep = event.EventReadReply(self.system_id, result)
        self.reply_to_request(ev, rep)

    def start(self):
        """Start the app and spawn the IDL and event-proxy green threads."""
        super(RemoteOvsdb, self).start()
        t = hub.spawn(self._run_thread, self._idl_loop)
        self.threads.append(t)

        t = hub.spawn(self._run_thread, self._event_proxy_loop)
        self.threads.append(t)

    def stop(self):
        """Stop the worker threads, close the IDL, then stop the app."""
        # NOTE(jkoelker) Stop the idl and event_proxy threads first
        #                letting them finish their current loop.
        self.is_active = False
        hub.joinall(self.threads)

        self._idl.close()
        super(RemoteOvsdb, self).stop()