Python Watchdog YAML-Based ETL Pipeline for Azure Data Lake

Project Overview

Developed a robust, event-driven ETL pipeline that monitored filesystem events and automatically processed and uploaded data to Azure Data Lake Storage Gen2. The system used YAML configuration files for pipeline definitions, making it highly configurable and maintainable.

Business Context

The business needed a flexible solution to continuously monitor specific directories for new data files, process them according to predefined rules, and reliably upload the results to cloud storage. This enabled near real-time data processing without the complexity of a full streaming solution.

Technical Challenges

Challenge 1: Reliable Event Handling

Filesystem events can fire multiple times in rapid succession while a file is still being written, which risks processing incomplete files or processing the same file more than once.

Challenge 2: Debouncing File Events

Debouncing filesystem events effectively so that the same file is not processed multiple times, while still keeping processing latency low.

Challenge 3: Schema Validation

Ensuring that all YAML configuration files adhered to a strict schema for consistency and error prevention.

Architecture

flowchart TD
    A[File System Events] --> B[Watchdog Observer]
    B --> C[Event Handler]
    C --> D{Event Type?}
    D -->|Created| E[File Created Handler]
    D -->|Modified| F[File Modified Handler]
    D -->|Deleted| G[File Deleted Handler]

    E --> H[Debouncing Logic]
    F --> H

    H --> I[YAML Config Processor]
    I --> J[Schema Validator]

    J --> K[Data Transformation]
    K --> L[Data Quality Checks]
    L --> M[Azure ADLS Upload]

    N[YAML Config Files] --> I
    O[Schema Definitions] --> J

    subgraph "Error Handling"
    P[Retry Mechanism]
    Q[Dead Letter Queue]
    R[Error Logging]
    end

    M -- Failure --> P
    P -- Max Retries --> Q
    P -- Retry --> M
    Q --> R

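The error-handling path in the diagram (retry, then dead-letter once the retry budget is exhausted) can be sketched as a thin wrapper around the upload call. This is illustrative only: upload_with_retry, the linear backoff, and the local dead-letter directory are assumptions rather than the production implementation, whose retry and dead-letter behaviour was driven by the YAML configuration.

import logging
import shutil
import time
from pathlib import Path


def upload_with_retry(uploader, result, file_path, max_retries=3, backoff_seconds=5,
                      dead_letter_dir="dead_letter"):
    """Attempt the ADLS upload, retrying on failure and dead-lettering after max_retries."""
    for attempt in range(1, max_retries + 1):
        try:
            uploader.upload(result, file_path)   # e.g. the ADLSUploader used in the handler below
            return True
        except Exception as exc:
            logging.warning("Upload attempt %d/%d for %s failed: %s",
                            attempt, max_retries, file_path, exc)
            time.sleep(backoff_seconds * attempt)  # linear backoff between retries

    # Retries exhausted: move the source file to a dead-letter directory for later inspection
    Path(dead_letter_dir).mkdir(parents=True, exist_ok=True)
    shutil.move(file_path, str(Path(dead_letter_dir) / Path(file_path).name))
    logging.error("Dead-lettered %s after %d failed upload attempts", file_path, max_retries)
    return False
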
Implementation Details

Debouncing Implementation

import logging
import time
from threading import Timer

import yaml
from watchdog.events import FileSystemEventHandler


class DebouncedHandler(FileSystemEventHandler):
    def __init__(self, yaml_config_path, timeout=2.0):
        super().__init__()
        self.yaml_config_path = yaml_config_path
        self.timeout = timeout
        self.timers = {}          # Pending debounce timers, keyed by file path
        self.last_processed = {}  # Last processing time, keyed by file path

    def on_created(self, event):
        if event.is_directory:
            return
        self._handle_event(event.src_path, "created")

    def on_modified(self, event):
        if event.is_directory:
            return
        self._handle_event(event.src_path, "modified")

    def _handle_event(self, file_path, event_type):
        # Skip events for files that were fully processed within the debounce window
        now = time.time()
        if file_path in self.last_processed:
            if now - self.last_processed[file_path] < self.timeout:
                logging.debug(f"Skipping {event_type} event for {file_path} - within debounce period")
                return

        # Cancel any pending timer for this file so rapid successive events reset the debounce
        pending = self.timers.get(file_path)
        if pending and pending.is_alive():
            pending.cancel()

        # Schedule processing once the debounce period has elapsed without further events
        timer = Timer(self.timeout, self._process_file, args=[file_path, event_type])
        timer.daemon = True
        timer.start()
        self.timers[file_path] = timer

    def _process_file(self, file_path, event_type):
        try:
            logging.info(f"Processing {event_type} event for {file_path}")

            # Update last processed time
            self.last_processed[file_path] = time.time()

            # Load YAML config and process file
            pipeline_config = self._load_yaml_config()
            processor = FileProcessor(pipeline_config)
            result = processor.process(file_path)

            # Upload to Azure
            if result:
                adls_uploader = ADLSUploader(pipeline_config['azure'])
                adls_uploader.upload(result, file_path)

        except Exception as e:
            logging.error(f"Error processing {file_path}: {str(e)}")
            # Handle error according to config (retry, dead letter, etc.)

    def _load_yaml_config(self):
        # Reload the pipeline definition on each event so config changes take effect without a restart
        with open(self.yaml_config_path) as f:
            return yaml.safe_load(f)

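For completeness, a minimal sketch of how the handler above could be wired to a Watchdog observer. The watched directory comes from the source section of the YAML config; the config path shown here is an illustrative placeholder.

import time

import yaml
from watchdog.observers import Observer

if __name__ == "__main__":
    config_path = "config/pipeline.yaml"                      # illustrative path
    with open(config_path) as f:
        watch_dir = yaml.safe_load(f)["source"]["directory"]  # directory defined in the YAML config

    observer = Observer()
    observer.schedule(DebouncedHandler(config_path, timeout=2.0), watch_dir, recursive=False)
    observer.start()
    try:
        while True:
            time.sleep(1)   # keep the main thread alive while the observer runs in the background
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
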
YAML Configuration Schema

# Example schema definition using JSON Schema format
schema:
  type: object
  required:
    - pipeline_name
    - source
    - transformations
    - destination
    - azure
  properties:
    pipeline_name:
      type: string
    source:
      type: object
      required:
        - directory
        - file_pattern
      properties:
        directory:
          type: string
        file_pattern:
          type: string
    transformations:
      type: array
      items:
        type: object
        required:
          - type
        properties:
          type:
            type: string
            enum: [regex, replace, column, filter]
          # Additional properties based on transformation type
    destination:
      type: object
      required:
        - format
        - path
      properties:
        format:
          type: string
          enum: [csv, parquet, json]
        path:
          type: string
    azure:
      type: object
      required:
        - account_name
        - container
        - folder_path
      properties:
        account_name:
          type: string
        container:
          type: string
        folder_path:
          type: string
    data_quality:
      type: array
      items:
        type: object
        required:
          - check_type
          - threshold
        properties:
          check_type:
            type: string
            enum: [null_check, type_check, unique_check, range_check]
          threshold:
            type: number

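A minimal sketch of how a pipeline definition can be checked against this schema with the jsonschema library. The inline config and the schemas/pipeline_schema.yaml path are illustrative examples, not the production files.

import yaml
from jsonschema import ValidationError, validate

# Illustrative pipeline definition conforming to the schema above
pipeline_yaml = """
pipeline_name: sales_ingest
source:
  directory: /data/incoming/sales
  file_pattern: "*.csv"
transformations:
  - type: filter
destination:
  format: parquet
  path: /data/processed/sales
azure:
  account_name: mydatalake
  container: raw
  folder_path: sales/daily
"""

with open("schemas/pipeline_schema.yaml") as f:   # illustrative schema location
    schema = yaml.safe_load(f)["schema"]          # schema nests under the top-level "schema" key shown above

try:
    validate(instance=yaml.safe_load(pipeline_yaml), schema=schema)
    print("Pipeline config is valid")
except ValidationError as err:
    print(f"Invalid pipeline config: {err.message}")
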
Results and Impact

Key Achievements

  • Reliability: 99.9% successful file processing rate with built-in retry mechanisms
  • Performance: Average processing time of less than 5 seconds per file
  • Maintainability: Configuration changes could be made without code modifications
  • Scalability: Successfully handled 10,000+ files per day across multiple directories

Business Impact

  • Reduced data processing latency from hours to minutes
  • Enabled near real-time analytics on incoming data
  • Significantly reduced maintenance overhead with configuration-based pipelines
  • Improved data quality through automated validation (a sketch of one such check follows this list)

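The automated validation mentioned above maps to the data_quality entries in the schema. Below is a minimal sketch of one such check, a null_check with a configurable threshold implemented with pandas; the helper name and the sample data are illustrative.

import pandas as pd


def run_null_check(df: pd.DataFrame, threshold: float) -> bool:
    """Pass when the overall fraction of null values stays at or below the configured threshold."""
    null_ratio = df.isna().mean().mean()   # mean null ratio across all columns
    return null_ratio <= threshold


# Illustrative usage with a data_quality entry from the YAML config
check = {"check_type": "null_check", "threshold": 0.05}                      # placeholder config values
df = pd.DataFrame({"order_id": [1, 2, None], "amount": [10.0, None, 7.5]})
print(run_null_check(df, check["threshold"]))                                # False: a third of the values are null
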
Lessons Learned

Technical Insights

  • Event-based systems need careful debouncing to prevent duplicate processing
  • YAML schema validation is essential for configuration-driven applications
  • Azure SDK authentication should use managed identities where possible (see the sketch after this list)
  • Error handling strategies should be explicitly defined in configuration

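As a sketch of the managed-identity recommendation above: DefaultAzureCredential picks up a managed identity when the pipeline runs in Azure and falls back to developer credentials (Azure CLI, environment variables) locally, so no keys need to live in the YAML config. The account, container, and file names below are placeholders.

from azure.identity import DefaultAzureCredential
from azure.storage.filedatalake import DataLakeServiceClient


def get_adls_service_client(account_name: str) -> DataLakeServiceClient:
    # Managed identity in Azure; CLI or environment credentials during local development
    return DataLakeServiceClient(
        account_url=f"https://{account_name}.dfs.core.windows.net",
        credential=DefaultAzureCredential(),
    )


# Illustrative upload of one processed file to the container/folder named in the YAML config
service = get_adls_service_client("mydatalake")                                      # placeholder account
file_client = service.get_file_system_client("raw").get_file_client("sales/daily/output.parquet")
with open("output.parquet", "rb") as data:                                           # placeholder local file
    file_client.upload_data(data, overwrite=True)
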
Process Improvements

  • Implementing semantic logging improved troubleshooting capabilities
  • Unit tests for YAML schema validation caught many issues early
  • Monitoring dashboards for pipeline health provided valuable insights

Future Enhancements

  • Implement parallel processing for high-volume scenarios
  • Add support for more complex transformation types
  • Create a web UI for monitoring and configuration management
  • Integrate with Azure Event Grid for more sophisticated event handling

Tools and Technologies Used

  • Python: Core programming language
  • Watchdog: File system event monitoring
  • PyYAML: YAML parsing and generation
  • Jsonschema: Schema validation
  • Azure Data Lake Storage Gen2 SDK: Cloud storage integration
  • Pandas: Data transformation and processing
  • Pytest: Testing framework