Merkle Trees for Comparisons – Example

Merkle trees (named after Ralph Merkle, one of the fathers of modern cryptography) are fascinating hash-based data structures used to verify the integrity of data in peer-to-peer systems. A Merkle tree is essentially a binary tree of hashes, and systems like Dynamo use it to compare replicas and resolve the inconsistencies between them. For example, in a distributed system, if a replica node falls considerably behind its peers, resolving the differences with techniques like vector clocks might take an unacceptably long time. A hash-based comparison approach like a Merkle tree helps quickly compare two copies of a range of data on different replicas. Merkle trees are also a core part of blockchains like Ethereum, which uses a non-binary variant, but the binary ones are the most common, the easiest to understand and fun to implement.

Conceptually this involves:

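  1. Comparing the root hashes of both trees. If they are equal, the two copies are identical and the comparison stops.
  2. Otherwise, recursing into the left and right children and repeating the comparison, descending only into subtrees whose hashes differ, until the mismatched leaves are found.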
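
The two steps above can be sketched in a few lines. This is a minimal illustration rather than how Dynamo implements it: SHA-256, the `Node` class and the `build` and `diff` helpers are all assumptions made for the example, and both trees are assumed to cover the same number of blocks.

```python
import hashlib


def h(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


class Node:
    def __init__(self, digest, left=None, right=None, index=None):
        self.digest = digest
        self.left = left
        self.right = right
        self.index = index  # leaf position; None for inner nodes


def build(blocks):
    """Build a binary Merkle tree over a non-empty list of byte blocks."""
    level = [Node(h(b), index=i) for i, b in enumerate(blocks)]
    while len(level) > 1:
        if len(level) % 2:  # odd count: pair the last node with itself
            level.append(level[-1])
        level = [Node(h((l.digest + r.digest).encode()), l, r)
                 for l, r in zip(level[::2], level[1::2])]
    return level[0]


def diff(a, b):
    """Return the leaf positions at which two same-shaped trees differ."""
    if a.digest == b.digest:  # identical subtree: nothing below needs checking
        return set()
    if a.left is None:        # reached a leaf that differs
        return {a.index}
    return diff(a.left, b.left) | diff(a.right, b.right)


replica_1 = build([b"k1=a", b"k2=b", b"k3=c", b"k4=d"])
replica_2 = build([b"k1=a", b"k2=b", b"k3=X", b"k4=d"])
print(sorted(diff(replica_1, replica_2)))  # [2]
```

Only the right half of the tree is visited here, because the left halves hash to the same value.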

The “Merkle root” summarizes all the transaction values in a single hash value.

Simple Example

For example, suppose TA, TB, TC and TD are transactions (they could be files, keys etc.) and H is a hash function. You can construct a tree by hashing the transactions, then hashing the concatenated hashes of each pair to generate the next level, and repeating until everything is reduced to a single root. In my scrawl above, this means hashing TA and TB, TC and TD, then hashing their concatenations to get H(AB) and H(CD), and finally hashing those together to land at H(ABCD). Essentially, keep hashing until all the transactions meet at a single hash.
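
Written out by hand for the four transactions, the construction looks roughly like this. SHA-256 and the byte strings standing in for TA to TD are assumptions made for the example; any hash function and any payload would do.

```python
import hashlib


def H(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


TA, TB, TC, TD = b"TA", b"TB", b"TC", b"TD"

# Leaves: hash each transaction on its own
hA, hB, hC, hD = H(TA), H(TB), H(TC), H(TD)

# Inner nodes: hash the concatenation of each pair of children
hAB = H(hA + hB)
hCD = H(hC + hD)

# Root: a single hash that depends on all four transactions
hABCD = H(hAB + hCD)
print(hABCD.hex())
```

Changing any one of the four transactions changes its leaf hash, its parent and therefore the root.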

Example

Here’s an example that uses this technique to compare two files by generating their Merkle roots to check whether they are equal or not (comments inline).

Invoke the script by calling “python merkle_sample.py <file1>.csv <file2>.csv” to compare two Merkle trees. Code below:
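
A minimal sketch of such a script is shown here; it is an illustration under stated assumptions, not necessarily the contents of merkle_sample.py. It assumes SHA-256, treats every line of a file as one leaf, and repeats the last hash when a level has an odd number of entries. The `merkle_root` function name is made up for the example.

```python
import hashlib
import sys


def sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def merkle_root(path: str) -> str:
    """Merkle root of a file, using each line as one leaf."""
    with open(path, "rb") as f:
        level = [sha256(line) for line in f]
    if not level:                      # empty file: hash of nothing
        return sha256(b"").hex()
    while len(level) > 1:
        if len(level) % 2:             # odd count: repeat the last hash
            level.append(level[-1])
        # Hash each pair of neighbours to build the next level up
        level = [sha256(level[i] + level[i + 1])
                 for i in range(0, len(level), 2)]
    return level[0].hex()


if __name__ == "__main__" and len(sys.argv) == 3:
    root_1, root_2 = merkle_root(sys.argv[1]), merkle_root(sys.argv[2])
    print("files match" if root_1 == root_2 else "files differ")
```

Two files produce the same root only if every line matches, so comparing the two roots is enough to tell whether the files are equal.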

A key advantage here is that each branch of the tree can be checked independently without downloading the whole dataset to compare.

This translates to fewer disk reads for synchronization, though that efficiency needs to be balanced against recalculating the entire tree when nodes leave or go down. This is fundamental to cryptocurrencies, where transactions need to be validated by nodes and validating every transaction carries an enormous time and space cost. Merkle trees mitigate that cost: checking that a transaction belongs to a block takes logarithmic rather than linear time. The Merkle root gets put into the block header that is hashed in the process of mining, and comparisons are made via the Merkle root rather than by sending all the transactions over the network. Ethereum uses a more complex variant, namely the Merkle Patricia tree.
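
The logarithmic cost comes from the fact that proving one transaction belongs to a tree only needs the sibling hashes along its path to the root. Here is a rough sketch of that idea, assuming SHA-256 and a repeat-the-last-hash rule for odd levels; the `build_levels`, `proof_for` and `verify` helpers are made up for the example and do not follow any particular blockchain's format.

```python
import hashlib


def H(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


def build_levels(leaves):
    """All levels of the tree, from the leaf hashes up to the root."""
    levels = [[H(leaf) for leaf in leaves]]
    while len(levels[-1]) > 1:
        cur = list(levels[-1])
        if len(cur) % 2:               # odd count: repeat the last hash
            cur.append(cur[-1])
        levels[-1] = cur               # keep the padded level for proofs
        levels.append([H(cur[i] + cur[i + 1]) for i in range(0, len(cur), 2)])
    return levels


def proof_for(levels, index):
    """Sibling hashes needed to rebuild the root from leaf `index`."""
    proof = []
    for level in levels[:-1]:
        sibling = index ^ 1            # neighbour within the same pair
        proof.append((level[sibling], sibling < index))
        index //= 2                    # move up to the parent
    return proof


def verify(leaf, proof, root):
    digest = H(leaf)
    for sibling, sibling_is_left in proof:
        digest = H(sibling + digest) if sibling_is_left else H(digest + sibling)
    return digest == root


txs = [f"tx{i}".encode() for i in range(8)]
levels = build_levels(txs)
root = levels[-1][0]
proof = proof_for(levels, 5)
print(len(proof))                     # 3 hashes for 8 transactions
print(verify(b"tx5", proof, root))    # True
print(verify(b"bogus", proof, root))  # False
```

A verifier holding only the root needs three hashes here instead of all eight transactions.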

The applications of this range beyond blockchains to Torrents, Git, Certificates and more.

Text Summarizer on Hugging Face with mlflow


Hugging Face is the go-to resource for open source natural language processing these days. The Hugging Face hubs are an amazing collection of models, datasets and metrics to get NLP workflows going. It’s relatively easy to incorporate this into an mlflow paradigm if you use mlflow for your model management lifecycle. mlflow makes it trivial to track the model lifecycle, including experimentation, reproducibility, and deployment. mlflow’s open format makes it my go-to framework for tracking models in an array of personal projects, and it also has an impressive enterprise implementation that my teams at work enable for large enterprise use cases. For smaller projects, it’s great to use mlflow locally for anything that requires model management, as this example illustrates.

The beauty of Hugging Face (HF) is the ability to use their pipelines to run models for inference. The models are products of massive training workflows performed by big tech and made available to ordinary users, who can use them for inference. The HF pipelines offer a simple API dedicated to performing inference with these models, sparing the ordinary user the complexity and compute/storage requirements of running such large models.

The goal was to put some sort of tracking around all my experiments with the Hugging Face summarizer that I’ve been using to summarize text, then serve the model over REST with mlflow serving, and also run predictions on the logged model by passing in a text file. Code repository is here with snippets below.

Running the Text Summarizer and calling it via curl

Text summarization comes in two types, extractive and abstractive: extractive summarization selects the sentences that carry the most valuable context, while abstractive summarization is trained to generate new summaries.

Considering I was running on a CPU, I picked a small model: the T5-small model trained on the Wikihow All data set to write summaries. The boilerplate code on the HuggingFace website gives you all you need to get started. Note that this model’s input length is capped at 512 tokens, which may not be optimal for use cases with larger text.

a) The first step is to define a wrapper around the model code so it can be called easily later on, by subclassing mlflow.pyfunc.PythonModel to use custom logic and artifacts.

import mlflow.pyfunc


class Summarizer(mlflow.pyfunc.PythonModel):
    '''
    Any MLflow Python model is expected to be loadable as a python_function model.
    '''

    def __init__(self):
        # AutoModelWithLMHead is deprecated; T5 is a sequence-to-sequence model
        from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

        self.tokenizer = AutoTokenizer.from_pretrained(
            "deep-learning-analytics/wikihow-t5-small")

        self.summarize = AutoModelForSeq2SeqLM.from_pretrained(
            "deep-learning-analytics/wikihow-t5-small")

    def summarize_article(self, row):
        # Tokenize the first column of the row, truncated to the model's 512-token input limit
        tokenized_text = self.tokenizer.encode(
            row.iloc[0], return_tensors="pt", truncation=True, max_length=512)

        # T5-small model trained on Wikihow All data set.
        # model was trained for 3 epochs using a batch size of 16 and learning rate of 3e-4.
        # Max_input_length is set as 512 and max_output_length is 150.
        s = self.summarize.generate(
            tokenized_text,
            max_length=150,
            num_beams=2,
            repetition_penalty=2.5,
            length_penalty=1.0,
            early_stopping=True)

        # Decode the generated token ids back into text
        return self.tokenizer.decode(s[0], skip_special_tokens=True)

    def predict(self, context, model_input):
        # axis=1 applies the summarizer to every row of the input dataframe
        model_input['name'] = model_input.apply(
            self.summarize_article, axis=1)

        return model_input

b) We define the tokenizer to prepare the inputs of the model, and the model itself, using the HuggingFace specifications. This is a smaller model trained on the Wikihow All data set. From the documentation: the model was trained for 3 epochs using a batch size of 16 and a learning rate of 3e-4. Max_input_length is set as 512 and max_output_length is 150.

c) Then the summarize_article function tokenizes the text and calls the T5-small model’s generate method with the generation settings. It is called for every row in the dataframe input and returns the summary for that row.

d) The predict function calls summarize_article on the model input and returns the predictions. This is also where mlflow plugs in to run inference.

The input and output schema are defined in the ModelSignature class as follows:

import json
from mlflow.models.signature import ModelSignature

# Input and Output formats
input_schema = json.dumps([{'name': 'text', 'type': 'string'}])
output_schema = json.dumps([{'name': 'text', 'type': 'string'}])
# Load model signature from spec
signature = ModelSignature.from_dict({'inputs': input_schema, 'outputs': output_schema})


e) We can set up mlflow by setting the tracking URI, which was “” in this case since it’s running locally. It’s trivial on a platform like Azure to spin up a Databricks workspace and get a tracking server spun up automatically so you can persist all artifacts at cloud scale.


Start tracking the runs by wrapping the logic in an mlflow.start_run invocation. The key here is to log the model with mlflow.pyfunc.log_model so the Python code loads into mlflow for inference. In this case, the dependencies of the model are all stored directly with the model. There are plenty of parameters that can be tweaked, described here.

# Start tracking
with mlflow.start_run(run_name="hf_summarizer") as run:
    print(run.info.run_id)
    runner = run.info.run_id
    # Print the command that serves this run's model over REST
    print("mlflow models serve -m runs:/" +
          run.info.run_id + "/model --no-conda")
    # Log the wrapper as a pyfunc model; arguments left out keep their defaults
    mlflow.pyfunc.log_model('model', python_model=Summarizer(),
                            signature=signature)


f) Check the runs via the mlflow UI using the “mlflow ui” command, then serve the model by invoking the command mlflow models serve -m runs:/<run_id>/model --no-conda


g) That’s it. Call the model with curl using the sample text below:

curl -X POST -H "Content-Type:application/json; format=pandas-split" --data '{"columns":["text"],"data":[["Howard Phillips Lovecraft August 20, 1890 – March 15, 1937) was an American writer of weird and horror fiction, who is known for his creation of what became the Cthulhu Mythos.Born in Providence, Rhode Island, Lovecraft spent most of his life in New England. He was born into affluence, but his familys wealth dissipated soon after the death of his grandfather. In 1913, he wrote a critical letter to a pulp magazine that ultimately led to his involvement in pulp fiction.H.P.Lovecraft wrote his best books in Masachusettes."]]}' http://127.0.0.1:5000/invocations
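
The same request could also be sent from Python. This is a minimal sketch using only the standard library; `build_payload` and `summarize` are names made up for the example, and it assumes the model server from step f) is listening on 127.0.0.1:5000 and accepts the same pandas-split payload as the curl call. Other MLflow versions may expect a different payload layout, so check the documentation for the version in use.

```python
import json
import urllib.request


def build_payload(text: str) -> bytes:
    """Pandas 'split' orientation: column names plus a list of rows."""
    return json.dumps({"columns": ["text"], "data": [[text]]}).encode("utf-8")


def summarize(text: str, url: str = "http://127.0.0.1:5000/invocations") -> str:
    """POST one piece of text to the model server and return the raw response body."""
    request = urllib.request.Request(
        url,
        data=build_payload(text),
        headers={"Content-Type": "application/json; format=pandas-split"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.read().decode("utf-8")
```

Calling summarize with the Lovecraft paragraph sends the same body as the curl command.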

Output:

"name": "Know that Howard Phillips Lovecraft (H.P.Lovecraft was born in New England."}]%

Running the Text Summarizer and calling it via a text file

For larger text, it’s more convenient to read the text from a file, format it and run the summarizer on it. The predict_text.py script does exactly that.


a) Clean up the text in article.txt and load the text into a dictionary.

b) Load the model using pyfunc.load_model and then run the model.predict on the dictionary.

# Load model as a PyFuncModel.
loaded_model = mlflow.pyfunc.load_model(logged_model)


# Predict on a Pandas DataFrame.
summary = loaded_model.predict(pd.DataFrame(dict1, index=[0]))

print(summary['name'][0])
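
Step a) is not shown in the snippet above. A minimal sketch of it follows, assuming the clean-up simply collapses whitespace so the article becomes one line of text; the `load_article` helper is hypothetical and the real predict_text.py may clean the text differently.

```python
import re


def load_article(path: str) -> dict:
    """Read a text file and return it as a one-entry dict for the model."""
    with open(path, encoding="utf-8") as f:
        raw = f.read()
    # Collapse newlines, tabs and repeated spaces into single spaces
    cleaned = re.sub(r"\s+", " ", raw).strip()
    return {"text": cleaned}


# Usage with the snippet above (needs a logged model, so left commented out):
# dict1 = load_article("article.txt")
# summary = loaded_model.predict(pd.DataFrame(dict1, index=[0]))
```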

Code here

In summary, this makes for a useful way to track models and outcomes from readily available transformer pipelines to pick the best ones for the task.

Spotify Recommender API Call

One of my favorite features in Spotify is the recommendations. The app’s recommendations include the Discover Weekly, Daily Mix, Release Radar and Artist Radio features. I could go through hours of recommendations in place of white noise while working on projects, and in a session I usually encounter a song or an artist that appeals to my guitar/keyboard driven sensibilities. While Discover Weekly and Daily Mix yield gems once in a while, the song-specific ones, usually based on Artist / Song radio, yield a lot more matches to my sensibilities.

The recommendations endpoint that generates recs based on a seed is a favorite. I’ve usually had a good match rate with songs that “stick” based on the API. There are plenty of other endpoints (artists, songs etc.) that could easily be plugged in to generate relevant predictions.

The API documentation of Spotify has always been stellar and its usability is enhanced by being able to test all the API calls easily within their developer console.

This API also has a bunch of parameters that can be configured for fine-tuning the recommendation: key, genre, loudness, energy, instrumentalness, popularity, speechiness, danceability etc.

Per the official docs – “Recommendations are generated based on the available information for a given seed entity and matched against similar artists and tracks. If there is sufficient information about the provided seeds, a list of tracks will be returned together with pool size details. For artists and tracks that are very new or obscure there might not be enough data to generate a list of tracks.”

One of the key things here is to generate seeds for the recommendations. This can be done by using endpoints like Get a User’s Top Artists and Tracks to obtain artists and tracks based on my listening history, then using these artists and tracks as seeds for the Get Recommendations Based on Seeds endpoint. This endpoint will only return tracks.
The Web API Authorization Guide is a must-read before querying these endpoints, and the developer console makes it super easy to try out different endpoints.
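
To make the seed step concrete, here is a rough sketch of turning a handful of artist and track IDs into a recommendations request URL. The `recommendations_url` helper and the placeholder IDs are made up for the example; the limit of five seeds in total reflects my reading of the API docs, so verify it against the current reference.

```python
from urllib.parse import urlencode

RECOMMENDATIONS_URL = "https://api.spotify.com/v1/recommendations"


def recommendations_url(seed_artists=(), seed_tracks=(), seed_genres=(),
                        limit=5, **tunables):
    """Build a recommendations request URL from lists of seed IDs/genres."""
    seed_count = len(seed_artists) + len(seed_tracks) + len(seed_genres)
    if not 1 <= seed_count <= 5:
        raise ValueError("the API accepts between 1 and 5 seeds in total")

    params = {"limit": limit}
    for name, values in (("seed_artists", seed_artists),
                         ("seed_tracks", seed_tracks),
                         ("seed_genres", seed_genres)):
        if values:
            params[name] = ",".join(values)  # comma-separated list per seed type
    params.update(tunables)                  # e.g. target_danceability=0.1
    return f"{RECOMMENDATIONS_URL}?{urlencode(params)}"


# Placeholder IDs; real ones would come from the top artists/tracks response
url = recommendations_url(seed_artists=["ARTIST_ID_1", "ARTIST_ID_2"],
                          seed_tracks=["TRACK_ID_1"],
                          target_danceability=0.1)
print(url)
```

The request still needs the Authorization header with a bearer token, as covered in the authorization guide.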

I wanted a quick way to query the recommendations API for new recommendations, and the combination of Streamlit + the Spotify API was a quick, simple way to get that working. At a high level I wanted to be able to query a song or artist and generate recommendations based on it. A secondary need is to collect data for a recommender I am training to customize ML-driven recommendations, but more on that in a different post.

A lot of the code is boilerplate and pretty self-explanatory, but at a high level it consists of a class to interact with the Spotify API (spotify_api.py) and a UI wrapper using Streamlit to render the app (spotify_explorer.py). Given a client id and client secret, spotify_api.py gets client credentials from the Spotify API to invoke the search. Sample code inline with comments. The code can obviously be much more modular and pythonic, but for a quick hour of hacking, this got the job done.

import base64
import datetime

import requests


class SpotifyAPI(object):
    access_token = None
    access_token_expires = datetime.datetime.now()
    access_token_did_expire = True
    client_id = None
    client_secret = None
    token_url = 'https://accounts.spotify.com/api/token'

    def __init__(self, client_id, client_secret, *args, **kwargs):
        self.client_id = client_id
        self.client_secret = client_secret

    # Given a client id and client secret, gets client credentials from the Spotify API.
    def get_client_credentials(self):
        ''' Returns a base64 encoded string '''
        client_id = self.client_id
        client_secret = self.client_secret
        if client_secret is None or client_id is None:
            raise Exception("client_id and client_secret must both be set")
        client_creds = f"{client_id}:{client_secret}"
        client_creds_b64 = base64.b64encode(client_creds.encode())
        return client_creds_b64.decode()

    def get_token_header(self):  # Get header
        client_creds_b64 = self.get_client_credentials()
        return {"Authorization": f"Basic {client_creds_b64}"}

    def get_token_data(self):  # Get token
        return {
            "grant_type": "client_credentials"
        }

    def perform_auth(self):  # perform auth only if access token has expired
        token_url = self.token_url
        token_data = self.get_token_data()
        token_headers = self.get_token_header()

        r = requests.post(token_url, data=token_data, headers=token_headers)

        if r.status_code not in range(200, 300):
            # Stop here: a failed response has no access_token to read below
            raise Exception("Could not authenticate client")
        data = r.json()
        now = datetime.datetime.now()
        access_token = data["access_token"]
        expires_in = data['expires_in']
        expires = now + datetime.timedelta(seconds=expires_in)
        self.access_token = access_token
        self.access_token_expires = expires
        self.access_token_did_expire = expires < now
        return True

    def get_access_token(self):
        # Re-authenticate when there is no token yet or the current one has expired
        now = datetime.datetime.now()
        if self.access_token is None or self.access_token_expires < now:
            self.perform_auth()
        return self.access_token

    # search for an artist/track based on a search type provided
    def search(self, query, search_type="artist"):
        access_token = self.get_access_token()
        headers = {"Content-Type": "application/json",
                   "Authorization": f"Bearer {access_token}"}
        # using the  search API at https://developer.spotify.com/documentation/web-api/reference/search/search/
        search_url = "https://api.spotify.com/v1/search"
        # requests builds and URL-encodes the query string from params
        params = {"q": query, "type": search_type.lower()}
        search_r = requests.get(search_url, params=params, headers=headers)
        if search_r.status_code not in range(200, 300):
            print("Encountered issue")
        return search_r.json()

    def get_meta(self, query, search_type="track"):  # meta data of a track
        resp = self.search(query, search_type)
        results = []
        for item in resp['tracks']['items']:
            track_name = item['name']
            track_id = item['id']
            artist_name = item['artists'][0]['name']
            artist_id = item['artists'][0]['id']
            image_url = item['album']['images'][0]['url']  # first album image

            results.append([track_name, track_id, artist_name, artist_id, image_url])

        return results

  

The get_reccomended_songs function is the core of the app, querying the API for results based on the query passed in. The more parameters you pass, the better the results. Customizing the call for any other API endpoint is fairly trivial.

    def get_reccomended_songs(self, limit=5, seed_artists='', seed_tracks='', market="US",
                              seed_genres="rock", target_danceability=0.1):  # reccomendations API
        access_token = self.get_access_token()
        endpoint_url = "https://api.spotify.com/v1/recommendations"
        all_recs = []
        self.limit = limit
        self.seed_artists = seed_artists
        self.seed_tracks = seed_tracks
        self.market = market
        self.seed_genres = seed_genres
        self.target_danceability = target_danceability

        # Let requests build and URL-encode the query string
        params = {"limit": limit, "market": market, "seed_genres": seed_genres,
                  "target_danceability": target_danceability,
                  "seed_artists": seed_artists, "seed_tracks": seed_tracks}
        response = requests.get(endpoint_url, params=params, headers={
                                "Content-type": "application/json", "Authorization": f"Bearer {access_token}"})

        # A Response object is truthy only when the status code signals success
        if response:
            print("Reccomended songs")
            for i, j in enumerate(response.json()['tracks']):
                track_name = j['name']
                artist_name = j['artists'][0]['name']
                link = j['artists'][0]['external_urls']['spotify']

                print(f"{i+1}) \"{track_name}\" by {artist_name}")
                all_recs.append([track_name, artist_name, link])
        # Empty list when the request failed
        return all_recs

Wrapping both calls in a Streamlit app is refreshingly simple, and dockerizing and pushing to Azure Container Registry was trivial.

Code

https://github.com/vishwanath79/spotifier

Usage

To run the app, run:
streamlit run spotify_explorer.py

Deployed at

https://spotiapp2.azurewebsites.net/

Part 2 to follow at some point as I continue building out a custom recommender that compares the current personalizer with a custom personalizer that takes in Audio features and more personalized inputs and tuneable parameters.

ONNX for ML Interoperability

Having been a Keras user since I read the seminal Deep Learning with Python, I’ve been experimenting with exporting models to different frameworks to be more framework-agnostic.


ONNX (Open Neural Network Exchange) is an open format for representing traditional and deep learning ML models. Its key goal is promoting interoperability between a variety of frameworks and target environments. ONNX helps you export a fully trained model into its format and enables targeting diverse environments without manual optimization and painful rewrites of the model to accommodate each environment.
It defines an extensible computation graph model along with built-in operators and standard data types to allow for a compact and cross-platform representation for serialization. A typical use case is transfer learning, where you want to reuse the weights of a model possibly built in another framework in your own model. For example, if you build a model in Tensorflow, you get a protobuf (PB) file as output, and it would be great to have one universal format that you can convert to the PT format to load and reuse in Pytorch, or run with its own hardware-agnostic runtime.

For high-performance inference requirements in varied frameworks, this is great with platforms like NVIDIA’s TensorRT supporting ONNX with optimizations aimed at the accelerator present on their devices like the Tesla GPUs or the Jetson embedded devices.

Format

The ONNX file is a protobuf-encoded tensor graph. The list of supported operators is documented here, and the operators are grouped into versioned “opsets”, i.e. operator sets. A model declares the opset version it targets, which is what enables interoperability between runtimes. The operators are a growing list of widely used linear operations, functions and other primitives used to deal with tensors.

The operations include most of the typical deep learning primitives: linear operations, convolutions and activation functions. A model is typically mapped to the ONNX format by executing it, often with just random input data, and tracing the execution. The operations executed are mapped to ONNX operations, and so the entire model graph is mapped into the ONNX format. The ONNX model is then saved as a .onnx protobuf file, which can be read and executed by a wide and growing range of ONNX runtimes.

Note – Opsets are fast evolving and with fast release cycles of competing frameworks, it may not always be easy to upgrade to the latest ONNX version if it breaks compatibility with other frameworks. The file format consists of the following:

  • Model: Top level construct
    • Associates version Info and Metadata with a graph
  • Graph: describes a function
    • Set of metadata fields
    • List of model parameters
    • List of computation nodes – Each node has zero or more inputs and one or more outputs.
  • Nodes: used for computation
    • Name of node
    • Name of the operator that it invokes
    • List of named inputs
    • List of named outputs
    • List of attributes

More details here.

Runtime


The ONNX model can be run for inference with ONNX runtime, which uses a variety of hardware accelerators for optimal performance. The promise of ONNX runtime is that it abstracts the underlying hardware to enable developers to use a single set of APIs for multiple deployment targets. Note that the ONNX runtime is a separate project and aims to perform inference for any prediction function converted to the ONNX format.

This has advantages over dockerized pickle models, which are the usual approach in a lot of production deployments, where there are runtime restrictions (i.e. can run only in .NET or the JVM), memory and storage overhead, version dependencies, and batch prediction requirements.


ONNX runtime has been integrated into WinML and Azure ML, with MSFT as its primary backer. Some of the new enhancements include INT8 quantization, which replaces floating point numbers with 8-bit integers to reduce model size and memory footprint and to increase efficiency, benchmarked here.
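
To give a feel for what INT8 quantization does to the numbers, here is a simplified sketch of the general idea in plain Python. It is an illustration only, not how ONNX runtime implements quantization, and the `quantize_int8` and `dequantize_int8` helpers are made up for the example.

```python
def quantize_int8(values):
    """Map floats onto the signed 8-bit range [-128, 127] (affine quantization)."""
    lo, hi = min(values), max(values)
    scale = (hi - lo) / 255 or 1.0  # width of one integer step; 1.0 if all values are equal
    quantized = [round((v - lo) / scale) - 128 for v in values]
    return quantized, scale, lo


def dequantize_int8(quantized, scale, lo):
    """Approximate reconstruction of the original floats."""
    return [lo + (q + 128) * scale for q in quantized]


weights = [-1.0, -0.5, 0.0, 0.25, 1.0]
q, scale, lo = quantize_int8(weights)
restored = dequantize_int8(q, scale, lo)
print(q)
print(restored)
```

Each weight is stored in one byte instead of four, at the cost of a rounding error of at most half a step (scale / 2) per value.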


The usual path to proceed:

  • Train models with frameworks
  • Convert into ONNX with ONNX converters
  • Use onnx-runtime to verify correctness and Inspect network structure using netron (https://netron.app/)
  • Use hardware-accelerated inference with ONNX runtime ( CPU/GPU/ASIC/FPGAs)

Tensorflow


To convert Tensorflow models, the easiest way is to use the tf2onnx tool from the command line. This converts the saved model to a model representation that includes the inference graph.


Here is an end-to-end example of saving a simple Tensorflow model , converting it to ONNX and then running the predictions using the ONNX model and verifying the predictions match.


Challenges


However, one thing to consider while using this format is the lack of “official support” from frameworks like Tensorflow. For example, Pytorch does provide the functionality to export models into ONNX (torch.onnx), however I could not find any function to import an ONNX model and output a Pytorch model. Considering Caffe2, which is a part of PyTorch, fully supports ONNX import/export, it may not be totally unreasonable to expect an official importer (there is a proposal already documented here).

The Tensorflow converters seem to be part of the ONNX project, i.e. not an official/out-of-the-box Tensorflow implementation. The list of supported Tensorflow ops is documented here. The github repo is a treasure trove of information on the computation graph model and the operators/data types that power the format. However, as indicated earlier, depending on the complexity of the model (especially in transfer learning scenarios), you are likely to encounter conversion issues during function calls that may cause the ONNX converter to fail. In this case, you may need to modify the graph in order to fit the format. I’ve had a few issues running into StatefulPartitionedCall ops, especially in transfer learning situations with larger encoders in language models.


I have also had to convert Tensorflow to PyTorch by first converting Tensorflow to ONNX, then the ONNX models to Keras using onnx2keras, and then to Pytorch using MMdnn, with mixed results, a lot of debugging and many abandoned attempts. However, I think using ONNX runtime for inference, rather than framework-to-framework conversions, will be a better use of ONNX.



The overall viability of a universal format like ONNX, though well intentioned and highly sought after, may never fully come to fruition with so many divergent interests and priorities amongst the major contributors, though the need for it cannot be disputed.

Deep Learned Shred Solo

Music generation with Recurrent Neural Nets has been of great interest to me, with projects like Magenta displaying amazing feats of ML-driven creativity. AI is increasingly being used to augment human creativity, and this trend could lay creative blocks to rest in the future. As someone who is usually stuck in a musical rut, this is great for spurring creativity.

With a few covid-induced reconnects with old friends (some of whom are professional musicians) and some inspired late night midi programming on Ableton, I decided to modify some scripts / tutorials that have been lying around on my computer to blend deep learning with music composition, as I research the best ways to integrate Deep Learning into original guitar music compositions.

There are plenty of excellent blogs and code on the web on LSTMs, including this one and this one on generating music using Keras, and plenty of boilerplate code on github that demonstrates LSTMs and GRUs for creating music. For this project, I was going for recording a guitar solo based on artists I like and setting up a template for future experimentation for research purposes. A few mashed up solos of Yngwie served as the source data, but the source data could have been pretty much anything in the midi format, and it helps to know how to manipulate these files in the DAW, which in my case was Ableton. Most examples on the web have piano midi files that generate music in isolation. However, I wanted to combine the generated music with minimal accompaniment so as to make it “real”.

With the track being trained on being in the key of F minor, I also needed to make sure I had some accompaniment in F minor, for which I recorded a canned guitar part with some useful drum programming thanks to EZDrummer.

Tracks in Ableton

Note: this was for research purposes only and for further research into composing pieces that actually make sense based on the key being fed into the model. 

Music21 is invaluable for manipulating midi via code. Its utility is that it lets us manipulate starts, durations and pitch. I used Ableton to plug the generated midi notes into an instrument along with programmed drums and rhythm guitars.

Step 1:

Find the midi file(s) you want to base your ML solo on. In this case, I’m going for generating a guitar solo to layer over a backing track. This could be pretty much anything as long as it’s midi that can be processed by Music21.

Step 2:

Preprocessing the midi file(s): The original midi file had guitars over drums, bass and keyboards, so the goal was to first extract the list of notes and save them. The instrument.partitionByInstrument() function separates the stream into different parts according to the instrument. If we have multiple files, we can loop over them and partition each by individual instrument. The loop below then collects the notes and chords in the file into a list.

from glob import glob

from music21 import chord, converter, instrument, note
from tqdm import tqdm

songs = glob(' /ml/vish/audio_lstm/YJM.mid') # this could be any midi file to be trained
notes = []
for file in tqdm(songs):
    midi = converter.parse(file) # convert all supported data formats to music21 objects
    notes_parser = None
    parts = None  # stays None if partitioning fails
    try:
        # partition parts for each unique instrument
        parts = instrument.partitionByInstrument(midi)
    except Exception:
        print("No uniques")

    if parts:
        notes_parser = parts.parts[0].recurse()
    else:
        notes_parser = midi.flat.notes # flatten notes to get all the notes in the stream
        print("parts == None")

    for element in notes_parser:
        if isinstance(element, note.Note): # single note: store its pitch as a string
            notes.append(str(element.pitch))
        elif isinstance(element, chord.Chord): # chord: store its pitch classes joined by dots, e.g. '0.4.7'
            notes.append('.'.join(str(n) for n in element.normalOrder))

print("notes:", notes)

Step 3:

Creating the model inputs: Convert the items in the notes list to integers so they can serve as model inputs. We create arrays for the network input and output to train the model. We have 5741 notes in our input data and have defined a sequence length of 50 notes. Each input sequence is 50 notes, and the output array stores the 51st note for every input sequence, giving 5741 - 50 = 5691 sequences. Then we reshape and normalize the input vector sequence. We also one-hot encode the integer outputs so that the number of columns equals the number of categories, giving a network output shape of (5691, 92). I’ve added some of the output as comments so the results are easier to follow.

import numpy as np
from tensorflow.keras.utils import to_categorical

pitch_names = sorted(set(item for item in notes))   # ['0', '0.3.7', '0.4.7', '0.5', '1', '1.4.7', '1.5.8', '1.6', '10', '10.1.5',..]
note_to_int = dict((note, number) for number, note in enumerate(pitch_names))  #{'0': 0,'0.3.7': 1, '0.4.7': 2,'0.5': 3, '1': 4,'1.4.7': 5,..}
sequence_length = 50
len(pitch_names) # 92
range(0, len(notes) - sequence_length, 1) #range(0, 5691)
# Define input and output sequence
network_input = []
network_output = []
for i in range(0, len(notes) - sequence_length, 1):
    sequence_in = notes[i: i + sequence_length]
    sequence_out = notes[i + sequence_length]
    network_input.append([note_to_int[char] for char in sequence_in]) 
    network_output.append(note_to_int[sequence_out])
print("network_input shape (list):", (len(network_input), len(network_input[0]))) #network_input shape (list): (5691, 50)
print("network_output:", len(network_output)) #network_output: 5691
patterns = len(network_input)  
print("patterns , sequence_length",patterns, sequence_length) #patterns , sequence_length 5691 50
network_input = np.reshape(network_input, (patterns , sequence_length, 1)) # reshape to array of (5691, 50, 1)
print("network input",network_input.shape) #network input (5691, 50, 1)
n_vocab = len(set(notes))
print('unique notes length:', n_vocab) #unique notes length: 92
network_input = network_input / float(n_vocab) 
# one hot encode the output vectors to_categorical(y, num_classes=None)
network_output = to_categorical(network_output)  
network_output.shape #(5691, 92)
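
To make the windowing concrete, here is a small pure-Python sketch on a made-up list of eight tokens with a sequence length of 3. The toy data, the sequence length and the helper names make_windows and one_hot are assumptions chosen for readability; they are not part of the code above.

```python
def make_windows(tokens, sequence_length):
    """Return (inputs, targets, vocab): each input holds `sequence_length`
    integer codes and each target is the code of the token that follows it."""
    vocab = sorted(set(tokens))
    token_to_int = {tok: i for i, tok in enumerate(vocab)}
    inputs, targets = [], []
    for i in range(len(tokens) - sequence_length):
        window = tokens[i:i + sequence_length]
        inputs.append([token_to_int[t] for t in window])
        targets.append(token_to_int[tokens[i + sequence_length]])
    return inputs, targets, vocab


def one_hot(index, size):
    """One-hot encode a single class index as a list of 0.0/1.0 values."""
    return [1.0 if j == index else 0.0 for j in range(size)]


# Toy data (made up): single notes plus one chord token
toy_notes = ["C4", "E4", "G4", "0.4.7", "C4", "E4", "G4", "C5"]
inputs, targets, vocab = make_windows(toy_notes, sequence_length=3)
encoded_targets = [one_hot(t, len(vocab)) for t in targets]

print(len(inputs), len(inputs[0]))  # 5 windows of 3 tokens each (8 - 3 = 5)
print(len(encoded_targets[0]))      # one column per distinct token
```

The same arithmetic explains the shapes above: 5741 notes with a window of 50 give 5691 sequences, and 92 distinct tokens give 92 one-hot columns.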

Step 4:

Model: We use Keras to build the model architecture using LSTM layers. Each input sequence of notes is used to predict the next note. The code below uses a standard model architecture from tutorials without too many tweaks. Plenty of tutorials online explain the model way better than I can, such as this: http://colah.github.io/posts/2015-08-Understanding-LSTMs/

Training on the midi input can be expensive and time consuming, so I suggest setting a high epoch number with callbacks defined on the metrics to monitor. In this case, I monitored the loss for early stopping and also created checkpoints for recovery, saving the model as ‘weights.musicout.hdf5’. Also note that I trained this on the community edition of Databricks for convenience.

def create_model():
  from tensorflow.keras.models import Sequential
  from tensorflow.keras.layers import Activation, Dense, LSTM, Dropout, Flatten

  model = Sequential()
  model.add(LSTM(128, input_shape=network_input.shape[1:], return_sequences=True))
  model.add(Dropout(0.2))
  model.add(LSTM(128, return_sequences=True))
  model.add(Flatten())
  model.add(Dense(256))
  model.add(Dropout(0.3))
  model.add(Dense(n_vocab))
  model.add(Activation('softmax'))
  model.compile(loss='categorical_crossentropy', optimizer='adam',metrics=["accuracy"])
  model.summary()
  return model

from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
model = create_model()

save_early_callback = EarlyStopping(monitor='loss', min_delta=0,
                                    patience=3, verbose=1,
                                    restore_best_weights=True)
epochs = 5000
filepath = 'weights.musicout.hdf5'
checkpoint = ModelCheckpoint(filepath, monitor='loss', verbose=0, save_best_only=True)
model.fit(network_input, network_output, epochs=epochs, batch_size=32, callbacks=[checkpoint,save_early_callback])
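
To show why a very high epoch count is safe with early stopping, here is a simplified pure-Python illustration of patience-based stopping. It mirrors the idea behind the patience and min_delta arguments used above, but it is my own sketch with a made-up loss history, not the Keras implementation, and the function name is hypothetical.

```python
def epochs_until_stop(losses, patience=3, min_delta=0.0):
    """Return the 1-based epoch at which patience-based early stopping
    would halt, or None if it never triggers for this loss history."""
    best = float("inf")
    wait = 0  # epochs since the last improvement
    for epoch, loss in enumerate(losses, start=1):
        if loss < best - min_delta:
            best = loss
            wait = 0
        else:
            wait += 1
            if wait >= patience:
                return epoch
    return None


# Made-up loss history: improves for three epochs, then stalls
history = [2.9, 2.1, 1.7, 1.8, 1.75, 1.72, 1.6]
print(epochs_until_stop(history))  # 6
```

In this toy run training stops at epoch 6 and never sees the better loss at epoch 7, which is the trade-off to keep in mind when picking a small patience value.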

Step 5:

Predict: Once we have the model trained, we can start generating notes based on the trained model weights by feeding the model a sequence of notes. We can pick a random integer and use the corresponding sequence from the input as a starting point. In my case, it involved calling the model.predict function for 1000 notes that can be converted to a midi file. The results might vary at this stage; for some reason I saw some degradation after 700 notes, so some tuning is required here.

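start = np.random.randint(0, len(network_input)-1)  # randomly pick an integer from input sequence as starting point
print("start:", start)
int_to_note = dict(enumerate(pitch_names))  # map integer index back to note/chord name
# network_input was already divided by n_vocab in Step 3, so convert the seed
# back to integer indices; the loop below rescales it before each prediction
pattern = np.rint(network_input[start] * n_vocab).astype(int).flatten()
prediction_output = [] # store the generated notes
print("pattern.shape:", pattern.shape)
pattern[:10] # peek at the first 10 indices of the seed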

# generating 1000 notes

for note_index in range(1000):
    prediction_input = np.reshape(pattern, (1, len(pattern), 1))
    prediction_input = prediction_input / float(n_vocab)

    prediction = model.predict(prediction_input, verbose=0) # call the model predict function to predict a vector of probabilities
    
    predict_index = np.argmax(prediction)  # Argmax is finding out the index of the array that results in the largest predict value
    #print("Prediction in progress..", predict_index, prediction)
    result = int_to_note[predict_index]   
    prediction_output.append(result)

    pattern = np.append(pattern, predict_index)
    # Next input to the model
    pattern = pattern[1:1+len(pattern)]

print('Notes generated by model...')
prediction_output[:25] # Out[30]: ['G#5', 'G#5', 'G#5', 'G5', 'G#5', 'G#5', 'G#5',...
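
The rolling-window logic in the loop can be hard to see with a real model in the way, so here is a minimal sketch with a stand-in predictor. The fake_model function and the 4-token vocabulary are made up for illustration; the point is only that the window stays in integer index space and is scaled once, right before each prediction.

```python
def generate(seed, predict_fn, n_vocab, steps):
    """Roll a fixed-length window forward, one predicted index at a time."""
    window = list(seed)
    generated = []
    for _ in range(steps):
        # Scale to [0, 1) only at prediction time
        scaled = [idx / float(n_vocab) for idx in window]
        probs = predict_fn(scaled)
        next_idx = max(range(len(probs)), key=probs.__getitem__)  # argmax
        generated.append(next_idx)
        # Drop the oldest index and append the new one
        window = window[1:] + [next_idx]
    return generated


def fake_model(scaled_window):
    """Hypothetical stand-in for model.predict: puts all probability
    on (last index + 1) mod 4."""
    n = 4
    last = round(scaled_window[-1] * n)
    probs = [0.0] * n
    probs[(last + 1) % n] = 1.0
    return probs


print(generate([0, 1, 2], fake_model, n_vocab=4, steps=5))  # [3, 0, 1, 2, 3]
```

A deterministic argmax like this can settle into a repeating cycle, which may be related to the degradation I saw on longer generations, though I have not verified that.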

Step 6:

Convert to Music21: Now that we have our prediction_output list with the predicted notes, it’s time to convert it back into a format that Music21 can recognize, with the objective of writing it out as a midi file.

offset = 0
output_notes = []

# create note and chord objects based on the values generated by the model
# convert to Note objects for  music21
for pattern in prediction_output:
    if ('.' in pattern) or pattern.isdigit():  # chord: dot-separated integers (or a single integer)
        notes_in_chord = pattern.split('.')
        notes = []
        for current_note in notes_in_chord:
            new_note = note.Note(int(current_note))
            new_note.storedInstrument = instrument.Piano() 
            notes.append(new_note)
        new_chord = chord.Chord(notes)
        new_chord.offset = offset
        output_notes.append(new_chord)
    else:  # single note given by its pitch name
        new_note = note.Note(pattern)
        new_note.offset = offset
        new_note.storedInstrument = instrument.Piano()  
        output_notes.append(new_note)

    # increase offset each iteration so that notes do not stack
    offset += 0.5

#Convert to midi
midi_output = music21.stream.Stream(output_notes)
print('Saving Output file as midi....')
midi_output.write('midi', fp='/ml/vish/audio_lstm/yjmout.midi')

Step 7:

Once we have the midi file with the generated notes, the next step was to load the midi track into Ableton. The next steps were standard recording processes one would follow to record a track in the DAW.

a) Compose and Record the Rhythm guitars, drums and Keyboards.

Instruments/software I used:

Midi

b) Insert the midi track into the DAW and quantize and sequence accordingly. This can take significant time depending on the precision wanted. In my case, this was just a quick fun project not really destined for the charts so a quick rough mix and master sufficed.

The track is on soundcloud here. The solo kicks in around the 16 second mark. Note that I did have to adjust the pitch to C to blend in with the rhythm track, though the model was originally trained on a track in F minor.

There are other ways of dealing with more sophisticated training, like using different activation functions or normalizing inputs. GRUs are another way to get past this problem, and I plan to iterate on more complex pieces blending deep learning with my compositions. This paper gives a great primer on the difference between LSTMs and GRUs: https://www.scihive.org/paper/1412.355

TFDV for Data validation

Working with my teams trying to build out a robust feature store these days, it’s becoming even more imperative to ensure the quality of feature engineering data. The models that gain efficiency out of a performant feature store are only as good as the underlying data.

TensorFlow Data Validation (TFDV) is a Python package from the TensorFlow Extended (TFX) ecosystem. The package has been around for a while but has now evolved to the point of being extremely useful for machine learning pipelines, both as part of feature engineering and for detecting data drift. Its main functionality is to compute descriptive statistics, infer a schema, and detect data anomalies. It’s well integrated with the Google Cloud Platform and Apache Beam. The core API uses Apache Beam transforms to compute statistics over input data.

I end up using it in cases where I need quick checks on data to validate and identify drift scenarios before starting expensive training workflows. This post is a summary of some of my notes on the usage of the package. Code is here.

Data Load

TFDV accepts CSV, Dataframes or TFRecords as input.

The CSV integration and the built-in visualization function make it relatively easy to use within Jupyter notebooks. The library takes input feature data and analyzes it feature by feature to visualize it. This makes it easy to get a quick understanding of the distribution of values, and helps in identifying anomalies and training/test/validation skew. It is also a great way to discover bias in the data, since you can see aggregates of values that are skewed towards certain features.

As evident, with a trivial amount of code you can spot issues immediately: missing columns, inconsistent distributions and data drift scenarios where a newer dataset could have different statistics compared to earlier training data.

I used a dataset from Kaggle to quickly illustrate the concept:

import tensorflow_data_validation as tfdv
TRAIN = tfdv.generate_statistics_from_csv(data_location='Data/Musical_instruments_reviews.csv', delimiter=',')
# Infer schema
schema = tfdv.infer_schema(TRAIN)
tfdv.display_schema(schema)

This generates a data structure that stores summary statistics for each feature.

TFDV Schema

Schema Inference

The schema properties describe every feature present in the 10261 reviews. Example:

  • their type (STRING)
  • Uniqueness of features – for example 1429 unique reviewer IDs.
  • the expected domains of features.
  • the min/max of the number of values for a feature in each example. For example, the reviewerID A2EZWZ8MBEDOLN has 36 occurrences:
top_values {
        value: "A2EZWZ8MBEDOLN"
        frequency: 36.0
      }
datasets {
  num_examples: 10261
  features {
    type: STRING
    string_stats {
      common_stats {
        num_non_missing: 10261
        min_num_values: 1
        max_num_values: 1
        avg_num_values: 1.0
        num_values_histogram {
          buckets {
            low_value: 1.0
            high_value: 1.0
            sample_count: 1026.1
          }
          buckets {
            low_value: 1.0
            high_value: 1.0
            sample_count: 1026.1
          }

Schema inference is usually tedious but becomes a breeze with TFDV. This schema is stored as a protocol buffer.

schema = tfdv.infer_schema(TRAIN)
tfdv.display_schema(schema)

The schema also generates definitions like “Valency” and “Presence”. I could not find too much detail in the documentation but I found this useful paper that describes it well.

  • Presence: The expected presence of each feature, in terms of a minimum count and fraction of examples that must contain the feature.
  • Valency: The expected valency of the feature in each example, i.e., minimum and maximum number of values.

TFDV has inferred reviewerName as STRING, and the universe of values around it is termed the Domain. Note that TFDV can also encode your fields as BYTES. I’m not seeing any function call in the API to update the column type as-is, but you could easily update it externally if you want to explicitly specify a string. The documentation explicitly advises reviewing the inferred schema and refining it per the requirement, so as to embellish this auto-inference with our domain knowledge of the data. You could also update the feature’s data type to BYTES, INT, FLOAT or STRUCT.

# Convert to BYTES
tfdv.get_feature(schema, 'helpful').type = 1  # 1 is the enum value for BYTES


Once loaded, you can generate the statistics from the csv file.
For a comparison and to simulate a dataset validation scenario, I cut down the Musical_instruments_reviews.csv to 100 rows to compare with the original and also added an extra feature called ‘Internal’ with the values A, B, C randomly interspersed for every row.

Visualize Statistics

After this you can use the ‘visualize_statistics’ call to visualize the two datasets based on the schema of the first dataset (TRAIN in the code). Even though this is limited to two datasets, this is a powerful way to identify issues immediately. For example, it can right off the bat identify “missing features”, such as over 99.6% of the values in the feature “reviewerName”, as well as split the visualization into numerical and categorical features based on its inference of the data type.

# Load test data to compare
TEST = tfdv.generate_statistics_from_csv(data_location='Data/Musical_instruments_reviews_100.csv', delimiter=',')
# Visualize both datasets
tfdv.visualize_statistics(lhs_statistics=TRAIN, rhs_statistics=TEST, rhs_name="TEST_DATASET",lhs_name="TRAIN_DATASET")


A particularly nice option is the ability to choose a log scale for validating categorical features. The ‘Percentages’ option can show quartile percentages.

Anomalies

Anomalies can be detected using the display_anomalies call. The long and short descriptions allow easy visual inspection of the issues in the data. However, for large-scale validation this may not be enough, and you will need tooling that handles a stream of defects being presented.

# Display anomalies
anomalies = tfdv.validate_statistics(statistics=TEST, schema=schema)
tfdv.display_anomalies(anomalies)


The various kinds of anomalies that can be detected and their invocation are described here. Some especially useful ones are:

  • SCHEMA_MISSING_COLUMN
  • SCHEMA_NEW_COLUMN
  • SCHEMA_TRAINING_SERVING_SKEW
  • COMPARATOR_CONTROL_DATA_MISSING
  • COMPARATOR_TREATMENT_DATA_MISSING
  • DATASET_HIGH_NUM_EXAMPLES
  • UNKNOWN_TYPE

Schema Updates

Another useful feature here is the ability to update the schema and values to make corrections. For example, in order to insert a particular value:

# Insert Values
names = tfdv.get_domain(schema, 'reviewerName').value
names.insert(6, "Vish")  # inserts "Vish" at index 6 (the 7th position) of the reviewerName domain

You can also adjust the minimum fraction of values that must be present in the domain, so that an anomaly is raised only if the fraction falls below that threshold.

# Relax the minimum fraction of values that must come from the domain for feature reviewerName
name = tfdv.get_feature(schema, 'reviewerName')
name.distribution_constraints.min_domain_mass = 0.9

Environments

The ability to split data into ‘Environments’ helps indicate the features that are not necessary in certain environments. For example, we may want the ‘Internal’ column to be in the TEST data but not in the TRAIN data. Features in the schema can be associated with a set of environments using:

  •  default_environment
  •  in_environment
  •  not_in_environment

# schema2 is assumed to be a schema that includes the 'Internal' feature (for example, one inferred from TEST)
# Add 'TESTING' as a default environment so that all features are expected in it
schema2.default_environment.append('TESTING')

# Specify that the 'Internal' feature is not expected in the TESTING environment
tfdv.get_feature(schema2, 'Internal').not_in_environment.append('TESTING')

# Validate the TEST statistics against the schema within the TESTING environment
tfdv.validate_statistics(TEST, schema2, environment='TESTING')

Sample anomaly output:

string_domain {
    name: "Internal"
    value: "A"
    value: "B"
    value: "C"
  }
  default_environment: "TESTING"
}
anomaly_info {
  key: "Internal"
  value {
    description: "New column Internal found in data but not in the environment TESTING in the schema."
    severity: ERROR
    short_description: "Column missing in environment"
    reason {
      type: SCHEMA_NEW_COLUMN
      short_description: "Column missing in environment"
      description: "New column Internal found in data but not in the environment TESTING in the schema."
    }
    path {
      step: "Internal"
    }
  }
}
anomaly_name_format: SERIALIZED_PATH

Skews & Drifts

The ability to detect data skews and drifts is invaluable. However, the drift here does not indicate a divergence from the mean but refers to the “L-infinity” norm of the difference between the summary statistics of the two datasets. We can specify a threshold which, if exceeded for the given feature, flags the drift.

Let’s say we have two vectors [2,3,4] and [-4,-7,8]. The L-infinity norm is the maximum absolute value of the difference between the two vectors, so in this case the maximum absolute value of [6,10,-4], which is 10.
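
The arithmetic above is easy to check in a few lines of plain Python. The second function below applies the same distance to normalized value frequencies of a categorical feature; that part is my reading of how such a comparison could work and uses made-up data, so treat it as an assumption rather than TFDV’s actual code.

```python
from collections import Counter


def linf_distance(u, v):
    """L-infinity distance: largest absolute element-wise difference."""
    return max(abs(a - b) for a, b in zip(u, v))


def value_frequencies(values):
    """Map each distinct value to its share of the total count."""
    counts = Counter(values)
    total = len(values)
    return {k: c / total for k, c in counts.items()}


def categorical_linf(train_values, serving_values):
    """L-infinity distance between two normalized value-frequency tables."""
    p = value_frequencies(train_values)
    q = value_frequencies(serving_values)
    return max(abs(p.get(k, 0.0) - q.get(k, 0.0)) for k in set(p) | set(q))


print(linf_distance([2, 3, 4], [-4, -7, 8]))  # 10

# Made-up categorical samples: the share of "A" drops and the share of "C" rises
train_values = ["A"] * 5 + ["B"] * 3 + ["C"] * 2
serving_values = ["A"] * 2 + ["B"] * 3 + ["C"] * 5
print(categorical_linf(train_values, serving_values))
```

With a threshold of 0.01, a distance of roughly 0.3 like the one in the toy categorical example would be flagged.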

#Skew comparison
tfdv.get_feature(schema,
                 'helpful').skew_comparator.infinity_norm.threshold = 0.01
skew_anomalies = tfdv.validate_statistics(statistics=TRAIN,
                                          schema=schema,
                                          serving_statistics=TEST)
skew_anomalies

Sample Output:

anomaly_info {
  key: "helpful"
  value {
    description: "The Linfty distance between training and serving is 0.187686 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: [0, 0]"
    severity: ERROR
    short_description: "High Linfty distance between training and serving"
    reason {
      type: COMPARATOR_L_INFTY_HIGH
      short_description: "High Linfty distance between training and serving"
      description: "The Linfty distance between training and serving is 0.187686 (up to six significant digits), above the threshold 0.01. The feature value with maximum difference is: [0, 0]"
    }
    path {
      step: "helpful"
    }
  }
}
anomaly_name_format: SERIALIZED_PATH

The drift comparator is useful in cases where you could have the same data being loaded on a frequent basis and you need to watch for anomalies to reengineer features. The validate_statistics call combined with the drift_comparator threshold can be used to monitor for any changes that you need to act on.

#Drift comparator
tfdv.get_feature(schema,'helpful').drift_comparator.infinity_norm.threshold = 0.01
drift_anomalies = tfdv.validate_statistics(statistics=TRAIN,schema=schema,previous_statistics=TRAIN)
drift_anomalies

Sample Output:

anomaly_info {
  key: "reviewerName"
  value {
    description: "The feature was present in fewer examples than expected."
    severity: ERROR
    short_description: "Column dropped"
    reason {
      type: FEATURE_TYPE_LOW_FRACTION_PRESENT
      short_description: "Column dropped"
      description: "The feature was present in fewer examples than expected."
    }
    path {
      step: "reviewerName"
    }
  }
}

You can easily save the updated schema in the format you want for further processing.

Overall, this has been useful to me mainly for models within the TensorFlow ecosystem, and the documentation indicates that using components like StatisticsGen with TFX makes this a breeze to use in pipelines, with out-of-the-box integration on a platform like GCP.

The use case for avoiding time-consuming preprocessing/training steps by using TFDV to identify anomalies for feature drift and inference decay is a no-brainer; however, defect handling is up to the developer to incorporate. It’s also important to consider that one’s domain knowledge of the data plays a huge role in these scenarios for optimizing data according to your needs, so an auto-fix on all data anomalies may not really work in cases where a careful review is unavoidable.

This can also be extended for overall general data quality by applying to any validation cases where you are constantly getting updated data for the features. The application of TFDV could even be post-training for any data input/output scenario to ensure that values are as expected.


Official documentation is here.

Autoencoders for Data Anomalies

With more and more emphasis on data anomaly detection and the proliferation of build/buy options, I’ve been exploring autoencoders for a few projects. In a nutshell, autoencoders are a type of neural network that takes an input (image, data), reduces it down to its core features and then reverses the process to recreate the input. The key aspect is that the encoding part is done in an unsupervised manner, hence the ‘auto’.

For example, think of dismantling a picture of an automobile, taking out every part and representing (encoding) them as representative components such as chassis and wheels, and then reassembling the picture (decoding) from the encoding while minimizing the reconstruction error.

Autoencoders use an encoder that learns a concise representation of the input data, and a decoder that reconstructs the input from that compressed representation. A lot of the literature online calls this compressed vector the “latent space representation”.

The seminal paper on the subject that shows the benefits of Autoencoders has been dissected many times and demonstrates the use of Restricted Boltzmann Machines (a 2-layer Autoencoder consisting of a visible/hidden layer) that learns the difference between the hidden and visible layer using a metric called K-L divergence and provides a greater dimensionality reduction than Principal Component Analysis. Thankfully the implementation is much more approachable than some of the background math used to prove the model!

These are feedforward, non-recurrent neural networks having an input layer, an output layer and one or more hidden layers, with the count of output nodes matching the input nodes. They minimize reconstruction “noise” instead of predicting a target variable as we do in supervised learning implementations. Hence, they don’t require labels, which qualifies them as unsupervised.

In a market rife with products offering “data quality” solutions, using Autoencoders to detect for anomalies could have the potential for a low cost, easy to use solution built in house to add to existing options.

My focus has been more on exploring this for analyzing data anomalies in structured data. In terms of cost/benefit, one could argue it is overkill to use a neural network instead of rule-based checks on the data, which is a very valid point; rule-based checks are extensively used in large enterprises instead of neural net deployments. However, squashing the input data into a smaller representative vector helps in cases where we deliberately need dimensionality reduction and need to recognize outliers at scale. There is tons of material on the web on image processing using autoencoders for use cases such as image compression, image denoising and medical imaging. For example, fascinating results by converting THIS to THIS make colorizing an engrossing endeavor. There are also tons of applications in the Natural Language Processing field for understanding text, word embeddings and learning the semantic meaning of words.

Autoencoders, unlike GANs, can’t generate newer datapoints since their core goal is to learn an identity function using compression. Also, if the goal is just to achieve compression, they are poor general-purpose image compressors.

There are a few types of an Autoencoder well described here:

  • Denoising autoencoder
  • Sparse Autoencoder
  • Deep Autoencoder
  • Contractive Autoencoder
  • Undercomplete Autoencoder
  • Convolutional Autoencoder
  • Variational Autoencoder

Most of the examples I found online dealt with images, so for exploration I used Faker to generate a million records to simulate a data scenario for regular versus non-regular coffee consumers. The irregulars were determined by an arbitrary rule, say those who spent less than a specified threshold.

The objective was to have the Autoencoder learn from the fabricated data examples on what the values for the “regular” customers were, test against a holdout dataset from the “regular” group and then use the model to identify anomalies post reconstruction to identify cases of irregularities. Essentially, have the autoencoder achieve reasonable compression on the data and then identify anomalous inputs while reading out data with irregular values that do not match the original representation.

Below is a simple gist I created for a walkthrough of the process for a possible implementation with comments inline that should be self-explanatory.

  • Customer Test Score : 0.014549643226719535
  • Customer Validation Score : 0.014477944187138688
  • Irregular Customer Validation Score : 3.541257450764963

The scores reflected the anomaly for a synthetic dataset consisting of a million records, and I was able to use Spark to scale this to well over 10 million records. Essentially, as you can tell, the Irregular Customer validation score (about 3.54) is well over the Customer validation score (about 0.014). The next step is to try some of these approaches against more “production”-type data at scale and implement some alerting against this data to make this more actionable.
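
As a sketch of what the alerting step could look like, the snippet below turns per-record reconstruction errors into anomaly flags. The error values are made up, and the mean plus three standard deviations cut-off is an assumption on my part, not something taken from the gist; the function names are hypothetical too.

```python
import statistics


def fit_threshold(regular_errors, k=3.0):
    """Threshold = mean + k * standard deviation of the reconstruction
    errors on known-regular records (the rule itself is an assumption)."""
    mean = statistics.mean(regular_errors)
    spread = statistics.pstdev(regular_errors)
    return mean + k * spread


def flag_anomalies(errors, threshold):
    """Return indices of records whose reconstruction error exceeds the threshold."""
    return [i for i, err in enumerate(errors) if err > threshold]


# Made-up per-record reconstruction errors
regular_errors = [0.012, 0.015, 0.014, 0.016, 0.013]
incoming_errors = [0.014, 3.5, 0.015, 2.9]

threshold = fit_threshold(regular_errors)
print(flag_anomalies(incoming_errors, threshold))  # [1, 3]
```

The threshold is fitted only on the “regular” holdout, so records the autoencoder reconstructs poorly stand out regardless of how many irregular records arrive.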

There are tons of considerations that make a quality data anomaly solution work for particular use cases not limited to Statistical analysis use cases, storage considerations, UI/UX for test case development, the right orchestration tools, database/data lake operability, scaling and developmental costs and security audit requirements. Hence, the methodology for detection is just one piece for a much larger puzzle.

Some interesting reads/videos:

Spark AI Summit 2020 Notes


Spark AI Summit just concluded this week and as always, plenty of great announcements. (Note: I was one of the speakers at the event but this post is more about the announcements and areas of my personal interest in Spark. The whole art of virtual public speaking is another topic). The ML enhancements and impact is a bigger topic probably for another day as I catch up with all the relevant conference talks and try out the new features.

Firstly, I think the online format worked for this instance. This summit (and I’ve been to it every year since its inception) was way more relaxing and didn’t leave me exhausted physically and mentally with information overload. Usually held at the Moscone in San Francisco, the event is a great opportunity to network with former colleagues, friends and industry experts, which is the most enjoyable part yet taxing in many ways with limited time to manage. The virtual interface was way better than most of the online events I’ve been to before: engaging and convenient. The biggest drawback was the networking aspect; the online networking options just don’t cut it. The video conferencing fatigue probably didn’t hit since it was 3 days and the videos were available instantly online, so plenty of them are in my “Watch Later” list. (Note that the talks I refer to below are only the few I watched, so there are plenty more interesting ones.)

The big announcement was the release of Spark 3.0 – Hard to believe but it’s been 10 years of evolution. I remember 2013 as the year I was adapting to the Hadoop ecosystem writing map-reduce using Java/Pig/Hive for large scale data pipelines when Spark started emerging as a fledgling competitor with an interesting distributed computational engine using Resilient Distributed Datasets (RDD). Fast-forward to 2020 and Spark is the foundation of large scale data implementations across the industry and its ecosystem has evolved to frameworks and engines like Delta and MLflow which are also gaining a foothold as foundational to the enterprise across Cloud providers. More importantly, smart investment into its DataFrames API has reduced the barrier to entry to it with the SQL access patterns.

There were tons of new features introduced, but I’m focusing on the ones I paid attention to. There has not been a major release of Spark for years, so this is pretty significant (2.0 was in 2016).

Spark 3.0

  • Adaptive Query execution: At the core, this helps in changing the number of reducers at runtime. It divides the SQL execution plan into query stages instead of relying only on the usual RDD graph. These stages allow optimizations to be injected before the rest of the query executes, since later stages can be re-planned with runtime statistics and a global picture of all shuffle dependencies. The execution plans can be auto-optimized at runtime, for example changing a SortMergeJoin to a BroadcastJoin where applicable. This is huge in large-scale implementations, where I see tons of poorly formed queries eating a lot of compute thanks to skewed joins. More specifically, settings like the number of shuffle partitions set using spark.sql.shuffle.partitions, which has defaulted to 200 since inception, can now be automatically tuned based on the reducers required for the map stage output, i.e. setting it high for larger data and smaller for smaller data.

  • Dynamic partition pruning: Enables the ability to perform filter pushdowns versus table scans by adding a partition pruning filter. At the core if you consider a broadcast hash join between a fact and dimension table, the enhancement intercepts the result of the broadcast and plugs them as a filter on top of the dynamic filter on the fact table as opposed to the earlier approach of pushing out the broadcast hash table derived from the dimension table to every worker to determine the value of the join with the fact. This is huge to avoid scanning irrelevant data. This session explains it well.

  • Accelerator-aware scheduler: Traditionally, the bottleneck usually has been small data in partitions that GPUs find hard to handle, cache processing efficiencies, slow I/O on disk, UDFs that need CPU processing and a lot more issues. But GPUs are massively useful for high cardinality datasets, matrix operations, window operations and transcoding situations. Originally termed project Hydrogen, this feature helps Spark be GPU-aware. The cluster managers now have GPU support that schedulers can request from. The schedulers can now understand GPU allocations to executors and assign GPUs appropriately to tasks. The GPU resources still need to be configured using the configs to assign the appropriate resources. We can request resources at the executor, driver and task level. This also allows the resources and their assignments to be discovered on the nodes. This is supported in YARN, Kubernetes and Standalone modes.

  • Pandas UDF overhaul: Extensive use of Python type annotations. This becomes more and more imperative as codebases scale and newer engineers take longer to understand and maintain the code effectively. Type hints help catch issues early, instead of writing hundreds of test cases or, worse, finding out about them from irate users. Great documentation and examples here.

  • PySpark UDF: Another feature that I’ve looked forward to is enabling PySpark to handle Pandas vectorized UDFs as an array. In the past, we needed to jump through god-awful hoops like writing Scala functions as a helper and then switching over to Python in order to help Python read these as arrays. ML engineers will welcome this.

  • Structured Streaming UI: Great to see more focus on the UI and additional features appearing in the workspace interface, which frankly has gotten pretty stale over the last few years. The new tab shows more statistics for running and completed queries and, more importantly, will help developers debug exceptions quickly rather than poring through log files.

  • Proleptic Gregorian calendar: Switched to this from the previous hybrid (Julian + Gregorian). This uses Java 8 API classes from the java.time packages that are based on ISO chronology. The “proleptic” part comes from extending the Gregorian calendar backward to dates before 1582, when it was officially introduced.

    Fascinating segue here:

The Gregorian Calendar (named after Pope Gregory the 13th, not the guy who gave us the awesome Gregorian Chants, that was Gregory 1) is what we use today as part of ISO 8601:2004. The Gregorian calendar replaced the Julian Calendar due to its inaccuracies in determining an actual year, plus issues where it could not really take into account the complexities of adding a leap year almost every 4 years. Catholics liked this and adopted it, while Protestants held out for 200 years (!) with suspicion before England and the colonies switched over, advancing the date from September 2 to September 14, 1752! Would you hand over 11 days of your life as a write-off? In any case, you can thank Gregory the 13th for playing a part in this enhancement.

  • Also a whole lot of talk on better ANSI SQL compatibility that I need to look closer at. Working with a large user base of SQL users, this could only be good news.

  • A few smaller super useful enhancements:
    • “Show Views” command
    • “Explain” output formatted for better usability instead of a jungle of text
    • Better documentation for SQL
    • Enhancements on MLlib, GraphX
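
For the first two items in the list above, these are the Spark SQL settings I would start with. This is a minimal sketch: the helper names are hypothetical, and the defaults noted in the comments are from my reading of the Spark 3.0 configuration docs, so double-check them against your version.

```python
def spark3_sql_tuning_conf():
    """Spark 3.0 SQL settings related to adaptive execution and pruning."""
    return {
        # Adaptive Query Execution (off by default in Spark 3.0)
        "spark.sql.adaptive.enabled": "true",
        # Let AQE merge small shuffle partitions at runtime
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        # Let AQE split skewed partitions in sort-merge joins
        "spark.sql.adaptive.skewJoin.enabled": "true",
        # Dynamic partition pruning (on by default)
        "spark.sql.optimizer.dynamicPartitionPruning.enabled": "true",
    }


def apply_conf(session_builder, conf):
    """Apply each key/value pair via the builder's config(key, value) method."""
    for key, value in conf.items():
        session_builder = session_builder.config(key, value)
    return session_builder


conf = spark3_sql_tuning_conf()
for key in sorted(conf):
    print(key, "=", conf[key])
```

With PySpark installed, apply_conf(SparkSession.builder, conf).getOrCreate() would create a session with these settings; the same keys can also be passed with --conf on spark-submit.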

Useful talks:

Delta Engine/Photon/Koalas

Being a big Delta proponent, this was important to me especially as adoption grows and large-scale implementations need continuous improvements in this product to justify rising storage costs on cloud providers as the scale grows.

The Delta Engine now has an improved query optimizer and a native vectorized execution engine written in C++. This builds on the optimized reads and writes in today’s NVMe SSDs, which eclipse the SATA SSDs found in previous generations and offer faster seek times. Gaining these efficiencies out of the CPU at the bare metal level is significant, especially as data teams deal with more and more unstructured data and high velocity. The C++ implementation helps exploit data-level and instruction-level parallelism, as explained in detail in the keynote by Reynold Xin. There were some interesting benchmarks on strings using regex to demonstrate faster processing. Looking forward to more details on how the optimization works under the hood and implementation guidelines.

Koalas 1.0 now implements 80% of the Pandas APIs. We can invoke accessors to use the PySpark APIs from Koalas. Better type hinting and a ton of enhancements on DataFrames, Series and Indexes with support for Python 3.8 make this another value proposition on Spark.

The strong focus on Lakehouse in ancillary meetings was encouraging and augurs well for data democratization on a single linear stack versus fragmenting data across data warehouses and data lakes. The Redash acquisition will give large-scale enterprises another option for easy-to-use dashboarding and visualization on these curated data lakes. Hope to see more public announcements on that topic.

MLflow

More announcements around the MLflow model serving aspects with Model Registry (announced in April), which lets data scientists track the model lifecycle across stages such as Staging, Production, or Archived. With MLflow now in the Linux Foundation, a vendor-independent non-profit manages the project, which helps evangelize it to a larger audience.

  • Autologging: Enables automatic logging of Spark datasource information at read time, without the need for explicit log statements. mlflow.spark.autolog() enables autologging for Spark data sources and captures the relevant data and versions when you use Delta Lake, so the managed Databricks implementation definitely looks slicker with the UI. Implementation would be as easy as attaching the mlflow-spark JAR and then calling mlflow.spark.autolog(). More significantly, it enables the cloning of models.
  • On Azure – the updated mlflow.azureml.deploy API for deploying MLflow models to AzureML. This now uses the up-to-date Model.package() and Model.deploy() APIs.
  • Model schemas for inputs and outputs, plus custom metadata tags, which means more metadata to track. That is great.
  • Model Serving: Ability to deploy models via a REST endpoint on hosted MLflow, which is great. Would have loved to see more turnkey methods to deploy to an agnostic deployment endpoint, say a managed Kubernetes service. The current implementation is for Databricks clusters from what I noticed.

  • Lots of cool UI fixes including highlighting different parameter values when comparing runs, UI plot updates with scaling to thousands of points.

Useful talks:

Looking forward to trying out the new MLflow features which will go on public preview later in July.

Let it read – A character-based RNN generating Beatles lyrics

The Bob Spitz biography stares at me all day as I spend most of my waking hours at my home office desk, taunting me to finish it one of these days. It probably subliminally influenced me to write a quick app to generate lyrics using a Recurrent Neural Net. The Databricks community edition or Google Colab makes it a breeze to train these models at a zero or reasonable price point using GPUs.

All the buzz around GPT-3 and my interest in Responsible AI are helping inspire some late night coding sessions, and I keep getting blown away by the accuracy of some of these pretrained language models. I also wanted to play around with Streamlit, which saves me from tinkering around with JavaScript frameworks and what not while trying to deploy an app. The lack of a WSGI option limits some deployment options, but I found it relatively easy to containerize and deploy on Azure.

The official TensorFlow tutorial, which in turn is based on this magnum opus, is great for boilerplate. The tutorial is pretty straightforward and Google Colab is great for training on GPUs.

Plenty of libraries are available to scrape data; this one comes to mind. The model is character-based and, given a sequence of characters, predicts the next character in the sequence. I tweaked around training this on small batches of text and finally settled on around 150 characters to start seeing some coherent structures. My source data could do better; it stops after Help!, I believe. On my todo list is to embellish it with the full discography so as to make the model better.
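To illustrate the "given a sequence, predict the next character" setup, here is a minimal pure-Python sketch of how the text can be cut into input/target pairs. It assumes the 150 characters above are the length of each training sequence; the function name make_training_pairs is made up for this example and is not from the app or from TensorFlow.

```python
def make_training_pairs(text, seq_length=150):
    """Split text into (input, target) index sequences.

    The target is the input shifted one character to the right, so at
    every position the model is asked to predict the next character.
    """
    vocab = sorted(set(text))                        # unique characters
    char2idx = {ch: i for i, ch in enumerate(vocab)}
    ids = [char2idx[ch] for ch in text]

    pairs = []
    # Each chunk holds seq_length + 1 characters: inputs plus one extra
    # character so the shifted target has the same length as the input.
    for start in range(0, len(ids) - seq_length, seq_length + 1):
        chunk = ids[start:start + seq_length + 1]
        pairs.append((chunk[:-1], chunk[1:]))
    return vocab, pairs


vocab, pairs = make_training_pairs("hello world", seq_length=4)
decode = lambda ids: "".join(vocab[i] for i in ids)
print(decode(pairs[0][0]), "->", decode(pairs[0][1]))  # hell -> ello
```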

The model summary is as defined below:

Didn’t really have to tweak the sequential model too much to start seeing some decent output. The 3 layers, one of them a Gated Recurrent Unit (GRU), were enough. The GRU seemed to give me better output than an LSTM, so I let it be…

As per the documentation, for each character the model looks up the embedding, runs the GRU one timestep with the embedding as input, and applies the dense layer to generate logits predicting the log-likelihood of the next character.

The standard tf.keras.losses.sparse_categorical_crossentropy loss function is my usual go-to in this case because the classes are all mutually exclusive. Couldn’t get past 50 epochs on my MacBook Pro without the IDE hanging, so I had to shift to a Databricks instance, which got the job done with no sweat on a 28.0 GB memory, 8-core machine.
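For intuition, here is what that loss computes for a single character, sketched in plain Python instead of TensorFlow. It assumes the model outputs raw logits (the from_logits=True setting of the Keras function) and that the label is a single integer index, which is why mutually exclusive classes fit so well. The helper below is my own illustration, not the Keras implementation.

```python
import math

def sparse_categorical_crossentropy(label, logits):
    """Negative log-probability of the true class under softmax(logits)."""
    m = max(logits)  # subtract the max for numerical stability
    log_sum_exp = m + math.log(sum(math.exp(x - m) for x in logits))
    return log_sum_exp - logits[label]

# A clueless model (uniform logits) over 4 characters pays log(4).
uniform_loss = sparse_categorical_crossentropy(2, [0.0, 0.0, 0.0, 0.0])
# A confident, correct model pays almost nothing.
confident_loss = sparse_categorical_crossentropy(1, [0.0, 20.0, 0.0])
```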

All things must pass and 30 minutes later, we had a trained model thanks to an early stopping callback monitoring loss.

The coolest part of the RNN is the text generating function from the documentation, which lets you configure the number of characters to generate. It uses the start string and the RNN state to get the next character. The next character is chosen by sampling from the categorical distribution defined by the model's output logits, and the sampled index becomes the next input to the model. The state returned by the model is fed back in at every step, so the model keeps the context of everything generated so far. This is the magical mystery of the model. Plenty more training to do as I wait for the Peter Jackson release.
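The generation loop is easy to sketch without TensorFlow. In the version below, step_fn is a hypothetical stand-in for the trained model: it takes a character index and the previous state and returns (logits, new_state). The temperature parameter and all function names are assumptions for this illustration, not the tutorial's code.

```python
import math
import random

def softmax(logits, temperature=1.0):
    """Turn logits into probabilities; lower temperature is more conservative."""
    scaled = [x / temperature for x in logits]
    m = max(scaled)
    exps = [math.exp(x - m) for x in scaled]
    total = sum(exps)
    return [e / total for e in exps]

def sample_next_index(logits, temperature=1.0, rng=random):
    """Sample one index from the categorical distribution given by the logits."""
    probs = softmax(logits, temperature)
    return rng.choices(range(len(probs)), weights=probs, k=1)[0]

def generate(step_fn, start_ids, num_generate, temperature=1.0, rng=random):
    """Generate num_generate indices; start_ids must be non-empty."""
    state, logits = None, None
    for idx in start_ids:                  # warm up the state on the start string
        logits, state = step_fn(idx, state)
    generated = []
    for _ in range(num_generate):
        idx = sample_next_index(logits, temperature, rng)
        generated.append(idx)
        logits, state = step_fn(idx, state)  # feed prediction and state back in
    return generated

# Toy "model" over 3 characters that always strongly prefers (idx + 1) % 3.
def toy_step(idx, state):
    logits = [1000.0 if j == (idx + 1) % 3 else 0.0 for j in range(3)]
    return logits, (state or 0) + 1

print(generate(toy_step, start_ids=[0], num_generate=4))  # [1, 2, 0, 1]
```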

Link to app: https://stream-web-app.azurewebsites.net/

Github: https://github.com/vishwanath79/let_it_read

Leader Election

Citizens in a democratic society usually vote to elect their leaders from a pool of candidates. Distributed systems can choose a master node as well. Moreover, they can hold “spot” elections whenever there is a leadership vacuum, which helps in maintaining redundancy.

Introducing redundancy in services may introduce new problems such as duplicated or repeated operations. In a group of redundant servers, the leader serves the business logic while the followers stand ready to take over leadership. Electing that leader is not a trivial problem when multiple systems need to share state and reach consensus. Synchronization overhead and reducing the number of network trips are key considerations for this process.

Dealing with frameworks like Apache Spark on a daily basis makes you more aware of the inherent challenges stemming from a driver/worker architecture and the limited fault tolerance options when the master node goes down, especially in high availability scenarios, though several mitigation options exist (restarting with checkpoints, WALs, etc.).

As described here, stable leader election ensures that the leader remains the leader until it crashes, irrespective of other behavior. Preferably, such algorithms also keep communication overhead minimal, are fault tolerant, and elect the leader in constant time when the system is stable.

The leader election process should guarantee that there is at most one leader at a time and eliminate the situation of multiple leaders being elected at the same time. That situation can arise in network-partitioned systems, where leaders elected in different partitions are unaware of each other. ZooKeeper, Multi-Paxos and Raft use temporary leaders to reduce the number of messages required to reach an agreement.

Common Leader Election algorithms include the Bully algorithm or the Ring algorithm.


Bully Algorithm

Explained here

  • Each node gets a unique rank to help identify the leader.
  • An election begins when the leader does not respond. The node that notices it first starts the election by notifying the nodes ranked higher than itself.
  • If any higher-ranked nodes respond, the highest-ranked responder takes over the process and repeats the check against the nodes ranked above it.
  • If no higher-ranked node responds, the node conducting the election becomes the new leader and notifies the lower-ranked nodes.

This could be vulnerable to the multi-leader scenario if the nodes get split into different partitions. Also, the preference for the highest rank means an unstable high-ranked node can keep getting elected, which may mean frequent elections.

Here’s a quick barebones non-socket invocation of a 6-node election process.
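A minimal sketch of what such an in-process simulation could look like, with plain method calls standing in for network messages. The names Node, Cluster and start_election are illustrative choices for this example, and a real implementation would need timeouts and failure detection.

```python
class Node:
    def __init__(self, rank):
        self.rank = rank
        self.alive = True
        self.leader = None  # rank of the leader this node currently believes in


class Cluster:
    def __init__(self, size):
        # Ranks 1..size; the highest rank is the preferred leader.
        self.nodes = {rank: Node(rank) for rank in range(1, size + 1)}

    def start_election(self, initiator_rank):
        """Run a bully election from the given node; return the winner's rank."""
        if not self.nodes[initiator_rank].alive:
            raise ValueError("a crashed node cannot start an election")

        # "Send" an election message to every higher-ranked node and
        # collect the ones that are alive to answer.
        responders = [
            n.rank for n in self.nodes.values()
            if n.rank > initiator_rank and n.alive
        ]
        if responders:
            # Hand over to the highest-ranked responder, which repeats the check.
            return self.start_election(max(responders))

        # Nobody higher answered: the initiator wins and announces itself.
        for n in self.nodes.values():
            if n.alive:
                n.leader = initiator_rank
        return initiator_rank


cluster = Cluster(6)
cluster.nodes[6].alive = False          # the current leader crashes
winner = cluster.start_election(2)      # node 2 notices and starts an election
print(winner)  # 5
```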

Ring Algorithm

Explained here

  • Follows a ring topology: all nodes in the system form a ring.
  • A new election starts when the leader fails, with the node that detects the failure notifying the next node on the ring.
  • Each node on the ring adds its own ID to the message and passes it to the next available node, skipping any node that does not respond.
  • The initiating node confirms that the traversal around the ring is complete when the message comes back and it sees its own ID on the list, and then picks the highest ID as the leader.
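The traversal described above can be sketched in a few lines. This is a simplified single-process illustration: the function name ring_election and the use of a set to model which nodes are alive are assumptions of this example, and message passing is replaced by a loop.

```python
def ring_election(ring, alive, initiator):
    """Simulate one ring election.

    ring      -- node IDs in ring order
    alive     -- set of IDs that currently respond
    initiator -- live node that detected the leader failure
    Returns (leader_id, ids_collected_in_visit_order).
    """
    if initiator not in alive:
        raise ValueError("initiator must be alive")

    n = len(ring)
    collected = [initiator]              # the message starts with the initiator's ID
    i = (ring.index(initiator) + 1) % n
    while ring[i] != initiator:          # stop once the message is back home
        if ring[i] in alive:             # unresponsive nodes are skipped
            collected.append(ring[i])
        i = (i + 1) % n

    return max(collected), collected     # the highest ID seen becomes leader


ring = [3, 6, 1, 5, 2, 4]
alive = {3, 1, 5, 2, 4}                  # node 6, the old leader, has failed
leader, visited = ring_election(ring, alive, initiator=1)
print(leader, visited)  # 5 [1, 5, 2, 4, 3]
```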

Apart from these, there are several other interesting implementations such as:

  • Next-in-line failover: Each elected leader provides a list of failover nodes and the leader failure starts a new election round.
  • Candidate/ordinary optimization: The nodes are split into candidate and ordinary – only one of the candidates can become a leader.
  • Invitation algorithm: Allows nodes to invite other nodes to join their groups instead of trying to outrank them.

Alex Petrov’s book Database Internals does a great job of explaining election scenarios in a distributed context and has served as an inspiration for more detailed study of this topic.