MDEV-35570 parallel slave ALTER-SEQUENCE attempted to binlog out-of-order

Since MDEV-31503 fixes ALTER-SEQUENCE might be able to  complete its
work including binlogging before a preceding in binlog-order
transaction. There may not be data dependency between the two
transactions, but there would be
   "an attempt was made to binlog GTID D-S-XYZ which would create an
   out-of-order sequence number"
error in the gtid_strict_mode = ON.

After the preceding transaction started committing, and does it rather
slow, ALTER-SEQUNCE was able to find a time window to complete because
it temporarily releases its link with the waitee parent transaction.
And while having it released it also erroneously executes the binlogging part.

Fixed with restoring the commit dependency on the parent before
ALTER-SEQUNCE takes on binlogging.
This commit is contained in:
Andrei
2025-05-13 17:10:30 +03:00
committed by Andrei Elkin
parent 2d5dfc47a9
commit 4e7d06813f
4 changed files with 175 additions and 94 deletions

View File

@ -82,6 +82,30 @@ SELECT @@global.gtid_binlog_state, @@global.gtid_slave_pos as "all through 101 h
@@global.gtid_binlog_state all through 101 have been committed
0-1-101 0-1-101
connection slave;
include/stop_slave.inc
set @saved_mode= @@global.slave_parallel_mode;
set @@global.slave_parallel_mode = conservative;
include/start_slave.inc
connection master;
INSERT INTO ti SET a=2;
include/save_master_gtid.inc
connection slave;
include/sync_with_master_gtid.inc
lock table ti write;
SET GLOBAL debug_dbug= "+d,halt_past_mark_start_commit";
connection master;
INSERT INTO ti SET a=35570;
ALTER SEQUENCE s2 restart with 1;
include/save_master_gtid.inc
connection slave;
unlock tables;
SET debug_sync = "now SIGNAL past_mark_continue";
include/sync_with_master_gtid.inc
include/stop_slave.inc
SET @@global.slave_parallel_mode = @saved_mode;
SET @@global.debug_dbug = @@GLOBAL.debug_dbug;
include/start_slave.inc
connection slave;
flush tables with read lock;
connection master;
CREATE OR REPLACE SEQUENCE s3 ENGINE=innodb;

View File

@ -128,6 +128,53 @@ SET DEBUG_SYNC = 'now SIGNAL continue_worker';
SELECT @@global.gtid_binlog_state, @@global.gtid_slave_pos as "all through 101 have been committed";
#
# MDEV-35570 parallel slave ALTER-SEQUNCE attemted to binlog out-of-order.
# Let two transactions I_1 -> AS_2 where AS_2 depends on a commit parent I_1.
# Under the bug condition AS_2 may complete its work including binlogging
# while I_1 is slowly executing Xid_log_event.
# The test simulate the slowness, AS_2 must defer its completion.
#
--connection slave
--source include/stop_slave.inc
set @saved_mode= @@global.slave_parallel_mode;
set @@global.slave_parallel_mode = conservative;
--source include/start_slave.inc
--connection master
INSERT INTO ti SET a=2;
--source include/save_master_gtid.inc
--connection slave
--source include/sync_with_master_gtid.inc
# allow to proceed to sync with the 1st following WFPT2SC wait condtion
lock table ti write;
# allow to proceed into commit to sync with the 2nd following WFPC wait condition
--let $saved_dbug= @@GLOBAL.debug_dbug
SET GLOBAL debug_dbug= "+d,halt_past_mark_start_commit";
--connection master
INSERT INTO ti SET a=35570;
ALTER SEQUENCE s2 restart with 1;
--source include/save_master_gtid.inc
--connection slave
--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to start commit"
--source include/wait_condition.inc
# the 1st wait release
unlock tables;
--let $wait_condition= SELECT count(*) = 1 FROM information_schema.processlist WHERE state LIKE "Waiting for prior transaction to commit"
--source include/wait_condition.inc
# the 2nd wait release
SET debug_sync = "now SIGNAL past_mark_continue";
--source include/sync_with_master_gtid.inc
--source include/stop_slave.inc
SET @@global.slave_parallel_mode = @saved_mode;
--eval SET @@global.debug_dbug = $saved_dbug
--source include/start_slave.inc
# MDEV-31792 Assertion in MDL_context::acquire_lock upon parallel replication of CREATE SEQUENCE
--let $iter = 3

View File

@ -1507,6 +1507,14 @@ handle_rpl_parallel_thread(void *arg)
else
rgi->mark_start_commit();
DEBUG_SYNC(thd, "rpl_parallel_after_mark_start_commit");
#ifdef ENABLED_DEBUG_SYNC
DBUG_EXECUTE_IF("halt_past_mark_start_commit",
{
DBUG_ASSERT(!debug_sync_set_action
(thd, STRING_WITH_LEN("now WAIT_FOR past_mark_continue")));
DBUG_SET_INITIAL("-d,halt_past_mark_start_commit");
};);
#endif
}
}

View File

@ -960,119 +960,121 @@ bool Sql_cmd_alter_sequence::execute(THD *thd)
SEQUENCE *seq;
No_such_table_error_handler no_such_table_handler;
DBUG_ENTER("Sql_cmd_alter_sequence::execute");
{
#if defined(HAVE_REPLICATION)
/* No wakeup():s of subsequent commits is allowed in this function. */
wait_for_commit_raii suspend_wfc(thd);
/* No wakeup():s of subsequent commits is allowed in this function. */
wait_for_commit_raii suspend_wfc(thd);
#endif
if (check_access(thd, ALTER_ACL, first_table->db.str,
&first_table->grant.privilege,
&first_table->grant.m_internal,
0, 0))
DBUG_RETURN(TRUE); /* purecov: inspected */
if (check_access(thd, ALTER_ACL, first_table->db.str,
&first_table->grant.privilege,
&first_table->grant.m_internal,
0, 0))
DBUG_RETURN(TRUE); /* purecov: inspected */
if (check_grant(thd, ALTER_ACL, first_table, FALSE, 1, FALSE))
DBUG_RETURN(TRUE); /* purecov: inspected */
if (check_grant(thd, ALTER_ACL, first_table, FALSE, 1, FALSE))
DBUG_RETURN(TRUE); /* purecov: inspected */
#ifdef WITH_WSREP
if (WSREP(thd) && wsrep_thd_is_local(thd))
{
const bool used_engine= lex->create_info.used_fields & HA_CREATE_USED_ENGINE;
if (wsrep_check_sequence(thd, new_seq, used_engine))
DBUG_RETURN(TRUE);
if (wsrep_to_isolation_begin(thd, first_table->db.str,
first_table->table_name.str,
first_table))
if (WSREP(thd) && wsrep_thd_is_local(thd))
{
DBUG_RETURN(TRUE);
const bool used_engine= lex->create_info.used_fields & HA_CREATE_USED_ENGINE;
if (wsrep_check_sequence(thd, new_seq, used_engine))
DBUG_RETURN(TRUE);
if (wsrep_to_isolation_begin(thd, first_table->db.str,
first_table->table_name.str,
first_table))
{
DBUG_RETURN(TRUE);
}
}
}
#endif /* WITH_WSREP */
if (if_exists())
thd->push_internal_handler(&no_such_table_handler);
error= open_and_lock_tables(thd, first_table, FALSE, 0);
if (if_exists())
{
trapped_errors= no_such_table_handler.safely_trapped_errors();
thd->pop_internal_handler();
}
if (unlikely(error))
{
if (trapped_errors)
if (if_exists())
thd->push_internal_handler(&no_such_table_handler);
error= open_and_lock_tables(thd, first_table, FALSE, 0);
if (if_exists())
{
StringBuffer<FN_REFLEN> tbl_name;
tbl_name.append(&first_table->db);
tbl_name.append('.');
tbl_name.append(&first_table->table_name);
push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
ER_UNKNOWN_SEQUENCES,
ER_THD(thd, ER_UNKNOWN_SEQUENCES),
tbl_name.c_ptr_safe());
my_ok(thd);
DBUG_RETURN(FALSE);
trapped_errors= no_such_table_handler.safely_trapped_errors();
thd->pop_internal_handler();
}
if (unlikely(error))
{
if (trapped_errors)
{
StringBuffer<FN_REFLEN> tbl_name;
tbl_name.append(&first_table->db);
tbl_name.append('.');
tbl_name.append(&first_table->table_name);
push_warning_printf(thd, Sql_condition::WARN_LEVEL_NOTE,
ER_UNKNOWN_SEQUENCES,
ER_THD(thd, ER_UNKNOWN_SEQUENCES),
tbl_name.c_ptr_safe());
my_ok(thd);
DBUG_RETURN(FALSE);
}
DBUG_RETURN(TRUE);
}
DBUG_RETURN(TRUE);
}
table= first_table->table;
seq= table->s->sequence;
table= first_table->table;
seq= table->s->sequence;
seq->write_lock(table);
new_seq->reserved_until= seq->reserved_until;
seq->write_lock(table);
new_seq->reserved_until= seq->reserved_until;
/* Copy from old sequence those fields that the user didn't specified */
if (!(new_seq->used_fields & seq_field_used_increment))
new_seq->increment= seq->increment;
if (!(new_seq->used_fields & seq_field_used_min_value))
new_seq->min_value= seq->min_value;
if (!(new_seq->used_fields & seq_field_used_max_value))
new_seq->max_value= seq->max_value;
if (!(new_seq->used_fields & seq_field_used_start))
new_seq->start= seq->start;
if (!(new_seq->used_fields & seq_field_used_cache))
new_seq->cache= seq->cache;
if (!(new_seq->used_fields & seq_field_used_cycle))
new_seq->cycle= seq->cycle;
/* Copy from old sequence those fields that the user didn't specified */
if (!(new_seq->used_fields & seq_field_used_increment))
new_seq->increment= seq->increment;
if (!(new_seq->used_fields & seq_field_used_min_value))
new_seq->min_value= seq->min_value;
if (!(new_seq->used_fields & seq_field_used_max_value))
new_seq->max_value= seq->max_value;
if (!(new_seq->used_fields & seq_field_used_start))
new_seq->start= seq->start;
if (!(new_seq->used_fields & seq_field_used_cache))
new_seq->cache= seq->cache;
if (!(new_seq->used_fields & seq_field_used_cycle))
new_seq->cycle= seq->cycle;
/* If we should restart from a new value */
if (new_seq->used_fields & seq_field_used_restart)
{
if (!(new_seq->used_fields & seq_field_used_restart_value))
new_seq->restart= new_seq->start;
new_seq->reserved_until= new_seq->restart;
}
/* If we should restart from a new value */
if (new_seq->used_fields & seq_field_used_restart)
{
if (!(new_seq->used_fields & seq_field_used_restart_value))
new_seq->restart= new_seq->start;
new_seq->reserved_until= new_seq->restart;
}
/* Let check_and_adjust think all fields are used */
new_seq->used_fields= ~0;
if (new_seq->check_and_adjust(0))
{
my_error(ER_SEQUENCE_INVALID_DATA, MYF(0),
first_table->db.str,
first_table->table_name.str);
error= 1;
/* Let check_and_adjust think all fields are used */
new_seq->used_fields= ~0;
if (new_seq->check_and_adjust(0))
{
my_error(ER_SEQUENCE_INVALID_DATA, MYF(0),
first_table->db.str,
first_table->table_name.str);
error= 1;
seq->write_unlock(table);
goto end;
}
if (likely(!(error= new_seq->write(table, 1))))
{
/* Store the sequence values in table share */
seq->copy(new_seq);
}
else
table->file->print_error(error, MYF(0));
seq->write_unlock(table);
goto end;
if (trans_commit_stmt(thd))
error= 1;
if (trans_commit_implicit(thd))
error= 1;
DBUG_EXECUTE_IF("hold_worker_on_schedule",
{
/* delay binlogging of a parent trx in rpl_parallel_seq */
my_sleep(100000);
});
}
if (likely(!(error= new_seq->write(table, 1))))
{
/* Store the sequence values in table share */
seq->copy(new_seq);
}
else
table->file->print_error(error, MYF(0));
seq->write_unlock(table);
if (trans_commit_stmt(thd))
error= 1;
if (trans_commit_implicit(thd))
error= 1;
DBUG_EXECUTE_IF("hold_worker_on_schedule",
{
/* delay binlogging of a parent trx in rpl_parallel_seq */
my_sleep(100000);
});
if (likely(!error))
error= write_bin_log(thd, 1, thd->query(), thd->query_length());
if (likely(!error))