Serving Spark NLP: SynapseML

 

This is the first article of the “Serving Spark NLP via API” series, showcasing how to serve Spark NLP using Synapse ML

Don’t forget to check the other articles in this series, namely:

Background

Spark NLP is a Natural Language Understanding Library built on top of Apache Spark, leveranging Spark MLLib pipelines, that allows you to run NLP models at scale, including SOTA Transformers. Therefore, it’s the only production-ready NLP platform that allows you to go from a simple PoC on 1 driver node, to scale to multiple nodes in a cluster, to process big amounts of data, in a matter of minutes.

Before starting, if you want to know more about all the advantages of using Spark NLP (as the ability to work at scale on air-gapped environments, for instance) we recommend you to take a look at the following resources:

Motivation

Spark NLP is server-agnostic, what means it does not come with an integrated API server, but offers a lot of options to serve NLP models using Rest APIs.

There is a wide range of possibilities to add a web server and serve Spark NLP pipelines using RestAPI, and in this series of articles we are only describing some of them.

Let’s have an overview of how to use Microsoft’s Synapse ML as an example for that purpose.

Microsoft’s Synapse ML

SynapseML serving of Spark NLP pipelines

Synapse ML (previously named SparkMML) is, as they state in their official webpage:

… an ecosystem of tools aimed towards expanding the distributed computing framework Apache Spark in several new directions.

They offer a seamless integratation with OpenCV, LightGBM, Microsoft Cognitive Tool and, the most relevant for our use case, Spark Serving, an extension of *Spark Streaming *with an integrated server and a Load Balancer, that can attend multiple requests via Rest API, balance and attend them leveraging the capabilities of a Spark Cluster. That means that you can sin up a server and attend requests that will be distributed transparently over a Spark NLP cluster, in a very effortless way.

Strengths

  • Ready-to-use server

  • Includes a Load Balancer

  • Distributes the work over a Spark Cluster

  • Can be used for both Spark NLP and Spark OCR

Weaknesses

  • For small use cases that don’t require big cluster processing, other approaches may be faster (as FastAPI using LightPipelines)

  • Requires using an external Framework

  • This approach does not allow you to customize your endpoints, it uses Synapse ML ones

How to set up Synapse ML to serve Spark NLP pipelines

We will skip here how to install Spark NLP. If you need to do that, please follow this official webpage about how to install Spark NLP or, if Spark NLP for Healthcare if you are using the Healthcare library.

Synapse ML recommends using at least Spark 3.2, so first of all, let’s configure the Spark Session with the required jars packages(both for Synapse ML and Spark) with the the proper Spark version (take a look at the suffix spark-nlp-spark32) and also, very important, add to jars.repository the Maven repository for SynapseML.

sparknlpjsl_jar = "spark-nlp-jsl.jar"

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark") \
    .master("local[*]") \
    .config("spark.driver.memory", "16G") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "2000M") \
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.9.5,com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:[YOUR_SPARKNLP_VERSION])\
    .config("spark.jars", sparknlpjsl_jar)\
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")\
    .getOrCreate()

After the initialization, add your required imports (Spark NLP) and add to them the SynapseML-specific ones:

import sparknlp
import sparknlp_jsl
...

import synapse.ml
from synapse.ml.io import *

Now, let’s create a Spark NLP for Healthcare pipeline to carry out Entity Resolution.

document_assembler = DocumentAssembler()\
      .setInputCol("text")\
      .setOutputCol("document")

sentenceDetectorDL = SentenceDetectorDLModel.pretrained("sentence_detector_dl_healthcare", "en", 'clinical/models') \
      .setInputCols(["document"]) \
      .setOutputCol("sentence")

tokenizer = Tokenizer()\
      .setInputCols(["sentence"])\
      .setOutputCol("token")

word_embeddings = WordEmbeddingsModel.pretrained("embeddings_clinical", "en", "clinical/models")\
  .setInputCols(["sentence", "token"])\
  .setOutputCol("word_embeddings")

clinical_ner = MedicalNerModel.pretrained("ner_clinical", "en", "clinical/models") \
      .setInputCols(["sentence", "token", "word_embeddings"]) \
      .setOutputCol("ner")

ner_converter_icd = NerConverterInternal() \
      .setInputCols(["sentence", "token", "ner"]) \
      .setOutputCol("ner_chunk")\
      .setWhiteList(['PROBLEM'])\
      .setPreservePosition(False)

c2doc = Chunk2Doc()\
      .setInputCols("ner_chunk")\
      .setOutputCol("ner_chunk_doc")

sbert_embedder = BertSentenceEmbeddings.pretrained('sbiobert_base_cased_mli', 'en','clinical/models')\
      .setInputCols(["ner_chunk_doc"])\
      .setOutputCol("sentence_embeddings")\
      .setCaseSensitive(False)

icd_resolver = SentenceEntityResolverModel.pretrained("sbiobertresolve_icd10cm_augmented_billable_hcc","en", "clinical/models") \
     .setInputCols(["sentence_embeddings"]) \
     .setOutputCol("icd10cm_code")\
     .setDistanceFunction("EUCLIDEAN")

resolver_pipeline = Pipeline(
    stages = [
        document_assembler,
        sentenceDetectorDL,
        tokenizer,
        word_embeddings,
        clinical_ner,
        ner_converter_icd,
        c2doc,
        sbert_embedder,
        icd_resolver
  ])

Let’s use a clinical note to test Synapse ML.

clinical_note = """A 28-year-old female with a history of gestational diabetes mellitus diagnosed eight years prior to presentation and subsequent type two diabetes mellitus (T2DM), one prior episode of HTG-induced pancreatitis three years prior to presentation, associated with an acute hepatitis, and obesity with a body mass index (BMI) of 33.5 kg/m2, presented with a one-week history of polyuria, polydipsia, poor appetite, and vomiting. Two weeks prior to presentation, she was treated with a five-day course of amoxicillin for a respiratory tract infection. She was on metformin, glipizide, and dapagliflozin for T2DM and atorvastatin and gemfibrozil for HTG. She had been on dapagliflozin for six months at the time of presentation. Physical examination on presentation was significant for dry oral mucosa; significantly, her abdominal examination was benign with no tenderness, guarding, or rigidity."""

Since SynapseML serves a RestAPI, we will be sending JSON requests. Let’s define a simple json with the clinical note:

data_json = {"text": clinical_note }

Now, let’s spin up a server using Synapse ML Spark Serving. It will consist of:

  1. a streaming server that will receive a json and transform it into a Spark Dataframe

  2. a call to Spark NLP transform on the dataframe, using the pipeline

  3. a write operation returning the output also in json format.

    #1: Creating the streaming server and transforming json to Spark Dataframe

    serving_input = spark.readStream.server() \
        .address("localhost", 9999, "benchmark_api") \
        .option("name", "benchmark_api") \
        .load() \
        .parseRequest("benchmark_api", data.schema)
    

    #2: Applying transform to the dataframe using our Spark NLP pipeline

    serving_output = resolver_p_model.transform(serving_input) \
        .makeReply("icd10cm_code")
    

    #3: Returning the response in json format

    server = serving_output.writeStream \
          .server() \
          .replyTo("benchmark_api") \
          .queryName("benchmark_query") \
          .option("checkpointLocation", "file:///tmp/checkpoints-{}".format(uuid.uuid1())) \
          .start()
    

And we are ready to test the endpoint using the requests library.

import requests
res = requests.post("http://localhost:9999/benchmark_api", data= json.dumps(data_json))

And last, but not least, let’s check the results:

for i in range (0, len(response_list.json())):
  print(response_list.json()[i]['result'])

Results (list of ICD-10-CM codes from NER chunks)

>> O2441 O2411 P702 K8520 B159 E669 Z6841 R35 R631 R630 R111...

SynapseML on Databricks

You can also run the above code in Databricks. To do that, you only need to remove the Creating a Spark Session, since Databricks manages that session for you.

After we remove that part of the code from our notebook, we need to set the same configuration params in the Cluster Configuration, so that Databricks spins a cluster with the proper jars and config params (similarly to what we did programatically in Creating a Spark Session above, but using Databricks UI)

To do so, go to Compute →Clusters in Databricks and create a new cluster (name it, for instance, Synapse).

Creating a new cluster called “Synapse”

In your environment variables, as always, add the keys from your license in a key=value format

SynapseML on Databricks

Then, in Cluster → Libraries, you need to install:

  • SynapseML jar (Maven → com.microsoft.azure:synapseml_2.12:0.9.5)
  • Spark NLP jar ( Maven →com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:[YOUR_SPARKNLP_VERSION])
  • Spark NLP wheel (PyPi → spark-nlp==[YOUR_SPARKNLP_VERSION])
  • If you are using Spark NLP for Healthcare
    • Spark NLP for Healthcare jar. Download the jar using the secret from your license, and then upload the jar to DBFS and add it in the Libraries section (DBFS/ADLS → dbfs:/FileStore/johnsnowlabs/libs/spark_nlp_jsl_[YOUR_SPARKNLP_VERSION].jar)
    • Spark NLP for Healthcare wheel. Same that with the jar. Download the jar using the secret from your license, and then upload the jar to DBFS and add it in the Libraries section (DBFS/ADLS → dbfs:/FileStore/johnsnowlabs/libs/spark_nlp_jsl_[YOUR_SPARKNLP_VERSION].whl)

And the rest of the code from the Importing all the libraries section and on remains exactly the same.

Synapse ML on Databricks: results

Do you want to know more?

Last updated