"""
Module containing abstract base for PyCurator collector classes.
"""
import itertools
import json
import os
import queue
import sys
import time
from abc import ABC, abstractmethod
from collections.abc import (
Collection,
Callable,
Generator,
Iterable,
)
from typing import Any, AnyStr, NoReturn, Optional, ParamSpec, TypeVar, Union
import pandas as pd
import requests
from .utils.saving import save_results
from .utils.validating import is_all_type, validate_from_arguments
from pycurator._typing import (
JSONDict,
SearchTerm,
SearchType,
SearchTuple,
TermResultDict,
TermTypeResultDict,
QueryResultDict,
)
T = TypeVar("T")
P = ParamSpec("P")
class BaseCollector(ABC):
"""Generic abstract base for data-collection classes.
Parameters
----------
repository_name : str
Name of the repository being collected from. Used for providing
updates to user, loading credentials, and saving output results.
Attributes
----------
continue_running : bool
Flag when the collector completes a run. Used for pushing
updates from the object's status_queue.
current_query_ref : str or None (default=None)
Representation of the current state of the collector run.
num_queries : int or bool or None (default=None)
Number of queries for a given run.
If there is no fixed number, such as for paginated queries, the
variable is True.
queries_completed : int or None (default=None)
status_queue : queue.Queue of str
FIFO collection of the object's status.
See Also
--------
queue.Queue : Queue data structure
BaseAPICollector : Derived Class for API queries.
"""
def __init__(self, repository_name: str) -> None:
self.repository_name = repository_name
self.continue_running = True
self.status_queue = queue.Queue()
self.num_queries = None
self.queries_completed = None
self.current_query_ref = None
def track_determinate_progress(
self, coll: Collection[T]
) -> Generator[T, None, None]:
"""Generator for iterating data and updating progress bar.
Parameters
----------
coll : iterable
Yields
------
next object of coll
Raises
------
TypeError
coll parameter is not iterable.
"""
if not hasattr(coll, "__iter__"):
raise TypeError('Parameter "coll" must be iterable.')
# Initialize tracking vars
if not self.num_queries:
self.num_queries = len(coll)
self.queries_completed = 0
# Yield next item in coll and update tracking vars
for item in coll:
self.current_query_ref = str(item)
yield item
self.queries_completed += 1
# Reset tracking vars
self.num_queries = None
self.queries_completed = None
self.current_query_ref = None
def _save_results(
self, save_dir: str, final_dict: QueryResultDict, output_format: str
) -> None:
"""Helper function for saving results and reporting status to UI."""
self.status_queue.put(f'Saving output to "{save_dir}".')
save_results(results=final_dict, data_dir=save_dir, output_format=output_format)
self.status_queue.put("Save complete.")
@abstractmethod
def run(self) -> NoReturn:
"""Abstract placeholder method for collector run."""
raise NotImplementedError('Subclass must override "run()".')
@staticmethod
def track_indeterminate_progress(
indeterminate_query_func: Callable[P, T]
) -> Callable[P, T]:
"""Progress bar wrapper for indeterminate-length queries."""
def update_pb(self, *args: P.args, **kwargs: P.kwargs) -> Iterable[Any]:
self.num_queries = True
results = indeterminate_query_func(self, *args, **kwargs)
self.num_queries = False
return results
return update_pb
def request_execution(self) -> None:
"""Raise flag to stop output."""
self.continue_running = False
def terminate(self) -> NoReturn:
"""Handle program execution."""
self.status_queue.put("Requesting program termination.")
sys.exit()
def _print_progress(self, page: str) -> None:
"""Update queue with current page being searched."""
self.status_queue.put(f"Searching page {page}")
def _update_query_ref(self, **kwargs: Any) -> None:
"""Combine keywords and update base.current_query_ref."""
self.current_query_ref = kwargs
[docs]class BaseAPICollector(BaseCollector):
"""Base for collection classes utilizing external API.
This base inherits from BaseCollector, which provides general
parameters for tracking collection progress.
Parameters
----------
repository_name : str
Name of the repository being collected from. Used for providing
updates to user, loading credentials, and saving output results.
credentials : str, optional (default=None)
JSON filepath containing credentials in form
{repository_name}: {key}.
Attributes
----------
credentials : str
Refer to credentials parameter.
See Also
--------
BaseTermCollector
BaseTermTypeCollector
BaseTypeCollector
"""
[docs] def __init__(self, repository_name: str, credentials: Optional[str] = None) -> None:
super().__init__(repository_name=repository_name)
if credentials:
self.credentials = self.load_credentials(credential_filepath=credentials)
def load_credentials(self, credential_filepath: str) -> Union[str, None]:
"""Load the credential file from the given filepath.
Parameters
----------
credential_filepath : str or path-like object
Returns
-------
credentials : str or None
Raises
------
ValueError
If credential_filepath is not of type str.
FileNotFoundError
Credentials file does not exist.
See Also
--------
os.path : Module for functions on pathnames.
"""
if not isinstance(credential_filepath, str):
raise TypeError(
(
f"Credential value must be of type str, "
f"not '{type(credential_filepath)}'."
)
)
# Try to load credentials from file
if os.path.exists(credential_filepath):
with open(credential_filepath) as credential_file:
credential_data = json.load(credential_file)
credentials = credential_data.get(self.repository_name)
if not credentials:
self.status_queue.put(
"No credentials found, attempting unauthorized run."
)
return credentials
else:
raise FileNotFoundError(f"{credential_filepath} does not exist.")
@staticmethod
def _all_empty(data_dict: QueryResultDict) -> bool:
"""Check if all DataFrames are empty.
Parameters
----------
data_dict : dict from search params to pandas.DataFrame.
Returns
-------
all_empty : bool
"""
return all(df is None or df.empty for df in data_dict.values())
def run(self, **kwargs: Any) -> None:
"""Queries all data from the implemented API.
Parameters
----------
**kwargs : dict, optional
Can temporarily overwrite base attributes.
Allows users to specify variable save parameters.
Notes
-----
In the following order, this function calls:
get_all_search_outputs
get_all_metadata (if applicable)
merge_search_and_metadata_dicts (if applicable)
"""
self.status_queue.put(f"Running {self.repository_name.title()}...")
# Set save parameters
save_dir = kwargs.pop("save_dir", None)
save_type = kwargs.pop("save_type", None)
try:
# Get search_output
search_dict = self.get_all_search_outputs(**kwargs)
# Set merge parameters
merge_kwargs = {}
try:
merge_kwargs["on"] = getattr(self, "merge_on")
except AttributeError:
try:
merge_kwargs["left_on"] = getattr(self, "merge_left_on")
merge_kwargs["right_on"] = getattr(self, "merge_right_on")
except AttributeError:
raise NotImplementedError(
f"Class '{self.__name__}' must include 'merge_on' or both 'merge_left_on' and 'merge_right_on'"
)
# Try to get metadata (if available)
try:
metadata_dict = self.get_all_metadata(search_dict=search_dict)
merged_dict = self.merge_search_and_metadata_dicts(
search_dict=search_dict,
metadata_dict=metadata_dict,
**merge_kwargs
)
final_dict = merged_dict
except TypeError:
final_dict = search_dict
except Exception as unexpected_error:
self.status_queue.put(
f"An unexpected error has occurred: \n{unexpected_error}"
)
self.continue_running = False
return
# Handle saving if output exists
if not self._all_empty(final_dict):
if save_dir and save_type:
self._save_results(
save_dir=save_dir, final_dict=final_dict, output_format=save_type
)
else:
self.status_queue.put("No results found, nothing to save.")
self.status_queue.put(f"{self.repository_name.title()} run complete.")
self.continue_running = False
def get_all_search_outputs(self, **kwargs: Any) -> NoReturn:
"""Abstract placeholder method for returning search outputs."""
raise NotImplementedError('Subclass must override "get_all_search_outputs()".')
def get_all_metadata(self, search_dict: QueryResultDict) -> NoReturn:
"""Abstract placeholder method for returning metadata."""
raise NotImplementedError('Subclass must override "get_all_metadata()".')
def get_request_output_and_update_query_ref(
self,
url: AnyStr,
params: Optional[Any] = None,
headers: Optional[Any] = None,
**ref_kwargs: Any,
) -> tuple[requests.Response, JSONDict]:
"""Return request output and update base.current_query_ref.
Parameters
----------
url : str
params : dict, optional (default=None)
headers : dict, optional (default=None)
**ref_kwargs : dict, optional
Returns
-------
base.get_request_output(url, params, headers)
See Also
--------
_update_query_ref
get_request_output
"""
self._update_query_ref(**ref_kwargs)
return self.get_request_output(url=url, params=params, headers=headers)
def get_request_output(
self, url: AnyStr, params: Optional[Any] = None, headers: Optional[Any] = None
) -> tuple[requests.Response, JSONDict]:
"""Return Response and JSON from requests.get().
Parameters
----------
url : str
params : dict or list of tuples or bytes, optional
(default=None)
Dictionary, list of types or bytes to send in the query
string for the Request.
headers : dict, optional (default=None)
Dictionary of headers to send with the Request.
Returns
-------
r : requests.Response
output : dict
JSON-encoded content of a response.
Raises
------
RuntimeError
Occurs when a query results in an un-parsable response.
Outputs the parameters provided to the query along with the
response status code for further troubleshooting.
See Also
--------
requests.get : Sends a GET request.
"""
# If user has requested termination, handle cleanup instead of querying
# additional results
if not self.continue_running:
self.terminate()
request_obj = requests.get(url=url, params=params, headers=headers)
try:
output = request_obj.json()
except json.JSONDecodeError as invalid_json:
# 429: Rate limiting (wait and then try the request again)
if request_obj.status_code == 429:
self.status_queue.put("Rate limit hit, waiting for request...")
# Wait until we can make another request
reset_time = int(request_obj.headers["RateLimit-Reset"])
current_time = int(time.time())
time.sleep(reset_time - current_time)
request_obj, output = self.get_request_output(
url=url, params=params, headers=headers
)
else:
raise RuntimeError(
(
f"Query to {url} with {params} params and {headers}"
f" headers fails unexpectedly with status"
f" code {request_obj.status_code} and full output {vars(request_obj)}"
)
) from invalid_json
return request_obj, output
def merge_search_and_metadata_dicts(
self,
search_dict: QueryResultDict,
metadata_dict: dict,
**kwargs: Any,
) -> QueryResultDict:
"""Merges together search and metadata DataFrames by 'on' key.
For multiple DataFrames containing similar search references,
combines into one DataFrame. Search and Metadata DataFrames are
merged across their respective dictionaries via common keys.
For Search DataFrames with no matching Metadata, the Search
DataFrame is added as-is.
Parameters
----------
search_dict : dict of pandas.DataFrame
Dictionary of search output results.
metadata_dict : dict of pandas.DataFrame
Dictionary of metadata results.
**kwargs : dict, optional
Additional keyword arguments to pass to merge.
Returns
-------
df_dict : dict of pandas.DataFrame
Dict containing all the merged search/metadata DataFrames
or singleton search DataFrames.
Raises
------
TypeError
search_dict or metadata_dict are not instances of dict.
ValueError
search_dict or metadata_dict contain entries that are not of
type pandas.DataFrame.
See Also
--------
pandas.merge
"""
if not isinstance(search_dict, dict):
raise TypeError(
f"search_dict must be of type dict, not '{type(search_dict)}'."
)
if not isinstance(metadata_dict, dict):
raise TypeError(
f"metadata_dict must be of type dict, not '{type(metadata_dict)}'."
)
if not is_all_type(search_dict.values(), (pd.DataFrame, None)):
raise ValueError(
"All search_dict entries must be of type pandas.DataFrame."
)
if not is_all_type(metadata_dict.values(), (pd.DataFrame, None)):
raise ValueError(
"All metadata_dict entries must be of type pandas.DataFrame."
)
df_dict = {}
for query_key in search_dict.keys():
search_df = search_dict[query_key]
# If the search DataFrame has matching metadata, merge
if query_key in metadata_dict:
metadata_df = metadata_dict[query_key]
df_all = pd.merge(
left=search_df,
right=metadata_df,
how="outer",
suffixes=("_search", "_metadata"),
**kwargs,
)
# If no metadata, just add the search_df
else:
df_all = search_df
df_dict[query_key] = df_all
return df_dict
class TermQueryMixin:
"""Mixin for API collection classes that utilize search terms.
See Also
--------
BaseTermCollector
BaseTermTypeCollector
"""
_search_terms: Collection[SearchTerm]
@property
def search_terms(self) -> Collection[SearchTerm]:
"""Property method for search terms."""
return self._search_terms
@staticmethod
def validate_search_term(func: Callable[P, T]) -> Callable[P, T]:
"""Decorator for validating search term object type."""
def inner(self, *args, **kwargs):
args, kwargs = validate_from_arguments(
validator=self._validate,
func=func,
args=args,
kwargs=kwargs,
param="search_term",
)
return func(self, *args, **kwargs)
return inner
@search_terms.setter
def search_terms(self, search_terms: Collection[SearchTerm]) -> None:
if isinstance(search_terms, str):
search_terms = [search_terms]
if not is_all_type(search_terms, str):
raise TypeError("All search terms must be of type str.")
self._search_terms = search_terms
@staticmethod
def _validate(search_term: SearchTerm) -> SearchTerm:
"""Validate type of search_term."""
if not isinstance(search_term, str):
raise TypeError(
"search_term must be of type str, not" f" '{type(search_term)}'."
)
return search_term
[docs]class BaseTermCollector(TermQueryMixin, BaseAPICollector):
"""Base for API collection classes that utilize search terms.
This base inherits from BaseAPICollector, which provides credential
info, as well as general parameters for tracking collection
progress, inherited from BaseCollector.
Parameters
----------
repository_name : str
Name of the repository being collected from. Used for providing
updates to user, loading credentials, and saving output results.
search_terms : list-like, optional (default=None)
Terms to search over. Can be (re)set via set_search_terms() or
passed in directly to search functions to override set parameter.
credentials : str, optional (default=None)
JSON filepath containing credentials in form
{repository_name}: {key}.
Attributes
----------
search_terms : list of str
See Also
--------
pycurator.collectors.term_collectors
"""
[docs] def __init__(
self,
repository_name: str,
search_terms: Optional[Collection[SearchTerm]] = None,
credentials: Optional[str] = None,
) -> None:
super().__init__(repository_name=repository_name, credentials=credentials)
self.search_terms = search_terms
def get_all_search_outputs(self, **kwargs: Any) -> TermResultDict:
"""Queries the API for each search term.
Parameters
----------
**kwargs : dict, optional
Can temporarily overwrite self search_terms.
Returns
-------
search_dict : dict of pandas.DataFrame
Stores the results of each call to
get_individual_search_output in the form
search_dict[{search_term}] = df.
"""
# Set method variables if different than default values
search_terms = kwargs.get("search_terms", self.search_terms)
search_dict = {}
for search_term in search_terms:
self.status_queue.put(f"Searching {search_term}.")
search_dict[search_term] = self.get_individual_search_output(
search_term=search_term
)
self.status_queue.put("Search completed.")
return search_dict
@abstractmethod
def get_individual_search_output(self, search_term: SearchTerm) -> None:
"""Abstract placeholder method for search output."""
return
def _get_metadata_from_paths(
self, object_path_dict: dict[SearchTerm, pd.DataFrame]
) -> TermResultDict:
"""Retrieves all metadata related to the provided DataFrames.
Parameters
----------
object_path_dict : dict
Dictionary of the form {query: object_paths} for path lists.
Returns
-------
metadata_dict : dict of {SearchTerm: pd.DataFrame}
"""
metadata_dict = {}
for query, object_paths in object_path_dict.items():
self.status_queue.put(f"Querying {query} metadata.")
metadata_dict[query] = self.get_query_metadata(object_paths=object_paths)
self.status_queue.put("Metadata query complete.")
return metadata_dict
def get_query_metadata(self, object_paths: Iterable[Any]) -> NoReturn:
"""Placeholder method for query metadata retrieval."""
raise NotImplementedError('Subclass must override "get_query_metadata()".')
class TypeQueryMixin:
"""Mixin for API collection classes that utilize search types.
See Also
--------
BaseTermTypeCollector
BaseTypeCollector
"""
_search_types: tuple[SearchType, ...]
search_type_options: tuple[SearchType, ...] = None
@property
def search_types(self) -> tuple[SearchType, ...]:
"""Getter for search_types."""
return self._search_types
@search_types.setter
def search_types(self, search_types: tuple[SearchType, ...]) -> None:
"""Set search_types if all are allowed by current Collector."""
if not all(
search_type in self.search_type_options for search_type in search_types
):
raise ValueError(f"Only {self.search_type_options} search types are valid.")
self._search_types = search_types
@staticmethod
def validate_search_type(func: Callable[P, T]) -> Callable[P, T]:
"""Decorator for validating search term object type."""
def inner(self, *args, **kwargs):
args, kwargs = validate_from_arguments(
validator=self._validate,
func=func,
args=args,
kwargs=kwargs,
param="search_type",
)
return func(self, *args, **kwargs)
return inner
def _validate(self, search_type: SearchType) -> SearchType:
if search_type not in self.search_type_options:
raise ValueError(f"Can only search by {self.search_type_options}.")
return search_type
[docs]class BaseTermTypeCollector(TermQueryMixin, TypeQueryMixin, BaseAPICollector):
"""Base for API collection classes that utilize search terms and types.
This base inherits from BaseAPICollector, which provides credential
info, as well as general parameters for tracking collection
progress, inherited from BaseCollector.
Parameters
----------
repository_name : str
Name of the repository being collected from. Used for providing
updates to user, loading credentials, and saving output results.
search_terms : list-like, optional (default=None)
Terms to search over. Can be (re)set via set_search_terms() or
passed in directly to search functions to override set parameter.
search_types : list-like, optional (default=None)
Data types to search over. Can be (re)set via set_search_types()
or passed in directly to search functions to override set
parameter.
credentials : str, optional (default=None)
JSON filepath containing credentials in form
{repository_name}: {key}.
Attributes
----------
search_terms : list of str
search_types : list of str
See Also
--------
pycurator.collectors.term_type_collectors
"""
[docs] def __init__(
self,
repository_name: str,
search_terms: Optional[Collection[SearchTerm]] = None,
search_types: Optional[Collection[SearchType]] = None,
credentials: Optional[str] = None,
) -> None:
super().__init__(repository_name=repository_name, credentials=credentials)
self.search_terms = search_terms
self.search_types = search_types
@staticmethod
def validate_term_and_type(func: Callable[P, T]) -> Callable[P, T]:
"""Helper for wrapping function in both term/type validators."""
@BaseTermTypeCollector.validate_search_term
@BaseTermTypeCollector.validate_search_type
def inner(self, *args, **kwargs):
return func(self, *args, **kwargs)
return inner
def get_all_search_outputs(self, **kwargs: Any) -> TermTypeResultDict:
"""Queries the API for each search term/type combination.
Parameters
----------
**kwargs : dict, optional
Can temporarily overwrite self search_terms and
search_types.
Returns
-------
search_dict : dict of pandas.DataFrame
Stores the results of each call to
get_individual_search_output in the form
search_dict[(search_term, search_type)] = df.
"""
# Set method variables if different than default values.
search_terms = kwargs.get("search_terms", self.search_terms)
search_types = kwargs.get("search_types", self.search_types)
search_dict = {}
for search_term, search_type in itertools.product(search_terms, search_types):
self.status_queue.put(f"Searching {search_term} {search_type}.")
search_dict[(search_term, search_type)] = self.get_individual_search_output(
search_term=search_term, search_type=search_type
)
self.status_queue.put("Search completed.")
return search_dict
@abstractmethod
def get_individual_search_output(
self, search_term: SearchTerm, search_type: SearchType
) -> None:
"""Abstract placeholder method for retrieving search output."""
raise NotImplementedError
def _get_metadata_from_paths(
self, object_path_dict: dict[SearchTuple, Collection[str]]
) -> TermTypeResultDict:
"""Retrieves metadata for records contained in input DataFrames.
Parameters
----------
object_path_dict : dict
Dictionary of the form {query: object_paths} for path lists.
Returns
-------
metadata_dict : dict of {(SearchTerm, SearchType): pd.DataFrame}
"""
metadata_dict = {}
for query, object_paths in object_path_dict.items():
search_term, search_type = query
self.status_queue.put(f"Querying {search_term} {search_type} metadata.")
metadata_dict[query] = self.get_query_metadata(object_paths=object_paths)
self.status_queue.put("Metadata query complete.")
return metadata_dict
def get_query_metadata(self, object_paths: Collection[str]) -> NoReturn:
"""Placeholder method for query metadata retrieval."""
raise NotImplementedError('Subclass must override "get_query_metadata()".')
[docs]class BaseTypeCollector(TypeQueryMixin, BaseAPICollector):
"""Base for API collection classes that utilize search types.
This base inherits from BaseAPICollector, which provides credential
info, as well as general parameters for tracking collection
progress, inherited from BaseCollector.
Parameters
----------
repository_name : str
Name of the repository being collected from. Used for providing
updates to user, loading credentials, and saving output results.
search_types : list-like, optional (default=None)
types to search over. Can be (re)set via set_search_types() or
passed in directly to search functions to override set parameter.
credentials : str, optional (default=None)
JSON filepath containing credentials in form
{repository_name}: {key}.
Attributes
----------
search_types : list of str
See Also
--------
pycurator.collectors.type_collectors
"""
[docs] def __init__(
self,
repository_name: str,
search_types: Optional[Collection[SearchType]] = None,
credentials: Optional[str] = None,
) -> None:
super().__init__(repository_name=repository_name, credentials=credentials)
self.search_types = search_types
@property
def search_types(self) -> Collection[SearchType]:
"""Getter for search_types."""
return self._search_types
@search_types.setter
def search_types(self, search_types: tuple[SearchType, ...]) -> None:
if not search_types:
return
if not all(
search_type in self.search_type_options for search_type in search_types
):
raise ValueError(f"Only {self.search_type_options} search types are valid.")
self._search_types = search_types
def get_all_search_outputs(self, **kwargs: Any) -> TermResultDict:
"""Queries the API for each search type.
Parameters
----------
**kwargs : dict, optional
Can temporarily overwrite self search_types.
Returns
-------
search_dict : dict of pandas.DataFrame
Stores the results of each call to
get_individual_search_output in the form
search_output_dict[{search_type}] = df.
"""
search_types = kwargs.get("search_types", self.search_types)
search_dict = {}
for search_type in search_types:
self.status_queue.put(f"Searching {search_type}.")
search_dict[search_type] = self.get_individual_search_output(
search_type=search_type
)
self.status_queue.put(f"{search_type} search completed.")
return search_dict
@abstractmethod
def get_individual_search_output(self, search_type: SearchType) -> None:
"""Abstract placeholder method for search output."""
return
def get_query_metadata(
self,
object_paths: Collection[str],
search_type: SearchType,
) -> NoReturn:
"""Placeholder method for retrieving query metadata."""
raise NotImplementedError('Subclass must override "get_query_metadata()".')