Connecting MyDataHelps Export Database to Python

Thejas Suvarna

In this video, you will learn how to connect the MDH Export Database to Python. You will go through the steps of accessing credentials, loading data into dataframes, and building a simple analysis using a Jupyter notebook.

Prerequisites: A local installation of Jupyter, plus the boto3 and pandas Python packages used in the code below.

*PHI/PII Notice: This process connects to a database that may contain PHI/PII, and query results downloaded during analysis may leave PHI/PII on your local machine.

https://www.youtube.com/watch?v=Ix1kHmib4RA

Configure Access Credentials

  1. Go to Project Settings > Export Explorer > the External Connection tab.
  2. In the Authentication section, press the "Create Credentials" button to create AWS access credentials.
  3. Copy the credential information into your ~/.aws/credentials file.
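
The credentials file uses the standard AWS profile format. As a sketch, the entry will look something like the following; the key values shown here are placeholders (use the values MyDataHelps generates for you), and the profile name must match the AWS_PROFILE_NAME configured in the next section:

[3a563d9e_fitbit_intraday]
aws_access_key_id = AKIA...
aws_secret_access_key = ...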

Loading Data into Dataframes

  1. Copy the following configuration into your Jupyter notebook, updating each value to match your own project's connection details.

PROJECT_SCHEMA_NAME = "mdh_export_database_rk_3a563d9e_fitbit_intraday_qa"
AWS_PROFILE_NAME = "3a563d9e_fitbit_intraday"
ATHENA_OUTPUT_BUCKET_LOCATION = "s3://pep-mdh-export-database-qa/execution/rk_3a563d9e_fitbit_intraday"
ATHENA_WORKGROUP = "mdh_export_database_external_qa"
QUERY_RESULT_TEMP_FILE_LOCATION = "athena_query_results.csv" # preferred location on your local machine
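
As an optional sanity check (not part of the original steps), you can confirm that boto3 can see the profile you added to ~/.aws/credentials:

import boto3

# AWS_PROFILE_NAME from the configuration above should appear in this list.
session = boto3.session.Session()
print(session.available_profiles)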

  2. Copy the following code into your Jupyter notebook to extract data from the database.

import time

import boto3
import pandas as pd

def execute_query(query_string: str, download_file_location: str = None) -> pd.DataFrame:
    # Start the Athena query using the AWS profile configured above.
    session = boto3.session.Session(profile_name=AWS_PROFILE_NAME)
    athena_client = session.client("athena", region_name="us-east-1")
    query_start = athena_client.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={"Database": PROJECT_SCHEMA_NAME},
        WorkGroup=ATHENA_WORKGROUP,
        ResultConfiguration={"OutputLocation": f"{ATHENA_OUTPUT_BUCKET_LOCATION}/"},
    )

    # Poll every 5 seconds until the query leaves the RUNNING/QUEUED states.
    query_execution = athena_client.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])
    while query_execution["QueryExecution"]["Status"]["State"] in {"RUNNING", "QUEUED"}:
        print(time.strftime("%H:%M:%S", time.localtime()), f"query status: {query_execution['QueryExecution']['Status']['State']}")
        time.sleep(5)
        query_execution = athena_client.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])

    # The final state printed here will be SUCCEEDED, FAILED, or CANCELLED.
    print(time.strftime("%H:%M:%S", time.localtime()), f"query status: {query_execution['QueryExecution']['Status']['State']}")

    # Download the result CSV that Athena wrote to S3, then load it into a dataframe.
    s3_client = session.client("s3")
    file_location = download_file_location if download_file_location is not None else QUERY_RESULT_TEMP_FILE_LOCATION
    result_uri = query_execution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
    bucket_name = result_uri.split("/")[2]

    s3_client.download_file(
        bucket_name,
        result_uri.replace(f"s3://{bucket_name}/", ""),
        file_location,
    )
    return pd.read_csv(file_location)
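
To verify the connection end to end, you can start with a minimal query; this assumes your project exports the fitbitdailydata table used in the next section:

# Pull a handful of rows as a quick end-to-end test.
sample = execute_query("select * from fitbitdailydata limit 5")
sample
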
Building a Simple Analysis

  1. Build table visualizations using code similar to the following.
query_string = """
select
  participantidentifier,
  date,
  restingheartrate,
  steps
from
  fitbitdailydata
"""

# Load the Fitbit daily summary table and display it.
fitbitdailydata = execute_query(query_string)
fitbitdailydata

# Average resting heart rate and daily steps for a single participant.
(fitbitdailydata[fitbitdailydata.participantidentifier.eq("MDH-1514-7516")]
 .groupby("participantidentifier")[["restingheartrate", "steps"]]
 .agg("mean"))
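
If you want a quick visual rather than a table, a minimal sketch using matplotlib (an extra dependency, not covered in the video) might look like this:

import matplotlib.pyplot as plt

# Daily step counts over time for one participant.
participant = fitbitdailydata[fitbitdailydata.participantidentifier.eq("MDH-1514-7516")].copy()
participant["date"] = pd.to_datetime(participant["date"])
participant = participant.sort_values("date")

plt.plot(participant["date"], participant["steps"])
plt.xlabel("date")
plt.ylabel("steps")
plt.title("Daily steps for participant MDH-1514-7516")
plt.show()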

You are now ready to use Python to begin analysis of your MyDataHelps project data. 

Comments

Kelley Bridges

For programmers looking to parse the custom fields from allparticipants, here is one option:

import re

query_string = """
select
  participantidentifier,
  customfields
from
  allparticipants
"""

allparticipants = execute_query(query_string)

# Extract key=value pairs from each customfields string and create a column per key.
for index, row in allparticipants.iterrows():
    info = re.findall(r'(\w+)=([^,}]+)', row['customfields'])
    info_dict = {key: value for key, value in info}
    for key, value in info_dict.items():
        allparticipants.at[index, key] = value

# Drop the original customfields column and save the result.
allparticipants = allparticipants.drop(columns=['customfields'])
allparticipants.to_csv("output.csv")
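
A vectorized alternative to the loop above, for those who prefer to avoid iterrows; this sketch reuses the same regex and assumes each key appears at most once per row:

# One row per key=value match, indexed by (original row, match number).
pairs = allparticipants['customfields'].str.extractall(r'(\w+)=([^,}]+)')
pairs.columns = ['key', 'value']

# Pivot the matches back to one row per participant; 'level_0' is the original row index.
wide = pairs.reset_index().pivot(index='level_0', columns='key', values='value')
allparticipants = allparticipants.drop(columns=['customfields']).join(wide)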