Every month, hundreds of thousands of households fill out the American Community Survey (ACS). One of the largest surveys conducted by the U.S. Census Bureau, the ACS is an ambitious program to better understand the American experience. Covering a sample of over 3 million respondents each year, its aggregate summary data details hundreds of attributes, ranging from language spoken to average commute time to public assistance usage. The data is published at standard geographic levels of aggregation (e.g., Census tracts, counties, and states), which allows the ACS to be joined to external data sources that are similarly aggregated. Given its versatility and robustness, the data is a mainstay of social research and enables organizations to be more data-driven, whether they are non-profits, corporations, or government agencies.
Imagine a non-profit with a national mandate to provide aid and services, specifically to financially distressed neighborhoods where girls under the age of 5 live in households that collect SNAP benefits and have remained in the same home over the past year. As it turns out, ACS five-year estimates would allow an analyst at that non-profit to focus on geographic units far smaller than a county, such as Census tracts. And because Census tracts are a common unit of analysis, additional datasets can be merged in to improve the diversity of data and better characterize a local neighborhood.
For this example, we used the Consumer Complaint Database produced by the Consumer Financial Protection Bureau and transformed the counts of complaints from each zipcode into Census tract approximations using the Census tract to ZCTA relationship file. In the interactive graph below, we show the distribution of the number of financial complaints, capped at 60 complaints, and can see that few Census tracts receive more than ten financial complaints. Thus, we can limit our focus to those areas.
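To make that apportionment concrete, here is a minimal sketch of the idea, assuming a relationship table that lists, for each ZCTA, the tracts it overlaps and the share of the ZCTA falling in each tract (the function name and row layout are illustrative, not the actual fields of the relationship file):
def apportion_to_tracts(zip_counts, zcta_tract_rows):
    # zip_counts: dict mapping ZCTA (zipcode approximation) -> complaint count
    # zcta_tract_rows: iterable of (zcta, tract_geoid, share_of_zcta) tuples,
    #                  where share_of_zcta is a fraction between 0 and 1
    tract_counts = {}
    for zcta, tract, share in zcta_tract_rows:
        # Each tract receives the ZCTA's count weighted by the tract's share of that ZCTA
        tract_counts[tract] = tract_counts.get(tract, 0) + zip_counts.get(zcta, 0) * share
    return tract_counts
The Code section below does this with the actual 2010 ZCTA-to-tract relationship file.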
So how do we get ACS data? The American Community Survey releases one-, three-, and five-year aggregates at varying levels of geographic specificity. With 20,000+ attributes collected on the American experience, it can be challenging to explore and search the ACS data repository. For a cursory glance at the available data, there is the US Census Bureau's American FactFinder. For dynamic access, the ACS five-year release is available through one of the US Census Bureau's APIs. Here, we show how to access the ACS five-year estimates using a third method that is easier for bulk analysis of Census tract-level data. Due to the size of the ACS, the US Census Bureau breaks the ACS five-year release into five parts:
You can see from the information above that a bulk download of ACS five-year aggregates separates the metadata from the actual estimates. The process to get any specific estimate for the nation at the tract level is roughly: (1) use the SAS header templates to find which sequence file contains the variable; (2) read that sequence's estimate file for each state and attach the column names from the template; (3) join each record to the state's geography file on its logical record number; and (4) keep only the tract-level (summary level 140) records.
In the Code section below, we include Python snippets that process any specified list of variables from the Variable Inventory. We use the output to show how to find areas of particular interest.
So where are these neighborhoods located? Back to our non-profit example. Recall the measures that we are interested in: girls under the age of 5, households that collect SNAP benefits, households that have remained in the same home over the past year, and the volume of financial complaints.
How does an analyst determine where a non-profit should focus its efforts? Below we highlight Census tracts in the state of Alabama that are in the top 30 percent (at or above the 70th percentile) for each ACS estimate or have more than ten financial complaints. Each pane shows a different measure of interest, highlighting seemingly different areas for outreach.
Now, how can we highlight only those tracts that rank highest across all of the criteria? By averaging the percentile rank of each tract across all measures, we can map them again to find candidates for outreach. This data-driven methodology is simple yet holds the potential for large returns. Below we show that map nationally and list the top ten Census tracts for any view.
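As a minimal sketch of that combined ranking (assuming each tract already has a percentile score between 0 and 100 for every measure; the function name and data layout here are illustrative):
def combined_score(tract_percentiles):
    # tract_percentiles: dict mapping tract GEOID -> list of percentile scores (0-100)
    return {tract: round(sum(scores) / float(len(scores)), 2)
            for tract, scores in tract_percentiles.items()}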
In this tutorial, we will illustrate how to pull ACS bulk data, convert zipcode-level data to Census tracts, and output the results to GeoJSON using the Python programming language. By the end of the notebook you will have processed the data for the visualizations you see above.
To get started quickly, the code for this tutorial can be found at the following GitHub repo.
Before executing the code, we have to acquire the data. Here are the inputs required for this tutorial: the ACS 2013 five-year summary files and their SAS header templates, the matching geography files, the ACS Variable Inventory JSON, the CFPB Consumer Complaint Database CSV, the 2010 ZCTA-to-tract relationship file, Census tract shapefiles for each state, and the flag image used for the word cloud header.
We import the packages needed to process the data, generate the word cloud header, and produce the inputs for the visualizations above.
import csv, json, os, re, sys
import numpy as np
import fiona
import matplotlib.pyplot as plt
from scipy import stats
from shapely.geometry import asShape
from wordcloud import WordCloud, ImageColorGenerator, STOPWORDS
from scipy.misc import imread
from PIL import Image
from tqdm import *
We first define a basic function to load CSVs or JSONs. This is simply for ease of coding.
def loadinput(fname, tp):
    # Basic helper to read a CSV file, a JSON file, or a JSON string from disk
    with open(fname) as data:
        if tp == 'csv':
            loadme = []
            csvloadme = csv.reader(data)
            for row in csvloadme:
                loadme.append(row)
        elif tp == 'json':
            loadme = json.load(data)
        elif tp == 'jsons':
            loadme = json.loads(data.read())
        else:
            print 'Need defined filetype'
            loadme = None
    return loadme
Next we define how to process a given ACS estimate file and combine it with its associated metadata.
def extract_acs_sum(fname):
    # print 'Extracting: %s'%fname
    # Parse the file type (e/m), state abbreviation, and sequence number from the filename
    typ = fname[9:10]
    st_nm = fname[15:17]
    seq_nm = fname[17:21]
    # Read the raw estimates for this state and sequence
    acs = []
    with open(fname, 'rb') as f:
        for row in csv.reader(f, delimiter=','):
            acs.append(row)
    # Get header info: column names are the lines following the INPUT statement in the SAS template
    with open('./2013_5yr_SAS/%s%s_%s.sas'%(typ,st_nm,seq_nm)) as f:
        read_flg = False
        header = []
        for ln in f:
            if ln.strip() not in ('', ';', 'RUN;') and read_flg:
                header.append(re.split('\s*', ln.strip())[0])
            read_flg = True if 'INPUT' in ln else read_flg
    # Get geography: map each logical record number to its GEOID and name
    with open('./tab4/sumfile/prod/2009thru2013/geo/g20135%s.txt'%st_nm,'rb') as f:
        geog = {}
        nottract = []
        for row in f:
            logrecno = row[13:20]
            geoid = row[178:218].strip()
            gname = row[218:].strip().decode('utf-8', 'ignore')
            geog[logrecno] = [geoid, gname]
    # Attach geography to each record; tract-level GEOIDs (summary level 140) start with '14'
    for i, row in enumerate(acs):
        if geog[row[5]][0][0:2] == '14':
            row.extend(geog[row[5]])
        else:
            # Remove non-tract geography
            nottract.append(i)
    # Reverse sort so deletions do not shift the remaining indices
    nottract.sort(reverse=True)
    for row in nottract:
        del acs[row]
    return [header + ['geoid', 'gname']] + acs
Next we define a function that scans the directory of SAS header templates, since the estimates are broken up by state and sequence file, and writes the resulting variable-to-file mapping out to a JSON file.
def build_directory(dirnm, force=False):
    # Map each variable name to the [file type, state, sequence] combinations that contain it,
    # caching the result in directory.json so it only needs to be built once
    if force or not os.path.isfile('directory.json'):
        directory = {}
        for fname in tqdm(os.listdir(dirnm)):
            # Template names encode the file type, state, and sequence number
            typ = fname[0:1]
            st_nm = fname[1:3]
            seq_nm = fname[4:8]
            with open(dirnm + fname, 'rb') as f:
                read_flg = False
                for ln in f:
                    if ln.strip() not in ('', ';', 'RUN;') and read_flg:
                        varnm = re.split('\s*', ln.strip())[0]
                        try:
                            directory[varnm].append([typ, st_nm, seq_nm])
                        except KeyError:
                            directory[varnm] = [[typ, st_nm, seq_nm]]
                    read_flg = True if 'INPUT' in ln else read_flg
        # Drop the bookkeeping fields shared by every sequence file
        for key in ['FILEID', 'FILETYPE', 'STUSAB', 'CHARITER', 'SEQUENCE', 'LOGRECNO']:
            directory.pop(key)
        with open('directory.json', 'wb') as f:
            json.dump(directory, f)
    with open('directory.json', 'rb') as f:
        directory = json.load(f)
    return directory
Here we define a function to load the ACS Variable Inventory JSON and fix the slight naming discrepancy between its variable names and those used in the summary-file headers.
def load_varlist(fname):
    print 'Loading ACS 5 year Summary File Variable List'
    with open(fname, 'r') as f:
        varlist = json.load(f)['variables']
    # Rename API-style variable names to match the summary-file headers,
    # e.g. 'B01001_027E' becomes 'B01001e27'
    for varname in tqdm(varlist.keys()):
        if '0' in varname:
            split = re.split('_', varname)
            corrected = split[0] + split[1][-1].lower() + split[1].strip('0')[0:-1]
            varlist[corrected] = varlist.pop(varname)
    return varlist
And now we put it all together: we process each state, pull only the variables we want, compute percentile scores, and output the result to a JSON file.
def build_custom_json(varlist, score, force=False):
    # Compare the requested variables and scores to the previous run; rebuild if they changed
    with open('scored.json') as f:
        existing_score = json.load(f).keys()
    with open('header.json') as f:
        existing_variables = json.load(f).keys()
    if set(score) != set(existing_score) or set(varlist) != set(existing_variables):
        force = True
    if force or not os.path.isfile('custom.json'):
        directory = build_directory('./2013_5yr_SAS/')
        custom = {}
        header = []
        for var in tqdm(varlist):
            header.extend([var])
            if var in score:
                header.extend(['score_' + var])
            # Process every state file that contains this variable
            for table in directory[var]:
                typ = table[0]
                st_nm = table[1]
                seq_nm = table[2]
                acs_summary = extract_acs_sum('./group2/' + typ + '20135' + st_nm + seq_nm + '000.txt')
                position = [ i for i,x in enumerate(acs_summary[0]) if x == var ]
                if var in score:
                    # Percentile rank of each tract for this variable
                    scoreme = [ x[position[0]] for x in acs_summary[1:] ]
                    scored = [ round(stats.percentileofscore(scoreme, x),2) for x in scoreme ]
                for i, row in enumerate(acs_summary[1:]):
                    # Key each record by the tract FIPS code (GEOID minus its summary-level prefix)
                    try:
                        custom[row[-2][7:]][var] = row[position[0]]
                    except KeyError:
                        custom[row[-2][7:]] = {}
                        custom[row[-2][7:]][var] = row[position[0]]
                    if var in score:
                        custom[row[-2][7:]]['score_'+var] = scored[i]
                    custom[row[-2][7:]]['gname'] = row[-1]
        # Average the percentile scores across all measures to get a final score per tract
        position = [ x for x in header if 'score' in x ]
        for tract in custom.keys():
            avg = round(sum([ float(custom[tract][x]) for x in position ])/len(position),2)
            custom[tract]['final_score'] = avg
        header.extend(['final_score'])
        # custom['varnames'] = header
        with open('custom.json', 'wb') as f:
            json.dump(custom, f)
    with open('custom.json', 'rb') as f:
        custom = json.load(f)
    return custom
Now we process the CFPB Consumer Complaint Database. This involves loading the downloaded CSV file, counting complaints at the zipcode level, and apportioning them to the Census tract level. We write out a JSON of the distribution of financial complaints to see what the upper limit is.
complaints = loadinput('./Consumer_Complaints.csv','csv')
# Pull the ZIP code column from each complaint record
compl_list = []
for complaint in tqdm(complaints[1:]):
    compl_list.append(complaint[9])
# Count complaints per ZIP code, skipping masked ZIP codes (e.g. '123XX')
compl_cnt = {}
for zips in tqdm(compl_list):
    if 'X' not in zips:
        if zips in compl_cnt:
            compl_cnt[zips] += 1
        else:
            compl_cnt[zips] = 1
# Load the ZCTA-to-tract relationship file as ((zipcode, tract), share of the ZCTA in that tract)
ziptract = [ [(x[0],x[4]),float(x[18])/100] for x in loadinput('./zcta_tract_rel_10.txt', 'csv')[1:] ]
# Apportion each ZIP's complaint count across its tracts by the overlap share
for i, row in tqdm(enumerate(ziptract)):
    zipcode = row[0][0]
    if zipcode in compl_cnt:
        ziptract[i].append(row[1]*compl_cnt[zipcode])
    else:
        ziptract[i].append(0)
# Sum the apportioned counts by tract
compl_append = {}
for row in tqdm(ziptract):
    tract = row[0][1]
    count = round(row[2],0)
    if tract not in compl_append:
        compl_append[tract] = count
    else:
        compl_append[tract] += count
# Write out the distribution of complaint counts across tracts
compl_cnt = [ compl_append[tract] for tract in compl_append.keys() ]
compl_dist = [ {'value':x, 'count':compl_cnt.count(x)} for x in set(compl_cnt) ]
with open('./vis/dist.json', 'wb') as f:
    json.dump(compl_dist, f)
Here we define our variables of interest and build our custom JSON file.
# ACS variables of interest: B01001e27 (females under age 5), B07001e17 (lived in the
# same house one year ago), and B22002e2 (households receiving SNAP benefits)
variables = ['B01001e27', 'B07001e17', 'B22002e2']
scores = variables
varlist = load_varlist('./variables.json')
# Save the metadata for the selected variables and for the variables being scored
header = { x: varlist[x] for x in variables }
with open('./header.json', 'wb') as f:
    json.dump(header, f)
scored = { x: varlist[x] for x in scores }
with open('./scored.json', 'wb') as f:
    json.dump(scored, f)
custom = build_custom_json(variables, scores)
Next we have a directory of zipped Census tract shapefiles to which we want to append the additional information from the JSON file. We read each archive through a virtual filesystem to save disk space and write out the resulting GeoJSON for each state.
rootdir = './tracts/'
us_features = []
empty_tracts = []
for subdir, dirs, files in os.walk(rootdir):
    for f in tqdm(files):
        if f[-3:] == 'zip':
            features = []
            # Read the zipped shapefile through fiona's zip virtual filesystem
            with fiona.open('/', vfs='zip://'+os.path.join(subdir, f), layer=0) as src:
                for feat in src:
                    # Store the tract centroid coordinates
                    feat['properties']['x'] = round(asShape(feat['geometry']).centroid.x,4)
                    feat['properties']['y'] = round(asShape(feat['geometry']).centroid.y,4)
                    # Tract FIPS code (GEO_ID minus its '1400000US' prefix)
                    tract = feat['properties']['GEO_ID'][9:]
                    try:
                        feat['properties']['g'] = custom[tract]['gname']
                        # Drop properties we do not need, to keep the GeoJSON small
                        for popme in [u'NAME', u'LSAD', u'STATE', u'COUNTY', u'TRACT', u'CENSUSAREA', u'GEO_ID']:
                            feat['properties'].pop(popme)
                        # Attach each measure's percentile score, the combined score, and the complaint count
                        for rank in variables:
                            feat['properties']['s'+rank] = custom[tract]['score_'+rank]
                        feat['properties']['r'] = custom[tract]['final_score']
                        try:
                            feat['properties']['f'] = round(compl_append[tract],0)
                        except KeyError:
                            feat['properties']['f'] = 0
                        features.append(feat)
                        us_features.append(feat)
                    except KeyError:
                        # Tracts with no matching record in the ACS data
                        empty_tracts.append(tract)
            rank_map = {
                'type':'FeatureCollection',
                'features':features,
                'crs':{'init': u'epsg:4269'}}
            with open('./tracts/%s.geojson'%f, 'wb') as fl:
                json.dump(rank_map, fl)
Now we do another pass and throw out geography of no interest to save space. We then write the entire nation out to a single GeoJSON.
# Drop the per-measure scores and keep only tracts with a high combined score
# and at least ten financial complaints
filtered_features = []
for feat in tqdm(us_features):
    for rank in variables:
        feat['properties'].pop('s'+rank)
    if feat['properties']['r'] >= 70 and feat['properties']['f'] >= 10:
        filtered_features.append(feat)
us_features = filtered_features
rank_map = {
    'type':'FeatureCollection',
    'features':us_features,
    'crs':{'init': u'epsg:4269'}}
with open('./tracts/us_all_tracts.geojson', 'wb') as f:
    json.dump(rank_map, f)
We use the wordcloud package in Python to generate a word cloud from the descriptions in the ACS Variable Inventory. Starting from the original flag image, we shift the white areas to another color, use the shifted image both as the mask and as the source of the color overlay, and recolor the final image.
# Load the flag image and work on it as an RGBA array
flag = Image.open('./img/America.png').convert('RGBA')
im = np.array(flag)
imcolor = im
red, green, blue, alpha = im.T
# Identify the white pixels and the fully transparent background pixels
white_areas = (red == 255) & (green == 255) & (blue == 255)
null_areas = (red == 0) & (green == 0) & (blue == 0) & (alpha == 0)
# Shift white areas to red so they are not masked out, and the transparent background
# to white so it is; imcolor and immask both reference this shifted array
im[..., :-1][white_areas.T] = (255, 0, 0)
im[..., :-1][null_areas.T] = (255, 255, 255)
immask = im
Image.fromarray(im).save('./img/America-shifted.png')
# Concatenate the concept descriptions of every ACS variable into one text blob
varlist = load_varlist('./variables.json')
text = ''
for var in tqdm(varlist.keys()):
    text += varlist[var][u'concept']
# Drop uninformative time-period words
stopwords = STOPWORDS.copy()
stopwords.add("months")
stopwords.add("year")
stopwords.add("years")
stopwords.add("past")
# Build the word cloud inside the flag mask and recolor it from the shifted image
wordcloud = WordCloud(background_color="white", width=2560, height=1440, scale=1, stopwords=stopwords, mask=immask).generate(text)
image_colors = ImageColorGenerator(imcolor)
plt.figure(figsize=(20,10))
plt.imshow(wordcloud.recolor(color_func=image_colors))
plt.axis("off")
plt.savefig('./img/word_cloud.png', dpi=500)