Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added updates to support mongodb. #922

Closed · wants to merge 4 commits
14 changes: 1 addition & 13 deletions sink-connector-lightweight/dependency-reduced-pom.xml
@@ -128,18 +128,6 @@
</exclusion>
</exclusions>
</dependency>
-      <dependency>
-        <groupId>io.debezium</groupId>
-        <artifactId>debezium-connector-mongodb</artifactId>
-        <version>2.7.0.Beta2</version>
-        <scope>test</scope>
-        <exclusions>
-          <exclusion>
-            <artifactId>mongodb-driver-sync</artifactId>
-            <groupId>org.mongodb</groupId>
-          </exclusion>
-        </exclusions>
-      </dependency>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongo-java-driver</artifactId>
@@ -326,7 +314,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<version.checkstyle.plugin>3.1.1</version.checkstyle.plugin>
<maven.compiler.target>17</maven.compiler.target>
-    <version.debezium>2.7.0.Beta2</version.debezium>
+    <version.debezium>2.7.2.Final</version.debezium>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
</properties>
</project>
119 changes: 119 additions & 0 deletions sink-connector-lightweight/docker/config_mongo.yml
@@ -0,0 +1,119 @@
#### Some of the properties below are part of the Debezium MongoDB Connector
#### https://debezium.io/documentation/reference/stable/connectors/mongodb.html
# name: Unique name for the connector. Attempting to register again with the same name will fail.
name: "debezium-embedded-mongo"

auto.create.tables.replicated: "true"

# database.hostname: IP address or hostname of the PostgreSQL database server.
#database.hostname: "postgres"

# database.port: Integer port number of the PostgreSQL database server listening for client connections.
#database.port: "5432"

# database.user: Name of the PostgreSQL database user to be used when connecting to the database.
#database.user: "root"

# database.password: Password of the PostgreSQL database user to be used when connecting to the database.
#database.password: "root"

# database.server.name: The name of the PostgreSQL database from which events are to be captured when not using snapshot mode.
database.server.name: "ER54"

# schema.include.list: An optional list of regular expressions that match schema names to be monitored;
schema.include.list: public,public2

slot.name: connector2



# table.include.list: An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
#table.include.list: "public.tm,public.tm2"

# clickhouse.server.url: Specify only the hostname of the Clickhouse Server.
clickhouse.server.url: "clickhouse"

# clickhouse.server.user: Clickhouse Server User
clickhouse.server.user: "root"

# clickhouse.server.password: Clickhouse Server Password
clickhouse.server.password: "root"

# clickhouse.server.port: Clickhouse Server Port
clickhouse.server.port: "8123"

# database.allowPublicKeyRetrieval: "true" https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/
database.allowPublicKeyRetrieval: "true"

# snapshot.mode: Debezium can use different modes when it runs a snapshot. The snapshot mode is determined by the snapshot.mode configuration property.
snapshot.mode: "initial"

# offset.flush.interval.ms: The number of milliseconds to wait before flushing recent offsets to Kafka. This ensures that offsets are committed within the specified time interval.
offset.flush.interval.ms: 5000

# connector.class: The Java class for the connector. This must be set to io.debezium.connector.mongodb.MongoDbConnector.
connector.class: "io.debezium.connector.mongodb.MongoDbConnector"

mongodb.connection.string : "mongodb://mongo:27017/?replicaSet=rs0"
topic.prefix : "dbserver1"
mongodb.user : "myUserAdmin"
mongodb.password : "admin"
database.include.list : "inventory"
# offset.storage: The Java class that implements the offset storage strategy. This must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"

# offset.storage.jdbc.offset.table.name: The name of the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"

# offset.storage.jdbc.url: The JDBC URL for the database where connector offsets are to be stored.
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"

# offset.storage.jdbc.user: The name of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.user: "root"

# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "root"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8192"
offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
# enable.snapshot.ddl: If set to true, the connector will parse the DDL statements as part of the initial load.
enable.snapshot.ddl: "true"

# auto.create.tables: If set to true, the connector will create the database tables for the destination tables if they do not already exist.
auto.create.tables: "true"

# database.dbname: The name of the PostgreSQL database from which events are to be captured when not using snapshot mode.
database.dbname: "public"

# clickhouse.datetime.timezone: This timezone will override the default timezone of ClickHouse server. Timezone columns will be set to this timezone.
#clickhouse.datetime.timezone: "UTC"

# skip_replica_start: If set to true, the connector will skip replication on startup. sink-connector-client start_replica will start replication.
#skip_replica_start: "false"

# binary.handling.mode: The mode for handling binary values. Possible values are bytes, base64, and decode. The default is bytes.
#binary.handling.mode: "base64"

# ignore_delete: If set to true, the connector will ignore delete events. The default is false.
#ignore_delete: "true"

#Metrics (Prometheus target), required for Grafana Dashboard
metrics.enable: "true"

#disable.ddl: If set to true, the connector will ignore DDL events. The default is false.
#disable.ddl: "false"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"
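
Once the stack is running with this configuration, the offset bookkeeping can be checked directly in ClickHouse. A minimal sketch, reusing the service name from docker-compose-mongo.yml and the root/root credentials configured above (adjust both to your environment):

# Inspect the connector offsets stored by JdbcOffsetBackingStore in ClickHouse.
docker compose -f docker-compose-mongo.yml exec clickhouse clickhouse-client \
  --user root --password root \
  --query "SELECT offset_key, offset_val, record_insert_ts FROM altinity_sink_connector.replica_source_info FINAL"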
119 changes: 119 additions & 0 deletions sink-connector-lightweight/docker/config_mongo_local.yml
@@ -0,0 +1,119 @@
#### Some of the properties below are part of the Debezium MongoDB Connector
#### https://debezium.io/documentation/reference/stable/connectors/mongodb.html
# name: Unique name for the connector. Attempting to register again with the same name will fail.
name: "debezium-embedded-mongo"

auto.create.tables.replicated: "true"

# database.hostname: IP address or hostname of the PostgreSQL database server.
#database.hostname: "postgres"

# database.port: Integer port number of the PostgreSQL database server listening for client connections.
#database.port: "5432"

# database.user: Name of the PostgreSQL database user to be used when connecting to the database.
#database.user: "root"

# database.password: Password of the PostgreSQL database user to be used when connecting to the database.
#database.password: "root"

# database.server.name: The name of the PostgreSQL database from which events are to be captured when not using snapshot mode.
database.server.name: "ER54"

# schema.include.list: An optional list of regular expressions that match schema names to be monitored;
schema.include.list: public,public2

slot.name: connector2



# table.include.list: An optional list of regular expressions that match fully-qualified table identifiers for tables to be monitored;
#table.include.list: "public.tm,public.tm2"

# clickhouse.server.url: Specify only the hostname of the Clickhouse Server.
clickhouse.server.url: "localhost"

# clickhouse.server.user: Clickhouse Server User
clickhouse.server.user: "root"

# clickhouse.server.password: Clickhouse Server Password
clickhouse.server.password: "root"

# clickhouse.server.port: Clickhouse Server Port
clickhouse.server.port: "8123"

# database.allowPublicKeyRetrieval: "true" https://rmoff.net/2019/10/23/debezium-mysql-v8-public-key-retrieval-is-not-allowed/
database.allowPublicKeyRetrieval: "true"

# snapshot.mode: Debezium can use different modes when it runs a snapshot. The snapshot mode is determined by the snapshot.mode configuration property.
snapshot.mode: "initial"

# offset.flush.interval.ms: The number of milliseconds to wait before flushing recent offsets to Kafka. This ensures that offsets are committed within the specified time interval.
offset.flush.interval.ms: 5000

# connector.class: The Java class for the connector. This must be set to io.debezium.connector.mongodb.MongoDbConnector.
connector.class: "io.debezium.connector.mongodb.MongoDbConnector"

mongodb.connection.string : "mongodb://localhost:27017/?replicaSet=rs0"
topic.prefix : "dbserver1"
mongodb.user : "myUserAdmin"
mongodb.password : "admin"
database.include.list : "inventory"
# offset.storage: The Java class that implements the offset storage strategy. This must be set to io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore.
offset.storage: "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore"

# offset.storage.jdbc.offset.table.name: The name of the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.name: "altinity_sink_connector.replica_source_info"

# offset.storage.jdbc.url: The JDBC URL for the database where connector offsets are to be stored.
offset.storage.jdbc.url: "jdbc:clickhouse://clickhouse:8123/altinity_sink_connector"

# offset.storage.jdbc.user: The name of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.user: "root"

# offset.storage.jdbc.password: The password of the database user to be used when connecting to the database where connector offsets are to be stored.
offset.storage.jdbc.password: "root"

# offset.storage.jdbc.offset.table.ddl: The DDL statement used to create the database table where connector offsets are to be stored.
offset.storage.jdbc.offset.table.ddl: "CREATE TABLE if not exists %s
(
`id` String,
`offset_key` String,
`offset_val` String,
`record_insert_ts` DateTime,
`record_insert_seq` UInt64,
`_version` UInt64 MATERIALIZED toUnixTimestamp64Nano(now64(9))
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id
SETTINGS index_granularity = 8192"
offset.storage.jdbc.offset.table.delete: "delete from %s where 1=1"
# enable.snapshot.ddl: If set to true, the connector will parse the DDL statements as part of the initial load.
enable.snapshot.ddl: "true"

# auto.create.tables: If set to true, the connector will create the database tables for the destination tables if they do not already exist.
auto.create.tables: "true"

# database.dbname: The name of the PostgreSQL database from which events are to be captured when not using snapshot mode.
database.dbname: "public"

# clickhouse.datetime.timezone: This timezone will override the default timezone of ClickHouse server. Timezone columns will be set to this timezone.
#clickhouse.datetime.timezone: "UTC"

# skip_replica_start: If set to true, the connector will skip replication on startup. sink-connector-client start_replica will start replication.
#skip_replica_start: "false"

# binary.handling.mode: The mode for handling binary values. Possible values are bytes, base64, and decode. The default is bytes.
#binary.handling.mode: "base64"

# ignore_delete: If set to true, the connector will ignore delete events. The default is false.
#ignore_delete: "true"

#Metrics (Prometheus target), required for Grafana Dashboard
metrics.enable: "true"

#disable.ddl: If set to true, the connector will ignore DDL events. The default is false.
#disable.ddl: "false"

#disable.drop.truncate: If set to true, the connector will ignore drop and truncate events. The default is false.
#disable.drop.truncate: "false"
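
Before starting the connector against this local configuration, it can help to confirm the replica set is reachable with the credentials above. A minimal sketch, assuming mongosh is installed on the host (connection string, user, and password mirror this file):

# Verify the rs0 replica set answers on localhost with the configured credentials.
mongosh "mongodb://localhost:27017/?replicaSet=rs0" \
  --username myUserAdmin --password admin \
  --eval "rs.status().members.map(m => m.stateStr)"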
61 changes: 61 additions & 0 deletions sink-connector-lightweight/docker/docker-compose-mongo.yml
@@ -0,0 +1,61 @@
version: "3.4"

# Ubuntu , set this for redpanda to start
# https://sort.veritas.com/public/documents/HSO/2.0/linux/productguides/html/hfo_admin_ubuntu/ch04s03.htm

# Clickhouse Table Schema
# create table test(id int, message String) ENGINE=MergeTree() PRIMARY KEY id;

services:
mongo:
extends:
file: mongo-service.yml
service: mongo


clickhouse:
extends:
file: clickhouse-service.yml
service: clickhouse
depends_on:
zookeeper:
condition: service_healthy

zookeeper:
extends:
file: zookeeper-service.yml
service: zookeeper

clickhouse-sink-connector-lt:
extends:
file: clickhouse-sink-connector-lt-service.yml
service: clickhouse-sink-connector-lt
depends_on:
- clickhouse
extra_hosts:
- "host.docker.internal:host-gateway"
environment:
JAVA_OPTS: >
-Xmx5G
-Xms128m
volumes:
- ./config_mongo.yml:/config.yml

### MONITORING ####
# prometheus:
# extends:
# file: prometheus-service.yml
# service: prometheus
#
#
# grafana:
# extends:
# file: grafana-service.yml
# service: grafana
# volumes:
# - ./config/grafana/config/dashboard.yml:/etc/grafana/provisioning/dashboards/dashboard.yml
# - ./config/grafana/config/datasource.yml:/etc/grafana/provisioning/datasources/datasource.yml
# - ./config/grafana/config/altinity_sink_connector.json:/var/lib/grafana/dashboards/altinity_sink_connector.json
# depends_on:
# - prometheus
## END OF MONITORING ###
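
A typical way to use this compose file is sketched below; the file and service names come from the definitions above, while the exact workflow is left to your setup:

# Start MongoDB, ClickHouse, ZooKeeper, and the lightweight sink connector.
docker compose -f docker-compose-mongo.yml up -d

# Follow the connector logs to watch the MongoDB snapshot and streaming phases.
docker compose -f docker-compose-mongo.yml logs -f clickhouse-sink-connector-lt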
27 changes: 27 additions & 0 deletions sink-connector-lightweight/docker/getLatestDockerTag.sh
@@ -0,0 +1,27 @@
#!/bin/bash

if [ $# -lt 1 ]
then
cat << HELP

getLatestDockerTag.sh -- list all tags for an altinity/<image> repository on Docker Hub.

EXAMPLE:
- list all tags for altinity/<image>:
    getLatestDockerTag.sh <image>

- list only the tags containing a substring:
    getLatestDockerTag.sh <image> <substring>

HELP
exit 1
fi

image="$1"

# Fetch the tag list for altinity/<image> from Docker Hub and strip the JSON
# punctuation down to bare tag names.
tags=$(wget -q "https://hub.docker.com/v1/repositories/r/altinity/${image}/tags" -O - | sed -e 's/[][]//g' -e 's/"//g' -e 's/ //g' | tr '}' '\n' | awk -F: '{print $3}')

# Optional second argument: keep only the tags containing that substring.
if [ -n "$2" ]
then
tags=$(echo "${tags}" | grep "$2")
fi

echo "${tags}"
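
Example invocations (the image name below is illustrative; the script prepends altinity/ itself):

# List every tag published for altinity/clickhouse-sink-connector.
./getLatestDockerTag.sh clickhouse-sink-connector

# List only the tags containing the substring "lt".
./getLatestDockerTag.sh clickhouse-sink-connector lt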
28 changes: 28 additions & 0 deletions sink-connector-lightweight/docker/get_latest_docker_tag.sh
@@ -0,0 +1,28 @@
#!/bin/bash

# Function to fetch the latest tag from Docker Hub
get_latest_tag() {
# Docker Hub repository name in the format "username/repository"
REPO="$1"

  # Fetch the tags using the Docker Hub API and parse them with jq.
  # Sort the tags in version order and pick the last (highest) one.
  # To skip pre-release tags, insert a filter such as
  #   grep -v -e 'latest' -e 'rc' -e '-alpha' -e '-beta'
  # between the sort and tail stages; a comment cannot sit inside the
  # backslash-continued pipeline without breaking it.
  LATEST_TAG=$(curl -s "https://hub.docker.com/v2/repositories/$REPO/tags/?page_size=100" \
    | jq -r '.results[].name' \
    | sort -V \
    | tail -n 1)

echo "The latest tag for $REPO is: $LATEST_TAG"
}

# Check if the repository name is provided as an argument
if [ -z "$1" ]; then
echo "Usage: $0 <repository>"
echo "Example: $0 library/nginx"
exit 1
fi

# Call the function with the provided repository name
get_latest_tag "$1"
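
Because the script prints a sentence rather than a bare tag, a caller that wants just the tag can strip the prefix. A sketch using the repository from the script's own usage example:

# Capture only the tag and pull that exact image version.
TAG=$(./get_latest_docker_tag.sh library/nginx | awk -F': ' '{print $NF}')
docker pull "library/nginx:${TAG}"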

6 changes: 6 additions & 0 deletions sink-connector-lightweight/docker/init-replica.js
@@ -0,0 +1,6 @@
rs.initiate({
_id: "rs0",
members: [
{ _id: 0, host: "mongo:27017" }
]
});
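
Debezium's MongoDB connector reads change streams, which require a replica set, and this script initiates one named rs0. One way to apply it against the mongo service from docker-compose-mongo.yml, sketched here on the assumption that the script sits next to the compose file and is not mounted automatically:

# Feed the replica-set initiation script to mongosh inside the mongo container.
docker compose -f docker-compose-mongo.yml exec -T mongo mongosh < init-replica.js

# Confirm the replica set reports as initialized (should print 1).
docker compose -f docker-compose-mongo.yml exec mongo mongosh --quiet --eval "rs.status().ok"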