Schema Registry
Schema Registry is a centralized repository for message metadata. It stores schemas that define the structure and types of messages, provides pluggable serializer and deserializer interfaces, and supplies serializer and deserializer implementations at runtime based on the incoming messages. It also makes schemas easier to reuse, discover, author, and collaborate on.
Each schema is primarily represented by the following metadata:
- Name: The unique name of the schema within the registry.
- Type: The schema type, such as Avro, ProtoBuf, or JSON.
- Schema Group: The group to which this schema belongs, such as Kafka, Hive, Spark, or system logs.
- Compatibility: The level of compatibility between different versions of the schema.
- Description: Information about the various versions of a schema.
Schemas can evolve over time through multiple versions. Each version of a schema includes:
- Schema Text: The textual representation of the schema.
- Description: Details about this specific version.
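To make the relationship between a schema and its versions concrete, here is a minimal in-memory sketch. The classes `SchemaEntry`, `SchemaVersionEntry`, and `Compatibility` are hypothetical and are not the Schema Registry client classes. The sketch assumes versions are numbered from 1 and that re-adding identical schema text returns the existing version; it compares raw text, whereas the registry may compare a canonical form.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of the metadata above; not the Schema Registry client classes.
enum Compatibility { BACKWARD, FORWARD, FULL, NONE }

final class SchemaVersionEntry {
    final int version;        // 1-based number within one schema (assumption of this sketch)
    final String schemaText;  // textual representation of the schema
    final String description; // details about this specific version

    SchemaVersionEntry(int version, String schemaText, String description) {
        this.version = version;
        this.schemaText = schemaText;
        this.description = description;
    }
}

final class SchemaEntry {
    final String name;                 // unique within the registry
    final String type;                 // e.g. "avro"
    final String schemaGroup;          // e.g. "kafka"
    final Compatibility compatibility; // rule applied between versions (not enforced here)
    final String description;
    private final List<SchemaVersionEntry> versions = new ArrayList<>();

    SchemaEntry(String name, String type, String schemaGroup,
                Compatibility compatibility, String description) {
        this.name = name;
        this.type = type;
        this.schemaGroup = schemaGroup;
        this.compatibility = compatibility;
        this.description = description;
    }

    // Adds a version; re-adding identical schema text returns the existing version number
    int addVersion(String schemaText, String versionDescription) {
        for (SchemaVersionEntry existing : versions) {
            if (existing.schemaText.equals(schemaText)) {
                return existing.version;
            }
        }
        int next = versions.size() + 1;
        versions.add(new SchemaVersionEntry(next, schemaText, versionDescription));
        return next;
    }

    List<SchemaVersionEntry> versions() {
        return Collections.unmodifiableList(versions);
    }
}
```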
Compatibility
The compatibility between different versions of a schema can be set to one of the following options:
- Backward: This setting ensures that a new version of the schema is compatible with the previous version. This means data written with the earlier version can be deserialized using the new version of the schema.
- Forward: This option indicates that the current schema version is compatible with future versions. Thus, data written with a new version can still be read by the old version of the schema.
- Full: This setting means that a new version of the schema supports both backward and forward compatibility.
- None: No compatibility is enforced between different versions of the schema.
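The following sketch illustrates what the four settings check, under a deliberately simplified model: a schema is reduced to a map from field name to whether the field has a default value, roughly following Avro's rule that a reader can fill in a field missing from the written data only from a default. Type changes, aliases, and unions are ignored, and the names `CompatibilitySketch` and `CompatibilityLevel` are hypothetical; this is not the registry's compatibility checker.

```java
import java.util.Map;

enum CompatibilityLevel { BACKWARD, FORWARD, FULL, NONE }

// Simplified model: a schema is a map of field name -> "has a default value".
// Real Avro checks also compare field types, aliases, and unions; this sketch does not.
final class CompatibilitySketch {
    // Can a reader using readerSchema decode data written with writerSchema?
    static boolean canRead(Map<String, Boolean> readerSchema, Map<String, Boolean> writerSchema) {
        for (Map.Entry<String, Boolean> field : readerSchema.entrySet()) {
            boolean missingFromWriter = !writerSchema.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (missingFromWriter && !hasDefault) {
                return false; // reader needs a value that was never written and cannot default it
            }
        }
        return true; // fields written but unknown to the reader are skipped
    }

    static boolean isCompatible(CompatibilityLevel level,
                                Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        switch (level) {
            case BACKWARD: // new version reads data written with the old one
                return canRead(newSchema, oldSchema);
            case FORWARD:  // old version reads data written with the new one
                return canRead(oldSchema, newSchema);
            case FULL:
                return canRead(newSchema, oldSchema) && canRead(oldSchema, newSchema);
            default:       // NONE: no check is performed
                return true;
        }
    }
}
```

For example, adding a field with a default keeps both directions working, while removing a field that has no default breaks FORWARD compatibility because readers using the old version still expect it.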
Installation
Initialize Database
- Create the registry database and a user that can access it from the registry server host. The following example is for MySQL; run these statements in the MySQL CLI:
# Create the registry database
CREATE DATABASE registry;
# Create the user, identified by its password, and grant it access to the registry database
CREATE USER 'registry'@'registry_server_hostname' IDENTIFIED BY 'registry_password';
GRANT ALL PRIVILEGES ON registry.* TO 'registry'@'registry_server_hostname';
FLUSH PRIVILEGES;
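Before configuring the service, it can help to confirm that the registry user can reach the database from the registry server host. A minimal JDBC sketch, assuming the MySQL or PostgreSQL JDBC driver is on the classpath; the class `RegistryDbCheck` and its methods are hypothetical, and the check is independent of how Ambari configures the service.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Locale;

// Hypothetical connectivity check for the registry database and user.
final class RegistryDbCheck {
    // Builds a JDBC URL from database type, host:port, and database name
    static String jdbcUrl(String dbType, String hostPort, String database) {
        switch (dbType.toLowerCase(Locale.ROOT)) {
            case "mysql":
                return "jdbc:mysql://" + hostPort + "/" + database;
            case "postgresql":
                return "jdbc:postgresql://" + hostPort + "/" + database;
            default:
                throw new IllegalArgumentException("Unsupported database type: " + dbType);
        }
    }

    // Opens a connection with the given credentials; requires the JDBC driver on the classpath
    static boolean canConnect(String url, String user, String password) {
        try (Connection conn = DriverManager.getConnection(url, user, password)) {
            return conn.isValid(5); // wait at most 5 seconds for the validation check
        } catch (SQLException e) {
            System.err.println("Connection failed: " + e.getMessage());
            return false;
        }
    }
}
```

Run from the registry server host, for example: `RegistryDbCheck.canConnect(RegistryDbCheck.jdbcUrl("mysql", "db-host:3306", "registry"), "registry", "registry_password")`.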
Ambari Installation Steps
To install the Schema Registry Ambari management pack (mpack), do the following:
- Download the mpack branch from the source code and package it as a tar.gz archive.
- Copy the archive to the Ambari Server host.
- Run the following command to install it:
ambari-server install-mpack --mpack=registry-1.0.0.tar.gz --verbose
If the installation succeeds, output similar to the following is displayed:
INFO: Management pack registry-ambari-mpack-1.0.0 successfully installed! Please restart ambari-server.
INFO: Loading properties from /etc/ambari-server/conf/ambari.properties
Ambari Server 'install-mpack' completed successfully.
Before restarting Ambari Server, point the REGISTRY service symlinks of the ODP 3.0, 3.1, and 3.2 stacks to the installed mpack:
cd /var/lib/ambari-server/resources/stacks/ODP/3.0/services/
rm -f REGISTRY
ln -s /var/lib/ambari-server/resources/mpacks/registry-ambari-mpack-1.0.0/common-services/REGISTRY/1.0.0 REGISTRY
cd ../../3.1/services/
rm -f REGISTRY
ln -s /var/lib/ambari-server/resources/mpacks/registry-ambari-mpack-1.0.0/common-services/REGISTRY/1.0.0 REGISTRY
cd ../../3.2/services/
rm -f REGISTRY
ln -s /var/lib/ambari-server/resources/mpacks/registry-ambari-mpack-1.0.0/common-services/REGISTRY/1.0.0 REGISTRY
#Restart Ambari server
ambari-server restart
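The symlink steps above repeat the same two commands for each ODP stack version. The sketch below expresses that loop with `java.nio.file`; the class `RegistryMpackLinks` is hypothetical, the paths are the ones used above, and running `main` requires write access to the Ambari resources directory (typically root). Restart Ambari Server afterwards as shown above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical equivalent of the rm -f / ln -s steps for each stack version.
final class RegistryMpackLinks {
    static final Path STACKS = Paths.get("/var/lib/ambari-server/resources/stacks/ODP");
    static final Path MPACK_SERVICE = Paths.get(
            "/var/lib/ambari-server/resources/mpacks/registry-ambari-mpack-1.0.0/common-services/REGISTRY/1.0.0");

    // For each stack version, replace services/REGISTRY with a symlink to the mpack service
    static void relink(Path stacksRoot, List<String> stackVersions, Path target) throws IOException {
        for (String version : stackVersions) {
            Path link = stacksRoot.resolve(version).resolve("services").resolve("REGISTRY");
            // Removes an existing REGISTRY symlink (the link itself, not its target)
            Files.deleteIfExists(link);
            Files.createSymbolicLink(link, target);
        }
    }

    public static void main(String[] args) throws IOException {
        relink(STACKS, List.of("3.0", "3.1", "3.2"), MPACK_SERVICE);
    }
}
```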
- Log in to the Ambari UI and add the Schema Registry service.
- Select the hosts for the Registry server.
- Update the configuration as described in the following sections. They cover the settings needed to enable the registry server, as well as SSL, Kerberos authentication, Ranger authorization, and Knox.
Database Configuration (Mandatory)
Schema Registry stores its service metadata in an RDBMS. Update the following configuration:
- Enter the storage user, storage password, database type, and host:port that you set up when initializing the database.

SSL Enablement
Update the following properties to match the SSL configuration of the respective hosts:

Kerberos Configuration
Ambari configures the service principal and keytab automatically, but one manual adjustment is needed for the auth-to-local rules that translate Kerberos principals into local user names, as illustrated here:
- Verify the hadoop.security.auth_to_local property in the HDFS/YARN core-site, or adapt your own rules following this example:
RULE:[1:$1@$0](ambari-qa-odp_odin@ADSRE.COM)s/.*/ambari-qa/
RULE:[2:$1@$0](registry@ADSRE.COM)s/.*/registry/
DEFAULT
Set the kerberos.name.rules property in Custom registry-common to the same rules, with the newlines replaced by spaces:
RULE:[1:$1@$0](ambari-qa-odp_odin@ADSRE.COM)s/.*/ambari-qa/ RULE:[2:$1@$0](registry@ADSRE.COM)s/.*/registry/ DEFAULT
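To see how these rules map principals to local users, and how the newline-separated form becomes the single line used for kerberos.name.rules, here is a simplified sketch. It supports only the `RULE:[n:format](regex)s/pattern/replacement/` form used above plus `DEFAULT`, and its `DEFAULT` handling skips the realm check; Hadoop's own `KerberosName` class handles the full syntax. The class `AuthToLocalSketch` is hypothetical.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified illustration of auth_to_local rules; Hadoop's KerberosName is the real implementation.
final class AuthToLocalSketch {
    // RULE:[n:format](match-regex)s/pattern/replacement/  (no /g or /L flags in this sketch)
    private static final Pattern RULE = Pattern.compile(
            "RULE:\\[(\\d+):([^\\]]+)\\]\\(([^)]*)\\)s/([^/]*)/([^/]*)/");

    // Joins newline-separated rules into one space-separated line for kerberos.name.rules
    static String toSingleLine(String multiLineRules) {
        return multiLineRules.lines()
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.joining(" "));
    }

    // Maps a principal with a realm (e.g. registry/host@REALM) to a local name, or null if no rule applies
    static String toLocalName(String principal, List<String> rules) {
        int at = principal.lastIndexOf('@');
        String realm = principal.substring(at + 1);
        String[] components = principal.substring(0, at).split("/");
        for (String rule : rules) {
            if (rule.equals("DEFAULT")) {
                // Simplified: the real DEFAULT rule only applies to the default realm
                return components[0];
            }
            Matcher m = RULE.matcher(rule);
            if (!m.matches() || Integer.parseInt(m.group(1)) != components.length) {
                continue; // unsupported rule form, or wrong number of principal components
            }
            // Expand the format: $0 is the realm, $1..$n are the principal's components
            String formatted = m.group(2).replace("$0", realm);
            for (int i = 0; i < components.length; i++) {
                formatted = formatted.replace("$" + (i + 1), components[i]);
            }
            if (formatted.matches(m.group(3))) {
                return formatted.replaceFirst(m.group(4), m.group(5));
            }
        }
        return null;
    }
}
```

With the rules above, `registry/host1.example.com@ADSRE.COM` has two components, so only the `[2:...]` rule applies: its format `$1@$0` yields `registry@ADSRE.COM`, which matches the rule's regex and is rewritten to `registry`.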
Ranger Configuration
To enable Ranger authorization, select the enable_ranger checkbox in the configuration below; clear it to disable Ranger authorization:

When enabling Ranger in a Kerberized environment, also do the following:
- Enable Kerberos for the UI by selecting the enable_ui_security checkbox.

- Set the Authentication Type to “Kerberos” in the Ranger Schema Registry configuration properties section.

- In the Ranger UI, add the Schema Registry policies shown below.





- Save the policies and grant users permissions on all of them, as shown below.

- Also grant the registry user permissions in the Kafka policies.

For advanced changes, go to the Advanced ranger-schema-registry-security section. If Ranger is enabled with SSL, update the following properties to match your cluster settings:

For reference, you can check the below screenshot.

Ranger auditing can be enabled or disabled in the Ranger configuration.
To disable HDFS and Solr auditing, set the following properties:
xasecure.audit.is.enabled=false
xasecure.audit.destination.hdfs=false
- Clear the Audit to SOLR option.

Knox Configuration
The Ambari service advisor fills in all Knox configuration details automatically when the service is installed. If changes are needed, refer to the following section:

- Start all stopped or required services.
- Access the UI using the enabled protocol:
  - HTTP: http://hostname:7788
  - HTTPS: https://hostname:7790

Running Kafka (Example)
Create a Kafka Topic
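$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --topic truck_events_stream --partitions 1 --replication-factor 1 --create
On Kafka 3.0 and later, kafka-topics.sh no longer accepts --zookeeper; use --bootstrap-server <kafka_host>:9092 instead.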
Grant your user permission to run the producer and consumer against the topic, using either Kafka ACLs or Ranger:
ACL
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=<zookeeper_host>:2181 --add --allow-principal User:principal_name --allow-host "*" --operation All --topic truck_events_stream
Ranger
In the Ranger UI, create or update a Kafka policy that grants the user access to the topic.
Run Producer to Register Schema and Send Data
- Navigate to the schema-registry Avro examples directory by executing:
cd $REGISTRY_HOME/examples/schema-registry/avro
- To send messages to the topic truck_events_stream, update the data/kafka-producer.props file with the settings below, adjusting security.protocol and sasl.jaas.config to your security context, and save it. Then run the java command that follows the properties to start sending messages:
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=SASL_PLAINTEXT
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer
ignoreInvalidMessages=true
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka";
java -Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf \
-cp "avro-examples-1.0.0.3.2.2.0-1.jar:/usr/odp/3.2.3.3-2/registry/libs/*:/usr/odp/3.2.3.3-2/kafka/libs/*" \
com.hortonworks.registries.schemaregistry.examples.avro.KafkaAvroSerDesApp \
-d data/truck_events_json -p data/kafka-producer.props -sm -s data/truck_events.avsc
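If you produce from your own application instead of the example app, the same Kafka client settings can be passed as a `java.util.Properties` object. A minimal sketch; the class `ProducerConfigSketch` is hypothetical, and the security settings are the SASL/Kerberos values from the example file, which you should adjust to your environment.

```java
import java.util.Properties;

// Hypothetical helper: the Kafka client settings from data/kafka-producer.props, built in code.
final class ProducerConfigSketch {
    static Properties producerProps(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("schema.registry.url", registryUrl);      // used by the Avro serializer
        props.setProperty("security.protocol", "SASL_PLAINTEXT");   // adjust to your security context
        props.setProperty("sasl.jaas.config",
                "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=true renewTicket=true serviceName=\"kafka\";");
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer");
        return props;
    }
}
```

With `kafka-clients` and `schema-registry-serdes` on the classpath, the result can be passed to `new KafkaProducer<>(props)`. The `topic` and `ignoreInvalidMessages` entries in the props file are not Kafka client settings (they appear to be read by the example application), so they are omitted here.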
Run Consumer to Retrieve Schema and Deserialize Messages
- Navigate to the schema-registry Avro examples directory by executing:
cd $REGISTRY_HOME/examples/schema-registry/avro
- To consume messages from the topic truck_events_stream, update the data/kafka-consumer.props file with the settings below, adjusting security.protocol and sasl.jaas.config to your security context, and save it. Then run the java command that follows the properties to start consuming messages:
topic=truck_events_stream
bootstrap.servers=<kafka_host1>:9092,<kafka_host2>:9092
schema.registry.url=http://<registry_host>:7788/api/v1
security.protocol=SASL_PLAINTEXT
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer
group.id=truck_group
auto.offset.reset=earliest
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useTicketCache=true renewTicket=true serviceName="kafka";
java -Djava.security.auth.login.config=/etc/kafka/conf/kafka_client_jaas.conf \
-cp "avro-examples-1.0.0.3.2.2.0-1.jar:/tmp/libs/*:/usr/odp/3.2.3.3-2/kafka/libs/*" \
com.hortonworks.registries.schemaregistry.examples.avro.KafkaAvroSerDesApp \
-c data/kafka-consumer.props -cm
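The consumer is configured with the registry URL rather than the schema itself: the serializer records an identifier of the writer's schema version with each message, and the deserializer uses it to fetch that schema from the registry. The sketch below illustrates the idea with a hypothetical framing (a 1-byte marker followed by a 4-byte schema version ID); it is not the actual byte layout written by `KafkaAvroSerializer`, and the registry lookup is stubbed as a function. In this sketch, caching the lookup means each schema version is fetched once rather than once per message.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical framing, NOT the byte layout used by KafkaAvroSerializer:
// [1-byte protocol marker][4-byte schema version ID][serialized payload]
final class SchemaFramingSketch {
    static final byte PROTOCOL = 0x1; // hypothetical marker value

    // Producer side: prefix the payload with the writer's schema version ID
    static byte[] frame(int schemaVersionId, byte[] payload) {
        return ByteBuffer.allocate(1 + Integer.BYTES + payload.length)
                .put(PROTOCOL)
                .putInt(schemaVersionId)
                .put(payload)
                .array();
    }

    static final class Decoded {
        final String schemaText;
        final byte[] payload;

        Decoded(String schemaText, byte[] payload) {
            this.schemaText = schemaText;
            this.payload = payload;
        }
    }

    private final Map<Integer, String> schemaCache = new HashMap<>();
    private final Function<Integer, String> registryLookup; // stands in for a call to the registry

    SchemaFramingSketch(Function<Integer, String> registryLookup) {
        this.registryLookup = registryLookup;
    }

    // Consumer side: read the header, fetch the writer schema once per ID, return schema and payload
    Decoded unframe(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        byte protocol = buffer.get();
        if (protocol != PROTOCOL) {
            throw new IllegalArgumentException("Unknown protocol marker: " + protocol);
        }
        int schemaVersionId = buffer.getInt();
        String schemaText = schemaCache.computeIfAbsent(schemaVersionId, registryLookup);
        byte[] payload = new byte[buffer.remaining()];
        buffer.get(payload);
        return new Decoded(schemaText, payload);
    }
}
```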
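To use the Schema Registry serializers and deserializers in your own application, add the following Maven dependency, with a version that matches your Schema Registry release:
<dependency>
  <groupId>com.hortonworks.registries</groupId>
  <artifactId>schema-registry-serdes</artifactId>
</dependency>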
API Examples
You can access all available APIs for Schema Registry, along with descriptions and use cases, by visiting the following Swagger URL:
http://registry_hostname:7788/swagger

Using Schema related APIs
The following code snippet shows how to use SchemaRegistryClient to register a schema and new versions of it, retrieve a specific version, the latest version, and all versions, find schemas that contain a given field, and delete a schema.
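// getSchema() and createSchemaMetadata() are helper methods (not shown here): getSchema() returns
// the schema text read from the given resource, and createSchemaMetadata() builds a SchemaMetadata
// for the given schema name
String schema1 = getSchema("/device.avsc");
SchemaMetadata schemaMetadata = createSchemaMetadata("com.hwx.schemas.sample-" + System.currentTimeMillis());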
// registering a new schema
SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schema1, "Initial version of the schema"));
LOG.info("Registered schema [{}] and returned version [{}]", schema1, v1);
// adding a new version of the schema
String schema2 = getSchema("/device-next.avsc");
SchemaVersion schemaInfo2 = new SchemaVersion(schema2, "second version");
SchemaIdVersion v2 = schemaRegistryClient.addSchemaVersion(schemaMetadata, schemaInfo2);
LOG.info("Registered schema [{}] and returned version [{}]", schema2, v2);
// adding the same schema again returns the previously registered version
SchemaIdVersion version = schemaRegistryClient.addSchemaVersion(schemaMetadata, schemaInfo2);
LOG.info("Received version [{}] for schema metadata [{}]", version, schemaMetadata);
// get a specific version of the schema
String schemaName = schemaMetadata.getName();
SchemaVersionInfo schemaVersionInfo = schemaRegistryClient.getSchemaVersionInfo(new SchemaVersionKey(schemaName, v2.getVersion()));
LOG.info("Received schema version info [{}] for schema metadata [{}]", schemaVersionInfo, schemaMetadata);
// get latest version of the schema
SchemaVersionInfo latest = schemaRegistryClient.getLatestSchemaVersionInfo(schemaName);
LOG.info("Latest schema with schema key [{}] is : [{}]", schemaMetadata, latest);
// get all versions of the schema
Collection<SchemaVersionInfo> allVersions = schemaRegistryClient.getAllVersions(schemaName);
LOG.info("All versions of schema key [{}] is : [{}]", schemaMetadata, allVersions);
// finding schemas containing a specific field
SchemaFieldQuery md5FieldQuery = new SchemaFieldQuery.Builder().name("md5").build();
Collection<SchemaVersionKey> md5SchemaVersionKeys = schemaRegistryClient.findSchemasByFields(md5FieldQuery);
LOG.info("Schemas containing field query [{}] : [{}]", md5FieldQuery, md5SchemaVersionKeys);
SchemaFieldQuery txidFieldQuery = new SchemaFieldQuery.Builder().name("txid").build();
Collection<SchemaVersionKey> txidSchemaVersionKeys = schemaRegistryClient.findSchemasByFields(txidFieldQuery);
LOG.info("Schemas containing field query [{}] : [{}]", txidFieldQuery, txidSchemaVersionKeys);
// delete the schema metadata and all data associated with it, including versions and branches
schemaRegistryClient.deleteSchema(schemaName);
Create a New Schema with API
When a new schema is created, three objects are created: a SchemaMetadata, a SchemaBranch, and a SchemaVersion. Creating all of them requires two API endpoints:
- To create a new SchemaMetadata, use the REST API endpoint POST /api/v1/schemaregistry/schemas. This endpoint registers metadata about a schema. The Swagger documentation provides an example body and curl command:
curl -X POST "http://HOSTNAME:PORT/api/v1/schemaregistry/schemas" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"type\": \"avro\", \"schemaGroup\": \"kafka\", \"name\": \"meta\", \"description\": \"metadata description\", \"compatibility\": \"NONE\", \"validationLevel\": \"LATEST\"}"
When successful, this command returns the ID of the new SchemaMetadata.
- To create a new SchemaVersion, use the endpoint POST /api/v1/schemaregistry/schemas/{name}/versions, where {name} is the name of the previously created SchemaMetadata. This endpoint also creates a new SchemaBranch (MASTER) and a new SchemaVersion, both linked to the specified SchemaMetadata. The Swagger documentation again provides an example body and curl command:
curl -X POST "http://HOSTNAME:PORT/api/v1/schemaregistry/schemas/meta/versions?branch=MASTER&disableCanonicalCheck=false" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"description\": \"string\", \"schemaText\": \"{\\\"type\\\": \\\"record\\\",\\\"namespace\\\": \\\"com.example\\\",\\\"name\\\": \\\"FullName\\\",\\\"fields\\\": [{ \\\"name\\\": \\\"first\\\", \\\"type\\\": \\\"string\\\" },{ \\\"name\\\": \\\"last\\\", \\\"type\\\": \\\"string\\\" }]}\", \"initialState\": \"5\", \"stateDetails\": [ \"null\" ]}"
This request registers a new version of the schema under the MASTER branch, linked to the created metadata.
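The two curl calls above can also be issued from Java. A minimal sketch using `java.net.http` (Java 11 or later); the class `RegistryRestSketch` is hypothetical, the request bodies use only fields from the curl examples, and the version body omits `initialState` and `stateDetails` on the assumption that the server treats them as optional. The `jsonEscape` helper is needed because the schema text is itself JSON and must be embedded as an escaped string, which the curl body does by hand with backslashes.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper mirroring the two curl calls; not part of the registry client.
final class RegistryRestSketch {
    // Escapes a value for embedding inside a JSON string literal
    static String jsonEscape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Body for POST /api/v1/schemaregistry/schemas (same fields as the curl example)
    static String metadataJson(String name, String type, String group,
                               String description, String compatibility) {
        return "{\"type\": \"" + jsonEscape(type)
                + "\", \"schemaGroup\": \"" + jsonEscape(group)
                + "\", \"name\": \"" + jsonEscape(name)
                + "\", \"description\": \"" + jsonEscape(description)
                + "\", \"compatibility\": \"" + jsonEscape(compatibility)
                + "\", \"validationLevel\": \"LATEST\"}";
    }

    // Body for POST .../schemas/{name}/versions (initialState and stateDetails omitted)
    static String versionJson(String description, String schemaText) {
        return "{\"description\": \"" + jsonEscape(description)
                + "\", \"schemaText\": \"" + jsonEscape(schemaText) + "\"}";
    }

    static HttpRequest createMetadata(String baseUrl, String json) {
        return post(baseUrl + "/api/v1/schemaregistry/schemas", json);
    }

    // Assumes schemaName is URL-safe; no encoding is applied
    static HttpRequest addVersion(String baseUrl, String schemaName, String json) {
        return post(baseUrl + "/api/v1/schemaregistry/schemas/" + schemaName
                + "/versions?branch=MASTER&disableCanonicalCheck=false", json);
    }

    private static HttpRequest post(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    // Sends a request and returns the response body (e.g. the new SchemaMetadata ID)
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}
```

For example, `RegistryRestSketch.send(RegistryRestSketch.createMetadata("http://HOSTNAME:PORT", body))` returns the response body from the registry.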
Default Serializer and Deserializer APIs
The default serializer and deserializer for a given schema provider can be retrieved with the following calls:
// for Avro
AvroSnapshotSerializer serializer = schemaRegistryClient.getDefaultSerializer(AvroSchemaProvider.TYPE);
AvroSnapshotDeserializer deserializer = schemaRegistryClient.getDefaultDeserializer(AvroSchemaProvider.TYPE);
Using Serializer and Deserializer Related APIs
Registering a serializer and deserializer involves the following steps:
- Upload the JAR File: First, upload a JAR file that contains the necessary serializer and deserializer classes along with any dependencies.
- Register the Serializer/Deserializer: Once the JAR file is uploaded, proceed to register the serializer and deserializer classes.
- Map the Serializer/Deserializer with a Registered Schema: After registration, map the serializer and deserializer to a specific schema that has already been registered in the system.
- Fetch and Use the Serializer/Deserializer: Finally, fetch the registered serializer and deserializer and use them to marshal (serialize) and unmarshal (deserialize) payloads.
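The four steps can be pictured with a hypothetical in-memory stand-in. `SerDesWorkflowSketch` is not the registry client: some method names echo the client calls in the snippets that follow, but signatures differ, and the serializer and deserializer are plain functions instead of classes loaded from the uploaded JAR. It shows how the IDs flow: the file ID from the upload is referenced when registering the pair, and the pair's ID is what gets mapped to a schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory stand-in for the SerDes bookkeeping; not the registry client.
final class SerDesWorkflowSketch {
    static final class SerDes {
        final String name;
        final String fileId;                         // ID returned by the upload step
        final Function<String, byte[]> serializer;   // stands in for the serializer class
        final Function<byte[], String> deserializer; // stands in for the deserializer class

        SerDes(String name, String fileId,
               Function<String, byte[]> serializer, Function<byte[], String> deserializer) {
            this.name = name;
            this.fileId = fileId;
            this.serializer = serializer;
            this.deserializer = deserializer;
        }

        // A simple UTF-8 string pair, used only for illustration
        static SerDes utf8(String name, String fileId) {
            return new SerDes(name, fileId,
                    text -> text.getBytes(StandardCharsets.UTF_8),
                    bytes -> new String(bytes, StandardCharsets.UTF_8));
        }
    }

    private final Map<String, byte[]> files = new HashMap<>();
    private final Map<Long, SerDes> serDesById = new HashMap<>();
    private final Map<String, List<Long>> serDesBySchema = new HashMap<>();
    private long nextId = 1;

    // Step 1: store the JAR bytes and return a file ID
    String uploadFile(byte[] jarBytes) {
        String fileId = "file-" + nextId++;
        files.put(fileId, jarBytes);
        return fileId;
    }

    // Step 2: register a pair whose classes live in an uploaded file
    long addSerDes(SerDes pair) {
        if (!files.containsKey(pair.fileId)) {
            throw new IllegalArgumentException("Unknown file ID: " + pair.fileId);
        }
        long id = nextId++;
        serDesById.put(id, pair);
        return id;
    }

    // Step 3: map a registered pair to a schema name
    void mapSchemaWithSerDes(String schemaName, long serDesId) {
        if (!serDesById.containsKey(serDesId)) {
            throw new IllegalArgumentException("Unknown SerDes ID: " + serDesId);
        }
        serDesBySchema.computeIfAbsent(schemaName, k -> new ArrayList<>()).add(serDesId);
    }

    // Step 4: fetch every pair mapped to a schema
    List<SerDes> findSerDes(String schemaName) {
        List<SerDes> result = new ArrayList<>();
        for (long id : serDesBySchema.getOrDefault(schemaName, List.of())) {
            result.add(serDesById.get(id));
        }
        return result;
    }
}
```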
Upload the JAR File
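// Load the JAR that contains the serializer and deserializer classes from the classpath
String serdesJarName = "/serdes-examples.jar";
InputStream serdesJarInputStream = SampleSchemaRegistryApplication.class.getResourceAsStream(serdesJarName);
if (serdesJarInputStream == null) {
    throw new RuntimeException("Jar " + serdesJarName + " could not be loaded");
}
// Upload it to the registry; the returned file ID is used when registering the serializer/deserializer
String fileId = schemaRegistryClient.uploadFile(serdesJarInputStream);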
Register the Serializer and Deserializer
String simpleSerializerClassName = "org.apache.schemaregistry.samples.serdes.SimpleSerializer";
String simpleDeserializerClassName = "org.apache.schemaregistry.samples.serdes.SimpleDeserializer";
SerDesPair serializerInfo = new SerDesPair(
        "simple-serializer-deserializer",
        "simple serializer and deserializer",
        fileId,
        simpleSerializerClassName,
        simpleDeserializerClassName);
Long serDesId = schemaRegistryClient.addSerDes(serializerInfo);
Map Serializer or Deserializer with Schema
Map the serializer and deserializer to a schema by its name. One serializer/deserializer pair per schema is sufficient unless you want to maintain multiple implementations.
String schemaName = ... // name of a previously registered schema
schemaRegistryClient.mapSchemaWithSerDes(schemaName, serDesId);
Fetch and Use the Serializer or Deserializer
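// getSnapshotSerializer() and getSnapshotDeserializer() are helper methods (not shown here) that
// return the serializer and deserializer instances mapped to the schema
SnapshotSerializer<Object, byte[], SchemaMetadata> snapshotSerializer = getSnapshotSerializer(schemaMetadata);
String payload = "Random text: " + new Random().nextLong(); // java.util.Random
byte[] serializedBytes = snapshotSerializer.serialize(payload, schemaMetadata);
SnapshotDeserializer<byte[], Object, Integer> snapshotDeserializer = getSnapshotDeserializer(schemaMetadata);
// the second argument is the reader schema info (an Integer here); null leaves it unspecified
Object deserializedObject = snapshotDeserializer.deserialize(serializedBytes, null);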