
Problem writing string columns to avro file #17

Open · kobusherbst opened this issue Apr 27, 2022 · 3 comments
@kobusherbst

When I try to convert an Arrow file to Avro using this code:

using Arrow, Avro, DataFrames, Tables

function t_arrowtoavro(path, file)
    a = Arrow.Table(joinpath(path, "$(file).arrow")) |> DataFrame
    println(Tables.schema(a))
    Avro.writetable(joinpath(path, "$(file).avro"), a; compress=:zstd)
end

I get the following error whenever the input file contains a string column, e.g.

Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
 :HIVResult     Union{Missing, String}
ERROR: ArgumentError: internal writing error: buffer too small, len = 1563130

Whereas this version, with the string column dropped, does not produce the error:

function t_arrowtoavro(path, file)
    a = Arrow.Table(joinpath(path, "$(file).arrow")) |> DataFrame
    println(Tables.schema(a))
    b = select(a, :IndividualId, :ResultDate)
    println(Tables.schema(b))
    Avro.writetable(joinpath(path, "$(file).avro"), b; compress=:zstd)
end

Output:

Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
 :HIVResult     Union{Missing, String}
Tables.Schema:
 :IndividualId  Int32
 :ResultDate    Union{Missing, Date}
"D:\\Data\\Demography\\AHRI\\Staging\\HIVResults.avro"

How do I resolve this problem if I need a string column in my output file?

@djholiver (Contributor) commented Apr 15, 2024

It appears that in the method `writewithschema` (in Avro.jl's `tables.jl`), only the first row is assessed when sizing the buffer `Vector`, so any later row that encodes to more bytes than the first (a longer string, for example) overruns the allocation, producing the error above.

A short sketch of the failure mode is below, followed by the altered function, in which the maximum row size across all rows is used to size the buffer; this appears to address the issue.
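A minimal sketch of the under-allocation, with made-up row values (the sizes count only the string payload, ignoring Avro's length prefixes and other columns):

# Hypothetical illustration: sizing the buffer from the first row only
# under-allocates when a later row encodes to more bytes.
rows = [(HIVResult = "Neg",), (HIVResult = "Indeterminate, repeat test in 6 weeks",)]
nrow = length(rows)
firstrowsize = sizeof(rows[1].HIVResult)              # 3 bytes of payload
maxrowsize = maximum(r -> sizeof(r.HIVResult), rows)  # 37 bytes
undersized = trunc(Int, nrow * firstrowsize * 1.05)   # 6-byte buffer: too small
needed = trunc(Int, nrow * maxrowsize * 1.05)         # 77-byte buffer: fits every row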


function writewithschema(io, parts, rows, st, sch, dictrow, compress, kw)
    comp = get(COMPRESSORS, compress, nothing)
    schtyp = schematype(sch)
    meta = Dict("avro.schema" => JSON3.write(schtyp))
    if comp !== nothing
        meta["avro.codec"] = String(compress)
    end
    sync = _cast(NTuple{16, UInt8}, rand(UInt128))
    buf = write((magic=MAGIC, meta=meta, sync=sync); schema=FileHeaderRecordType)
    Base.write(io, buf)
    @debug 1 "wrote file header from bytes 1:$(length(buf))"
    while true
        # if rows didn't have schema or length, we materialized w/ Tables.dictrowtable
        nrow = length(rows)
        @debug 1 "writing block count ($nrow)"
        rowsstate = iterate(rows)
        pos = 1
        if rowsstate === nothing
            bytes = UInt8[]
            pos = 0
        else
            row, rowst = rowsstate
            # first pass: calc nbytes on all rows to find the maximum encoded
            # row size, so the buffer is large enough for every row, not just the first
            bytesperrow = nbytes(schtyp, row)
            while true
                rowsstate = iterate(rows, rowst)
                rowsstate === nothing && break
                row, rowst = rowsstate
                nb = nbytes(schtyp, row)
                if nb > bytesperrow
                    bytesperrow = nb
                end
            end
            # second pass: restart iteration and encode each row into the buffer
            rowsstate = iterate(rows)
            row, rowst = rowsstate
            blen = trunc(Int, nrow * bytesperrow * 1.05) # add 5% cushion
            bytes = Vector{UInt8}(undef, blen)
            while true
                pos = writevalue(Binary(), schtyp, row, bytes, pos, blen, kw)
                rowsstate = iterate(rows, rowst)
                rowsstate === nothing && break
                row, rowst = rowsstate
            end
        end
        # compress; otherwise trim the buffer to the bytes actually written
        if comp !== nothing
            finalbytes = transcode(comp[Threads.threadid()], unsafe_wrap(Base.Array, pointer(bytes), pos - 1))
        else
            finalbytes = view(bytes, 1:(pos - 1))
        end
        block = Block(nrow, view(finalbytes, 1:length(finalbytes)), sync)
        buf = write(block; schema=BlockType)
        Base.write(io, buf)
        # advance to the next partition, if any
        state = iterate(parts, st)
        state === nothing && break
        part, st = state
        rows = Tables.rows(part)
        sch = Tables.schema(rows)
        if dictrow
            rows = Tables.dictrowtable(rows)
        end
    end
    return
end

I've added a PR for the above.
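As a quick sanity check (a minimal sketch, assuming the patched buffer sizing above; the file name and column values are hypothetical), a table with a variable-length string column should now round-trip via `Avro.writetable`/`Avro.readtable`:

using Avro, DataFrames

# Hypothetical round-trip: the string column that previously triggered
# "buffer too small" should now write and read back intact.
df = DataFrame(IndividualId = Int32[1, 2, 3],
               HIVResult = ["Neg", missing, "Indeterminate, repeat test in 6 weeks"])
Avro.writetable("HIVResults.avro", df; compress=:zstd)
rt = DataFrame(Avro.readtable("HIVResults.avro"))
@assert isequal(rt.HIVResult, df.HIVResult)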

@kobusherbst (Author)

Thank you!

@djholiver (Contributor)

Sorry to @ you @quinnj, but would you mind reviewing the associated PR for the above? It's self-contained.

Your fantastic work on this has potentially been under-utilised, and I believe the PR makes Avro usable "out of the box" for Julia.
