Can portals interleave with other portals

From Evgeny Smirnov
Subject Can portals interleave with other portals
Msg-id CAB9opTGs-Aa9aQf9kM0xpZHOcGVAeR1rLLkNbH8A-HKD0WJ54w@mail.gmail.com
Replies Re: Can portals interleave with other portals (Tom Lane <tgl@sss.pgh.pa.us>)
List pgsql-sql

Greetings!

A PostgreSQL backend can operate multiple portals within a transaction and switch back and forth between them. For instance, the following code produces the expected result:


```
// The table users_Fetch contains users with ids between 1 and 20
val fetchedIds = Flux.defer {
    Flux.usingWhen(
        // 1. Establish cursors
        databaseClient.sql(
                "DECLARE fetch_test1 SCROLL CURSOR FOR SELECT * FROM users_Fetch;" +
                "DECLARE fetch_test2 SCROLL CURSOR FOR SELECT * FROM users_Fetch;")
            .flatMap { _ -> Mono.just(databaseClient) },

        // 2. Iterate cursors
        {
            // concat subscribes its internals sequentially
            Flux.concat(
                databaseClient.sql("MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },

                // An extended query select in between
                databaseClient.sql("select userId from users_Fetch;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },

                databaseClient.sql("FETCH BACKWARD 5 FROM fetch_test1;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } },
                databaseClient.sql("FETCH FORWARD 5 FROM fetch_test2;")
                    .flatMap { r -> r.map { row, _ -> row.get("userId") as Int } }
            )
        },

        // 3. Close cursors
        {
            databaseClient.sql(
                "CLOSE fetch_test1;" +
                "CLOSE fetch_test2;").then()
        }
    )
}.`as`(transactionalOperatorFetch::transactional)

assertEquals(
    listOf(
        4, 5, 6, 7, 8, // MOVE FORWARD 3 FROM fetch_test1; FETCH FORWARD 5 FROM fetch_test1;
        1, 2, 3, 4, 5, // FETCH FORWARD 5 FROM fetch_test2;
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, // select userId from users_Fetch;
        7, 6, 5, 4, 3, // FETCH BACKWARD 5 FROM fetch_test1;
        6, 7, 8, 9, 10), // FETCH FORWARD 5 FROM fetch_test2;
    fetchedIds.collectList().block()
)
```
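The expected values in the assertion follow from how each SCROLL cursor keeps its own position, independent of the other cursor and of the plain select in between. The sketch below is a toy in-memory model of that bookkeeping in plain Kotlin, not PostgreSQL or driver code. The class `ScrollCursorModel` and its method names are made up for illustration, and it assumes the table returns ids 1 to 20 in order.

```kotlin
// Toy model of a SCROLL cursor's position bookkeeping (hypothetical class, not a real API).
class ScrollCursorModel(private val rows: List<Int>) {
    // 1-based index of the current row; 0 = before the first row, rows.size + 1 = after the last.
    private var position = 0

    // FETCH FORWARD n: return the next n rows and stay on the last one returned.
    fun fetchForward(n: Int): List<Int> {
        val first = position + 1
        val last = minOf(position + n, rows.size)
        val result = if (first <= last) rows.subList(first - 1, last).toList() else emptyList()
        position = if (position + n > rows.size) rows.size + 1 else position + n
        return result
    }

    // MOVE FORWARD n: reposition like FETCH FORWARD n, but return nothing.
    fun moveForward(n: Int) {
        fetchForward(n)
    }

    // FETCH BACKWARD n: return the n rows before the current one, nearest first.
    fun fetchBackward(n: Int): List<Int> {
        val first = position - 1
        val last = maxOf(position - n, 1)
        val result = if (first >= last) (first downTo last).map { rows[it - 1] } else emptyList()
        position = maxOf(position - n, 0)
        return result
    }
}

fun main() {
    val ids = (1..20).toList()
    val fetchTest1 = ScrollCursorModel(ids)
    val fetchTest2 = ScrollCursorModel(ids)

    fetchTest1.moveForward(3)
    println(fetchTest1.fetchForward(5))  // [4, 5, 6, 7, 8]
    println(fetchTest2.fetchForward(5))  // [1, 2, 3, 4, 5]
    println(fetchTest1.fetchBackward(5)) // [7, 6, 5, 4, 3]
    println(fetchTest2.fetchForward(5))  // [6, 7, 8, 9, 10]
}
```

After `FETCH FORWARD 5` the first cursor sits on row 8, so the backward fetch starts at row 7, which is why the assertion expects `7, 6, 5, 4, 3`.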


Is the same possible for conventional SELECTs issued through the extended query protocol? From the protocol perspective, it would result in the following traffic:



231 53111 5432 PGSQL 109 >Q ———> BEGIN

232 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1 Ack=54 Win=6371 Len=0 TSval=2819351776 TSecr=589492423

237 5432 53111 PGSQL 73 <C/Z

238 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=54 Ack=18 Win=6366 Len=0 TSval=589492435 TSecr=2819351788

// A client issues a select

239 53111 5432 PGSQL 276 >P/B/D/E/H ———> select * from …; bind B_1; execute B_1, fetch 2 rows; flush

240 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=18 Ack=274 Win=6368 Len=0 TSval=2819351793 TSecr=589492440

245 5432 53111 PGSQL 552 <1/2/T/D/D/s ———> Data, Data, Portal suspended

// Then the same sequence for another prepared statement and portal (let's say B_2) but without a limit in the Execute command and sync at the end.

// Then the client proceeds with B_1 till the completion

270 53111 5432 PGSQL 69 > E ———> execute B_1, fetch 2 rows,

271 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=323 Win=6367 Len=0 TSval=2819351846 TSecr=589492493

272 53111 5432 PGSQL 61 >H ———> Flush

274 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=925 Ack=328 Win=6367 Len=0 TSval=2819351846 TSecr=589492493

282 5432 53111 PGSQL 144 <D/C ———> Command completion

283 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=328 Ack=1013 Win=6351 Len=0 TSval=589492496 TSecr=2819351849

284 53111 5432 PGSQL 66 >C ———> Close B_1

285 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=338 Win=6367 Len=0 TSval=2819351849 TSecr=589492496

286 53111 5432 PGSQL 61 >S ———> Sync

287 5432 53111 TCP 56 5432 → 53111 [ACK] Seq=1013 Ack=343 Win=6366 Len=0 TSval=2819351849 TSecr=589492496

293 5432 53111 PGSQL 67 <3/Z 

294 53111 5432 TCP 56 53111 → 5432 [ACK] Seq=343 Ack=1024 Win=6351 Len=0 TSval=589492498 TSecr=2819351851

295 53111 5432 PGSQL 68 >Q ———> COMMIT
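To make the message sequence in the trace easier to follow, here is a toy model of it in plain Kotlin. It is only a sketch of the exchange being asked about, not PostgreSQL or driver code, and it says nothing about how a real backend behaves. `ToyBackend`, its methods, and the sample row values are hypothetical. The reply tags reuse the letters from the trace: `D` for a data row, `s` for portal suspended, `C` for command completion. The model assumes an Execute that stops because it reached its row limit answers with `s`, and otherwise with `C`.

```kotlin
// Toy stand-in for one backend session inside a transaction (hypothetical, for illustration).
class ToyBackend {
    // Named portals, each remembering how far its result set has been read.
    private val portals = mutableMapOf<String, Iterator<Int>>()

    // Bind: create a named portal over a result set.
    fun bind(portal: String, rows: List<Int>) {
        portals[portal] = rows.iterator()
    }

    // Execute: send up to maxRows rows (0 means no limit), then "s" or "C".
    fun execute(portal: String, maxRows: Int): List<String> {
        val cursor = portals[portal] ?: error("portal \"$portal\" does not exist")
        val replies = mutableListOf<String>()
        var sent = 0
        while (cursor.hasNext() && (maxRows == 0 || sent < maxRows)) {
            replies += "D ${cursor.next()}"
            sent++
        }
        // Assumption of this model: hitting the row limit suspends the portal.
        replies += if (maxRows != 0 && sent == maxRows) "s" else "C"
        return replies
    }

    // Close: destroy the named portal.
    fun close(portal: String) {
        portals.remove(portal)
    }
}

fun main() {
    val backend = ToyBackend()

    backend.bind("B_1", listOf(1, 2, 3))
    println(backend.execute("B_1", 2)) // [D 1, D 2, s]

    // A second portal runs to completion while B_1 is still suspended.
    backend.bind("B_2", listOf(10, 20))
    println(backend.execute("B_2", 0)) // [D 10, D 20, C]

    // The client comes back to B_1 and finishes it.
    println(backend.execute("B_1", 2)) // [D 3, C]
    backend.close("B_1")
}
```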


I'm interested because this kind of communication is intrinsic to R2DBC scenarios like this:

```
val usersWithAccounts = Flux.defer {
    // Select all users
    databaseClient.sql("select * from users where userId >= $1 and userId <= $2")
        .bind("$1", 1)
        .bind("$2", 255)
        .flatMap { r -> r.map { row, meta -> } }
        .flatMap { user ->
            // For each user select all its accounts
            databaseClient.sql("select login from accounts where userId=$1 limit 1")
                .bind("$1", user.id)
                .flatMap { r -> r.map { row, meta -> } }
                .reduce …
        }
}.`as`(transactionalOperator::transactional)
```

This results in a failure, because the inner requests build up a queue inside the driver.


Thanks!
