The FIX example is an Envelope pipeline that receives FIX financial messages about order fulfillment (new orders and execution reports) and updates an HBase history table. It shows how historical order information can be maintained for each stock symbol.
The configuration for this example is found here. The messages do not fully conform to the real FIX protocol but should be sufficient to demonstrate how a complete implementation could be developed.
- Create the required HBase table using the provided HBase shell script:

    hbase shell create_hbase_tables.rb
- Create the required Kafka topics:

    kafka-topics --zookeeper YOURZOOKEEPERHOSTNAME:2181 --create --topic fix --partitions 9 --replication-factor 3
Run the example in non-secure clusters by submitting the Envelope application with the example’s configuration:
    $ export EXTRA_CP=/etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar
    $ spark-submit \
        --driver-class-path ${EXTRA_CP} \
        --conf spark.executor.extraClassPath=${EXTRA_CP} \
        envelope-*.jar fix_hbase.conf
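The long EXTRA_CP value is easy to mistype. As a minimal sketch, the same classpath can be assembled entry by entry, with a warning for any entry missing on the local host. The variable name HBASE_LIB_DIR is an illustrative assumption; the paths themselves are the ones used in the command above and may differ on clusters that do not use the CDH parcel layout.

```shell
# Hypothetical helper variable: the HBase directory inside the CDH parcel.
HBASE_LIB_DIR=/opt/cloudera/parcels/CDH/lib/hbase

# Start with the HBase client configuration directory.
EXTRA_CP=/etc/hbase/conf

# Append each jar named in the original command, in the same order.
for jar in hbase-common.jar hbase-client.jar hbase-protocol.jar lib/htrace-core.jar; do
  entry="${HBASE_LIB_DIR}/${jar}"
  # Warn, but keep going, if the entry does not exist on this host.
  [ -e "${entry}" ] || echo "warning: ${entry} not found" >&2
  EXTRA_CP="${EXTRA_CP}:${entry}"
done

export EXTRA_CP
echo "${EXTRA_CP}"
```

The resulting value is identical to the one exported by hand, so the spark-submit invocation itself does not change.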
Note: CDH5 uses spark2-submit instead of spark-submit for Spark 2 applications such as Envelope. CDH5 clusters may also require the environment variable SPARK_KAFKA_VERSION to be set to 0.10.
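One way to apply the note above without editing every command is sketched below. It picks spark2-submit when that command is on the PATH and falls back to spark-submit otherwise, then exports SPARK_KAFKA_VERSION. The variable name SPARK_SUBMIT is an illustrative assumption, and whether SPARK_KAFKA_VERSION is needed depends on the cluster.

```shell
# Prefer spark2-submit when it exists (as on CDH5), otherwise use spark-submit.
if command -v spark2-submit >/dev/null 2>&1; then
  SPARK_SUBMIT=spark2-submit
else
  SPARK_SUBMIT=spark-submit
fi

# CDH5 clusters may need this to select the Kafka 0.10 client.
export SPARK_KAFKA_VERSION=0.10

echo "Using ${SPARK_SUBMIT} with SPARK_KAFKA_VERSION=${SPARK_KAFKA_VERSION}"
```

The submit commands in this example can then be run as `${SPARK_SUBMIT} ...` in place of the literal command name.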
For secure clusters, we need to supply a keytab and a JAAS file to the Spark job, and ensure that Spark obtains an HBase token on startup. This requires two supporting files. First, a keytab file user.kt containing entries for the user submitting the job. Second, a JAAS configuration file jaas.conf for the driver and executors to reference. A sample JAAS file is supplied here.
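For orientation, the sketch below writes a hypothetical jaas.conf containing only a Kafka client login section. It is not the sample file shipped with the example, and a real deployment may need further sections. It assumes the keytab is available in the working directory as kafka.kt (the name used in the submit command that follows) and that PRINCNAME holds the Kerberos principal. KafkaClient is the JAAS section name that Kafka clients look up, and the options shown belong to the JDK's Krb5LoginModule.

```shell
# Assumptions for illustration: principal and output file name.
PRINCNAME=${PRINCNAME:-REPLACEME}
JAAS_FILE=${JAAS_FILE:-jaas.conf}

# Write a minimal Kerberos keytab login section for the Kafka client.
cat > "${JAAS_FILE}" <<EOF
KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  storeKey=true
  keyTab="kafka.kt"
  principal="${PRINCNAME}";
};
EOF

echo "Wrote ${JAAS_FILE}"
```

The relative keyTab path works because files passed with --files are placed in each container's working directory.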
A method that works on secure clusters in both client and cluster mode is as follows:
    $ ln -s user.kt kafka.kt
    $ export PRINCNAME=REPLACEME
    $ HADOOP_CONF_DIR=/etc/hbase/conf:/etc/hive/conf:/etc/spark2/conf:/opt/cloudera/parcels/CDH/lib/hbase/hbase-common.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-client.jar:/opt/cloudera/parcels/CDH/lib/hbase/hbase-protocol.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar \
      spark-submit \
        --keytab user.kt \
        --principal ${PRINCNAME} \
        --files jaas.conf,fix_hbase.conf,kafka.kt \
        --driver-java-options=-Djava.security.auth.login.config=jaas.conf \
        --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
        --conf spark.executor.extraClassPath=/etc/hbase/conf \
        envelope-*.jar fix_hbase.conf
(Note: we symlink user.kt to kafka.kt to avoid the Spark YARN client complaining about uploading the same file twice. By default, the file referenced in --keytab is only distributed to the Application Master container, so we must also supply the keytab in --files.)
In another terminal session, start the data generator, which is itself an Envelope pipeline that writes one million example orders to Kafka. For non-secure clusters, this can be launched via:
$ spark-submit envelope-*.jar fix_generator.conf
For secure clusters, use something like:
    $ spark-submit \
        --files jaas.conf,kafka.kt \
        --driver-java-options=-Djava.security.auth.login.config=jaas.conf \
        --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf \
        envelope-*.jar fix_generator.conf
You can verify the data is being loaded into HBase by running a few sample scans from the HBase shell:
    $ hbase shell
    $ scan 'envelopetest:test', { LIMIT => 100, STARTROW => 'GOOG'}
    ...
     GOOG:\x00\x00\x01a\x9AH\xD8\x89   column=cf2:orderqty, timestamp=1518711928433, value=\x00\x00\x14G
     GOOG:\x00\x00\x01a\x9AH\xD8\x8A   column=cf1:clordid, timestamp=1518711929062, value=74687578-c24f-4d98-930a-7f466cece584
     GOOG:\x00\x00\x01a\x9AH\xD8\x8A   column=cf2:cumqty, timestamp=1518711929062, value=\x00\x00\x08D
     GOOG:\x00\x00\x01a\x9AH\xD8\x8A   column=cf2:leavesqty, timestamp=1518711929062, value=\x00\x00\x17#
     GOOG:\x00\x00\x01a\x9AH\xD8\x8A   column=cf2:orderqty, timestamp=1518711929062, value=\x00\x00\x1E\x19
    100 row(s) in 0.4580 seconds
    $ count 'envelopetest:test'
    ...
    Current count: 380000, row: VMW:\x00\x00\x01a\x9AJ17
    Current count: 381000, row: VMW:\x00\x00\x01a\x9AJ7\xD9
    Current count: 382000, row: VMW:\x00\x00\x01a\x9AJ?\x9A
    382705 row(s) in 10.5010 seconds
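The binary parts of the scan output can be sanity-checked by hand. The sketch below assumes, without confirmation from the example's configuration, that the eight bytes after the colon in each row key are a big-endian long holding a millisecond timestamp and that quantity values are four-byte big-endian integers. The HBase shell prints printable bytes as characters, so `a` stands for 0x61, `H` for 0x48 and `G` for 0x47. The function name hex_to_dec is illustrative.

```shell
# Convert a big-endian hex string (no 0x prefix) to a decimal integer.
hex_to_dec() {
  printf '%d\n' "0x$1"
}

# Row key suffix from the first sample row: \x00\x00\x01a\x9AH\xD8\x89
ROW_TS_MS=$(hex_to_dec 000001619A48D889)

# cf2:orderqty value from the same row: \x00\x00\x14G
ORDER_QTY=$(hex_to_dec 00001447)

echo "row key timestamp (ms since epoch): ${ROW_TS_MS}"   # prints 1518711920777
echo "orderqty: ${ORDER_QTY}"                             # prints 5191
```

The decoded row key value falls within a few seconds of the cell timestamp 1518711928433 shown in the scan, which is consistent with the assumed encoding.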