Sunday, March 3, 2024

Practicing Data Engineering with a Kaggle Competition | by Paolo Rechia | Oct, 2023


This year I have been fairly deep into the topic of generative AI, particularly large language models. At some point, I felt I needed to step back and return to more practical AI applications.

So I took part in a natural language processing Kaggle competition. My final score wasn’t that great, 1380th place out of 2061 competitors, but I developed the solution pretty much independently. And as someone whose main job isn’t being a data scientist, I am fairly happy with the results.

Basically, the competition’s goal was to find a good solution to automate the grading of students’ essays. Students were typically asked to read a reference text and write about it.

Now my solution can be found on GitHub, and my best-performing pipeline fine-tuned a BART model. I was also very careful not to leak test data into the training, which can be seen in how little the model loss increases between the public and private submissions:

Public Dataset (test)
Private Dataset (actual final score)

Nonetheless, my solution isn’t groundbreaking in the sense that it isn’t in the top range of performance, so I won’t focus on explaining what I did in terms of modeling.

Instead, what was interesting is that this time I applied some fundamental software/data engineering skills to the competition, namely:

  1. Created a simple DAG-like framework
  2. Defined tasks, pipelines, tables, etc.
  3. Managed artifacts, packaging, and shipping of source code as well as trained models to the Kaggle platform.

Together, these pieces allowed me to develop locally with my GPU and easily submit a ready-to-use version of the inference code, along with a fine-tuned model.

This meant that the GPU quota in Kaggle was only used for inference, and I rarely came close to using the whole quota.

What I want to share is a bit about these software/data engineering techniques. I had already written an internal readme for the person who joined my Kaggle team to understand the codebase, which I named “Paolo’s Guide to the over-engineered Kaggle repository”, so I’ve simply adapted what I already had there for this article.

One final note: the code examples I share use scikit-learn, and are mostly a baseline model/pipeline I implemented before going into the deep learning one. If you’re interested in the code that uses Hugging Face and PyTorch, do check the repository! Feel free to reach out to me if you’d like to discuss this solution or have questions.

When getting started with data engineering, one of the first topics that usually pops up is frameworks that offer a directed acyclic graph (DAG) implementation of some kind to the programmer.

Well-known examples that offer this are Airflow and, in the cloud world, AWS Step Functions state machines (though here, technically a cycle is possible).

Commonly in the data engineering world, we call such a DAG that processes data a “data pipeline”, or just a “pipeline”. Additionally, it’s common to refer to a node in the DAG as a “task”. It can be seen as a unit of work.

An instance of an AWS state machine implementing a graph.

The basic idea behind these frameworks is orchestration. You can run a task in one place, say a web server, while another task uses an AWS Lambda, and so on.

The second idea is fault recovery. When dealing with data, it’s especially hard to foresee all problems, so it’s very useful to be able to visualize the whole flow when debugging a weird problem. It’s also possible to restart the flow just from the failed component or add retry mechanisms.
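A retry mechanism can be sketched in a few lines. This is only an illustration of the idea, not how Airflow or AWS implement it; run_with_retries, max_attempts, delay_seconds and flaky_step are hypothetical names introduced for this example.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_retries(
    step: Callable[[], T], max_attempts: int = 3, delay_seconds: float = 0.0
) -> T:
    """Run a step, retrying on any exception up to max_attempts times."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return step()
        except Exception as error:  # a real framework would filter error types
            last_error = error
            if attempt < max_attempts:
                time.sleep(delay_seconds)
    raise RuntimeError(f"Step failed after {max_attempts} attempts") from last_error


# Hypothetical flaky step: fails twice, then succeeds on the third call.
calls = {"count": 0}


def flaky_step() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "done"


retry_result = run_with_retries(flaky_step)
```

Real orchestrators usually let you configure this per task, so only the failed unit of work is repeated rather than the whole flow.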

The third advantage of this model is that it’s very easy to extend existing flows, since the frameworks force a certain degree of decoupling. Only a certain degree, because context is still shared between nodes.

To understand context better, let’s consider, for instance, this DAG:

Clearly, SplitData depends on the input data, and TrainModel depends on the data that has already been split.

One way these frameworks implement context is by passing a dictionary that stores some kind of global state shared across all nodes of the graph.

For instance, a possible context for the above graph is this:

{}  --> 
{"input_data": data_frame} -->
{"input_data": data_frame, "trained_model": model_obj} -->
...
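The way such a context grows from node to node can be sketched as follows. This is a toy illustration with plain functions as nodes; load_input, split_data, train_model and run_nodes are hypothetical names, and the “model” is just a mean so the example stays self-contained.

```python
from typing import Any, Callable, Dict, List

Context = Dict[str, Any]


def load_input(context: Context) -> Context:
    # Stand-in for reading a real data frame.
    return {"input_data": [1, 2, 3, 4]}


def split_data(context: Context) -> Context:
    rows = context["input_data"]
    middle = len(rows) // 2
    return {"train_split": rows[:middle], "test_split": rows[middle:]}


def train_model(context: Context) -> Context:
    # Stand-in "model": the mean of the training rows.
    train = context["train_split"]
    return {"trained_model": sum(train) / len(train)}


def run_nodes(nodes: List[Callable[[Context], Context]]) -> Context:
    context: Context = {}
    for node in nodes:
        # Each node reads what it needs and contributes new keys.
        context.update(node(context))
    return context


final_context = run_nodes([load_input, split_data, train_model])
```

Each node only declares what it adds, while the runner owns the shared state. That is also why the decoupling is only partial: train_model silently depends on split_data having written the train_split key.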

When developing a machine learning model, one can leverage these frameworks, because the development process quite often resembles a data pipeline. Except that you add models to the mix, which brings a few extra steps:

  1. Model training
  2. Model storage
  3. Model serving (inference)

There are other possible steps, like monitoring, label collection, and automatic retraining, but these are the most fundamental ones for every AI solution.

Of course, when modeling, a data scientist might stick with a Jupyter notebook, and that’s fine. But once you take a model into a production system, the code needs to leave that notebook and become part of an actual code base that’s shipped somewhere (probably in the cloud).

Interestingly, one also finds specialized frameworks for machine learning pipelines (for instance Kubeflow). In this case, the framework depends on Kubernetes, and it is quite a stretch to use it locally.

In Kaggle you still use a notebook to submit your solution, so why is this relevant? Well, it’s a matter of choice/preference.

I personally find it much more pleasant to develop code inside an IDE like VSCode, and to test my models through scripts locally rather than through a Jupyter notebook. Probably because I’m not a data scientist, and have a software engineering background instead.

There are also some clear advantages:

  1. Possibility to use code formatters and linters, which are typically not available in Kaggle (as far as I know).
  2. Possibility to split code into modules, increasing code reuse

Also, in my experience, all my machine learning projects become an absolute mess with time, even when I develop them locally. Take a look at this other GitHub repository from a previous competition to see what I mean.

The same happened to me while in college. Unfortunately, though, I seem to have lost access to my Bitbucket account where my machine learning repositories were stored, otherwise I’d have more examples to share.

So this time I decided to add some underlying structure before starting the actual model development, to see if it helps keep the code base manageable as the number of experiments grows over time.

There are two main reasons why I built my own framework instead of using an existing one:

  1. This is a pretty simple framework and we don’t need all the features offered by something like Airflow.
  2. We need a custom deployment strategy, which is just easier to implement from scratch than figuring out how to hack something that’s supposed to run in the cloud or as a web server.

Remember: our target deployment is the Kaggle platform.

From now on, we’ll look at specific code components, so it is recommended to check the GitHub repository if you’re interested in understanding something better.

If you want to execute the code, remember that before starting, you should download the competition data and add it unzipped under data/input.

Note that this directory is not published to git.

We use poetry to track project dependencies and build packages.

This is important because Kaggle competitions run without internet access, so we cannot rely on PyPI being available during the pipeline execution.

You’ll see the configuration for this in pyproject.toml. The resolved/pinned dependency versions are stored in poetry.lock.

However, poetry falls short when installing packages from a specific index, so I had to define a requirements.txt file for the ones that depend on the CUDA runtime.

If you’re not familiar with poetry, I suggest you skim over the documentation to understand its capabilities. One thing I like about it is that it also provides an automatically managed virtualenv for you.

The git hooks are defined (unsurprisingly) inside the git-hooks directory.

Every time you commit, the pre-commit hook runs and checks that your source code conforms to the formatting standard and that no errors are detected by the linters.

Every time you push, the pre-push hook builds the current code as a Python package and uploads it to Kaggle as a new dataset version.

The Kaggle dataset is defined in a file that’s currently not committed to the repo. I place the dataset-metadata.json inside the dist directory that poetry uses for the packaged code.

{
  "title": "common-lit-pipelines",
  "id": "paolorechia/common-lit-pipelines",
  "licenses": [
    {
      "name": "CC0-1.0"
    }
  ]
}
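The two steps of the pre-push hook can be sketched as below. The hook in the repository is a shell script that isn’t reproduced in this article, so this Python version is only an assumption about its shape. The commands poetry build and kaggle datasets version are real CLI commands; build_pre_push_commands, run_pre_push and the dry_run flag are hypothetical names for this example.

```python
import subprocess
from typing import List


def build_pre_push_commands(message: str, dist_dir: str = "dist") -> List[List[str]]:
    """Return the commands that package the code and upload a new dataset version."""
    return [
        # Build the wheel/sdist into the dist directory.
        ["poetry", "build"],
        # Upload the contents of dist (which also holds dataset-metadata.json).
        ["kaggle", "datasets", "version", "-p", dist_dir, "-m", message],
    ]


def run_pre_push(message: str, dry_run: bool = True) -> List[List[str]]:
    commands = build_pre_push_commands(message)
    if not dry_run:
        for command in commands:
            # check=True aborts the push if any step fails.
            subprocess.run(command, check=True)
    return commands


# With dry_run=True nothing is executed; the planned commands are just returned.
planned = run_pre_push("new pipeline version")
```

Keeping dataset-metadata.json next to the built package is what lets a single upload command publish the code as a dataset.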

To activate a git hook locally, you need to:

  1. copy the script from git-hooks into .git/hooks
  2. make sure it has no file extension, e.g., pre-commit.sh must be named pre-commit
  3. make sure it’s executable (e.g., use chmod +x .git/hooks/pre-commit)
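The three steps above can be automated with a small script. This is a minimal sketch, assuming the hook scripts live as plain files in a git-hooks directory; install_git_hooks is a hypothetical helper that is not part of the repository.

```python
import shutil
import stat
from pathlib import Path
from typing import List


def install_git_hooks(source_dir: Path, hooks_dir: Path) -> List[Path]:
    """Copy hook scripts, drop their extension, and mark them executable."""
    hooks_dir.mkdir(parents=True, exist_ok=True)
    installed = []
    for script in sorted(source_dir.iterdir()):
        if not script.is_file():
            continue
        # pre-commit.sh -> pre-commit (git only runs extension-less names)
        target = hooks_dir / script.stem
        shutil.copyfile(script, target)
        # Equivalent of chmod +x
        mode = target.stat().st_mode
        target.chmod(mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
        installed.append(target)
    return installed
```

From the repository root it could be called as install_git_hooks(Path("git-hooks"), Path(".git/hooks")).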

All code is executed through main.py. It automatically finds the exposed pipelines and executes them according to the passed argument.

I typically run it with poetry run to make use of the auto-managed virtualenv. You can get help like this:

poetry run python main.py -h

It’ll print a nice help message and show you the available pipelines.

usage: Pipeline executor [-h] pipeline_name

positional arguments:
  pipeline_name

options:
  -h, --help  show this help message and exit

Available pipelines: ['basic_train_linear_regressor', 'basic_predict_random_forest', 'basic_train_random_forest', 'explore_input_data',
'set_train_linear', 'set_train_random_forest', 'set_predict_random_forest', 'split_train_test', 'train_bart_regression',
'zero_train_random_forest']

So to run the first pipeline, you’d do:

poetry run python main.py basic_train_linear_regressor

The rest of the code is inside the common_lit_kaggle directory, which is packaged by poetry in the pre-push git hook. In the next sections, we’ll look at the structure of this part.

All main.py does is inspect common_lit_kaggle.pipelines.__init__.py and find the exposed classes there.

So every time you add a new pipeline, make sure to include it in this file (common_lit_kaggle.pipelines.__init__.py). Today it looks like this:

from .pipeline_basic_linear_regressor import BasicLinearRegressorPipeline
from .pipeline_basic_random_forest import BasicRandomForestPipeline
from .pipeline_explore_data import ExploreDataPipeline
from .pipeline_predict_basic_random_forest import BasicPredictRandomForestPipeline
from .pipeline_predict_set_random_forest import (
    SentenceTransformersPredictRandomForestPipeline,
)
from .pipeline_set_linear_regression import SentenceTransformerLinearRegressionPipeline
from .pipeline_set_random_forest import SentenceTransformerRandomForestPipeline
from .pipeline_split_train_test import SplitTrainTestPipeline
from .pipeline_train_bart_regression import TrainBartRegressionPipeline
from .pipeline_zero_shot import ZeroShotRandomForestPipeline
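How a script could discover the exposed pipelines is sketched below. The real main.py is not shown in this article, so this is an assumption: it scans a module namespace for Pipeline subclasses and maps each pipeline name to its class. The stand-in Pipeline class, the class-level name attribute, discover_pipelines and the fake module are all hypothetical simplifications.

```python
import argparse
import types
from typing import Dict, List, Type


class Pipeline:
    """Stand-in for common_lit_kaggle.framework.Pipeline."""

    name = "base"

    def run(self) -> str:
        return f"ran {self.name}"


class SplitTrainTestPipeline(Pipeline):
    name = "split_train_test"


class ExploreDataPipeline(Pipeline):
    name = "explore_input_data"


def discover_pipelines(module: types.ModuleType) -> Dict[str, Type[Pipeline]]:
    """Collect every Pipeline subclass exposed in the module namespace."""
    found = {}
    for attr in vars(module).values():
        if isinstance(attr, type) and issubclass(attr, Pipeline) and attr is not Pipeline:
            found[attr.name] = attr
    return found


# A fake module standing in for common_lit_kaggle.pipelines.
fake_pipelines_module = types.ModuleType("pipelines")
fake_pipelines_module.SplitTrainTestPipeline = SplitTrainTestPipeline
fake_pipelines_module.ExploreDataPipeline = ExploreDataPipeline

available = discover_pipelines(fake_pipelines_module)


def main(argv: List[str]) -> str:
    parser = argparse.ArgumentParser(
        prog="Pipeline executor",
        epilog=f"Available pipelines: {sorted(available)}",
    )
    parser.add_argument("pipeline_name", choices=sorted(available))
    args = parser.parse_args(argv)
    # Instantiate the selected pipeline and run it.
    return available[args.pipeline_name]().run()


outcome = main(["split_train_test"])
```

With this approach, a class that is not imported in __init__.py never shows up in the module namespace, which is why forgetting the export makes a pipeline invisible to the executor.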

main.py then calls Pipeline.run() on the selected pipeline. Let’s briefly look at common_lit_kaggle.framework.pipeline.

💡 Everything inside common_lit_kaggle.framework is generic code that’s extensively reused throughout the repository. The other directories define specific instances that implement the framework code. You can think of this as a mini library.

import logging
from typing import Iterable

from common_lit_kaggle.framework.task import Task

logger = logging.getLogger(__name__)


class Pipeline:
    def __init__(self, name: str, tasks: Iterable[Task]) -> None:
        self.name = name
        self.tasks = tasks

    def run(self):
        # The context starts empty and accumulates the output of each task.
        context = {}
        for task in self.tasks:
            logger.info("Starting task: %s", task)
            result = task.run(context)
            context.update(result)

Hopefully this code is self-explanatory. All it does is build a very simple DAG and pass the context along.

Each node is unsurprisingly called a Task. A task must implement only one method: run. That’s where the context is injected by the pipeline and where you write new code.

You can add new code in any structure you want, e.g., create new functions, methods or even whole new packages. In the end, though, you import these code parts inside a new Task run method and add them to the pipeline.

from abc import abstractmethod
from typing import Any, Mapping, Optional


class Task:
    def __init__(self, name: Optional[str] = None) -> None:
        self.name = name
        if name is None:
            # Default to the class name when no explicit name is given.
            self.name = self.__class__.__name__

    @abstractmethod
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        pass
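One detail worth knowing: @abstractmethod is only enforced when the class also derives from abc.ABC (or uses ABCMeta), so a plain class like the one above can still be instantiated without overriding run. The sketch below shows a stricter variant as an optional alternative, not as the repository’s code; StrictTask and CountRowsTask are hypothetical names.

```python
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional


class StrictTask(ABC):
    def __init__(self, name: Optional[str] = None) -> None:
        # Default to the class name when no explicit name is given.
        self.name = name if name is not None else self.__class__.__name__

    @abstractmethod
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        ...


class CountRowsTask(StrictTask):
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        # Read from the context and return only the new keys.
        return {"row_count": len(context["rows"])}


task = CountRowsTask()
output = task.run({"rows": [1, 2, 3]})

# Instantiating the abstract base directly now raises TypeError.
try:
    StrictTask()
    instantiated_abstract = True
except TypeError:
    instantiated_abstract = False
```

Whether the extra strictness is worth it is a matter of taste; for a small code base, the lighter version works just as well.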

Similarly to pipelines, you must expose the exported tasks in common_lit_kaggle.tasks.__init__.py

💡 All defined classes have to be manually exported in the corresponding __init__.py file. This might be a bit tedious, but it helps keep the imports clean in the individual files.

The rest of the framework deals with a standard way of reading and storing tables. I’m using polars (https://pola-rs.github.io/polars-book/) instead of the more common pandas, simply because I was interested in trying it out.

To avoid rewriting the same code over and over, I defined these classes to facilitate the process.

Granted, the process is a bit convoluted because of the schemas.

However, it does add a nice check to catch errors when reading/writing data.

The process goes as follows:

  1. Define a schema you want to read or write
  2. Define a table that uses the schema
  3. Add these classes to the right __init__.py files
  4. Import them into a new Task and use them with table_io.

But what are these classes effectively doing? Let’s take a look.

All of the framework is doing right here is offering a approach to convert a category to a dictionary:

class Schema:
    @classmethod
    def to_dict(cls):
        data = {}
        for attr_name in dir(cls):
            # Hopefully ignore all private fields
            if not attr_name.startswith("_"):
                data[attr_name] = getattr(cls, attr_name)
        return data
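To see what to_dict returns, here is a small sketch with a made-up schema. SummarySchema is hypothetical, and plain Python types stand in for the polars data types that the validation code compares columns against, so the example runs without extra dependencies.

```python
class Schema:
    @classmethod
    def to_dict(cls):
        data = {}
        for attr_name in dir(cls):
            # Skip private and dunder attributes.
            if not attr_name.startswith("_"):
                data[attr_name] = getattr(cls, attr_name)
        return data


class SummarySchema(Schema):
    # Hypothetical columns; real schemas would use polars dtypes here.
    student_id = str
    content = float
    wording = float


schema_dict = SummarySchema.to_dict()

# dir() also lists public methods, so "to_dict" itself ends up in the dict.
# Validation looks entries up by column name, so the extra key is harmless.
column_types = {key: value for key, value in schema_dict.items() if key != "to_dict"}
```

Declaring columns as class attributes keeps a schema readable as a plain list of name and type pairs.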

This basically defines where you can find a table, its name, its schema and the expected format.

import pathlib
from dataclasses import dataclass

from common_lit_kaggle.framework.schema import Schema


@dataclass
class TableReference:
    name: str
    path: pathlib.Path
    schema: type[Schema]
    format: str

Here we use a TableReference to effectively read/write data, and validate its schema against what’s passed around.

import logging
import os

import polars as pl
from common_lit_kaggle.framework.table import TableReference

logger = logging.getLogger(__name__)


def _validate_schema(polars_dataframe: pl.DataFrame, table: TableReference):
    schema_dict = table.schema.to_dict()

    for column in polars_dataframe.columns:
        column_type = polars_dataframe[column].dtype
        # logger.info("Found column: %s (type: %s)", column, column_type)
        expected_col_type = schema_dict[column]
        assert (
            expected_col_type == column_type
        ), f"Column type mismatch. Expected: {expected_col_type}, found: {column_type}"


def read_table(table: TableReference) -> pl.DataFrame:
    logger.info("Reading table: %s", table)
    polars_dataframe = None

    if table.format == "csv":
        polars_dataframe = pl.read_csv(table.path)
        _validate_schema(polars_dataframe, table)

    assert (
        polars_dataframe is not None
    ), f"Invalid or unsupported format: {table.format}"

    return polars_dataframe


def write_table(dataframe: pl.DataFrame, table: TableReference):
    if table.format == "csv":
        _validate_schema(dataframe, table)
        # Create the parent directory if it does not exist yet.
        try:
            os.makedirs(table.path.parent)
        except FileExistsError:
            pass
        dataframe.write_csv(table.path)
    else:
        raise TypeError(f"Invalid or unsupported format: {table.format}")

That’s it! We’ve gone through the entire framework! That being said, there’s still a lot to cover. In the next sections, we’ll take a look at:

  1. the config management
  2. mlflow integration
  3. understanding an example pipeline end-to-end
  4. understanding the deployment process to Kaggle and submitting a pipeline to the competition

Inside common_lit_kaggle.settings.config we outline a Singleton (https://en.wikipedia.org/wiki/Singleton_pattern) class.

Here we define configuration parameters like directory paths and so on. The reason why it’s a Singleton class is that we want to be able to override it for the Kaggle notebook. We’ll see in a later section how that works.

To run locally, all you need to do is modify this file directly.
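A minimal sketch of such an overridable Singleton is shown below. The repository’s Config class is not reproduced in this article, so the internals here are an assumption; only the Config.get(...) call style with keyword overrides is taken from the notebook code shown later. The default values are made up for illustration.

```python
import pathlib
from typing import Any, Optional


class Config:
    _instance: Optional["Config"] = None

    def __init__(self, root_dir: str = ".", **overrides: Any) -> None:
        # Hypothetical defaults for local execution.
        self.root_dir = pathlib.Path(root_dir)
        self.data_root_dir = self.root_dir / "data"
        self.used_features = ["text_length", "word_count"]
        # Any keyword argument replaces the corresponding default.
        for key, value in overrides.items():
            setattr(self, key, value)

    @classmethod
    def get(cls, **kwargs: Any) -> "Config":
        # The first call creates the instance (optionally with overrides);
        # every later call returns that same instance.
        if cls._instance is None:
            cls._instance = cls(**kwargs)
        return cls._instance


# The notebook calls get() first with Kaggle paths ...
notebook_config = Config.get(root_dir="/kaggle/working/", used_features=["text_length"])
# ... and the tasks later call get() without arguments and see the same values.
same_config = Config.get()
```

The key property is ordering: whoever calls get() first decides the values, which is why the notebook configures everything before running any pipeline.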

One currently missing feature is automatically creating all the local directories that these paths refer to:

self.data_intermediate_dir = pathlib.Path(self.data_root_dir / "intermediate")
self.data_exploration_dir = pathlib.Path(self.data_root_dir / "exploration")
self.data_train_dir = pathlib.Path(self.data_root_dir / "train")
self.data_test_dir = pathlib.Path(self.data_root_dir / "test")
self.plots_dir = pathlib.Path(self.data_root_dir / "plots")
self.models_root_dir = pathlib.Path(self.data_root_dir / "models")

This is something I still have to implement, so when you run the pipeline the first time you’ll probably run into some issues.

Please ping me if this is still not done when you get to test a pipeline.
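Until that lands, the missing directories can be created with a few lines. This is a sketch of one possible approach, not the repository’s implementation; ensure_data_directories is a hypothetical helper, and the subdirectory names are taken from the config paths listed above.

```python
import pathlib
from typing import Iterable, List

SUBDIRECTORIES = ("intermediate", "exploration", "train", "test", "plots", "models")


def ensure_data_directories(
    data_root_dir: pathlib.Path, names: Iterable[str] = SUBDIRECTORIES
) -> List[pathlib.Path]:
    """Create every expected subdirectory, ignoring the ones that already exist."""
    created = []
    for name in names:
        directory = data_root_dir / name
        # parents=True also creates data_root_dir itself when needed.
        directory.mkdir(parents=True, exist_ok=True)
        created.append(directory)
    return created
```

Because exist_ok=True makes the call idempotent, it could run at the start of every pipeline without extra checks.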

I’m using mlflow to register experiment scores and compare different models (https://mlflow.org/). Usage is very simple: I just start a web server locally with poetry run mlflow ui

Then I can access it on localhost:5000:

Runs are created automatically by the framework. What I’m doing is manually adding metrics and parameters.

Usage is as simple as using mlflow.log_param and mlflow.log_metric:

import mlflow

mlflow.log_param("distance_metric", config.distance_metric)
mlflow.log_metric("wording_mean_squared_error", wording_score)

Note that this doesn’t run in Kaggle, so I wrap mlflow into a separate module also called mlflow, that simply returns a mock if the import fails.
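The fallback idea can be sketched like this. It is an assumption about how such a wrapper might look, not the repository’s module; import_or_mock is a hypothetical helper, and a deliberately nonexistent module name is used so the fallback path is the one that runs.

```python
import importlib
from types import ModuleType
from typing import Union
from unittest.mock import MagicMock


def import_or_mock(module_name: str) -> Union[ModuleType, MagicMock]:
    """Return the real module when it is installed, otherwise a mock that accepts any call."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return MagicMock(name=module_name)


# Stand-in for "mlflow is not installed in this environment".
tracking = import_or_mock("a_module_that_is_not_installed")

# These calls do nothing, but they also do not crash the pipeline.
tracking.log_param("distance_metric", "cosine")
tracking.log_metric("wording_mean_squared_error", 0.5)
```

The tasks can then call the logging functions unconditionally, and the same code runs both locally and inside the Kaggle notebook.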

Let’s look at an actual pipeline, end to end. Let’s use the basic_train_linear_regressor as the example, as it uses all features:

from common_lit_kaggle import tasks
from common_lit_kaggle.framework import Pipeline

LABEL = "basic_train_linear_regressor"


class BasicLinearRegressorPipeline(Pipeline):
    def __init__(self) -> None:
        super().__init__(
            LABEL,
            [
                tasks.ReadTrainDataTask(),
                tasks.AddBasicFeaturesTrainTask(),
                tasks.TrainBasicLinearRegressorTask(),
                tasks.ReadTestDataTask(),
                tasks.AddBasicFeaturesTestTask(),
                tasks.TestBasicLinearRegressorTask(),
                tasks.WritePredictionsTask(),
                tasks.AnalysePredictionsTask(name=LABEL),
            ],
        )

First we load the train data. Note that this data split was created by another pipeline (SplitTrainTestPipeline):

from typing import Any, Mapping

from common_lit_kaggle.framework import table_io
from common_lit_kaggle.framework.task import Task
from common_lit_kaggle.tables import TrainSplitTable


class ReadTrainDataTask(Task):
    def run(self, _: Mapping[str, Any]) -> Mapping[str, Any]:
        input_data = table_io.read_table(TrainSplitTable())
        return {"train_data": input_data}

Then we add some dumb basic features to the data. Notice how we’re using the context dictionary to pass state between the tasks.

from typing import Any, Mapping

from common_lit_kaggle.features import add_basic_features
from common_lit_kaggle.framework.task import Task


class AddBasicFeaturesTrainTask(Task):
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        train_data = context["train_data"]
        enriched_train_data = add_basic_features(train_data)
        return {"enriched_train_data": enriched_train_data}

We then train a linear regression model using scikit-learn:

from typing import Any, Mapping

import polars as pl
from sklearn.linear_model import LinearRegression

from common_lit_kaggle.framework.task import Task
from common_lit_kaggle.settings.config import Config


class TrainBasicLinearRegressorTask(Task):
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        config = Config.get()
        train_data: pl.DataFrame = context["enriched_train_data"]

        # Get features
        try:
            extra_features = context["extra_features"]
        except KeyError:
            extra_features = None

        # Copy the list so that extending it does not mutate the shared config.
        features = list(config.used_features)

        if extra_features:
            features.extend(extra_features)

        x_features = train_data.select(features).to_numpy()
        # Get wording labels
        y_wording = train_data.select("wording").to_numpy()
        # Get content labels
        y_content = train_data.select("content").to_numpy()

        content_regressor = LinearRegression()
        content_regressor.fit(x_features, y_content)

        wording_regressor = LinearRegression()
        wording_regressor.fit(x_features, y_wording)

        return {
            "wording_regressor": wording_regressor,
            "content_regressor": content_regressor,
            "features": features,
        }

We then repeat this for the test split, nothing too fancy there:

tasks.ReadTestDataTask(),
tasks.AddBasicFeaturesTestTask(),
tasks.TestBasicLinearRegressorTask(),

Maybe this one should be renamed to PredictBasicLinearRegressorTask 😀

class TestBasicLinearRegressorTask(Task):
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        test_data: pl.DataFrame = context["enriched_test_data"]
        original_test_data: pl.DataFrame = context["test_data"]

        wording_regressor: LinearRegression = context["wording_regressor"]
        content_regressor: LinearRegression = context["content_regressor"]

        # Reuse exactly the feature list that the model was trained on.
        used_features = context["features"]

        x_features = test_data.select(used_features).to_numpy()

        wording_preds = wording_regressor.predict(x_features)
        content_preds = content_regressor.predict(x_features)

        data_with_predictions = original_test_data.with_columns(
            pl.Series(name="wording_preds", values=wording_preds),
            pl.Series(name="content_preds", values=content_preds),
        )

        return {"data_with_predictions": data_with_predictions}

We output the predictions:

from typing import Any, Mapping

import polars as pl
from common_lit_kaggle.framework import table_io
from common_lit_kaggle.framework.task import Task
from common_lit_kaggle.tables import OutputPredictionTable


class WritePredictionsTask(Task):
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        data_with_predictions: pl.DataFrame = context["data_with_predictions"]
        output = data_with_predictions.select("student_id", "content", "wording")
        table_io.write_table(output, OutputPredictionTable())
        return {}

Note that this OutputPredictionTable is already in the format expected by the Kaggle competition. And finally, we analyse the results in our train split:

class AnalysePredictionsTask(Task):
    def run(self, context: Mapping[str, Any]) -> Mapping[str, Any]:
        config = Config.get()

        logger.info("Analysing predictions...")
        data_with_predictions: pl.DataFrame = context["data_with_predictions"]
        mlflow.set_tag("name", self.name)

        # Log the experiment parameters
        for idx, feature in enumerate(config.used_features):
            mlflow.log_param(f"features_{idx}", feature)

        for idx, prompt in enumerate(config.train_prompts):
            mlflow.log_param(f"train_prompt_{idx}", prompt)

        mlflow.log_param("distance_metric", config.distance_metric)
        mlflow.log_param("sentence_transformer", config.sentence_transformer)
        mlflow.log_param("distance_stategy", config.distance_stategy)

        # squared=True returns the plain MSE, which is also the default;
        # newer scikit-learn versions deprecate this argument.
        wording_score = mean_squared_error(
            data_with_predictions.select("wording_preds").to_numpy(),
            data_with_predictions.select("wording").to_numpy(),
            squared=True,
        )

        logger.info("Wording error: %s", wording_score)

        mlflow.log_metric("wording_mean_squared_error", wording_score)

        content_score = mean_squared_error(
            data_with_predictions.select("content_preds").to_numpy(),
            data_with_predictions.select("content").to_numpy(),
            squared=True,
        )

        logger.info("Content error: %s", content_score)

        mlflow.log_metric("content_mean_squared_error", content_score)

        # Average of the two per-target errors
        mean = (content_score + wording_score) / 2

        logger.info("Mean error: %s", mean)

        mlflow.log_metric("avg_mean_squared_error", mean)
        logger.info("Plotting predictions...")

        plot_labels_x_predictions(
            "wording",
            data_with_predictions.select("wording_preds").to_numpy(),
            data_with_predictions.select("wording").to_numpy(),
        )
        plot_labels_x_predictions(
            "content",
            data_with_predictions.select("content_preds").to_numpy(),
            data_with_predictions.select("content").to_numpy(),
        )
        return {}
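To make the metric concrete, here is the same computation on a handful of made-up numbers, written without scikit-learn. The function mean_squared_error_plain and the sample values are hypothetical and only illustrate the arithmetic: the mean of the squared differences per target, then the average of both targets.

```python
from typing import Sequence


def mean_squared_error_plain(predictions: Sequence[float], labels: Sequence[float]) -> float:
    """Mean of the squared differences between predictions and labels."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must have the same length")
    squared_errors = [(p - y) ** 2 for p, y in zip(predictions, labels)]
    return sum(squared_errors) / len(squared_errors)


# Made-up predictions and labels for the two targets.
wording_score = mean_squared_error_plain([0.5, 1.0, -1.0], [0.0, 1.0, 1.0])
content_score = mean_squared_error_plain([1.0, 2.0], [1.0, 4.0])

# Same averaging as in the task above.
average_score = (content_score + wording_score) / 2
```

Since the differences are squared, swapping the order of predictions and labels gives the same result.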

I hope that was clear! We have only one step left, which is understanding how to connect this with Kaggle.

In our repository, we use the kaggle CLI (https://github.com/Kaggle/kaggle-api) to programmatically interact with datasets.

Basically, we define the dataset metadata, as we saw in the git hook section.

For our example configuration, we ran kaggle datasets create once, and this is the dataset.

From that point, we can easily publish new versions with:

kaggle datasets version -m "message"

Remember this is automatically executed by the pre-push git hook. As a result, we have a nice history of pushed versions:

If you’d like to skip pushing a version to Kaggle, you can always do git push --no-verify to skip the git hook.

We can see an example notebook in Kaggle here.

On the right side you’ll see our dataset as part of the inputs:

Notice a small icon next to common-lit-pipelines; if you see it, click on it to update the version:

Updating this dataset essentially means updating your code version.

Make sure to restart the notebook compute resource just to be safe. Then click run all, and the notebook will handle the code installation, and the pipeline will run 🙂

Let’s see some code snippets.

import pathlib
import common_lit_kaggle

from common_lit_kaggle.settings.config import Config

# Override the default configuration with the Kaggle paths.
config = Config.get(
    root_dir="/kaggle/working/",
    input_dir="/kaggle/input/commonlit-evaluate-student-summaries",
    output_dir="/kaggle/working/",
    train_prompts=["3b9047"],
    test_prompts=[
        "814d6b",
        "ebad26",
    ],
    used_features=[
        "text_length",
        "word_count",
        "sentence_count",
        "unique_words",
        "word_intersection",
    ],
)

from common_lit_kaggle.utils.setup import create_stdout_handler
create_stdout_handler()

from common_lit_kaggle.pipelines import SplitTrainTestPipeline
split_pipe = SplitTrainTestPipeline()
split_pipe.run()

from common_lit_kaggle.pipelines import BasicPredictRandomForestPipeline
pipe = BasicPredictRandomForestPipeline()
pipe.run()

When you run your notebook for testing, you can check it ran successfully by manually inspecting the output file. On the right side, in the file navigator, you should find a .csv file called submission.csv.

Download and inspect it. It should look something like this:

It’s just some stub data from Kaggle, so you can test that you’re creating output in the expected format. If it looks good, click on Submit!
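The manual inspection can be complemented with a quick automated check. This is a sketch, assuming the submission has exactly the columns student_id, content and wording (the ones selected in WritePredictionsTask); check_submission is a hypothetical helper and not part of the repository.

```python
import csv
from pathlib import Path
from typing import List

EXPECTED_COLUMNS = ["student_id", "content", "wording"]


def check_submission(path: Path) -> List[str]:
    """Return a list of problems found in the file; an empty list means it looks fine."""
    problems = []
    with path.open(newline="") as handle:
        reader = csv.DictReader(handle)
        if reader.fieldnames != EXPECTED_COLUMNS:
            problems.append(f"unexpected header: {reader.fieldnames}")
            return problems
        # Line 1 is the header, so data rows start at line 2.
        for line_number, row in enumerate(reader, start=2):
            for column in ("content", "wording"):
                try:
                    float(row[column])
                except (TypeError, ValueError):
                    problems.append(f"line {line_number}: {column} is not a number")
    return problems
```

Running check_submission(Path("submission.csv")) after the notebook finishes gives a fast signal before spending one of the daily submissions.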
