Overview

For a programmer, the best documentation is the source code. The official Trove documentation on the Cluster Controller is sparse; the only worked example covers a MongoDB cluster (see "Set up database clustering"). To understand Trove's clustering capability in depth, this document dissects the Trove Cluster Controller, starting from the API and tracing how the backend database-cluster lifecycle-management architecture is put together. The goal is to map out what Trove can do today and to distill its code design as preparation for follow-up development.

Versions

  • OpenStack Train (released 16 October, 2019)
  • Trove 12.1.0

Note:

While digging into the Trove source, we found that starting with 14.0.0 Trove changed how the database service is deployed inside the guest VM: instead of installing packages directly (RPM), the service now runs in Docker containers. The upside is easy horizontal scaling; the downside is a more complicated service network, because the guest VMs need network connectivity to a Docker registry. The switch to Docker also left a large gap: many previously supported datastores are no longer supported while the Guest Agent is being refactored. The latest release's support matrix is: MySQL 5.7.X, MariaDB 10.4.X; PostgreSQL 12.4 is partially supported. Two major releases later there is still no support for additional datastore types, so, weighing the development cost, we decided to skip the Docker-based releases and stay on the stable pre-Docker version.

trove-releasenote-14.0.0

trove-administrator-guide-16.0.0

Native Capabilities

Cluster API Endpoints

Endpoint                   Route                                                         Method
index cluster              /{tenant_id}/clusters                                         GET
show cluster               /{tenant_id}/clusters/{id}                                    GET
create cluster             /{tenant_id}/clusters                                         POST
cluster action             /{tenant_id}/clusters/{id}                                    POST
show instance in cluster   /{tenant_id}/clusters/{cluster_id}/instances/{instance_id}   GET
delete cluster             /{tenant_id}/clusters/{id}                                    DELETE
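
For orientation, the create route can be exercised directly over HTTP. The sketch below is illustrative only: the endpoint, token, tenant id, flavor, and datastore values are all placeholders, and the request goes through the usual Keystone-authenticated Trove endpoint (the /v1.0 prefix is added by the API paste pipeline).

# Illustrative only -- endpoint, token and IDs are placeholders.
import requests

TROVE = "http://controller:8779/v1.0"          # assumed Trove API endpoint
TENANT = "<tenant_id>"
HEADERS = {"X-Auth-Token": "<keystone_token>",
           "Content-Type": "application/json"}

body = {"cluster": {
    "name": "galera-demo",
    "datastore": {"type": "mariadb", "version": "10.4"},
    "instances": [{"flavorRef": "6", "volume": {"size": 5}}] * 3,  # three identical members
}}
resp = requests.post("%s/%s/clusters" % (TROVE, TENANT),
                     json=body, headers=HEADERS)
print(resp.status_code, resp.json())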

Supported Cluster Types

Datastore         Cluster type        Strategy entry module
Percona XtraDB Galera trove.common.strategies.cluster.experimental.galera_common
MariaDB Galera trove.common.strategies.cluster.experimental.galera_common
Redis Redis Cluster trove.common.strategies.cluster.experimental.redis
MongoDB MongoDB Cluster trove.common.strategies.cluster.experimental.mongodb
Cassandra Cassandra Cluster trove.common.strategies.cluster.experimental.cassandra
Vertica Vertica Cluster trove.common.strategies.cluster.experimental.vertica

Code Walkthrough

Analysis of OpenStack source code starts from the entry_points declared in the component's packaging file, setup.cfg. The trove-api entry under entry_points shows that the API service's entry point is trove.cmd.api:main. The relevant part of setup.cfg:

...
[entry_points]
console_scripts =
    trove-api = trove.cmd.api:main
    trove-taskmanager = trove.cmd.taskmanager:main
    trove-mgmt-taskmanager = trove.cmd.taskmanager:mgmt_main
    trove-conductor = trove.cmd.conductor:main
    trove-manage = trove.cmd.manage:main
    trove-guestagent = trove.cmd.guest:main
    trove-fake-mode = trove.cmd.fakemode:main
    trove-status = trove.cmd.status:main
...

trove-api uses the wsgi module to launch the API service. Source:

# trove.cmd.api
from oslo_concurrency import processutils
from trove.cmd.common import with_initialize
from trove.common import profile


@with_initialize
def main(CONF):
    from trove.common import cfg
    from trove.common import notification
    from trove.common import wsgi
    from trove.instance import models as inst_models

    notification.DBaaSAPINotification.register_notify_callback(
        inst_models.persist_instance_fault)
    cfg.set_api_config_defaults()
    profile.setup_profiler('api', CONF.host)
    conf_file = CONF.find_file(CONF.api_paste_config)
    workers = CONF.trove_api_workers or processutils.get_worker_count()
    launcher = wsgi.launch('trove', CONF.bind_port, conf_file,
                           host=CONF.bind_host, workers=workers)
    launcher.wait()

To work out what the Trove API exposes, start from the Router definitions in the wsgi module. wsgi.Router matches incoming requests against the routing table and returns a 404 HTTPNotFound (wrapped in a Fault) when nothing matches; base_wsgi.Router is the parent class of wsgi.Router and defines how a Router is used. The relevant source:

# trove.common.wsgi
...
class Router(base_wsgi.Router):
    # Original router did not allow for serialization of the 404 error.
    # To fix this the _dispatch was modified to use Fault() objects.
    @staticmethod
    @webob.dec.wsgify
    def _dispatch(req):
        """
        Called by self._router after matching the incoming request to a route
        and putting the information into req.environ. Either returns 404
        or the routed WSGI app's response.
        """

        match = req.environ['wsgiorg.routing_args'][1]
        if not match:
            return Fault(webob.exc.HTTPNotFound())
        app = match['controller']
        return app
...

# trove.common.base_wsgi
...
class Router(object):

    """
    WSGI middleware that maps incoming requests to WSGI apps.
    """

    def __init__(self, mapper):
        """
        Create a router for the given routes.Mapper.

        Each route in `mapper` must specify a 'controller', which is a
        WSGI app to call. You'll probably want to specify an 'action' as
        well and have your controller be a wsgi.Controller, who will route
        the request to the action method.

        Examples:
          mapper = routes.Mapper()
          sc = ServerController()

          # Explicit mapping of one route to a controller+action
          mapper.connect(None, "/svrlist", controller=sc, action="list")

          # Actions are all implicitly defined
          mapper.resource("server", "servers", controller=sc)

          # Pointing to an arbitrary WSGI app. You can specify the
          # {path_info:.*} parameter so the target app can be handed just that
          # section of the URL.
          mapper.connect(None, "/v1.0/{path_info:.*}", controller=BlogApp())
        """
        self.map = mapper
        self._router = routes.middleware.RoutesMiddleware(self._dispatch,
                                                          self.map)

    @webob.dec.wsgify
    def __call__(self, req):
        """
        Route the incoming request to a controller based on self.map.
        If no match, return a 404.
        """
        return self._router

    @staticmethod
    @webob.dec.wsgify
    def _dispatch(req):
        """
        Called by self._router after matching the incoming request to a route
        and putting the information into req.environ. Either returns 404
        or the routed WSGI app's response.
        """
        match = req.environ['wsgiorg.routing_args'][1]
        if not match:
            return webob.exc.HTTPNotFound()
        app = match['controller']
        return app
...

The full Trove API routing table lives in trove.common.api:API, which registers every API route. From that definition, clusters support list, show, create, action, and delete operations. The Cluster Controller routes:

# trove.common.api
...
    def _cluster_router(self, mapper):
        cluster_resource = ClusterController().create_resource()
        mapper.connect("/{tenant_id}/clusters",
                       controller=cluster_resource,
                       action="index",
                       conditions={'method': ['GET']})
        mapper.connect("/{tenant_id}/clusters/{id}",
                       controller=cluster_resource,
                       action="show",
                       conditions={'method': ['GET']})
        mapper.connect("/{tenant_id}/clusters",
                       controller=cluster_resource,
                       action="create",
                       conditions={'method': ['POST']})
        mapper.connect("/{tenant_id}/clusters/{id}",
                       controller=cluster_resource,
                       action="action",
                       conditions={'method': ['POST']})
        mapper.connect("/{tenant_id}/clusters/{cluster_id}/instances/"
                       "{instance_id}",
                       controller=cluster_resource,
                       action="show_instance",
                       conditions={'method': ['GET']})
        mapper.connect("/{tenant_id}/clusters/{id}",
                       controller=cluster_resource,
                       action="delete",
                       conditions={'method': ['DELETE']})
...

Let's start from the create flow for a deeper look. The backend handler bound to these routes is trove.cluster.service:ClusterController, with one method per route. The create-related source:

# trove.cluster.service
...
from trove.cluster import models

class ClusterController(wsgi.Controller):
    ...
    def create(self, req, body, tenant_id):
        LOG.debug(("Creating a Cluster for Tenant '%(tenant_id)s'\n"
                   "req : '%(req)s'\n\nbody : '%(body)s'\n\n"),
                  {"tenant_id": tenant_id, "req": req, "body": body})

        context = req.environ[wsgi.CONTEXT_KEY]
        policy.authorize_on_tenant(context, 'cluster:create')

        name = body['cluster']['name']
        datastore_args = body['cluster'].get('datastore', {})
        datastore, datastore_version = (
            datastore_models.get_datastore_version(**datastore_args))

        extended_properties = body['cluster'].get('extended_properties', {})

        try:
            clusters_enabled = (CONF.get(datastore_version.manager)
                                .get('cluster_support'))
        except NoSuchOptError:
            clusters_enabled = False

        if not clusters_enabled:
            raise exception.ClusterDatastoreNotSupported(
                datastore=datastore.name,
                datastore_version=datastore_version.name)

        nodes = body['cluster']['instances']
        instances = []
        for node in nodes:
            flavor_id = utils.get_id_from_href(node['flavorRef'])
            volume_size = volume_type = nics = availability_zone = None
            modules = None
            if 'volume' in node:
                volume_size = int(node['volume']['size'])
                volume_type = node['volume'].get('type')
            if 'nics' in node:
                nics = node['nics']
            if 'availability_zone' in node:
                availability_zone = node['availability_zone']
            if 'modules' in node:
                modules = node['modules']

            instances.append({"flavor_id": flavor_id,
                              "volume_size": volume_size,
                              "volume_type": volume_type,
                              "nics": nics,
                              "availability_zone": availability_zone,
                              'region_name': node.get('region_name'),
                              "modules": modules})

        locality = body['cluster'].get('locality')
        if locality:
            locality_domain = ['affinity', 'anti-affinity']
            locality_domain_msg = ("Invalid locality '%s'. "
                                   "Must be one of ['%s']" %
                                   (locality,
                                    "', '".join(locality_domain)))
            if locality not in locality_domain:
                raise exception.BadRequest(message=locality_domain_msg)

        configuration = body['cluster'].get('configuration')

        context.notification = notification.DBaaSClusterCreate(context,
                                                               request=req)
        with StartNotification(context, name=name, datastore=datastore.name,
                               datastore_version=datastore_version.name):
            cluster = models.Cluster.create(context, name, datastore,
                                            datastore_version, instances,
                                            extended_properties,
                                            locality, configuration)
        cluster.locality = locality
        view = views.load_view(cluster, req=req, load_servers=False)
        return wsgi.Result(view.data(), 200)
    ...

Walking through ClusterController.create, it does three things (an illustrative request body follows the list):

  1. Check the cluster_support option of the requested datastore in the configuration file to decide whether that datastore supports cluster mode.
  2. Assemble the per-instance configuration parameters for the cluster (flavor, volume, nics, availability zone, modules, region).
  3. Call trove.cluster.models.Cluster.create() to create the database cluster.
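
For reference, here is a hypothetical request body, written as the Python dict the controller receives in body. Only the fields that the parsing code above actually reads are shown, and every concrete value is a placeholder:

# Hypothetical create-cluster request body; field names follow the parsing code above.
body = {
    "cluster": {
        "name": "galera-demo",
        "datastore": {"type": "mariadb", "version": "10.4"},
        "instances": [
            {
                "flavorRef": "6",                       # -> flavor_id
                "volume": {"size": 5, "type": None},    # -> volume_size / volume_type
                "nics": [{"net-id": "<network-uuid>"}],  # passed through as-is (format assumed)
                "availability_zone": "nova",
                "region_name": "RegionOne",
            },
        ] * 3,                                          # three homogeneous members
        "locality": "anti-affinity",                    # must be 'affinity' or 'anti-affinity'
        "configuration": None,                          # optional configuration group id
    }
}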

The actual creation happens in trove.cluster.models:Cluster.create. Digging further into trove.cluster.models, the create flow is:

# trove.cluster.models
...
    @classmethod
    def create(cls, context, name, datastore, datastore_version,
               instances, extended_properties, locality, configuration):
        locality = srv_grp.ServerGroup.build_scheduler_hint(
            context, locality, name)
        api_strategy = strategy.load_api_strategy(datastore_version.manager)
        return api_strategy.cluster_class.create(context, name, datastore,
                                                 datastore_version, instances,
                                                 extended_properties,
                                                 locality, configuration)
...

trove.cluster.models:Cluster.create does three things, in order:

  1. Build a server-group scheduler hint for the cluster via novaclient (a Nova server group keeps the members manageable and schedulable as one unit).
  2. Load the API strategy registered for the datastore version's manager.
  3. Call the create method of the cluster class exposed by that strategy.

To see what the strategy layer actually does, here is load_api_strategy:

# trove.common.strategies.cluster.strategy
...
def load_api_strategy(manager):
    clazz = CONF.get(manager).get('api_strategy')
    LOG.debug("Loading class %s", clazz)
    api_strategy = import_class(clazz)
    return api_strategy()
...
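
Under the option defaults shipped in trove/common/cfg.py (an assumption worth verifying against your deployment's configuration), the MariaDB and Percona XtraDB option groups point api_strategy at the Galera strategy class, so for a MariaDB cluster the lookup above effectively resolves to something like:

# Rough sketch of what load_api_strategy('mariadb') resolves to with default settings;
# the class path is the assumed default and should be checked against cfg.py.
from oslo_utils import importutils   # stand-in for Trove's own import_class helper

clazz = ('trove.common.strategies.cluster.experimental.'
         'galera_common.api.GaleraCommonAPIStrategy')
api_strategy = importutils.import_class(clazz)()
cluster_class = api_strategy.cluster_class   # the class whose create() is called next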

The strategy layer uses the per-datastore api_strategy option in the configuration file to locate the management code for that datastore, which is how different datastore types are managed through a single uniform interface. The rest of the analysis takes galera_common as the example. Its source:

# trove.common.strategies.cluster.experimental.galera_common.api
...
    @classmethod
    def create(cls, context, name, datastore, datastore_version,
               instances, extended_properties, locality, configuration):
        LOG.debug("Initiating Galera cluster creation.")
        ds_conf = CONF.get(datastore_version.manager)
        # Check number of instances is at least min_cluster_member_count
        if len(instances) < ds_conf.min_cluster_member_count:
            raise exception.ClusterNumInstancesNotLargeEnough(
                num_instances=ds_conf.min_cluster_member_count)
        cls._validate_cluster_instances(context, instances, datastore,
                                        datastore_version)
        # Updating Cluster Task
        db_info = cluster_models.DBCluster.create(
            name=name, tenant_id=context.project_id,
            datastore_version_id=datastore_version.id,
            task_status=ClusterTasks.BUILDING_INITIAL,
            configuration_id=configuration)

        cls._create_instances(context, db_info, datastore, datastore_version,
                              instances, extended_properties, locality,
                              configuration)

        # Calling taskmanager to further proceed for cluster-configuration
        task_api.load(context, datastore_version.manager).create_cluster(
            db_info.id)

        return cls(context, db_info, datastore, datastore_version)

    @staticmethod
    def _create_instances(context, db_info, datastore, datastore_version,
                          instances, extended_properties, locality,
                          configuration_id):
        member_config = {"id": db_info.id,
                         "instance_type": "member"}
        name_index = int(time.time())
        for instance in instances:
            if not instance.get("name"):
                instance['name'] = "%s-member-%s" % (db_info.name,
                                                     str(name_index))
                name_index += 1

        return [Instance.create(context,
                                instance['name'],
                                instance['flavor_id'],
                                datastore_version.image_id,
                                [], [],
                                datastore, datastore_version,
                                instance.get('volume_size', None),
                                None,
                                availability_zone=instance.get(
                                    'availability_zone', None),
                                nics=instance.get('nics', None),
                                configuration_id=configuration_id,
                                cluster_config=member_config,
                                volume_type=instance.get(
                                    'volume_type', None),
                                modules=instance.get('modules'),
                                locality=locality,
                                region_name=instance.get('region_name')
                                )
                for instance in instances]

    @staticmethod
    def _validate_cluster_instances(context, instances, datastore,
                                    datastore_version):
        """Validate the flavor and volume"""
        ds_conf = CONF.get(datastore_version.manager)
        num_instances = len(instances)

        # Checking volumes and get delta for quota check
        cluster_models.validate_instance_flavors(
            context, instances, ds_conf.volume_support, ds_conf.device_path)

        req_volume_size = cluster_models.get_required_volume_size(
            instances, ds_conf.volume_support)

        cluster_models.assert_homogeneous_cluster(instances)

        deltas = {'instances': num_instances, 'volumes': req_volume_size}

        # quota check
        check_quotas(context.project_id, deltas)

        # Checking networks are same for the cluster
        cluster_models.validate_instance_nics(context, instances)
...

galera_common.api performs the following steps:

  1. Check that the number of requested instances is at least the configured minimum cluster member count (min_cluster_member_count).
  2. Validate the instance specifications (flavors, volumes, homogeneity, quotas, networks).
  3. Record the new cluster in the database with its task status set to BUILDING_INITIAL.
  4. Create each instance in the cluster.
  5. Call galera_common.taskmanager.create_cluster over RPC to carry out the remaining cluster configuration (see the sketch below).
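
Step 5 deserves a note: the hand-off is an RPC message to the trove-taskmanager service (see trove/taskmanager/api.py; the exact call semantics are stated here as an assumption), so the REST create call returns while the cluster stays in the BUILDING_INITIAL task state until the taskmanager work below finishes or fails. A rough sketch of the hop:

# Sketch of the API -> taskmanager hop; import path and semantics assumed from the Train tree.
from trove.taskmanager import api as task_api

task_api.load(context, datastore_version.manager).create_cluster(db_info.id)
# load() returns a manager-aware taskmanager client; create_cluster() is delivered over
# oslo.messaging to trove-taskmanager, where the Galera taskmanager strategy (next
# listing) picks the work up asynchronously.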

The source of galera_common.taskmanager.create_cluster:

# trove.common.strategies.cluster.experimental.galera_common.taskmanager
...
    def create_cluster(self, context, cluster_id):
        LOG.debug("Begin create_cluster for id: %s.", cluster_id)

        def _create_cluster():
            # Fetch instances by cluster_id against instances table.
            db_instances = DBInstance.find_all(cluster_id=cluster_id).all()
            instance_ids = [db_instance.id for db_instance in db_instances]

            LOG.debug("Waiting for instances to get to cluster-ready status.")
            # Wait for cluster members to get to cluster-ready status.
            if not self._all_instances_ready(instance_ids, cluster_id):
                raise TroveError(_("Instances in cluster did not report "
                                   "ACTIVE"))

            LOG.debug("All members ready, proceeding for cluster setup.")
            instances = [Instance.load(context, instance_id) for instance_id
                         in instance_ids]

            cluster_ips = [self.get_ip(instance) for instance in instances]
            instance_guests = []

            # Create replication user and password for synchronizing the
            # galera cluster
            replication_user = {
                "name": self.CLUSTER_REPLICATION_USER,
                "password": utils.generate_random_password(),
            }

            # Galera cluster name must be unique and be shorter than a full
            # uuid string so we remove the hyphens and chop it off. It was
            # recommended to be 16 chars or less.
            # (this is not currently documented on Galera docs)
            cluster_name = utils.generate_uuid().replace("-", "")[:16]

            LOG.debug("Configuring cluster configuration.")
            try:
                # Set the admin password for all the instances because the
                # password in the my.cnf will be wrong after the joiner
                # instances syncs with the donor instance.
                admin_password = str(utils.generate_random_password())

                bootstrap = True
                for instance in instances:
                    guest = self.get_guest(instance)
                    instance_guests.append(guest)
                    guest.reset_admin_password(admin_password)
                    # render the conf.d/cluster.cnf configuration
                    cluster_configuration = self._render_cluster_config(
                        context,
                        instance,
                        ",".join(cluster_ips),
                        cluster_name,
                        replication_user)

                    # push the cluster config and bootstrap the first instance
                    guest.install_cluster(replication_user,
                                          cluster_configuration,
                                          bootstrap)
                    bootstrap = False

                LOG.debug("Finalizing cluster configuration.")
                for guest in instance_guests:
                    guest.cluster_complete()
            except Exception:
                LOG.exception("Error creating cluster.")
                self.update_statuses_on_failure(cluster_id)

        timeout = Timeout(CONF.cluster_usage_timeout)
        try:
            _create_cluster()
            self.reset_task()
        except Timeout as t:
            if t is not timeout:
                raise  # not my timeout
            LOG.exception("Timeout for building cluster.")
            self.update_statuses_on_failure(cluster_id)
        except TroveError:
            LOG.exception("Error creating cluster %s.", cluster_id)
            self.update_statuses_on_failure(cluster_id)
        finally:
            timeout.cancel()

        LOG.debug("End create_cluster for id: %s.", cluster_id)
...

The taskmanager does the following, in order:

  1. Look up all instances belonging to the cluster by cluster id.
  2. Wait until every instance reports ACTIVE.
  3. Generate the cluster configuration (replication user, shortened cluster name, rendered conf.d/cluster.cnf override); a sketch of the rendered override follows the list.
  4. Call the guest agent inside each instance in turn to apply the cluster configuration, bootstrapping the first node.
  5. Update the cluster status (reset the task on success, mark the cluster failed otherwise).
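
For orientation, the conf.d/cluster.cnf override rendered in step 3 is essentially a block of Galera wsrep settings built from the values gathered above (member IPs, the shortened cluster name, the replication credentials). The snippet below is only a sketch with placeholder values and makes no claim to match the exact template Trove ships for a given datastore:

# Illustrative cluster override only -- placeholder values, not Trove's actual template.
[mysqld]
wsrep_cluster_address = "gcomm://10.0.0.11,10.0.0.12,10.0.0.13"
wsrep_cluster_name = 3f2a9c1d0b7e4a58
wsrep_sst_auth = "<CLUSTER_REPLICATION_USER>:<generated password>"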

That completes the control-plane flow. To understand Trove one level deeper, we now go inside the instance and look at what the guest agent actually does, starting from its startup code to see how it answers RPC calls. Source:

# trove.cmd.guest
import sys

from oslo_config import cfg as openstack_cfg
from oslo_log import log as logging
from oslo_service import service as openstack_service

from trove.common import cfg
from trove.common import debug_utils
from trove.common.i18n import _
from trove.guestagent import api as guest_api

CONF = cfg.CONF
# The guest_id opt definition must match the one in common/cfg.py
CONF.register_opts([openstack_cfg.StrOpt('guest_id', default=None,
                                         help="ID of the Guest Instance."),
                    openstack_cfg.StrOpt('instance_rpc_encr_key',
                                         help=('Key (OpenSSL aes_cbc) for '
                                               'instance RPC encryption.'))])


def main():
    cfg.parse_args(sys.argv)
    logging.setup(CONF, None)
    debug_utils.setup()

    from trove.guestagent import dbaas
    manager = dbaas.datastore_registry().get(CONF.datastore_manager)
    if not manager:
        msg = (_("Manager class not registered for datastore manager %s") %
               CONF.datastore_manager)
        raise RuntimeError(msg)

    if not CONF.guest_id:
        msg = (_("The guest_id parameter is not set. guest_info.conf "
                 "was not injected into the guest or not read by guestagent"))
        raise RuntimeError(msg)

    # BUG(1650518): Cleanup in the Pike release
    # make it fatal if CONF.instance_rpc_encr_key is None

    # rpc module must be loaded after decision about thread monkeypatching
    # because if thread module is not monkeypatched we can't use eventlet
    # executor from oslo_messaging library.
    from trove import rpc
    rpc.init(CONF)

    from trove.common.rpc import service as rpc_service
    server = rpc_service.RpcService(
        key=CONF.instance_rpc_encr_key,
        topic="guestagent.%s" % CONF.guest_id,
        manager=manager, host=CONF.guest_id,
        rpc_api_version=guest_api.API.API_LATEST_VERSION)

    launcher = openstack_service.launch(CONF, server, restart_method='mutate')
    launcher.wait()

During startup the guest agent does two things:

  1. Looks up the manager entry class matching CONF.datastore_manager in trove.guestagent.dbaas.

  2. Registers that manager with an RpcService listening on the topic guestagent.<guest_id>, so that when a matching RPC message arrives, the corresponding manager method is invoked.
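
This topic naming is what lets the control plane address one specific instance: the guest-facing API on the taskmanager side builds the same topic from the id of the instance it wants to reach (the exact target construction lives in trove/guestagent/api.py and is assumed here). A minimal sketch:

# Sketch only: how a per-instance RPC topic is formed; the instance id is hypothetical.
instance_id = "1f7bedb9-4a2e-4a3c-b022-000000000000"
topic = "guestagent.%s" % instance_id
# The guest started above listens on exactly this topic, so a message sent to it is
# handled by the datastore manager registered in step 1.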

Let's look at which entry classes trove.guestagent.dbaas registers:

# trove.guestagent.dbaas
"""
Handles all processes within the Guest VM, considering it as a Platform

The :py:class:`GuestManager` class is a :py:class:`nova.manager.Manager` that
handles RPC calls relating to Platform specific operations.

**Related Flags**

"""

from itertools import chain
import os

from oslo_log import log as logging

from trove.common import cfg
from trove.common.i18n import _
from trove.common import utils


LOG = logging.getLogger(__name__)
defaults = {
    'mysql':
    'trove.guestagent.datastore.mysql.manager.Manager',
    'percona':
    'trove.guestagent.datastore.experimental.percona.manager.Manager',
    'pxc':
    'trove.guestagent.datastore.experimental.pxc.manager.Manager',
    'redis':
    'trove.guestagent.datastore.experimental.redis.manager.Manager',
    'cassandra':
    'trove.guestagent.datastore.experimental.cassandra.manager.Manager',
    'couchbase':
    'trove.guestagent.datastore.experimental.couchbase.manager.Manager',
    'mongodb':
    'trove.guestagent.datastore.experimental.mongodb.manager.Manager',
    'postgresql':
    'trove.guestagent.datastore.experimental.postgresql.manager.Manager',
    'couchdb':
    'trove.guestagent.datastore.experimental.couchdb.manager.Manager',
    'vertica':
    'trove.guestagent.datastore.experimental.vertica.manager.Manager',
    'db2':
    'trove.guestagent.datastore.experimental.db2.manager.Manager',
    'mariadb':
    'trove.guestagent.datastore.experimental.mariadb.manager.Manager'
}
...
def datastore_registry():
    return dict(chain(defaults.items(),
                      get_custom_managers().items()))
...
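
Putting the two pieces together, resolving a manager for a given datastore is a plain dictionary lookup. A minimal sketch, assuming the guest image was built for MariaDB (CONF.datastore_manager = 'mariadb' would come from the injected guest_info.conf):

# Sketch: what the startup code effectively does for a MariaDB guest.
from trove.guestagent import dbaas

manager = dbaas.datastore_registry().get('mariadb')
# -> 'trove.guestagent.datastore.experimental.mariadb.manager.Manager'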

In dbaas we can find the entry class for every datastore, so each incoming RPC message type can be mapped to the handler method on the right manager. Back to the taskmanager flow: it calls install_cluster on the guest. The guest-API side of install_cluster is:

# trove.common.strategies.cluster.experimental.galera_common.guestagent
...
class GaleraCommonGuestAgentAPI(guest_api.API):
"""Cluster Specific Datastore Guest API

**** VERSION CONTROLLED API ****

The methods in this class are subject to version control as
coordinated by guestagent/api.py. Whenever a change is made to
any API method in this class, add a version number and comment
to the top of guestagent/api.py and use the version number as
appropriate in this file
"""

def install_cluster(self, replication_user, cluster_configuration,
bootstrap):
"""Install the cluster."""
LOG.debug("Installing Galera cluster.")
version = guest_api.API.API_BASE_VERSION

self._call("install_cluster", CONF.cluster_usage_timeout,
version=version,
replication_user=replication_user,
cluster_configuration=cluster_configuration,
bootstrap=bootstrap)
...

This sends a synchronous RPC request to the guest (_call blocks for a reply, here up to cluster_usage_timeout; _cast would be the asynchronous, fire-and-forget variant) and triggers install_cluster inside the instance. Starting from the manager, here is what install_cluster actually does:

# trove.guestagent.datastore.galera_common.manager
...
    def install_cluster(self, context, replication_user, cluster_configuration,
                        bootstrap):
        app = self.mysql_app(self.mysql_app_status.get())
        try:
            app.install_cluster(
                replication_user, cluster_configuration, bootstrap)
            LOG.debug("install_cluster call has finished.")
        except Exception:
            LOG.exception('Cluster installation failed.')
            app.status.set_status(
                rd_instance.ServiceStatuses.FAILED)
            raise
...

This simply dispatches to install_cluster on the service object and handles failures by marking the service status FAILED. Inside the service:

# trove.guestagent.datastore.galera_common.service
...
class GaleraApp(service.BaseMySqlApp):

    def __init__(self, status, local_sql_client, keep_alive_connection_cls):
        super(GaleraApp, self).__init__(status, local_sql_client,
                                        keep_alive_connection_cls)

    def install_cluster(self, replication_user, cluster_configuration,
                        bootstrap=False):
        LOG.info("Installing cluster configuration.")
        self._grant_cluster_replication_privilege(replication_user)
        self.stop_db()
        self.write_cluster_configuration_overrides(cluster_configuration)
        self.wipe_ib_logfiles()
        LOG.debug("bootstrap the instance? : %s", bootstrap)
        # Have to wait to sync up the joiner instances with the donor instance.
        if bootstrap:
            self._bootstrap_cluster(timeout=CONF.restore_usage_timeout)
        else:
            self.start_mysql(timeout=CONF.restore_usage_timeout)

    def _grant_cluster_replication_privilege(self, replication_user):
        LOG.info("Granting Replication Slave privilege.")
        with self.local_sql_client(self.get_engine()) as client:
            perms = ['REPLICATION CLIENT', 'RELOAD', 'LOCK TABLES']
            g = sql_query.Grant(permissions=perms,
                                user=replication_user['name'],
                                clear=replication_user['password'])
            t = text(str(g))
            client.execute(t)

    def write_cluster_configuration_overrides(self, cluster_configuration):
        self.configuration_manager.apply_system_override(
            cluster_configuration, 'cluster')

    def _bootstrap_cluster(self, timeout=120):
        LOG.info("Bootstraping cluster.")
        try:
            utils.execute_with_timeout(
                self.mysql_service['cmd_bootstrap_galera_cluster'],
                shell=True, timeout=timeout)
        except KeyError:
            LOG.exception("Error bootstrapping cluster.")
            raise RuntimeError(_("Service is not discovered."))
...

These are the concrete steps performed inside each instance:

  1. Grant the replication privileges to the replication user.
  2. Stop the database service.
  3. Write the cluster configuration override (the rendered MySQL/Galera settings).
  4. Wipe the ib_logfiles.
  5. Start the first node in cluster-bootstrap mode (via the datastore's cmd_bootstrap_galera_cluster command); the remaining nodes simply start MySQL and join.

With that, the MySQL cluster creation flow has been traced end to end.

References