
Lesson 2: Data Engineering for ML on AWS

Watch Lesson 2: Data Engineering for ML on AWS Video

Pragmatic AI Labs


This notebook was produced by Pragmatic AI Labs.

Load AWS API Keys

Put your keys in a local or remote Google Drive folder:

cp ~/.aws/credentials /Users/myname/Google\ Drive/awsml/

Mount GDrive

from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)
import os
os.listdir("/content/gdrive/My Drive/awsml")

Install Boto

!pip -q install boto3

Create API Config

!mkdir -p ~/.aws &&\
  cp /content/gdrive/My\ Drive/awsml/credentials ~/.aws/credentials 

Test Comprehend API Call

import boto3

# Create a Comprehend client and verify the credentials with a sentiment call
comprehend = boto3.client(service_name='comprehend', region_name="us-east-1")
text = "There is smoke in San Francisco"
comprehend.detect_sentiment(Text=text, LanguageCode='en')

2.1 Data Ingestion Concepts

Data Lakes

A central repository that holds all of your data, at any scale


AWS Lake Formation

  • New service announced at re:Invent 2018
  • Build a secure lake in days…not months
  • Enforce security policies
  • Gain and manage insights


Kinesis (STREAMING)

Solves Three Key Problems

  • Time-series Analytics
  • Real-time Dashboards
  • Real-time Metrics

Kinesis Analytics Workflow


Kinesis Real-Time Log Analytics Example


Kinesis Ad Tech Pipeline


Kinesis IoT


[Demo] Kinesis
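
Below is a minimal sketch of pushing records onto a Kinesis data stream with boto3. The stream name ml-events is a hypothetical placeholder; the stream must already exist before the call.

import json
import boto3

# "ml-events" is a hypothetical, pre-created stream name
kinesis = boto3.client("kinesis", region_name="us-east-1")

record = {"sensor_id": "temp-1", "value": 72.5}
response = kinesis.put_record(
    StreamName="ml-events",
    Data=json.dumps(record).encode("utf-8"),  # payload bytes (JSON here)
    PartitionKey=record["sensor_id"],         # routes the record to a shard
)
print(response["SequenceNumber"])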

AWS Batch (BATCH)

An example use case is financial services trade analysis.


Using AWS Batch for ML Jobs

https://aws.amazon.com/batch/


Example job submission tool (an excerpt from a click-based CLI):

# Excerpt from a larger click CLI: `cli` is the top-level group defined
# elsewhere, and `submit_job` is a helper (a possible implementation is
# sketched after this snippet)
import click

@cli.group()
def run():
    """Run AWS Batch"""

@run.command("submit")
@click.option("--queue", default="first-run-job-queue", help="Batch Queue")
@click.option("--jobname", default="1", help="Name of Job")
@click.option("--jobdef", default="test", help="Job Definition")
@click.option("--cmd", default=["uname"], help="Container Override Commands")
def submit(queue, jobname, jobdef, cmd):
    """Submit a job"""

    result = submit_job(
        job_name=jobname,
        job_queue=queue,
        job_definition=jobdef,
        command=cmd
    )
    click.echo("CLI:  Run Job Called")
    return result
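
The submit_job helper called above is not shown in the excerpt; a plausible implementation using the boto3 Batch client could look like the sketch below (an assumption, not the author's actual code).

import boto3

def submit_job(job_name, job_queue, job_definition, command):
    """Sketch of the helper above: submit a containerized job to AWS Batch."""
    batch = boto3.client("batch")
    response = batch.submit_job(
        jobName=job_name,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        # Override the container's default command with the CLI-supplied one
        containerOverrides={"command": list(command)},
    )
    return response  # includes jobId and jobName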

Lambda (EVENTS)

  • Serverless
  • Used in most if not all ML Platforms
  • AWS DeepLens
  • Amazon SageMaker
  • S3 Events

Starting AWS Lambda development in Python with Chalice

Demo on SageMaker Terminal

https://github.com/aws/chalice

Hello World Example:

$ pip install chalice
$ chalice new-project helloworld && cd helloworld
$ cat app.py

from chalice import Chalice

app = Chalice(app_name="helloworld")

@app.route("/")
def index():
    return {"hello": "world"}

$ chalice deploy
...
https://endpoint/api

$ curl https://endpoint/api
{"hello": "world"}

References:

Serverless Web Scraping Project

[Demo] Deploying Hello World Lambda Function

Using Step Functions with AWS

https://aws.amazon.com/step-functions/


Example Project:

https://github.com/noahgift/web_scraping_python

[Demo] Step Function
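
A Step Functions execution can also be kicked off programmatically; the sketch below uses the boto3 stepfunctions client, with a placeholder state machine ARN and input.

import json
import boto3

sfn = boto3.client("stepfunctions", region_name="us-east-1")

# Placeholder ARN -- substitute the ARN of your own state machine
STATE_MACHINE_ARN = "arn:aws:states:us-east-1:123456789012:stateMachine:scrape"

response = sfn.start_execution(
    stateMachineArn=STATE_MACHINE_ARN,
    input=json.dumps({"url": "https://example.com"}),  # passed to the first state
)
print(response["executionArn"])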

2.2 Data Cleaning and Preparation

Ensuring High Quality Data

  • Validity
  • Accuracy
  • Completeness
  • Consistency
  • Uniformity

Dealing with missing values

Often the easiest approach is to drop rows containing missing values, as shown below.

import pandas as pd
df = pd.read_csv("https://raw.githubusercontent.com/noahgift/real_estate_ml/master/data/Zip_Zhvi_SingleFamilyResidence.csv")
df.isnull().sum()
RegionID         1
RegionName       1
City             1
State            1
Metro         1140
CountyName       1
SizeRank         1
1996-04       4440
1996-05       4309
1996-06       4285
1996-07       4278
1996-08       4265
1996-09       4265
1996-10       4265
1996-11       4258
1996-12       4258
1997-01       4212
1997-02       3588
1997-03       3546
1997-04       3546
1997-05       3545
1997-06       3543
1997-07       3543
1997-08       3357
1997-09       3355
1997-10       3353
1997-11       3347
1997-12       3341
1998-01       3317
1998-02       3073
              ... 
2015-04         13
2015-05          1
2015-06          1
2015-07          1
2015-08          1
2015-09          2
2015-10          3
2015-11          1
2015-12          1
2016-01          1
2016-02         19
2016-03         19
2016-04         19
2016-05         19
2016-06          1
2016-07          1
2016-08          1
2016-09          1
2016-10          1
2016-11          1
2016-12         51
2017-01          1
2017-02          1
2017-03          1
2017-04          1
2017-05          1
2017-06          1
2017-07          1
2017-08          1
2017-09          1
Length: 265, dtype: int64
df2 = df.dropna()
df2.isnull().sum()
RegionID      0
RegionName    0
City          0
State         0
Metro         0
CountyName    0
SizeRank      0
1996-04       0
1996-05       0
1996-06       0
1996-07       0
1996-08       0
1996-09       0
1996-10       0
1996-11       0
1996-12       0
1997-01       0
1997-02       0
1997-03       0
1997-04       0
1997-05       0
1997-06       0
1997-07       0
1997-08       0
1997-09       0
1997-10       0
1997-11       0
1997-12       0
1998-01       0
1998-02       0
             ..
2015-04       0
2015-05       0
2015-06       0
2015-07       0
2015-08       0
2015-09       0
2015-10       0
2015-11       0
2015-12       0
2016-01       0
2016-02       0
2016-03       0
2016-04       0
2016-05       0
2016-06       0
2016-07       0
2016-08       0
2016-09       0
2016-10       0
2016-11       0
2016-12       0
2017-01       0
2017-02       0
2017-03       0
2017-04       0
2017-05       0
2017-06       0
2017-07       0
2017-08       0
2017-09       0
Length: 265, dtype: int64
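
Note that dropna discards every row with even one missing month. An alternative worth weighing is imputation; the sketch below (on the same dataframe) fills numeric gaps with column medians and remaining categorical gaps with a sentinel value.

# Impute instead of dropping (assumes df from the cell above)
df3 = df.fillna(df.median(numeric_only=True))  # numeric columns -> column median
df3 = df3.fillna("Unknown")                    # remaining non-numeric columns -> sentinel
df3.isnull().sum().sum()                       # expect 0 missing values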

Cleaning Wikipedia Handle Example

"""
Example Route To Construct:
https://wikimedia.org/api/rest_v1/ +
metrics/pageviews/per-article/ +
en.wikipedia/all-access/user/ +
LeBron_James/daily/2015070100/2017070500 +
"""
import requests
import pandas as pd
import time
import wikipedia

BASE_URL =\
 "https://wikimedia.org/api/rest_v1/metrics/pageviews/per-article/en.wikipedia/all-access/user"

def construct_url(handle, period, start, end):
    """Constructs a URL based on arguments
    Should construct the following URL:
    /LeBron_James/daily/2015070100/2017070500 
    """

    
    urls = [BASE_URL, handle, period, start, end]
    constructed = "/".join(urls)
    return constructed

def query_wikipedia_pageviews(url):

    res = requests.get(url)
    return res.json()

def wikipedia_pageviews(handle, period, start, end):
    """Returns JSON"""

    constructed_url = construct_url(handle, period, start,end)
    pageviews = query_wikipedia_pageviews(url=constructed_url)
    return pageviews

def wikipedia_2016(handle,sleep=0):
    """Retrieve pageviews for 2016""" 
    
    print("SLEEP: {sleep}".format(sleep=sleep))
    time.sleep(sleep)
    pageviews = wikipedia_pageviews(handle=handle, 
            period="daily", start="2016010100", end="2016123100")
    if 'items' not in pageviews:
        print("NO PAGEVIEWS: {handle}".format(handle=handle))
        return None
    return pageviews

def create_wikipedia_df(handles):
    """Creates a Dataframe of Pageviews"""

    pageviews = []
    timestamps = []    
    names = []
    wikipedia_handles = []
    for name, handle in handles.items():
        pageviews_record = wikipedia_2016(handle)
        if pageviews_record is None:
            continue
        for record in pageviews_record['items']:
            pageviews.append(record['views'])
            timestamps.append(record['timestamp'])
            names.append(name)
            wikipedia_handles.append(handle)
    data = {
        "names": names,
        "wikipedia_handles": wikipedia_handles,
        "pageviews": pageviews,
        "timestamps": timestamps 
    }
    df = pd.DataFrame(data)
    return df    


def create_wikipedia_handle(raw_handle):
    """Takes a raw handle and converts it to a wikipedia handle"""

    wikipedia_handle = raw_handle.replace(" ", "_")
    return wikipedia_handle

def create_wikipedia_nba_handle(name):
    """Appends basketball to link"""

    url = " ".join([name, "(basketball)"])
    return url

def wikipedia_current_nba_roster():
    """Gets all links on wikipedia current roster page"""

    links = {}
    nba = wikipedia.page("List_of_current_NBA_team_rosters")
    for link in nba.links:
        links[link] = create_wikipedia_handle(link)
    return links

def guess_wikipedia_nba_handle(data="data/nba_2017_br.csv"):
    """Attempt to get the correct wikipedia handle"""

    links = wikipedia_current_nba_roster() 
    nba = pd.read_csv(data)
    count = 0
    verified = {}
    guesses = {}
    for player in nba["Player"].values:
        if player in links:
            print("Player: {player}, Link: {link} ".format(player=player,
                 link=links[player]))
            print(count)
            count += 1
            verified[player] = links[player] #add wikipedia link
        else:
            print("NO MATCH: {player}".format(player=player))
            guesses[player] = create_wikipedia_handle(player)
    return verified, guesses

def validate_wikipedia_guesses(guesses):
    """Validate guessed wikipedia accounts"""

    verified = {}
    wrong = {}
    for name, link in guesses.items():
        try:
            page = wikipedia.page(link)
        except (wikipedia.DisambiguationError, wikipedia.PageError) as error:
            #try basketball suffix
            nba_handle = create_wikipedia_nba_handle(name)
            try:
                page = wikipedia.page(nba_handle)
                print("Initial wikipedia URL Failed: {error}".format(error=error))
            except (wikipedia.DisambiguationError, wikipedia.PageError) as error:
                print("Second Match Failure: {error}".format(error=error))
                wrong[name] = link
                continue
        if "NBA" in page.summary:
            verified[name] = link
        else:
            print("NO GUESS MATCH: {name}".format(name=name))
            wrong[name] = link
    return verified, wrong

def clean_wikipedia_handles(data="data/nba_2017_br.csv"):
    """Clean Handles"""

    verified, guesses = guess_wikipedia_nba_handle(data=data)
    verified_cleaned, wrong = validate_wikipedia_guesses(guesses)
    print("WRONG Matches: {wrong}".format(wrong=wrong))
    handles = {**verified, **verified_cleaned}
    return handles

def nba_wikipedia_dataframe(data="data/nba_2017_br.csv"):
    handles = clean_wikipedia_handles(data=data)
    df = create_wikipedia_df(handles)    
    return df

def create_wikipedia_csv(data="data/nba_2017_br.csv"):
    df = nba_wikipedia_dataframe(data=data)
    df.to_csv("data/wikipedia_nba.csv")


if __name__ == "__main__":
    create_wikipedia_csv() 

These services can all help prepare and clean data:

  • AWS Glue
  • Amazon Machine Learning
  • Amazon Kinesis
  • AWS Lambda
  • Amazon SageMaker

2.3 Data Storage Concepts

Database Overview

Database Styles

Using AWS DynamoDB

https://aws.amazon.com/dynamodb/

alt text

Query Example:

def query_police_department_record_by_guid(guid):
    """Gets one record in the PD table by guid
    
    In [5]: rec = query_police_department_record_by_guid(
        "7e607b82-9e18-49dc-a9d7-e9628a9147ad"
        )
    
    In [7]: rec
    Out[7]: 
    {'PoliceDepartmentName': 'Hollister',
     'UpdateTime': 'Fri Mar  2 12:43:43 2018',
     'guid': '7e607b82-9e18-49dc-a9d7-e9628a9147ad'}
    """
    
    db = dynamodb_resource()
    extra_msg = {"region_name": REGION, "aws_service": "dynamodb", 
        "police_department_table":POLICE_DEPARTMENTS_TABLE,
        "guid":guid}
    log.info("Get PD record by GUID", extra=extra_msg)
    pd_table = db.Table(POLICE_DEPARTMENTS_TABLE)
    response = pd_table.get_item(
        Key={
            'guid': guid
            }
    )
    return response['Item']
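
The snippet assumes dynamodb_resource, REGION, POLICE_DEPARTMENTS_TABLE, and log are defined elsewhere in the module; a minimal sketch of what those pieces might look like:

import logging
import boto3

REGION = "us-east-1"                             # assumed region
POLICE_DEPARTMENTS_TABLE = "police_departments"  # assumed table name
log = logging.getLogger(__name__)

def dynamodb_resource():
    """Sketch of the helper used above: return a boto3 DynamoDB resource."""
    return boto3.resource("dynamodb", region_name=REGION)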

[Demo] DynamoDB

Redshift

  • Data Warehouse Solution for AWS
  • Columnar data store, which excels at aggregations such as counts over large datasets (see the sketch below)
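
A hedged sketch of issuing such an aggregation from Python via the Redshift Data API; the cluster, database, user, and table names are all placeholders.

import boto3

redshift_data = boto3.client("redshift-data", region_name="us-east-1")

# All identifiers below are placeholders
response = redshift_data.execute_statement(
    ClusterIdentifier="ml-warehouse",
    Database="analytics",
    DbUser="awsuser",
    Sql="SELECT symbol, COUNT(*) FROM trades GROUP BY symbol;",
)
print(response["Id"])  # statement id; fetch rows later with get_statement_result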

2.4 Learn ETL Solutions (Extract-Transform-Load)

AWS Glue

AWS Glue is a fully managed ETL service.


AWS Glue Workflow

  • Build Data Catalog
  • Generate and Edit Transformations
  • Schedule and Run Jobs
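
The catalog-building step can be driven from Python; the sketch below starts a crawler and then lists the tables it discovered (the crawler and database names are hypothetical).

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# "zillow-crawler" and "zillow" are hypothetical names
glue.start_crawler(Name="zillow-crawler")

# After the crawler finishes, inspect the Data Catalog
tables = glue.get_tables(DatabaseName="zillow")
for table in tables["TableList"]:
    print(table["Name"])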

[DEMO] AWS Glue

EMR

  • Can be used for large-scale distributed data jobs

Athena

  • Can replace many ETL jobs
  • Serverless
  • Built on Presto with SQL support
  • Designed to query a data lake (e.g., data on S3)

[DEMO] Athena
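
Queries can also be submitted programmatically; below is a minimal sketch using the boto3 Athena client, with placeholder database, table, and results-bucket names.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Database, table, and output bucket below are placeholders
response = athena.start_query_execution(
    QueryString="SELECT city, COUNT(*) FROM listings GROUP BY city;",
    QueryExecutionContext={"Database": "datalake"},
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},
)
print(response["QueryExecutionId"])  # use with get_query_results to fetch rows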

Data Pipeline

  • Create complex data processing workloads that are fault tolerant, repeatable, and highly available

[Demo] Data Pipeline

2.5 Batch vs Streaming Data

Impact on ML Pipeline

  • More control of model training in batch (you can decide when to retrain)
  • Continuously retraining a model could yield better or worse prediction results
  • Did the input stream suddenly gain or lose users?
  • Is there an A/B testing scenario?

Batch

  • Data is batched at intervals
  • Simplest approach to creating predictions
  • Many services on AWS are capable of batch processing:
  • AWS Glue
  • AWS Data Pipeline
  • AWS Batch
  • EMR

Streaming

  • Continuously polled or pushed
  • More complex method of prediction
  • Many services on AWS are capable of streaming:
  • Kinesis
  • IoT

2.6 Data Security

AWS KMS (Key Management Service)

  • Integrated with AWS Encryption SDK
  • CloudTrail gives an independent view of who accessed encrypted data
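
A minimal sketch of encrypting and decrypting a small payload with the boto3 KMS client; the key alias is a placeholder for a customer-managed key, and every call here would be recorded by CloudTrail.

import boto3

kms = boto3.client("kms", region_name="us-east-1")

# "alias/ml-data" is a placeholder for a customer-managed key
ciphertext = kms.encrypt(
    KeyId="alias/ml-data",
    Plaintext=b"sensitive training label",
)["CiphertextBlob"]

plaintext = kms.decrypt(CiphertextBlob=ciphertext)["Plaintext"]
print(plaintext)  # b'sensitive training label'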

AWS CloudTrail


  • Enables governance, compliance, and operational auditing
  • Visibility into user and resource activity
  • Security analysis and troubleshooting

[Demo] CloudTrail

Other Aspects

  • IAM Roles
  • Security Groups
  • VPC

2.7 Data Backup and Recovery

Most AWS Services Have Snapshot and Backup Capabilities

  • RDS
  • S3
  • DynamoDB

S3 Backup and Recovery

  • S3 versioning
  • Amazon Glacier archive
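
A minimal sketch of enabling both: versioning plus a lifecycle rule that transitions older objects to Glacier (the bucket name is a placeholder).

import boto3

s3 = boto3.client("s3")

# "ml-training-data" is a placeholder bucket name
s3.put_bucket_versioning(
    Bucket="ml-training-data",
    VersioningConfiguration={"Status": "Enabled"},
)

# Archive objects to Glacier after 90 days
s3.put_bucket_lifecycle_configuration(
    Bucket="ml-training-data",
    LifecycleConfiguration={
        "Rules": [{
            "ID": "archive-to-glacier",
            "Status": "Enabled",
            "Filter": {"Prefix": ""},
            "Transitions": [{"Days": 90, "StorageClass": "GLACIER"}],
        }]
    },
)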

[Demo] S3 Backup and Recovery