Внедрение сбора данных об изменениях для обеспечения аналитики в режиме, близком к реальному времени
Posted: Thu Dec 12, 2024 8:12 am
Приоритетом, который у нас был некоторое время, было сокращение времени, необходимого для коммерческого взаимодействия с нашими продуктами, чтобы сделать их доступными для анализа; люди хотят аналитику в реальном времени. Мы хотим быстро реагировать на наших клиентов, независимо от того, означает ли это новые регистрации клиентов, обновления или понижения — изменения аналитики в режиме, близком к реальному времени, должны отражаться на платформе Leadfeeder .
В основном мы используем пакетную обработку, и для некоторых наборов данных мы сузили частоту пакетной обработки до одного часа (все еще не идеально). Я мог бы продолжать и сравнивать пакетную обработку с потоковой, но я оставлю это на другой раз.
Допустим, наши пользователи хотят видеть, что Материал номера телефона Саудовской Аравии происходит в бизнесе в режиме реального времени или максимально приближенном к нему.
Как работает аналитика в реальном времени?
В рамках аналитических конвейеров Leadfeeder с поддержкой AWS без сервера наша инженерная команда интегрирует данные из реляционных баз данных в Redshift . Наши аналитики используют эти данные для различных целей, включая бизнес-аналитику и аналитику в реальном времени в рамках продукта.
Раньше мы использовали только AWS Glue для переноса наших операционных данных в Redshift для аналитики. Основные недостатки нашего устаревшего процесса:
Мы интегрировали моментальные снимки данных, поэтому у нас нет возможности отслеживать изменения между моментальными снимками.
Учитывая пакетный характер конвейера, данные доступны со значительной задержкой. Любые сбои в пакетном режиме могут привести к дальнейшему увеличению задержек.
Этот устаревший процесс также имеет свои преимущества: его можно было довольно быстро разработать и довольно легко расширить.
Используя AWS Glue и его краулеры, мы можем легко идентифицировать изменения таблиц и схем в источнике. Добавление новых таблиц в Redshift становится простой модификацией.
Однако по мере того, как наши сценарии использования становятся более сложными (наши аналитики данных и ученые, как правило, весьма креативны), этот шаблон становится недостаточным, главным образом из-за его моментального характера.
CDC Pipeline для аналитики в реальном времени
CDC означает « захват измененных данных»; он включает в себя программные шаблоны для обнаружения изменений в исходной системе и распространения их ниже по цепочке для дальнейшей обработки и/или хранения.
В MySQL есть функция, называемая двоичным журналом, которая регистрирует события, изменяющие базу данных, будь то операторы DDL или DML, и служит двум основным целям: репликации и восстановления данных.
Существует несколько способов реализации CDC. В нашем случае мы используем двоичный журнал MySQL для создания потока событий, который затем можно использовать для аналитики.
Продюсер
Для доступа к двоичному журналу и создания потока данных мы реализовали службу ECS , которая непрерывно считывает двоичный журнал и выступает в качестве производителя для потока Kinesis.
Основные возможности сервиса ECS:
Данные, считываемые из базы данных, передаются без каких-либо преобразований и сериализуются как JSON с разделителями новой строки. Цель состоит в том, чтобы производить необработанные события, т. е. любая операция DML, обнаруженная в базе данных, будет отправлена в поток Kinesis как есть, одно событие на измененную строку в базе данных.
События передаются в поток Kinesis синхронно и упорядочиваются по частям в каждой таблице базы данных.
Позволяет перезапустить с определенной позиции в двоичном журнале. Для экземпляров MySQL RDS максимальный срок хранения двоичного журнала составляет 7 дней.
Реализует механизм плавного перезапуска. При захвате SIGTERM сбрасывает данные и сохраняет контрольную точку. Это важно для развертываний.
Fargate используется для продолжения нашего бессерверного подхода к аналитике и ограничения любой операционной деятельности по настройке и обслуживанию вычислительных ресурсов.
Мониторинг с использованием показателей и оповещений Cloudwatch.
Потребители
Для использования данных из этого потока событий мы решили использовать Kinesis Firehose .
Firehose позволяет нам ускорить реализацию конвейера, поскольку он предоставляет функции потребителя данных. Все, что нам нужно сделать, это определить конфигурацию для наших потоков доставки.
Из диаграммы выше видно, что у нас есть два основных потребителя: один для Redshift и один для S3 .
Для обеспечения аналитики в режиме, близком к реальному времени, нам требовалось только передавать данные в Redshift, однако мы быстро поняли ценность также передачи наших операционных данных в S3, чтобы мы могли включить дополнительные шаблоны интеграции в наше хранилище данных.
Не все наборы данных требуют анализа в реальном времени, но мы хотели отслеживать все изменения, чтобы создать полный журнал изменений событий в наших операционных базах данных. Из этого журнала изменений мы можем поддерживать более сложное моделирование.
Оставайтесь с нами для получения дальнейших технических обновлений в
В основном мы используем пакетную обработку, и для некоторых наборов данных мы сузили частоту пакетной обработки до одного часа (все еще не идеально). Я мог бы продолжать и сравнивать пакетную обработку с потоковой, но я оставлю это на другой раз.
Допустим, наши пользователи хотят видеть, что Материал номера телефона Саудовской Аравии происходит в бизнесе в режиме реального времени или максимально приближенном к нему.
Как работает аналитика в реальном времени?
В рамках аналитических конвейеров Leadfeeder с поддержкой AWS без сервера наша инженерная команда интегрирует данные из реляционных баз данных в Redshift . Наши аналитики используют эти данные для различных целей, включая бизнес-аналитику и аналитику в реальном времени в рамках продукта.
Раньше мы использовали только AWS Glue для переноса наших операционных данных в Redshift для аналитики. Основные недостатки нашего устаревшего процесса:
Мы интегрировали моментальные снимки данных, поэтому у нас нет возможности отслеживать изменения между моментальными снимками.
Учитывая пакетный характер конвейера, данные доступны со значительной задержкой. Любые сбои в пакетном режиме могут привести к дальнейшему увеличению задержек.
Этот устаревший процесс также имеет свои преимущества: его можно было довольно быстро разработать и довольно легко расширить.
Используя AWS Glue и его краулеры, мы можем легко идентифицировать изменения таблиц и схем в источнике. Добавление новых таблиц в Redshift становится простой модификацией.
Однако по мере того, как наши сценарии использования становятся более сложными (наши аналитики данных и ученые, как правило, весьма креативны), этот шаблон становится недостаточным, главным образом из-за его моментального характера.
CDC Pipeline для аналитики в реальном времени
CDC означает « захват измененных данных»; он включает в себя программные шаблоны для обнаружения изменений в исходной системе и распространения их ниже по цепочке для дальнейшей обработки и/или хранения.
В MySQL есть функция, называемая двоичным журналом, которая регистрирует события, изменяющие базу данных, будь то операторы DDL или DML, и служит двум основным целям: репликации и восстановления данных.
Существует несколько способов реализации CDC. В нашем случае мы используем двоичный журнал MySQL для создания потока событий, который затем можно использовать для аналитики.
Продюсер
Для доступа к двоичному журналу и создания потока данных мы реализовали службу ECS , которая непрерывно считывает двоичный журнал и выступает в качестве производителя для потока Kinesis.
Основные возможности сервиса ECS:
Данные, считываемые из базы данных, передаются без каких-либо преобразований и сериализуются как JSON с разделителями новой строки. Цель состоит в том, чтобы производить необработанные события, т. е. любая операция DML, обнаруженная в базе данных, будет отправлена в поток Kinesis как есть, одно событие на измененную строку в базе данных.
События передаются в поток Kinesis синхронно и упорядочиваются по частям в каждой таблице базы данных.
Позволяет перезапустить с определенной позиции в двоичном журнале. Для экземпляров MySQL RDS максимальный срок хранения двоичного журнала составляет 7 дней.
Реализует механизм плавного перезапуска. При захвате SIGTERM сбрасывает данные и сохраняет контрольную точку. Это важно для развертываний.
Fargate используется для продолжения нашего бессерверного подхода к аналитике и ограничения любой операционной деятельности по настройке и обслуживанию вычислительных ресурсов.
Мониторинг с использованием показателей и оповещений Cloudwatch.
Потребители
Для использования данных из этого потока событий мы решили использовать Kinesis Firehose .
Firehose позволяет нам ускорить реализацию конвейера, поскольку он предоставляет функции потребителя данных. Все, что нам нужно сделать, это определить конфигурацию для наших потоков доставки.
Из диаграммы выше видно, что у нас есть два основных потребителя: один для Redshift и один для S3 .
Для обеспечения аналитики в режиме, близком к реальному времени, нам требовалось только передавать данные в Redshift, однако мы быстро поняли ценность также передачи наших операционных данных в S3, чтобы мы могли включить дополнительные шаблоны интеграции в наше хранилище данных.
Не все наборы данных требуют анализа в реальном времени, но мы хотели отслеживать все изменения, чтобы создать полный журнал изменений событий в наших операционных базах данных. Из этого журнала изменений мы можем поддерживать более сложное моделирование.
Оставайтесь с нами для получения дальнейших технических обновлений в