
  • I see 42 rows because I actually have 42 rows in the list (first image)

Image Added

  • however, the replication metrics show 44 rows, because the count appears to include the stored changes (second image); the two extra rows presumably correspond to update or delete operations:
    • “The cumulative count of replicated rows, including all inserts, updates, and deletes applied to the target table.”

Image Added

Is there a way to see those 44 rows (including updates and deletions) instead of the 42?

1. Reading the Delta Table (Current Version)

Even for mirrored tables, the data is stored as a standard Delta table in OneLake and can be accessed via its ABFSS path.

Delta table - data

Code Block
path = "abfss://69300957-941f-4c8a-970f-49b3fce16e0d@onelake.dfs.fabric.microsoft.com/f73eb213-d657-4e43-8856-30a30f7cb1bf/Tables/dbo/RefSujetsIAAURA"

# Read the latest snapshot of the Delta table
df = spark.read.format("delta").load(path)
display(df)

This returns the latest state of the table, equivalent to what is visible in Fabric after synchronization.
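As a quick sanity check (a minimal sketch, assuming the same `path` and the ambient `spark` session of a Fabric notebook), the row count of this snapshot should match the 42 rows visible in the list, not the 44 cumulative operations:

Code Block
# The current snapshot reflects applied changes only, so this should print 42,
# not the 44 cumulative replicated operations reported by the mirroring metrics.
print(df.count())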

...

You can list these files using:

Code Block
path = "abfss://69300957-941f-4c8a-970f-49b3fce16e0d@onelake.dfs.fabric.microsoft.com/f73eb213-d657-4e43-8856-30a30f7cb1bf/Tables/dbo/RefSujetsIAAURA"

# List the files stored under the table's folder via the Hadoop FileSystem API
files = spark._jvm.org.apache.hadoop.fs.FileSystem \
    .get(spark._jsc.hadoopConfiguration()) \
    .listStatus(spark._jvm.org.apache.hadoop.fs.Path(path))

for f in files:
    print(f.getPath().toString())

Example

...
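In the standard Delta layout, the Parquet data files sit next to a `_delta_log` subfolder that holds the transaction log. A small sketch (reusing the `files` listing above) to tell them apart:

Code Block
# Separate the transaction log folder from the data files in the listing
for f in files:
    name = f.getPath().getName()
    if f.isDirectory() and name == "_delta_log":
        print("transaction log:", name)
    else:
        print("data file:", name)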

Although not exposed in the Fabric UI for mirrored tables, Delta Lake versioning is still fully available via Spark.

Code Block
languagepy
from delta.tables import DeltaTable
from pyspark.sql import functions as F

path = "abfss://69300957-941f-4c8a-970f-49b3fce16e0d@onelake.dfs.fabric.microsoft.com/f73eb213-d657-4e43-8856-30a30f7cb1bf/Tables/dbo/RefSujetsIAAURA"

# 1) Delta history
delta_table = DeltaTable.forPath(spark, path)
history_df = delta_table.history()  # reverse chronological order
display(history_df)

# 2) List of available versions, sorted in ascending order
versions = [
    row["version"]
    for row in history_df.select("version").distinct().orderBy("version").collect()
]
print("Available versions:", versions)

# 3) Create one DataFrame per version, tagged with its version number
dfs_by_version = {}

for v in versions:
    df_v = (
        spark.read
        .format("delta")
        .option("versionAsOf", v)
        .load(path)
        .withColumn("_delta_version", F.lit(v))
    )
    dfs_by_version[v] = df_v

print(f"{len(dfs_by_version)} DataFrames created")
print("Most recent version:", max(dfs_by_version.keys()))

...

Image Added 

Example 


Version 2:

Code Block
display(dfs_by_version[2])

Image Added



Version 3 - Update made on one field (the most recent version, so equivalent to the synchronized Delta table):

Code Block
display(dfs_by_version[3])

Image Added
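Building on the per-version DataFrames above, one way to answer the original question is to diff two consecutive versions. This is a hedged sketch, not tested here: `exceptAll` compares complete rows, so an updated row appears on both sides, once with its old values and once with its new ones.

Code Block
# Rows that differ between version 2 and version 3 (drop the helper column first)
v_old = dfs_by_version[2].drop("_delta_version")
v_new = dfs_by_version[3].drop("_delta_version")

# Present in v2 but not in v3: deleted rows, or the old values of updated rows
display(v_old.exceptAll(v_new))

# Present in v3 but not in v2: inserted rows, or the new values of updated rows
display(v_new.exceptAll(v_old))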

4. Key Observations

...

It seems we can access the commit logs


Image Added

But without testing further, it is not yet possible to know which row was deleted or added

Image Added
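As a possible next step (an untested sketch, assuming the standard Delta commit layout under `_delta_log/` and that both `add` and `remove` actions exist in the log), the commit JSON files can be read directly with Spark. Each line is one action (`commitInfo`, `add`, `remove`, ...), which shows at least which data files every commit added or removed:

Code Block
from pyspark.sql import functions as F

path = "abfss://69300957-941f-4c8a-970f-49b3fce16e0d@onelake.dfs.fabric.microsoft.com/f73eb213-d657-4e43-8856-30a30f7cb1bf/Tables/dbo/RefSujetsIAAURA"

# Each 0000...N.json commit file contains one JSON action per line
log_df = spark.read.json(path + "/_delta_log/*.json")

# Data files added by each commit
display(log_df.where(F.col("add").isNotNull()).select("add.path", "add.modificationTime"))

# Data files removed by each commit
display(log_df.where(F.col("remove").isNotNull()).select("remove.path", "remove.deletionTimestamp"))

Identifying the exact deleted or added rows would then require reading the referenced Parquet files themselves, which is consistent with the observation above.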