Skip to content

yzevm/sync-postgresql-with-elasticsearch-example

Folders and files

NameName
Last commit message
Last commit date

Latest commit

ย 

History

11 Commits
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

Repository files navigation

Sync PostgreSQL with Elasticsearch via Debezium

Schema

                   +-------------+
                   |             |
                   |  PostgreSQL |
                   |             |
                   +------+------+
                          |
                          |
                          |
          +---------------v------------------+
          |                                  |
          |           Kafka Connect          |
          |    (Debezium, ES connectors)     |
          |                                  |
          +---------------+------------------+
                          |
                          |
                          |
                          |
                  +-------v--------+
                  |                |
                  | Elasticsearch  |
                  |                |
                  +----------------+


We are using Docker Compose to deploy the following components:

  • PostgreSQL
  • Kafka
  • Elasticsearch

Usage

docker-compose up --build

# wait until it's setup
./start.sh

Testing

Check database's content

# Check contents of the PostgreSQL database:
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DATABASE -c "SELECT * FROM users"'

# Check contents of the Elasticsearch database:
curl http://localhost:9200/users/_search?pretty

Create user

docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DATABASE'
test_db=# INSERT INTO users (email) VALUES ('[email protected]');

# Check contents of the Elasticsearch database:
curl http://localhost:9200/users/_search?q=id:6
{
  ...
  "hits": {
    "total": 1,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "users",
        "_type": "_doc",
        "_id": "6",
        "_score": 1.0,
        "_source": {
          "id": 6,
          "email": "[email protected]"
        }
      }
    ]
  }
}

Update user

test_db=# UPDATE users SET email = '[email protected]' WHERE id = 6;

# Check contents of the Elasticsearch database:
curl http://localhost:9200/users/_search?q=id:6
{
  ...
  "hits": {
    "total": 1,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "users",
        "_type": "_doc",
        "_id": "6",
        "_score": 1.0,
        "_source": {
          "id": 6,
          "email": "[email protected]"
        }
      }
    ]
  }
}

Delete user

test_db=# DELETE FROM users WHERE id = 6;

# Check contents of the Elasticsearch database:
curl http://localhost:9200/users/_search?q=id:6
{
  ...
  "hits": {
    "total": 1,
    "max_score": 1.0,
    "hits": []
  }
}