Skip to content

Commit

Permalink
Merge branch 'stability-improvements-v2' into improve-parquet-perform…
Browse files Browse the repository at this point in the history
…ance
  • Loading branch information
apoorva918 authored Feb 6, 2024
2 parents 8e8b7fc + 4a226dc commit a2ddf24
Show file tree
Hide file tree
Showing 76 changed files with 561 additions and 413 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ For a list of commonly-used configuration values, see the
| `PRAVEGA_SENSOR_COLLECTOR_RAW1_DELETE_COMPLETED_FILES` | `false` | If true, PSC immediately delete the file soon after processing |
| `PRAVEGA_SENSOR_COLLECTOR_RAW1_TRANSACTION_TIMEOUT_MINUTES` | `2.0` | Timeout for each transaction. Default value is 2 minutes |
| `PRAVEGA_SENSOR_COLLECTOR_RAW1_CREATE_SCOPE` | `false` | If Pravega is on SDP, set this to `false`. Accept Boolean value. |
| `PRAVEGA_SENSOR_COLLECTOR_RAW1_EXACTLY_ONCE` | true | If true, it will use transactional write. For raw file ingestion it is recommended to set it as false as in transactional write, client can process maximum file size of 8mb. |
| `PRAVEGA_SENSOR_COLLECTOR_RAW1_ENABLE_LARGE_EVENT` | false | if false, will not allow to write large event. It is recommended to set it as true for non transactional write. |
| `HADOOP_HOME` | `${HOME}/dev` | For windows, Hadoop requires native libraries on Windows to work properly. You can download `Winutils.exe` to fix this. <br> See [here](https://cwiki.apache.org/confluence/display/HADOOP2/WindowsProblems). Add the location of bin/winutils.exe in the parameter HADOOP_HOME. <br> **This is required only for Parquet file type not for CSV and Raw file ingestion type** |


Expand Down
22 changes: 22 additions & 0 deletions pravega-sensor-collector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ apply plugin: "com.github.spotbugs"
apply plugin: 'checkstyle'
apply plugin: "application"
apply plugin: "com.github.johnrengelman.shadow"
apply plugin: "jacoco"

group = "io.pravega"
archivesBaseName = "pravega-sensor-collector"
Expand Down Expand Up @@ -156,3 +157,24 @@ checkstyle {
}
}


test {
finalizedBy jacocoTestReport // report is always generated after tests run
}

jacocoTestReport {
dependsOn test // tests are required to run before generating the report
}

jacoco {
toolVersion = "0.8.9"
reportsDirectory = layout.buildDirectory.dir('customJacocoReportDir')
}

jacocoTestReport {
reports {
xml.required = false
csv.required = false
html.outputLocation = layout.buildDirectory.dir('jacocoHtml')
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
export pravega_client_auth_method=Bearer
export pravega_client_auth_loadDynamic=true
export KEYCLOAK_SERVICE_ACCOUNT_FILE=/opt/pravega-sensor-collector/conf/keycloak.json
export JAVA_OPTS="-Xmx512m"
export JAVA_OPTS="-Xmx512m -Dlogback.configurationFile=/opt/pravega-sensor-collector/bin/logback.xml"
export PRAVEGA_SENSOR_COLLECTOR_NET1_CLASS=io.pravega.sensor.collector.network.NetworkDriver
export PRAVEGA_SENSOR_COLLECTOR_NET1_NETWORK_INTERFACE=ens33
export PRAVEGA_SENSOR_COLLECTOR_NET1_MEMORY_QUEUE_CAPACITY_ELEMENTS=10000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
Expand All @@ -23,15 +24,15 @@
* This is an abstract class for all device drivers.
*/
public abstract class DeviceDriver extends AbstractService implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(DeviceDriver.class);
private static final Logger LOGGER = LoggerFactory.getLogger(DeviceDriver.class);

private final DeviceDriverConfig config;

private static final String CREATE_SCOPE_KEY = "CREATE_SCOPE";

public DeviceDriver(DeviceDriverConfig config) {
this.config = config;
log.info("Create Scope: {}", isCreateScope());
this.config = Preconditions.checkNotNull(config, "config");
LOGGER.info("Create Scope: {}", isCreateScope());
}

public String getProperty(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;

import java.util.Map;

public class DeviceDriverConfig {
Expand All @@ -18,19 +20,19 @@ public class DeviceDriverConfig {
private final DeviceDriverManager deviceDriverManager;

public DeviceDriverConfig(String instanceName, String className, Map<String, String> properties, DeviceDriverManager deviceDriverManager) {
this.instanceName = instanceName;
this.className = className;
this.properties = properties;
this.deviceDriverManager = deviceDriverManager;
this.instanceName = Preconditions.checkNotNull(instanceName, "instanceName");
this.className = Preconditions.checkNotNull(className, "className");
this.properties = Preconditions.checkNotNull(properties, "deviceDriverProperties");
this.deviceDriverManager = Preconditions.checkNotNull(deviceDriverManager, "deviceDriverManager");
}

@Override
public String toString() {
return "DeviceDriverConfig{" +
"instanceName='" + instanceName + '\'' +
", className='" + className + '\'' +
", properties=" + properties +
'}';
return "DeviceDriverConfig{"
+ "instanceName='" + instanceName + '\''
+ ", className='" + className + '\''
+ ", properties=" + properties
+ '}';
}

public String getInstanceName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,23 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeviceDriverFactory {
private static final Logger log = LoggerFactory.getLogger(DeviceDriverFactory.class);
private static final Logger LOGGER = LoggerFactory.getLogger(DeviceDriverFactory.class);

/**
* Instantiate a concrete subclass of DeviceDriver based on key/value properties.
*/
DeviceDriver create(DeviceDriverConfig config) {
try {
log.info("Creating device driver instance {} with class {}", config.getInstanceName(), config.getClassName());
Preconditions.checkNotNull(config, "deviceDriverConfig");
LOGGER.info("Creating device driver instance {} with class {}", config.getInstanceName(), config.getClassName());
final Class<?> deviceDriverClass = Class.forName(config.getClassName());
final DeviceDriver driver = (DeviceDriver) deviceDriverClass.getConstructor(DeviceDriverConfig.class).newInstance(config);
log.info("Done creating device driver instance {}", config.getInstanceName());
LOGGER.info("Done creating device driver instance {}", config.getInstanceName());
return driver;
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,7 +21,7 @@
import java.util.stream.Stream;

public class DeviceDriverManager extends AbstractService {
private static final Logger log = LoggerFactory.getLogger(DeviceDriverManager.class);
private static final Logger LOGGER = LoggerFactory.getLogger(DeviceDriverManager.class);

private static final String PREFIX = Parameters.getEnvPrefix();
private static final String SEPARATOR = "_";
Expand All @@ -31,17 +32,17 @@ public class DeviceDriverManager extends AbstractService {
private List<DeviceDriver> drivers;

public DeviceDriverManager(Map<String, String> properties) {
configs = configFromProperties(PREFIX, SEPARATOR, properties);
configs = configFromProperties(PREFIX, SEPARATOR, Preconditions.checkNotNull(properties, "properties"));
}

@Override
protected void doStart() {
log.info("Starting device drivers");
LOGGER.info("Starting device drivers");
final DeviceDriverFactory factory = new DeviceDriverFactory();
drivers = configs.stream().map(factory::create).collect(Collectors.toList());
drivers.stream().forEach((driver) -> driver.startAsync());
drivers.stream().forEach((driver) -> driver.awaitRunning());
log.info("All device drivers started successfully");
LOGGER.info("All device drivers started successfully");
notifyStarted();
}

Expand Down Expand Up @@ -75,11 +76,11 @@ private List<DeviceDriverConfig> configFromProperties(String prefix, String sep,
}
return Stream.empty();
}).collect(Collectors.toList());
log.debug("configFromProperties: instanceNames={}", instanceNames);
LOGGER.debug("configFromProperties: instanceNames={}", instanceNames);
// Copy properties with prefix to keys without a prefix.
final List<DeviceDriverConfig> config = instanceNames.stream().map((instanceName) -> {
final String className = properties.get(prefix + instanceName + sep + CLASS_KEY);
assert(className != null);
assert (className != null);
final Map<String, String> instanceProperties = new HashMap<>();
properties.entrySet().stream().forEach((entry) -> {
final String key = entry.getKey();
Expand All @@ -92,7 +93,7 @@ private List<DeviceDriverConfig> configFromProperties(String prefix, String sep,
});
return new DeviceDriverConfig(instanceName, className, instanceProperties, this);
}).collect(Collectors.toList());
log.info("configFromProperties: config={}", config);
LOGGER.info("configFromProperties: config={}", config);
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import io.pravega.client.ClientConfig;

import java.net.URI;
Expand All @@ -22,8 +23,8 @@ public class PravegaClientConfig {
private static final String PRAVEGA_CONTROLLER_URI_KEY = "PRAVEGA_CONTROLLER_URI";

public PravegaClientConfig(URI controllerURI, String scopeName) {
this.controllerURI = controllerURI;
this.scopeName = scopeName;
this.controllerURI = Preconditions.checkNotNull(controllerURI, "controllerURI");
this.scopeName = Preconditions.checkNotNull(scopeName, "scopeName");
}

public PravegaClientConfig(Map<String, String> properties, String scopeName) {
Expand All @@ -48,11 +49,11 @@ public String toString() {

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
PravegaClientConfig that = (PravegaClientConfig) o;
return controllerURI.equals(that.controllerURI) &&
scopeName.equals(that.scopeName);
return controllerURI.equals(that.controllerURI)
&& scopeName.equals(that.scopeName);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector;

import com.google.common.base.Preconditions;
import io.pravega.client.ClientConfig;
import io.pravega.client.EventStreamClientFactory;
import org.slf4j.Logger;
Expand All @@ -18,13 +19,13 @@
import java.util.Map;

public class PravegaClientPool implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(DeviceDriverManager.class);
private static final Logger log = LoggerFactory.getLogger(PravegaClientPool.class);

private final Map<PravegaClientConfig, ClientConfig> clientConfigs = new HashMap<>();
private final Map<PravegaClientConfig, EventStreamClientFactory> eventStreamClientFactories = new HashMap<>();

public synchronized ClientConfig getClientConfig(PravegaClientConfig config) {
final ClientConfig clientConfig = clientConfigs.get(config);
final ClientConfig clientConfig = clientConfigs.get(Preconditions.checkNotNull(config, "pravegaClientConfig"));
if (clientConfig != null) {
log.info("Reusing client config for {}", config);
return clientConfig;
Expand All @@ -36,7 +37,7 @@ public synchronized ClientConfig getClientConfig(PravegaClientConfig config) {
}

public synchronized EventStreamClientFactory getEventStreamClientFactory(PravegaClientConfig config) {
final EventStreamClientFactory factory = eventStreamClientFactories.get(config);
final EventStreamClientFactory factory = eventStreamClientFactories.get(Preconditions.checkNotNull(config, "pravegaClientConfig"));
if (factory != null) {
log.info("Reusing client factory for {}", config);
return factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@
import java.util.Map;

public class PravegaSensorCollectorApp {
private static final Logger log = LoggerFactory.getLogger(PravegaSensorCollectorApp.class);
private static final Logger LOG = LoggerFactory.getLogger(PravegaSensorCollectorApp.class);

public static void main(String[] args) {
try {
log.info("Collector starting");
LOG.info("Collector starting");
final Map<String, String> properties = Parameters.getProperties();
log.debug("Properties: {}", properties);
LOG.debug("Properties: {}", properties);
final DeviceDriverManager deviceDriverManager = new DeviceDriverManager(properties);
deviceDriverManager.startAsync();
deviceDriverManager.awaitTerminated();
} catch (Exception e) {
log.error("Fatal Error", e);
LOG.error("Fatal Error", e);
System.exit(2);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@
import java.util.stream.Collectors;

public class AccelerometerDriver extends SimpleDeviceDriver<AccelerometerRawData, AccelerometerSamples> {
private static final Logger log = LoggerFactory.getLogger(AccelerometerDriver.class);
private static final Logger LOGGER = LoggerFactory.getLogger(AccelerometerDriver.class);

private static final String CONFIG_DEVICE_FILE_KEY = "CONFIG_DEVICE_FILE";
private static final String DATA_DEVICE_FILE_KEY = "DATA_DEVICE_FILE";

private final static int SAMPLES_PER_QUEUE_ELEMENT = 16;
private final static int BYTES_PER_SAMPLE = 16;
private static final int SAMPLES_PER_QUEUE_ELEMENT = 16;
private static final int BYTES_PER_SAMPLE = 16;

// Scale for x, y, z.
private final List<Double> scales;
Expand All @@ -45,8 +45,8 @@ public AccelerometerDriver(DeviceDriverConfig config) {

final String sensorConfigurationDeviceFileName = getSensorConfigurationDeviceFileName();
final String sensorDataDeviceFileName = getSensorDataDeviceFileName();
log.info("Sensor Configuration Device File: {}", sensorConfigurationDeviceFileName);
log.info("Sensor Data Device File: {}", sensorDataDeviceFileName);
LOGGER.info("Sensor Configuration Device File: {}", sensorConfigurationDeviceFileName);
LOGGER.info("Sensor Data Device File: {}", sensorDataDeviceFileName);

try {
// TODO: IIO device numbering may change after reboot. Need to scan file system to find desired device.
Expand All @@ -63,7 +63,7 @@ public AccelerometerDriver(DeviceDriverConfig config) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
log.info("scales={}", scales);
LOGGER.info("scales={}", scales);
randomAccessFile = new RandomAccessFile(new File(sensorDataDeviceFileName), "r");
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package io.pravega.sensor.collector.accelerometer;

import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Hex;

/**
Expand All @@ -18,13 +19,13 @@ public class AccelerometerRawData {
public final byte[] bytes;

public AccelerometerRawData(byte[] bytes) {
this.bytes = bytes;
this.bytes = Preconditions.checkNotNull(bytes, "bytes");
}

@Override
public String toString() {
return "AccelerometerRawData{" +
", bytes=" + Hex.encodeHexString(bytes) +
'}';
return "AccelerometerRawData{"
+ ", bytes=" + Hex.encodeHexString(bytes)
+ '}';
}
}
Loading

0 comments on commit a2ddf24

Please sign in to comment.