Skip to content

davit555/ksql-python

 
 

Repository files navigation

ksql-python

A python wrapper for the KSQL REST API. Easily interact with the KSQL REST API using this library.

https://travis-ci.org/bryanyang0528/ksql-python.svg?branch=master

Installation

pip install ksql

Or

git clone https://github.com/bryanyang0528/ksql-python
cd ksql-python
python setup.py install

Getting Started

This is the GITHUB page of KSQL. https://github.com/confluentinc/ksql

Setup

  • Setup for the KSql API:
from ksql import KSQLAPI
client = KSQLAPI('http://ksql-server:8080')

Options

Option Type Required Description
url string yes Your ksql-server url. Example: http://ksql-server:8080
timeout integer no Timout for Requests. Default: 5

Main Methods

ksql

client.ksql('show tables')
  • Example Response [{'tables': {'statementText': 'show tables;', 'tables': []}}]

query

It will execute sql query and keep listening streaming data.

client.query('select * from table1')
  • Example Response

    {"row":{"columns":[1512787743388,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753200,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753488,"key1",1,2,3]},"errorMessage":null}
    {"row":{"columns":[1512787753888,"key1",1,2,3]},"errorMessage":null}
    

Simplified API

create_stream/ create_table

client.create_stream(table_name=table_name,
                     columns_type=columns_type,
                     topic=topic,
                     value_format=value_format)

Options

Option Type Required Description
table_name string yes name of stream/table
columns_type list yes ex:['viewtime bigint','userid varchar','pageid varchar']
topic string yes Kafka topic
value_format string no DELIMITED or JSON (Default)
  • Responses
If create table/stream succeed:return True
If failed:raise a CreateError(respose_from_ksql_server)

create_stream_as

a simplified api for creating stream as select

client.create_stream_as(table_name=table_name,
                        select_columns=select_columns,
                        src_table=src_table,
                        kafka_topic=kafka_topic,
                        value_format=value_format,
                        conditions=conditions,
                        partition_by=partition_by,
                        **kwargs)
CREATE STREAM <table_name>
[WITH ( kafka_topic=<kafka_topic>, value_format=<value_format>, property_name=expression ... )]
AS SELECT  <select_columns>
FROM <src_table>
[WHERE <conditions>]
PARTITION BY <partition_by>];

Options

Option Type Required Description
table_name string yes name of stream/table
select_columns list yes you can select [*] or ['columnA', 'columnB']
src_table string yes name of source table
kafka_topic string no The name of the Kafka topic of this new stream(table).
value_format string no DELIMITED, JSON``(Default) or ``AVRO
conditions string no The conditions in the where clause.
partition_by string no Data will be distributed across partitions by this column.
kwargs pair no please provide key=value pairs. Please see more options.

FileUpload

upload

Run commands from a .ksql file. Can only support ksql commands and not streaming queries.

from ksql import FileUpload
pointer = FileUpload('http://ksql-server:8080')
pointer.upload('rules.ksql')

Options

Option Type Required Description
ksqlfile string yes name of file containing the rules
  • Responses
If ksql-commands succesfully executed:return (List of server response for all commands)
If failed:raise the appropriate error

More Options

There are more properties (partitions, replicas, etc...) in the official document.

KSQL Syntax Reference

  • Responses
If create table/stream succeed:return True
If failed:raise a CreatError(respose_from_ksql_server)

About

A python wrapper for the KSQL REST API.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Python 98.9%
  • Other 1.1%