Analyzing Survey Data in Python

Thejas Suvarna

In this video, you will learn how to analyze survey data from the MyDataHelps Export Database in Python. You will go through the steps of loading survey data into pandas, transforming the data, and working through brief analysis examples to answer questions you may have.

Prerequisites: Connecting MyDataHelps Export Database to Python

*PHI/PII Notice: This process connects to a database and may leave PHI/PII on your local machine.

https://www.youtube.com/watch?v=NtYfRT8fags

Load and Transform Survey Data in Pandas

  1. Load your survey data using techniques reviewed in Connecting MyDataHelps Export Database to Python.
  2. Flatten the survey data using the code below:
import pandas as pd
import numpy as np  
import boto3
import time
import os
from datetime import datetime
import json

class MDHQuery:
    """Run Athena queries against a MyDataHelps export database and
    return the results as pandas DataFrames."""

    def __init__(
        self,
        project_schema_name: str, 
        athena_workgroup: str, 
        athena_output_bucket_location: str, 
        aws_profile_name: str = None, 
        query_result_temp_file_name: str = None
    ):

        self._project_schema_name = project_schema_name
        self._athena_workgroup = athena_workgroup
        self._athena_output_bucket_location = athena_output_bucket_location
        self._aws_profile_name = aws_profile_name
        self._query_result_temp_file_name = query_result_temp_file_name
        
        self.__aws_access_key_id = None
        self.__aws_secret_access_key = None
        self.__aws_session_token = None
      
    def get_query_result(self, query_string: str, _query_result_temp_file_name: str = None) -> pd.DataFrame:
        """Execute an Athena query, poll until it finishes, and load the result CSV into a DataFrame."""
        if self._aws_profile_name:
            session = boto3.session.Session(profile_name=self._aws_profile_name)
        else:    
            while not self.__aws_session_token:      
                self.__get_temporary_credentials_with_mfa()

            session = boto3.session.Session(
                aws_access_key_id=self.__aws_access_key_id,
                aws_secret_access_key=self.__aws_secret_access_key,
                aws_session_token=self.__aws_session_token
            )
        athena_client = session.client("athena", region_name="us-east-1")
        query_start = athena_client.start_query_execution(
            QueryString=query_string,
            QueryExecutionContext={"Database": self._project_schema_name},
            WorkGroup=self._athena_workgroup,
            ResultConfiguration={"OutputLocation": self._athena_output_bucket_location}
        )

        query_execution = athena_client.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])
        while query_execution["QueryExecution"]["Status"]["State"] in {"RUNNING", "QUEUED"}:
            print(time.strftime("%H:%M:%S", time.localtime()), f'query status: {query_execution["QueryExecution"]["Status"]["State"]}')
            time.sleep(5)
            query_execution = athena_client.get_query_execution(QueryExecutionId=query_start["QueryExecutionId"])

        print(time.strftime("%H:%M:%S", time.localtime()), f'query status: {query_execution["QueryExecution"]["Status"]["State"]}')

        if query_execution["QueryExecution"]["Status"]["State"] == "FAILED":
            print(query_execution["QueryExecution"]["Status"]["StateChangeReason"])
            return pd.DataFrame()

        else:
            result_uri = query_execution["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
            bucket_name = result_uri.split("/")[2]

            s3_client = session.client("s3")

            file_location = (
                _query_result_temp_file_name if _query_result_temp_file_name 
                else self._query_result_temp_file_name if self._query_result_temp_file_name
                else "athena_query_results.csv"
            )

            s3_client.download_file(
                bucket_name,
                result_uri.replace(f"s3://{bucket_name}/", ""),
                file_location,
            )

            result_df = pd.read_csv(file_location)
            print(time.strftime("%H:%M:%S", time.localtime()), f"rows: {len(result_df.index)}", f"columns: {result_df.columns.to_list()}")
            return result_df

    def get_table(self, table_name: str, predicate: str = "") -> pd.DataFrame:
        """Convenience wrapper: select every column from a table, with an optional SQL suffix."""
        return self.get_query_result(f"select * from {table_name} {predicate}")

    def __get_temporary_credentials_with_mfa(self) -> None:
        """Prompt for an MFA code and store temporary STS session credentials."""
        profile_name = input("Enter default profile name: ")
        boto3.setup_default_session(profile_name=profile_name)

        sts_client = boto3.client("sts")
        user_name = sts_client.get_caller_identity()["Arn"].split("/")[1]

        iam_client = boto3.client("iam")
        mfa_device_serial_number = iam_client.list_mfa_devices(UserName=user_name)["MFADevices"][0]["SerialNumber"]

        token_code = input(f"Enter MFA code for {profile_name} device {mfa_device_serial_number}:")

        credentials = sts_client.get_session_token(SerialNumber=mfa_device_serial_number, TokenCode=token_code)["Credentials"]

        self.__aws_access_key_id = credentials["AccessKeyId"]
        self.__aws_secret_access_key = credentials["SecretAccessKey"]
        self.__aws_session_token = credentials["SessionToken"]
mdh_query = MDHQuery(
    project_schema_name = "{YOUR CATALOG}",
    athena_workgroup = "{YOUR WORKGROUP}",
    athena_output_bucket_location = "s3://pep-mdh-export-database-prod/execution/{YOUR DIRECTORY}",
    aws_profile_name = "{YOUR AWS PROFILE NAME}"
)
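
With the connection in place, you can pull a whole table through the get_table convenience wrapper, or run arbitrary SQL with get_query_result. A quick sanity check (the limit predicate here is illustrative; any valid SQL suffix works):

# Pull ten rows from the surveyresults table to verify the connection.
sample = mdh_query.get_table("surveyresults", "limit 10")
sample.head()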

Transform Survey Data

The following code restructures the individual survey-related tables from the export database into one flattened format that is ready for analysis.

query_string = """
select 
    sr.participantidentifier, 
    sr.startdate,
    sr.enddate,
    ssr.stepidentifier,
    sqr.resultidentifier,
    sqr.answers,
    sr.surveyname,
    sr.type,
    sr.deviceplatform,
    sr.devicename,
    sr.deviceosversion,
    sr.inserteddate
from
    surveyresults sr
    join surveystepresults ssr on ssr.surveyresultkey = sr.surveyresultkey
    join surveyquestionresults sqr on sqr.surveystepresultkey = ssr.surveystepresultkey
"""

survey_results = mdh_query.get_query_result(query_string)
survey_results.head(5)
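
Because the results arrive as a CSV download, the timestamp columns load as plain strings. An optional step, assuming the default export formats, converts them to pandas datetimes so they can be filtered and aggregated by time:

# Convert timestamp columns from strings to pandas datetimes;
# unparseable values become NaT instead of raising an error.
for col in ["startdate", "enddate", "inserteddate"]:
    survey_results[col] = pd.to_datetime(survey_results[col], errors="coerce")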

Analyze Survey Data

Using the flattened table, it becomes easy to apply pandas manipulations to answer various survey-related questions you may have.

Example 1: Survey Completion Volume

Questions: 

  • How many participants generated data for each survey?
  • How many survey results have been generated for each survey?

Code:

(
    survey_results[survey_results.type.eq("Survey")]
        .groupby(["surveyname"])
        .agg(
            participants=pd.NamedAgg(column="participantidentifier", aggfunc="nunique"),
            results=pd.NamedAgg(column="participantidentifier", aggfunc="count")
        )
)
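
If you also want completion volume over time, the same grouping extends naturally. A sketch, assuming enddate was converted to a datetime as shown earlier:

# Count survey results per survey per calendar month of completion.
(
    survey_results[survey_results.type.eq("Survey")]
        .assign(month=lambda df: df.enddate.dt.to_period("M"))
        .groupby(["surveyname", "month"])
        .size()
)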

Example 2: Survey Question Response Breakdown

Questions:

  • For a specified survey, what answers were provided for each question?
  • How many participants provided each answer?

Code:

survey_name = "Feeling Sick?"

# Turn the serialized answers string into a list: strip the brackets,
# swap the "', '" separator for a pipe, then split on the pipe.
survey_results["answers_list"] = (
    survey_results.answers
        .str.replace("[", "", regex=False)
        .str.replace("]", "", regex=False)
        .str.replace("', '", "'|'", regex=False)
        .str.split("|")
)

(
    survey_results[survey_results.surveyname.eq(survey_name)]
        .explode("answers_list")
        .groupby(["resultidentifier", "answers_list"])
        .agg(
            participants=pd.NamedAgg(column="participantidentifier", aggfunc="nunique")
        )
).head(10)

You are now ready to explore more advanced analysis of survey data from MyDataHelps in Python.  
