In our world, a data flow is a Python program, that reads the data from the source, does transformations and puts the result into a DuckDB table. Then all rows of that table are sent to Apache Kafka.

Additionally compassing documents about the pipeline and the table are defined in the code.


The data flow has to implement an initial load and a delta load method. For the delta load it can save a delta pointer in the data platform, which can be retrieved at delta start.


This code is saved in a Syensqo Github Enterprise repo and we activate one of two CICD pipelines to deploy it either into an Azure Function App or an Azure Container App.


Contrary to the usual way of working with data flows:


Classes to implement

The Syensqo Lib_Producer library contains classes to implement above logic.

function_app.py

The function_app.py is a class required by Azure Function Apps, which tells Azure the function name and its trigger, here an example of an http trigger.

This function connects to an Oracle database, connects to Kafka and starts a runner class.

@app.route(route="LABWARE/run", methods=["PUT", "POST", "GET", "PATCH"])
@app.function_name("LABWARE_Reader")
async def run(req: functions.HttpRequest) -> functions.HttpResponse:
    async with OracleReader(OracleConnectionSettings("Labware-Source"), logger) as reader:
        kafka_settings = KafkaSecretSettings()
        with KafkaProducer(SOURCE_SYSTEM_NAME, FUNCTION_NAME,
                                ZONE, get_environment(), kafka_settings, logger) as producer:
            runner = Runner(FUNCTION_NAME, logger)
            return await runner.http(req, producer, reader)

runner.py

This implements the steps to be executed for an initial and delta load. It is derived from the FunctionRunner class of the Lib_Producer library.

This is where the load as well as the creation of table schema and impact lineage document is done. Loading the source table is easy, a SQL statement is sent to the Oracle database and all data put into a target_table of DuckDB: 

await reader.load(target_table, "select * FROM LIMSPRD.SAMPLE_PLAN", "oracle")

Sending all rows to Kafka is done via these lines. A single producer can be used to load multiple target tables, the target object holds all target table specific properties. Then the data is copied and and a commit confirms that all data has been sent successfully.

target = producer.add_target(target_table, target_table, key_schema, reader.table_schema)
producer.produce_from_duckdb(target_table, target, 40000, RowType.REPLACE, partition=0)
producer.commit()

The schema of the Kafka messages must not be created manually, although it could, because the SQL statement together with introspecting the Oracle table provides all the information. In this case, the SQL is a select * from SAMPLE_PLAN, so it is obvious what the columns, data datatypes, primary key, foreign key, column comments, table comment should be - read all from the Oracle database's data dictionary. This is a staged task, first the reader.set_schema_name_for_sql() sets the name and namespace, the reader.load() creates the schema object based on the SQL projection. At this stage the schema has columns with datatypes but not more.

The last step is the reader.augment_schema() call, which pulls the other information from the database data dictionary. This is a best-effort approach, ultimately the developer is responsible to provide all the information.

Given that we use SQL for the transformation, the impact/lineage document is derived by parsing the select statement.

Note: In this example you might wonder about the set-schema → reader.load → augment_schema sequence. Why is not all done at once? Yes, for a database with prepared statements, cursors and a data dictionary this would have been an option, but other sources - say a RestAPI without OpenAPI definiton - might return the structure during the load() step only.


The last piece is the load type. An initial load might fail and is restarted. Or for other reasons the initial load will be executed in a year again. This should not lead to duplicate data, so we send a first row with RowType = TRUNCATE and all subsequent rows of type REPLACE. This does advise all the consumers, that the target table has to be overwritten completely, most important the Lakehouse Writer process.

The delta load is in this example an initial load.

class Runner(FunctionRunner):

    async def initial_load(self, producer: Producer, reader: SourceAdapter):
        target_table = "SAMPLE_PLAN"
        await reader.load(target_table, "select * FROM LIMSPRD.SAMPLE_PLAN", "LABWARE", "LABWARE")
        reader.table_schema.set_data_product_owner_email("...")
        reader.table_schema.set_repo_url("...")
        reader.table_schema.set_source_system_uri("...")
        reader.table_schema.set_tickets_url("...")
        key_schema = KeySchema(reader.table_schema)
        target = producer.add_target(target_table, target_table, key_schema, reader.table_schema)
        truncate_row = create_truncate_row(reader.table_schema, None)
        producer.produce_row(target, create_key_row(truncate_row, key_schema), truncate_row, RowType.TRUNCATE, partition=0)
        producer.produce_from_duckdb(target_table, target, 40000, RowType.REPLACE, partition=0)
        producer.commit()
 
    async def delta_load(self, producer: Producer, reader: SourceAdapter):
        await self.initial_load(producer, reader)

reader.py

Above example was simple, because an OracleReader class existed already. For all others, a SourceAdapter class must be implemented.

class StarTekReader(SourceAdapter):

    def __init__(self, logger: Logger, page_limit: Optional[int] = None) -> None:
        super().__init__(logger)
        self.startek_credential = azure_utils.read_secret("StarTek-Source")
        self._session = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=600))
        return self

    async def __aexit__(self, exc_type, exc_val, traceback):
        await self._session.close()
        self._session = None

    async def load(self, duckdb_table_name: str, table_schema: ValueSchema, *args, **kwargs):
        pa_schema = self.prepare_load(duckdb_table_name, table_schema)
        tasks = []
        for page in ..:
            tasks.append(self.load_page(page, duckdb_table_name, pa_schema))
        results = await asyncio.gather(*tasks)

    async load_page(self, page, duckdb_table_name, schema) -> int:
        data = [{..}, {..}] # Get the data of this page somehow
        input_arrow = pa.Table.from_pylist(data, schema=pa_schema)
        self.insert_page(duckdb_table_name, input_arrow)
        return len(data)


If the source does not provide detailed information about the schema, then the schema must be created manually, e.g. by extending the ValueSchema class.

class StarTekAvroSchema(ValueSchema):

    def __init__(self, name: str, namespace: str):
        super().__init__(name, namespace)
        self.add_field("Name", AvroNVarchar(1024), "The node name of the Attribute", False)
        self.add_field("AttributeType", AvroNVarchar(1024), "Type according to StarTek", True)
        self.add_field("AttributeValue", AvroNVarchar(1024), "The measure as string or an error text", True)
        self.pks = {"Name", "AttributeType"}

        self.set_data_product_owner_email("...")
        self.set_repo_url("...")
        self.set_source_system_uri("...")
        self.set_tickets_url("...")


Transformations

In above code we had a reader to get the data from an external system into DuckDB and then produce the table as output data. But what if the source is multiple Lakehouse tables, transformations should happen?

Again, thanks to the power of DuckDB, all of that can be accomplished rather easily.

DuckDB supports the reading from OneLake natively via the delta_scan table function.

SELECT * FROM delta_scan('abfss://SySight-dev-emea@onelake.dfs.fabric.microsoft.com/APPLICATION_ZONE/Tables/StarTek/BQ_PIM_startek_DIRECT');

Either this is used to read the entire delta table into memory or it is used as view or as part of a larger select statement. As developer we have all the options as long as the result is one or many tables in DuckDB.


Data within Python can also be processed via DuckDB, if it exists as pyarrow table.

ValueSchema table_schema = ValueSchema("name", "namespace")
table_schema.add_field("Amount", AvroDecimal(15,2))

pa_schema = table_schema.get_pyarrow()

data = [
	{ "Amount": 3.14 }
]

pa_table = pa.Table.from_pylist(data, table_schema)

duckdb.execute("select sum(Amount) from pa_table")





Technical foot notes

It would be better to read and stream simultaneously? No.

I mean, if reading takes 10 minutes and sending to Kafka another 10 minutes, if both is done in parallel, it could be completed in maybe 11 minutes instead of 20, couldn't it?

In theory yes, but Python has the GIL (Global Interpreter Lock) which prevents executing Python code in parallel, even if using a ThreadPoolExecutor or other threading options. It basically helps only in case of file or network I/O.

Once this is removed, it might be worth a try and once Python has a Just-in-time compiler, the fact that Python is an interpreter language and as such much slower in executing code compared to C++, will be another reason to get significant gains.

But even if, loading the data first allows use to check the primary key constraint, allows to perform transformations in SQL that need multiple steps.

Lastly, DuckDB is in the process of getting native Kafka support. In that case, the producer.produce_from_duckdb() call would just invoke the corresponding DuckDB command and then all runs with true C++ speed from the beginning.

In other words, we tried. ThreadPoolExecutors made the code 20% faster, but using asyncio to read and then write, made all 500% faster.