This application addresses a hypothetical request: develop a monitoring system that:
- Monitors website availability over the network and collects:
  - HTTP response time
  - HTTP error code returned
  - whether a pattern that is expected on the page is found
- Produces corresponding metrics and passes these events through a Kafka instance into a PostgreSQL database
- These components may run on different systems
- One or more database tables can handle a reasonable number of checks performed over a long period of time
There are 2 main programs: web_monitor_agent.py, which collects the metrics, and sink_connector.py, which stores them.
In addition, a 3rd program, initialize_infra.py, is responsible for initializing the environment.
web_monitor_agent.py is designed so that several copies of it can run as processes on the same system or on several independent ones. Each process creates a set of threads which monitor the listed URLs (1 thread monitors 5 URLs). All threads publish to the same Kafka topic.
ℹ️ The number of URLs monitored per thread (5) is tuned based on the HTTP GET request timeout (15 seconds): if 4 of the 5 URLs assigned to a thread suffer timeouts (4 URLs * 15 seconds max. per check = 60 seconds == MONITORING_RETRY_SECS), the 5th one's next monitoring check does not get delayed.
Note: its main restriction is the max. number of open sockets.
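As a hedged illustration of this design (the function and metric-field names below are hypothetical; only the 5-URLs-per-thread grouping, the 15-second timeout and the shared KafkaProducer come from the description above), one monitoring thread could look like:
```python
import json
import re
import time

import requests
from kafka import KafkaProducer

HTTP_TIMEOUT_SECS = 15      # HTTP GET request timeout mentioned above
MONITORING_RETRY_SECS = 60  # how often each URL is re-checked

def monitor_urls(urls: list[str], producer: KafkaProducer, topic: str, pattern: str) -> None:
    """Run forever, checking the (up to 5) URLs assigned to this thread."""
    regex = re.compile(pattern)  # MONITORING_TARGETS_REGEX
    while True:
        cycle_start = time.monotonic()
        for url in urls:
            metric = {"url": url, "checked_at": time.time()}
            try:
                response = requests.get(url, timeout=HTTP_TIMEOUT_SECS)
                metric["response_secs"] = response.elapsed.total_seconds()
                metric["status_code"] = response.status_code
                metric["pattern_found"] = bool(regex.search(response.text))
            except requests.RequestException as error:
                metric["error"] = str(error)
            # KafkaProducer is thread-safe, so all threads can share one instance
            producer.send(topic, json.dumps(metric).encode("utf-8"))
        # Sleep out the remainder of the monitoring interval, if any
        time.sleep(max(0.0, MONITORING_RETRY_SECS - (time.monotonic() - cycle_start)))
```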
sink_connector.py is designed with performance in mind. Since the KafkaConsumer is not thread-safe (see kafka-python's project description):
> Thread safety
> The KafkaProducer can be used across threads without issue, unlike the KafkaConsumer which cannot.
> While it is possible to use the KafkaConsumer in a thread-local manner, multiprocessing is recommended.
it has been implemented as a single thread, trading higher memory usage for better performance.
Therefore, it consumes Kafka messages in windows bounded by either time or number of messages, and stores each window as a batch in the Postgres database, where transactional commit is disabled (we can relax this setting because all our SQL operations are ACID).
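A hedged sketch of this consume-in-windows, store-in-batches loop, assuming kafka-python and psycopg2 (the batch limits and column names are illustrative; the host, topic and table names are taken from the .env examples below):
```python
import json

import psycopg2
from kafka import KafkaConsumer

BATCH_MAX_MESSAGES = 500  # flush when this many messages have accumulated...
BATCH_WINDOW_MS = 5000    # ...or when this much time has passed

consumer = KafkaConsumer(
    "web_monitoring",                        # KAFKA_TOPIC_NAME
    bootstrap_servers="kafka.aivencloud.com:2181",
    enable_auto_commit=False,                # commit offsets only after the DB write
)
connection = psycopg2.connect(host="postgres.aivencloud.com", dbname="defaultdb")
connection.autocommit = True                 # POSTGRES_AUTOCOMMIT=True

while True:
    # poll() returns whatever arrived within the window, up to max_records
    window = consumer.poll(timeout_ms=BATCH_WINDOW_MS, max_records=BATCH_MAX_MESSAGES)
    metrics = [json.loads(message.value) for messages in window.values() for message in messages]
    if metrics:
        with connection.cursor() as cursor:
            cursor.executemany(
                "INSERT INTO web_health_metrics (url, status_code, response_secs)"
                " VALUES (%s, %s, %s)",
                [(m.get("url"), m.get("status_code"), m.get("response_secs")) for m in metrics],
            )
        consumer.commit()  # mark this window as processed only after it is stored
```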
Additionally, performance can be further improved if the Kafka and Postgres components work independently on a continuous stream of data, for example using Store_manager.insert_metrics_copy() and/or implementing shared memory (the mmap system call).
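For reference, a COPY-based bulk insert in the spirit of insert_metrics_copy() might look like this sketch (the real method's signature and the table columns are assumptions):
```python
import io

def insert_metrics_copy(connection, rows) -> None:
    """Stream rows into Postgres with COPY, typically much faster than INSERTs."""
    buffer = io.StringIO()
    for url, status_code, response_secs in rows:
        buffer.write(f"{url}\t{status_code}\t{response_secs}\n")
    buffer.seek(0)
    with connection.cursor() as cursor:
        # psycopg2's copy_expert() feeds the file-like buffer to COPY ... FROM STDIN
        cursor.copy_expert(
            "COPY web_health_metrics (url, status_code, response_secs) FROM STDIN",
            buffer,
        )
```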
On the other hand, to ensure that our storage is optimized for metrics (time-series data) and can keep them for long periods of time, we take advantage of the TimescaleDB plug-in, e.g.:
Scalable
- Transparent time/space partitioning for both scaling up (single node) and scaling out (forthcoming).
- High data write rates (including batched commits, in-memory indexes, transactional support, support for data backfill).
- Right-sized chunks (two-dimensional data partitions) on single nodes to ensure fast ingest even at large data sizes.
- Parallelized operations across chunks and servers.
initialize_infra.py initializes the environment, creating the required resources on the Kafka and Postgres services.
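A rough sketch of what this implies, assuming kafka-python's admin client and psycopg2 (the topic, table and column names follow the .env examples below; the exact schema is an assumption):
```python
import psycopg2
from kafka.admin import KafkaAdminClient, NewTopic

# 1. Create the Kafka topic that the monitoring agents publish to
admin = KafkaAdminClient(bootstrap_servers="kafka.aivencloud.com:2181")
admin.create_topics([NewTopic(name="web_monitoring", num_partitions=1, replication_factor=1)])

# 2. Create the metrics table and turn it into a TimescaleDB hypertable
connection = psycopg2.connect(host="postgres.aivencloud.com", dbname="defaultdb")
connection.autocommit = True
with connection.cursor() as cursor:
    cursor.execute("CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE")
    cursor.execute(
        """CREATE TABLE IF NOT EXISTS web_health_metrics (
               checked_at    TIMESTAMPTZ NOT NULL,
               url           TEXT NOT NULL,
               status_code   INTEGER,
               response_secs DOUBLE PRECISION,
               pattern_found BOOLEAN
           )"""
    )
    cursor.execute(
        "SELECT create_hypertable('web_health_metrics', 'checked_at', if_not_exists => TRUE)"
    )
```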
- Clone or download a ZIP of this project, e.g.:
```
$ git clone git@github.com:elminster-aom/homeworks.git
```
- Ensure that you have the right version of Python (v3.9, see below)
- Create and activate a Python virtual environment and install the required packages, e.g.:
```
$ python3 -m venv homeworks \
&& source homeworks/bin/activate \
&& python3 -m pip install --requirement homeworks/requirements.txt
```
- Move into the new environment:
```
$ cd homeworks
```
Further details on Installing packages using pip and virtual environments
- Create Kafka and PostgreSQL services, if they don't exist already (aiven.io is an interesting option)
- In the case of Kafka, you also need to download the files used for the authentication process. Where to find them and where to set their paths is described below, in the .env section
- All available settings are read from an environment variables file in the home of our application. For its creation you can use this template: env_example, e.g.:
```
$ cp docs/env_example .env
$ nano .env
# For information on its parameters, see the .env section below
$ chmod 0600 .env
```
- Run initialize_infra.py to initialize the infrastructure *
```
$ ./initialize_infra.py
```
- Start collecting metrics using web_monitor_agent.py **
```
$ ./web_monitor_agent.py
```
- Store them using sink_connector.py **
```
$ ./sink_connector.py
```
\* It needs to be run only once per environment, for initialization reasons
\*\* They can run on the same server or on different ones
Once the How to install section above is completed, tests can be run like:
```
# Validate that sensitive data is protected
$ ./tests/security_test1.sh
$ ./tests/security_test2.sh
# Validate that infrastructure is properly created
$ python3 -m pytest tests/tests.py
# Validate that all parts work together: URL monitoring, Kafka communication and DB storing
$ ./tests/integration_test.sh
```
The .env settings are:
- _WORKSPACE_PATH: Full path to the project (e.g. /home/user1/homeworks)
- KAFKA_ACCESS_CERTIFICATE: Full path to the Kafka access certificate (e.g. ${_WORKSPACE_PATH}/tests/service.cert), available on your Aiven console: Services -> <Your Kafka> -> Overview -> Access Certificate. IMPORTANT! Although it's encrypted, do not forget to set service.cert to read-only for the file owner (chmod 0600 service.cert) and exclude it from the git repository
- KAFKA_ACCESS_KEY: Full path to the Kafka access key (e.g. ${_WORKSPACE_PATH}/tests/service.key), available on your Aiven console: Services -> <Your Kafka> -> Overview -> Access Key. IMPORTANT! Do not forget to set service.key to read-only for the file owner (chmod 0600 service.key) and exclude it from the git repository
- KAFKA_CA_CERTIFICATE: Full path to the Kafka CA certificate (e.g. ${_WORKSPACE_PATH}/tests/ca.pem), available on your Aiven console: Services -> <Your Kafka> -> Overview -> CA Certificate
- KAFKA_HOST: Kafka hostname (e.g. kafka.aivencloud.com), available on your Aiven console: Services -> <Your Kafka> -> Overview -> Host
- KAFKA_PORT: Kafka TCP listener port (e.g. 2181), available on your Aiven console: Services -> <Your Kafka> -> Overview -> Port
- KAFKA_TOPIC_NAME: A unique string which identifies the Kafka topic for this application (e.g. web_monitoring)
- MONITORING_LOG_LEVEL: Log level in console (e.g. INFO); valid values: DEBUG, INFO, WARNING, ERROR and FATAL
- MONITORING_RETRY_SECS: How often web_monitor_agent.py will check the target URLs, in seconds (e.g. 60)
- MONITORING_TARGETS_PATH: Full path to a text file with the target URLs, the webs to monitor (e.g. ${_WORKSPACE_PATH}/tests/list_web_domains.txt)
- MONITORING_TARGETS_REGEX: A regex that web_monitor_agent.py will try to match against the body of each HTTP GET response
- POSTGRES_AUTOCOMMIT: As documented above, this parameter must be set to True for performance reasons
- POSTGRES_HOST: PostgreSQL hostname (e.g. postgres.aivencloud.com), available on your Aiven console: Services -> <Your PostgreSQL> -> Overview -> Host
- POSTGRES_USER: PostgreSQL user (e.g. avnadmin), available on your Aiven console: Services -> <Your PostgreSQL> -> Overview -> User
- POSTGRES_PASSWORD: PostgreSQL password (e.g. p4ssW0rd1), available on your Aiven console: Services -> <Your PostgreSQL> -> Overview -> Password
- POSTGRES_PORT: PostgreSQL TCP listener port (e.g. 5432), available on your Aiven console: Services -> <Your PostgreSQL> -> Overview -> Port
- POSTGRES_SSL: PostgreSQL SSL mode (default: require). For a full list of values, check the PostgreSQL documentation
- POSTGRES_TABLE: Name of the database hypertable for storing our web metrics (e.g. web_health_metrics)
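For illustration, these settings could be loaded with the python-dotenv package; a minimal sketch, assuming that package (the project may load them differently):
```python
import os

from dotenv import load_dotenv

load_dotenv(".env")  # populate os.environ from the file in the project home

kafka_host = os.environ["KAFKA_HOST"]
kafka_port = int(os.environ["KAFKA_PORT"])
retry_secs = int(os.environ.get("MONITORING_RETRY_SECS", "60"))
postgres_autocommit = os.environ.get("POSTGRES_AUTOCOMMIT", "True") == "True"
```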
- Only Unix-like systems are supported
- The code has been tested only with these versions (different versions may work too, but we cannot guarantee it):
- Kafka 2.7.0
- PostgreSQL 13.2
- Python 3.9.4
- TimescaleDB 2.1
- For a detailed list of Python modules, check out requirements.txt
Review the list of TODOs
I would like to reference some useful information sources which have been crucial for the implementation of this solution and thank their creators for sharing their knowledge:
- How Postgresql COPY TO STDIN With CSV do on conflic do update?
- UPSERTs not working correctly #100
- Reference about enable_auto_commit=False
- except is not needed, if any it can be raised
- How to create tzinfo when I have UTC offset?
- Fastest Way to Load Data Into PostgreSQL Using Python
- Getting started with TimescaleDB in Aiven for PostgreSQL
- 3 Libraries You Should Know to Master Apache Kafka in Python
- How to create topics if it does not exists in Kafka dynamically using kafka-python
- Python Kafka Consumers: at-least-once, at-most-once, exactly-once
- PEP 612 -- Parameter Specification Variables