
Concurrent use of portals and simple queries invalidates portals #553

Open
hnaderi opened this issue Sep 23, 2021 · 15 comments
@hnaderi
Contributor

hnaderi commented Sep 23, 2021

To reproduce:

//from CursorTest.scala 
  sessionTest("invalidation") { s =>
    cursor(s).use { c =>
      for {
        _ <- c.fetch(1)
        _ <- s.execute(sql"select 1;".query(int4))
        _ <- c.fetch(1)
      } yield ()
    }
  }

This fails with the exception Problem: Portal "portal_2" does not exist.

From what can be seen with tcpdump and Wireshark, the portal gets invalidated implicitly: the server treats the invalidation as a given and sends no message about it.

Expected behavior:
This is hard to say, as it is exactly what Postgres does, which is inconvenient IMO; but this issue can be mitigated with warnings in the documentation, method docs, etc.

@tpolecat
Member

Thanks for this, I'll do some experiments and update the doc as needed.

@tpolecat
Member

By the way you can add debug = true to your Session configuration and you'll get a transcript of all the messages that go between client and server. No need for Wireshark, etc.
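For reference, a minimal sketch of that configuration (connection parameters here are placeholders; assumes Skunk 0.x, where an implicit natchez `Trace` is required):

```scala
import cats.effect.{IO, Resource}
import natchez.Trace.Implicits.noop // no-op tracing required by Skunk 0.x
import skunk.Session

// Hypothetical connection parameters. With debug = true, Skunk prints
// every protocol message exchanged with the server to the console.
val session: Resource[IO, Session[IO]] =
  Session.single[IO](
    host     = "localhost",
    port     = 5432,
    user     = "postgres",
    database = "world",
    password = Some("banana"),
    debug    = true
  )
```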

@hnaderi
Contributor Author

hnaderi commented Sep 23, 2021

related document which specifies this behavior:
https://www.postgresql.org/docs/10/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY

If successfully created, a named portal object lasts till the end of the current transaction, unless explicitly destroyed.

@hnaderi
Contributor Author

hnaderi commented Sep 23, 2021

No need for Wireshark, etc.

I was trying to capture the server communication itself to see whether other clients behave differently, but it turned out to be fruitless 😄

@benhutchison
Member

I suspect I'm hitting this or a related problem.

I'm trying to do a bunch of bulk inserts.

When I do them serially with traverse it works. When I try in parallel with parTraverseN(8), I get Postgres errors like Portal "portal_35" does not exist.

@tpolecat
Member

I would need to see some code, I guess, but it sounds like you need to operate inside a transaction if you want multiple active portals.
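A sketch of what "operate inside a transaction" looks like here (table and column names are illustrative; assumes a Skunk version where Session#prepare returns an effect — older versions return a Resource):

```scala
import cats.effect.IO
import skunk._
import skunk.implicits._
import skunk.codec.all.int4

// Portals opened inside the transaction stay valid until COMMIT,
// so both fetches below see the same live portal.
def fetchInTx(s: Session[IO]): IO[Unit] =
  s.transaction.use { _ =>                      // BEGIN ... COMMIT
    s.prepare(sql"select n from t".query(int4)).flatMap { pq =>
      pq.cursor(Void).use { c =>
        c.fetch(1) *> c.fetch(1).void           // portal still exists here
      }
    }
  }
```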

@benhutchison
Member

Thanks for the pointer, issue solved :)

I added a session pool and ran each of my parallel insert jobs in a separate session & transaction. Stable and very fast! 🏎️
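A hedged sketch of that shape — one pooled session and one transaction per parallel job, so each job's portals are scoped to their own transaction (`Job` and `insertBatch` are illustrative stand-ins; `pool` would come from Session.pooled):

```scala
import cats.effect.{IO, Resource}
import cats.effect.implicits._
import skunk.Session

type Job = List[String] // stand-in for a bulk-insert batch

// Each job acquires its own session from the pool and runs its
// inserts inside its own transaction, with parallelism capped at 8.
def runAll(
    pool: Resource[IO, Session[IO]],
    jobs: List[Job],
    insertBatch: (Session[IO], Job) => IO[Unit]
): IO[Unit] =
  jobs.parTraverseN(8) { job =>
    pool.use { s =>
      s.transaction.use(_ => insertBatch(s, job))
    }
  }.void
```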

@matthughes
Contributor

Don't the Postgres docs indicate that a prepared statement should be good for the life of the connection?

Prepared statements only last for the duration of the current database session. When the session ends, the prepared statement is forgotten, so it must be recreated before being used again. This also means that a single prepared statement cannot be used by multiple simultaneous database clients; however, each client can create their own prepared statement to use. Prepared statements can be manually cleaned up using the DEALLOCATE command.

I don't know. Reading through their docs I don't get a clear sense of whether session = tx or session = connection.

@matthughes
Contributor

I still get this randomly even inside explicit transactions. I'm not doing anything parallel as far as I can tell. Will try to experiment with debug to see if I can reproduce.

@hnaderi
Contributor Author

hnaderi commented Jun 16, 2023

I don't know. Reading through their docs I don't get a clear sense of whether session = tx or session = connection.

@matthughes From what I remember, the word "session" in the pg docs is used broadly, and in this case the term refers to a live transaction.
If the transaction is terminated (for whatever reason), the session is also closed and all of the prepared statements are destroyed, while the connection is still alive and functional.
This seems to be implicit protocol behavior, as no messages are transferred between the client and server in this scenario.

I still get this randomly even inside explicit transactions. I'm not doing anything parallel as far as I can tell. Will try to experiment with debug to see if I can reproduce.

It's probably an implicit failure in the transactions, e.g. from merging two streams together.

@matthughes
Contributor

matthughes commented Jun 16, 2023

Here's an example log showcasing the failure. I have a service that obtains a session, opens a transaction, and does a session.prepare call. I hit this service in parallel to exhibit the behavior. I believe this shows the two transactions' communication intermingled.

 → Query(BEGIN)
 ← CommandComplete(Begin)
 ← ReadyForQuery(Active)
 → Query(select txid_current(), pg_backend_pid())
 ← RowDescription(Field(txid_current, 20); Field(pg_backend_pid, 23))
 ← RowData()
 ← CommandComplete(Select(1))
 ← ReadyForQuery(Active)
 → Bind(portal_1275,statement_29,List(Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Execute(portal_1275,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Close(P,portal_1275)
 → Flush
 ← CloseComplete
 → Query(COMMIT)
 ← CommandComplete(Commit)
 ← ReadyForQuery(Idle)
 → Query(UNLISTEN *)
 ← CommandComplete(Unlisten)
 ← ReadyForQuery(Idle)
 → Query(RESET ALL)
 ← CommandComplete(Reset)
 ← ReadyForQuery(Idle)
 → Bind(portal_1276,statement_31,List(Some(SomeParam)))
 → Flush
 ← BindComplete
 → Execute(portal_1276,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Idle)
 → Close(P,portal_1276)
 → Flush
 ← CloseComplete
 → Bind(portal_1277,statement_21,List(Some(SomeParam)))
 → Flush
 ← BindComplete
 → Query(BEGIN)
 ← CommandComplete(Begin)
 ← ReadyForQuery(Active)
 → Execute(portal_1277,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Query(select txid_current(), pg_backend_pid())
 ← RowDescription(Field(txid_current, 20); Field(pg_backend_pid, 23))
 ← RowData()
 ← CommandComplete(Select(1))
 ← ReadyForQuery(Active)
 → Close(P,portal_1277)
 → Flush
 ← CloseComplete
 → Bind(portal_1278,statement_29,List(Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Bind(portal_1279,statement_52,List(Some(SomeParam), Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Execute(portal_1278,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Execute(portal_1279,2147483647)
 → Flush
 ← RowData()
 ← CommandComplete(Select(1))
 → Sync
 ← ReadyForQuery(Active)
 → Close(P,portal_1278)
 → Flush
 ← CloseComplete
 → Close(P,portal_1279)
 → Flush
 ← CloseComplete
 → Query(COMMIT)
 ← CommandComplete(Commit)
 ← ReadyForQuery(Idle)
 → Bind(portal_1280,statement_40,List(Some(SomeParam), Some(73bc60f8-3cce-4080-8958-d064ca93651e)))
 → Flush
 ← BindComplete
 → Query(UNLISTEN *)
 ← CommandComplete(Unlisten)
 ← ReadyForQuery(Idle)
 → Execute(portal_1280,2147483647)
 → Flush
 ← ErrorResponse(F -> postgres.c, M -> portal "portal_1280" does not exist, V -> ERROR, L -> 2074, C -> 34000, R -> exec_execute_message, S -> ERROR)
 → Sync
 ← ReadyForQuery(Idle)
 → Query(RESET ALL)
 ← CommandComplete(Reset)
 ← ReadyForQuery(Idle)
 → Close(P,portal_1280)
 → Flush
 ← CloseComplete
 → Bind(portal_1281,statement_31,List(Some(SomeParam)))
 → Flush
 ← BindComplete
 → Execute(portal_1281,2147483647)

@matthughes
Contributor

This UNLISTEN right after the BindComplete looks wrong, but it's hard to know, since this (I believe) is messages from two different connections being interspersed. Is there a relatively easy way to add some sort of connection identifier to the debug log?

Or can we somehow use MessageSocket.history in an error message so we can replay the last N messages before a failure?

@matthughes
Contributor

Note that setting commandCache = 0 makes this problem go away.

@matthughes
Contributor

OK, I finally found what was causing the issue in my code and was able to reproduce it in a Skunk PR:

#904

It seems calling .flatMap(identity) or .flatten "outside" of the session/tx resource acquisition causes this problem. I don't know whether that's a bug in Skunk, a bug in CE, or just a bug in my understanding.
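A hedged sketch of the suspected anti-pattern, using a plain cats-effect Resource as a stand-in for the session/tx (names are illustrative): flattening an effect produced inside use lets that effect run after the resource has already been released.

```scala
import cats.effect.{IO, Resource}

// Illustrative resource standing in for a session/transaction.
val res: Resource[IO, String] =
  Resource.make(IO.pure("session"))(_ => IO.println("released"))

// Anti-pattern: `use` returns an inner IO, which `.flatten` then
// runs *after* the resource's release action has already executed.
val escaped: IO[Unit] =
  res.use(s => IO.pure(IO.println(s"using $s after release"))).flatten

// Safe: everything that needs the resource runs inside `use`.
val safe: IO[Unit] =
  res.use(s => IO.println(s"using $s inside scope"))
```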

@silles79

silles79 commented Jul 3, 2023

Hi, we have hit the same problem.

We have something like this:

Stream(getStuffFromDb1, getStuffFromDb2).parJoinUnbounded
  .through(KafkaProducer.pipe(producerSettings, kafkaProducer))
  .evalTap(_ => commitOffsets(Chunk(record)))
  .flatMap(r => Stream.chunk(r.records.map(_._1.value)))

getStuffFromDb1 and getStuffFromDb2 go to the database and stream some data from here and there; each does something like:

for {
  session <- Stream.resource(database.makeSession)
  r       <- getStuffFromDb(session)(id)
} yield r

When using parJoinUnbounded we hit the same portal_X does not exist error.
Using .parJoin(1) "fixed" the problem, but that's obviously not ideal.

So did setting queryCache = 0.
