Skip to content

Commit

Permalink
fix: PostgreSQL simple protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
aljazerzen committed Jul 23, 2024
1 parent 86bc317 commit 04a3ecf
Showing 1 changed file with 12 additions and 9 deletions.
21 changes: 12 additions & 9 deletions connector_arrow/src/postgres/protocol_simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,34 +26,37 @@ impl<'conn> Statement<'conn> for PostgresStatement<'conn, ProtocolSimple> {
let stmt = &self.stmt;
let schema = types::pg_stmt_to_arrow(stmt)?;

let rows = self
let messages = self
.client
.simple_query(&self.query)
.map_err(PostgresError::from)?;

let row_count = rows.len();
let row_count_upper_bound = messages.len();

let mut row_reader = PostgresRowsReader {
rows: rows.into_iter(),
messages: messages.into_iter(),
};
let batches = collect_rows_to_arrow(schema.clone(), &mut row_reader, row_count)?;
let batches =
collect_rows_to_arrow(schema.clone(), &mut row_reader, row_count_upper_bound)?;

Ok(ArrowReader::new(schema, batches))
}
}

struct PostgresRowsReader {
rows: std::vec::IntoIter<SimpleQueryMessage>,
messages: std::vec::IntoIter<SimpleQueryMessage>,
}

impl<'stmt> RowsReader<'stmt> for PostgresRowsReader {
type CellReader<'row> = PostgresCellReader where Self: 'row;

fn next_row(&mut self) -> Result<Option<Self::CellReader<'_>>, ConnectorError> {
Ok(self.rows.next().and_then(|message| match message {
SimpleQueryMessage::Row(row) => Some(PostgresCellReader { row, next_col: 0 }),
_ => None,
}))
for message in self.messages.by_ref() {
if let SimpleQueryMessage::Row(row) = message {
return Ok(Some(PostgresCellReader { row, next_col: 0 }));
}
}
Ok(None)
}
}

Expand Down

0 comments on commit 04a3ecf

Please sign in to comment.