tharwan.de

Some Words About Physics, Python And The World In General

How we used prefect and MLflow for easy grid searches

At e2m we are currently establishing how we handle the machine learning model lifecycle. Most of our needs revolve around time series forecasting in the energy domain. This covers various things, from market prices and spreads between markets to the energy demand or production of individual customers.

There are a number of frameworks out there or in development that help with the modeling itself. But even without these we have had reasonable success in the past with more basic ML frameworks like scikit-learn or Keras, which are not specialized for time series forecasting.

The much bigger challenge, as is often the case, is the operationalisation and maintenance of models. After falling into the trap of trying to develop our own system, we took some time to look around at what is "on offer".

In the end, after reviewing several commercial offerings (AzureML, Neptune, Weights & Biases) as well as open source self-hosted ones (ClearML, MLflow), we felt a little lost about what is offered at what price point and what our actual requirements were. Since the best way to discover unknown requirements is often to just dive into the topic, we used OpenMLOps as a starting point (this was helped by our existing usage of prefect as a workflow engine).

So far we are very happy with our initial experience of running a setup very similar to OpenMLOps, with the biggest difference being that we channel all code through a build pipeline. The pipeline effectively does two things. Based on the commit message, it builds a specified part of the project, packages it in a docker container and registers all related flows in a prefect project. And, again if specified in the commit message, it can also trigger a run of a specific flow. (Overall very similar to the approach described here)

This accomplished two things:
1. every developer can schedule a training run on a cluster without even leaving their IDE
2. every run can be traced back to code committed in the central repo

I think it does not make much sense to share too many of the specifics of how we implemented this, since they will either be very close to existing things or differ too much because of different established tools (e.g. whether you are using GitHub or Azure DevOps).

However, there is one workflow around grid search that uses existing packages with minimal overhead to produce a very nice result. We have not seen it shared anywhere else, so it is probably worth sharing.

Grid Search

With all the available frameworks that help with modelling, the task of a data scientist is much less to program the models themselves and much more to quickly find which model and hyper-parameters (i.e. the parameters that define the model) have the best forecasting performance. Again there are frameworks to help with this, but since there is no clear winner in the race for a time series modelling framework, we would rather keep it generalized. And as it turns out, prefect and sklearn (and Kubernetes) can be combined in rather powerful ways to help.

ParameterGridTask

To quickly start grid searches for existing training tasks we defined a ParameterGridTask that can be used to easily create a parameter search grid.

from sklearn.model_selection import ParameterGrid
from prefect import Flow, Parameter, Task, task, unmapped
from typing import Optional, Callable
import inspect

class PrefectParameterGrid(Parameter):
    def run(self):
        grid = super().run()
        return list(ParameterGrid(grid))


class ParameterGridTask(Task):
    """
    helper class to do grid search.
    """

    def __init__(self, func: Optional[Callable] = None, **kwargs):
        self.func = func
        super().__init__(**kwargs)

    def run(self, **kwargs):
        params = kwargs.pop("grid")
        return self.func(**kwargs, **params)

    def default_parameter(self) -> PrefectParameterGrid:
        """
        Define a Prefect Parameter with the default values of self.func
        :return:
        """
        sig = inspect.signature(self.func)
        func_params_with_default = {
            name: [param.default]
            for name, param in sig.parameters.items()
            if param.empty != param.default
        }
        return PrefectParameterGrid("grid", default=func_params_with_default)

@task
def train(a, b=1, c=2):
    # highly sophisticated training function
    print(a, b, c)
    return a+b+c

# we have to use the function not the Task we created
g_task = ParameterGridTask(func=train.run) 

# this is a placeholder for something like training data
@task
def load_data():
    return 1    

# this generates a parameter which by default has all kwargs from train with their default wrapped in a list
# in this case {"b":[1], "c":[2]}
grid = g_task.default_parameter() 

with Flow("test") as flow:
    # all parameters have to be named, positional parameters are not possible, but this is good practice anyway
    data = load_data()
    g_task.map(a=unmapped(data), grid=grid)

if __name__ == "__main__":
    flow.run(
        run_on_schedule=False,
        parameters={"grid":{"b":[1, 2], "c":[2, 3]}}
        )
# will print 
# 1 1 2
# 1 1 3
# 1 2 2
# 1 2 3

This will then allow you to quickly define big (be careful) grid searches in the prefect server UI, and it will automatically pick up any new parameters you add.

[Image: prefect screenshot]
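To get a feeling for how quickly a grid grows, the two building blocks used above can be sketched in plain Python without prefect or sklearn. This is only an illustration under assumptions: `default_grid` and `expand_grid` are hypothetical helper names, and `expand_grid` merely mimics the cartesian-product expansion that `ParameterGrid` performs, it is not sklearn's implementation.

```python
import inspect
from itertools import product


def default_grid(func):
    """Wrap every keyword default of `func` in a one-element list."""
    sig = inspect.signature(func)
    return {
        name: [param.default]
        for name, param in sig.parameters.items()
        if param.default is not inspect.Parameter.empty
    }


def expand_grid(grid):
    """Return one dict per combination of the listed values (keys sorted)."""
    keys = sorted(grid)
    return [dict(zip(keys, values)) for values in product(*(grid[k] for k in keys))]


def train(a, b=1, c=2):
    # stand-in for the training function from the example above
    return a + b + c


# parameters with a default become single-value grid entries
defaults = default_grid(train)

# two values for each of two parameters give four training runs
combos = expand_grid({"b": [1, 2], "c": [2, 3]})
results = [train(a=1, **params) for params in combos]

# three parameters with ten values each already mean 1000 runs
n_runs = len(expand_grid({name: list(range(10)) for name in "bcd"}))
```

The number of runs is the product of the lengths of all value lists, which is why a grid typed casually into the UI can easily occupy a cluster for a long time.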

To make this even more powerful one can add a dask executor to a flow, which will distribute the grid search among the nodes of a Kubernetes cluster. This only takes a few extra lines but feels enormously powerful:

from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec
from typing import Union
import prefect

def create_dask_executor(
    minimum_workers: int = 2,
    maximum_workers: int = 6,
    cpu_limit: Union[float, int] = 1,
    memory_limit: str = "3G",
    idle_timeout: str = "180",
    lifetime: str = "30 minutes",
):
    # the cluster is created lazily, when the flow run starts
    de = DaskExecutor(
        cluster_class=lambda: KubeCluster(
            make_pod_spec(
                # reuse the docker image the flow itself is running in
                image=prefect.context.image,
                memory_limit=memory_limit,
                memory_request=memory_limit,
                cpu_limit=cpu_limit,
                cpu_request=cpu_limit,
                extra_container_config={
                    "env": [
                        {"name": "MLFLOW_TRACKING_URI", "value": "http://mlflow:5000"},
                        # the dask config keys are distributed.worker.lifetime.*
                        # (note: "worker", singular)
                        {
                            "name": "DASK_DISTRIBUTED__WORKER__LIFETIME__DURATION",
                            "value": lifetime,
                        },
                        {
                            "name": "DASK_DISTRIBUTED__WORKER__LIFETIME__STAGGER",
                            "value": "5 minutes",
                        },
                        {
                            "name": "DASK_DISTRIBUTED__WORKER__LIFETIME__RESTART",
                            "value": "true",
                        },
                    ]
                },
            ),
            idle_timeout=idle_timeout,
        ),
        # scale the number of workers with the amount of pending tasks
        adapt_kwargs={"minimum": minimum_workers, "maximum": maximum_workers},
    )
    return de

flow.executor = create_dask_executor()

This cluster will automatically pick up the docker image your flow is using and then start multiple dask workers with it. These workers will then go ahead and process all the training tasks defined by the grid search.

Caveats

In practice we found this approach very powerful and mostly seamless. The biggest problem is that dask appears to exhibit a bug with very long running tasks, which leads to the workers not consuming any more tasks. One workaround is to set a lifetime for the workers, as in the example above. One only needs to make sure that a training run does not exceed the lifetime of a worker, because otherwise no task can ever finish.
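A rough sanity check for that last point could look like the sketch below. It rests on assumptions that are not part of our setup: the helper names and the safety factor are made up for illustration, and it assumes that the stagger shifts each worker's lifetime randomly by up to plus or minus the stagger value, so the shortest possible lifetime is duration minus stagger.

```python
def worst_case_lifetime_minutes(duration_minutes, stagger_minutes):
    # assumption: the stagger moves the lifetime by at most +/- stagger,
    # so the unluckiest worker lives for duration - stagger
    return duration_minutes - stagger_minutes


def task_fits(expected_task_minutes, duration_minutes=30, stagger_minutes=5,
              safety_factor=1.5):
    # hypothetical safety factor: leave headroom for slow runs
    budget = worst_case_lifetime_minutes(duration_minutes, stagger_minutes)
    return expected_task_minutes * safety_factor <= budget


# with the 30 minute lifetime and 5 minute stagger from the executor above
short_run_ok = task_fits(10)   # 15 minutes needed, 25 available
long_run_ok = task_fits(20)    # 30 minutes needed, 25 available
```

If a single training run does not fit, the lifetime has to be raised accordingly before starting the grid search.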

mpi4py Allgatherv Example

My current work involves an agent based simulation written in Python. I now wanted to teach it some new tricks, including running on multiple cores. While my normal approach to parallelism in Python is to just run several simulations in parallel to get some average, I decided it would now make sense to also have a single simulation run on multiple cores. One reason for that was to make it scale more easily on the computing cluster, the other was that I assumed it would be quite easy with the agent based structure.

For the same reasons I decided to use MPI (mpi4py): it seemed like a good fit for an agent based model (agents sending messages around) and it was available on the computing cluster. In the end it turned out that I did not use any MPI code in the agents, to keep them more flexible.

Anyway, the simulation is structured in the following way: on each core/process a controller is initialized which not only handles the communication but is also in charge of the agents. This is a good fit because the simulation involves global state as the basis of the agents' decisions.

I decide in the beginning which controller controls which agents. This might lead to some unused processing time during the sync between the controllers, because they might have to wait to get the global state. But since each agent has a lot of data attached, I assumed it would not make sense to dispatch the agents to a different process every timestep. This would mean shuffling around big Python objects and would therefore presumably be slow.

Instead only the global state is shared between the controllers. I am not sure if this is the best solution since I did not try any other, but it turned out to have a nice structure and to work quite well.

One problem I stumbled upon was that I could not guarantee that each group of agents would be the same size, because when you want to simulate 100 agents on 16 cores it does not divide evenly. This meant I could not use the normal allgather methods of MPI to collect the state but had to use allgatherv, which can deal with different sizes. Sadly I could not find any documentation about allgatherv in mpi4py. With some googling I worked out an example of how it works that I wanted to share, so here it is:


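from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
mpi_size = comm.Get_size()

# work_size = 127  # arbitrary prime number
work_size = 100
work = np.zeros(work_size)

# split the work as evenly as possible,
# the first `leftover` ranks get one extra item
base = work_size // mpi_size
leftover = work_size % mpi_size

# counts and offsets must be integers
sizes = np.full(mpi_size, base, dtype=int)
sizes[:leftover] += 1
offsets = np.zeros(mpi_size, dtype=int)
offsets[1:] = np.cumsum(sizes)[:-1]

# each rank only creates its own slice of the work
start = offsets[rank]
local_size = sizes[rank]
work_local = np.arange(start, start + local_size, dtype=np.float64)

print("local work: {} in rank {}".format(work_local, rank))

# gather the differently sized slices from all ranks into `work` on every rank
comm.Allgatherv(work_local, [work, sizes, offsets, MPI.DOUBLE])
summe = np.empty(1, dtype=np.float64)

# cross-check: the sum of the local sums must equal the sum of the gathered array
comm.Allreduce(np.array([work_local.sum()]), summe, op=MPI.SUM)

print("work {} vs {} in rank {}".format(np.sum(work), summe[0], rank))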
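The split into sizes and offsets is the part that is easiest to get wrong, and it can be checked without MPI at all. The following is a minimal sketch in plain Python; `split_counts` is a hypothetical helper name and not part of mpi4py.

```python
def split_counts(work_size, n_ranks):
    """Return per-rank counts and start offsets for an uneven split."""
    base, leftover = divmod(work_size, n_ranks)
    # the first `leftover` ranks take one extra item each
    counts = [base + 1 if r < leftover else base for r in range(n_ranks)]
    # every offset is the sum of the counts of all lower ranks
    displs = [sum(counts[:r]) for r in range(n_ranks)]
    return counts, displs


# 100 agents on 16 cores: four ranks get 7 agents, twelve ranks get 6
counts, displs = split_counts(100, 16)
```

The counts always add up to the total work size and the slices neither overlap nor leave gaps, which is what Allgatherv needs in its receive specification. The MPI example itself has to be started through the MPI launcher, for instance with `mpiexec -n 16 python allgatherv_example.py`, where the file name is just a placeholder.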
