# Source code for aodh.keystone_client

#
# Copyright 2015 eNovance <licensing@enovance.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import os


from keystoneauth1 import discover as ka_discover
from keystoneauth1 import exceptions as ka_exception
from keystoneauth1 import identity as ka_identity
from keystoneauth1.identity.generic import password
from keystoneauth1 import loading as ka_loading
from keystoneclient import session
from keystoneclient.v3 import client as ks_client_v3
from oslo_config import cfg
from oslo_log import log

# Module-level logger, named after this module.
LOG = log.getLogger(__name__)

# Name of the configuration group that every helper in this module reads
# its keystone credentials and session options from.
CFG_GROUP = "service_credentials"


def get_session(conf):
    """Build a keystoneauth session from the service credentials options.

    The auth plugin is loaded from the ``CFG_GROUP`` section of ``conf`` and
    handed to the session loaded from that same section.
    """
    plugin = ka_loading.load_auth_from_conf_options(conf, CFG_GROUP)
    keystone_session = ka_loading.load_session_from_conf_options(
        conf, CFG_GROUP, auth=plugin)
    return keystone_session


def get_client(conf):
    """Create a keystone v3 client on top of the service credentials session."""
    return ks_client_v3.Client(session=get_session(conf))


def get_trusted_client(conf, trust_id):
    """Return a keystone v3 client authenticated through a trust.

    :param conf: configuration object exposing the ``CFG_GROUP`` options.
    :param trust_id: identifier of the trust handed to the password plugin.
    :returns: a keystone v3 client whose session uses a password plugin
              created with ``trust_id``.
    """
    # Ideally we would use load_session_from_conf_options, but we can't do that
    # *and* specify a trust, so let's create the object manually.
    group = conf[CFG_GROUP]
    if group.auth_type == "password-aodh-legacy":
        auth_url = group.os_auth_url
        try:
            # Ask the unversioned endpoint which URL serves the v3 API.
            unversioned_url = auth_url.replace('/v2.0', '/')
            discover = ka_discover.Discover(auth_url=unversioned_url)
            v3_auth_url = discover.url_for('3.0')
        except Exception:
            # Discovery is best effort: on any failure derive the v3 URL by
            # rewriting the configured one, but leave a trace for operators.
            LOG.debug("Keystone v3 endpoint discovery failed; deriving the "
                      "v3 URL from the configured auth URL instead.",
                      exc_info=True)
            auth_url = auth_url.replace('/v2.0', '/v3')
        else:
            # Keep the configured URL when discovery reports no v3 endpoint.
            if v3_auth_url:
                auth_url = v3_auth_url
        auth_plugin = password.Password(
            username=group.os_username,
            password=group.os_password,
            auth_url=auth_url,
            # The legacy options carry no domain, so the default one is used.
            user_domain_id='default',
            trust_id=trust_id)
    else:
        auth_plugin = password.Password(
            username=group.username,
            password=group.password,
            auth_url=group.auth_url,
            user_domain_id=group.user_domain_id,
            trust_id=trust_id)

    sess = session.Session(auth=auth_plugin)
    return ks_client_v3.Client(session=sess)


def get_auth_token(client):
    """Return the auth token from the access info of the client's session."""
    keystone_session = client.session
    access_info = keystone_session.auth.get_access(keystone_session)
    return access_info.auth_token


def get_client_on_behalf_user(auth_plugin):
    """Create a keystone v3 client whose session uses ``auth_plugin``."""
    return ks_client_v3.Client(session=session.Session(auth=auth_plugin))


def create_trust_id(conf, trustor_user_id, trustor_project_id, roles,
                    auth_plugin):
    """Create a trust towards the aodh service user and return its id.

    The trustee is the user id of the service credentials session; the trust
    itself is created through a client built from ``auth_plugin``.
    """
    service_session = get_client(conf).session
    trust_spec = {
        'trustor_user': trustor_user_id,
        'trustee_user': service_session.get_user_id(),
        'project': trustor_project_id,
        'impersonation': True,
        'role_names': roles,
    }
    user_client = get_client_on_behalf_user(auth_plugin)
    return user_client.trusts.create(**trust_spec).id


def delete_trust_id(trust_id, auth_plugin):
    """Remove the trust ``trust_id`` through a client using ``auth_plugin``.

    A trust that keystone reports as not found is ignored.
    """
    user_client = get_client_on_behalf_user(auth_plugin)
    try:
        user_client.trusts.delete(trust_id)
    except ka_exception.NotFound:
        # Nothing left to delete.
        return None


# Extra oslo.config options defined by this module. The defaults are read
# from the process environment when the module is imported.
# NOTE(review): presumably registered under CFG_GROUP elsewhere - confirm.
OPTS = [
    # Region name; defaults to $OS_REGION_NAME (None when unset).
    cfg.StrOpt('region-name',
               default=os.environ.get('OS_REGION_NAME'),
               deprecated_name="os-region-name",
               help='Region name to use for OpenStack service endpoints.'),
    # Endpoint interface; $OS_INTERFACE wins, then $OS_ENDPOINT_TYPE, then
    # 'public'.
    cfg.StrOpt('interface',
               default=os.environ.get(
                   'OS_INTERFACE', os.environ.get('OS_ENDPOINT_TYPE',
                                                  'public')),
               deprecated_name="os-endpoint-type",
               choices=('public', 'internal', 'admin', 'auth', 'publicURL',
                        'internalURL', 'adminURL'),
               help='Type of endpoint in Identity service catalog to use for '
                    'communication with OpenStack services.'),
]


def register_keystoneauth_opts(conf):
    """Register the keystoneauth auth and session options under CFG_GROUP.

    The session ``cacert`` option also accepts the deprecated ``os-cacert``
    name, and ``auth_type`` defaults to ``password-aodh-legacy``.
    """
    ka_loading.register_auth_conf_options(conf, CFG_GROUP)

    legacy_cacert_opts = [
        cfg.DeprecatedOpt('os-cacert', group=group_name)
        for group_name in (CFG_GROUP, "DEFAULT")
    ]
    ka_loading.register_session_conf_options(
        conf, CFG_GROUP, deprecated_opts={'cacert': legacy_cacert_opts})

    conf.set_default(
        "auth_type", default="password-aodh-legacy", group=CFG_GROUP)


def setup_keystoneauth(conf):
    """Warn when the legacy auth type is configured, then load the auth plugin.

    The plugin is loaded from the ``CFG_GROUP`` options of ``conf``; its
    return value is discarded.
    """
    uses_legacy_auth = conf[CFG_GROUP].auth_type == "password-aodh-legacy"
    if uses_legacy_auth:
        deprecation_msg = (
            "Value 'password-aodh-legacy' for '[%s]/auth_type' "
            "is deprecated. And will be removed in Aodh 3.0. "
            "Use 'password' instead."
        )
        LOG.warning(deprecation_msg, CFG_GROUP)
    ka_loading.load_auth_from_conf_options(conf, CFG_GROUP)


class LegacyAodhKeystoneLoader(ka_loading.BaseLoader):
    """Auth plugin loader for the legacy ``os-*`` credential options.

    It adds the legacy option names to the base loader options and maps
    them onto the keyword arguments of the keystone v2 password plugin.
    """

    @property
    def plugin_class(self):
        """Return the plugin class this loader instantiates."""
        return ka_identity.V2Password

    def get_options(self):
        """Return the base loader options extended with the legacy ones.

        Each default is read from the matching ``OS_*`` environment variable,
        with a hard-coded fallback when the variable is unset.
        """
        options = super(LegacyAodhKeystoneLoader, self).get_options()
        options.extend([
            ka_loading.Opt(
                'os-username',
                default=os.environ.get('OS_USERNAME', 'aodh'),
                help='User name to use for OpenStack service access.'),
            ka_loading.Opt(
                'os-password',
                secret=True,
                default=os.environ.get('OS_PASSWORD', 'admin'),
                help='Password to use for OpenStack service access.'),
            ka_loading.Opt(
                'os-tenant-id',
                default=os.environ.get('OS_TENANT_ID', ''),
                help='Tenant ID to use for OpenStack service access.'),
            ka_loading.Opt(
                'os-tenant-name',
                default=os.environ.get('OS_TENANT_NAME', 'admin'),
                help='Tenant name to use for OpenStack service access.'),
            ka_loading.Opt(
                'os-auth-url',
                default=os.environ.get('OS_AUTH_URL',
                                       'http://localhost:5000/v2.0'),
                help='Auth URL to use for OpenStack service access.'),
        ])
        return options

    def load_from_options(self, **kwargs):
        """Instantiate the plugin class from the legacy option values.

        :param kwargs: option values keyed by option ``dest``; a missing or
                       falsy value falls back to the option default.
        :returns: an instance of ``plugin_class``.
        """
        # Legacy option dest -> keyword argument of the plugin class.
        options_map = {
            'os_auth_url': 'auth_url',
            'os_username': 'username',
            'os_password': 'password',
            'os_tenant_name': 'tenant_name',
            'os_tenant_id': 'tenant_id',
        }
        # Options that are not in the map are not passed to the plugin.
        identity_kwargs = {
            options_map[opt.dest]: kwargs.get(opt.dest) or opt.default
            for opt in self.get_options()
            if opt.dest in options_map
        }
        return self.plugin_class(**identity_kwargs)