A ZIO-powered wrapper for Apache Parquet's Java implementation that leverages ZIO Schema to derive codecs automatically and to provide type-safe filter predicates. Work with your Parquet files through a top-notch ZIO-powered ecosystem, no Spark cluster required.

Ready for more? Check out my other game-changing library that makes working with the Apache Arrow format a breeze: ZIO Apache Arrow.
```scala
libraryDependencies += "me.mnedokushev" %% "zio-apache-parquet-core" % "@VERSION@"
```
All examples are self-contained Scala CLI snippets. You can find copies of them in `docs/scala-cli`.
To be able to write/read data to/from Parquet files, you need to define the following schema and value codecs for your case classes: `SchemaEncoder`, `ValueEncoder`, and `ValueDecoder`.
You can get the Java SDK's `Type` by using a `SchemaEncoder` generated by the `SchemaEncoderDeriver.default` ZIO Schema deriver:
```scala
//> using scala "3.5.0"
//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0

import zio.schema.*
import me.mnedokushev.zio.apache.parquet.core.codec.*

object Schema extends App:

  case class MyRecord(a: Int, b: String, c: Option[Long])
  object MyRecord {
    given schema: Schema[MyRecord] =
      DeriveSchema.gen[MyRecord]
    given schemaEncoder: SchemaEncoder[MyRecord] =
      Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
  }

  val parquetSchema = MyRecord.schemaEncoder.encode(MyRecord.schema, "my_record", optional = false)

  println(parquetSchema)
  // Outputs:
  // required group my_record {
  //   required int32 a (INTEGER(32,true));
  //   required binary b (STRING);
  //   optional int64 c (INTEGER(64,true));
  // }
```
Alternatively, you can customize the schemas of primitive fields within your record by defining a custom `SchemaEncoder` and using the `SchemaEncoderDeriver.summoned` deriver.
```scala
//> using scala "3.5.0"
//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0

import me.mnedokushev.zio.apache.parquet.core.Schemas
import zio.schema.*
import me.mnedokushev.zio.apache.parquet.core.codec.*

object SchemaSummoned extends App:

  case class MyRecord(a: Int, b: String, c: Option[Long])
  object MyRecord:
    given schema: Schema[MyRecord] =
      DeriveSchema.gen[MyRecord]
    // The custom encoder must be defined before the definition for your record type.
    given SchemaEncoder[Int] with {
      override def encode(schema: Schema[Int], name: String, optional: Boolean) =
        Schemas.uuid.optionality(optional).named(name)
    }
    given schemaEncoder: SchemaEncoder[MyRecord] =
      Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.summoned)

  val parquetSchema = MyRecord.schemaEncoder.encode(MyRecord.schema, "my_record", optional = false)

  println(parquetSchema)
  // Outputs:
  // required group my_record {
  //   required fixed_len_byte_array(16) a (UUID);
  //   required binary b (STRING);
  //   optional int64 c (INTEGER(64,true));
  // }
```
`Value` is a sealed hierarchy of types for interop between Scala values and Parquet readers/writers. To convert Scala values into `Value` and back, we need to define instances of the `ValueEncoder` and `ValueDecoder` type classes. This can be done by using the `ValueEncoderDeriver.default` and `ValueDecoderDeriver.default` ZIO Schema derivers.
```scala
//> using scala "3.5.0"
//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0

import zio.schema.*
import me.mnedokushev.zio.apache.parquet.core.codec.*

object Value extends App:

  case class MyRecord(a: Int, b: String, c: Option[Long])
  object MyRecord:
    given Schema[MyRecord] =
      DeriveSchema.gen[MyRecord]
    given encoder: ValueEncoder[MyRecord] =
      Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
    given decoder: ValueDecoder[MyRecord] =
      Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)

  val value  = MyRecord.encoder.encode(MyRecord(3, "zio", None))
  val record = MyRecord.decoder.decode(value)

  println(value)
  // Outputs:
  // RecordValue(Map(a -> Int32Value(3), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
  println(record)
  // Outputs:
  // MyRecord(3,zio,None)
```
Same as for `SchemaEncoder`, you can customize the codecs of primitive types by defining a custom `ValueEncoder`/`ValueDecoder` and using the `ValueEncoderDeriver.summoned`/`ValueDecoderDeriver.summoned` derivers accordingly.
```scala
//> using scala "3.5.0"
//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0

import me.mnedokushev.zio.apache.parquet.core.Value
import zio.schema.*
import me.mnedokushev.zio.apache.parquet.core.codec.*

import java.nio.charset.StandardCharsets

object ValueSummoned extends App:

  case class MyRecord(a: Int, b: String, c: Option[Long])
  object MyRecord:
    given Schema[MyRecord] =
      DeriveSchema.gen[MyRecord]
    given ValueEncoder[Int] with {
      override def encode(value: Int): Value =
        Value.string(value.toString)
    }
    given ValueDecoder[Int] with {
      override def decode(value: Value): Int =
        value match {
          case Value.PrimitiveValue.BinaryValue(v) =>
            new String(v.getBytes, StandardCharsets.UTF_8).toInt
          case other =>
            throw DecoderError(s"Wrong value: $other")
        }
    }
    given encoder: ValueEncoder[MyRecord] =
      Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.summoned)
    given decoder: ValueDecoder[MyRecord] =
      Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.summoned)

  val value  = MyRecord.encoder.encode(MyRecord(3, "zio", None))
  val record = MyRecord.decoder.decode(value)

  println(value)
  // Outputs:
  // RecordValue(Map(a -> BinaryValue(Binary{"3"}), b -> BinaryValue(Binary{"zio"}), c -> NullValue))
  println(record)
  // Outputs:
  // MyRecord(3,zio,None)
```
Finally, to perform IO operations we need to initialize `ParquetWriter` and `ParquetReader` and use either the `writeChunk`/`readChunk` or the `writeStream`/`readStream` methods.
```scala
//> using scala "3.5.0"
//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0

import zio.schema.*
import me.mnedokushev.zio.apache.parquet.core.codec.*
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path }
import zio.*

import java.nio.file.Files

object ParquetIO extends ZIOAppDefault:

  case class MyRecord(a: Int, b: String, c: Option[Long])
  object MyRecord:
    given Schema[MyRecord] =
      DeriveSchema.gen[MyRecord]
    given SchemaEncoder[MyRecord] =
      Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
    given ValueEncoder[MyRecord] =
      Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
    given ValueDecoder[MyRecord] =
      Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)

  val data =
    Chunk(
      MyRecord(1, "first", Some(11)),
      MyRecord(3, "third", None)
    )

  val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet"

  override def run =
    (for {
      writer   <- ZIO.service[ParquetWriter[MyRecord]]
      reader   <- ZIO.service[ParquetReader[MyRecord]]
      _        <- writer.writeChunk(recordsFile, data)
      fromFile <- reader.readChunk(recordsFile)
      _        <- Console.printLine(fromFile)
    } yield ()).provide(
      ParquetWriter.configured[MyRecord](),
      ParquetReader.configured[MyRecord]()
    )
  // Outputs:
  // Chunk(MyRecord(1,first,Some(11)),MyRecord(3,third,None))
```
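The same services also expose `writeStream`/`readStream` for streaming IO. Below is a minimal, hedged sketch, assuming `writeStream` takes a path plus a `ZStream` of records and `readStream` returns a `ZStream` of decoded records; check the library's API for the exact signatures:

```scala
// A sketch only: the exact writeStream/readStream signatures are assumptions.
import zio.stream.*

val streamed =
  (for {
    writer <- ZIO.service[ParquetWriter[MyRecord]]
    reader <- ZIO.service[ParquetReader[MyRecord]]
    // Assumed: writeStream(path, stream) persists a stream of records.
    _      <- writer.writeStream(recordsFile, ZStream.fromChunk(data))
    // Assumed: readStream(path) emits decoded records one by one.
    _      <- reader.readStream(recordsFile).runForeach(Console.printLine(_))
  } yield ()).provide(
    ParquetWriter.configured[MyRecord](),
    ParquetReader.configured[MyRecord]()
  )
```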
In the previous code snippet we used `ParquetReader.configured[A]()` to initialize a reader that takes its Parquet schema from the given file. Such a reader will always try to read all columns of the file. In case you need to read only a subset of the columns, use `ParquetReader.projected[A]()`. It skips the columns that are not present in your schema and reads only those that are, saving precious CPU cycles and time.
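For illustration, here is a hedged sketch of such a projection. `MyProjection` is a hypothetical record type (not part of the library) that keeps only columns `a` and `b` of the file written above, with the same derived codecs:

```scala
// A sketch only: MyProjection is a hypothetical type used for illustration.
case class MyProjection(a: Int, b: String)
object MyProjection:
  given Schema[MyProjection] =
    DeriveSchema.gen[MyProjection]
  given SchemaEncoder[MyProjection] =
    Derive.derive[SchemaEncoder, MyProjection](SchemaEncoderDeriver.default)
  given ValueDecoder[MyProjection] =
    Derive.derive[ValueDecoder, MyProjection](ValueDecoderDeriver.default)

// Reads only columns a and b, skipping c entirely.
val projected =
  ZIO
    .serviceWithZIO[ParquetReader[MyProjection]](_.readChunk(recordsFile))
    .provide(ParquetReader.projected[MyProjection]())
```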
Say goodbye to type-unsafe filter predicates such as `Col("foo") != "bar"`. The library takes advantage of an underdocumented ZIO Schema feature, Accessors, a hidden pearl that lets you extract type-level information about the fields of case classes. In addition to the codecs already shown, you need to provide an instance of `TypeTag` for your record type. For this, use the `TypeTagDeriver.default` deriver.
```scala
//> using scala "3.5.0"
//> using dep me.mnedokushev::zio-apache-parquet-core:0.1.0

import zio.*
import zio.schema.*
import me.mnedokushev.zio.apache.parquet.core.codec.*
import me.mnedokushev.zio.apache.parquet.core.hadoop.{ ParquetReader, ParquetWriter, Path }
import me.mnedokushev.zio.apache.parquet.core.filter.syntax.*
import me.mnedokushev.zio.apache.parquet.core.filter.*

import java.nio.file.Files

object Filtering extends ZIOAppDefault:

  case class MyRecord(a: Int, b: String, c: Option[Long])
  object MyRecord:
    // We need to provide field names using singleton types
    given Schema.CaseClass3.WithFields["a", "b", "c", Int, String, Option[Long], MyRecord] =
      DeriveSchema.gen[MyRecord]
    given SchemaEncoder[MyRecord] =
      Derive.derive[SchemaEncoder, MyRecord](SchemaEncoderDeriver.default)
    given ValueEncoder[MyRecord] =
      Derive.derive[ValueEncoder, MyRecord](ValueEncoderDeriver.default)
    given ValueDecoder[MyRecord] =
      Derive.derive[ValueDecoder, MyRecord](ValueDecoderDeriver.default)
    given TypeTag[MyRecord] =
      Derive.derive[TypeTag, MyRecord](TypeTagDeriver.default)

    // Define accessors to use them later in the filter predicate.
    // You can give the accessors any names, as demonstrated here.
    val (id, name, age) = Filter[MyRecord].columns

  val data =
    Chunk(
      MyRecord(1, "bob", Some(10L)),
      MyRecord(2, "bob", Some(12L)),
      MyRecord(3, "alice", Some(13L)),
      MyRecord(4, "john", None)
    )

  val recordsFile = Path(Files.createTempDirectory("records")) / "records.parquet"

  override def run =
    (
      for {
        writer   <- ZIO.service[ParquetWriter[MyRecord]]
        reader   <- ZIO.service[ParquetReader[MyRecord]]
        _        <- writer.writeChunk(recordsFile, data)
        fromFile <- reader.readChunkFiltered(
                      recordsFile,
                      filter(
                        MyRecord.id > 1 `and` (
                          MyRecord.name =!= "bob" `or`
                            // Use .nullable syntax for optional fields.
                            MyRecord.age.nullable > 10L
                        )
                      )
                    )
        _        <- Console.printLine(fromFile)
      } yield ()
    ).provide(
      ParquetWriter.configured[MyRecord](),
      ParquetReader.configured[MyRecord]()
    )
  // Outputs:
  // Chunk(MyRecord(2,bob,Some(12)),MyRecord(3,alice,Some(13)),MyRecord(4,john,None))
```