Keystone 是 OpenStack 的核心组件之一,作为身份认证服务(Identity Service),它为整个 OpenStack 生态系统提供统一的身份验证、授权和服务目录管理。
三大核心功能:
- 身份认证(Authentication):验证用户身份(如用户名 / 密码、令牌等)
- 授权(Authorization):确定用户是否有权限执行特定操作
- 服务目录(Service Catalog):管理 OpenStack 各服务的访问端点(Endpoint)
1、核心概念
1.1 User(用户)
代表一个使用 OpenStack 服务的个体、系统或服务。它拥有登录凭证(密码、API Key 等)。
1.2 Project / Tenant(项目 / 租户)
Project 用于将 OpenStack 的资源(计算、存储和网络)进行分组和隔离。
根据 OpenStack 服务的对象不同,Project 可以是一个客户(公有云,也叫租户)、部门或者项目组(私有云)。
注意:
- 资源的所有权是属于 Project 的,而不是 User。
- 每个 User(包括 admin)必须挂在 Project 里才能访问该 Project 的资源。 一个User可以属于多个 Project。
- admin 相当于 root 用户,具有最高权限
资源隔离的单元,用户必须属于某个项目才能操作资源(Tenant 是旧称,现在多称为 Project)
1.3 Role(角色)
权限的集合。它定义了一组用户可以执行的操作权限(如 member, admin, network-admin)。角色本身没有权限,其权限由各个 OpenStack 服务(Nova, Neutron等)在策略文件(policy.json)中定义。
示例:- { //用户需具有 admin角色 //
- "context_is_admin": "role:admin",
- //有管理员权限或请求用户的project_id和资源的project_id匹配 //
- "admin_or_owner": "is_admin:True or project_id:%(project_id)s",
- //检查用户上下文是否具有管理员权限。//
- "admin_api": "is_admin:True",
- //用户需具有 cloud_admin 角色 //
- "cloud_admin": "role:cloud_admin",
- // 引用规则admin_or_owner//
- "os_compute_api:servers:start": "rule:admin_or_owner",
- // 允许任何用户查看服务器元数据(metadata)
- "os_compute_api:servers:show_server_metadata": ""
- }
复制代码 1.4 Token(令牌)
一个加密字符串,是访问资源的凭证。用户认证后,Keystone 会颁发一个 Token。在后续调用其他服务 API 时,必须出示此 Token 以验证身份和权限。Token 有有效期。
1.5 Service(服务)
代表一个 OpenStack 服务,如 nova, neutron, cinder 等。在 Keystone 中注册
1.6 Endpoint(端点)
服务的访问地址(URL),分为 public(公网)、internal(内网)、admin(管理)三种类型一个服务对外暴露的可访问地址(URL)。
1.7 Catalog(服务目录)
所有 Service 和其 Endpoint 的列表。它在用户认证成功后随 Token 一起返回,告诉用户哪里可以访问所需的服务。
1.8 Domain(域)
项目、用户和组的集合。用于实现更高级别的隔离和管理隔离。一个域可以被认为是一个更大的组织或部门,其下包含多个项目和用户。默认有一个 Default 域。
1.9 Group(组)
用户的集合。可以给一个组分配角色,那么组内的所有用户都会继承这个角色在项目中的权限,简化了用户权限管理。
2、工作原理
sequenceDiagram actor User participant H as Horizon/CLI participant K as Keystone participant S as Other Service
(e.g., Nova) User->>H: 1. 提供用户名/密码
和项目名称 H->>K: 2. 认证请求
(用户名、密码、项目) K->>K: 3. 验证凭证
计算权限
生成Token
组装服务目录 K-->>H: 4. 返回认证响应
(Token & Service Catalog) H->>S: 5. 请求创建虚拟机
(携带Token) S->>K: 6. 验证Token有效性 K-->>S: 7. 返回Token详情
(用户、项目、角色) S->>S: 8. 根据策略(policy.json)
校验权限 S-->>H: 9. 执行操作并返回结果 H-->>User: 10. 显示操作结果
- 用户认证 (Authentication): 用户(通过 Horizon 或 CLI)向 Keystone 提供用户名、密码和要访问的项目名称。
- 验证与令牌颁发: Keystone 验证用户身份及其在该项目中的成员资格。验证通过后,Keystone 会:
- 生成一个 Token。
- 根据用户的角色计算其权限。
- 组装该用户可以访问的服务目录 (Catalog)。
- 服务访问: 用户使用收到的 Token 去调用其他 OpenStack 服务(例如 Nova 创建虚拟机)。该请求中必须包含 X-Auth-Token 头信息。
- 令牌验证 (Validation): Nova 接收到请求后,无法自己识别 Token,它会向 Keystone 发出请求,询问“这个 Token 是否有效?它对应哪个用户和项目?”
- 授权决策 (Authorization): Keystone 验证 Token 并返回Token的详情(用户、项目、角色等)。Nova 然后根据自己的policy.json文件,判断该用户在该项目下拥有的角色是否有权执行创建虚拟机操作。
- 执行操作: 如果授权通过,Nova 执行操作;否则返回权限错误。
3、Keystone源码解析
Keystone 的源码主要位于 /keystone/ 目录下。
3.1 主要目录结构
- keystone/common/: 通用工具(如配置、策略执行、上下文)
- keystone/identity/: 身份后端驱动(SQL, LDAP)
- keystone/assignment/: 资源(项目/域)和角色分配后端驱动
- keystone/auth/: 认证插件(密码、令牌等)
- keystone/token/: 令牌提供者(Fernet, UUID, PKI, PKIZ)
- keystone/catalog/: 服务目录后端驱动
- keystone/api/: 主要的 WSGI 路由和控制器(RESTful API 入口)
- keystone/server/: 启动和部署相关的 Flask 应用配置
3.2 核心流程源码分析
3.2.1 密码认证与令牌颁发流程
- 请求入口:WSGI 路由与控制器
文件keystone/auth/routers.py
- class AuthRouter(wsgi.ComposableRouter):
- def add_routes(self, mapper):
- # 注册认证路由
- mapper.connect('/auth/tokens',
- controller=self._controllers['auth'],
- action='authenticate_for_token',
- conditions={'method': ['POST']})
复制代码 当收到 POST /v3/auth/tokens请求时,路由将请求分发给 AuthController.authenticate_for_token方法。
2. 认证控制器处理
文件路径: keystone/auth/controllers.py- class AuthController(controller.V3Controller):
- def authenticate_for_token(self, context, auth_payload):
- # 1. 验证请求格式
- self._validate_authentication_request(context, auth_payload)
-
- # 2. 提取认证方法
- auth_methods = auth_payload['auth']['identity']['methods']
- auth_data = auth_payload['auth']['identity'].get('password', {})
-
- # 3. 选择认证插件
- auth_plugin = self._get_auth_plugin(auth_methods[0])
-
- # 4. 执行认证
- auth_context = auth_plugin.authenticate(context, **auth_data)
-
- # 5. 颁发令牌
- token_data = self.token_provider_api.issue_token(
- user_id=auth_context['user_id'],
- method_names=auth_methods,
- project_id=auth_context.get('project_id'),
- domain_id=auth_context.get('domain_id')
- )
-
- # 6. 构建响应
- response = wsgi.render_response(
- body=token_data,
- status=(201, 'Created')
- )
- response.headers['X-Subject-Token'] = token_data['token']['id']
- return response
复制代码
- 密码认证插件实现
文件路径: keystone/auth/plugins/password.py
- class Password(controller.AuthMethodHandler):
- def authenticate(self, context, user_id=None, password=None,
- user_domain_id=None, username=None):
- # 1. 获取用户域ID(如果未提供)
- if not user_domain_id:
- user_domain_id = CONF.identity.default_domain_id
-
- # 2. 获取用户对象
- try:
- if user_id:
- user_ref = self.identity_api.get_user(context, user_id)
- else:
- user_ref = self.identity_api.get_user_by_name(
- context, username, user_domain_id)
- except exception.UserNotFound:
- raise exception.Unauthorized('Invalid user')
-
- # 3. 验证用户状态
- if not user_ref.get('enabled', True):
- raise exception.Unauthorized('User disabled')
-
- # 4. 验证密码
- try:
- self.identity_api.authenticate(
- context,
- user_id=user_ref['id'],
- password=password
- )
- except AssertionError:
- raise exception.Unauthorized('Invalid password')
-
- # 5. 返回认证上下文
- return {
- 'user_id': user_ref['id'],
- 'user_domain_id': user_domain_id,
- 'project_id': self._get_project_id(context, auth_context),
- 'domain_id': self._get_domain_id(context, auth_context)
- }
复制代码
- 身份驱动验证密码
文件路径: keystone/identity/drivers/sql.py
- class Identity(identity.Driver):
- def authenticate(self, context, user_id, password):
- # 1. 获取用户凭证
- try:
- cred_ref = self.get_credential(context, user_id)
- except exception.CredentialNotFound:
- raise AssertionError("Invalid credentials")
-
- # 2. 验证密码哈希
- if not self._check_password(password, cred_ref['blob']):
- raise AssertionError("Invalid credentials")
-
- # 3. 验证凭证状态
- if not cred_ref.get('enabled', True):
- raise AssertionError("Credential disabled")
复制代码
- 令牌提供者颁发令牌
文件路径: keystone/token/provider.py
- class TokenProviderAPI(object):
- def issue_token(self, user_id, method_names, **kwargs):
- # 1. 验证用户和项目/域
- self._assert_valid_user(user_id)
- self._assert_valid_project_or_domain(kwargs)
-
- # 2. 生成令牌ID
- token_id = self.token_formatter.create_token(
- user_id=user_id,
- **kwargs
- )
-
- # 3. 构建令牌数据
- token_data = {
- 'token': {
- 'methods': method_names,
- 'expires_at': self._get_expiration_time(),
- 'user': self._get_user_data(user_id),
- 'project': self._get_project_data(kwargs.get('project_id')),
- 'domain': self._get_domain_data(kwargs.get('domain_id')),
- 'catalog': self._get_service_catalog(user_id, **kwargs)
- }
- }
-
- # 4. 持久化令牌
- self.token_provider.persist_token(token_id, token_data)
-
- return token_data
复制代码
- Fernet 令牌驱动实现
文件路径: keystone/token/providers/fernet.py
- class TokenFormatter(object):
- def create_token(self, user_id, project_id=None, domain_id=None):
- # 1. 准备令牌载荷
- payload = {
- 'exp': int(time.time()) + CONF.token.expiration,
- 'iat': int(time.time()),
- 'user': {'id': user_id},
- 'aud': self._get_audience(),
- 'iss': CONF.token.issuer
- }
-
- if project_id:
- payload['project'] = {'id': project_id}
- elif domain_id:
- payload['domain'] = {'id': domain_id}
-
- # 2. 序列化载荷
- serialized_payload = msgpack.packb(payload)
-
- # 3. 使用Fernet加密
- fernet = Fernet(CONF.fernet_tokens.key_repository)
- token = fernet.encrypt(serialized_payload)
-
- return token.decode('utf-8')
复制代码 3.2.2 令牌验证流程
其他服务调用 GET /v3/auth/tokens
- 请求入口:路由与控制器
文件路径: keystone/auth/routers.py
- class AuthRouter(wsgi.ComposableRouter):
- def add_routes(self, mapper):
- # 注册令牌验证路由
- mapper.connect('/auth/tokens',
- controller=self._controllers['auth'],
- action='validate_token',
- conditions={'method': ['GET']})
复制代码
- 令牌验证控制器
文件路径: keystone/auth/controllers.py
- class AuthController(controller.V3Controller):
- def validate_token(self, context):
- # 1. 获取待验证的令牌ID
- token_id = self._get_subject_token_id(context)
-
- # 2. 验证请求者权限
- self.assert_authenticated()
-
- # 3. 调用令牌提供者API进行验证
- token_data = self.token_provider_api.validate_token(token_id)
-
- # 4. 构建响应
- return wsgi.render_response(body=token_data)
复制代码
- 令牌提供者 API 实现
文件路径: keystone/token/provider.py
- class TokenProviderAPI(object):
- def validate_token(self, token_id):
- # 1. 检查令牌格式
- if not self.token_formatter.validate_token(token_id):
- raise exception.ValidationError(_('Invalid token format'))
-
- # 2. 从缓存或持久化存储获取令牌数据
- token_data = self._get_token_data(token_id)
-
- # 3. 验证令牌状态
- self._validate_token_state(token_data)
-
- # 4. 验证令牌有效期
- self._validate_token_expiration(token_data)
-
- # 5. 验证用户状态
- self._assert_valid_user(token_data['token']['user']['id'])
-
- # 6. 验证项目/域状态
- if 'project' in token_data['token']:
- self._assert_valid_project(token_data['token']['project']['id'])
- elif 'domain' in token_data['token']:
- self._assert_valid_domain(token_data['token']['domain']['id'])
-
- # 7. 构建响应数据
- return self._format_token_response(token_data)
复制代码
- 令牌数据获取逻辑
文件路径: keystone/token/provider.py
- class TokenProviderAPI(object):
- def _get_token_data(self, token_id):
- # 1. 检查缓存
- token_data = self.token_cache.get_token(token_id)
- if token_data:
- return token_data
-
- # 2. 从持久化存储获取
- token_data = self.token_driver.get_token_data(token_id)
- if not token_data:
- raise exception.TokenNotFound(_('Token not found'))
-
- # 3. 验证令牌签名
- if not self.token_formatter.validate_token_signature(token_id):
- raise exception.Unauthorized(_('Invalid token signature'))
-
- # 4. 缓存结果
- self.token_cache.set_token(token_id, token_data)
-
- return token_data
复制代码
- Fernet 令牌验证实现
文件路径: keystone/token/providers/fernet.py
- class TokenFormatter(object):
- def validate_token(self, token_id):
- # 1. 基本格式检查
- if not token_id or len(token_id) < CONF.fernet_tokens.min_token_size:
- return False
-
- # 2. Fernet 令牌格式检查
- return token_id.count('-') == 3 and len(token_id.split('-')[0]) == 8
-
- def validate_token_signature(self, token_id):
- # 1. 尝试使用所有可用密钥解密
- for key in self._get_valid_keys():
- try:
- fernet = Fernet(key)
- payload = fernet.decrypt(token_id.encode('utf-8'))
- msgpack.unpackb(payload) # 验证载荷格式
- return True
- except (cryptography.fernet.InvalidToken, msgpack.UnpackException):
- continue
-
- return False
-
- def _get_valid_keys(self):
- """获取所有有效的加密密钥"""
- keys = []
- for key_repo in CONF.fernet_tokens.key_repository:
- for key_file in os.listdir(key_repo):
- if key_file.endswith('.key'):
- with open(os.path.join(key_repo, key_file), 'rb') as f:
- keys.append(f.read().strip())
- return keys
复制代码
- 令牌缓存实现
文件路径: keystone/token/providers/common.py
- class TokenCache(object):
- def __init__(self):
- self._cache = {}
- self._lock = threading.Lock()
-
- def get_token(self, token_id):
- with self._lock:
- # 1. 检查内存缓存
- if token_id in self._cache:
- entry = self._cache[token_id]
- if time.time() < entry['exp']:
- return entry['data']
- del self._cache[token_id]
-
- # 2. 检查外部缓存(如 Memcached)
- if CONF.token.caching and CONF.cache.enabled:
- cache_key = f"token_{token_id}"
- cached = cache.get(cache_key)
- if cached and time.time() < cached['exp']:
- self._cache[token_id] = cached # 填充内存缓存
- return cached['data']
-
- return None
-
- def set_token(self, token_id, token_data):
- with self._lock:
- # 1. 设置内存缓存
- entry = {
- 'data': token_data,
- 'exp': time.time() + CONF.token.cache_time
- }
- self._cache[token_id] = entry
-
- # 2. 设置外部缓存
- if CONF.token.caching and CONF.cache.enabled:
- cache_key = f"token_{token_id}"
- cache.set(cache_key, entry, CONF.token.cache_time)
复制代码 3.2.3 服务注册
- 服务注册 API 入口
文件路径: keystone/service/routers.py
- class ServiceRouter(wsgi.ComposableRouter):
- def add_routes(self, mapper):
- # 服务注册路由
- mapper.connect('/services',
- controller=self._controllers['service'],
- action='create_service',
- conditions={'method': ['POST']})
-
- # 端点注册路由
- mapper.connect('/endpoints',
- controller=self._controllers['endpoint'],
- action='create_endpoint',
- conditions={'method': ['POST']})
复制代码
- 服务控制器实现
文件路径: keystone/service/controllers.py
- class ServiceController(controller.V3Controller):
- @controller.protected()
- def create_service(self, context, service):
- """创建服务实体"""
- # 1. 验证输入数据
- validation.lazy_validate(schema.service_create, service)
- service_ref = service['service']
-
- # 2. 生成服务ID
- service_id = service_ref.get('id') or uuid.uuid4().hex
-
- # 3. 提取服务属性
- service_type = service_ref['type']
- service_name = service_ref.get('name', service_type)
- description = service_ref.get('description', '')
- enabled = service_ref.get('enabled', True)
-
- # 4. 调用服务API创建服务
- service_dict = self.catalog_api.create_service(
- service_id,
- {'name': service_name, 'type': service_type,
- 'description': description, 'enabled': enabled}
- )
-
- # 5. 返回创建的服务信息
- return {'service': service_dict}
复制代码
- 端点控制器实现
文件路径: keystone/service/controllers.py
- class EndpointController(controller.V3Controller):
- @controller.protected()
- def create_endpoint(self, context, endpoint):
- """创建服务端点"""
- # 1. 验证输入数据
- validation.lazy_validate(schema.endpoint_create, endpoint)
- endpoint_ref = endpoint['endpoint']
-
- # 2. 生成端点ID
- endpoint_id = endpoint_ref.get('id') or uuid.uuid4().hex
-
- # 3. 提取端点属性
- service_id = endpoint_ref['service_id']
- interface = endpoint_ref['interface'] # public, admin, internal
- url = endpoint_ref['url']
- region_id = endpoint_ref.get('region_id')
- enabled = endpoint_ref.get('enabled', True)
-
- # 4. 验证服务是否存在
- self.catalog_api.get_service(service_id)
-
- # 5. 调用端点API创建端点
- endpoint_dict = self.catalog_api.create_endpoint(
- endpoint_id,
- {'service_id': service_id, 'interface': interface,
- 'url': url, 'region_id': region_id, 'enabled': enabled}
- )
-
- # 6. 返回创建的端点信息
- return {'endpoint': endpoint_dict}
复制代码
- 目录服务 API 实现
文件路径: keystone/catalog/core.py
- class CatalogManager(object):
- def __init__(self):
- # 加载后端驱动
- self.driver = self._get_driver()
-
- def _get_driver(self):
- # 根据配置选择驱动
- if CONF.catalog.driver == 'sql':
- from keystone.catalog.backends import sql
- return sql.Catalog()
- elif CONF.catalog.driver == 'templated':
- from keystone.catalog.backends import templated
- return templated.Catalog()
- else:
- raise exception.ConfigError('Invalid catalog driver')
-
- def create_service(self, service_id, service):
- """创建服务实体"""
- # 验证服务类型
- self._validate_service_type(service['type'])
-
- # 调用驱动创建服务
- return self.driver.create_service(service_id, service)
-
- def create_endpoint(self, endpoint_id, endpoint):
- """创建服务端点"""
- # 验证接口类型
- self._validate_interface(endpoint['interface'])
-
- # 调用驱动创建端点
- return self.driver.create_endpoint(endpoint_id, endpoint)
复制代码
- SQL 后端驱动实现
文件路径: keystone/catalog/backends/sql.py
- class Catalog(DriverBase):
- def create_service(self, service_id, service):
- """在SQL数据库中创建服务"""
- with sql.session_for_write() as session:
- # 检查服务类型是否唯一
- if session.query(models.Service).filter_by(
- type=service['type']).first():
- raise exception.Conflict('Service type already exists')
-
- # 创建服务对象
- service_ref = models.Service()
- service_ref.update({
- 'id': service_id,
- 'name': service['name'],
- 'type': service['type'],
- 'description': service.get('description'),
- 'enabled': service.get('enabled', True)
- })
-
- # 添加到会话并提交
- session.add(service_ref)
- session.flush()
-
- return service_ref.to_dict()
-
- def create_endpoint(self, endpoint_id, endpoint):
- """在SQL数据库中创建端点"""
- with sql.session_for_write() as session:
- # 检查服务是否存在
- service_ref = session.query(models.Service).get(
- endpoint['service_id'])
- if not service_ref:
- raise exception.ServiceNotFound(
- service_id=endpoint['service_id'])
-
- # 检查接口类型是否重复
- if session.query(models.Endpoint).filter_by(
- service_id=endpoint['service_id'],
- interface=endpoint['interface']).first():
- raise exception.Conflict(
- 'Endpoint interface already exists for this service')
-
- # 创建端点对象
- endpoint_ref = models.Endpoint()
- endpoint_ref.update({
- 'id': endpoint_id,
- 'service_id': endpoint['service_id'],
- 'interface': endpoint['interface'],
- 'url': endpoint['url'],
- 'region_id': endpoint.get('region_id'),
- 'enabled': endpoint.get('enabled', True)
- })
-
- # 添加到会话并提交
- session.add(endpoint_ref)
- session.flush()
-
- return endpoint_ref.to_dict()
复制代码
- 模板化后端驱动实现
文件路径: keystone/catalog/backends/templated.py
- class Catalog(DriverBase):
- def __init__(self):
- # 加载配置文件
- self.conf = self._load_config()
-
- def _load_config(self):
- """从配置文件加载服务目录模板"""
- with open(CONF.catalog.template_file, 'r') as f:
- return json.load(f)
-
- def create_service(self, service_id, service):
- """模板驱动不支持动态创建服务"""
- raise exception.NotImplemented(
- 'Dynamic service creation not supported in templated driver')
-
- def create_endpoint(self, endpoint_id, endpoint):
- """模板驱动不支持动态创建端点"""
- raise exception.NotImplemented(
- 'Dynamic endpoint creation not supported in templated driver')
-
- def get_catalog(self, user_id, project_id):
- """从模板生成服务目录"""
- # 替换模板中的变量
- catalog = copy.deepcopy(self.conf['catalog'])
- for service in catalog:
- for endpoint in service['endpoints']:
- endpoint['url'] = endpoint['url'].replace(
- '$HOST', CONF.default_public_host)
- return catalog
复制代码 4、常用命令
4.1 身份(Identity)管理
1. 身份(Identity)管理
- 创建用户:
- openstack user create --password-prompt --email user@example.com --domain Default myuser
复制代码
- --password-prompt 会交互式让你输入密码。
- --domain 指定用户所在的域。
- 列出用户:
- 创建用户组 (用于LDAP集成或批量分配角色):
- openstack group create --description "Developers Group" developers
复制代码 - 将用户加入用户组:
- openstack group add user developers myuser
复制代码 4.2 资源(Resource)与分配(Assignment)管理
- 创建项目(Project):
- openstack project create --description "Demo Project" --domain Default demoproject
复制代码 - 创建角色(Role):
- openstack role create myrole
复制代码
- 通常内置 admin, _member_, reader 等角色。
- 给用户在项目上授予角色 (核心操作):
- openstack role add --project demoproject --user myuser myrole
复制代码
- 这条命令意味着 myuser 在 demoproject 中拥有 myrole 的权限。
- 检查角色分配:
- openstack role assignment list --user myuser --project demoproject --names
复制代码 4.3 服务目录(Catalog)管理
- 注册服务 (如注册Glance镜像服务):
- openstack service create --name glance --description "OpenStack Image" image
复制代码 - 创建服务的端点(Endpoint):
- openstack endpoint create --region RegionOne image public http://glance-host:9292
- openstack endpoint create --region RegionOne image internal http://glance-host:9292
- openstack endpoint create --region RegionOne image admin http://glance-host:9292
复制代码 - 列出所有服务和端点:
- openstack service list
- openstack endpoint list
复制代码 4.4 令牌(Token)操作
- 作为特定用户获取令牌(调试神器):
- openstack --os-username myuser --os-password mypassword --os-project-name demoproject token issue
复制代码
- 这会返回一个详细的令牌信息,包括过期时间、授权范围等,用于调试权限问题。
4.5 域(Domain)管理
- 创建域:
- openstack domain create --description "My Company" mycompany
复制代码 - 在特定域中创建用户:
- openstack user create --email john@mycompany.com --domain mycompany john
复制代码
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |