Python Watchdog YAML-Based ETL Pipeline for Azure Data Lake#
Project Overview#
Developed a robust, event-driven ETL pipeline that monitors filesystem events and automatically processes and uploads
data to Azure Data Lake Storage Gen2. Pipelines are defined in YAML configuration files, which keeps the system highly
configurable and maintainable.
Business Context#
The business needed a flexible solution to continuously monitor specific directories for new data files, process them
according to predefined rules, and reliably upload the results to cloud storage. This enabled near real-time data
processing without the complexity of a full streaming solution.
Technical Challenges#
Challenge 1: Reliable Event Handling#
File system events can fire multiple times in rapid succession while a file is still being written, which risks
processing incomplete files or processing the same file more than once.
Challenge 2: Debouncing File Events#
Implementing effective debouncing of filesystem events so that the same file is not processed multiple times, while
still keeping latency low.
Challenge 3: Schema Validation#
Ensuring that all YAML configuration files adhered to a strict schema for consistency and error prevention.
Architecture#
```mermaid
flowchart TD
    A[File System Events] --> B[Watchdog Observer]
    B --> C[Event Handler]
    C --> D{Event Type?}
    D -->|Created| E[File Created Handler]
    D -->|Modified| F[File Modified Handler]
    D -->|Deleted| G[File Deleted Handler]
    E --> H[Debouncing Logic]
    F --> H
    H --> I[YAML Config Processor]
    I --> J[Schema Validator]
    J --> K[Data Transformation]
    K --> L[Data Quality Checks]
    L --> M[Azure ADLS Upload]
    N[YAML Config Files] --> I
    O[Schema Definitions] --> J

    subgraph "Error Handling"
        P[Retry Mechanism]
        Q[Dead Letter Queue]
        R[Error Logging]
    end

    M -- Failure --> P
    P -- Max Retries --> Q
    P -- Retry --> M
    Q --> R
```
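For reference, a minimal sketch of how the observer at the top of this flow might be started. The `DebouncedHandler` class is the one shown in the implementation section below; its import path, the watch directory, and the config path here are illustrative (in the real pipeline the watch directory would come from the `source.directory` field of the YAML config).

```python
# Minimal sketch of wiring the Watchdog observer to the debounced handler.
import logging
import time

from watchdog.observers import Observer

from etl_pipeline.handlers import DebouncedHandler  # hypothetical module path


def run_pipeline(watch_directory: str, yaml_config_path: str) -> None:
    """Start a Watchdog observer that feeds filesystem events to the handler."""
    logging.basicConfig(level=logging.INFO)
    handler = DebouncedHandler(yaml_config_path)
    observer = Observer()
    observer.schedule(handler, watch_directory, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)  # keep the main thread alive; callbacks run on observer threads
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == "__main__":
    run_pipeline("/data/incoming", "configs/pipeline.yaml")
```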
Implementation Details#
Debouncing Implementation#
```python
import logging
import time
from threading import Timer

import yaml
from watchdog.events import FileSystemEventHandler

# FileProcessor and ADLSUploader are project classes defined elsewhere


class DebouncedHandler(FileSystemEventHandler):
    def __init__(self, yaml_config_path, timeout=2.0):
        self.yaml_config_path = yaml_config_path
        self.timeout = timeout
        self.timers = {}          # Pending debounce timers, keyed by file path
        self.last_processed = {}  # Last processing time, keyed by file path

    def on_created(self, event):
        if event.is_directory:
            return
        self._handle_event(event.src_path, "created")

    def on_modified(self, event):
        if event.is_directory:
            return
        self._handle_event(event.src_path, "modified")

    def _handle_event(self, file_path, event_type):
        # Skip if this file was processed within the debounce window
        now = time.time()
        if file_path in self.last_processed:
            if now - self.last_processed[file_path] < self.timeout:
                logging.debug(f"Skipping {event_type} event for {file_path} - within debounce period")
                return

        # Cancel any existing timer for this file
        existing = self.timers.get(file_path)
        if existing and existing.is_alive():
            existing.cancel()

        # Schedule processing after the debounce period
        timer = Timer(self.timeout, self._process_file, args=[file_path, event_type])
        timer.daemon = True
        self.timers[file_path] = timer
        timer.start()

    def _load_yaml_config(self):
        with open(self.yaml_config_path) as f:
            return yaml.safe_load(f)

    def _process_file(self, file_path, event_type):
        try:
            logging.info(f"Processing {event_type} event for {file_path}")
            # Update last processed time
            self.last_processed[file_path] = time.time()

            # Load YAML config and process file
            pipeline_config = self._load_yaml_config()
            processor = FileProcessor(pipeline_config)
            result = processor.process(file_path)

            # Upload to Azure if processing produced output
            if result:
                adls_uploader = ADLSUploader(pipeline_config['azure'])
                adls_uploader.upload(result, file_path)
        except Exception as e:
            logging.error(f"Error processing {file_path}: {e}")
            # Handle error according to config (retry, dead letter, etc.)
```
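The `FileProcessor` and `ADLSUploader` classes referenced above are not reproduced in this write-up. As a rough illustration of the upload step, a hypothetical uploader built on the `azure-storage-file-datalake` SDK might look like the sketch below; it assumes the processor writes its output to a local file and returns that path, and it uses `DefaultAzureCredential` to reflect the managed-identity recommendation in the lessons learned. The account URL format and path layout mirror the `azure` block of the YAML schema.

```python
# Hypothetical sketch of the ADLS Gen2 uploader; the real ADLSUploader is not
# shown in this write-up, so names and path conventions here are assumptions.
import os

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient


class ADLSUploader:
    def __init__(self, azure_config: dict):
        # azure_config mirrors the 'azure' block of the YAML schema:
        # account_name, container, folder_path
        account_url = f"https://{azure_config['account_name']}.dfs.core.windows.net"
        # DefaultAzureCredential picks up a managed identity when running in Azure
        self.service_client = DataLakeServiceClient(account_url, credential=DefaultAzureCredential())
        self.container = azure_config["container"]
        self.folder_path = azure_config["folder_path"]

    def upload(self, result_path: str, source_path: str) -> None:
        """Upload a processed file, keyed by the original file name."""
        file_name = os.path.basename(source_path)
        file_system = self.service_client.get_file_system_client(self.container)
        file_client = file_system.get_file_client(f"{self.folder_path}/{file_name}")
        with open(result_path, "rb") as data:
            file_client.upload_data(data, overwrite=True)
```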
YAML Configuration Schema#
```yaml
# Example schema definition using JSON Schema format
schema:
  type: object
  required:
    - pipeline_name
    - source
    - transformations
    - destination
    - azure
  properties:
    pipeline_name:
      type: string
    source:
      type: object
      required:
        - directory
        - file_pattern
      properties:
        directory:
          type: string
        file_pattern:
          type: string
    transformations:
      type: array
      items:
        type: object
        required:
          - type
        properties:
          type:
            type: string
            enum: [regex, replace, column, filter]
          # Additional properties based on transformation type
    destination:
      type: object
      required:
        - format
        - path
      properties:
        format:
          type: string
          enum: [csv, parquet, json]
        path:
          type: string
    azure:
      type: object
      required:
        - account_name
        - container
        - folder_path
      properties:
        account_name:
          type: string
        container:
          type: string
        folder_path:
          type: string
    data_quality:
      type: array
      items:
        type: object
        required:
          - check_type
          - threshold
        properties:
          check_type:
            type: string
            enum: [null_check, type_check, unique_check, range_check]
          threshold:
            type: number
```
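Every configuration is checked against this schema before a pipeline runs (Challenge 3). A minimal sketch of that validation step is below, assuming the schema above is stored as a YAML file and unwrapped from its top-level `schema` key; the file paths and the helper name are illustrative.

```python
# Hedged sketch of the schema-validation step; file names are illustrative.
import yaml
from jsonschema import ValidationError, validate


def load_and_validate_config(config_path: str, schema_path: str) -> dict:
    """Load a pipeline YAML config and validate it against the schema above."""
    with open(schema_path) as f:
        schema = yaml.safe_load(f)["schema"]  # unwrap the top-level 'schema' key
    with open(config_path) as f:
        config = yaml.safe_load(f)
    try:
        validate(instance=config, schema=schema)
    except ValidationError as exc:
        raise ValueError(f"Invalid pipeline config {config_path}: {exc.message}") from exc
    return config
```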
Results and Impact#
Key Achievements#
- Reliability: 99.9% successful file processing rate with built-in retry mechanisms
- Performance: Average processing time of less than 5 seconds per file
- Maintainability: Configuration changes could be made without code modifications
- Scalability: Successfully handled 10,000+ files per day across multiple directories
Business Impact#
- Reduced data processing latency from hours to minutes
- Enabled near real-time analytics on incoming data
- Significantly reduced maintenance overhead with configuration-based pipelines
- Improved data quality through automated validation
Lessons Learned#
Technical Insights#
- Event-based systems need careful debouncing to prevent duplicate processing
- YAML schema validation is essential for configuration-driven applications
- Azure SDK authentication should use managed identities where possible
- Error handling strategies should be explicitly defined in configuration (a sketch of such a config-driven retry and dead-letter policy follows this list)
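The following is a hypothetical helper showing what a config-driven retry and dead-letter policy could look like; the keys `max_retries`, `backoff_seconds`, and `dead_letter_dir` are assumptions and are not part of the schema shown earlier.

```python
# Hypothetical config-driven retry/dead-letter helper (keys are illustrative).
import logging
import shutil
import time
from pathlib import Path


def upload_with_retry(uploader, result_path: str, source_path: str, error_config: dict) -> None:
    max_retries = error_config.get("max_retries", 3)
    backoff = error_config.get("backoff_seconds", 5)
    for attempt in range(1, max_retries + 1):
        try:
            uploader.upload(result_path, source_path)
            return
        except Exception as exc:
            logging.warning("Upload attempt %d/%d failed: %s", attempt, max_retries, exc)
            time.sleep(backoff * attempt)  # linear backoff between attempts
    # After exhausting retries, move the source file to a dead-letter directory
    dead_letter_dir = Path(error_config.get("dead_letter_dir", "dead_letter"))
    dead_letter_dir.mkdir(parents=True, exist_ok=True)
    shutil.move(source_path, str(dead_letter_dir / Path(source_path).name))
    logging.error("Moved %s to dead letter after %d failed uploads", source_path, max_retries)
```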
Process Improvements#
- Implementing semantic logging improved troubleshooting capabilities
- Unit tests for YAML schema validation caught many issues early (see the example test cases after this list)
- Monitoring dashboards for pipeline health provided valuable insights
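As an example of the schema tests mentioned above, hypothetical pytest cases might look like the following; the schema file path and the minimal config are assumptions for illustration, not artifacts from the real project.

```python
# Hypothetical pytest cases for the pipeline configuration schema.
import pathlib

import pytest
import yaml
from jsonschema import ValidationError, validate


@pytest.fixture
def pipeline_schema():
    # Assumes the schema shown earlier is stored at schemas/pipeline_schema.yaml
    text = pathlib.Path("schemas/pipeline_schema.yaml").read_text()
    return yaml.safe_load(text)["schema"]


MINIMAL_CONFIG = {
    "pipeline_name": "sales_ingest",
    "source": {"directory": "/data/incoming", "file_pattern": "*.csv"},
    "transformations": [{"type": "filter"}],
    "destination": {"format": "parquet", "path": "/data/processed"},
    "azure": {
        "account_name": "examplelake",
        "container": "raw",
        "folder_path": "sales",
    },
}


def test_minimal_config_passes_validation(pipeline_schema):
    # A config with every required section should validate cleanly
    validate(instance=MINIMAL_CONFIG, schema=pipeline_schema)


def test_config_missing_required_section_fails(pipeline_schema):
    # Dropping a required top-level key (azure) should raise ValidationError
    broken = {k: v for k, v in MINIMAL_CONFIG.items() if k != "azure"}
    with pytest.raises(ValidationError):
        validate(instance=broken, schema=pipeline_schema)
```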
Future Enhancements#
- Implement parallel processing for high-volume scenarios
- Add support for more complex transformation types
- Create a web UI for monitoring and configuration management
- Integrate with Azure Event Grid for more sophisticated event handling
Technologies Used#
- Python: Core programming language
- Watchdog: File system event monitoring
- PyYAML: YAML parsing and generation
- Jsonschema: Schema validation
- Azure Data Lake Storage Gen2 SDK: Cloud storage integration
- Pandas: Data transformation and processing
- Pytest: Testing framework