Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions Assessment_sartaj.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"fixed_width_parser"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
"\n",
"class FixedWidthParser:\n",
" def __init__(self, spec_file, input_file, output_file, encoding=\"utf-8\"):\n",
" self.spec_file = spec_file\n",
" self.input_file = input_file\n",
" self.output_file = output_file\n",
" self.encoding = encoding\n",
" self.column_specs = []\n",
"\n",
" def parse_spec(self):\n",
" \"\"\"Reads the spec file and extracts column names and widths.\"\"\"\n",
" with open(self.spec_file, \"r\", encoding=self.encoding) as file:\n",
" for line in file:\n",
" parts = line.strip().split()\n",
" if len(parts) == 2:\n",
" column_name, width = parts\n",
" self.column_specs.append((column_name, int(width)))\n",
"\n",
" def parse_fixed_width_file(self):\n",
" \"\"\"Reads fixed-width data and writes to CSV.\"\"\"\n",
" self.parse_spec()\n",
" \n",
" with open(self.input_file, \"r\", encoding=self.encoding) as infile, \\\n",
" open(self.output_file, \"w\", newline=\"\", encoding=self.encoding) as outfile:\n",
" writer = csv.writer(outfile)\n",
" writer.writerow([col[0] for col in self.column_specs]) # Write header\n",
"\n",
" for line in infile:\n",
" values = []\n",
" start = 0\n",
" for _, width in self.column_specs:\n",
" values.append(line[start:start+width].strip())\n",
" start += width\n",
" writer.writerow(values)\n",
"\n",
"if __name__ == \"__main__\":\n",
" parser = FixedWidthParser(\"spec.txt\", \"input_data/fixed_width.txt\", \"output_data/output.csv\")\n",
" parser.parse_fixed_width_file()\n",
" print(\"✅ Fixed-width file successfully parsed to CSV!\")\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"data_anonymizer"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
"import hashlib\n",
"import random\n",
"import string\n",
"\n",
"class DataAnonymizer:\n",
" def __init__(self, input_file, output_file, chunk_size=10000):\n",
" self.input_file = input_file\n",
" self.output_file = output_file\n",
" self.chunk_size = chunk_size\n",
" self.anonymized_map = {}\n",
"\n",
" def anonymize_text(self, text):\n",
" \"\"\"Generate a consistent hash-based anonymized string.\"\"\"\n",
" if text in self.anonymized_map:\n",
" return self.anonymized_map[text]\n",
" hashed = hashlib.sha256(text.encode()).hexdigest()[:10]\n",
" fake_value = \"\".join(random.choices(string.ascii_letters, k=10))\n",
" self.anonymized_map[text] = fake_value\n",
" return fake_value\n",
"\n",
" def anonymize_csv(self):\n",
" \"\"\"Reads and anonymizes data in chunks.\"\"\"\n",
" with open(self.input_file, \"r\", encoding=\"utf-8\") as infile, \\\n",
" open(self.output_file, \"w\", newline=\"\", encoding=\"utf-8\") as outfile:\n",
" reader = csv.reader(infile)\n",
" writer = csv.writer(outfile)\n",
"\n",
" # Read header\n",
" headers = next(reader)\n",
" writer.writerow(headers)\n",
"\n",
" for row in reader:\n",
" row[0] = self.anonymize_text(row[0]) # first_name\n",
" row[1] = self.anonymize_text(row[1]) # last_name\n",
" row[2] = self.anonymize_text(row[2]) # address\n",
" writer.writerow(row)\n",
"\n",
"if __name__ == \"__main__\":\n",
" anonymizer = DataAnonymizer(\"input_data/data.csv\", \"output_data/anonymized_data.csv\")\n",
" anonymizer.anonymize_csv()\n",
" print(\"✅ Data successfully anonymized!\")\n"
]
}
],
"metadata": {
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2
}