A part of the replication project, MWL#107. The event applier API does the opposite of the stacked event generator API, MWL#120. It allows a plugin to apply (=execute) abstract events on a slave. Just like generated events from MWL#120 are abstract without any fixed materialised format, applied events are also abstract with no fixed materialised format imposed by the API. All event data is supplied by the plugin by calling "provider" methods in applier objects obtained from the API. This allows the plugin to choose which information to supply. For example, one plugin may supply row event values identified by column name, while another may supply by column poisition. Using a provider interface, different plugins can supply different information, as long as the provided information is sufficient for the applier API to execute the event in _some_ way. The event applier API also need to provide a facility for initialising a thread (or multiple threads) to be able to execute events (setting up THD etc). Event appliers are stacked, just like event generators. For example, row-based events are stacked on statement-based events. So a row event applier can accept statement events in addition to row events (and recursively also accept transaction events, as statement-based is stacked on transaction-based). On top of the detailed interfaces for event generation and application, one can build simple materialised event formats, which are just byte strings containing all necessary information. On the generator side, this can be done as a filter that consumes row-based events (say) and encapsulates them in some way (say Google protobuf packets). On the applier side, it can be done by implementing a simple event applier class, that has a single provider method only, accepting a materialised event packet. The strength of the full abstraction below this is that it is possible to implement arbitrary such materialised event formats in plugins, and possible to implement event format plugins and event transport plugins independent of each other.
Initialising a thread must set up all necessary stuff in THD, similar to what is done in handle_one_connection() (but with a single API call, so plugins do not have to know all details). Probably it should set thd->slave_thread for the threads so initialised. Here are draft class definitions (public definitions only). Note that memory lifetime is such that all memory passed as pointers into provider methods in classes derived from class base_applier() must remain valid up to and including call of base_applier::apply(). The applier interfaces generally correspond to rpl_event_* classes of MWL#120. However, they are slightly less fine-grained, for example there is no need for an applier to have separate statement_start and statement_end applier objects. class rpl_event_applier { public: /* Initialise thread; must be called from same thread context that will execute events */ int initialise(); void shutdown(); THD *get_thd(); }; class rpl_event_applier_transaction : public rpl_event_applier { public: trx_start_applier *get_trx_start_applier(); trx_commit_applier *get_trx_commit_applier(); trx_rollback_applier *get_trx_rollback_applier(); }; /* Statement applier is stacked on transaction. */ class rpl_event_applier_statement : public rpl_event_applier_transaction { public statement_applier *get_statement_applier(); }; /* Row applier is stacked on statement. */ class rpl_event_applier_row : public rpl_event_applier_statement { public write_row_applier *get_write_row_applier(); update_row_applier *get_update_row_applier(); delete_row_applier *get_delete_row_applier(); }; /* Applier classes for different event types. */ class base_applier { virtual int apply(); }; class trx_start_applier : public base_applier { // Nothing public here I think ... }; class trx_commit_applier : public base_applier { void global_trx_id(global_transaction_id *); }; class trx_rollback_applier : public base_applier { enum enum_reason { USER_REQUEST, ERROR_DURING_COMMIT }; void rollback_cause(enum_reason reason); }; class statement_applier : public case_applier { void current_db(LEX_STRING db); /* Any error code expected when executing the statement. */ void errorcode(int); void query_string(LEX_STRING query); void sql_mode(ulong mode); void character_set_client(CHARSET_INFO *); void collation_connection(CHARSET_INFO *); void collation_server(CHARSET_INFO *); void collation_default(CHARSET_INFO *); void flags(uint32_t value); void auto_increment_offset(ulong offset); void auto_increment_increment(ulong increment); // ... etc. }; /* I do not think we need separate classes for each kind of statement. The applier will have to parse the query string anyway, which will thell the kind of statement. But maybe something special for LOAD DATA will be needed for blocks of data loaded. */ /* Row-based. */ class row_applier : public base_applier { /* Use already opened table. */ void table(TABLE *t); /* Alternative: specify table by name, will be opened and closed in apply(). */ void table(const char *db, const char *name); enum flag_bits { ROW_WRITE_CAN_REPLACE, // HA_EXTRA_WRITE_CAN_REPLACE ROW_IGNORE_DUP_KEY, // HA_EXTRA_IGNORE_DUP_KEY ROW_IGNORE_NO_KEY, // HA_EXTRA_IGNORE_NO_KEY ROW_DISABLE_FOREIGN_KEY_CHECKS, // ! @@foreign_key_checks ROW_DISABLE_UNIQUE_KEY_CHECKS, // ! @@unique_checks }; void flags(uint32_t value); /* Note that a row_applier can be used multiple times, calling apply() for each set of row values passed to provider methods in sub-classes. */ }; class write_row_applier : public row_applier { /* Specify columns using MySQL-style row buffer and write_set. */ void write_set(BITMAP *set); void after_image(uchar *record); /* Specify column values individually. Note that table must be specified first (otherwise destination type would be unknown). */ /* Specify column values by column number. The storage format of data is the one used by Field::unpack(). */ void column_value(uint index, enum_field_types type, uint param_data, uchar *data); /* Specify column values by column name. */ void column_value(const char *column, enum_field_types type, uint param_data, uchar *data); }; class update_row_applier : public row_applier { void read_set(BITMAP *set); void before_image(uchar *record); void write_set(BITMAP *set); void after_image(uchar *record); void column_before_value(uint index, enum_field_types type, uint param_data, uchar *data); void column_after_value(uint index, enum_field_types type, uint param_data, uchar *data); void column_before_value(const char *column, enum_field_types type, uint param_data, uchar *data); void column_after_value(const char *column, enum_field_types type, uint param_data, uchar *data); }; class delete_row_applier : public row_applier { void read_set(BITMAP *set); void before_image(uchar *record); void column_before_value(uint index, enum_field_types type, uint param_data, uchar *data); void column_before_value(const char *column, enum_field_types type, uint param_data, uchar *data); }; /* Here is an example applier able to apply legacy MySQL binlog events. */ class tpl_mysql_binlog_event_aplier : public row_event_applier { mysql_binlog_event_aplier *get_mysql_binlog_event_aplier(); }; class mysql_binlog_event_aplier : public row_applier { void binlog_event(uchar *buffer, size_t len); };
Dependency created: WL#107 now depends on WL#133
High-Level Specification modified. --- /tmp/wklog.133.old.32092 2010-08-27 13:17:42.000000000 +0000 +++ /tmp/wklog.133.new.32092 2010-08-27 13:17:42.000000000 +0000 @@ -1,2 +1,191 @@ +Initialising a thread must set up all necessary stuff in THD, similar to what +is done in handle_one_connection() (but with a single API call, so plugins do +not have to know all details). + +Probably it should set thd->slave_thread for the threads so initialised. + +Here are draft class definitions (public definitions only). + +Note that memory lifetime is such that all memory passed as pointers into +provider methods in classes derived from class base_applier() must remain +valid up to and including call of base_applier::apply(). + +The applier interfaces generally correspond to rpl_event_* classes of +MWL#120. However, they are slightly less fine-grained, for example there is no +need for an applier to have separate statement_start and statement_end applier +objects. + + class rpl_event_applier { + public: + /* + Initialise thread; must be called from same thread context that will + execute events + */ + int initialise(); + void shutdown(); + + THD *get_thd(); + }; + + class rpl_event_applier_transaction : public rpl_event_applier { + public: + trx_start_applier *get_trx_start_applier(); + trx_commit_applier *get_trx_commit_applier(); + trx_rollback_applier *get_trx_rollback_applier(); + }; + + /* Statement applier is stacked on transaction. */ + class rpl_event_applier_statement : public rpl_event_applier_transaction + { + public + statement_applier *get_statement_applier(); + }; + + /* Row applier is stacked on statement. */ + class rpl_event_applier_row : public rpl_event_applier_statement + { + public + write_row_applier *get_write_row_applier(); + update_row_applier *get_update_row_applier(); + delete_row_applier *get_delete_row_applier(); + }; + + + /* Applier classes for different event types. */ + class base_applier + { + virtual int apply(); + }; + + class trx_start_applier : public base_applier + { + // Nothing public here I think ... + }; + + class trx_commit_applier : public base_applier + { + void global_trx_id(global_transaction_id *); + }; + + class trx_rollback_applier : public base_applier + { + enum enum_reason { USER_REQUEST, ERROR_DURING_COMMIT }; + void rollback_cause(enum_reason reason); + }; + + class statement_applier : public case_applier + { + void current_db(LEX_STRING db); + /* Any error code expected when executing the statement. */ + void errorcode(int); + void query_string(LEX_STRING query); + void sql_mode(ulong mode); + void character_set_client(CHARSET_INFO *); + void collation_connection(CHARSET_INFO *); + void collation_server(CHARSET_INFO *); + void collation_default(CHARSET_INFO *); + void flags(uint32_t value); + void auto_increment_offset(ulong offset); + void auto_increment_increment(ulong increment); + // ... etc. + }; + + /* + I do not think we need separate classes for each kind of statement. + The applier will have to parse the query string anyway, which will thell + the kind of statement. + But maybe something special for LOAD DATA will be needed for blocks of data + loaded. + */ + + /* Row-based. */ + + class row_applier : public base_applier + { + /* Use already opened table. */ + void table(TABLE *t); + /* + Alternative: specify table by name, will be opened and closed in + apply(). + */ + void table(const char *db, const char *name); + enum flag_bits + { + ROW_WRITE_CAN_REPLACE, // HA_EXTRA_WRITE_CAN_REPLACE + ROW_IGNORE_DUP_KEY, // HA_EXTRA_IGNORE_DUP_KEY + ROW_IGNORE_NO_KEY, // HA_EXTRA_IGNORE_NO_KEY + ROW_DISABLE_FOREIGN_KEY_CHECKS, // ! @@foreign_key_checks + ROW_DISABLE_UNIQUE_KEY_CHECKS, // ! @@unique_checks + }; + void flags(uint32_t value); + /* + Note that a row_applier can be used multiple times, calling apply() + for each set of row values passed to provider methods in sub-classes. + */ + }; + + class write_row_applier : public row_applier + { + /* Specify columns using MySQL-style row buffer and write_set. */ + void write_set(BITMAP *set); + void after_image(uchar *record); + /* + Specify column values individually. + Note that table must be specified first (otherwise destination type + would be unknown). + */ + + /* + Specify column values by column number. + The storage format of data is the one used by Field::unpack(). + */ + void column_value(uint index, enum_field_types type, uint param_data, + uchar *data); + /* Specify column values by column name. */ + void column_value(const char *column, enum_field_types type, + uint param_data, uchar *data); + }; + + class update_row_applier : public row_applier + { + void read_set(BITMAP *set); + void before_image(uchar *record); + void write_set(BITMAP *set); + void after_image(uchar *record); + + void column_before_value(uint index, enum_field_types type, + uint param_data, uchar *data); + void column_after_value(uint index, enum_field_types type, + uint param_data, uchar *data); + + void column_before_value(const char *column, enum_field_types type, + uint param_data, uchar *data); + void column_after_value(const char *column, enum_field_types type, + uint param_data, uchar *data); + }; + + class delete_row_applier : public row_applier + { + void read_set(BITMAP *set); + void before_image(uchar *record); + + void column_before_value(uint index, enum_field_types type, + uint param_data, uchar *data); + + void column_before_value(const char *column, enum_field_types type, + uint param_data, uchar *data); + }; + + + /* Here is an example applier able to apply legacy MySQL binlog events. */ + class tpl_mysql_binlog_event_aplier : public row_event_applier + { + mysql_binlog_event_aplier *get_mysql_binlog_event_aplier(); + }; + + class mysql_binlog_event_aplier : public row_applier + { + void binlog_event(uchar *buffer, size_t len); + };