escore.core package

Submodules

escore.core.definitions module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/02/27

Description:
Definitions used in Eskapade runs:
  • logging levels
  • return-status codes
  • default configuration variables
  • user options
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class escore.core.definitions.RandomSeeds(**kwargs)

Bases: object

Container for seeds of random generators.

Seeds are stored as key-value pairs and are accessed with getitem and setitem methods. A default seed can be accessed with the key “default”. The default seed is also returned if no seed is set for the specified key.

>>> import numpy as np
>>> seeds = RandomSeeds(default=999, foo=42, bar=13)
>>> seeds['NumPy'] = 100
>>> np.random.seed(seeds['NumPy'])
>>> print(seeds['nosuchseed'])
999
__init__(**kwargs)

Initialize an instance.

Values of the specified keyword arguments must be integers, which are set as seed values for the corresponding key.

class escore.core.definitions.StatusCode

Bases: enum.IntEnum

Return status code enumeration class.

A StatusCode should be returned by the initialize, execute, and finalize methods of links, chains, and the process manager.

The enumerations are:

  • Undefined (-1): Default status.
  • Success (0 == EX_OK / EXIT_SUCCESS): All OK, i.e. there were no errors.
  • RepeatChain (1): Repeat execution of this chain.
  • SkipChain (2): Skip this chain: initialize, execute, and finalize.
  • BreakChain (3): Skip the further execution of this chain, but do perform finalize.
  • Recoverable (4): Not OK, but can continue, i.e. there was an error, but the application can recover from it.
  • Failure (5): An error occurred and the application cannot recover from it. In this case the application should just quit.
BreakChain = 3
Failure = 5
Recoverable = 4
RepeatChain = 1
SkipChain = 2
Success = 0
Undefined = -1
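For illustration, status codes behave like ordinary IntEnum members: they can be compared and converted to plain integers. A minimal sketch:

>>> from escore import StatusCode
>>> status = StatusCode.Success
>>> status == StatusCode.Success
True
>>> int(StatusCode.Failure)
5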
escore.core.definitions.set_begin_end_chain_opt(opt_key, settings, args)

Set begin/end-chain variable from user option.

escore.core.definitions.set_custom_user_vars(opt_key, settings, args)

Set custom user configuration variables.

escore.core.definitions.set_log_level_opt(opt_key, settings, args)

Set configuration log level from user option.

escore.core.definitions.set_opt_var(opt_key, settings, args)

Set configuration variable from user options.

escore.core.definitions.set_seeds(opt_key, settings, args)

Set random seeds.

escore.core.definitions.set_single_chain_opt(opt_key, settings, args)

Set single-chain variable from user option.

escore.core.element module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/02/27

Description:

Base classes for the building blocks of an Eskapade analysis run:

  • Link: the smallest unit of analysis code; it defines the content of an algorithm.
  • Chain: an ordered collection of links, executed in the order in which the links were added.
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class escore.core.element.Chain(name, process_manager=None)

Bases: escore.core.meta.Processor, escore.core.meta.ProcessorSequence, escore.core.mixin.TimerMixin

Execution Chain.

A Chain object contains a collection of links with analysis code. The links in a chain are executed in the order in which the links have been added to the chain. Typically a chain contains all links related to one topic, for example ‘validation of a model’, or ‘data preparation’, or ‘data quality checks’.

>>> from escore import process_manager
>>> from escore import Chain
>>> from escore import analysis
>>>
>>> # Create an IO chain. This is automatically registered with the process manager.
>>> io_chain = Chain('Overview')

And Links are added to a chain as follows:

>>> # add a link to the chain
>>> io_chain.add(analysis.ReadToDf(path='foo.csv', key='foo'))
>>>
>>> # Run everything.
>>> process_manager.run()
__init__(name, process_manager=None)

Initialize chain.

add(link: escore.core.element.Link) → None

Add a link to the chain.

Parameters:

link (Link) – The link to add to the chain.

Raises:
  • TypeError – When the link is of an incompatible type.
  • KeyError – When a Link of the same type and name already exists.
clear()

Clear the chain.

discard(link: escore.core.element.Link) → None

Remove a link from the chain.

Parameters:link (Link) – The link to remove from the chain.
Raises:KeyError – When the link does not exist.
execute() → escore.core.definitions.StatusCode

Execute links in chain.

Returns:Execution status code.
Return type:StatusCode
finalize() → escore.core.definitions.StatusCode

Finalize links and chain.

Returns:Finalization status code.
Return type:StatusCode
get(link_name: str) → escore.core.element.Link

Find the link with the given name.

Parameters:link_name (str) – The name of the link to find.
Returns:The link.
Return type:Link
Raises:ValueError – When a link with the given name cannot be found.
initialize() → escore.core.definitions.StatusCode

Initialize chain and links.

Returns:Initialization status code.
Return type:StatusCode

n_links

Return the number of links in the chain.

Returns:The number of links in the chain.
Return type:int

class escore.core.element.Link(name=None)

Bases: escore.core.meta.Processor, escore.core.mixin.ArgumentsMixin, escore.core.mixin.TimerMixin

Link base class.

A link defines the content of an algorithm. Any actual link is derived from this base class.

A link usually does three things:

  • it takes data from the datastore
  • it does something with the data
  • it writes data back to the datastore

To take data from the datastore there is the simple function load(); to write data back to the datastore there is the simple function store().

Links are added to a chain as follows:

>>> from escore import process_manager
>>> from escore import Chain
>>> from escore import analysis
>>>
>>> # Create a Chain instance. Note that the chain is automatically registered with process manager.
>>> io_chain = Chain('IO')
>>>
>>> # Add a link to the chain
>>> reader = analysis.ReadToDf(name='CsvReader', key='foo')
>>> reader.path = 'foo.csv'
>>> io_chain.add(reader)
>>>
>>> # Run everything.
>>> process_manager.run()
__init__(name=None)

Initialize link.

execute() → escore.core.definitions.StatusCode

Execute the Link.

This method may be overridden by the user.

Returns:Status code.
Return type:StatusCode
finalize() → escore.core.definitions.StatusCode

Finalize the Link.

This method may be overridden by the user.

Returns:Status code.
Return type:StatusCode
initialize() → escore.core.definitions.StatusCode

Initialize the Link.

This method may be overridden by the user.

Returns:Status code.
Return type:StatusCode
load(ds, read_key=None)

Read all data from specified source.

The read_key argument can be any of the following:

  • a single Link: returns statuscode, [data_from_link, …]
  • a list of locations: returns statuscode, [data, …]
  • a list of links, each with only one output location: returns statuscode, [data, …]
  • a list of links with multiple output locations: returns statuscode, [data, [more data], …]
  • any mixture of the above

To control what happens when the requested data do not exist, set link.if_input_missing to the status code that should be returned in that case.

Returns:a tuple statuscode, [data in same order as read_key]
Return type:(StatusCode,list)
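As an illustrative sketch of calling load from inside a custom link's execute method, where ds is the DataStore and 'input_data' is a hypothetical datastore location listed in read_key:

>>> # inside Link.execute(); 'input_data' is a hypothetical datastore key
>>> status, [df] = self.load(ds, read_key=['input_data'])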
run() → escore.core.definitions.StatusCode

Initialize, execute, and finalize the Link in one go.

This method is useful for testing purposes, e.g. when developing and testing functionality of a link stand-alone and interactively.

It is not used internally by Eskapade, where the functions are called individually by the chain, and all links are initialized together before their common execution, and all links in the chain are also finalized together, after their common execution.

Returns:Status code.
Return type:StatusCode
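For example, to test the ReadToDf link from the earlier examples stand-alone (a sketch; 'foo.csv' is a placeholder path):

>>> from escore import analysis
>>> reader = analysis.ReadToDf(name='CsvReader', path='foo.csv', key='foo')
>>> status = reader.run()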
store(ds, data, store_key=None, force=False)

Store data back to datastore.

To control what happens when data already exist at the target location, set link.if_output_exists to the desired status code. Storage uses self.store_key (or the given store_key). If self.store_key is a list of locations, data must be a list of the same length.
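A sketch of writing results back from inside a custom link's execute method, where 'output_data' is a hypothetical datastore location:

>>> # inside Link.execute(); stores df under the key 'output_data'
>>> self.store(ds, df, store_key='output_data')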

summary()

Print a summary of the main settings of the link.

escore.core.exceptions module

Project: Eskapade - A python-based package for data analysis.

Created: 2016/11/08

Description:
Eskapade exceptions.
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

exception escore.core.exceptions.Error

Bases: Exception

Base class for all Eskapade core exceptions.

exception escore.core.exceptions.UnknownSetting

Bases: escore.core.exceptions.Error

The user requested an unknown setting.

escore.core.execution module

Project: Eskapade - A python-based package for data analysis.

Created: 2016/11/08

Description:
Functions for running and resetting Eskapade machinery
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

escore.core.execution.eskapade_configure(settings=None)

Configure Eskapade.

This function is called by the eskapade_run function (below) to configure Eskapade before running:

  • set the configuration object
  • set logging level
  • set matplotlib backend
  • process config file
Parameters:settings – analysis settings, either a ConfigObject instance or a string holding the macro path.
escore.core.execution.eskapade_run(settings=None)

Run Eskapade.

This function is called by the eskapade_run script when run from the command line. The working principle of Eskapade is to run chains of custom code chunks (so-called links).

Each chain should have a specific purpose, for example pre-processing incoming data, booking and/or training predictive algorithms, validating these algorithms, or evaluating them.

By using this principle, links can be easily reused in future projects.

Parameters:settings (ConfigObject) – analysis settings
Returns:status of the execution
Return type:StatusCode
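A minimal sketch of calling eskapade_run directly from Python, assuming a configuration macro at the hypothetical path 'macro.py':

>>> from escore import process_manager, ConfigObject
>>> from escore.core.execution import eskapade_run
>>>
>>> settings = process_manager.service(ConfigObject)
>>> settings.add_macros('macro.py')
>>> status = eskapade_run(settings)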
escore.core.execution.reset_eskapade(skip_config=False)

Reset Eskapade objects.

Parameters:skip_config (bool) – skip reset of configuration object

escore.core.meta module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/09/14

Description:

A collection of (generic) meta classes for some (design) patterns:

  • Singleton: Meta class for the Singleton pattern.
  • Processor: Meta class with abstract methods initialize, execute, and finalize.
  • ProcessorSequence: A simple (processor) sequence container.
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class escore.core.meta.Processor(name: str)

Bases: object

Processor metaclass.

__init__(name: str)

Initialize the Processor object.

execute()

Execution logic for processor.

finalize()

Finalization logic for processor.

initialize()

Initialization logic for processor.

logger

A logger that emits log messages to an observer.

The logger can be instantiated as a module or class attribute, e.g.

>>> logger = Logger()
>>> logger.info("I'm a module logger attribute.")
>>>
>>> class Point(object):
>>>     logger = Logger()
>>>
>>>     def __init__(self, x = 0.0, y = 0.0):
>>>         Point.logger.debug('Initializing {point} with x = {x}  y = {y}', point=Point, x=x, y=y)
>>>         self._x = x
>>>         self._y = y
>>>
>>>     @property
>>>     def x(self):
>>>         self.logger.debug('Getting property x = {point._x}', point=self)
>>>         return self._x
>>>
>>>     @x.setter
>>>     def x(self, x):
>>>         self.logger.debug('Setting property x = {point._x}', point=self)
>>>         self._x = x
>>>
>>>     @property
>>>     def y(self):
>>>        self.logger.debug('Getting property y = {point._y}', point=self)
>>>        return self._y
>>>
>>>     @y.setter
>>>     def y(self, y):
>>>         self.logger.debug('Setting property y = {point._y}', point=self)
>>>         self._y = y
>>>
>>> a_point = Point(1, 2)
>>>
>>> logger.info('p_x = {point.x} p_y = {point.y}', point=a_point)
>>> logger.log_level = LogLevel.DEBUG
>>> logger.info('p_x = {point.x} p_y = {point.y}', point=a_point)

The logger uses PEP-3101 (Advanced String Formatting) with named placeholders, see <https://www.python.org/dev/peps/pep-3101/> and <https://pyformat.info/> for more details and examples.

Furthermore, logging events are only formatted and evaluated for logging levels that are enabled, so there is no need to check the logging level before logging; this also keeps logging efficient.

name

Get the name of the processor.

Returns:The name of the processor.
Return type:str
parent

Get the group parent.

Returns:The parent/group processor sequence.
class escore.core.meta.ProcessorSequence

Bases: object

A doubly linked processor sequence.

It remembers the order in which processors are added to the sequence. It also checks whether a processor has already been added to the sequence.

__init__()

Initialize the ProcessorSequence object.

add(processor: escore.core.meta.Processor) → None

Add a processor to the sequence.

Parameters:processor (Processor) – The processor to add.
Raises:KeyError – When a processor of the same type and name already exists.
clear() → None

Clear the sequence.

discard(processor: escore.core.meta.Processor) → None

Remove a processor from the sequence.

Parameters:processor (Processor) – The processor to remove.
Raises:KeyError – When the processor does not exist.
pop(last: bool = True) → escore.core.meta.Processor

Return the popped processor. Raise KeyError if empty.

By default a processor is popped from the end of the sequence.

Parameters:last (bool) – Pop processor from the end of the sequence. Default is True.
Returns:The popped processor.
Raises:KeyError – When trying to pop from an empty list.
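A minimal sketch of the sequence behaviour, using bare Processor instances:

>>> from escore.core.meta import Processor, ProcessorSequence
>>>
>>> seq = ProcessorSequence()
>>> seq.add(Processor('first'))
>>> seq.add(Processor('second'))
>>> popped = seq.pop()  # by default pops from the end of the sequence
>>> popped.name
'second'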
class escore.core.meta.Singleton

Bases: type

Metaclass for singletons.

Any instantiation of a Singleton class yields the exact same object, e.g.:

>>> class Klass(metaclass=Singleton):
>>>     pass
>>>
>>> a = Klass()
>>> b = Klass()
>>> a is b
True

See https://michaelgoerz.net/notes/singleton-objects-in-python.html.

escore.core.mixin module

Project: Eskapade - A python-based package for data analysis.

Created: 2016/11/08

Classes: ArgumentsMixin, ConfigMixin, TimerMixin

Description:
Mixin classes:
  • ArgumentsMixin: processes/checks arguments and sets them as attributes
  • TimerMixin: keeps track of execution time
  • ConfigMixin: reads and handles settings from configuration files
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class escore.core.mixin.ArgumentsMixin

Bases: object

Mixin base class for argument parsing.

Class allows attributes to be accessed as dict items. Plus several argument processing helper functions.

check_arg_callable(*arg_names, allow_none=False)

Check if set of arguments is callable.

check_arg_iters(*arg_names, allow_none=False)

Check if set of arguments has iterators.

check_arg_opts(allow_none=False, **name_vals)

Check if argument values are in set of options.

check_arg_types(recurse=False, allow_none=False, **name_type)

Check if set of arguments has correct types.

check_arg_vals(*arg_names, allow_none=False)

Check if set of arguments exists as attributes and values.

check_extra_kwargs(kwargs)

Check for residual kwargs.

check_required_args(*arg_names)

Check if set of arguments exists as attributes.
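A sketch of how these helpers are typically used inside a link's initialize method, assuming hypothetical attributes read_key and store_key have been set:

>>> # inside a link deriving from ArgumentsMixin, e.g. in initialize()
>>> self.check_required_args('read_key', 'store_key')
>>> self.check_arg_types(read_key=str, store_key=str)
>>> self.check_arg_vals('read_key')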

class escore.core.mixin.ConfigMixin(config_path=None)

Bases: object

Mixin base class for configuration settings.

__init__(config_path=None)

Initialize config settings.

Parameters:config_path (str) – path of configuration file
config_path

Path of configuration file.

get_config(config_path=None)

Get settings from configuration file.

Read and return the configuration settings from a configuration file. If the path of this file is not specified as an argument, the value of the “config_path” property is used. If the file has already been read, return previous settings.

Parameters:config_path (str) – path of configuration file
Returns:configuration settings read from file
Return type:configparser.ConfigParser
Raises:RuntimeError – if config_path is not set
reset_config()

Remove previously read settings.
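A minimal sketch, assuming a hypothetical INI file at 'settings.ini':

>>> from escore.core.mixin import ConfigMixin
>>>
>>> cfg_mixin = ConfigMixin(config_path='settings.ini')
>>> cfg = cfg_mixin.get_config()  # returns a configparser.ConfigParser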

class escore.core.mixin.TimerMixin

Bases: object

Mixin base class for timing.

__init__()

Initialize timer.

start_timer()

Start run timer.

Start the timer. The timer is used to compute the run time. The returned timer start value has an undefined reference and should, therefore, only be compared to other timer values.

Returns:start time in seconds
Return type:float
stop_timer(start_time=None)

Stop the run timer.

Stop the timer. The timer is used to compute the run time. The elapsed time since the timer start is returned.

Parameters:start_time (float) – timer start time, as returned by start_timer()
Returns:time difference with start in seconds
Return type:float
total_time()

Return the total run time.

Returns:total time in seconds
Return type:float
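A minimal sketch of the timer usage:

>>> import time
>>> from escore.core.mixin import TimerMixin
>>>
>>> timer = TimerMixin()
>>> start = timer.start_timer()
>>> time.sleep(0.1)  # placeholder for actual work
>>> elapsed = timer.stop_timer()  # seconds since start_timer()
>>> total = timer.total_time()  # accumulated run time in seconds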

escore.core.persistence module

Project: Eskapade - A python-based package for data analysis.

Created: 2016/11/08

Description:
Utility class and functions to get correct io path, used for persistence of results
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

escore.core.persistence.create_dir(dir_path)

Create directory.

Parameters:dir_path (str) – directory path
escore.core.persistence.io_dir(io_type, io_conf=None)

Construct directory path.

Parameters:
  • io_type (str) – type of result to store, e.g. data, macro, results.
  • io_conf – IO configuration object
Returns:

directory path

Return type:

str

escore.core.persistence.io_path(io_type, sub_path, io_conf=None)

Construct directory path with sub path.

Parameters:
  • io_type (str) – type of result to store, e.g. data, macro, results.
  • sub_path (str) – sub path to be included in io path
  • io_conf – IO configuration object
Returns:

full path to directory

Return type:

str

escore.core.persistence.record_file_number(file_name_base, file_name_ext, io_conf=None)

Get next prediction-record file number.

Parameters:
  • file_name_base (str) – base file name
  • file_name_ext (str) – file name extension
  • io_conf – I/O configuration object
Returns:

next prediction-record file number

Return type:

int

escore.core.persistence.repl_whites(name)

Replace whitespace in names.
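A sketch of constructing result paths with the default I/O configuration; the io_type value 'data' follows the examples above, and the exact directories depend on the configured I/O base paths:

>>> from escore.core import persistence
>>>
>>> data_dir = persistence.io_dir('data')
>>> file_path = persistence.io_path('data', 'foo.csv')
>>> clean_name = persistence.repl_whites('foo bar')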

escore.core.process_manager module

Project: Eskapade - A python-based package for data analysis.

Class: ProcessManager

Created: 2016/11/08

Description:
The ProcessManager class is the heart of Eskapade. It performs initialization, execution, and finalization of analysis chains.
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class escore.core.process_manager.ProcessManager

Bases: escore.core.meta.Processor, escore.core.meta.ProcessorSequence, escore.core.mixin.TimerMixin

Eskapade run process manager.

ProcessManager is the event processing loop of Eskapade. It initializes, executes, and finalizes the analysis chains. There is, under normal circumstances, only one ProcessManager instance.

Here’s a simple but illustrative analysis example:

>>> from escore import process_manager, Chain, Link, StatusCode
>>>
>>> # A chain automatically registers itself with process_manager.
>>> one_plus_one_chain = Chain('one_plus_one')
>>>
>>> class OnePlusOne(Link):
>>>     def execute(self):
>>>         self.logger.info('one plus one = {result}', result=(1+1))
>>>         return StatusCode.Success
>>>
>>> one_plus_one_chain.add(link=OnePlusOne())
>>>
>>> two_plus_two_chain = Chain('two_plus_two')
>>>
>>> class TwoPlusTwo(Link):
>>>     def execute(self):
>>>         self.logger.info('two plus two = {result}', result=(2+2))
>>>         return StatusCode.Success
>>>
>>> two_plus_two_chain.add(TwoPlusTwo())
>>>
>>> process_manager.run()

Ideally the user will not need to interact directly with the process manager. The magic is taken care of by the eskapade_run entry point.

__init__()

Initialize ProcessManager instance.

add(chain: escore.core.element.Chain) → None

Add a chain to the process manager.

Parameters:

chain (Chain) – The chain to add to the process manager.

Raises:
  • TypeError – When the chain is of an incompatible type.
  • KeyError – When a chain of the same type and name already exists.
clear()

Clear/remove all chains.

execute()

Execute all chains in order.

Returns:status code of execution attempt
Return type:StatusCode
execute_macro(filename, copyfile=True)

Execute an input python configuration file.

A copy of the configuration file is stored for bookkeeping purposes.

Parameters:
  • filename (str) – the path of the python configuration file
  • copyfile (bool) – back up the macro for bookkeeping purposes
Raises:

Exception – if input configuration file cannot be found

finalize()

Finalize the process manager.

Returns:status code of finalize attempt
Return type:StatusCode
get(chain_name: str) → escore.core.element.Chain

Find the chain with the given name.

Parameters:chain_name (str) – Find a chain with the given name.
Returns:The chain.
Return type:Chain
Raises:ValueError – When the given chain name cannot be found.
get_service_tree()

Create tree of registered process-service classes.

Returns:service tree
Return type:dict
get_services()

Get set of registered process-service classes.

Returns:service set
Return type:set
import_services(io_conf, chain=None, force=None, no_force=None)

Import process services from files.

Parameters:
  • io_conf (dict) – I/O config as returned by ConfigObject.io_conf
  • chain (str) – name of chain for which data was persisted
  • force (bool or list) – force import if service already registered
  • no_force (list) – do not force import of services in this list
initialize()

Initialize the process manager.

Initializes the process manager by configuring its chains. After initialization the configuration is printed.

Returns:status code of initialize attempt
Return type:StatusCode
n_chains

Return the number of chains in the process manager.

Returns:The number of chains in the process manager.
Return type:int
persist_services(io_conf, chain=None)

Persist process services in files.

Parameters:
  • io_conf (dict) – I/O config as returned by ConfigObject.io_conf
  • chain (str) – name of chain for which data is persisted
print_chains()

Print all chains defined in the manager.

print_services()

Print registered process services.

remove_all_services()

Remove all registered process services.

remove_service(service_cls, silent=False)

Remove specified process service.

Parameters:
  • service_cls (ProcessServiceMeta) – service to remove
  • silent (bool) – don’t complain if service is not registered
reset()

Reset the process manager.

Resetting comprises removing the chains and closing any open connections/sessions.

run() → escore.core.definitions.StatusCode

Run process manager.

Returns:Status code of run execution.
Return type:StatusCode
service(service_spec)

Get or register process service.

Parameters:service_spec (ProcessServiceMeta or ProcessService) – class (instance) to register
Returns:registered instance
Return type:ProcessService
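For example, the ConfigObject and DataStore services documented below are obtained in exactly this way:

>>> from escore import process_manager, ConfigObject, DataStore
>>> settings = process_manager.service(ConfigObject)
>>> ds = process_manager.service(DataStore)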
summary()

Print process-manager summary.

Print a summary of the chains, links, and some analysis settings defined in this configuration.

escore.core.process_services module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/02/27

Description:
Base class and core implementations of run-process services
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

class escore.core.process_services.ConfigObject

Bases: escore.core.process_services.ProcessService

Configuration settings for Eskapade.

The ConfigObject is a dictionary meant for storing the global settings of Eskapade. Settings are set in the configuration macro of an analysis, or on the command line. In general, the ConfigObject is accessed through the process manager.

Example usage:

>>> # first set logging output level.
>>> from escore.logger import Logger, LogLevel
>>> logger = Logger()
>>> logger.log_level = LogLevel.DEBUG

Obtain the ConfigObject from any location as follows:

>>> from escore import process_manager
>>> from escore import ConfigObject
>>> settings = process_manager.service(ConfigObject)

One can treat the ConfigObject as any other dictionary:

>>> settings['foo'] = 'bar'
>>> foo = settings['foo']

Write the ConfigObject to a pickle file with:

>>> settings.persist_in_file(file_path)

And reload from the pickle file with:

>>> settings = ConfigObject.import_from_file(file_path)

A ConfigObject pickle file can be read in by Eskapade with the command line option (-u).

class IoConfig(**input_config)

Bases: dict

Configuration object for I/O operations.

__init__(**input_config)

Initialize IoConfig instance.

Print()

Print a summary of the settings.

__init__()

Initialize ConfigObject instance.

add_macros(macro_paths)

Add configuration macros for Eskapade run.

copy()

Perform a shallow copy of self.

Returns:copy
get(setting: str, default: Any = None) → object

Get the value of a setting. If it does not exist, return the default value.

Parameters:
  • setting – The setting to get.
  • default – The default value of the setting.
Returns:

The value of the setting, or the default value if it does not exist.

io_base_dirs() → dict

Get configured base directories.

Returns:base directories
Return type:dict
io_conf()

Get I/O configuration.

The I/O configuration contains storage locations and basic analysis info.

Returns:I/O configuration
Return type:IoConfig
set_user_opts(parsed_args)

Set options specified by user on command line.

Parameters:parsed_args (argparse.Namespace) – parsed user arguments
class escore.core.process_services.DataStore

Bases: escore.core.process_services.ProcessService, dict

Store for transient data sets and related objects.

The data store is a dictionary meant for storing transient data sets or any other objects. Links can take one or several data sets as input, transform them or use them as input for a model, and store the output back again in the datastore, to be picked up again by any following link.

Example usage:

>>> # first set logging output level.
>>> from escore.logger import Logger, LogLevel
>>> logger = Logger()
>>> logger.log_level = LogLevel.DEBUG

Obtain the global datastore from any location as follows:

>>> from escore import process_manager
>>> from escore import DataStore
>>>
>>>
>>> ds = process_manager.service(DataStore)

One can treat the datastore as any other dict:

>>> ds['a'] = 1
>>> ds['b'] = 2
>>> ds['0'] = 3
>>> a = ds['a']

Write the datastore to a pickle file with:

>>> ds.persist_in_file(file_path)

And reload from the pickle file with:

>>> ds = DataStore.import_from_file(file_path)
Print()

Print a summary of the data store contents.

get(key: str, default: Any = None, assert_type: Any = None, assert_len: bool = False, assert_in: bool = False) → object

Get the value of a key. If it does not exist, return the default value.

Parameters:
  • key – The key of object to get.
  • default – The default value of the key in case not found.
  • assert_type – if set, check object for given type or tuple of types. If fails, raise TypeError.
  • assert_len – if true, check that object has length greater than 0. If fails, raise TypeError or AssertionError.
  • assert_in – if true, assert that key is known.
Returns:

The value of the key, or the default value if it does not exist.
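Continuing the dictionary example above, a short sketch of the checked getter:

>>> ds['a'] = 1
>>> ds.get('a', assert_type=int, assert_in=True)
1
>>> ds.get('nosuchkey', default=42)
42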

class escore.core.process_services.ForkStore

Bases: escore.core.process_services.ProcessService

Dict for sharing objects between forked processes.

The ForkStore is a dictionary meant for sharing data sets or any other objects between forked processes. During execute, links in the same chain can take one or several data sets as input, transform them or use them as input for a model, and store the output back again, to be picked up again by another forked process. The ForkStore will not be persisted.

Example usage:

Obtain the global forkstore from any location as follows:

>>> from escore import process_manager, ForkStore
>>> fs = process_manager.service(ForkStore)

One can treat the forkstore as any other dict:

>>> fs['a'] = 1
>>> fs['b'] = 2
>>> fs['0'] = 3
>>> a = fs['a']
Print()

Print a summary of the shared fork objects.

__init__()

Initialize ForkStore instance.

clear()

Clear the fork store dictionary.

copy()

Perform a shallow copy of self.

Returns:copy
get(key: str, default: Any = None) → object

Get the value of a key. If it does not exist, return the default value.

Parameters:
  • key – The key to get.
  • default – The default value of the key.
Returns:

The value of the key, or the default value if it does not exist.

wait_until_unlocked()

Wait until the fork store is unlocked.

class escore.core.process_services.ProcessService

Bases: object

Base class for process services.

__init__()

Initialize service instance.

classmethod create()

Create an instance of this service.

Returns:service instance
Return type:ProcessService
finish()

Finish current processes.

This function can be implemented by a process-service implementation to finish running processes and clean up to prepare for a reset of the process manager. This would typically involve deleting large objects and closing files and database connections.

classmethod import_from_file(file_path)

Import service instance from a Pickle file.

Parameters:file_path (str) – path of Pickle file
Returns:imported service instance
Return type:ProcessService
Raises:RuntimeError, TypeError
logger

A logger that emits log messages to an observer.

The logger can be instantiated as a module or class attribute, e.g.

>>> logger = Logger()
>>> logger.info("I'm a module logger attribute.")
>>>
>>> class Point(object):
>>>     logger = Logger()
>>>
>>>     def __init__(self, x = 0.0, y = 0.0):
>>>         Point.logger.debug('Initializing {point} with x = {x}  y = {y}', point=Point, x=x, y=y)
>>>         self._x = x
>>>         self._y = y
>>>
>>>     @property
>>>     def x(self):
>>>         self.logger.debug('Getting property x = {point._x}', point=self)
>>>         return self._x
>>>
>>>     @x.setter
>>>     def x(self, x):
>>>         self.logger.debug('Setting property x = {point._x}', point=self)
>>>         self._x = x
>>>
>>>     @property
>>>     def y(self):
>>>        self.logger.debug('Getting property y = {point._y}', point=self)
>>>        return self._y
>>>
>>>     @y.setter
>>>     def y(self, y):
>>>         self.logger.debug('Setting property y = {point._y}', point=self)
>>>         self._y = y
>>>
>>> a_point = Point(1, 2)
>>>
>>> logger.info('p_x = {point.x} p_y = {point.y}', point=a_point)
>>> logger.log_level = LogLevel.DEBUG
>>> logger.info('p_x = {point.x} p_y = {point.y}', point=a_point)

The logger uses PEP-3101 (Advanced String Formatting) with named placeholders, see <https://www.python.org/dev/peps/pep-3101/> and <https://pyformat.info/> for more details and examples.

Furthermore, logging events are only formatted and evaluated for logging levels that are enabled, so there is no need to check the logging level before logging; this also keeps logging efficient.

persist_in_file(file_path)

Persist service instance in Pickle file.

Parameters:file_path (str) – path of Pickle file
class escore.core.process_services.ProcessServiceMeta

Bases: type

Meta class for process-services base class.

persist

Flag to indicate if service can be persisted.

escore.core.run_utils module

Project: Eskapade - A python-based package for data analysis.

Created: 2017/04/11

Description:
Utilities for Eskapade run
Authors:
KPMG Advanced Analytics & Big Data team, Amstelveen, The Netherlands

Redistribution and use in source and binary forms, with or without modification, are permitted according to the terms listed in the file LICENSE.

escore.core.run_utils.create_arg_parser()

Create parser for user arguments.

An argparse parser is created and returned, ready to parse arguments specified by the user on the command line.

Returns:argparse.ArgumentParser

Module contents