import logging
import re
from datetime import datetime
import pypuppetdb
from pypuppetdb.QueryBuilder import EqualsOperator
from pypuppetdb.api.base import BaseAPI
from pypuppetdb.errors import APIError
from pypuppetdb.types import Node, Report
log = logging.getLogger(__name__)
[docs]class PqlAPI(BaseAPI):
"""This class provides methods that interact with the `pdb/query/v4`
PuppetDB API endpoint.
"""
[docs] def _pql(self, pql, request_method="GET"):
"""This method prepares a PQL query to PuppetDB. Actual making
the HTTP request is done by _make_request().
:param pql: PQL query
:type pql: :obj:`string`
:param request_method: (optional) GET or POST, the default is GET
:raises: :class:`~pypuppetdb.errors.EmptyResponseError`
:returns: The decoded response from PuppetDB
:rtype: :obj:`dict` or :obj:`list`
"""
log.debug(f"_pql called with pql={pql}, request_method={request_method}")
pql = pql.strip()
if not pql:
log.error("Non-empty PQL query is required!")
raise APIError
payload = {}
# PQL queries are made to the same endpoint regardless of the queried entities
url = self._url("pql")
payload["query"] = pql
return self._make_request(url, request_method, payload)
# TODO: deduplicate this - see QueryAPI.nodes()
[docs] def pql(self, pql, with_status=False, unreported=2, with_event_numbers=True):
"""Makes a PQL (Puppet Query Language) and tries to cast results
to a rich type. If it won't work, returns plain dicts.
:param pql: PQL query
:type pql: :obj:`string`
:param with_status: (optional, only for queries for nodes) include
the node status in the returned nodes
:type with_status: :bool:
:param unreported: (optional, only for queries for nodes) amount
of hours when a node gets marked as unreported
:type unreported: :obj:`None` or integer
:param with_event_numbers: (optional, only for queries for nodes)
include the exact number of
changed/unchanged/failed/noop events when
with_status is set to True. If set to False
only "some" string is provided if there are
resources with such status in the last report.
This provides performance benefits as potentially
slow event-counts query is omitted completely.
:type with_event_numbers: :bool:
:returns: A generator yielding elements of a rich type or plain dicts
"""
type_class = self._get_type_from_query(pql)
if type_class == Node and (
with_status or unreported != 2 or not with_event_numbers
):
log.error(
"with_status, unreported and with_event_numbers are used only"
" for queries for nodes!"
)
raise APIError
for element in self._pql(pql=pql):
if type_class == Node:
# TODO: deduplicate this - see QueryAPI.nodes()
now = datetime.utcnow()
latest_events = None
if with_status and with_event_numbers:
latest_events = self._query(
"event-counts",
query=EqualsOperator("latest_report?", True),
summarize_by="certname",
)
yield Node.create_from_dict(
self,
element,
with_status,
with_event_numbers,
latest_events,
now,
unreported,
)
elif type_class == Report:
yield Report.create_from_dict(self, element)
elif type_class:
yield type_class.create_from_dict(element)
else:
yield element
[docs] @staticmethod
def _get_type_from_query(pql):
"""Gets a rich type of the entities returned by the given
PQL query.
:param pql: PQL query
:type pql: :obj:`string`
:return: a rich type, if this library supports it
otherwise - None
"""
pql = pql.strip()
if not pql:
log.error("Non-empty PQL query is required!")
raise APIError
# in PQL the beginning of the query is the type of returned entities
# but only if the projection is empty ([]) or there is no projection
pattern = re.compile(r"([a-z]*?)\s*(\[])?\s*{")
match = pattern.match(pql)
if match:
type_name_lowercase = match.group(1)
# class name is capitalized
type_name = type_name_lowercase.capitalize()
# depluralize - remove trailing "s"
if type_name.endswith("s"):
type_name_singular = type_name[:-1]
else:
type_name_singular = type_name
log.debug(f"Type name: {type_name_singular}")
try:
type_class = getattr(pypuppetdb.types, type_name_singular)
return type_class
except AttributeError:
log.debug(
f"PQL returns entities of a type {type_name_singular},"
f" but it is not supported by this library yet."
)
return None
else:
log.debug("No match!")
return None