# trove.cmd.api from oslo_concurrency import processutils from trove.cmd.common import with_initialize from trove.common import profile
@with_initialize
def main(CONF):
    """Entry point wrapped by ``with_initialize``; receives ``CONF``.

    Only the function-local imports are visible in this excerpt. They are
    executed when the function is called rather than at module import time.

    NOTE(review): the remainder of the body is not shown here.
    """
    from trove.common import cfg
    from trove.common import notification
    from trove.common import wsgi
    from trove.instance import models as inst_models
# trove.common.wsgi
# ...
class Router(base_wsgi.Router):
    # Original router did not allow for serialization of the 404 error.
    # To fix this the _dispatch was modified to use Fault() objects.
    @staticmethod
    @webob.dec.wsgify
    def _dispatch(req):
        """
        Called by self._router after matching the incoming request to a route
        and putting the information into req.environ. Either returns 404
        or the routed WSGI app's response.
        """
        # NOTE(review): only the docstring of this override is present in
        # this excerpt; no statements follow it.
""" WSGI middleware that maps incoming requests to WSGI apps. """
def __init__(self, mapper):
    """
    Create a router for the given routes.Mapper.

    Each route in `mapper` must specify a 'controller', which is a
    WSGI app to call. You'll probably want to specify an 'action' as
    well and have your controller be a wsgi.Controller, who will route
    the request to the action method.

    Examples:
      mapper = routes.Mapper()
      sc = ServerController()

      # Explicit mapping of one route to a controller+action
      mapper.connect(None, "/svrlist", controller=sc, action="list")

      # Actions are all implicitly defined
      mapper.resource("server", "servers", controller=sc)

      # Pointing to an arbitrary WSGI app. You can specify the
      # {path_info:.*} parameter so the target app can be handed just that
      # section of the URL.
      mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp())

    :param mapper: the routes.Mapper holding the route definitions; it is
                   stored on the instance as ``self.map``.
    """
    self.map = mapper
    # Wrap self._dispatch in the routing middleware, configured with the
    # mapper stored above.
    self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                      self.map)
@webob.dec.wsgify
def __call__(self, req):
    """
    Route the incoming request to a controller based on self.map.
    If no match, return a 404.

    :param req: the incoming request object supplied by the wsgify
                decorator.
    :returns: the routing middleware created in ``__init__``
              (``self._router``).
    """
    # The middleware object itself is returned, not the result of calling
    # it; the wsgify decorator is relied on to invoke it for this request.
    return self._router
@staticmethod
@webob.dec.wsgify
def _dispatch(req):
    """
    Called by self._router after matching the incoming request to a route
    and putting the information into req.environ. Either returns 404
    or the routed WSGI app's response.

    :param req: request whose ``environ`` carries the
                ``'wsgiorg.routing_args'`` entry.
    :returns: ``webob.exc.HTTPNotFound()`` when the match is empty,
              otherwise the object stored under the match's
              ``'controller'`` key.
    """
    # Element [1] of the routing args holds the match for this request.
    match = req.environ['wsgiorg.routing_args'][1]
    if not match:
        return webob.exc.HTTPNotFound()
    app = match['controller']
    return app
# ...
Trove API 的详细 Router 定义位于 trove.common.api:API 中,其中定义了 Trove API 的所有接口路由。根据 Trove API Router 定义的接口,cluster 支持增删改查能力,Cluster Controller 的路由代码如下:
@staticmethod
def _validate_cluster_instances(context, instances, datastore,
                                datastore_version):
    """Validate the flavor and volume

    :param context: request context, passed through to the flavor
                    validation helper.
    :param instances: the instances that make up the cluster; must
                      support ``len()``.
    :param datastore: not used by the code visible here.
    :param datastore_version: its ``manager`` attribute selects the
                              datastore-specific configuration group.
    """
    # Datastore-specific options are looked up by the manager name.
    ds_conf = CONF.get(datastore_version.manager)
    # NOTE(review): num_instances is not used in the part of the function
    # visible here.
    num_instances = len(instances)

    # Checking volumes and get delta for quota check
    cluster_models.validate_instance_flavors(
        context, instances, ds_conf.volume_support, ds_conf.device_path)
def _create_cluster():
    """Configure every instance of the cluster and join them together.

    This is a closure: ``self``, ``context`` and ``cluster_id`` come from
    the enclosing scope.

    :raises TroveError: when ``self._all_instances_ready`` reports that
        the instances did not become ready. This is raised before the
        ``try`` block below, so it propagates to the caller.

    Any exception raised while configuring the guests is logged and the
    statuses are updated via ``self.update_statuses_on_failure``; it is
    not re-raised.
    """
    # Fetch instances by cluster_id against instances table.
    db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
    instance_ids = [db_instance.id for db_instance in db_instances]

    LOG.debug("Waiting for instances to get to cluster-ready status.")
    # Wait for cluster members to get to cluster-ready status.
    if not self._all_instances_ready(instance_ids, cluster_id):
        raise TroveError(_("Instances in cluster did not report "
                           "ACTIVE"))

    LOG.debug("All members ready, proceeding for cluster setup.")
    instances = [Instance.load(context, instance_id) for instance_id
                 in instance_ids]

    cluster_ips = [self.get_ip(instance) for instance in instances]
    instance_guests = []

    # Create replication user and password for synchronizing the
    # galera cluster
    replication_user = {
        "name": self.CLUSTER_REPLICATION_USER,
        "password": utils.generate_random_password(),
    }

    # Galera cluster name must be unique and be shorter than a full
    # uuid string so we remove the hyphens and chop it off. It was
    # recommended to be 16 chars or less.
    # (this is not currently documented on Galera docs)
    cluster_name = utils.generate_uuid().replace("-", "")[:16]

    LOG.debug("Configuring cluster configuration.")
    try:
        # Set the admin password for all the instances because the
        # password in the my.cnf will be wrong after the joiner
        # instances syncs with the donor instance.
        admin_password = str(utils.generate_random_password())

        # Only the first instance in the loop is installed with
        # bootstrap=True; the flag is cleared after the first iteration.
        bootstrap = True
        for instance in instances:
            guest = self.get_guest(instance)
            instance_guests.append(guest)
            guest.reset_admin_password(admin_password)
            # render the conf.d/cluster.cnf configuration
            cluster_configuration = self._render_cluster_config(
                context,
                instance,
                ",".join(cluster_ips),
                cluster_name,
                replication_user)

            # push the cluster config and bootstrap the first instance
            guest.install_cluster(replication_user,
                                  cluster_configuration,
                                  bootstrap)
            bootstrap = False

        LOG.debug("Finalizing cluster configuration.")
        for guest in instance_guests:
            guest.cluster_complete()
    except Exception:
        LOG.exception("Error creating cluster.")
        self.update_statuses_on_failure(cluster_id)
# Run the cluster setup under a timeout taken from
# CONF.cluster_usage_timeout.
timeout = Timeout(CONF.cluster_usage_timeout)
try:
    _create_cluster()
    self.reset_task()
except Timeout as t:
    # A Timeout that is not the object created above is re-raised
    # unchanged.
    if t is not timeout:
        raise  # not my timeout
    LOG.exception("Timeout for building cluster.")
    self.update_statuses_on_failure(cluster_id)
except TroveError:
    LOG.exception("Error creating cluster %s.", cluster_id)
    self.update_statuses_on_failure(cluster_id)
finally:
    # Always cancel the timeout, whether the setup succeeded or failed.
    timeout.cancel()

LOG.debug("End create_cluster for id: %s.", cluster_id)
# ...
from oslo_config import cfg as openstack_cfg from oslo_log import log as logging from oslo_service import service as openstack_service
from trove.common import cfg from trove.common import debug_utils from trove.common.i18n import _ from trove.guestagent import api as guest_api
CONF = cfg.CONF
# The guest_id opt definition must match the one in common/cfg.py
# Two string options are registered here: 'guest_id' (default None) and
# 'instance_rpc_encr_key'.
CONF.register_opts([openstack_cfg.StrOpt('guest_id', default=None,
                                         help="ID of the Guest Instance."),
                    openstack_cfg.StrOpt('instance_rpc_encr_key',
                                         help=('Key (OpenSSL aes_cbc) for '
                                               'instance RPC encryption.'))])
# Look up the manager registered for the configured datastore manager
# name; fail if there is none.
from trove.guestagent import dbaas
manager = dbaas.datastore_registry().get(CONF.datastore_manager)
if not manager:
    msg = (_("Manager class not registered for datastore manager %s") %
           CONF.datastore_manager)
    raise RuntimeError(msg)

# The RPC topic and host below are built from guest_id, so it must be set.
if not CONF.guest_id:
    msg = (_("The guest_id parameter is not set. guest_info.conf "
           "was not injected into the guest or not read by guestagent"))
    raise RuntimeError(msg)

# BUG(1650518): Cleanup in the Pike release
# make it fatal if CONF.instance_rpc_encr_key is None

# rpc module must be loaded after decision about thread monkeypatching
# because if thread module is not monkeypatched we can't use eventlet
# executor from oslo_messaging library.
from trove import rpc
rpc.init(CONF)

from trove.common.rpc import service as rpc_service
server = rpc_service.RpcService(
    key=CONF.instance_rpc_encr_key,
    topic="guestagent.%s" % CONF.guest_id,
    manager=manager, host=CONF.guest_id,
    rpc_api_version=guest_api.API.API_LATEST_VERSION)
# trove.guestagent.dbaas """ Handles all processes within the Guest VM, considering it as a Platform The :py:class:`GuestManager` class is a :py:class:`nova.manager.Manager` that handles RPC calls relating to Platform specific operations. **Related Flags** """
from itertools import chain import os
from oslo_log import log as logging
from trove.common import cfg from trove.common.i18n import _ from trove.common import utils
# trove.common.strategies.cluster.experimental.galera_common.guestagent
# ...
class GaleraCommonGuestAgentAPI(guest_api.API):
    """Cluster Specific Datastore Guest API

    **** VERSION CONTROLLED API ****

    The methods in this class are subject to version control as
    coordinated by guestagent/api.py. Whenever a change is made to
    any API method in this class, add a version number and comment
    to the top of guestagent/api.py and use the version number as
    appropriate in this file
    """

    def install_cluster(self, replication_user, cluster_configuration,
                        bootstrap):
        """Install the cluster."""
        LOG.debug("Installing Galera cluster.")
        # NOTE(review): `version` is assigned but not used in the part of
        # the method visible in this excerpt.
        version = guest_api.API.API_BASE_VERSION
def install_cluster(self, replication_user, cluster_configuration,
                    bootstrap=False):
    """Write the cluster configuration and restart the database.

    Steps, in order: grant privileges to the replication user, stop the
    database, write the cluster configuration overrides, wipe the ib
    logfiles, then either bootstrap the cluster or start MySQL.

    :param replication_user: replication user description, passed to
        ``self._grant_cluster_replication_privilege``.
    :param cluster_configuration: configuration passed to
        ``self.write_cluster_configuration_overrides``.
    :param bootstrap: when true, ``self._bootstrap_cluster`` is called;
        otherwise ``self.start_mysql`` is called. Both use
        ``CONF.restore_usage_timeout`` as the timeout.
    """
    LOG.info("Installing cluster configuration.")
    self._grant_cluster_replication_privilege(replication_user)
    self.stop_db()
    self.write_cluster_configuration_overrides(cluster_configuration)
    self.wipe_ib_logfiles()
    LOG.debug("bootstrap the instance? : %s", bootstrap)
    # Have to wait to sync up the joiner instances with the donor instance.
    if bootstrap:
        self._bootstrap_cluster(timeout=CONF.restore_usage_timeout)
    else:
        self.start_mysql(timeout=CONF.restore_usage_timeout)
def _grant_cluster_replication_privilege(self, replication_user):
    """Grant the cluster replication permissions to the replication user.

    :param replication_user: mapping with the keys ``'name'`` and
        ``'password'``; both are passed to ``sql_query.Grant``.
    """
    LOG.info("Granting Replication Slave privilege.")
    with self.local_sql_client(self.get_engine()) as client:
        perms = ['REPLICATION CLIENT', 'RELOAD', 'LOCK TABLES']
        g = sql_query.Grant(permissions=perms,
                            user=replication_user['name'],
                            clear=replication_user['password'])
        # The Grant object is rendered to its string form and executed
        # as a text statement.
        t = text(str(g))
        client.execute(t)