Some Words About Physics, Python And The World In General

How we used prefect and MLflow for easy grid searches

At e2m we are currently establishing how we are handling the machine learning model lifecycle. Most of our needs cover time series forecasting in the energy domain. In detail this can cover various things, from market prices, spreads between markets to energy demands or production of individual customers.

There are a number of frameworks out there or in development that help with the modeling itself. But even without these we had reasonable success in the past with using more basic ML frameworks like scikit learn or keras which aren't specialized to time series forecasting .

The much bigger chellange, as is often the case, is of course the operationalisation and maintainance of models. After falling into the trap of trying to develop our own system, we took some time to look around what is "on offer".

In the end, after reviewing several commercial offerings (AzureML, Neptune, Weighs and Biases) as well as open source self-hosted ones (CLearML, MLflow) we felt a little bit lost about what is offered at what price point and what our actual requirements were. And since the best way to discover unknwon requirements is often to just dive into the topic we used OpenMLOps as a starting point (this was helped by our existing usage of prefect as a workflow engine).

And so far we are very happy with our initial expericence of running a setup very similar to OpenMLOps with the biggest difference that we channel all code through a build pipeline. The pipeline effectively does two things, based in the commit message it builds a specified part of the project, packages it in a docker container and registers all related flows in a prefect project. And, again if specified in the commit message, can also trigger a run of a specific flow. (Overall very similar to the apporach descibed here)

This accomplished two things: 1. every developer can schedule a training run without even leaving his IDE on a cluster 2. every run can be traced back to code commited in the central repo

I think it does not make much sense to share too many of the specifics of how we implemented this, since they will either be very close to existing things or differ too much because of different established tools (e.g. wheater you are using Github or Azure DevOps).

However there is one workflow around gridsearch, that utilized existing packages with minimal overhead to produce a very nice workflow we have not seen shared anywhere else so it is probably worth sharing.

Grid Search

With all the available frameworks that help modelling, the task for Data-Scientist becomes much less to programm models itself but rather to quickly find which model and hyper-parameters (e.g. the parameters that define the model) have the best forecasting performance. Again there are frameworks to help this, however we found that with no clear winner in the race for a framework of time series models, we would rather keep it generalized. And as it turns out prefect and sklearn (and Kubernetes) can be combined in rather powerful ways to help.


To quickly start grid searches for existing training tasks we defined a ParameterGridTask that can be used to easily create a parameter search grid.

from sklearn.model_selection import ParameterGrid
from prefect import Flow, Parameter, Task, task, unmapped
from typing import Optional, Callable
import inspect

class PrefectParameterGrid(Parameter):
    def run(self):
        grid = super().run()
        return list(ParameterGrid(grid))

class ParameterGridTask(Task):
    helper class to do grid search.

    def __init__(self, func: Optional[Callable] = None, **kwargs):
        self.func = func

    def run(self, **kwargs):
        params = kwargs.pop("grid")
        return self.func(**kwargs, **params)

    def default_parameter(self) -> PrefectParameterGrid:
        Define a Prefect Parameter with the default values of self.func
        sig = inspect.signature(self.func)
        func_params_with_default = {
            name: [param.default]
            for name, param in sig.parameters.items()
            if param.empty != param.default
        return PrefectParameterGrid("grid", default=func_params_with_default)

def train(a, b=1, c=2):
    # highly sophisticated training function
    print(a, b, c)
    return a+b+c

# we have to use the function not the Task we created
g_task = ParameterGridTask( 

# this is a placeholder for something like training data
def load_data():
    return 1    

# this generates a parameter which by default has all kwargs from train with their default wrapped in a list
# in this case {"b":[1], "c":[2]}
grid = g_task.default_parameter() 

with Flow("test") as flow:
    # all parameters have to be named, positional parameters are not possible, but this is good practice anyway
    data = load_data(), grid=grid)

if __name__ == "__main__":
        parameters={"grid":{"b":[1, 2], "c":[2, 3]}}
# will print 
# 1 1 2
# 1 1 3
# 1 2 2
# 1 2 3

This is will then allow you to quickly define big (be careful) grid searches in the prefect server UI and will automatically pickup any new parameters you add. prefect screenshot

To make this even more powerful one can add a dask executor to a flow, which will distribute the gridsearch among the nodes of a Kubernetes cluster. This only takes a few extra lines but feels enormusly powerful:

from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec
from typing import Union
import prefect

def create_dask_executor(
    minmum_workers: int = 2,
    maximum_workers: int = 6,
    cpu_limit: Union[float, int] = 1,
    memory_limit: str = "3G",
    idle_timeout: str = "180",
    lifetime: str = "30 minutes",
    de = DaskExecutor(
        cluster_class=lambda: KubeCluster(
                    "env": [
                        {"name": "MLFLOW_TRACKING_URI", "value": "http://mlflow:5000"},
                            "name": "DASK_DISTRIBUTED__WORKERS__LIFETIME__DURATION",
                            "value": lifetime,
                            "name": "DASK_DISTRIBUTED__WORKERS__LIFETIME__STAGGER",
                            "value": "5 minutes",
                            "name": "DASK_DISTRIBUTED__WORKERS__LIFETIME__RESTART",
                            "value": "true",
        adapt_kwargs={"minimum": minmum_workers, "maximum": maximum_workers},
    return de

flow.executor = create_dask_executor()

This cluster will automatically pickup the docker image your flow is using and then start multiple dask workers with it. These works will then go ahead and process all the training tasks defined by the grid search.


In practice we found this approach very powerfull and mostly seamless. The biggest problem is that dask appears to inhibit a bug with very long running tasks leading to the workers not consuming any more tasks. One workaround for this is to set a lifetime for the workers as in the above example. One only needs to make sure the training run does not exceed the life time of the worker so no tasks can ever finish.