Source code for ceilometer.storage.impl_sqlalchemy

#
# Author: John Tran <jhtran@att.com>
#         Julien Danjou <julien@danjou.info>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""SQLAlchemy storage backend."""

from __future__ import absolute_import
import datetime
import hashlib
import operator
import os

from oslo.config import cfg
from oslo.db import exception as dbexc
from oslo.db.sqlalchemy import session as db_session
from oslo.utils import timeutils
import six
import sqlalchemy as sa
from sqlalchemy import and_
from sqlalchemy import distinct
from sqlalchemy import func
from sqlalchemy.orm import aliased

import ceilometer
from ceilometer.openstack.common.gettextutils import _
from ceilometer.openstack.common import jsonutils
from ceilometer.openstack.common import log
from ceilometer import storage
from ceilometer.storage import base
from ceilometer.storage import models as api_models
from ceilometer.storage.sqlalchemy import models
from ceilometer.storage.sqlalchemy import utils as sql_utils
from ceilometer import utils

LOG = log.getLogger(__name__)


STANDARD_AGGREGATES = dict(
    avg=func.avg(models.Sample.volume).label('avg'),
    sum=func.sum(models.Sample.volume).label('sum'),
    min=func.min(models.Sample.volume).label('min'),
    max=func.max(models.Sample.volume).label('max'),
    count=func.count(models.Sample.volume).label('count')
)

UNPARAMETERIZED_AGGREGATES = dict(
    stddev=func.stddev_pop(models.Sample.volume).label('stddev')
)

PARAMETERIZED_AGGREGATES = dict(
    validate=dict(
        cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id']
    ),
    compute=dict(
        cardinality=lambda p: func.count(
            distinct(getattr(models.Resource, p))
        ).label('cardinality/%s' % p)
    )
)

AVAILABLE_CAPABILITIES = {
    'meters': {'query': {'simple': True,
                         'metadata': True}},
    'resources': {'query': {'simple': True,
                            'metadata': True}},
    'samples': {'pagination': True,
                'groupby': True,
                'query': {'simple': True,
                          'metadata': True,
                          'complex': True}},
    'statistics': {'groupby': True,
                   'query': {'simple': True,
                             'metadata': True},
                   'aggregation': {'standard': True,
                                   'selectable': {
                                       'max': True,
                                       'min': True,
                                       'sum': True,
                                       'avg': True,
                                       'count': True,
                                       'stddev': True,
                                       'cardinality': True}}
                   },
    'events': {'query': {'simple': True}},
}


AVAILABLE_STORAGE_CAPABILITIES = {
    'storage': {'production_ready': True},
}


def apply_metaquery_filter(session, query, metaquery):
    """Apply provided metaquery filter to existing query.

    :param session: session used for original query
    :param query: Query instance
    :param metaquery: dict with metadata to match on.
    """
    for k, value in six.iteritems(metaquery):
        key = k[9:]  # strip out 'metadata.' prefix
        try:
            _model = sql_utils.META_TYPE_MAP[type(value)]
        except KeyError:
            raise ceilometer.NotImplementedError(
                'Query on %(key)s is of %(value)s '
                'type and is not supported' %
                {"key": k, "value": type(value)})
        else:
            meta_alias = aliased(_model)
            on_clause = and_(models.Resource.internal_id == meta_alias.id,
                             meta_alias.meta_key == key)
            # outer join is needed to support metaquery
            # with or operator on non existent metadata field
            # see: test_query_non_existing_metadata_with_result
            # test case.
            query = query.outerjoin(meta_alias, on_clause)
            query = query.filter(meta_alias.value == value)

    return query


def make_query_from_filter(session, query, sample_filter, require_meter=True):
    """Return a query dictionary based on the settings in the filter.

    :param session: session used for original query
    :param query: Query instance
    :param sample_filter: SampleFilter instance
    :param require_meter: If true and the filter does not have a meter,
                          raise an error.
    """

    if sample_filter.meter:
        query = query.filter(models.Meter.name == sample_filter.meter)
    elif require_meter:
        raise RuntimeError('Missing required meter specifier')
    if sample_filter.source:
        query = query.filter(
            models.Resource.source_id == sample_filter.source)
    if sample_filter.start:
        ts_start = sample_filter.start
        if sample_filter.start_timestamp_op == 'gt':
            query = query.filter(models.Sample.timestamp > ts_start)
        else:
            query = query.filter(models.Sample.timestamp >= ts_start)
    if sample_filter.end:
        ts_end = sample_filter.end
        if sample_filter.end_timestamp_op == 'le':
            query = query.filter(models.Sample.timestamp <= ts_end)
        else:
            query = query.filter(models.Sample.timestamp < ts_end)
    if sample_filter.user:
        query = query.filter(models.Resource.user_id == sample_filter.user)
    if sample_filter.project:
        query = query.filter(
            models.Resource.project_id == sample_filter.project)
    if sample_filter.resource:
        query = query.filter(
            models.Resource.resource_id == sample_filter.resource)
    if sample_filter.message_id:
        query = query.filter(
            models.Sample.message_id == sample_filter.message_id)

    if sample_filter.metaquery:
        query = apply_metaquery_filter(session, query,
                                       sample_filter.metaquery)

    return query


[docs]class Connection(base.Connection): """Put the data into a SQLAlchemy database. Tables:: - meter - meter definition - { id: meter id name: meter name type: meter type unit: meter unit } - resource - resource definition - { internal_id: resource id resource_id: resource uuid user_id: user uuid project_id: project uuid source_id: source id resource_metadata: metadata dictionary metadata_hash: metadata dictionary hash } - sample - the raw incoming data - { id: sample id meter_id: meter id (->meter.id) resource_id: resource id (->resource.internal_id) volume: sample volume timestamp: datetime recorded_at: datetime message_signature: message signature message_id: message uuid } """ CAPABILITIES = utils.update_nested(base.Connection.CAPABILITIES, AVAILABLE_CAPABILITIES) STORAGE_CAPABILITIES = utils.update_nested( base.Connection.STORAGE_CAPABILITIES, AVAILABLE_STORAGE_CAPABILITIES, ) def __init__(self, url): # Set max_retries to 0, since oslo.db in certain cases may attempt # to retry making the db connection retried max_retries ^ 2 times # in failure case and db reconnection has already been implemented # in storage.__init__.get_connection_from_config function cfg.CONF.set_override('max_retries', 0, group='database') self._engine_facade = db_session.EngineFacade( url, **dict(cfg.CONF.database.items()) )
[docs] def upgrade(self): # NOTE(gordc): to minimise memory, only import migration when needed from oslo.db.sqlalchemy import migration path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'sqlalchemy', 'migrate_repo') migration.db_sync(self._engine_facade.get_engine(), path)
[docs] def clear(self): engine = self._engine_facade.get_engine() for table in reversed(models.Base.metadata.sorted_tables): engine.execute(table.delete()) self._engine_facade._session_maker.close_all() engine.dispose()
@staticmethod def _create_meter(conn, name, type, unit): # TODO(gordc): implement lru_cache to improve performance try: meter = models.Meter.__table__ trans = conn.begin_nested() if conn.dialect.name == 'sqlite': trans = conn.begin() with trans: meter_row = conn.execute( sa.select([meter.c.id]) .where(sa.and_(meter.c.name == name, meter.c.type == type, meter.c.unit == unit))).first() meter_id = meter_row[0] if meter_row else None if meter_id is None: result = conn.execute(meter.insert(), name=name, type=type, unit=unit) meter_id = result.inserted_primary_key[0] except dbexc.DBDuplicateEntry: # retry function to pick up duplicate committed object meter_id = Connection._create_meter(conn, name, type, unit) return meter_id @staticmethod def _create_resource(conn, res_id, user_id, project_id, source_id, rmeta): # TODO(gordc): implement lru_cache to improve performance try: res = models.Resource.__table__ m_hash = hashlib.md5(jsonutils.dumps(rmeta, sort_keys=True)).hexdigest() trans = conn.begin_nested() if conn.dialect.name == 'sqlite': trans = conn.begin() with trans: res_row = conn.execute( sa.select([res.c.internal_id]) .where(sa.and_(res.c.resource_id == res_id, res.c.user_id == user_id, res.c.project_id == project_id, res.c.source_id == source_id, res.c.metadata_hash == m_hash))).first() internal_id = res_row[0] if res_row else None if internal_id is None: result = conn.execute(res.insert(), resource_id=res_id, user_id=user_id, project_id=project_id, source_id=source_id, resource_metadata=rmeta, metadata_hash=m_hash) internal_id = result.inserted_primary_key[0] if rmeta and isinstance(rmeta, dict): meta_map = {} for key, v in utils.dict_to_keyval(rmeta): try: _model = sql_utils.META_TYPE_MAP[type(v)] if meta_map.get(_model) is None: meta_map[_model] = [] meta_map[_model].append( {'id': internal_id, 'meta_key': key, 'value': v}) except KeyError: LOG.warn(_("Unknown metadata type. Key (%s) " "will not be queryable."), key) for _model in meta_map.keys(): conn.execute(_model.__table__.insert(), meta_map[_model]) except dbexc.DBDuplicateEntry: # retry function to pick up duplicate committed object internal_id = Connection._create_resource( conn, res_id, user_id, project_id, source_id, rmeta) return internal_id
[docs] def record_metering_data(self, data): """Write the data to the backend storage system. :param data: a dictionary such as returned by ceilometer.meter.meter_message_from_counter """ engine = self._engine_facade.get_engine() with engine.begin() as conn: # Record the raw data for the sample. m_id = self._create_meter(conn, data['counter_name'], data['counter_type'], data['counter_unit']) res_id = self._create_resource(conn, data['resource_id'], data['user_id'], data['project_id'], data['source'], data['resource_metadata']) sample = models.Sample.__table__ conn.execute(sample.insert(), meter_id=m_id, resource_id=res_id, timestamp=data['timestamp'], volume=data['counter_volume'], message_signature=data['message_signature'], message_id=data['message_id'])
[docs] def clear_expired_metering_data(self, ttl): """Clear expired data from the backend storage system. Clearing occurs according to the time-to-live. :param ttl: Number of seconds to keep records for. """ session = self._engine_facade.get_session() with session.begin(): end = timeutils.utcnow() - datetime.timedelta(seconds=ttl) sample_q = (session.query(models.Sample) .filter(models.Sample.timestamp < end)) rows = sample_q.delete() LOG.info(_("%d samples removed from database"), rows) # remove Meter definitions with no matching samples (session.query(models.Meter) .filter(~models.Meter.samples.any()) .delete(synchronize_session=False)) # remove resources with no matching samples resource_q = (session.query(models.Resource.internal_id) .filter(~models.Resource.samples.any())) resource_subq = resource_q.subquery() # remove metadata of cleaned resources for table in [models.MetaText, models.MetaBigInt, models.MetaFloat, models.MetaBool]: (session.query(table) .filter(table.id.in_(resource_subq)) .delete(synchronize_session=False)) resource_q.delete(synchronize_session=False)
[docs] def get_resources(self, user=None, project=None, source=None, start_timestamp=None, start_timestamp_op=None, end_timestamp=None, end_timestamp_op=None, metaquery=None, resource=None, pagination=None): """Return an iterable of api_models.Resource instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param source: Optional source filter. :param start_timestamp: Optional modified timestamp start range. :param start_timestamp_op: Optional start time operator, like gt, ge. :param end_timestamp: Optional modified timestamp end range. :param end_timestamp_op: Optional end time operator, like lt, le. :param metaquery: Optional dict with metadata to match on. :param resource: Optional resource filter. :param pagination: Optional pagination query. """ if pagination: raise ceilometer.NotImplementedError('Pagination not implemented') s_filter = storage.SampleFilter(user=user, project=project, source=source, start=start_timestamp, start_timestamp_op=start_timestamp_op, end=end_timestamp, end_timestamp_op=end_timestamp_op, metaquery=metaquery, resource=resource) session = self._engine_facade.get_session() # get list of resource_ids res_q = session.query(distinct(models.Resource.resource_id)).join( models.Sample, models.Sample.resource_id == models.Resource.internal_id) res_q = make_query_from_filter(session, res_q, s_filter, require_meter=False) for res_id in res_q.all(): # get latest Sample max_q = (session.query(models.Sample) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id) .filter(models.Resource.resource_id == res_id[0])) max_q = make_query_from_filter(session, max_q, s_filter, require_meter=False) max_q = max_q.order_by(models.Sample.timestamp.desc(), models.Sample.id.desc()).limit(1) # get the min timestamp value. min_q = (session.query(models.Sample.timestamp) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id) .filter(models.Resource.resource_id == res_id[0])) min_q = make_query_from_filter(session, min_q, s_filter, require_meter=False) min_q = min_q.order_by(models.Sample.timestamp.asc()).limit(1) sample = max_q.first() if sample: yield api_models.Resource( resource_id=sample.resource.resource_id, project_id=sample.resource.project_id, first_sample_timestamp=min_q.first().timestamp, last_sample_timestamp=sample.timestamp, source=sample.resource.source_id, user_id=sample.resource.user_id, metadata=sample.resource.resource_metadata )
[docs] def get_meters(self, user=None, project=None, resource=None, source=None, metaquery=None, pagination=None): """Return an iterable of api_models.Meter instances :param user: Optional ID for user that owns the resource. :param project: Optional ID for project that owns the resource. :param resource: Optional ID of the resource. :param source: Optional source filter. :param metaquery: Optional dict with metadata to match on. :param pagination: Optional pagination query. """ if pagination: raise ceilometer.NotImplementedError('Pagination not implemented') s_filter = storage.SampleFilter(user=user, project=project, source=source, metaquery=metaquery, resource=resource) # NOTE(gordc): get latest sample of each meter/resource. we do not # filter here as we want to filter only on latest record. session = self._engine_facade.get_session() subq = session.query(func.max(models.Sample.id).label('id')).join( models.Resource, models.Resource.internal_id == models.Sample.resource_id).group_by( models.Sample.meter_id, models.Resource.resource_id) if resource: subq = subq.filter(models.Resource.resource_id == resource) subq = subq.subquery() # get meter details for samples. query_sample = (session.query(models.Sample.meter_id, models.Meter.name, models.Meter.type, models.Meter.unit, models.Resource.resource_id, models.Resource.project_id, models.Resource.source_id, models.Resource.user_id).join( subq, subq.c.id == models.Sample.id) .join(models.Meter, models.Meter.id == models.Sample.meter_id) .join(models.Resource, models.Resource.internal_id == models.Sample.resource_id)) query_sample = make_query_from_filter(session, query_sample, s_filter, require_meter=False) for row in query_sample.all(): yield api_models.Meter( name=row.name, type=row.type, unit=row.unit, resource_id=row.resource_id, project_id=row.project_id, source=row.source_id, user_id=row.user_id)
def _retrieve_samples(self, query): samples = query.all() for s in samples: # Remove the id generated by the database when # the sample was inserted. It is an implementation # detail that should not leak outside of the driver. yield api_models.Sample( source=s.source_id, counter_name=s.counter_name, counter_type=s.counter_type, counter_unit=s.counter_unit, counter_volume=s.counter_volume, user_id=s.user_id, project_id=s.project_id, resource_id=s.resource_id, timestamp=s.timestamp, recorded_at=s.recorded_at, resource_metadata=s.resource_metadata, message_id=s.message_id, message_signature=s.message_signature, )
[docs] def get_samples(self, sample_filter, limit=None): """Return an iterable of api_models.Samples. :param sample_filter: Filter. :param limit: Maximum number of results to return. """ if limit == 0: return [] session = self._engine_facade.get_session() query = session.query(models.Sample.timestamp, models.Sample.recorded_at, models.Sample.message_id, models.Sample.message_signature, models.Sample.volume.label('counter_volume'), models.Meter.name.label('counter_name'), models.Meter.type.label('counter_type'), models.Meter.unit.label('counter_unit'), models.Resource.source_id, models.Resource.user_id, models.Resource.project_id, models.Resource.resource_metadata, models.Resource.resource_id).join( models.Meter, models.Meter.id == models.Sample.meter_id).join( models.Resource, models.Resource.internal_id == models.Sample.resource_id).order_by( models.Sample.timestamp.desc()) query = make_query_from_filter(session, query, sample_filter, require_meter=False) if limit: query = query.limit(limit) return self._retrieve_samples(query)
[docs] def query_samples(self, filter_expr=None, orderby=None, limit=None): if limit == 0: return [] session = self._engine_facade.get_session() query = session.query(models.FullSample) transformer = sql_utils.QueryTransformer(models.FullSample, query) if filter_expr is not None: transformer.apply_filter(filter_expr) transformer.apply_options(orderby, limit) return self._retrieve_samples(transformer.get_query())
@staticmethod def _get_aggregate_functions(aggregate): if not aggregate: return [f for f in STANDARD_AGGREGATES.values()] functions = [] for a in aggregate: if a.func in STANDARD_AGGREGATES: functions.append(STANDARD_AGGREGATES[a.func]) elif a.func in UNPARAMETERIZED_AGGREGATES: functions.append(UNPARAMETERIZED_AGGREGATES[a.func]) elif a.func in PARAMETERIZED_AGGREGATES['compute']: validate = PARAMETERIZED_AGGREGATES['validate'].get(a.func) if not (validate and validate(a.param)): raise storage.StorageBadAggregate('Bad aggregate: %s.%s' % (a.func, a.param)) compute = PARAMETERIZED_AGGREGATES['compute'][a.func] functions.append(compute(a.param)) else: raise ceilometer.NotImplementedError( 'Selectable aggregate function %s' ' is not supported' % a.func) return functions def _make_stats_query(self, sample_filter, groupby, aggregate): select = [ func.min(models.Sample.timestamp).label('tsmin'), func.max(models.Sample.timestamp).label('tsmax'), models.Meter.unit ] select.extend(self._get_aggregate_functions(aggregate)) session = self._engine_facade.get_session() if groupby: group_attributes = [getattr(models.Resource, g) for g in groupby] select.extend(group_attributes) query = (session.query(*select) .join(models.Meter, models.Meter.id == models.Sample.meter_id) .join( models.Resource, models.Resource.internal_id == models.Sample.resource_id) .group_by(models.Meter.unit)) if groupby: query = query.group_by(*group_attributes) return make_query_from_filter(session, query, sample_filter) @staticmethod def _stats_result_aggregates(result, aggregate): stats_args = {} if isinstance(result.count, (int, long)): stats_args['count'] = result.count for attr in ['min', 'max', 'sum', 'avg']: if hasattr(result, attr): stats_args[attr] = getattr(result, attr) if aggregate: stats_args['aggregate'] = {} for a in aggregate: key = '%s%s' % (a.func, '/%s' % a.param if a.param else '') stats_args['aggregate'][key] = getattr(result, key) return stats_args @staticmethod def _stats_result_to_model(result, period, period_start, period_end, groupby, aggregate): stats_args = Connection._stats_result_aggregates(result, aggregate) stats_args['unit'] = result.unit duration = (timeutils.delta_seconds(result.tsmin, result.tsmax) if result.tsmin is not None and result.tsmax is not None else None) stats_args['duration'] = duration stats_args['duration_start'] = result.tsmin stats_args['duration_end'] = result.tsmax stats_args['period'] = period stats_args['period_start'] = period_start stats_args['period_end'] = period_end stats_args['groupby'] = (dict( (g, getattr(result, g)) for g in groupby) if groupby else None) return api_models.Statistics(**stats_args)
[docs] def get_meter_statistics(self, sample_filter, period=None, groupby=None, aggregate=None): """Return an iterable of api_models.Statistics instances. Items are containing meter statistics described by the query parameters. The filter must have a meter value set. """ if groupby: for group in groupby: if group not in ['user_id', 'project_id', 'resource_id']: raise ceilometer.NotImplementedError('Unable to group by ' 'these fields') if not period: for res in self._make_stats_query(sample_filter, groupby, aggregate): if res.count: yield self._stats_result_to_model(res, 0, res.tsmin, res.tsmax, groupby, aggregate) return if not sample_filter.start or not sample_filter.end: res = self._make_stats_query(sample_filter, None, aggregate).first() if not res: # NOTE(liusheng):The 'res' may be NoneType, because no # sample has found with sample filter(s). return query = self._make_stats_query(sample_filter, groupby, aggregate) # HACK(jd) This is an awful method to compute stats by period, but # since we're trying to be SQL agnostic we have to write portable # code, so here it is, admire! We're going to do one request to get # stats by period. We would like to use GROUP BY, but there's no # portable way to manipulate timestamp in SQL, so we can't. for period_start, period_end in base.iter_period( sample_filter.start or res.tsmin, sample_filter.end or res.tsmax, period): q = query.filter(models.Sample.timestamp >= period_start) q = q.filter(models.Sample.timestamp < period_end) for r in q.all(): if r.count: yield self._stats_result_to_model( result=r, period=int(timeutils.delta_seconds(period_start, period_end)), period_start=period_start, period_end=period_end, groupby=groupby, aggregate=aggregate )
def _get_or_create_trait_type(self, trait_type, data_type, session=None): """Find if this trait already exists in the database. If it does not, create a new entry in the trait type table. """ if session is None: session = self._engine_facade.get_session() with session.begin(subtransactions=True): tt = session.query(models.TraitType).filter( models.TraitType.desc == trait_type, models.TraitType.data_type == data_type).first() if not tt: tt = models.TraitType(trait_type, data_type) session.add(tt) return tt def _make_trait(self, trait_model, event, session=None): """Make a new Trait from a Trait model. Doesn't flush or add to session. """ trait_type = self._get_or_create_trait_type(trait_model.name, trait_model.dtype, session) value_map = models.Trait._value_map values = {'t_string': None, 't_float': None, 't_int': None, 't_datetime': None} value = trait_model.value values[value_map[trait_model.dtype]] = value return models.Trait(trait_type, event, **values) def _get_or_create_event_type(self, event_type, session=None): """Check if an event type with the supplied name is already exists. If not, we create it and return the record. This may result in a flush. """ if session is None: session = self._engine_facade.get_session() with session.begin(subtransactions=True): et = session.query(models.EventType).filter( models.EventType.desc == event_type).first() if not et: et = models.EventType(event_type) session.add(et) return et def _record_event(self, session, event_model): """Store a single Event, including related Traits.""" with session.begin(subtransactions=True): event_type = self._get_or_create_event_type(event_model.event_type, session=session) event = models.Event(event_model.message_id, event_type, event_model.generated) session.add(event) new_traits = [] if event_model.traits: for trait in event_model.traits: t = self._make_trait(trait, event, session=session) session.add(t) new_traits.append(t) # Note: we don't flush here, explicitly (unless a new trait or event # does it). Otherwise, just wait until all the Events are staged. return event, new_traits
[docs] def record_events(self, event_models): """Write the events to SQL database via sqlalchemy. :param event_models: a list of model.Event objects. Returns a list of events that could not be saved in a (reason, event) tuple. Reasons are enumerated in storage.model.Event Flush when they're all added, unless new EventTypes or TraitTypes are added along the way. """ session = self._engine_facade.get_session() events = [] problem_events = [] for event_model in event_models: event = None try: with session.begin(): event = self._record_event(session, event_model) except dbexc.DBDuplicateEntry as e: LOG.exception(_("Failed to record duplicated event: %s") % e) problem_events.append((api_models.Event.DUPLICATE, event_model)) except Exception as e: LOG.exception(_('Failed to record event: %s') % e) problem_events.append((api_models.Event.UNKNOWN_PROBLEM, event_model)) events.append(event) return problem_events
[docs] def get_events(self, event_filter): """Return an iterable of model.Event objects. :param event_filter: EventFilter instance """ start = event_filter.start_time end = event_filter.end_time session = self._engine_facade.get_session() LOG.debug(_("Getting events that match filter: %s") % event_filter) with session.begin(): event_query = session.query(models.Event) # Build up the join conditions event_join_conditions = [models.EventType.id == models.Event.event_type_id] if event_filter.event_type: event_join_conditions.append(models.EventType.desc == event_filter.event_type) event_query = event_query.join(models.EventType, and_(*event_join_conditions)) # Build up the where conditions event_filter_conditions = [] if event_filter.message_id: event_filter_conditions.append(models.Event.message_id == event_filter.message_id) if start: event_filter_conditions.append(models.Event.generated >= start) if end: event_filter_conditions.append(models.Event.generated <= end) if event_filter_conditions: event_query = (event_query. filter(and_(*event_filter_conditions))) event_models_dict = {} if event_filter.traits_filter: for trait_filter in event_filter.traits_filter: # Build a sub query that joins Trait to TraitType # where the trait name matches trait_name = trait_filter.pop('key') op = trait_filter.pop('op', 'eq') conditions = [models.Trait.trait_type_id == models.TraitType.id, models.TraitType.desc == trait_name] for key, value in six.iteritems(trait_filter): sql_utils.trait_op_condition(conditions, key, value, op) trait_query = (session.query(models.Trait.event_id). join(models.TraitType, and_(*conditions)).subquery()) event_query = (event_query. join(trait_query, models.Event.id == trait_query.c.event_id)) else: # If there are no trait filters, grab the events from the db query = (session.query(models.Event.id, models.Event.generated, models.Event.message_id, models.EventType.desc). join(models.EventType, and_(*event_join_conditions))) if event_filter_conditions: query = query.filter(and_(*event_filter_conditions)) for (id_, generated, message_id, desc_) in query.all(): event_models_dict[id_] = api_models.Event(message_id, desc_, generated, []) # Build event models for the events event_query = event_query.subquery() query = (session.query(models.Trait). join(models.TraitType, models.Trait.trait_type_id == models.TraitType.id). join(event_query, models.Trait.event_id == event_query.c.id)) # Now convert the sqlalchemy objects back into Models ... for trait in query.all(): event = event_models_dict.get(trait.event_id) if not event: event = api_models.Event( trait.event.message_id, trait.event.event_type.desc, trait.event.generated, []) event_models_dict[trait.event_id] = event trait_model = api_models.Trait(trait.trait_type.desc, trait.trait_type.data_type, trait.get_value()) event.append_trait(trait_model) event_models = event_models_dict.values() return sorted(event_models, key=operator.attrgetter('generated'))
[docs] def get_event_types(self): """Return all event types as an iterable of strings.""" session = self._engine_facade.get_session() with session.begin(): query = (session.query(models.EventType.desc). order_by(models.EventType.desc)) for name in query.all(): # The query returns a tuple with one element. yield name[0]
[docs] def get_trait_types(self, event_type): """Return a dictionary containing the name and data type of the trait. Only trait types for the provided event_type are returned. :param event_type: the type of the Event """ session = self._engine_facade.get_session() LOG.debug(_("Get traits for %s") % event_type) with session.begin(): query = (session.query(models.TraitType.desc, models.TraitType.data_type) .join(models.Trait, models.Trait.trait_type_id == models.TraitType.id) .join(models.Event, models.Event.id == models.Trait.event_id) .join(models.EventType, and_(models.EventType.id == models.Event.id, models.EventType.desc == event_type)) .group_by(models.TraitType.desc, models.TraitType.data_type) .distinct()) for desc_, dtype in query.all(): yield {'name': desc_, 'data_type': dtype}
[docs] def get_traits(self, event_type, trait_type=None): """Return all trait instances associated with an event_type. If trait_type is specified, only return instances of that trait type. :param event_type: the type of the Event to filter by :param trait_type: the name of the Trait to filter by """ session = self._engine_facade.get_session() with session.begin(): trait_type_filters = [models.TraitType.id == models.Trait.trait_type_id] if trait_type: trait_type_filters.append(models.TraitType.desc == trait_type) query = (session.query(models.Trait) .join(models.TraitType, and_(*trait_type_filters)) .join(models.Event, models.Event.id == models.Trait.event_id) .join(models.EventType, and_(models.EventType.id == models.Event.event_type_id, models.EventType.desc == event_type))) for trait in query.all(): type = trait.trait_type yield api_models.Trait(name=type.desc, dtype=type.data_type, value=trait.get_value())