LOADING

Follow me

keystone 认证分析
十月 13, 2015|Keystone

keystone 认证分析

keystone 认证分析
<token>
   关于V2token的分析
   1、/v2.0/token
      keystone.service下面的
      def admin_app_factory
      token.routers.Router()
   2、keystone.token.routers
      mapper.connect('/tokens', controller=token_controller,
                  action='authenticate',
                  conditions=dict(method=['POST']))
      token_controller = controllers.Auth()

   3、keystone.token.controllers.Auth
      @controller.v2_deprecated
      def authenticate(self, context, auth=None):
         <auth function="认证的字典">
            tenantName 可选,如果不指定tenantName那么返回的就是unscoped token,这个token
            会被用于获取scoped token,否则就是限定到tenant的scoped token
            {
                "auth":{
                    "passwordCredentials":{
                        "username":"test_user",
                        "password":"mypass"
                    },
                    "tenantName":"customer-x"
                }

         </auth>

         if auth is None:
            raise exception.ValidationError(attribute='auth', target='request body')
         # auth不能为空

         if ‘token’ in auth:
            # 也就是说不是第一次获取token,那么下面就检测是unscoped token 还是scoped token
            auth_info = self._authenticate_token(context, auth)
            # auth_info = (current_user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id)
            <self._authenticate_token value="(context, auth)" function="检测token是否已经存在,返回 auth_token_data">
               def _authenticate_token(self, context, auth):
                  # 如果token不在auth字典里,报错
                  if 'token' not in auth:
                     raise exception.ValidationError(attribute='token', target='auth')
                  if 'id' not in auth['token']:
                     raise exception.ValidationError(attribute='id', target='token')

                  old_token = auth['token']['id']
                  # 获取token id
                  if len(old_token) > CONF.max_token_size: CONF.max_token_size = 8192
                     raise exception.ValidationSizeError(attibute='token', size=CONF.max_token_size)

                  try:
                     token_model_ref = token_model.KeystoneToken(
                        token_id=old_token,
                        token_data=self.token_provider_api.validate_token(old_token)
                        # 通过token_provider_api 获取ID为old_token的token data
                        # keystone.token.provider
                        <self.token_provider_api.validate_token values="old_token" function="通过token_provider_api 获取ID为old_token的token data">
                           def validate_token(self, token_id, belongs_to=None):
                              unique_id = utils.generate_unique_id(token_id)
                              # 如果token是asn1或者pki,返回hash后的token,否则直接返回
                              <utils.generate_unique_id values="token_id" function="为token生成唯一的ID" path="keystone.token.utils">
                                 def generate_unique_id(token_id):

                                    return cms.cms_hash_token(token_id, mode=cfg.CONF.token.hash_algorithm) # mode='md5'

                              </utils.generate_unique_id>

                              token = self._validate_token(unique_id)
                              # 需要检查persistence的持久化后端
                              # 默认使用的provider后端是uuid
                              # 持久化只负责数据的CRUD操作,provider负责逻辑判断
                              <self._validate_token values="unique_id" function="">
                                 @MEMOIZE
                                 def _validate_token(sef, token_id):
                                    if not self._needs_persistence:
                                    # self._needs_persistence 是返回的uuid need_persistence ,返回是True
                                    # 然后通过持久化驱动根据token_id 获取token_ref
                                    #self._persistence 获取持久化驱动
                                    # 默认的持久化驱动是sql
                                       return self.driver.validate_v3_token(token_id)

                                    token_ref = self._persistence.get_token(token_id)
                                    # _persistence驱动是token.persistence.backends.sql
                                    <self._persistence.get_token values="token_id" function="从数据库中获取指定id的token_ref">
                                       # 查询的是TokenModel表
                                       class TokenModel(sql.ModelBase, sql.DictBase):
                                           __tablename__ = 'token'
                                           attributes = ['id', 'expires', 'user_id', 'trust_id']
                                           id = sql.Column(sql.String(64), primary_key=True)
                                           expires = sql.Column(sql.DateTime(), default=None)
                                           extra = sql.Column(sql.JsonBlob())
                                           valid = sql.Column(sql.Boolean(), default=True, nullable=False)
                                           user_id = sql.Column(sql.String(64))
                                           trust_id = sql.Column(sql.String(64))
                                           __table_args__ = (
                                               sql.Index('ix_token_expires', 'expires'),
                                               sql.Index('ix_token_expires_valid', 'expires', 'valid'),
                                               sql.Index('ix_token_user_id', 'user_id'),
                                               sql.Index('ix_token_trust_id', 'trust_id')
                                           )

                                       class Token(token.persistence.TokenDriverV8):
                                          def get_token(self, token_id):
                                             if token_id is None:
                                                raise exception.TokenNotFound(token_id=token_id)
                                             session = sql.get_session()
                                             token_ref = session.query(TokenModel).get(token_id)
                                             if not token_ref or not token_ref.valid:
                                                raise exception.TokenNotFound(token_id=token_id)

                                             return token_ref.to_dict()
                                          # to_dict调用的是class DictBase下面的to_dict 将属性变成字典

                                    </self._persistence.get_token>

                                    version = self.driver.get_token_version(token_ref)
                                    # uuid类继承的token.provider.common.BaseProvider
                                    # self.driver也就返回的BaseProvider
                                    <self.driver.get_token_version values="token_ref">
                                       class BaseProvider(provider.Provider):
                                          def get_token_version(self, token_data):
                                             token_data['access']['id'] =====>v2
                                             token_data['token']['methods'] ======>v3
                                    </self.driver.get_token_version>

                                    if version == self.V3:
                                       return self.driver.validate_v3_token(token_ref)
                                       # self.driver 最后是调用的common.BaseProvider
                                       <self.driver.validate_v3_token values="token_ref" function="返回token_data">
                                          class BaseProvider(provider.Provider):
                                             def validate_v3_token(self, token_ref):
                                                trust_id = token_ref.get('trust_id')
                                                if trust_id:
                                                   # 这个组件以后再说
                                                   self.trust_api.get_trust(trust_id)
                                                token_data = token_ref.get('token_data')

                                                # 从token_ref中取出token_data

                                                if not token_data or 'token' not in token_data:
                                                   # token_ref 是由V2 api创建的,则需要变换
                                                   project_id = None
                                                   project_ref = token_ref.get('tenant')
                                                   if project_ref:
                                                      project_id = project_ref['id']

                                                   issued_at = token_ref['token_data']['access']['token']['issued_at']
                                                   audit = token_ref['token_data']['token'].get('audit_ids')

                                                   token_data = self.v3_token_data_helper.get_token_data(
                                                         token_ref['user']['id'],
                                                         ['password', 'token'],
                                                         project_id=project_id,
                                                         bind=token_ref.get('bind'),
                                                         expires=token_ref['expires'],
                                                         issued_at=issued_at,
                                                         audit_info=audit

                                                   )

                                                   <get_token_data path="keystone.token.common.V3_Token_Data_Helper" function="">
                                                      # 返回{'token': token_data}字典
                                                      def get_token_data(...):
                                                         if extras is None:
                                                            extras = {}
                                                         if extras:
                                                            # 这边是要废弃的提示
                                                         token_data = {'methods': method_names, 'extras': extras}

                                                         # 如果token不为空,直接将属性作字典存储
                                                         if token:
                                                            for x in ('roles', 'user', 'catalog', 'project', 'domain'):
                                                               if x in token:
                                                                  token_data[x] = token[x]

                                                         if CONF.trust.enabled and trust:
                                                            if user_id != trust['trustee_user_id']:
                                                               raise exception.Forbidden(_('User is not a trustee.'))

                                                         if bind:
                                                            token_data['bind'] = bind

                                                         self._populate_scope(token_data, domain_id, project_id)
                                                            # 将domain project放入token_data字典中
                                                            <self._populate_scope values="token_data, domain_id, project_id" function="">
                                                               def _populate_scope(self, token_data, domain_id, project_id):
                                                                  if 'domain' in token_data or 'project' in token_data:
                                                                  # scope已经存在
                                                                     return
                                                                  if domain_id:
                                                                     token_data['domain'] = self._get_filtered_domain(domain_id)
                                                                     # 通过resource_api获取domain_ref
                                                                     # 返回{'id': domain_ref['id'], 'name': domain_ref['name']}
                                                                  if project_id:
                                                                     token_data['project'] = self._get_filtered_project(project)
                                                                     # 通过resource_api获取project_ref
                                                                     # filter_project = {'id': project_ref['id'], 'name': project_ref['name']}
                                                                     # filter_project['domain'] = self._get_filtered_domain(project_ref['domain_id'])
                                                                     # 返回 {'id': project_ref['id'], 'name': project_ref['name'], 'domain': {...上面的那样}}
                                                            </self._populate_scope>
                                                         self._populate_user(token_data, user_id, trust)
                                                            # user_ref 调用identity_api 返回对应user_id的user_ref
                                                            # 没有使用trust,所以就返回
                                                            # token_data['user'] = {'id': user_ref['id'], 'name': user_ref['name'], 'domain': self._get_filtered_domain(user_ref['domain_id'])}
                                                         self._populate_roles(token_data, user_id, domain_id, project_id, trust, access_token)
                                                            # 返回 token_data['roles'] = filtered_roles
                                                         self._populate_audit_info(token_data, access_token)

                                                         if include_catalog:
                                                            self._populate_service_catalog(token_data, user_id, domain_id, project_id, trust)
                                                            # 返回 token_data['catalog'] = catalog_api.get_v3_catalog(user_id, project_id)
                                                         self._populate_service_providers(token_data)
                                                            # fedration启用才会处理
                                                         self._populate_token_dates(token_data, expires=expires, trust=trust, issued_at=issued_at)
                                                            # 处理token_data时间
                                                            # token_data['expires_at']
                                                            # token_data['issued_at']
                                                         self._populate_oauth_section(token_data, access_token)
                                                            # access_token不为空再处理
                                                            def _populate_oauth_section(self, token_data, access_token):
                                                               if access_token:
                                                                  access_token_id = access_token['id']
                                                                  consumer_id = access_token['consumer_id']
                                                                  token_data['OS-OAUTH1'] = ({'access_token_id': access_token_id, 'consumer_id': consumer_id})

                                                         return {'token': token_data}
                                                   </get_token_data>
                                                return token_data



                                       </self.driver.validate_v3_token>
                                    elif version == self.V2:
                                       return self.driver.validate_v2_token(token_ref)
                                          # 判断是否为federation token 以及是否为默认domain
                                          # 如果是V3token 就将其转变成v2 token
                                          def validate_v2_token(self, token_ref):
                                             try:
                                                self._assert_is_not_federation_token(token_ref)
                                                self._assert_default_domain(token_ref)
                                                token_data = token_ref.get('token_data')

                                                if (self.get_token_version(token_data) != token.provider.V2):
                                                   token_data = self.v2_token_data_helper.v3_to_v2_token(token_data)
                                                   # v3 token 转变到v2 token


                                                trust_id = token_data['access'].get('trust', {}).get('id')
                                                if trust_id:
                                                   self.trust_api.get_trust(trust_id)
                                                return token_data
                                             except exception.ValidationError as e:
                                                LOG.exception(_LE('Failed to validate token'))
                                                raise exception.TokenNotFound(e)



                                    raise exception.UnsupportedTokenVersionException()
                              </self._validate_token>

                              self._token_belongs_to(token, belongs_to)
                              self._is_valid_token(token)
                              return token
                        </self.token_provider_api.validate_token>

                     )
                  except exception.NotFound as e:
                     raise exception.Unauthorized(e)

                  wsgi.validate_token_bind(context, token_model_ref)
                     # token bind 设置的为permissive,所以直接返回

                  self._restrict_scope(token_model_ref)
                     # 设置all_rescoped_scoped_token为False,以阻止用户用已经变更的token来获取其他token
                  user_id = token_model_ref.user_id
                     # 用户id
                  tenant_id = self._get_project_id_from_auth(auth)
                     # 通过resource_api根据auth的project name获取project id
                     # resource_api通过tenant name 以及domain id获取tenant id
                  current_user_ref = self.identity_api.get_user(user_id)

                  metadata_ref = {}
                  tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(user_id, tenant_id)
                     # 根据tenant id通过resource_api获取tenant ref
                     # 根据user id tenant id通过assignment_api的get_roles_for_user_and_project获取role list

                  expiry = token_model_ref.expires

                  bind = token_mode_ref.bind

                  audit_id = token_model_ref.audit_chain_id

                  return (current_user_ref, tenant_ref, metadata_ref, metadata_ref, expiry, bind, audit_id)



            </self._authenticate_token>
         else:
            # 其他认证方式
            try:
               auth_info = self._authenticate_external(context, auth)
                  # 通过REMOTE_USER认证外部user,并返回auth_token_data(user_ref, tenant_ref, metadata_ref)
                  <self._authenticate_extenral values="(context, auth)">
                     def _authenticate_external(self, context, auth):
                        environment = context.get('environment', {})
                        if not environment.get('REMOTE_USER'):
                           raise ExternalAuthNotApplicable()

                        username = environment['REMOTE_USER']
                        try:
                           user_ref = self.identity_api.get_user_by_name(username, CONF.identity.default_domain_id)
                           # CONF.identity.default_domain_id = 'default'
                           # 根据username domain id通过identity_api获取user_ref
                           user_id = user_ref['id']
                        except exception.UserNotFound as e:
                           raise exception.Unauthorized(e)

                        metadata_ref = {}
                        tenant_id = self._get_project_id_from_auth(auth)
                           # 根据auth['tenantName']通过resource_api 的get_project_by_name获取tenant id
                        tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(user_id, tenant_id)
                           # 返回tenant_ref role_list

                        expiry = provider.default_expire_time()

                        bind = None

                        if ('kerberos' in CONF.token.bind and environment.get('AUTH_TYPE', '').lower() == 'negotiate'):
                           bind = {'kerberos': username}

                        audit_id = None
                        return (user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id)




                  </self._authenticate_extenral>
            except ExternalAuthNotApplicable:
               auth_info = self._authenticate_local(context, auth)
               # 通过后端进行身份认证
               <self._authenticate_local >
               if 'passwordCredentials' not in auth:
                  # 不在就报错
               if 'password' not in auth['passwordCredentials']:
                  # password不在passwordCredentials里面就报错
               password = auth['passwordCredentials']['password']

               if password and len(password) > CONF.identity.max_password_length:
                  # 长度超过max_password_length报错
               if # passwordCredentials字典里面即没有userId又没有userName 报错
               user_id = auth['passwordCredentials'].get('userId')

               if user_id超出max_param_size报错

               username = auth['passwordCredentials'].get('username', '')

               if username:
                  # username 长度超出max_param_size,报错
                  user_ref = self.identity_api.get_user_by_name(username, CONF.identity.default_domain_id)
                  # ?为什么不用get_user_by_id,这个是因为identity_api未提供此接口,而且在get_user_by_id中没有default_id这个参数
                  user_id = user_ref['id']
                  # 如果没有就报错
               metadata_ref = {}
               tenant_ref, metadata_ref['roles'] = self._get_project_roles_and_ref(user_id, tenant_id)
               expiry = provider.default_expire_time()
               bind = None
               audit_id = None
               return (user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id)
               </self._authenticate_local>


         user_ref, tenant_ref, metadata_ref, expiry, bind, audit_id = auth_info

         # 查看project domain是否启用
         try:
            self.identity_api.assert_user_enabled(user_id=user_ref['id'], user=user_ref)
               # 确认user user的domain有效
               # 根据user['domain_id']通过resource_api 的assert_domain_enabled获取 domain是否disabled
               # 如果domain disabled则报错




            if tenant_ref:
               self.resource_api.assert_project_enabled(project_id=tenant_ref['id'], project=tenant_ref)

         except AssertionError as e:
            报错退出

         user_ref = self.v3_to_v2_user(user_ref)
            # 将v3中default_project_id变为tenantId,过滤掉domain,用username属性

         if tenant_ref:
            tenant_ref = self.v3_to_v2_project(tenant_ref)
            # 移除domain,移除is_domain,移除parent id

         auth_token_data = self._get_auth_token_data(user_ref, tenant_ref, metadata_ref, expiry, audit_id)
            # 变成字典dict(user=user, tenant=tenant, metadata=metadata, expires=expiry, parent_audit_id=audit_id)

         if tenant_ref:
            catalog_ref = self.catalog_api.get_catalog(user_ref['id'], tenant_ref['id'])
               # 获取catalog
         else:
            catalog_ref = {}

         auth_token_data['id'] = 'placeholder'

         if bind:
            auth_token_data['bind'] = bind

         roles_ref = []

         for role_id in metadata_ref.get('roles', []):
            role_ref = self.role_api.get_role(role_id)
            roles_ref.append(dict(name=role_ref['name']))
            # 根据role_id通过role_api 的get_role获取role ref



         (token_id, token_data) = self.token_provider_api.issue_v2_token(auth_token_data, roles_ref=roles_ref, catalog_ref=catalog_ref)
            # uuid继承common.BaseProvider
            # 转换成v2token  返回 token id以及token_data
            <self.token_provider_api.issue_v2_token>
               def issue_v2_token(self, token_ref, roles_ref=None, catalog_ref=None):
                  token_id, token_data = self.driver.issue_v2_token(token_ref, roles_ref, catalog_ref)
                     # 调用后端issue_v2_token
                     def issue_v2_token(self, token_ref, roles_ref=None, catalog_ref=None):
                        metadata_ref = token_ref['metadata']
                        trust_ref = None
                        # trust这块略过
                        token_data = self.v2_token_data_helper.format_token(token_ref, roles_ref, catalog_ref, trust_ref)
                        token_id = self._get_token_id(token_data)
                        token_data['access']['token']['id'] = token_id
                        return token_id, token_data
                  if self._needs_persistence:
                     data = dict(key=token_id, id=token_id, ...)
                     self._create_token(token_id, data)
                  return token_id, token_data


            </self.token_provider_api.issue_v2_token>
         # trust这一块先滤过
         return token_data







</token>
no comments
Share

发表评论