Skip to content

Commit

Permalink
Job title clean (#31)
Browse files Browse the repository at this point in the history
* add mock to requirements.txt

* add negative dictionary for jobtitle cleaning

* add jobtitle_clean dag and algorithms

* remove numbers, words with number, states and states abv

* add logging info and upload to s3

* add cities in negative dictionary

* add docstrings and comments

* Add testing for negative dictionary

* Add positive dictionary: onet job titles

* fill nan with None

* fill Nan with no title

* remove unused imports

* add aggregation on ['title'] and ['geo', 'title']

* fix conflict

* add negative dictionary for jobtitle cleaning

* add jobtitle_clean dag and algorithms

* remove numbers, words with number, states and states abv

* add logging info and upload to s3

* add cities in negative dictionary

* add docstrings and comments

* Add testing for negative dictionary

* Add positive dictionary: onet job titles

* fill nan with None

* fill Nan with no title

* remove unused imports

* add aggregation on ['title'] and ['geo', 'title']

* Replace the STATEURL with the 'us' module

* integrate job title clean dat into the GeoTitleCount DAG

* fix requirement.txt

* delete jobtitle_clean dag

* remove STATEURL from test_datasets_negative_positive_dict.py

* remove the STATEURL and test response because we're now using package us for state names instead

* remove git log in jobtitle_cleaner/clean.py

* add missed.txt in .gitignore
  • Loading branch information
tweddielin authored and thcrock committed Mar 1, 2017
1 parent c58359d commit 774a5d5
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,6 @@ api_v1_db.yaml
# Output folder/s3 upload staging area
output/*
!output/interesting_job_titles.tsv

# missed test log
missed.txt
Empty file.
99 changes: 99 additions & 0 deletions algorithms/jobtitle_cleaner/clean.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import pandas as pd
import re
from collections import OrderedDict
import logging

from datasets import negative_positive_dict

def clean_by_rules(jobtitle):
    """
    Remove numbers and number-containing words from a job title.

    :params string jobtitle: A job title string
    :return: A cleaned version of the job title with single spaces between words
    :rtype: string
    """
    # Remove any word containing a digit (e.g. 'engineer2', '4west').
    # Raw string avoids the invalid-escape-sequence deprecation for '\w'/'\d'.
    jobtitle = re.sub(r'\w*\d\w*', ' ', jobtitle).strip()

    # Collapse runs of whitespace to a single space between words.
    jobtitle = ' '.join(jobtitle.split())

    return jobtitle

def clean_by_neg_dic(jobtitle, negative_list, positive_list):
    """
    Drop words that appear in the negative dictionary, keeping all others.

    Words found in the positive dictionary are logged and kept; words in
    neither dictionary are kept silently. Matching is exact, word by word.

    :params string jobtitle: A job title string
    :return: A cleaned version of job title
    :rtype: string
    """
    kept = []
    for token in jobtitle.split():
        if token in negative_list:
            logging.debug('Found "%s" in negative dictionary', token)
            continue  # negative-dictionary words are removed
        if token in positive_list:
            logging.debug('Found "%s" in positive dictionary', token)
        kept.append(token)
    return ' '.join(kept)

def aggregate(df_jobtitles, groupby_keys):
    """
    Sum the 'count' column over the given grouping keys.

    Args:
        df_jobtitles: job titles in pandas DataFrame
        groupby_keys: a list of keys to be grouped by, e.g. ['title', 'geo']
    Returns:
        an aggregated version of the job titles in pandas DataFrame,
        with any missing values filled with 'without jobtitle'
    """
    summed = df_jobtitles.groupby(groupby_keys, as_index=False)['count'].sum()
    aggregated = pd.DataFrame(summed)
    return aggregated.fillna('without jobtitle')

class JobTitleStringClean(object):
    """
    Clean job titles using rule-based cleaning plus negative/positive
    dictionaries built from place names, state names and O*NET titles.
    """

    def __init__(self):
        # NOTE: negative_positive_dict() downloads/caches external data.
        self.dict = negative_positive_dict()
        self.negative_list = self.dict['places'] + self.dict['states']
        self.positive_list = self.dict['onetjobs']

    def clean(self, df_jobtitles):
        """
        Clean the job titles by rules and negative dictionary.

        Args:
            df_jobtitles: job titles in pandas DataFrame; must contain a
                'title' column, other columns are passed through unchanged
        Returns:
            cleaned_jobtitles: a cleaned version of job title in pandas DataFrame
        """
        df_jobtitles = df_jobtitles.fillna('without jobtitle')

        columns = list(df_jobtitles.columns)
        cleaned_jobtitles = OrderedDict((key, []) for key in columns)
        progress_count = 0
        for row in df_jobtitles.values:
            if progress_count % 1000 == 0:
                logging.info('%s/%s jobtitles have been cleaned.', progress_count, len(df_jobtitles))
            try:
                # Build the whole cleaned row first so a TypeError cannot
                # leave the output columns with unequal lengths (which would
                # corrupt the final DataFrame).
                cleaned_row = []
                for col_index, colname in enumerate(columns):
                    value = row[col_index]
                    if colname == 'title':
                        value = clean_by_rules(value)
                        value = clean_by_neg_dic(value, self.negative_list, self.positive_list)
                    cleaned_row.append(value)
            except TypeError:
                # Non-string titles (unexpected after fillna) are skipped.
                logging.warning('There is a TypeError %s', row)
            else:
                for colname, value in zip(columns, cleaned_row):
                    cleaned_jobtitles[colname].append(value)
                progress_count += 1

        cleaned_jobtitles = pd.DataFrame(cleaned_jobtitles)
        return cleaned_jobtitles



69 changes: 68 additions & 1 deletion dags/geo_title_counts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import os
from datetime import datetime
import pandas as pd

from airflow import DAG
from airflow.hooks import S3Hook
Expand All @@ -12,6 +13,7 @@
from utils.nlp import NLPTransforms
from datasets import job_postings
from algorithms.aggregators.title import GeoTitleAggregator
from algorithms.jobtitle_cleaner.clean import JobTitleStringClean, aggregate
from config import config

default_args = {
Expand Down Expand Up @@ -72,4 +74,69 @@ def execute(self, context):
upload(s3_conn, count_filename, config['output_tables']['s3_path'])
upload(s3_conn, rollup_filename, config['output_tables']['s3_path'])

GeoTitleCountOperator(task_id='geo_title_count', dag=dag)
class JobTitleCleanOperator(BaseOperator):
    """Clean the quarterly title count CSVs, re-aggregate them, and upload to S3."""

    def execute(self, context):
        s3_conn = S3Hook().get_conn()
        quarter = datetime_to_quarter(context['execution_date'])

        # Output files for the cleaned/aggregated counts.
        cleaned_count_filename = '{}/cleaned_geo_title_count_{}_new.csv'.format(
            output_folder,
            quarter
        )

        cleaned_rollup_filename = '{}/cleaned_title_count_{}.csv'.format(
            output_folder,
            quarter
        )

        # Input files produced by GeoTitleCountOperator.
        count_filename = '{}/geo_title_count_{}.csv'.format(
            output_folder,
            quarter
        )

        rollup_filename = '{}/title_count_{}.csv'.format(
            output_folder,
            quarter
        )

        logging.info('Cleaning and aggregating geo job titles on %s', quarter)
        geo_title_count_df = pd.read_csv(count_filename, header=None)
        geo_title_count_df.columns = ['geo', 'title', 'count']
        cleaned_geo_title_count_df = JobTitleStringClean().clean(geo_title_count_df)
        agg_cleaned_geo_title_count_df = aggregate(cleaned_geo_title_count_df, ['geo', 'title'])

        logging.info('Cleaning and aggregating job titles on %s', quarter)
        # header=None to match the geo file above: both CSVs are written
        # without a header row, so reading with the default header would
        # silently drop the first title row.
        title_count_df = pd.read_csv(rollup_filename, header=None)
        title_count_df.columns = ['title', 'count']
        cleaned_title_count_df = JobTitleStringClean().clean(title_count_df)
        agg_cleaned_title_count_df = aggregate(cleaned_title_count_df, ['title'])

        total_counts = 0
        with open(cleaned_count_filename, 'w') as count_file:
            clean_geo_writer = csv.writer(count_file, delimiter=',')
            for idx, row in agg_cleaned_geo_title_count_df.iterrows():
                total_counts += row['count']
                clean_geo_writer.writerow([row['geo'], row['title'], row['count']])

        rollup_counts = 0
        with open(cleaned_rollup_filename, 'w') as count_file:
            clean_writer = csv.writer(count_file, delimiter=',')
            for idx, row in agg_cleaned_title_count_df.iterrows():
                rollup_counts += row['count']
                clean_writer.writerow([row['title'], row['count']])

        logging.info(
            'Found %s count rows and %s title rollup rows for %s',
            total_counts,
            rollup_counts,
            quarter,
        )

        upload(s3_conn, cleaned_count_filename, config['output_tables']['s3_path'])
        upload(s3_conn, cleaned_rollup_filename, config['output_tables']['s3_path'])

# Wire the cleaning step to run after the raw geo/title counts are produced.
jobtitle_clean = JobTitleCleanOperator(task_id='clean_title_count', dag=dag)
geo_count = GeoTitleCountOperator(task_id='geo_title_count', dag=dag)

jobtitle_clean.set_upstream(geo_count)

1 change: 1 addition & 0 deletions datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
from .place_ua import place_ua
from .cousub_ua import cousub_ua
from .ua_cbsa import ua_cbsa
from .negative_positive_dict import negative_positive_dict
63 changes: 63 additions & 0 deletions datasets/negative_positive_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from collections import defaultdict
from utils.fs import cache_json
import unicodecsv as csv
import logging
import requests
import re
import us

# Census 2010 urban-area-to-place relationship file; column 5 (PLNAME)
# supplies the place names harvested for the negative dictionary.
PLACEURL = 'http://www2.census.gov/geo/docs/maps-data/data/rel/ua_place_rel_10.txt'
# O*NET master job-title table used to build the positive dictionary.
ONETURL = 'https://s3-us-west-2.amazonaws.com/skills-public/pipeline/tables/job_titles_master_table.tsv'
# Legal/administrative suffixes stripped from the end of census place names.
SUFFIXES = [
    'city',
    'town',
    'village',
    'CDP',
    'zona urbana',
    'comunidad',
    'borough',
    'consolidated government',
    'municipality',
    'unified government',
    'metro government',
    'metropolitan government',
    'urban county',
]
# NOTE(review): DELIMITERS appears unused in this module — confirm before removing.
DELIMITERS = ['/', '-', ' City']

@cache_json('negative_positive_dict_lookup.json')
def negative_positive_dict():
    """
    Construct a dictionary of terms that are considered not to be in a job
    title (state names, state abbreviations, census place names), plus a
    positive dictionary of known O*NET job titles.

    Returns: dict of lists with keys 'states', 'places' and 'onetjobs'
    """
    logging.info("Beginning negative dictionary build")
    # State names and abbreviations, all lower-cased.
    name_to_abbr = us.states.mapping('name', 'abbr')
    states = [name.lower() for name in name_to_abbr.keys()]
    states.extend(abbr.lower() for abbr in name_to_abbr.values())

    places = []
    download = requests.get(PLACEURL)
    reader = csv.reader(
        download.content.decode('latin-1').encode('utf-8').splitlines(),
        delimiter=','
    )
    next(reader)  # skip the header row
    for row in reader:
        # Drop parenthesized qualifiers like "(Sandoval County)".
        cleaned_placename = re.sub(r'\([^)]*\)', '', row[4]).rstrip()
        for suffix in SUFFIXES:
            if cleaned_placename.endswith(suffix):
                # Slice the suffix off the end. str.replace() would remove
                # every occurrence of the suffix anywhere in the name,
                # mangling places such as 'Georgetown town' -> 'George'.
                cleaned_placename = cleaned_placename[:-len(suffix)].rstrip()
        places.append(cleaned_placename.lower())

    places = list(set(places))
    places.remove('not in a census designated place or incorporated place')

    onetjobs = []
    download = requests.get(ONETURL)
    reader = csv.reader(download.content.splitlines(), delimiter='\t')
    next(reader)  # skip the header row
    for row in reader:
        onetjobs.append(row[2].lower())  # Title column
        onetjobs.append(row[3].lower())  # Original Title column
    onetjobs = list(set(onetjobs))

    return {'states': states, 'places': places, 'onetjobs': onetjobs}
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ git+git://github.com/dssg/metta-data.git
moto
unicodecsv
us
mock
Sqlalchemy
testing.postgresql
psycopg2
46 changes: 46 additions & 0 deletions tests/test_datasets_negative_positve_dict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import httpretty

from datasets.negative_positive_dict import negative_positive_dict, PLACEURL, ONETURL

PLACERESPONSE = """UA,UANAME,STATE,PLACE,PLNAME,CLASSFP,GEOID,POPPT,HUPT,AREAPT,AREALANDPT,UAPOP,UAHU,UAAREA,UAAREALAND,PLPOP,PLHU,PLAREA,PLAREALAND,UAPOPPCT,UAHUPCT,UAAREAPCT,UAAREALANDPCT,PLPOPPCT,PLHUPCT,PLAREAPCT,PLAREALANDPCT
00037,"Abbeville, LA Urban Cluster",22,00100,"Abbeville city",C1,2200100,12073,5168,13424306,13348680,19824,8460,29523368,29222871,12257,5257,15756922,15655575,60.9,61.09,45.47,45.68,98.5,98.31,85.2,85.26
00199,"Aberdeen--Bel Air South--Bel Air North, MD Urbanized Area",24,00125,"Aberdeen borough",C1,2400125,14894,6156,14961125,14942090,213751,83721,349451754,339626464,14959,6191,17618553,17599518,6.97,7.35,4.28,4.4,99.57,99.43,84.92,84.9
99999,"Not in a 2010 urban area",26,13480,"Carp Lake CDP",U1,2613480,357,526,12371409,5331938,,,,,357,526,12371409,5331938,,,,,100,100,100,100
00037,"Abbeville, LA Urban Cluster",22,99999,"Not in a census designated place or incorporated place",,2299999,3810,1537,10712370,10487499,19824,8460,29523368,29222871,,,,,19.22,18.17,36.28,35.89,,,,
62677,"New Orleans, LA Urbanized Area",22,01780,"Ama CDP",U1,2201780,1041,439,3021598,3016072,899703,426562,695715795,651105206,1316,547,11475232,9109388,.12,.1,.43,.46,79.1,80.26,26.33,33.11
01171,"Albuquerque, NM Urbanized Area",35,58070,"Placitas CDP (Sandoval County)",U1,3558070,544,280,2550047,2550047,741318,314851,657890843,648969769,4977,2556,76919539,76919539,.07,.09,.39,.39,10.93,10.95,3.32,3.32
77770,"St. Louis, MO--IL Urbanized Area",29,65000,"St. Louis city",C7,2965000,319293,176000,164556953,159893739,2150706,956440,2421404455,2392205874,319294,176002,171026250,160343174,14.85,18.4,6.8,6.68,100,100,96.22,99.72
43912,"Kansas City, MO--KS Urbanized Area",29,28090,"Grain Valley city",C1,2928090,12719,4818,13426555,13410717,1519417,671028,1773883282,1755587807,12854,4867,15720542,15704704,.84,.72,.76,.76,98.95,98.99,85.41,85.39
96670,"Winston-Salem, NC Urbanized Area",37,75000,"Winston-Salem city",C1,3775000,229432,103881,344210551,340988724,391024,174669,842062274,835485857,229617,103974,346269876,343041264,58.67,59.47,40.88,40.81,99.92,99.91,99.41,99.4
08785,"Boise City, ID Urbanized Area",16,08830,"Boise City city",C1,1608830,204776,92335,172985761,171285375,349684,146177,350800300,346614209,205671,92700,207328481,205550644,58.56,63.17,49.31,49.42,99.56,99.61,83.44,83.33
"""

ONETRESPONSE="""'\tO*NET-SOC Code\tTitle\tOriginal Title\tDescription\tjob_uuid\tnlp_a
0\t11-1011.00\tChief Executives\tChief Executives\tDetermine and formulate policies and provide overall direction of companies or private and public sector organizations within guidelines set up by a board of directors or similar governing body. Plan, direct, or coordinate operational activities at the highest level of management with the help of subordinate executives and staff managers.\te4063de16cae5cf29207ca572e3a891d\tchief executives'
1\t11-1011.03\tChief Sustainability Officers\tChief Sustainability Officers\tCommunicate and coordinate with management, shareholders, customers, and employees to address sustainability issues. Enact or oversee a corporate sustainability strategy.\tb4155ade06cff632fb89ff03057b3107\tchief sustainability officers
"""

@httpretty.activate
def test_negative_dict():
    # Stub out both remote downloads so the test never touches the network.
    httpretty.register_uri(
        httpretty.GET,
        PLACEURL,
        body=PLACERESPONSE,
        content_type='text/csv'
    )

    httpretty.register_uri(
        httpretty.GET,
        ONETURL,
        body=ONETRESPONSE,
        content_type='text/csv'
    )

    # __wrapped__ calls the undecorated function, bypassing the cache_json
    # decorator so the results come from the stubbed URLs, not a cache file.
    results_places = set(negative_positive_dict.__wrapped__()['places'])
    assert results_places == {'abbeville', 'aberdeen', 'winston-salem', 'ama', 'placitas',
                              'boise city', 'grain valley', 'st. louis', 'carp lake'}

    results_onetjobs = set(negative_positive_dict.__wrapped__()['onetjobs'])
    assert results_onetjobs == {'chief executives', 'chief sustainability officers'}


0 comments on commit 774a5d5

Please sign in to comment.