Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 42 additions & 38 deletions ruby/red-arrow-format/lib/arrow-format/array.rb
Original file line number Diff line number Diff line change
Expand Up @@ -79,54 +79,34 @@ def initialize(type, size, validity_buffer, values_buffer)
super(type, size, validity_buffer)
@values_buffer = values_buffer
end
end

class Int8Array < IntArray
def to_a
apply_validity(@values_buffer.values(:S8, 0, @size))
apply_validity(@values_buffer.values(@type.buffer_type, 0, @size))
end
end

class Int8Array < IntArray
end

class UInt8Array < IntArray
def to_a
apply_validity(@values_buffer.values(:U8, 0, @size))
end
end

class Int16Array < IntArray
def to_a
apply_validity(@values_buffer.values(:s16, 0, @size))
end
end

class UInt16Array < IntArray
def to_a
apply_validity(@values_buffer.values(:u16, 0, @size))
end
end

class Int32Array < IntArray
def to_a
apply_validity(@values_buffer.values(:s32, 0, @size))
end
end

class UInt32Array < IntArray
def to_a
apply_validity(@values_buffer.values(:u32, 0, @size))
end
end

class Int64Array < IntArray
def to_a
apply_validity(@values_buffer.values(:s64, 0, @size))
end
end

class UInt64Array < IntArray
def to_a
apply_validity(@values_buffer.values(:u64, 0, @size))
end
end

class FloatingPointArray < Array
Expand Down Expand Up @@ -393,6 +373,27 @@ def to_a
end
end

# Arrow Map array: each element is a list of key/value entry pairs.
# Converts each element to a Ruby Hash; nil (null) elements stay nil.
class MapArray < VariableSizeListArray
  # @return [Array<Hash, nil>] one Hash per map element, nil for nulls.
  def to_a
    super.collect do |entries|
      if entries.nil?
        entries
      else
        # Each entry destructures to [key, value]; later duplicate keys
        # overwrite earlier ones, matching the original manual accumulation.
        entries.to_h { |key, value| [key, value] }
      end
    end
  end

  private
  # Offset buffer element type for the underlying list layout.
  def offset_type
    :s32 # TODO: big endian support
  end
end

class UnionArray < Array
def initialize(type, size, types_buffer, children)
super(type, size, nil)
Expand Down Expand Up @@ -432,24 +433,27 @@ def to_a
end
end

class MapArray < VariableSizeListArray
class DictionaryArray < Array
def initialize(type, size, validity_buffer, indices_buffer, dictionary)
super(type, size, validity_buffer)
@indices_buffer = indices_buffer
@dictionary = dictionary
end

def to_a
super.collect do |entries|
if entries.nil?
entries
values = []
@dictionary.each do |dictionary_chunk|
values.concat(dictionary_chunk.to_a)
end
buffer_type = @type.index_type.buffer_type
indices = apply_validity(@indices_buffer.values(buffer_type, 0, @size))
indices.collect do |index|
if index.nil?
nil
else
hash = {}
entries.each do |key, value|
hash[key] = value
end
hash
values[index]
end
end
end

private
def offset_type
:s32 # TODO: big endian support
end
end
end
4 changes: 3 additions & 1 deletion ruby/red-arrow-format/lib/arrow-format/field.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ module ArrowFormat
class Field
attr_reader :name
attr_reader :type
def initialize(name, type, nullable)
attr_reader :dictionary_id
def initialize(name, type, nullable, dictionary_id)
@name = name
@type = type
@nullable = nullable
@dictionary_id = dictionary_id
end

def nullable?
Expand Down
139 changes: 99 additions & 40 deletions ruby/red-arrow-format/lib/arrow-format/file-reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,65 @@ def initialize(input)

validate
@footer = read_footer
@record_batches = @footer.record_batches
@record_batch_blocks = @footer.record_batches
@schema = read_schema(@footer.schema)
@dictionaries = read_dictionaries
end

def n_record_batches
@record_batches.size
@record_batch_blocks.size
end

def read(i)
block = @record_batches[i]
fb_message, body = read_block(@record_batch_blocks[i])
fb_header = fb_message.header
unless fb_header.is_a?(Org::Apache::Arrow::Flatbuf::RecordBatch)
raise FileReadError.new(@buffer,
"Not a record batch message: #{i}: " +
fb_header.class.name)
end
read_record_batch(fb_header, @schema, body)
end

def each
return to_enum(__method__) {n_record_batches} unless block_given?

@record_batch_blocks.size.times do |i|
yield(read(i))
end
end

private
def validate
minimum_size = STREAMING_FORMAT_START_OFFSET +
FOOTER_SIZE_SIZE +
END_MARKER_SIZE
if @buffer.size < minimum_size
raise FileReadError.new(@buffer,
"Input must be larger than or equal to " +
"#{minimum_size}: #{@buffer.size}")
end

start_marker = @buffer.slice(0, START_MARKER_SIZE)
if start_marker != MAGIC_BUFFER
raise FileReadError.new(@buffer, "No start marker")
end
end_marker = @buffer.slice(@buffer.size - END_MARKER_SIZE,
END_MARKER_SIZE)
if end_marker != MAGIC_BUFFER
raise FileReadError.new(@buffer, "No end marker")
end
end

def read_footer
footer_size_offset = @buffer.size - END_MARKER_SIZE - FOOTER_SIZE_SIZE
footer_size = @buffer.get_value(FOOTER_SIZE_FORMAT, footer_size_offset)
footer_data = @buffer.slice(footer_size_offset - footer_size,
footer_size)
Org::Apache::Arrow::Flatbuf::Footer.new(footer_data)
end

def read_block(block)
offset = block.offset

# If we can report property error information, we can use
Expand Down Expand Up @@ -101,54 +149,65 @@ def read(i)

metadata = @buffer.slice(offset, metadata_length)
fb_message = Org::Apache::Arrow::Flatbuf::Message.new(metadata)
fb_header = fb_message.header
unless fb_header.is_a?(Org::Apache::Arrow::Flatbuf::RecordBatch)
raise FileReadError.new(@buffer,
"Not a record batch message: #{i}: " +
fb_header.class.name)
end
offset += metadata_length

body = @buffer.slice(offset, block.body_length)
read_record_batch(fb_header, @schema, body)
end

def each
return to_enum(__method__) {n_record_batches} unless block_given?

@record_batches.size.times do |i|
yield(read(i))
end
[fb_message, body]
end

private
def validate
minimum_size = STREAMING_FORMAT_START_OFFSET +
FOOTER_SIZE_SIZE +
END_MARKER_SIZE
if @buffer.size < minimum_size
raise FileReadError.new(@buffer,
"Input must be larger than or equal to " +
"#{minimum_size}: #{@buffer.size}")
end
def read_dictionaries
dictionary_blocks = @footer.dictionaries
return nil if dictionary_blocks.nil?

start_marker = @buffer.slice(0, START_MARKER_SIZE)
if start_marker != MAGIC_BUFFER
raise FileReadError.new(@buffer, "No start marker")
dictionary_fields = {}
@schema.fields.each do |field|
next unless field.type.is_a?(DictionaryType)
dictionary_fields[field.dictionary_id] = field
end
end_marker = @buffer.slice(@buffer.size - END_MARKER_SIZE,
END_MARKER_SIZE)
if end_marker != MAGIC_BUFFER
raise FileReadError.new(@buffer, "No end marker")

dictionaries = {}
dictionary_blocks.each do |block|
fb_message, body = read_block(block)
fb_header = fb_message.header
unless fb_header.is_a?(Org::Apache::Arrow::Flatbuf::DictionaryBatch)
raise FileReadError.new(@buffer,
"Not a dictionary batch message: " +
fb_header.inspect)
end

id = fb_header.id
if fb_header.delta?
unless dictionaries.key?(id)
raise FileReadError.new(@buffer,
"A delta dictionary batch message " +
"must exist after a non delta " +
"dictionary batch message: " +
fb_header.inspect)
end
else
if dictionaries.key?(id)
raise FileReadError.new(@buffer,
"Multiple non delta dictionary batch " +
"messages for the same ID is invalid: " +
fb_header.inspect)
end
end

value_type = dictionary_fields[id].type.value_type
schema = Schema.new([Field.new("dummy", value_type, true, nil)])
record_batch = read_record_batch(fb_header.data, schema, body)
if fb_header.delta?
dictionaries[id] << record_batch.columns[0]
else
dictionaries[id] = [record_batch.columns[0]]
end
end
dictionaries
end

def read_footer
footer_size_offset = @buffer.size - END_MARKER_SIZE - FOOTER_SIZE_SIZE
footer_size = @buffer.get_value(FOOTER_SIZE_FORMAT, footer_size_offset)
footer_data = @buffer.slice(footer_size_offset - footer_size,
footer_size)
Org::Apache::Arrow::Flatbuf::Footer.new(footer_data)
def find_dictionary(id)
@dictionaries[id]
end
end
end
Loading
Loading