- Mon 03 December 2018
- Data Science
- #python, #dataquest, #data-engineering, #postgres
About
This is the notebook I created to solve the first mission in the data engineering path at dataquest.io. I am also experimenting with the Pelican Jupyter notebook extension, which allows loading Jupyter notebooks directly into the blog.
Postgres Mission
The problem goes something like this:
Recently, the International Hurricane Watchgroup (IHW) has been asked to update their analysis tools. Because of the increase in public awareness of hurricanes, they are required to be more diligent with the analysis of historical hurricane data they share across the organization. They have asked you, someone with experience in databases, to help work with their team to productionize their services.
Accepting the job, their team tells you that they have been having trouble sharing data across the teams and keeping it consistent. From what they've told you, it seems that their method of sharing the data with their data analysts has been to save a CSV file on their local servers and have every data analyst pull the data down. Then, each analyst uses a local SQLite engine to store the CSV, run their queries, and send their results around.
From what they have told you, you might be thinking that this is an inefficient way of sharing data. To understand what you will be working on, they have sent you a CSV file. Their CSV file contains the following fields:
- fid - ID for the row
- year - Recorded year
- month - Recorded month
- day - Recorded day
- ad_time - Recorded time in UTC
- btid - Hurricane ID
- name - Name of the hurricane
- lat - Latitude of the recorded location
- long - Longitude of the recorded location
- wind_kts - Wind speed in knots
- pressure - Atmospheric pressure of the hurricane
- cat - Hurricane category
- basin - The basin the hurricane is located in
- shape_leng - Hurricane shape length
In this Guided Project, you will be using the locally installed version of Postgres from the previous project. This is much different from previous Guided Projects, as you will be using your own notebook and your own Python environment instead of Dataquest's. Your job is to show that you can create a database that will accomplish the following requirements:
- Database for the IHW to store their tables.
- Table that contains the fields detailed in the CSV file
- User that can update, read, and insert into a table of the data.
- Insert the data into the table.
Let's do this.
Create the postgres instance
We'll be using a Docker container with a persistent postgres database. The pgadmin instance is not needed, but we can use it to have a GUI available by default at 0.0.0.0:8080. Note that this is a toy configuration, and it should not be used to store sensitive information.
We won't go into the details of this file; we're just using Docker to get a disposable database quickly. A local postgres installation would work just as well.
You can get more information about running postgres in Docker at the Docker Store.
!cat docker-compose.yml
!docker-compose -p dq_de up -d
# check if postgres docker instance is alive
!docker ps
# retrieve the ip address of the container, seen from the host
!docker inspect dq_de_db_1 | grep "IPAddress"
Note that we could also refer to our database by the port mapping declared in the docker-compose.yml file. This means that we could reach our database at 0.0.0.0:54320.
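For example, connecting through that published port rather than the container IP would look roughly like this (a sketch; it assumes the 54320 → 5432 mapping from the compose file and the default postgres/postgres credentials used throughout this notebook):
import psycopg2
# connect through the published host port instead of the container IP
# (assumes docker-compose maps the container's 5432 to port 54320 on the host)
conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres",
    host="localhost", port=54320,
)
print(conn.get_dsn_parameters())
conn.close()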
Create users and roles
The mission instructions state that
With a table set up, it's now time to create a user on the Postgres database that can insert, update, and read the data but not delete. This is to make sure that someone who might get a hold of this user does not issue a destructive command. Essentially, this is like creating a "data production" user whose job it is to always write new and existing data to the table.
Furthermore, even though it wasn't according to the spec, we know that the IHW team's analysts just run read queries on the data. Also, since the analysts only know SQLite queries, they may not be well-versed in a production database. As such, it might be risky handing out a general production user for them to query their data.
From what you have learned about security and restricting well-meaning users, it might be a good idea to restrict those analysts from some commands. Those commands can be anything from adding new data to the table to changing the values. You should decide what commands should be given to the analyst user.
In short, what I get from this is
- Connected as the postgres user, create a user data_prod_user that can insert, update, and read the data but not delete.
- This is like a "data production" user whose job is to always write new and existing data to the table.
Also,
- Create a GROUP role for the IHW analyst team that can only read data from tables.
- Create a user in this group, called analyst1. Picking names is hard.
Note that users and groups are all part of the same entity in postgres, the role.
import psycopg2
conn = psycopg2.connect(
dbname="postgres", user="postgres", password="postgres", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
DATA_PROD_USER_PWD = "changeme"
DATA_VIEW_USER_PWD = "changeme"
# create the users, keeping the passwords in separate variables
# (if you need to re-run this, see this hack to emulate an IF NOT EXISTS clause:
# https://stackoverflow.com/questions/8092086/create-postgresql-role-user-if-it-doesnt-exist)
cur.execute(f"""
CREATE USER data_prod_user WITH PASSWORD '{DATA_PROD_USER_PWD}';
CREATE GROUP readonly WITH NOLOGIN;
CREATE USER analyst1 WITH PASSWORD '{DATA_VIEW_USER_PWD}' IN GROUP readonly;
""")
# create db
print("CREATE DATABASE ihw OWNER postgres")
cur.execute("CREATE DATABASE ihw OWNER postgres")
conn.commit()
conn.close()
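If you want to double-check that the three roles exist before moving on, a quick query against pg_roles does the trick (a minimal sketch, reusing the same superuser connection parameters):
import psycopg2
conn = psycopg2.connect(
    dbname="postgres", user="postgres", password="postgres", host="192.168.32.3"
)
cur = conn.cursor()
# pg_roles lists every role; rolcanlogin tells users (True) apart from groups (False)
cur.execute("""
    SELECT rolname, rolcanlogin
    FROM pg_roles
    WHERE rolname IN ('data_prod_user', 'readonly', 'analyst1')
""")
print(cur.fetchall())
conn.close()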
If you want to start over, just DROP everything.
# drop the users and the database created above.
# Use this to start all over.
import psycopg2
conn = psycopg2.connect(
dbname="postgres", user="postgres", password="postgres", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
# drop users
cur.execute("""
DROP USER data_prod_user;
DROP USER analyst1;
DROP GROUP readonly;
""")
# drop db
print("DROP DATABASE IF EXISTS ihw")
cur.execute("DROP DATABASE IF EXISTS ihw")
conn.commit()
conn.close()
Preparing for data insertion
Now with our new users, we can start creating our tables. A table is owned by default by the user that created it, and it will live in the database the connection is attached to.
The steps involved in inserting the data are
- Inspect the data: this process is specific to the problem and may vary each time. I will be using pandas.
- Infer the data types that our SQL table will use from the previous analysis: this covers questions like "would an integer suffice?", "how long should these strings be?", "can this column be normalized?". It will probably require shaping the data analyzed previously.
- Create the table: this is the translation of the previous step into a proper SQL table.
- Populate the table: fill the new table with data. I will be using postgres' COPY feature by means of psycopg2's copy_expert().
Inspection of the data
Before even knowing how our table should be created, we should inspect our data. We will be using pandas for this purpose.
# load data
import pandas as pd
df = pd.read_csv("https://dq-content.s3.amazonaws.com/251/storm_data.csv")
df.head()
df.info()
df.describe()
Fortunately, we have some guidelines from dataquest of how the table should look like:
fid
is an INT and should be aPRIMARY KEY
YEAR, MONTH, DAY
andAD_TIME
should be merged into one singleTIMESTAMP
, usingTIME ZONE
.btid
looks like anINT
- We have no way of knowing how long a
name
can be, thus we will be using aVARCHAR
lat
is of typeDECIMAL(X,Y)
. We need to figure outX
andY
long
is of typeDECIMAL(X,Y)
, We also need to figure outX
andY
wind_kts
looks like anINT
pressure
looks like anINT
cat
looks like a categorical value, kinda like aVARCHAR(X)
. We need to figure outX
basin
is aVARCHAR(X)
. Same ascat
shape_length
looks like aDECIMAL(X, Y)
The next step is figuring out some variables, like
- The values of X and Y in VARCHAR and DECIMAL
- The maximum values of those columns that look like integers. With this we could use even smaller datatypes like SMALLINT and others.
We can infer most of these values through inspection of the min and max values in df.describe(). We could then produce a table like this:
CREATE TABLE hurricanes (
fid INT PRIMARY KEY,
timestamp TIMESTAMP WITH TIME ZONE,
btid SMALLINT,
name VARCHAR,
lat DECIMAL(8,6),
long DECIMAL(9,6),
wind_kts SMALLINT,
pressure SMALLINT,
cat VARCHAR(X),
basin VARCHAR(X),
shape_length DECIMAL(8, 6)
)
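The numeric ranges can be checked the same way, straight from the dataframe. A quick sketch (upper-casing the column names first so the exact casing in the raw CSV doesn't matter):
# work on a copy with normalized column names
ranges = df.rename(columns=str.upper)
# do the integer-like columns fit in a SMALLINT (-32768 to 32767)?
print(ranges[['BTID', 'WIND_KTS', 'PRESSURE']].agg(['min', 'max']))
# how many digits do we need before the decimal point?
print(ranges[['LAT', 'LONG', 'SHAPE_LENG']].abs().max())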
We can infer the values of X in VARCHAR(X) by doing something like
df['BASIN'].str.len().max()
df['CAT'].str.len().max()
This means that these columns are at most 15 and 2 characters long, respectively. Our table will end up looking like
CREATE TABLE hurricanes (
fid INT PRIMARY KEY,
timestamp TIMESTAMP WITH TIME ZONE,
btid SMALLINT,
name VARCHAR,
lat DECIMAL(8,6),
long DECIMAL(9,6),
wind_kts SMALLINT,
pressure SMALLINT,
cat VARCHAR(2),
basin VARCHAR(15),
shape_length DECIMAL(8, 6)
)
Shaping data
From here, we need to
- Clean and check for possible missing values
- Merge year, month, day and ad_time into one column named timestamp
Possible optimizations are
- Using label encoding on the categorical columns
- Using better datatypes for specific purposes, like georeferenced values
Clean missing values
# total na numbers
df.isna().sum()
df.isnull().sum()
Okay, not much to do here 😌
Merge date columns into a single one
We'll be using pd.to_datetime from https://pandas.pydata.org/pandas-docs/stable/generated/pandas.to_datetime.html
- We'll need to convert AD_TIME from the current format to a valid datetime
df['AD_TIME'].head()
We join everything together into one column, as a plain string. This is to make the conversion easier afterwards:
df['TIMESTAMP'] = df['YEAR'].astype(str) + \
df['MONTH'].astype(str).str.zfill(2) + \
df['DAY'].astype(str).str.zfill(2) + \
df['AD_TIME']
df['TIMESTAMP'].head()
We convert that string column to a proper timestamp object, passing a format and telling pandas that the time is in UTC:
df['TIMESTAMP'] = pd.to_datetime(df['TIMESTAMP'], format='%Y%m%d%H%MZ', utc=True)
df['TIMESTAMP'].head()
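A quick sanity check that the conversion produced a timezone-aware column:
# the dtype should now be timezone-aware: datetime64[ns, UTC]
print(df['TIMESTAMP'].dtype)
print(df['TIMESTAMP'].min(), df['TIMESTAMP'].max())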
We drop the other columns associated with time:
df = df.drop(['YEAR','MONTH','DAY','AD_TIME'], axis=1)
df.head()
Further optimizations
These are valid for those columns that are clearly categorical, CAT and BASIN. We could encode these columns using label encoding, which would allow us to store integers instead of long strings.
We could create another table that stores the meanings of the codes. This is more or less what's called a one-to-one relationship, and it is discussed
- here: https://stackoverflow.com/questions/12318870/when-i-should-use-one-to-one-relationship
- and here: https://www.postgresql.org/message-id/flat/BB6A4899.9385D%25soft%40bdanube.com
The pros and cons of this approach are
Pros
- It's more efficient to store and query integer values
- We could create auxiliary tables that map the labels to their meanings, thus allowing for more atomic queries
- This would allow for good normalization practices that will help in scaling our database (and keeping it sane to maintain)
Cons
- Integers lose any semantic meaning without a conversion table
- Adding semantic meaning from an auxiliary table requires a JOIN operation, which is costly
- It requires a bit more effort
- It could happen that the column is not categorical at all (e.g. more distinct values for BASIN show up in the future)
We will not use this approach since it defeats the purpose of the exercise, and in this case it is overkill: each category neither has nor needs a full model of its own, since it would consist of a single attribute. We'll keep it simple.
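Just to illustrate what we are skipping, a label-encoded BASIN plus its lookup table could be built with pandas along these lines. This is a sketch only, we won't run it, and the basin_code / basin_lookup names are made up for the example:
# hypothetical sketch: label-encode BASIN and keep a lookup table for the codes
basin_cat = df['BASIN'].astype('category')
# the lookup table, which would become its own small SQL table
basin_lookup = pd.DataFrame({
    'basin_code': range(len(basin_cat.cat.categories)),
    'basin': basin_cat.cat.categories,
})
# the hurricanes table would then store the integer code instead of the string:
# df['BASIN_CODE'] = basin_cat.cat.codes
print(basin_lookup)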
This Stackoverflow question deals with normalization on one-to-one relationships:
If it fits within the rules of normalization, then 1:1 relationships can be normalized (by definition!) - In other words, there is nothing about 1:1 relationships that make it impossible for them to obey the normal forms.
To answer your question about the practicality of 1:1 relationships, there are times when this is a perfectly useful construct, such as when you have subtypes with distinct predicates (columns).
The reasons you would use 1:1 relationships depend on your point of view. DBAs tend to think of everything as being a performance decision. Data modelers and programmers tend to think of these decisions as being design or model oriented. In fact, there is a lot of overlap between these points of view. It depends on what your perspectives and priorities are. Here are some examples of motivations for 1:1 relationships:
You have some subset of columns that are very wide and you want to segregate them physically in your storage for performance reasons.
You have some subset of columns that are not read or updated frequently and you want to keep them apart from the frequently used columns for performance reasons.
You have some columns that are optional in general but they are mandatory when you know that the record is of a certain type.
You have some columns that logically belong together for a subtype and you want to model them to fit well with your code's object model.
You have some columns that can only apply to some subtype(s) of an entity super-type, and you want your schema to enforce the absence of this data for other subtypes.
You have some columns that belong to an entity but you need to protect these particular columns using more restrictive access rules (e.g. salary on an employee table).
So you can see, sometimes the driver is performance, sometimes it is model purity, or just a desire to take full advantage of declarative schema rules.
We could even push this further and say that the BASIN column is really a boolean, since it has at most two values. That would allow us to rewrite the column as something like IS_BASIN_EASTERN_PACIFIC, and we could say that wherever this is False, the basin is North Atlantic.
But this is never a good approach, since it encodes the information implicitly (thus hidden, and we don't want hidden stuff!), plus it will break the second a new category shows up.
df['BASIN'].astype('category').cat.codes.unique()
df['CAT'].astype('category').cat.codes.unique()
df.columns = df.columns.str.upper()
df.head()
We look back at our table, and we rearrange the columns in the dataframe to match this table
fid INT PRIMARY KEY,
timestamp TIMESTAMP WITH TIME ZONE,
btid SMALLINT,
name VARCHAR,
lat DECIMAL(8,6),
long DECIMAL(9,6),
wind_kts SMALLINT,
pressure SMALLINT,
cat VARCHAR(2),
basin VARCHAR(16),
shape_length DECIMAL(8, 6)
# we rearrange our columns and set FID as the dataframe's index
df_csv = df[['FID', 'TIMESTAMP', 'BTID', 'NAME', 'LAT', 'LONG', 'WIND_KTS', 'PRESSURE', 'CAT', 'BASIN', 'SHAPE_LENG']].copy()
df_csv = df_csv.set_index('FID', drop=True)
df_csv.head()
We'll save our amended CSV somewhere else, to be used later:
df_csv.to_csv(path_or_buf='data/cleaned_files.csv', sep=';')
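A quick peek at the file we just wrote confirms the delimiter and the column order match what the table will expect:
# print the header plus the first couple of data rows of the exported file
with open('data/cleaned_files.csv') as f:
    for _ in range(3):
        print(f.readline().rstrip())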
Creating the table with SQL
The next step is actually creating the table:
import psycopg2
conn = psycopg2.connect(
dbname="ihw", user="data_prod_user", password="changeme", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
# year, month, day, ad_time will be stored as timestamptz
print("DROP TABLE IF EXISTS hurricanes")
cur.execute("DROP TABLE IF EXISTS hurricanes")
CREATE_TABLE_SQL ="""
CREATE TABLE hurricanes (
fid INT PRIMARY KEY,
timestamp TIMESTAMP WITH TIME ZONE,
btid SMALLINT,
name VARCHAR,
lat DECIMAL(8,6),
long DECIMAL(9,6),
wind_kts SMALLINT,
pressure SMALLINT,
cat VARCHAR(2),
basin VARCHAR(16),
shape_length DECIMAL(8, 6)
)
"""
print(CREATE_TABLE_SQL)
cur.execute(
CREATE_TABLE_SQL
)
conn.commit()
print("Done.")
conn.close()
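Before loading any data, we can ask the database how it understood our column definitions. A small sketch using information_schema:
import psycopg2
conn = psycopg2.connect(
    dbname="ihw", user="data_prod_user", password="changeme", host="192.168.32.3"
)
cur = conn.cursor()
# information_schema.columns describes every column of the new table
cur.execute("""
    SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale
    FROM information_schema.columns
    WHERE table_name = 'hurricanes'
    ORDER BY ordinal_position
""")
for row in cur.fetchall():
    print(row)
conn.close()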
Insert the data into the database
Assuming we have the CSV available on disk, we can load it into the table using psycopg2's copy_expert():
import psycopg2
conn = psycopg2.connect(
dbname="ihw", user="data_prod_user", password="changeme", host="192.168.32.3")
conn.autocommit = True
cur = conn.cursor()
with open('data/cleaned_files.csv', 'r') as f:
cur.copy_expert("COPY hurricanes FROM STDIN WITH CSV HEADER DELIMITER ';'", f)
conn.close()
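A row count is the quickest way to check that the COPY actually loaded something (the number should match the data rows in cleaned_files.csv):
import psycopg2
conn = psycopg2.connect(
    dbname="ihw", user="data_prod_user", password="changeme", host="192.168.32.3"
)
cur = conn.cursor()
# how many rows made it into the table?
cur.execute("SELECT COUNT(*) FROM hurricanes")
print(cur.fetchone()[0])
conn.close()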
Grant specific privileges between users and tables
With our users and our table ready, we need to restrict the privileges some users have. This is so we can avoid unfortunate events like undesired DELETE or DROP instructions. Accidents happen.
import psycopg2
conn = psycopg2.connect(
dbname="ihw", user="postgres", password="postgres", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
# revoke all privileges and add back specific ones
# on user data_prod_user
cur.execute("REVOKE ALL ON hurricanes FROM data_prod_user")
cur.execute("GRANT SELECT, INSERT, UPDATE ON hurricanes TO data_prod_user")
# on group readonly
cur.execute("REVOKE ALL ON hurricanes FROM readonly")
cur.execute("GRANT SELECT ON hurricanes TO readonly")
conn.commit()
print("Done.")
conn.close()
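We can also ask Postgres directly whether each role holds the privileges we expect, using the has_table_privilege function. A small sketch, run as the superuser:
import psycopg2
conn = psycopg2.connect(
    dbname="ihw", user="postgres", password="postgres", host="192.168.32.3"
)
cur = conn.cursor()
privileges = ['SELECT', 'INSERT', 'UPDATE', 'DELETE']
for role in ['data_prod_user', 'readonly', 'analyst1']:
    # has_table_privilege also accounts for privileges inherited through group membership
    cur.execute(
        "SELECT " + ", ".join(f"has_table_privilege('{role}', 'hurricanes', '{p}')" for p in privileges)
    )
    print(role, dict(zip(privileges, cur.fetchone())))
conn.close()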
Verify permissions and roles
As postgres user
import psycopg2
conn = psycopg2.connect(
dbname="ihw", user="postgres", password="postgres", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
cur.execute('SELECT * FROM hurricanes LIMIT 1')
print(cur.fetchall())
conn.close()
As data_prod_user
import psycopg2
conn = psycopg2.connect(
dbname="ihw", user="data_prod_user", password="changeme", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
cur.execute('SELECT * FROM hurricanes LIMIT 1')
cur.fetchall()
# we should be able to insert data
cur.execute('INSERT INTO hurricanes (fid) values (1000001)')
conn.commit()
# we can read this new row from the table
cur.execute('SELECT * FROM hurricanes WHERE fid = 1000001')
cur.fetchall()
# but we should not be able to delete data
cur.execute('DELETE FROM hurricanes WHERE fid=1000000')
conn.commit()
cur.execute("UPDATE hurricanes SET name = 'fakename' WHERE fid=1000001")
conn.commit()
# we read the updated changes back
cur.execute('SELECT * FROM hurricanes WHERE fid = 1000001')
cur.fetchall()
conn.close()
Checking permissions as analyst1
import psycopg2
conn = psycopg2.connect(
dbname="ihw", user="analyst1", password="changeme", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
cur.execute('SELECT * FROM hurricanes LIMIT 1')
cur.fetchall()
# we should NOT be able to insert data
cur.execute('INSERT INTO hurricanes (fid) values (1000001)')
conn.commit()
cur.execute("UPDATE hurricanes SET name = 'fakename' WHERE fid=1000001")
conn.commit()
# but we should not be able to delete data
cur.execute('DELETE FROM hurricanes WHERE fid=1000000')
conn.commit()
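The INSERT, UPDATE and DELETE above are expected to fail for analyst1 (and the DELETE for data_prod_user). In a notebook the traceback is fine, but in a script you would catch the error explicitly; a minimal sketch (psycopg2 >= 2.8 exposes the server error classes in psycopg2.errors):
import psycopg2
from psycopg2 import errors
conn = psycopg2.connect(
    dbname="ihw", user="analyst1", password="changeme", host="192.168.32.3"
)
conn.autocommit = True
cur = conn.cursor()
try:
    cur.execute('DELETE FROM hurricanes WHERE fid=1000000')
except errors.InsufficientPrivilege as exc:
    # exactly what we want: analyst1 cannot delete rows
    print("DELETE rejected as expected:", exc)
conn.close()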
Final words
There's not much more we can say on top of what has been shown with code. We can achieve almost anything with the psycopg2 driver, which is the most used Python driver for postgres databases (their words, not mine).
Moving forward
We can add another abstraction layer on top of the driver, namely an ORM or object-relational mapper. I use SQLAlchemy in my daily tasks and it has proven useful in many ways, but that's something for another topic.