Spark NLP - Experiment Tracking

 

Serialization and Experiment Tracking with MLFlow (Python)

About MLflow

Spark NLP uses Spark MLlib Pipelines, which are natively supported by MLflow. MLflow is, as stated on its official webpage, an open source platform for the machine learning lifecycle, which includes:

  • MLflow Tracking: Record and query experiments: code, data, config, and results
  • MLflow Projects: Package data science code in a format to reproduce runs on any platform
  • MLflow Models: Deploy machine learning models in diverse serving environments
  • Model Registry: Store, annotate, discover, and manage models in a central repository

MLFlow is also integrated into Databricks, so you will be able to track your experiments in any Databricks environment, and even use the MLflow Model Registry to serve models for production purposes through the REST API (see section “Productionizing Spark NLP”).

In this documentation we will be using Jupyter Notebook syntax.

Available configurations

There are several ways of deploying an MLflow Model Registry and Tracking Server:

1) Scenario 1: MLflow on localhost with no Tracking Server: This scenario uses a localhost folder (./mlruns by default) to serialize and store your models, but there is no tracking server available (version tracking will be disabled).

2) Scenario 2: MLflow on localhost with a Tracking Server: This scenario uses a localhost folder (./mlruns by default) to serialize and store your models, and a database as a Tracking Server. It uses SQLAlchemy under the hood, so the following databases are supported: mssql, postgresql, mysql, sqlite.

We are going to show how to implement this scenario with a mysql database.

3) Scenario 3: MLflow on remote with a Tracking Server: This scenario is a remote version of Scenario 2. It uses a remote S3 bucket to serialize and store your models, and a database as a Tracking Server. Again, it uses SQLAlchemy for the Tracking Server under the hood, so the following databases are supported: mssql, postgresql, mysql, sqlite. In this case, you can use any managed service such as AWS RDS or Azure SQL Database.
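
For reference, a minimal sketch of how Scenario 3 would be configured from Python (the database host, credentials and S3 bucket below are placeholders, not real services):

import mlflow

# Remote Tracking Server: a managed MySQL instance (placeholder host and
# credentials), reached through a SQLAlchemy-style connection string
REMOTE_TRACKING_URI = "mysql+pymysql://user:password@my-rds-host.example.com:3306/mlflow_models"
mlflow.set_tracking_uri(REMOTE_TRACKING_URI)

# Remote artifact store: artifacts (including models) are serialized to S3.
# The artifact location is set per experiment at creation time.
experiment_id = mlflow.create_experiment(
    "my_remote_experiment",
    artifact_location="s3://my-mlflow-artifacts/experiments"
)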

Requirements

As we said before, we are going to showcase Scenario 2. Since we want an Experiment Tracking Server backed by mysql, we will need to install its requirements on our server.

!sudo apt-get install -y python-mysqldb mysql-server libmysqlclient-dev

Also, let’s install a mysql Python interface library, called pymysql, to access mysql databases.

!pip install mysqlclient pymysql

We will also need MLflow (this example was tested with version 1.21.0):

!pip install mlflow

Finally, make sure you follow the Spark NLP installation instructions, available here.

Instantiating a MySQL database

We are going to use Docker to instantiate a MySQL container with a persistent volume, but you can install it directly on your machine without Docker.

To do that, we will need to have Docker and docker-compose installed (feel free to skip this step if you will install MySQL without Docker).

In our case, we used this docker-compose.yml file to instantiate a mysql database with a persistent volume:

version: '3'

services:
  # MySQL
  mlflow_models:
    container_name: mlflow_models
    image: mysql:8.0
    command: mysqld --default-authentication-plugin=mysql_native_password --character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: mlflow_models
      MYSQL_USER: jsl
      MYSQL_PASSWORD: passpass
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
    ports:
      - '3306:3306'
    volumes:
      - './docker/db/data:/var/lib/mysql'
      - './docker/db/my.cnf:/etc/mysql/conf.d/my.cnf'
      - './docker/db/sql:/docker-entrypoint-initdb.d'

Just by executing the following command in the folder where your docker-compose.yml file is, you will have your MySQL engine running with an mlflow_models database, ready for MLflow Experiment Tracking: !sudo docker-compose up -d

Make sure it’s running using the following command: !docker ps | grep -o mlflow_models

Connection string

You will need a connection string that will tell MLFlow (SQLAlchemy) how to reach that database.

Connection strings in SQLAlchemy have this format: <dialect>+<driver>://<username>:<password>@<host>:<port>/<database>

In our case, we declare a CONNECTION_STRING variable as:

CONNECTION_STRING = "mysql+pymysql://root:root@localhost:3306/mlflow_models"
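
Optionally, you can check that the MySQL container is reachable before pointing MLflow at it. A minimal sketch using pymysql and the credentials from the docker-compose.yml above:

import pymysql

# Quick connectivity check against the MySQL container started above
conn = pymysql.connect(host="localhost", port=3306,
                       user="root", password="root",
                       database="mlflow_models")
with conn.cursor() as cursor:
    cursor.execute("SELECT VERSION()")
    print(cursor.fetchone())  # e.g. ('8.0.x',)
conn.close()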

Imports

Let’s now import all the libraries we will need.

Generic imports

import json
import os
from sklearn.metrics import classification_report
import time
import mlflow
from mlflow.models.signature import infer_signature
from urllib.parse import urlparse
import pandas as pd
import glob

Spark NLP imports

import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.training import *
from pyspark.ml import Pipeline
import pyspark.sql.functions as F
from sparknlp.training import CoNLL
from pyspark.sql import SparkSession

Setting the connection string in MLflow

Now that we have imported mlflow, let’s set the connection string we had prepared before.

mlflow.set_tracking_uri(CONNECTION_STRING)
mlflow.get_tracking_uri() # This checks if it was set properly

Constant with pip_requirements

MLflow requires either a conda_env (conda environment) definition of your model’s requirements, or a pip_requirements list with all the pip libraries. We will use the latter, so let’s prepare the list with Spark NLP and MLflow:

PIP_REQUIREMENTS = [f"sparknlp=={sparknlp.version()}", f"mlflow=={mlflow.__version__}"]
PIP_REQUIREMENTS # This checks if it was set properly
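
For reference, the equivalent conda_env definition would look roughly like this (a sketch; adjust the Python version to your environment):

# Alternative to pip_requirements: a conda environment definition
CONDA_ENV = {
    "name": "sparknlp-mlflow-env",
    "channels": ["conda-forge"],
    "dependencies": [
        "python=3.8",
        "pip",
        {"pip": PIP_REQUIREMENTS},  # reuse the pip list defined above
    ],
}

It could then be passed as conda_env=CONDA_ENV instead of pip_requirements=PIP_REQUIREMENTS when logging or saving the model.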

Training a NerDLApproach()

We will be showcasing the serialization and experiment tracking of NerDLApproach().

There is a specific utility script that parses the training logs of this approach to extract metrics and charts. Let’s get it.

Ner Log Parser Util

!wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp-workshop/master/tutorials/Certification_Trainings/Public/utils/ner_image_log_parser.py

Now, let’s import the library:

import ner_image_log_parser

Starting a Spark NLP session

It’s important that we create the Spark session using the session builder, since we need to specify not only the Spark NLP jars, but also the MLflow ones.

def start():
    builder = SparkSession.builder \
        .appName("Spark NLP Licensed") \
        .master("local[80]") \
        .config("spark.driver.memory", "256G") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryoserializer.buffer.max", "2000M") \
        .config("spark.driver.maxResultSize","4000M") \
        .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.12:3.3.2,org.mlflow:mlflow-spark:1.21.0")

    return builder.getOrCreate()

spark = start()
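
As an optional sanity check, you can print the versions to confirm the session picked up the expected jars:

print(spark.version)        # Spark version backing the session
print(sparknlp.version())   # Spark NLP version, e.g. 3.3.2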

Training dataset preparation

Let’s download some training and test datasets:

!wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/src/test/resources/conll2003/eng.train
!wget -q https://raw.githubusercontent.com/JohnSnowLabs/spark-nlp/master/src/test/resources/conll2003/eng.testa

TRAIN_DATASET = "eng.train"
TEST_DATASET = "eng.testa"

Let’s read the training dataset:

training_data = CoNLL().readDataset(spark, TRAIN_DATASET)
training_data.show(3)

Let’s get the size:

%%time
TRAINING_SIZE = training_data.count()
TRAINING_SIZE

Hyperparameters configuration

Let’s configure our hyperparameter values.

MODEL_NAME = '' # Add your model name here. Example: clinical_ner
EXPERIMENT_NAME = '' # Add your experiment name here. Example: testing_dropout
OUTPUT_DIR = f"{MODEL_NAME}_{EXPERIMENT_NAME}_output" # Output folder of all your model artifacts
MODEL_DIR = f"model" # Name of the folder where the MLFlow model will be stored

MAX_EPOCHS = 10 # Adapt me to your experiment
LEARNING_RATE = 0.003 # Adapt me to your experiment
BATCH_SIZE = 2048 # Adapt me to your experiment
RANDOM_SEED = 0 # Adapt me to your experiment
VALIDATION_SPLIT = 0.1 # Adapt me to your experiment

Creating the experiment

Now, we are ready to instantiate an experiment in MLflow:

EXPERIMENT_ID = mlflow.create_experiment(f"{MODEL_NAME}_{EXPERIMENT_NAME}")

Each time you want to test a different thing, change the EXPERIMENT_NAME and rerun the line above to create a new experiment. By changing the experiment name, a new experiment ID will be generated. Each experiment ID groups all its runs in a separate folder inside ./mlruns.
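
Note that mlflow.create_experiment raises an error if an experiment with that name already exists. If you prefer to reuse an existing experiment instead of renaming it, a minimal sketch:

# Reuse the experiment if it already exists, otherwise create it
existing = mlflow.get_experiment_by_name(f"{MODEL_NAME}_{EXPERIMENT_NAME}")
if existing is not None:
    EXPERIMENT_ID = existing.experiment_id
else:
    EXPERIMENT_ID = mlflow.create_experiment(f"{MODEL_NAME}_{EXPERIMENT_NAME}")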

Pipeline creation

document = DocumentAssembler()\
        .setInputCol("text")\
        .setOutputCol("document")

sentence = SentenceDetector()\
        .setInputCols(['document'])\
        .setOutputCol('sentence')

token = Tokenizer()\
        .setInputCols(['sentence'])\
        .setOutputCol('token')

embeddings = BertEmbeddings.pretrained("bert_base_cased", "en") \
      .setInputCols("sentence", "token") \
      .setOutputCol("embeddings")

ner_approach = NerDLApproach()\
      .setInputCols(["sentence", "token", "embeddings"])\
      .setLabelColumn("label")\
      .setOutputCol("ner")\
      .setMaxEpochs(MAX_EPOCHS)\
      .setLr(LEARNING_RATE)\
      .setBatchSize(BATCH_SIZE)\
      .setRandomSeed(RANDOM_SEED)\
      .setVerbose(1)\
      .setEnableOutputLogs(True)\
      .setIncludeConfidence(True)\
      .setIncludeAllConfidenceScores(True)\
      .setEvaluationLogExtended(True)\
      .setOutputLogsPath(OUTPUT_DIR)\
      .setValidationSplit(VALIDATION_SPLIT)

Let’s create a preprocessing pipeline without the NerDLApproach():

ner_preprocessing_pipeline = Pipeline(stages=[
    document,
    sentence,
    token,
    embeddings
 ])

And a training pipeline with it:

ner_training_pipeline = Pipeline(stages = ner_preprocessing_pipeline.getStages() + [ner_approach])

Preparing inference objects

Now, let’s prepare the inference as well, since we will train and then infer, and store all the training and inference results as artifacts in our MLflow run.

Test dataset preparation

test_data = CoNLL().readDataset(spark, TEST_DATASET)
TEST_SIZE = test_data.count()  # used later when printing and logging the inference params

Setting the names of the inference objects

INFERENCE_NAME = "inference.parquet" # Name of the inference results on the test dataset, serialized in parquet
CLASSIFICATION_REPORT_LOG_NAME = "classification_report.txt" # Name of the classification report from scikit-learn on Ner Entities
PREC_REC_F1_NAME = "precrecf1.jpg" # Name of the precision-recall-f1 file
MACRO_MICRO_AVG_NAME = "macromicroavg.jpg" # Name of the macro-micro-average file
LOSS_NAME = "loss.jpg" # Name of the loss plot file

Now, let’s run the experiment

The experiment has already been created before (see the “Creating the experiment” section), so we take its ID and start a run. Each time you execute this cell, you will get a different run for the same experiment. If you want to change the experiment ID (and name), go back to “Hyperparameters configuration”. As mentioned before, by changing the experiment name, a new experiment ID will be generated. Each experiment ID groups all its runs in a separate folder inside ./mlruns.

with mlflow.start_run(experiment_id=EXPERIMENT_ID) as run:

    # Printing RUN and EXPERIMENT ID
    # ==============================
    print(f"Model name: {MODEL_NAME}")
    RUN_ID = run.info.run_id
    print(f"Run id: {RUN_ID}")
    EXPERIMENT_ID = run.info.experiment_id
    print(f"Experiment id: {EXPERIMENT_ID}")

    # Training the model
    # ==================
    print("Starting training...")
    start = time.time()
    ner_model = ner_training_pipeline.fit(training_data)
    end = time.time()
    ELAPSED_SEC_TRAINING = end - start
    print("- Finished!")

    # Saving the model in TensorFlow (ready to be loaded using NerDLModel.load)
    # ==============================
    print("Saving the model...")
    ner_model.stages[-1].write().overwrite().save(f"{OUTPUT_DIR}/{MODEL_DIR}/{MODEL_NAME}")
    print("- Finished!")

    # Loading the model (to check everything worked)
    # ==============================
    print("Loading back the model...")
    loaded_ner_model = NerDLModel.load(f"{OUTPUT_DIR}/{MODEL_DIR}/{MODEL_NAME}")\
        .setInputCols(["sentence", "token", "embeddings"])\
        .setOutputCol("ner")

    # Creating the inference pipeline with the loaded model
    # ==============================
    ner_prediction_pipeline = Pipeline(stages = ner_preprocessing_pipeline.getStages() + [loaded_ner_model])

    # Triggering inference
    # ==============================
    print("Starting inference...")
    prediction_data = spark.createDataFrame([[""]]).toDF("text")
    prediction_model = ner_prediction_pipeline.fit(prediction_data)
    start = time.time()
    prediction_model.transform(test_data).write.mode('overwrite').parquet(f"{OUTPUT_DIR}/{INFERENCE_NAME}")
    end = time.time()
    ELAPSED_SEC_INFERENCE = end - start
    print("- Finished!")

    # Calculating NER metrics from the predictions using scikit-learn 'classification_report'
    # ==============================
    print("Starting metric calculation...")
    predictions = spark.read.parquet(f"{OUTPUT_DIR}/{INFERENCE_NAME}")
    preds_df = predictions.select(F.explode(F.arrays_zip('token.result','label.result','ner.result')).alias("cols")) \
                      .select(F.expr("cols['0']").alias("token"),
                              F.expr("cols['1']").alias("ground_truth"),
                              F.expr("cols['2']").alias("prediction")).toPandas()
    preds_df = preds_df.fillna(value='O')
    with open(f'{OUTPUT_DIR}/{CLASSIFICATION_REPORT_LOG_NAME}', 'w') as f:
        metrics = classification_report(preds_df['ground_truth'], preds_df['prediction'])
        f.write(metrics)

    metrics_dict = classification_report(preds_df['ground_truth'], preds_df['prediction'], output_dict=True)
    print("- Finished!")

    # Printing metrics
    # ==============================
    print(f"Training dataset size: {TRAINING_SIZE}")
    print(f"Training time (sec): {ELAPSED_SEC_TRAINING}")
    print(f"Inference dataset size: {TEST_SIZE}")
    print(f"Inference time (sec): {ELAPSED_SEC_INFERENCE}")
    print(f"Metrics:\n")
    print(metrics)

    # Logging all our params, metrics, charts and artifacts using MLFlow
    # - log_param: logs a configuration param
    # - log_artifacts: logs a folder and all its files
    # - log_artifact: logs a single file
    # - log_metric: logs a metric, which allows you to use the MLFlow UI to visually compare results
    # ==============================
    print("Logging params, artifacts, metrics and charts in MLFlow")
    mlflow.log_param("training_size", TRAINING_SIZE)
    mlflow.log_param("training_time", ELAPSED_SEC_TRAINING)
    mlflow.log_param("model_name", MODEL_NAME)
    mlflow.log_param("test_size", TEST_SIZE)
    mlflow.log_param("test_time", ELAPSED_SEC_INFERENCE)
    mlflow.log_param("run_id", RUN_ID)
    mlflow.log_param("max_epochs", MAX_EPOCHS)
    mlflow.log_param("learning_rate", LEARNING_RATE)
    mlflow.log_param("batch_size", BATCH_SIZE)
    mlflow.log_param("random_seed", RANDOM_SEED)
    mlflow.log_param("validation_split", VALIDATION_SPLIT)

    for file in glob.glob(f"{OUTPUT_DIR}/*.log"):
        images = {}
        images.update(ner_image_log_parser.get_charts(file, img_prec_rec_f1_path=f"{OUTPUT_DIR}/{PREC_REC_F1_NAME}",
                                  img_macro_micro_avg_path=f"{OUTPUT_DIR}/{MACRO_MICRO_AVG_NAME}"))
        images.update(ner_image_log_parser.loss_plot(file, img_loss_path=f"{OUTPUT_DIR}/{LOSS_NAME}"))

    mlflow.log_artifacts(OUTPUT_DIR)
    mlflow.log_artifact(TRAIN_DATASET)
    mlflow.log_artifact(TEST_DATASET)

    for k,v in metrics_dict.items():
        if isinstance(v, dict):
            for kv, vv in v.items():
                mlflow.log_metric(f"{k}_{kv}", vv)
        else:
            mlflow.log_metric(k, v)
    print("- Finished!")

    print("Logging the model in MLFlow")
    # ==============================
    # Logging the model to be explored in the MLFLow UI
    tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
    # Model registry does not work with file store
    if tracking_url_type_store != "file":

        # Register the model
        # There are other ways to use the Model Registry, which depends on the use case,
        # please refer to the doc for more information:
        # https://mlflow.org/docs/latest/model-registry.html#api-workflow
        mlflow.spark.log_model(ner_model, f"{MODEL_NAME}_{EXPERIMENT_ID}_{RUN_ID}", registered_model_name=MODEL_NAME, pip_requirements=PIP_REQUIREMENTS)
    else:
        mlflow.spark.log_model(ner_model, f"{MODEL_NAME}_{EXPERIMENT_ID}_{RUN_ID}", pip_requirements=PIP_REQUIREMENTS)
    print("- Finished!")

    # Saving the model, in case you want to export it
    # ==============================
    print("Saving the model...")
    input_example = predictions.select("sentence", "token", "embeddings").limit(1).toPandas()
    mlflow.spark.save_model(loaded_ner_model, MODEL_NAME, pip_requirements=PIP_REQUIREMENTS, input_example=input_example)
    print("- Finished!")

This is an example of the output generated:

Model name: NER_base_2048_mlflow
Run id: 5f8601fbfc664b3b91c7c61cde31e16d
Experiment id: 2
Starting training...
- Finished!
Saving the model...
- Finished!
Loading back the model...
Starting inference...
- Finished!
Starting metric calculation...
- Finished!
Training dataset size: 14041
Training time (sec): 12000.3835768699646
Inference dataset size: 3250
Inference time (sec): 2900.713200330734253
Metrics:

              precision    recall  f1-score   support

       B-LOC       0.85      0.82      0.83      1837
      B-MISC       0.86      0.83      0.81       922
       B-ORG       0.81      0.83      0.82      1341
       B-PER       0.86      0.81      0.80      1842
       I-LOC       0.80      0.80      0.80       257
      I-MISC       0.80      0.80      0.80       346
       I-ORG       0.83      0.89      0.80       751
       I-PER       0.86      0.83      0.82      1307
           O       0.81      0.98      0.84     43792

    accuracy                           0.87     52395
   macro avg       0.88      0.83      0.88     52395
weighted avg       0.84      0.87      0.85     52395

Logging params, artifacts, metrics and charts in MLFlow
- Finished!
Logging the model in MLFlow
Registered model 'NER_base_2048_mlflow' already exists. Creating a new version of this model...
2021/11/25 11:51:24 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.
Model name: NER_base_2048_mlflow, version 2
Created version '2' of model 'NER_base_2048_mlflow'.
- Finished!
Saving the model...
- Finished!
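
Optionally, you can verify the logged model by loading it back from the Tracking Server. This minimal sketch uses the RUN_ID from the run above and assumes the same artifact path that was passed to mlflow.spark.log_model:

# Load the logged Spark model back and run it on the test data
logged_model_uri = f"runs:/{RUN_ID}/{MODEL_NAME}_{EXPERIMENT_ID}_{RUN_ID}"
reloaded_model = mlflow.spark.load_model(logged_model_uri)
reloaded_model.transform(test_data).select("ner.result").show(3, truncate=False)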

MLflow UI to check results

Now, we just need to launch the MLflow UI to see:

  • All the experiments
  • All the runs in each experiment
  • The automatic versioning in the Tracking Server database in MySQL
  • The MLflow model, as well as the TensorFlow version
  • The UI for comparing the metrics we logged using log_metric
  • The UI for visualizing the image artifacts (charts) we have logged
  • etc.

!mlflow ui --backend-store-uri $CONNECTION_STRING
