Skip to content

Commit

Permalink
Merge pull request #2402 from kumardeepak5/master
Browse files Browse the repository at this point in the history
FIX - Replace MongoClientOptions to MongoClientUri in mongoClient of Mongo/STHSinks #2387
  • Loading branch information
fgalan authored Aug 30, 2024
2 parents 2bfa00e + 016d5eb commit 808bf1f
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 23 deletions.
2 changes: 2 additions & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
- [cygnus-ngsi] Upgrade Debian version from 12.5 to 12.6 in Dockerfile
- [cygnus-common][cygnus-ngsi] New setting mongo_uri (#2387)
- [cygnus-common][cygnus-ngsi] Deprecate (mongo_hosts, mongo_username, mongo_password, mongo_auth_source, mongo_replica_set) (use mongo_uri instead)
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@
package com.telefonica.iot.cygnus.backends.mongo;

import com.mongodb.*;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.result.UpdateResult;
import com.mongodb.connection.SslSettings;
import com.telefonica.iot.cygnus.log.CygnusLogger;
import com.telefonica.iot.cygnus.sinks.Enums.DataModel;
import static com.telefonica.iot.cygnus.sinks.Enums.DataModel.DMBYATTRIBUTE;
import static com.telefonica.iot.cygnus.sinks.Enums.DataModel.DMBYENTITY;
import static com.telefonica.iot.cygnus.sinks.Enums.DataModel.DMBYSERVICEPATH;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
Expand All @@ -41,14 +38,12 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLContext;
import java.security.NoSuchAlgorithmException;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.io.FileInputStream;
import java.io.InputStream;

import org.apache.commons.lang.StringUtils;
import org.bson.Document;

/**
Expand All @@ -64,6 +59,8 @@ public class MongoBackendImpl implements MongoBackend {
public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }

private MongoClient client;
private com.mongodb.client.MongoClient clientI;
private final String mongoURI;
private final String mongoHosts;
private final String mongoUsername;
private final String mongoPassword;
Expand All @@ -87,12 +84,14 @@ public enum Resolution { SECOND, MINUTE, HOUR, DAY, MONTH }
* @param mongoReplicaSet
* @param dataModel
*/
public MongoBackendImpl(String mongoHosts, String mongoUsername, String mongoPassword,
public MongoBackendImpl(String mongoURI, String mongoHosts, String mongoUsername, String mongoPassword,
String mongoAuthSource, String mongoReplicaSet, DataModel dataModel,
Boolean sslEnabled, Boolean sslInvalidHostNameAllowed,
String sslKeystorePathFile, String sslKeystorePassword,
String sslTruststorePathFile, String sslTruststorePassword) {
client = null;
clientI = null;
this.mongoURI = mongoURI;
this.mongoHosts = mongoHosts;
this.mongoUsername = mongoUsername;
this.mongoPassword = mongoPassword;
Expand Down Expand Up @@ -589,6 +588,9 @@ protected ArrayList<BasicDBObject> buildUpdateForUpdate(String attrType, boolean
* @return
*/
private MongoDatabase getDatabase(String dbName) {
if(StringUtils.isNotEmpty(mongoURI)) {
return getDatabaseByUsingMongoURI(dbName);
}
// create a ServerAddress object for each configured URI
List<ServerAddress> servers = new ArrayList<>();
String[] uris = mongoHosts.split(",");
Expand Down Expand Up @@ -679,6 +681,48 @@ private MongoDatabase getDatabase(String dbName) {
return client.getDatabase(dbName);
} // getDatabase

/**
* Gets a Mongo database by using mongouri.
* @param dbName
* @return
*/
private MongoDatabase getDatabaseByUsingMongoURI(String dbName) {
if (clientI == null) {
SSLContext sslContext = null;
if (sslEnabled) {
try {
KeyStore keyStore = KeyStore.getInstance(KeyStore.getDefaultType());
if ((sslKeystorePathFile != null) && !sslKeystorePathFile.isEmpty()) {
InputStream keyStoreStream = new FileInputStream(sslKeystorePathFile);
keyStore.load(keyStoreStream, sslKeystorePassword.toCharArray());
} else {
keyStore.load(null);
}
if ((sslTruststorePathFile != null) && !sslTruststorePathFile.isEmpty()) {
InputStream trustStoreStream = new FileInputStream(sslTruststorePathFile);
keyStore.load(trustStoreStream, sslTruststorePassword.toCharArray());
}
TrustManagerFactory trustManagerFactory = TrustManagerFactory
.getInstance(TrustManagerFactory.getDefaultAlgorithm());
trustManagerFactory.init(keyStore);
sslContext = SSLContext.getInstance("TLS");
sslContext.init(null, trustManagerFactory.getTrustManagers(), new java.security.SecureRandom());
} catch (Exception e) {
LOGGER.warn("Error when init SSL Context: " + e.getMessage());
}
}

SslSettings sslSetting = SslSettings.builder().enabled(sslEnabled)
.invalidHostNameAllowed(sslInvalidHostNameAllowed).context(sslContext).build();
MongoClientSettings settings = MongoClientSettings.builder()
.applyConnectionString(new ConnectionString(mongoURI))
.applyToSslSettings(builder -> builder.applySettings(sslSetting).build()).build();

clientI = MongoClients.create(settings);
}
return clientI.getDatabase(dbName);
}

/**
* Given a resolution, gets the range. It is protected for testing purposes.
* @param resolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public MongoBackendImplTest() {
public void testGetRange() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getRange]")
+ "-------- Given a resolution, its related range is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, null, false, false, null, null, null, null);


try {
Expand Down Expand Up @@ -115,7 +115,7 @@ public void testGetRange() {
public void testGetOrigin() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOrigin]")
+ "-------- Given a calendar and a resolution, its related origin is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, null, false, false, null, null, null, null);
GregorianCalendar calendar = new GregorianCalendar(2017, 4, 5, 11, 46, 13);

try {
Expand Down Expand Up @@ -198,7 +198,7 @@ public void testGetOrigin() {
public void testGetOffset() {
System.out.println(getTestTraceHead("[MongoBackendImpl.getOffset]")
+ "-------- Given a calendar and a resolution, its related offset is correctly returned");
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, null, false, false, null, null, null, null);
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April

try {
Expand Down Expand Up @@ -266,7 +266,7 @@ public void testBuildQueryForInsertAggregated() {
String entityType = "someType";
String attrName = "someName";
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYSERVICEPATH, false, false, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, DataModel.DMBYSERVICEPATH, false, false, null, null, null, null);
String queryForInsertAggregated = "{\"_id\": {\"entityId\": \"someId\", \"entityType\": \"someType\", "
+ "\"attrName\": \"someName\", \"origin\": {\"$date\": 1491392760000}, "
+ "\"resolution\": \"second\", \"range\": \"minute\"}, \"points.offset\": 13}";
Expand Down Expand Up @@ -353,7 +353,7 @@ public void testBuildQueryForInsertAggregated() {
throw e;
} // try catch

backend = new MongoBackendImpl(null, null, null, null, null, DataModel.DMBYENTITY, false, false, null, null, null, null);
backend = new MongoBackendImpl(null, null, null, null, null, null, DataModel.DMBYENTITY, false, false, null, null, null, null);

queryForInsertAggregated = "{\"_id\": {\"attrName\": \"someName\", "
+ "\"origin\": {\"$date\": 1491392760000}, \"resolution\": \"second\", "
Expand Down Expand Up @@ -457,7 +457,7 @@ public void testBuildUpdateForUpdateNumerical() {
double sum2 = 200;
int numSamples = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, null, false, false, null, null, null, null);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.$.samples\": 2, \"points.$.sum\": 20.0, \"points.$.sum2\": 200.0}, "
+ "\"$min\": {\"points.$.min\": 0.0}, \"$max\": {\"points.$.max\": 10.0}}";
Expand Down Expand Up @@ -489,7 +489,7 @@ public void testBuildUpdateForUpdateString() {
String value = "someString";
int count = 2;
GregorianCalendar calendar = new GregorianCalendar(2017, 3, 5, 11, 46, 13); // month 3 is April
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, false, false, null, null, null, null);
MongoBackendImpl backend = new MongoBackendImpl(null, null, null, null, null, null, null, false, false, null, null, null, null);
String updateForUpdate = "{\"$set\": {\"attrType\": \"someType\"}, "
+ "\"$inc\": {\"points.13.samples\": 2, \"points.13.occur.someString\": 2}}";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public abstract class NGSIMongoBaseSink extends NGSISink {

protected static final CygnusLogger LOGGER = new CygnusLogger(NGSIMongoBaseSink.class);
protected String mongoURI;
protected String mongoHosts;
protected String mongoUsername;
protected String mongoPassword;
Expand All @@ -50,6 +51,14 @@ public abstract class NGSIMongoBaseSink extends NGSISink {
protected MongoBackendImpl backend;
protected long dataExpiration;
protected boolean ignoreWhiteSpaces;

/**
* Gets the mongo uri. It is protected since it is used by the tests.
* @return
*/
protected String getMongoURI() {
return mongoURI;
} // getMongoURI

/**
* Gets the mongo hosts. It is protected since it is used by the tests.
Expand Down Expand Up @@ -127,6 +136,8 @@ protected MongoBackendImpl getBackend() {
public void configure(Context context) {
super.configure(context);

mongoURI = context.getString("mongo_uri", "");
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_uri=" + mongoURI + ")");
mongoHosts = context.getString("mongo_hosts", "localhost:27017");
LOGGER.debug("[" + this.getName() + "] Reading configuration (mongo_hosts=" + mongoHosts + ")");
mongoUsername = context.getString("mongo_username", "");
Expand Down Expand Up @@ -215,7 +226,7 @@ public void configure(Context context) {
@Override
public void start() {
try {
backend = new MongoBackendImpl(mongoHosts, mongoUsername, mongoPassword,
backend = new MongoBackendImpl(mongoURI, mongoHosts, mongoUsername, mongoPassword,
mongoAuthSource, mongoReplicaSet, dataModel,
sslEnabled, sslInvalidHostNameAllowed,
sslKeystorePathFile, sslKeystorePassword,
Expand Down
11 changes: 6 additions & 5 deletions doc/cygnus-ngsi/flume_extensions_catalogue/ngsi_mongo_sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,12 @@ When datamodel changes Cygnus tries to recreate index (delete current and create
| data\_model | no | dm-by-entity | <i>dm-by-service-path</i>, <i>dm-by-entity</i> or <dm-by-attribute</i>. <i>dm-by-service</i> is not currently supported. |
| attr\_persistence | no | row | <i>row</i> or <i>column</i>. |
| attr\_metadata\_store | no | false | <i>true</i> or <i>false</i>. |
| mongo\_hosts | no | localhost:27017 | FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run. |
| mongo\_username | no | <i>empty</i> | If empty, no authentication is done. |
| mongo\_password | no | <i>empty</i> | If empty, no authentication is done. |
| mongo\_auth_source | no | <i>empty</i> | Auth source database use to authenticate the user. Usually could be `admin`. |
| mongo\_replica_set | no | <i>empty</i> | Replica Set name. Note that this parameter is optional because Cygnus is able to connect to a MongoDB replica set without needing to specify its name.|
| mongo\_uri | no | <i>empty</i> | Mongo DB Connection String. In case of non empty mongo\_uri parameters (mongo\_hosts, mongo\_username, mongo\_password, mongo\_auth_source, mongo\_replica_set) would be ignored. |
| mongo\_hosts | no | localhost:27017 | **DEPRECATED** (use mongo_uri instead). FQDN/IP:port where the MongoDB server runs (standalone case) or comma-separated list of FQDN/IP:port pairs where the MongoDB replica set members run. |
| mongo\_username | no | <i>empty</i> | **DEPRECATED** (use mongo_uri instead). If empty, no authentication is done. |
| mongo\_password | no | <i>empty</i> | **DEPRECATED** (use mongo_uri instead). If empty, no authentication is done. |
| mongo\_auth_source | no | <i>empty</i> | **DEPRECATED** (use mongo_uri instead). Auth source database use to authenticate the user. Usually could be `admin`. |
| mongo\_replica_set | no | <i>empty</i> | **DEPRECATED** (use mongo_uri instead). Replica Set name. Note that this parameter is optional because Cygnus is able to connect to a MongoDB replica set without needing to specify its name.|
| db\_prefix | no | sth_ ||
| mongo\_ssl | no | false | Enable SSL in mongodb connection |
| mongo\_ssl\_invalid\_host\_allowed | no | false | Allow invalid host name in mongo SSL connections |
Expand Down

0 comments on commit 808bf1f

Please sign in to comment.